
Commit c0c8f23

release 0.2.0

1 parent f7598eb commit c0c8f23

File tree

5 files changed: +92 -51 lines changed

gradle.properties

+1 -1

```diff
@@ -2,7 +2,7 @@ kotlin.code.style=official
 
 # Package definitions
 projects.group=io.github.nomisrev
-projects.version=0.1.1-SNAPSHOT
+projects.version=0.2.0
 
 # Project definitions
 pom.name=kotlin-kafka
```
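With the snapshot suffix gone, downstream builds can pin the stable release. A minimal sketch of the consuming side, assuming the artifact is published as `io.github.nomisrev:kotlin-kafka` (coordinates implied by `projects.group` and `pom.name` above):

```kotlin
// build.gradle.kts in a hypothetical consumer project
dependencies {
    // Pin the stable release instead of 0.1.1-SNAPSHOT
    implementation("io.github.nomisrev:kotlin-kafka:0.2.0")
}
```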

src/main/kotlin/io/github/nomisRev/kafka/KafkaFuture.kt

+1 -2

```diff
@@ -1,8 +1,8 @@
 @file:JvmName("KafkaFutureExt")
+
 package io.github.nomisRev.kafka
 
 import kotlinx.coroutines.Deferred
-import kotlinx.coroutines.InternalCoroutinesApi
 import kotlinx.coroutines.future.asDeferred
 import kotlinx.coroutines.future.await
 import org.apache.kafka.clients.admin.CreateTopicsResult
@@ -31,7 +31,6 @@ public suspend fun <T> KafkaFuture<T>.await(): T =
  *
  * The [KafkaFuture] is cancelled when the resulting deferred is cancelled.
  */
-@OptIn(InternalCoroutinesApi::class)
 @Suppress("DeferredIsResult")
 public fun <T> KafkaFuture<T>.asDeferred(): Deferred<T> =
   toCompletionStage().asDeferred()
```
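Dropping `@OptIn(InternalCoroutinesApi::class)` works because `asDeferred` now only touches stable API: `toCompletionStage()` plus `CompletionStage.asDeferred()` from `kotlinx-coroutines-jdk8`. A usage sketch for the `await` extension; the `Admin` client and topic name are illustrative assumptions, not part of this commit:

```kotlin
import io.github.nomisRev.kafka.await
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.NewTopic

// Suspend until the broker acknowledges topic creation,
// instead of blocking the thread on KafkaFuture.get().
suspend fun createExampleTopic(admin: Admin) {
  admin.createTopics(listOf(NewTopic("example-topic", 1, 1.toShort())))
    .all()
    .await()
}
```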

src/main/kotlin/io/github/nomisRev/kafka/Producer.kt

+47 -48

```diff
@@ -50,9 +50,9 @@ import org.apache.kafka.common.serialization.Serializer
  */
 @FlowPreview
 public suspend fun <A, B> Flow<ProducerRecord<A, B>>.produce(
-  settings: ProducerSettings<A, B>
+  settings: ProducerSettings<A, B>,
 ): Flow<RecordMetadata> =
-  settings.kafkaProducer().flatMapConcat { producer ->
+  kafkaProducer(settings).flatMapConcat { producer ->
     this@produce.map { record -> producer.sendAwait(record) }
   }
 
```
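`produce` now takes the `ProducerSettings` directly and constructs the producer internally via `kafkaProducer(settings)`, replacing the removed `settings.kafkaProducer()` member. A minimal end-to-end sketch; the topic name and record values are placeholder assumptions:

```kotlin
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.produce
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import org.apache.kafka.clients.producer.ProducerRecord

@OptIn(FlowPreview::class) // produce is still marked @FlowPreview
suspend fun produceHundred(settings: ProducerSettings<Int, String>) {
  (1..100).asFlow()
    .map { ProducerRecord("example-topic", it, "message #$it") }
    .produce(settings) // the producer is created, flushed, and closed by the Flow
    .collect { metadata -> println("${metadata.topic()}/${metadata.partition()}@${metadata.offset()}") }
}
```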
```diff
@@ -75,7 +75,7 @@ public suspend fun <A, B> Flow<ProducerRecord<A, B>>.produce(
  * <!--- KNIT example-producer-02.kt -->
  */
 public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
-  record: ProducerRecord<A, B>
+  record: ProducerRecord<A, B>,
 ): RecordMetadata =
   suspendCoroutine { cont ->
     // Those can be a SerializationException when it fails to serialize the message,
```
````diff
@@ -87,23 +87,49 @@ public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
     }
   }
 
-public fun <K, V> KafkaProducer(
-  props: Properties,
-  keyDeserializer: Serializer<K>,
-  valueDeserializer: Serializer<V>
-): KafkaProducer<K, V> =
-  KafkaProducer(props, keyDeserializer, valueDeserializer)
+/**
+ * Constructs a [KafkaProducer] for [K] and [V] from the given [ProducerSettings].
+ */
+@Suppress("FunctionName")
+public fun <K, V> KafkaProducer(setting: ProducerSettings<K, V>): KafkaProducer<K, V> =
+  KafkaProducer(setting.properties(), setting.keyDeserializer, setting.valueDeserializer)
 
+/**
+ * Will automatically close, and flush when finished streaming.
+ * The [KafkaProducer] is closed when the [Flow] completes.
+ *
+ * This means that the [KafkaProducer] will not be closed for a synchronously running stream,
+ * but when the [Flow] is offloaded to a separate coroutine it is prone to be collected, closed,
+ * and flushed. In the example below we construct a producer stream that produces 100 indexed
+ * messages.
+ *
+ * ```kotlin
+ * fun <Key, Value> KafkaProducer<Key, Value>.produce(topicName: String, count: Int): Flow<Unit> =
+ *   (0..count).asFlow().map { sendAwait(ProducerRecord(topicName, "message #$it")) }
+ *
+ * val producerStream = kafkaProducer(settings)
+ *   .flatMapConcat { producer -> producer.produce("topic-name", 100) }
+ * ```
+ *
+ * Here the `KafkaProducer` will only get collected (and closed/flushed) when all 100 messages
+ * were produced.
+ *
+ * **DO NOT** offload the producer into a buffer: in the snippet below the `KafkaProducer` is
+ * collected into the buffer, and is thus closed and flushed before the records are sent.
+ *
+ * ```kotlin
+ * kafkaProducer(settings).buffer(10)
+ * ```
+ */
 public fun <K, V> kafkaProducer(
-  props: Properties,
-  keyDeserializer: Serializer<K>,
-  valueDeserializer: Serializer<V>
+  setting: ProducerSettings<K, V>,
 ): Flow<KafkaProducer<K, V>> = flow {
-  val producer = KafkaProducer(props, keyDeserializer, valueDeserializer)
-  try {
-    producer.use { emit(it) }
-  } finally {
-    producer.flush()
+  KafkaProducer(setting).use { producer ->
+    try {
+      emit(producer)
+    } finally {
+      producer.flush()
+    }
   }
 }
 
````
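The `Properties`-based overload is gone; the `@Suppress("FunctionName")` factory now builds a `KafkaProducer` straight from `ProducerSettings`. For direct, non-`Flow` usage the same close-on-completion guarantee comes from `use`; a small sketch, with topic and record values as illustrative assumptions:

```kotlin
import io.github.nomisRev.kafka.KafkaProducer
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.sendAwait
import org.apache.kafka.clients.producer.ProducerRecord

// use {} closes the producer even when sendAwait throws.
suspend fun sendOne(settings: ProducerSettings<String, String>) {
  KafkaProducer(settings).use { producer ->
    producer.sendAwait(ProducerRecord("example-topic", "key", "value"))
  }
}
```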

````diff
@@ -115,6 +141,9 @@ public enum class Acks(public val value: String) {
 }
 
 /**
+ * A type-safe constructor for [KafkaProducer] settings.
+ * It forces you to specify the bootstrapServer, and serializers for [K] and [V].
+ * These are the minimum requirements for constructing a valid [KafkaProducer].
  *
  * @see http://kafka.apache.org/documentation.html#producerconfigs
  */
@@ -123,7 +152,7 @@ public data class ProducerSettings<K, V>(
   val keyDeserializer: Serializer<K>,
   val valueDeserializer: Serializer<V>,
   val acks: Acks = Acks.One,
-  val other: Properties? = null
+  val other: Properties? = null,
 ) {
   public fun properties(): Properties =
     Properties().apply {
@@ -133,34 +162,4 @@ public data class ProducerSettings<K, V>(
       put(ProducerConfig.ACKS_CONFIG, acks.value)
       other?.let { putAll(other) }
     }
-
-  /**
-   * Will automatically close, and flush when finished streaming.
-   * The [Flow] will close when the [KafkaProducer] is consumed from the [Flow].
-   *
-   * This means that the [KafkaProducer] will not be closed for a synchronous running stream, but
-   * when running the [Flow] is offloaded in a separate Coroutine it's prone to be collected, closed
-   * and flushed. In the example below we construct a producer stream that produces 100 indexed
-   * messages.
-   *
-   * ```kotlin
-   * fun <Key, Value> KafkaProducer<Key, Value>.produce(topicName: String, count: Int): Flow<Unit> =
-   *   (0..count).asFlow().map { sendAwait(ProducerRecord(topicName, "message #it")) }
-   *
-   * val producerStream = kafkaProducer(Properties(), StringSerializer(), StringSerializer())
-   *   .flatMapConcat { producer -> producer.produce("topic-name", 100) }
-   * ```
-   *
-   * Here the `KafkaProducer` will only get collected (and closed/flushed) when all 100 messages
-   * were produced.
-   *
-   * **DO NOT** If instead we'd do something like the following, where we offload in a buffer then
-   * the `KafkaProducer` gets collected into the buffer and thus closed/flushed.
-   *
-   * ```kotlin
-   * kafkaProducer(Properties(), StringSerializer(), StringSerializer()).buffer(10)
-   * ```
-   */
-  public fun kafkaProducer(): Flow<KafkaProducer<K, V>> =
-    kafkaProducer(properties(), keyDeserializer, valueDeserializer)
 }
````
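The new KDoc frames `ProducerSettings` as the type-safe entry point: bootstrap servers plus the two serializers are the minimum viable configuration, and `properties()` folds those, the `acks` setting, and any `other` overrides into one `Properties` object. A construction sketch; the broker address and client id are illustrative, and the first argument is passed positionally since its parameter name is outside this hunk:

```kotlin
import io.github.nomisRev.kafka.Acks
import io.github.nomisRev.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

val settings: ProducerSettings<String, String> = ProducerSettings(
  "localhost:9092",   // bootstrap servers
  StringSerializer(), // key serializer
  StringSerializer(), // value serializer
  acks = Acks.One,    // the default, spelled out for clarity
  other = Properties().apply { put("client.id", "example-producer") },
)
```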
src/test/kotlin/io/github/nomisrev/kafka/ProducerSettingSpec.kt

+43 -0

```diff
@@ -0,0 +1,43 @@
+package io.github.nomisrev.kafka
+
+import io.github.nomisRev.kafka.Acks
+import io.github.nomisRev.kafka.ProducerSettings
+import io.kotest.assertions.assertSoftly
+import io.kotest.core.spec.style.StringSpec
+import io.kotest.matchers.maps.shouldContainAll
+import io.kotest.matchers.shouldBe
+import io.kotest.property.Arb
+import io.kotest.property.arbitrary.enum
+import io.kotest.property.arbitrary.map
+import io.kotest.property.arbitrary.string
+import io.kotest.property.checkAll
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import java.util.Properties
+
+class ProducerSettingSpec : StringSpec({
+
+  "ProducerSettings Ack" {
+    checkAll(
+      Arb.string(),
+      Arb.enum<Acks>(),
+      Arb.map(Arb.string(), Arb.string())
+    ) { bootstrapServers, acks, map ->
+      val settings = ProducerSettings<String, String>(
+        bootstrapServers,
+        StringSerializer(),
+        StringSerializer(),
+        acks = acks,
+        other = Properties().apply {
+          putAll(map)
+        }
+      )
+
+      assertSoftly(settings.properties()) {
+        toMap().shouldContainAll(map as Map<Any, Any>)
+        getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) shouldBe bootstrapServers
+        getProperty(ProducerConfig.ACKS_CONFIG) shouldBe acks.value
+      }
+    }
+  }
+})
```
