Skip to content

Commit ad2b742

Browse files
committed
adding retryable logic on consumer, working on #25
1 parent 307e614 commit ad2b742

File tree

11 files changed

+92
-59
lines changed

11 files changed

+92
-59
lines changed

command-producer-consumer/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@
9292
<scope>test</scope>
9393
</dependency>
9494

95+
<dependency>
96+
<groupId>org.awaitility</groupId>
97+
<artifactId>awaitility</artifactId>
98+
<version>4.0.3</version>
99+
<scope>test</scope>
100+
</dependency>
101+
95102
</dependencies>
96103

97104
<build>

command-producer-consumer/src/main/kotlin/org/learning/by/example/petstore/command/consumer/CommandsConsumer.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package org.learning.by.example.petstore.command.consumer
22

33
import org.learning.by.example.petstore.command.Command
44
import reactor.core.publisher.Flux
5+
import reactor.core.publisher.Mono
56

67
interface CommandsConsumer {
7-
fun receiveCommands(): Flux<Command>
8+
fun receiveCommands(sink: (Command) -> Mono<Void>): Flux<Void>
89
}

command-producer-consumer/src/main/kotlin/org/learning/by/example/petstore/command/consumer/CommandsConsumerConfig.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ data class CommandsConsumerConfig(
1515
val clientId: String,
1616
val groupId: String,
1717
val offsetEarliest: String,
18-
val timeoutMS: Int
18+
val timeoutMS: Int,
19+
val retries: CommandsConsumerConfigRetries
20+
1921
) {
2022
companion object Constants {
2123
const val CONSUMER_CONFIG_PREFIX = "service.commands.consumer"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package org.learning.by.example.petstore.command.consumer
2+
3+
data class CommandsConsumerConfigRetries(val times: Long, val ms: Long)

command-producer-consumer/src/main/kotlin/org/learning/by/example/petstore/command/consumer/CommandsConsumerImpl.kt

+20-8
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import org.apache.kafka.common.KafkaException
77
import org.apache.kafka.common.serialization.StringDeserializer
88
import org.learning.by.example.petstore.command.Command
99
import reactor.core.publisher.Mono
10+
import reactor.core.scheduler.Schedulers
1011
import reactor.kafka.receiver.KafkaReceiver
1112
import reactor.kafka.receiver.ReceiverOptions
1213
import reactor.kotlin.core.publisher.toMono
14+
import reactor.util.retry.Retry
15+
import java.time.Duration
1316

14-
class CommandsConsumerImpl(commandsConsumerConfig: CommandsConsumerConfig, objectMapper: ObjectMapper) :
17+
class CommandsConsumerImpl(private val commandsConsumerConfig: CommandsConsumerConfig, objectMapper: ObjectMapper) :
1518
CommandsConsumer {
1619
private val options = hashMapOf<String, Any>(
1720
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to commandsConsumerConfig.bootstrapServer,
@@ -24,6 +27,8 @@ class CommandsConsumerImpl(commandsConsumerConfig: CommandsConsumerConfig, objec
2427
CommandsDeserializer.OBJECT_MAPPER_CONFIG_KEY to objectMapper
2528
)
2629

30+
private val scheduler = Schedulers.newSingle("sample", true)
31+
2732
private val receiver: KafkaReceiver<String, Command> = KafkaReceiver.create(
2833
ReceiverOptions.create<String, Command>(options).subscription(setOf(commandsConsumerConfig.topic))
2934
)
@@ -34,15 +39,22 @@ class CommandsConsumerImpl(commandsConsumerConfig: CommandsConsumerConfig, objec
3439
// To avoid this we will ask first if we can connect to Kafka. However if we can connect, and them subscribed,
3540
// we may get disconnected latter, but the KafkaConsumer will handle that recovery automatically and we do not
3641
// need to check it in the future connections.
37-
override fun receiveCommands() = isKafkaAvailable.flatMapMany {
38-
receiver.receive().flatMap {
39-
val receiverOffset = it.receiverOffset()
40-
receiverOffset.acknowledge()
41-
it.value().toMono()
42-
}
42+
override fun receiveCommands(sink: (Command) -> Mono<Void>) = isKafkaAvailable.flatMapMany {
43+
receiver.receive()
44+
.publishOn(scheduler)
45+
.concatMap {
46+
sink(it.value()).thenEmpty(it.receiverOffset().commit())
47+
}.retryWhen(
48+
Retry.backoff(
49+
commandsConsumerConfig.retries.times,
50+
Duration.ofMillis(commandsConsumerConfig.retries.ms)
51+
).filter {
52+
it !is ErrorDeserializingObject
53+
}
54+
)
4355
}
4456

45-
// To known if we can't connect to Kafka we could't just get the topics list, only during a set of configurable
57+
// To known if we can't connect to Kafka we couldn't just get the topics list, only during a set of configurable
4658
// time, if we can't get a response in that time we will return an error.
4759
//
4860
// We don't need to check what topics they are since we are not really interested in them.

command-producer-consumer/src/test/kotlin/org/learning/by/example/petstore/command/consumer/CommandsConsumerImplTest.kt

+40-24
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,35 @@
11
package org.learning.by.example.petstore.command.consumer
22

33
import org.assertj.core.api.Assertions.assertThat
4+
import org.awaitility.Awaitility.await
45
import org.junit.jupiter.api.BeforeEach
56
import org.junit.jupiter.api.MethodOrderer
67
import org.junit.jupiter.api.Order
78
import org.junit.jupiter.api.Test
89
import org.junit.jupiter.api.TestMethodOrder
10+
import org.learning.by.example.petstore.command.Command
911
import org.learning.by.example.petstore.command.consumer.CommandsConsumerConfig.Constants.CONSUMER_CONFIG_BOOSTRAP_SERVER
1012
import org.learning.by.example.petstore.command.test.CustomKafkaContainer
1113
import org.springframework.beans.factory.annotation.Autowired
1214
import org.springframework.boot.test.context.SpringBootTest
15+
import org.springframework.test.annotation.DirtiesContext
1316
import org.springframework.test.context.ActiveProfiles
1417
import org.springframework.test.context.DynamicPropertyRegistry
1518
import org.springframework.test.context.DynamicPropertySource
1619
import org.testcontainers.junit.jupiter.Container
1720
import org.testcontainers.junit.jupiter.Testcontainers
21+
import reactor.core.publisher.Mono
1822
import reactor.test.StepVerifier
1923
import java.time.Duration
2024
import java.time.Instant
2125
import java.util.UUID
26+
import java.util.concurrent.TimeUnit
2227

2328
@SpringBootTest
2429
@Testcontainers
2530
@ActiveProfiles("consumer")
2631
@TestMethodOrder(MethodOrderer.OrderAnnotation::class)
32+
@DirtiesContext
2733
internal class CommandsConsumerImplTest(
2834
@Autowired val commandsConsumerImpl: CommandsConsumerImpl,
2935
@Autowired val commandsConsumerConfig: CommandsConsumerConfig
@@ -79,34 +85,44 @@ internal class CommandsConsumerImplTest(
7985
assertThat(KAFKA_CONTAINER.sendMessage(commandsConsumerConfig.topic, FIRST_COMMAND)).isTrue()
8086
assertThat(KAFKA_CONTAINER.sendMessage(commandsConsumerConfig.topic, SECOND_COMMAND)).isTrue()
8187

82-
StepVerifier.create(commandsConsumerImpl.receiveCommands())
83-
.expectSubscription()
84-
.thenRequest(Long.MAX_VALUE)
85-
.consumeNextWith {
86-
assertThat(it.commandName).isEqualTo("example command 1")
87-
assertThat(it.id).isEqualTo(UUID.fromString("4cb5294b-1034-4bc4-9b3d-899adb782d89"))
88-
assertThat(it.timestamp).isEqualTo(Instant.parse("2020-06-28T08:53:35.168283Z"))
89-
assertThat(it.get<String>("attribute1")).isEqualTo("value1")
90-
assertThat(it.get<Int>("attribute2")).isEqualTo(123)
91-
}
92-
.consumeNextWith {
93-
assertThat(it.commandName).isEqualTo("example command 2")
94-
assertThat(it.id).isEqualTo(UUID.fromString("4cb5294b-1034-4bc4-9b3d-542adb232a21"))
95-
assertThat(it.timestamp).isEqualTo(Instant.parse("2020-06-28T11:22:13.456732Z"))
96-
assertThat(it.get<Boolean>("attribute1")).isEqualTo(false)
97-
assertThat(it.get<Double>("attribute2")).isEqualTo(125.5)
98-
}
99-
.expectNextCount(0L)
100-
.thenCancel()
101-
.verify(Duration.ofSeconds(5L))
88+
val commands = arrayListOf<Command>()
89+
90+
val disposable = commandsConsumerImpl.receiveCommands {
91+
commands.add(it)
92+
Mono.empty()
93+
}.subscribe()
94+
95+
await().atMost(30, TimeUnit.SECONDS).until {
96+
commands.size == 2
97+
}
98+
99+
disposable.dispose()
100+
101+
assertThat(commands).hasSize(2)
102+
103+
with(commands[0]) {
104+
assertThat(commandName).isEqualTo("example command 1")
105+
assertThat(id).isEqualTo(UUID.fromString("4cb5294b-1034-4bc4-9b3d-899adb782d89"))
106+
assertThat(timestamp).isEqualTo(Instant.parse("2020-06-28T08:53:35.168283Z"))
107+
assertThat(get<String>("attribute1")).isEqualTo("value1")
108+
assertThat(get<Int>("attribute2")).isEqualTo(123)
109+
}
110+
111+
with(commands[1]) {
112+
assertThat(commandName).isEqualTo("example command 2")
113+
assertThat(id).isEqualTo(UUID.fromString("4cb5294b-1034-4bc4-9b3d-542adb232a21"))
114+
assertThat(timestamp).isEqualTo(Instant.parse("2020-06-28T11:22:13.456732Z"))
115+
assertThat(get<Boolean>("attribute1")).isEqualTo(false)
116+
assertThat(get<Double>("attribute2")).isEqualTo(125.5)
117+
}
102118
}
103119

104120
@Test
105121
@Order(1)
106122
fun `we should error on invalid command`() {
107123
assertThat(KAFKA_CONTAINER.sendMessage(commandsConsumerConfig.topic, INVALID_COMMAND)).isTrue()
108124

109-
StepVerifier.create(commandsConsumerImpl.receiveCommands())
125+
StepVerifier.create(commandsConsumerImpl.receiveCommands { Mono.empty() })
110126
.expectSubscription()
111127
.thenRequest(Long.MAX_VALUE)
112128
.expectErrorMatches {
@@ -120,7 +136,7 @@ internal class CommandsConsumerImplTest(
120136
fun `we should error on empty command`() {
121137
assertThat(KAFKA_CONTAINER.sendMessage(commandsConsumerConfig.topic, EMPTY_COMMAND)).isTrue()
122138

123-
StepVerifier.create(commandsConsumerImpl.receiveCommands())
139+
StepVerifier.create(commandsConsumerImpl.receiveCommands { Mono.empty() })
124140
.expectSubscription()
125141
.thenRequest(Long.MAX_VALUE)
126142
.expectErrorMatches {
@@ -140,7 +156,7 @@ internal class CommandsConsumerImplTest(
140156

141157
@Order(2)
142158
@Test
143-
fun `we can get check if noy alive`() {
159+
fun `we can get check if not alive`() {
144160
if (KAFKA_CONTAINER.isRunning) KAFKA_CONTAINER.stop()
145161

146162
StepVerifier.create(commandsConsumerImpl.isKafkaAvailable)
@@ -156,7 +172,7 @@ internal class CommandsConsumerImplTest(
156172
fun `we should error on failure to connect`() {
157173
if (KAFKA_CONTAINER.isRunning) KAFKA_CONTAINER.stop()
158174

159-
StepVerifier.create(commandsConsumerImpl.receiveCommands())
175+
StepVerifier.create(commandsConsumerImpl.receiveCommands { Mono.empty() })
160176
.expectSubscription()
161177
.thenRequest(Long.MAX_VALUE)
162178
.expectErrorMatches {

command-producer-consumer/src/test/resources/application-consumer.yml

+3
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ service:
77
group-id: pet_commands_consumers
88
offset-earliest: earliest
99
timeout-ms: 500
10+
retries:
11+
times: 5
12+
ms: 300000

pet-stream/src/main/kotlin/org/learning/by/example/petstore/petstream/service/sink/CommandSink.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class CommandSink(val commandsConsumer: CommandsConsumer, val commandHandler: Co
1313

1414
@PostConstruct
1515
fun run() {
16-
disposable = commandsConsumer.receiveCommands().flatMap(commandHandler::handle).subscribe()
16+
disposable = commandsConsumer.receiveCommands(commandHandler::handle).subscribe()
1717
}
1818

1919
@PreDestroy

pet-stream/src/main/resources/application.yml

+4
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,9 @@ service:
1212
group-id: pet_commands_consumers
1313
offset-earliest: earliest
1414
timeout-ms: 500
15+
retries:
16+
times: 5
17+
ms: 300000
18+
1519
db:
1620
initialize: false

pet-stream/src/test/kotlin/org/learning/by/example/petstore/petstream/service/sink/CommandSinkTest.kt

+6-24
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,12 @@ import com.nhaarman.mockitokotlin2.verify
77
import com.nhaarman.mockitokotlin2.whenever
88
import org.junit.jupiter.api.Test
99
import org.learning.by.example.petstore.command.consumer.CommandsConsumer
10-
import org.learning.by.example.petstore.command.dsl.command
1110
import org.learning.by.example.petstore.petstream.service.handler.CommandHandler
1211
import org.learning.by.example.petstore.petstream.test.BasicTest
1312
import org.springframework.boot.test.context.SpringBootTest
1413
import org.springframework.boot.test.mock.mockito.MockBean
1514
import reactor.core.publisher.Mono
1615
import reactor.kotlin.core.publisher.toFlux
17-
import java.time.LocalDateTime
1816

1917
@SpringBootTest
2018
class CommandSinkTest : BasicTest() {
@@ -26,30 +24,14 @@ class CommandSinkTest : BasicTest() {
2624

2725
@Test
2826
fun `sink should sent to handler commands`() {
29-
val cmd1 = command("pet_create") {
30-
"name" value "name1"
31-
"category" value "category"
32-
"breed" value "breed"
33-
"vaccines" values listOf("vaccine1", "vaccine2")
34-
"dob" value LocalDateTime.now()
35-
"tags" values listOf("tag1", "tag2", "tag3")
36-
}
37-
val cmd2 = command("pet_create") {
38-
"name" value "name2"
39-
"category" value "category"
40-
"breed" value "breed"
41-
"vaccines" values listOf("vaccine2")
42-
"dob" value LocalDateTime.now()
43-
"tags" values listOf("tag1", "tag3")
44-
}
45-
46-
doReturn(listOf(cmd1, cmd2).toFlux()).whenever(consumer).receiveCommands()
27+
doReturn(listOf(Mono.empty<Void>(), Mono.empty<Void>()).toFlux()).whenever(consumer).receiveCommands(any())
4728
doReturn(Mono.empty<Void>()).whenever(handler).handle(any())
4829

49-
CommandSink(consumer, handler).run()
30+
val sink = CommandSink(consumer, handler)
31+
32+
sink.run()
33+
sink.end()
5034

51-
verify(handler, times(2)).handle(any())
52-
verify(handler, times(1)).handle(cmd1)
53-
verify(handler, times(1)).handle(cmd2)
35+
verify(consumer, times(1)).receiveCommands(handler::handle)
5436
}
5537
}

pet-stream/src/test/resources/application.yml

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ service:
1212
group-id: pet_commands_consumers
1313
offset-earliest: earliest
1414
timeout-ms: 500
15+
retries:
16+
times: 5
17+
ms: 300000
1518
producer:
1619
bootstrap-server: localhost:9092
1720
topic: pet-commands

0 commit comments

Comments
 (0)