Skip to content

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.

License

Notifications You must be signed in to change notification settings

nomisRev/kotlin-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

c0c8f23 · Jul 2, 2022
Jul 2, 2022
Jul 2, 2022
Apr 4, 2022
Jul 2, 2022
Jul 2, 2022
Nov 21, 2021
Nov 15, 2021
Nov 15, 2021
Jul 1, 2022
Nov 15, 2021
Jul 2, 2022
Jul 2, 2022
Nov 21, 2021
Nov 15, 2021
Jul 1, 2022
Nov 22, 2021
Jul 2, 2022
Nov 21, 2021
Nov 21, 2021

Repository files navigation

Module kotlin-kafka

Maven Central

This project is still under development, andd started as a playground where I was playing around with Kafka in Kotlin and the Kafka SDK whilst reading the Kafka book Definite Guide from Confluent. https://www.confluent.io/resources/kafka-the-definitive-guide-v2/

Rationale

At the time of starting this repository I didn't find any bindings between Kafka SDK and Kotlin suspension. These operators should be implemented low-level, so they can guarantee correct cancellation support, and high optimised runtimes.

Goals

  • Lean Core library built on top of Kotlin Std & KotlinX Coroutines (possible extensions with Arrow in additional module)
  • Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and suspend.
  • Flow based operators, so you can easily compose KotlinX Flow based Kafka programs
  • example for testing Kafka with Test Containers in Kotlin.

Adding Dependency

Simply add the following dependency as implementation in the build.gradle dependencies` block.

dependencies {
  implementation("io.github.nomisrev:kotlin-kafka:0.1")
}

Example

@JvmInline
value class Key(val index: Int)

@JvmInline
value class Message(val content: String)

fun main(): Unit =
  runBlocking(Default) {
    val topicName = "test-topic"
    val msgCount = 10
    val kafka = Kafka.container

    Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
      client.createTopic(NewTopic(topicName, 1, 1))
    }

    coroutineScope { // Run produces and consumer in a single scope
      launch { // Send 20 messages, and then close the producer
        val settings: ProducerSettings<Key, Message> =
          ProducerSettings(
            kafka.bootstrapServers,
            IntegerSerializer().imap { key: Key -> key.index },
            StringSerializer().imap { msg: Message -> msg.content },
            Acks.All
          )
        (1..msgCount)
          .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
          .asFlow()
          .produce(settings)
          .collect(::println)
      }

      launch { // Consume 20 messages as a stream, and then close the consumer
        val settings: ConsumerSettings<Key, Message> =
          ConsumerSettings(
            kafka.bootstrapServers,
            IntegerDeserializer().map(::Key),
            StringDeserializer().map(::Message),
            groupId = UUID.randomUUID().toString(),
            autoOffsetReset = AutoOffsetReset.Earliest
          )
        kafkaConsumer(settings)
          .subscribeTo(topicName)
          .take(msgCount)
          .map { "${it.key()} -> ${it.value()}" }
          .collect(::println)
      }
    }
  }

You can get the full code here.

test-topic-0@0
test-topic-0@1
test-topic-0@2
test-topic-0@3
test-topic-0@4
test-topic-0@5
test-topic-0@6
test-topic-0@7
test-topic-0@8
test-topic-0@9
Key(index=1) -> Message(content=msg: 1)
Key(index=2) -> Message(content=msg: 2)
Key(index=3) -> Message(content=msg: 3)
Key(index=4) -> Message(content=msg: 4)
Key(index=5) -> Message(content=msg: 5)
Key(index=6) -> Message(content=msg: 6)
Key(index=7) -> Message(content=msg: 7)
Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)