Commit e4cda27

KAFKA-428: Mark the copy complete in the source offset for the last copied document (#168)
1 parent d477f00 commit e4cda27
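
The gist of the change: while the connector is copying existing data, every record's source offset carries a "copy" marker, and before this commit the last copied record carried it too, so a task restarted after a finished copy would start the copy all over again. This commit rewrites the offset of the final copied document to drop the "copy" key and store the cached resume token under "_id". A minimal sketch of the two offset shapes (illustrative placeholder values only, not taken from this commit):

    // Offset attached to every record while the copy is still in progress:
    Map<String, String> copyingOffset = new HashMap<>();
    copyingOffset.put("_id", "{\"_data\": \"...\"}"); // resume-token JSON (placeholder)
    copyingOffset.put("copy", "true");

    // Offset attached to the last copied record after this commit: a valid resume
    // token and no "copy" key, marking the copy as complete.
    Map<String, String> completedOffset = new HashMap<>();
    completedOffset.put("_id", cachedResumeToken.toJson()); // as in StartedMongoSourceTask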

2 files changed: +183 -64 lines

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 108 additions & 3 deletions
@@ -36,6 +36,7 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -169,6 +170,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
         new HashMap<String, String>() {
           {
             put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
+            String namespaceRegex =
+                String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
+            put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
           }
         };
     task.start(cfg);
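
For reference, the regex built here restricts the copy to exactly the two test collections; a quick illustration of the expansion, assuming the databases are literally named db1 and db2 (their actual generated names are not shown in this diff):

    // String.format("(%s\\.coll|%s\\.coll)", "db1", "db2")
    // yields the pattern (db1\.coll|db2\.coll),
    // i.e. only the namespaces db1.coll and db2.coll are copied.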
@@ -178,11 +182,22 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
         () -> assertEquals(150, firstPoll.size()),
         () -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll1),
         () -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll2),
+        // make sure all elements, except the last, contain the "copy" key
         () ->
             assertTrue(
                 firstPoll.stream()
                     .map(SourceRecord::sourceOffset)
-                    .allMatch(i -> i.containsKey("copy"))));
+                    .limit(150 - 1) // exclude the last record
+                    .allMatch(i -> i.containsKey("copy"))),
+        // make sure that the last copied element does not have the "copy" key
+        () ->
+            assertTrue(
+                firstPoll.stream()
+                    .map(SourceRecord::sourceOffset)
+                    .skip(150 - 1) // keep only the last record
+                    .findFirst()
+                    .filter(i -> !i.containsKey("copy"))
+                    .isPresent()));

     assertNull(task.poll());

@@ -533,6 +548,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
   @Test
   @DisplayName("Copy existing with a restart midway through")
   void testCopyingExistingWithARestartMidwayThrough() {
+    assumeTrue(isGreaterThanThreeDotSix());
     try (AutoCloseableSourceTask task = createSourceTask()) {

       MongoCollection<Document> coll = getCollection();
@@ -544,7 +560,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
             put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
             put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
             put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
-            put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
+            put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
           }
         };

@@ -556,6 +572,17 @@ void testCopyingExistingWithARestartMidwayThrough() {
       assertTrue(
           firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));

+      Map<String, ?> lastOffset = firstPoll.get(25 - 1).sourceOffset();
+
+      // mock the context so that on restart we know where the last task left off
+      when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+      assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
+      @SuppressWarnings("unchecked")
+      Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
+      when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
+      task.initialize(context);
+
+      // perform a restart
       task.stop();
       task.start(cfg);

@@ -566,8 +593,20 @@ void testCopyingExistingWithARestartMidwayThrough() {

       List<SourceRecord> thirdPoll = getNextBatch(task);
       assertSourceRecordValues(createInserts(26, 50), thirdPoll, coll);
+      // Make sure all elements, except the last one, contain the "copy" key
       assertTrue(
-          thirdPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
+          thirdPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .limit(25 - 1) // exclude the last record in the batch
+              .allMatch(i -> i.containsKey("copy")));
+      // Make sure the last copied element does not contain the "copy" key
+      assertTrue(
+          thirdPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .skip(25 - 1) // keep only the last record in the batch
+              .findFirst()
+              .filter(i -> !i.containsKey("copy"))
+              .isPresent());

       assertTrue(getNextBatch(task).isEmpty());
       insertMany(rangeClosed(51, 75), coll);
@@ -579,6 +618,72 @@ void testCopyingExistingWithARestartMidwayThrough() {
     }
   }

+  @Test
+  @DisplayName("Copy existing with a restart after finishing")
+  void testCopyingExistingWithARestartAfterFinishing() {
+    assumeTrue(isGreaterThanThreeDotSix());
+    try (AutoCloseableSourceTask task = createSourceTask()) {
+
+      MongoCollection<Document> coll = getCollection();
+
+      HashMap<String, String> cfg =
+          new HashMap<String, String>() {
+            {
+              put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
+              put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
+              put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
+              put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
+              put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
+            }
+          };
+
+      insertMany(rangeClosed(1, 50), coll);
+      task.start(cfg);
+
+      List<SourceRecord> firstPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(1, 25), firstPoll, coll);
+      assertTrue(
+          firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
+
+      List<SourceRecord> secondPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(26, 50), secondPoll, coll);
+      // Make sure all elements, except the last one, contain the "copy" key
+      assertTrue(
+          secondPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .limit(25 - 1) // exclude the last record in the batch
+              .allMatch(i -> i.containsKey("copy")));
+
+      Map<String, ?> lastOffset = secondPoll.get(25 - 1).sourceOffset();
+
+      // Make sure the last copied element does not contain the "copy" key
+      assertFalse(lastOffset.containsKey("copy"));
+
+      // mock the context so that on restart we know where the last task left off
+      when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+      assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
+      @SuppressWarnings("unchecked")
+      Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
+      when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
+      task.initialize(context);
+
+      // perform a restart
+      task.stop();
+      task.start(cfg);
+
+      // make sure that a copy doesn't occur again because all data was already copied
+      assertTrue(getNextBatch(task).isEmpty());
+
+      // make sure that we can continue to process data
+      insertMany(rangeClosed(51, 75), coll);
+
+      List<SourceRecord> thirdPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(51, 75), thirdPoll, coll);
+      assertFalse(
+          thirdPoll.stream().map(SourceRecord::sourceOffset).anyMatch(i -> i.containsKey("copy")));
+    }
+  }
+
   @Test
   @DisplayName("Ensure source loads data from collection and outputs documents only")
   void testSourceLoadsDataFromCollectionDocumentOnly() {
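
The mocked context and offsetStorageReader in these tests stand in for what the Kafka Connect runtime does on a real restart: it initializes the task with a SourceTaskContext whose OffsetStorageReader returns the last committed offset for the task's source partition. A rough sketch of the lookup the mocks emulate, assuming a hypothetical partition map (the connector's actual partition keys are not shown in this diff):

    // Inside a SourceTask, after the framework has called initialize(context):
    Map<String, Object> partition = Collections.singletonMap("ns", "db.coll"); // hypothetical
    Map<String, Object> lastCommitted = context.offsetStorageReader().offset(partition);
    if (lastCommitted != null && !lastCommitted.containsKey("copy")) {
      // The copy finished before the restart: resume the change stream from
      // the resume token stored under "_id" instead of copying again.
    }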

src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java

Lines changed: 75 additions & 61 deletions
@@ -48,6 +48,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -210,68 +211,81 @@ private List<SourceRecord> pollInternal() {
         createValueSchemaAndValueProvider(sourceConfig);

     List<SourceRecord> sourceRecords = new ArrayList<>();
-    getNextBatch()
-        .forEach(
-            changeStreamDocument -> {
-              Map<String, String> sourceOffset = new HashMap<>();
-              sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
-              if (isCopying) {
-                sourceOffset.put(COPY_KEY, "true");
-              }
+    Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
+    while (batchIterator.hasNext()) {
+      BsonDocument changeStreamDocument = batchIterator.next();
+      Map<String, String> sourceOffset = new HashMap<>();
+      sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
+      if (isCopying) {
+        sourceOffset.put(COPY_KEY, "true");
+      }

-              String topicName = topicMapper.getTopic(changeStreamDocument);
-              if (topicName.isEmpty()) {
-                LOGGER.warn(
-                    "No topic set. Could not publish the message: {}",
-                    changeStreamDocument.toJson());
-              } else {
-
-                Optional<BsonDocument> valueDocument = Optional.empty();
-
-                boolean isTombstoneEvent =
-                    publishFullDocumentOnlyTombstoneOnDelete
-                        && !changeStreamDocument.containsKey(FULL_DOCUMENT);
-                if (publishFullDocumentOnly) {
-                  if (changeStreamDocument.containsKey(FULL_DOCUMENT)
-                      && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
-                    valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
-                  }
-                } else {
-                  valueDocument = Optional.of(changeStreamDocument);
-                }
-
-                if (valueDocument.isPresent() || isTombstoneEvent) {
-                  BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
-                  LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
-
-                  if (valueDoc instanceof RawBsonDocument) {
-                    int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
-                    statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
-                  }
-
-                  BsonDocument keyDocument;
-                  if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
-                    keyDocument = changeStreamDocument;
-                  } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
-                      && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
-                    keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
-                  } else {
-                    keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
-                  }
-
-                  createSourceRecord(
-                          keySchemaAndValueProducer,
-                          isTombstoneEvent
-                              ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
-                              : valueSchemaAndValueProducer,
-                          sourceOffset,
-                          topicName,
-                          keyDocument,
-                          valueDoc)
-                      .map(sourceRecords::add);
-                }
-              }
-            });
+      // if isCopying is true, we want to set the COPY_KEY flag so that Kafka has context that a
+      // copy is in progress. However, for the last document that we are copying, we should not set
+      // this flag because the copy has completed; otherwise we would be relying on future change
+      // stream events to signify that we are no longer copying. We also need to set the _id field
+      // to a valid resume token, which during copying lives in the cachedResumeToken variable.
+      // In MongoDB 3.6 the cachedResumeToken initializes to null, so we also guard against a
+      // null pointer exception here.
+      boolean lastDocument = !batchIterator.hasNext();
+      boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
+      if (isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null) {
+        sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
+        sourceOffset.remove(COPY_KEY);
+      }
+
+      String topicName = topicMapper.getTopic(changeStreamDocument);
+      if (topicName.isEmpty()) {
+        LOGGER.warn(
+            "No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
+      } else {
+
+        Optional<BsonDocument> valueDocument = Optional.empty();
+
+        boolean isTombstoneEvent =
+            publishFullDocumentOnlyTombstoneOnDelete
+                && !changeStreamDocument.containsKey(FULL_DOCUMENT);
+        if (publishFullDocumentOnly) {
+          if (changeStreamDocument.containsKey(FULL_DOCUMENT)
+              && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
+            valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
+          }
+        } else {
+          valueDocument = Optional.of(changeStreamDocument);
+        }
+
+        if (valueDocument.isPresent() || isTombstoneEvent) {
+          BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
+          LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
+
+          if (valueDoc instanceof RawBsonDocument) {
+            int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
+            statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
+          }
+
+          BsonDocument keyDocument;
+          if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
+            keyDocument = changeStreamDocument;
+          } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
+              && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
+            keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
+          } else {
+            keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
+          }
+
+          createSourceRecord(
+                  keySchemaAndValueProducer,
+                  isTombstoneEvent
+                      ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
+                      : valueSchemaAndValueProducer,
+                  sourceOffset,
+                  topicName,
+                  keyDocument,
+                  valueDoc)
+              .map(sourceRecords::add);
+        }
+      }
+    }
     LOGGER.debug("Return batch of {}", sourceRecords.size());

     if (sourceRecords.isEmpty()) {
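
The switch from forEach to an explicit Iterator is what makes this fix possible: a lambda body cannot cheaply ask whether another element follows, while batchIterator.hasNext() can. The offset is only rewritten when all four conditions hold at once, which could be read as a standalone predicate (a hypothetical extraction for illustration, not part of this commit):

    // Hypothetical extraction of the guard around the offset rewrite:
    private boolean copyJustCompleted(final Iterator<BsonDocument> batchIterator) {
      boolean lastDocument = !batchIterator.hasNext(); // only the batch's final record qualifies
      boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
      // cachedResumeToken may still be null on MongoDB 3.6, hence the explicit null guard
      return isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null;
    }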
