Skip to content

Commit aa69616

Browse files
committed
Improved the error messaging for the missing resume tokens in the source connector.
KAFKA-91
1 parent b4d37bc commit aa69616

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- [KAFKA-75](https://jira.mongodb.org/browse/KAFKA-75) Added specific configuration for the id strategies: `ProvidedInKeyStrategy` and `ProvidedInValueStrategy`.
1818
Added `document.id.strategy.partial.value.projection.type`, `document.id.strategy.partial.value.projection.list`,
1919
`document.id.strategy.partial.key.projection.type` and `document.id.strategy.partial.key.projection.list`.
20+
- [KAFKA-91](https://jira.mongodb.org/browse/KAFKA-91) Improved the error messaging for the missing resume tokens in the source connector.
2021

2122
## 1.1.0
2223
- [KAFKA-45](https://jira.mongodb.org/browse/KAFKA-45) Allow the Sink connector to ignore unused source record key or value fields.

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,24 @@ private MongoCursor<BsonDocument> tryCreateCursor(
273273
return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
274274
}
275275
}
276-
LOGGER.info("Failed to resume change stream: {} {}", e.getErrorMessage(), e.getErrorCode());
276+
LOGGER.warn(
277+
"Failed to resume change stream: {} {}\n"
278+
+ "=====================================================================================\n"
279+
+ "If the resume token is no longer available then there is the potential for data loss.\n"
280+
+ "Saved resume tokens are managed by Kafka and stored with the offset data.\n\n"
281+
+ "When running Connect in standalone mode offsets are configured using the:\n"
282+
+ "`offset.storage.file.filename` configuration.\n"
283+
+ "When running Connect in distributed mode the offsets are stored in a topic.\n\n"
284+
+ "Use the `kafka-consumer-groups.sh` tool with the `--reset-offsets` flag to reset\n"
285+
+ "offsets.\n\n"
286+
+ "Resetting the offset will allow for the connector to be resume from the latest resume\n"
287+
+ "token. Using `copy.existing=true` ensures that all data will be outputted by the\n"
288+
+ "connector but it will duplicate existing data.\n"
289+
+ "Future releases will support a configurable `errors.tolerance` level for the source\n"
290+
+ "connector and make use of the `postBatchResumeToken`.\n"
291+
+ "=====================================================================================\n",
292+
e.getErrorMessage(),
293+
e.getErrorCode());
277294
return null;
278295
}
279296
}

0 commit comments

Comments
 (0)