Skip to content

Commit a5b269c

Browse files
committed
Ensure resume token missing errors respect errors.tolerance
Throw an exception if there is no errors tolerance and the resume token is missing. KAFKA-105
1 parent c3c35bc commit a5b269c

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.kafka.connect.data.Schema;
4646
import org.apache.kafka.connect.data.SchemaBuilder;
4747
import org.apache.kafka.connect.data.Struct;
48+
import org.apache.kafka.connect.errors.ConnectException;
4849
import org.apache.kafka.connect.errors.DataException;
4950
import org.apache.kafka.connect.source.SourceRecord;
5051
import org.apache.kafka.connect.source.SourceTask;
@@ -402,10 +403,8 @@ void testSourceCanUseCustomOffsetPartitionNames() {
402403
when(offsetStorageReader.offset(singletonMap("ns", "oldPartitionName")))
403404
.thenReturn(INVALID_OFFSET);
404405
task.initialize(context);
405-
task.start(cfg);
406406

407-
assertNull(task.poll());
408-
task.stop();
407+
assertThrows(ConnectException.class, () -> task.start(cfg));
409408

410409
assertTrue(
411410
task.logCapture.getEvents().stream()

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
391391
} else if (doesNotSupportsStartAfter(e)) {
392392
supportsStartAfter = false;
393393
return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
394-
} else if (resumeTokenNotFound(e)) {
394+
} else if (sourceConfig.tolerateErrors() && resumeTokenNotFound(e)) {
395395
LOGGER.warn(
396396
"Failed to resume change stream: {} {}\n"
397397
+ "===================================================================================\n"
@@ -422,6 +422,10 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
422422
+ "=====================================================================================\n",
423423
e.getErrorMessage(),
424424
e.getErrorCode());
425+
if (resumeTokenNotFound(e)) {
426+
throw new ConnectException(
427+
"ResumeToken not found. Cannot create a change stream cursor", e);
428+
}
425429
}
426430
return null;
427431
}
@@ -437,12 +441,8 @@ private boolean invalidatedResumeToken(final MongoCommandException e) {
437441
}
438442

439443
private boolean resumeTokenNotFound(final MongoCommandException e) {
440-
if (!sourceConfig.tolerateErrors()) {
441-
return false;
442-
}
443444
String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
444-
return sourceConfig.tolerateErrors()
445-
&& errorMessage.contains(RESUME_TOKEN)
445+
return errorMessage.contains(RESUME_TOKEN)
446446
&& (errorMessage.contains(NOT_FOUND)
447447
|| errorMessage.contains(DOES_NOT_EXIST)
448448
|| errorMessage.contains(INVALID_RESUME_TOKEN));

0 commit comments

Comments
 (0)