Skip to content

Commit ae608c1

Browse files
authored
KAFKA-19042 Move PlaintextConsumerCallbackTest to client-integration-tests module (#19298)
Use Java to rewrite `PlaintextConsumerCallbackTest` by new test infra and move it to client-integration-tests module. Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 73afcc9 commit ae608c1

File tree

2 files changed

+352
-175
lines changed

2 files changed

+352
-175
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer;
18+
19+
import org.apache.kafka.clients.producer.Producer;
20+
import org.apache.kafka.clients.producer.ProducerRecord;
21+
import org.apache.kafka.common.TopicPartition;
22+
import org.apache.kafka.common.record.TimestampType;
23+
import org.apache.kafka.common.test.ClusterInstance;
24+
import org.apache.kafka.common.test.TestUtils;
25+
import org.apache.kafka.common.test.api.ClusterTest;
26+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
27+
import org.apache.kafka.common.test.api.Type;
28+
29+
import java.time.Duration;
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.List;
33+
import java.util.Locale;
34+
import java.util.Map;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.function.BiConsumer;
37+
38+
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
39+
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
40+
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
41+
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
42+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
43+
import static org.junit.jupiter.api.Assertions.assertEquals;
44+
import static org.junit.jupiter.api.Assertions.assertThrows;
45+
import static org.junit.jupiter.api.Assertions.assertTrue;
46+
47+
@ClusterTestDefaults(
48+
types = {Type.KRAFT},
49+
brokers = 3
50+
)
51+
public class PlaintextConsumerCallbackTest {
52+
53+
private final ClusterInstance cluster;
54+
private final String topic = "topic";
55+
private final TopicPartition tp = new TopicPartition(topic, 0);
56+
57+
public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
58+
this.cluster = clusterInstance;
59+
}
60+
61+
@ClusterTest
62+
public void testClassicConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException {
63+
testRebalanceListenerAssignOnPartitionsAssigned(CLASSIC);
64+
}
65+
66+
@ClusterTest
67+
public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException {
68+
testRebalanceListenerAssignOnPartitionsAssigned(CONSUMER);
69+
}
70+
71+
private void testRebalanceListenerAssignOnPartitionsAssigned(GroupProtocol groupProtocol) throws InterruptedException {
72+
try (var consumer = createConsumer(groupProtocol)) {
73+
triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> {
74+
var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp)));
75+
assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage());
76+
});
77+
}
78+
}
79+
80+
@ClusterTest
81+
public void testClassicConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException {
82+
testRebalanceListenerAssignmentOnPartitionsAssigned(CLASSIC);
83+
}
84+
85+
@ClusterTest
86+
public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException {
87+
testRebalanceListenerAssignmentOnPartitionsAssigned(CONSUMER);
88+
}
89+
90+
private void testRebalanceListenerAssignmentOnPartitionsAssigned(GroupProtocol groupProtocol) throws InterruptedException {
91+
try (var consumer = createConsumer(groupProtocol)) {
92+
triggerOnPartitionsAssigned(tp, consumer,
93+
(executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp))
94+
);
95+
}
96+
}
97+
98+
@ClusterTest
99+
public void testClassicConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException {
100+
testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(CLASSIC);
101+
}
102+
103+
@ClusterTest
104+
public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException {
105+
testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(CONSUMER);
106+
}
107+
108+
private void testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(GroupProtocol groupProtocol) throws InterruptedException {
109+
try (var consumer = createConsumer(groupProtocol)) {
110+
triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> {
111+
var map = executeConsumer.beginningOffsets(List.of(tp));
112+
assertTrue(map.containsKey(tp));
113+
assertEquals(0L, map.get(tp));
114+
});
115+
}
116+
}
117+
118+
@ClusterTest
119+
public void testClassicConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException {
120+
testRebalanceListenerAssignOnPartitionsRevoked(CLASSIC);
121+
}
122+
123+
@ClusterTest
124+
public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException {
125+
testRebalanceListenerAssignOnPartitionsRevoked(CONSUMER);
126+
}
127+
128+
private void testRebalanceListenerAssignOnPartitionsRevoked(GroupProtocol groupProtocol) throws InterruptedException {
129+
triggerOnPartitionsRevoked(tp, groupProtocol, (consumer, partitions) -> {
130+
var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp)));
131+
assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage());
132+
});
133+
}
134+
135+
@ClusterTest
136+
public void testClassicConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException {
137+
triggerOnPartitionsRevoked(tp, CLASSIC,
138+
(consumer, partitions) -> assertTrue(consumer.assignment().contains(tp))
139+
);
140+
}
141+
142+
@ClusterTest
143+
public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException {
144+
triggerOnPartitionsRevoked(tp, CONSUMER,
145+
(consumer, partitions) -> assertTrue(consumer.assignment().contains(tp))
146+
);
147+
}
148+
149+
@ClusterTest
150+
public void testClassicConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException {
151+
testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(CLASSIC);
152+
}
153+
154+
@ClusterTest
155+
public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException {
156+
testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(CONSUMER);
157+
}
158+
159+
private void testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(GroupProtocol groupProtocol) throws InterruptedException {
160+
triggerOnPartitionsRevoked(tp, groupProtocol, (consumer, partitions) -> {
161+
var map = consumer.beginningOffsets(List.of(tp));
162+
assertTrue(map.containsKey(tp));
163+
assertEquals(0L, map.get(tp));
164+
});
165+
}
166+
167+
@ClusterTest
168+
public void testClassicConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException {
169+
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
170+
}
171+
172+
@ClusterTest
173+
public void testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException {
174+
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
175+
}
176+
177+
private void testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(GroupProtocol groupProtocol) throws InterruptedException {
178+
try (var consumer = createConsumer(groupProtocol)) {
179+
triggerOnPartitionsAssigned(tp, consumer,
180+
(executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp))
181+
);
182+
}
183+
}
184+
185+
@ClusterTest
186+
public void testClassicConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException {
187+
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
188+
}
189+
190+
@ClusterTest
191+
public void testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException {
192+
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
193+
}
194+
195+
private void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(GroupProtocol groupProtocol) throws InterruptedException {
196+
try (var consumer = createConsumer(groupProtocol)) {
197+
var startingOffset = 100L;
198+
var totalRecords = 120;
199+
var startingTimestamp = 0L;
200+
201+
sendRecords(totalRecords, startingTimestamp);
202+
203+
triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> {
204+
executeConsumer.seek(tp, startingOffset);
205+
executeConsumer.pause(List.of(tp));
206+
});
207+
208+
assertTrue(consumer.paused().contains(tp));
209+
consumer.resume(List.of(tp));
210+
consumeAndVerifyRecords(
211+
consumer,
212+
(int) (totalRecords - startingOffset),
213+
(int) startingOffset,
214+
(int) startingOffset,
215+
startingOffset
216+
);
217+
}
218+
}
219+
220+
private void triggerOnPartitionsAssigned(
221+
TopicPartition tp,
222+
Consumer<byte[], byte[]> consumer,
223+
BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> execute
224+
) throws InterruptedException {
225+
var partitionsAssigned = new AtomicBoolean(false);
226+
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
227+
@Override
228+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
229+
// Make sure the partition used in the test is actually assigned before continuing.
230+
if (partitions.contains(tp)) {
231+
execute.accept(consumer, partitions);
232+
partitionsAssigned.set(true);
233+
}
234+
}
235+
236+
@Override
237+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
238+
// noop
239+
}
240+
});
241+
TestUtils.waitForCondition(
242+
() -> {
243+
consumer.poll(Duration.ofMillis(100));
244+
return partitionsAssigned.get();
245+
},
246+
"Timed out before expected rebalance completed"
247+
);
248+
}
249+
250+
private void triggerOnPartitionsRevoked(
251+
TopicPartition tp,
252+
GroupProtocol protocol,
253+
BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> execute
254+
) throws InterruptedException {
255+
var partitionsAssigned = new AtomicBoolean(false);
256+
var partitionsRevoked = new AtomicBoolean(false);
257+
try (var consumer = createConsumer(protocol)) {
258+
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
259+
@Override
260+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
261+
// Make sure the partition used in the test is actually assigned before continuing.
262+
if (partitions.contains(tp)) {
263+
partitionsAssigned.set(true);
264+
}
265+
}
266+
267+
@Override
268+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
269+
// Make sure the partition used in the test is actually revoked before continuing.
270+
if (partitions.contains(tp)) {
271+
execute.accept(consumer, partitions);
272+
partitionsRevoked.set(true);
273+
}
274+
}
275+
});
276+
TestUtils.waitForCondition(
277+
() -> {
278+
consumer.poll(Duration.ofMillis(100));
279+
return partitionsAssigned.get();
280+
},
281+
"Timed out before expected rebalance completed"
282+
);
283+
}
284+
assertTrue(partitionsRevoked.get());
285+
}
286+
287+
private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) {
288+
return cluster.consumer(Map.of(
289+
GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
290+
ENABLE_AUTO_COMMIT_CONFIG, "false"
291+
));
292+
}
293+
294+
private void sendRecords(int numRecords, long startingTimestamp) {
295+
try (Producer<byte[], byte[]> producer = cluster.producer()) {
296+
for (var i = 0; i < numRecords; i++) {
297+
var timestamp = startingTimestamp + i;
298+
var record = new ProducerRecord<>(
299+
tp.topic(),
300+
tp.partition(),
301+
timestamp,
302+
("key " + i).getBytes(),
303+
("value " + i).getBytes()
304+
);
305+
producer.send(record);
306+
}
307+
producer.flush();
308+
}
309+
}
310+
311+
protected void consumeAndVerifyRecords(
312+
Consumer<byte[], byte[]> consumer,
313+
int numRecords,
314+
int startingOffset,
315+
int startingKeyAndValueIndex,
316+
long startingTimestamp
317+
) throws InterruptedException {
318+
var records = consumeRecords(consumer, numRecords);
319+
for (var i = 0; i < numRecords; i++) {
320+
var record = records.get(i);
321+
var offset = startingOffset + i;
322+
323+
assertEquals(tp.topic(), record.topic());
324+
assertEquals(tp.partition(), record.partition());
325+
326+
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
327+
var timestamp = startingTimestamp + i;
328+
assertEquals(timestamp, record.timestamp());
329+
330+
assertEquals(offset, record.offset());
331+
var keyAndValueIndex = startingKeyAndValueIndex + i;
332+
assertEquals("key " + keyAndValueIndex, new String(record.key()));
333+
assertEquals("value " + keyAndValueIndex, new String(record.value()));
334+
// this is true only because K and V are byte arrays
335+
assertEquals(("key " + keyAndValueIndex).length(), record.serializedKeySize());
336+
assertEquals(("value " + keyAndValueIndex).length(), record.serializedValueSize());
337+
}
338+
}
339+
340+
protected <K, V> List<ConsumerRecord<K, V>> consumeRecords(
341+
Consumer<K, V> consumer,
342+
int numRecords
343+
) throws InterruptedException {
344+
List<ConsumerRecord<K, V>> records = new ArrayList<>();
345+
TestUtils.waitForCondition(() -> {
346+
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
347+
return records.size() >= numRecords;
348+
}, 60000, "Timed out before consuming expected " + numRecords + " records.");
349+
350+
return records;
351+
}
352+
}

0 commit comments

Comments
 (0)