|
16 | 16 | package com.mongodb.kafka.connect.mongodb;
|
17 | 17 |
|
18 | 18 | import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.ChangeStreamOperation;
|
19 |
| -import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createChangeStreamOperation; |
20 | 19 | import static java.lang.String.format;
|
21 | 20 | import static java.util.Collections.singletonList;
|
22 | 21 | import static org.apache.kafka.common.utils.Utils.sleep;
|
23 |
| -import static org.junit.jupiter.api.Assertions.assertEquals; |
24 | 22 | import static org.junit.jupiter.api.Assertions.assertIterableEquals;
|
25 | 23 |
|
26 | 24 | import java.time.Duration;
|
27 | 25 | import java.util.ArrayList;
|
28 | 26 | import java.util.List;
|
29 | 27 | import java.util.Properties;
|
30 | 28 | import java.util.concurrent.atomic.AtomicInteger;
|
| 29 | +import java.util.function.Function; |
31 | 30 | import java.util.stream.Collectors;
|
| 31 | +import java.util.stream.IntStream; |
32 | 32 |
|
33 | 33 | import io.confluent.connect.avro.AvroConverter;
|
34 | 34 |
|
@@ -57,6 +57,8 @@ public class MongoKafkaTestCase {
|
57 | 57 | protected static final Logger LOGGER = LoggerFactory.getLogger(MongoKafkaTestCase.class);
|
58 | 58 | protected static final AtomicInteger POSTFIX = new AtomicInteger();
|
59 | 59 |
|
| 60 | + private static final int DEFAULT_MAX_RETRIES = 15; |
| 61 | + |
60 | 62 | @RegisterExtension public static final EmbeddedKafka KAFKA = new EmbeddedKafka();
|
61 | 63 | @RegisterExtension public static final MongoDBHelper MONGODB = new MongoDBHelper();
|
62 | 64 |
|
@@ -108,88 +110,90 @@ public boolean isGreaterThanThreeDotSix() {
|
108 | 110 | }
|
109 | 111 |
|
110 | 112 | public void assertProduced(final String topicName, final int expectedCount) {
|
111 |
| - assertEquals(expectedCount, getProduced(topicName, expectedCount).size()); |
| 113 | + List<Integer> expected = IntStream.range(1, expectedCount).boxed().collect(Collectors.toList()); |
| 114 | + AtomicInteger counter = new AtomicInteger(); |
| 115 | + List<Integer> produced = |
| 116 | + getProduced(topicName, b -> counter.addAndGet(1), expected, DEFAULT_MAX_RETRIES); |
| 117 | + assertIterableEquals(expected, produced); |
112 | 118 | }
|
113 | 119 |
|
114 | 120 | public void assertProduced(
|
115 | 121 | final List<ChangeStreamOperation> operationTypes, final MongoCollection<?> coll) {
|
116 |
| - assertProduced(operationTypes, coll.getNamespace().getFullName()); |
| 122 | + assertProduced(operationTypes, coll, DEFAULT_MAX_RETRIES); |
117 | 123 | }
|
118 | 124 |
|
119 | 125 | public void assertProduced(
|
120 |
| - final List<ChangeStreamOperation> operationTypes, final String topicName) { |
121 |
| - List<ChangeStreamOperation> produced = |
122 |
| - getProduced(topicName, operationTypes.size()).stream() |
123 |
| - .map((b) -> createChangeStreamOperation(b.toString())) |
124 |
| - .collect(Collectors.toList()); |
125 |
| - assertIterableEquals(operationTypes, produced); |
| 126 | + final List<ChangeStreamOperation> operationTypes, |
| 127 | + final MongoCollection<?> coll, |
| 128 | + final int maxRetryCount) { |
| 129 | + assertProduced(operationTypes, coll.getNamespace().getFullName(), maxRetryCount); |
126 | 130 | }
|
127 | 131 |
|
128 |
| - public void assertEventuallyProduces( |
129 |
| - final List<ChangeStreamOperation> operationTypes, final MongoCollection<?> coll) { |
130 |
| - assertEventuallyProduces(operationTypes, coll.getNamespace().getFullName()); |
| 132 | + public void assertProduced( |
| 133 | + final List<ChangeStreamOperation> operationTypes, final String topicName) { |
| 134 | + assertProduced(operationTypes, topicName, DEFAULT_MAX_RETRIES); |
131 | 135 | }
|
132 | 136 |
|
133 |
| - public void assertEventuallyProduces( |
134 |
| - final List<ChangeStreamOperation> operationTypes, final String topicName) { |
| 137 | + public void assertProduced( |
| 138 | + final List<ChangeStreamOperation> operationTypes, |
| 139 | + final String topicName, |
| 140 | + final int maxRetryCount) { |
135 | 141 | List<ChangeStreamOperation> produced =
|
136 |
| - getProduced(topicName, Integer.MAX_VALUE).stream() |
137 |
| - .map((b) -> createChangeStreamOperation(b.toString())) |
138 |
| - .collect(Collectors.toList()); |
139 |
| - |
140 |
| - if (produced.size() > operationTypes.size()) { |
141 |
| - boolean startsWith = |
142 |
| - produced |
143 |
| - .get(operationTypes.size() - 1) |
144 |
| - .equals(operationTypes.get(operationTypes.size() - 1)); |
145 |
| - if (startsWith) { |
146 |
| - assertIterableEquals(operationTypes, produced.subList(0, operationTypes.size())); |
147 |
| - } else { |
148 |
| - assertIterableEquals( |
| 142 | + getProduced( |
| 143 | + topicName, |
| 144 | + ChangeStreamOperations::createChangeStreamOperation, |
149 | 145 | operationTypes,
|
150 |
| - produced.subList(produced.lastIndexOf(operationTypes.get(0)), produced.size())); |
151 |
| - } |
152 |
| - } else { |
153 |
| - assertIterableEquals(operationTypes, produced); |
154 |
| - } |
| 146 | + maxRetryCount); |
| 147 | + assertIterableEquals(operationTypes, produced); |
155 | 148 | }
|
156 | 149 |
|
157 | 150 | public void assertProducedDocs(final List<Document> docs, final MongoCollection<?> coll) {
|
158 |
| - assertEquals( |
159 |
| - docs, |
160 |
| - getProduced(coll.getNamespace().getFullName(), docs.size()).stream() |
161 |
| - .map((b) -> Document.parse(b.toString())) |
162 |
| - .collect(Collectors.toList())); |
| 151 | + List<Document> produced = |
| 152 | + getProduced( |
| 153 | + coll.getNamespace().getFullName(), |
| 154 | + b -> Document.parse(b.toString()), |
| 155 | + docs, |
| 156 | + DEFAULT_MAX_RETRIES); |
| 157 | + assertIterableEquals(docs, produced); |
163 | 158 | }
|
164 | 159 |
|
165 |
| - public List<Bytes> getProduced(final String topicName, final int expectedCount) { |
166 |
| - if (expectedCount != Integer.MAX_VALUE) { |
167 |
| - LOGGER.info("Subscribing to {} expecting to see #{}", topicName, expectedCount); |
168 |
| - } else { |
169 |
| - LOGGER.info("Subscribing to {} getting all messages", topicName); |
170 |
| - } |
| 160 | + public <T> List<T> getProduced( |
| 161 | + final String topicName, |
| 162 | + final Function<Bytes, T> mapper, |
| 163 | + final List<T> expected, |
| 164 | + final int maxRetryCount) { |
| 165 | + LOGGER.info("Subscribing to {}", topicName); |
171 | 166 |
|
172 | 167 | try (KafkaConsumer<?, ?> consumer = createConsumer()) {
|
173 | 168 | consumer.subscribe(singletonList(topicName));
|
174 |
| - List<Bytes> data = new ArrayList<>(); |
| 169 | + List<T> data = new ArrayList<>(); |
| 170 | + T firstExpected = expected.isEmpty() ? null : expected.get(0); |
| 171 | + T lastExpected = expected.isEmpty() ? null : expected.get(expected.size() - 1); |
175 | 172 | int counter = 0;
|
176 | 173 | int retryCount = 0;
|
177 | 174 | int previousDataSize;
|
178 |
| - while (data.size() < expectedCount && retryCount < 30) { |
| 175 | + |
| 176 | + while (retryCount < maxRetryCount) { |
179 | 177 | counter++;
|
180 |
| - LOGGER.info("Polling {} ({}) seen: #{}", topicName, counter, data.size()); |
181 | 178 | previousDataSize = data.size();
|
182 |
| - |
183 | 179 | consumer
|
184 | 180 | .poll(Duration.ofSeconds(2))
|
185 | 181 | .records(topicName)
|
186 |
| - .forEach((r) -> data.add((Bytes) r.value())); |
| 182 | + .forEach((r) -> data.add(mapper.apply((Bytes) r.value()))); |
| 183 | + |
| 184 | + int firstExpectedIndex = data.lastIndexOf(firstExpected); |
| 185 | + int lastExpectedIndex = data.lastIndexOf(lastExpected); |
| 186 | + int dataSize = lastExpectedIndex - firstExpectedIndex + 1; |
| 187 | + if (firstExpectedIndex > -1 && lastExpectedIndex > -1 && dataSize == expected.size()) { |
| 188 | + return data.subList(firstExpectedIndex, lastExpectedIndex + 1); |
| 189 | + } |
187 | 190 |
|
188 | 191 | // Wait at least 3 minutes for the first set of data to arrive
|
189 |
| - if (data.size() > 0 || counter > 90) { |
190 |
| - retryCount = data.size() == previousDataSize ? retryCount + 1 : 0; |
| 192 | + if (expected.size() == 0 || data.size() > 0 || counter > 90) { |
| 193 | + retryCount += previousDataSize == data.size() ? 1 : 0; |
191 | 194 | }
|
192 | 195 | }
|
| 196 | + |
193 | 197 | return data;
|
194 | 198 | }
|
195 | 199 | }
|
|
0 commit comments