Skip to content

Commit 5d8c3d5

Browse files
committed
Added support for topics.regex
KAFKA-82
1 parent f2173bf commit 5d8c3d5

File tree

10 files changed

+316
-115
lines changed

10 files changed

+316
-115
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/ConnectorValidationTest.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
public final class ConnectorValidationTest {
5252

53-
private static final String DEFAULT_URI = "mongodb://localhost:27017";
53+
private static final String DEFAULT_URI = "mongodb://localhost:27017/";
5454
private static final String URI_SYSTEM_PROPERTY_NAME = "org.mongodb.test.uri";
5555
private static final String DEFAULT_DATABASE_NAME = "MongoKafkaTest";
5656

@@ -83,13 +83,16 @@ void testSinkConfigValidation() {
8383
@DisplayName("Ensure sink configuration validation handles invalid connections")
8484
void testSinkConfigValidationInvalidConnection() {
8585
assertInvalidSink(createSinkProperties("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000"));
86+
assertInvalidSink(createSinkRegexProperties("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000"));
8687
}
8788

8889
@Test
8990
@DisplayName("Ensure sink configuration validation handles invalid user")
9091
void testSinkConfigValidationInvalidUser() {
9192
assertInvalidSink(createSinkProperties(format("mongodb://fakeUser:fakePass@%s/",
9293
String.join(",", getConnectionString().getHosts()))));
94+
assertInvalidSink(createSinkRegexProperties(format("mongodb://fakeUser:fakePass@%s/",
95+
String.join(",", getConnectionString().getHosts()))));
9396
}
9497

9598
@Test
@@ -98,6 +101,7 @@ void testSinkConfigValidationReadUser() {
98101
assumeTrue(isAuthEnabled());
99102
createUser("read");
100103
assertInvalidSink(createSinkProperties(getConnectionStringForCustomUser()));
104+
assertInvalidSink(createSinkRegexProperties(getConnectionStringForCustomUser()));
101105
}
102106

103107
@Test
@@ -106,6 +110,7 @@ void testSinkConfigValidationReadWriteUser() {
106110
assumeTrue(isAuthEnabled());
107111
createUser("readWrite");
108112
assertValidSink(createSinkProperties(getConnectionStringForCustomUser()));
113+
assertValidSink(createSinkRegexProperties(getConnectionStringForCustomUser()));
109114
}
110115

111116
@Test
@@ -121,6 +126,15 @@ void testSinkConfigValidationReadWriteOnSpecificDatabase() {
121126

122127
properties.put(MongoSinkTopicConfig.DATABASE_CONFIG, CUSTOM_DATABASE);
123128
assertValidSink(properties);
129+
130+
// Regex tests
131+
properties = createSinkRegexProperties(getConnectionStringForCustomUser());
132+
133+
// Different database than has permissions for
134+
assertInvalidSink(properties);
135+
136+
properties.put(MongoSinkTopicConfig.DATABASE_CONFIG, CUSTOM_DATABASE);
137+
assertValidSink(properties);
124138
}
125139

126140
@Test
@@ -143,6 +157,20 @@ void testSinkConfigValidationCollectionBasedPrivileges() {
143157
// Different collection than has permissions for
144158
properties.put(MongoSinkTopicConfig.COLLECTION_CONFIG, CUSTOM_COLLECTION);
145159
assertValidSink(properties);
160+
161+
// Regex tests
162+
properties = createSinkRegexProperties(getConnectionStringForCustomUser());
163+
164+
// Different database than has permissions for
165+
assertInvalidSink(properties);
166+
167+
// Different collection than has permissions for
168+
properties.put(MongoSinkTopicConfig.DATABASE_CONFIG, CUSTOM_DATABASE);
169+
assertInvalidSink(properties);
170+
171+
// Different collection than has permissions for
172+
properties.put(MongoSinkTopicConfig.COLLECTION_CONFIG, CUSTOM_COLLECTION);
173+
assertValidSink(properties);
146174
}
147175

148176
@Test
@@ -166,6 +194,20 @@ void testSinkConfigValidationCollectionBasedDifferentAuthPrivileges() {
166194
// Same collection than has permissions for
167195
properties.put(MongoSinkTopicConfig.COLLECTION_CONFIG, CUSTOM_COLLECTION);
168196
assertValidSink(properties);
197+
198+
// Regex tests
199+
properties = createSinkRegexProperties(getConnectionStringForCustomUser(CUSTOM_DATABASE));
200+
201+
// Different database than has permissions for
202+
assertInvalidSink(properties);
203+
204+
// Different collection than has permissions for
205+
properties.put(MongoSinkTopicConfig.DATABASE_CONFIG, CUSTOM_DATABASE);
206+
assertInvalidSink(properties);
207+
208+
// Same collection than has permissions for
209+
properties.put(MongoSinkTopicConfig.COLLECTION_CONFIG, CUSTOM_COLLECTION);
210+
assertValidSink(properties);
169211
}
170212

171213
@Test
@@ -379,12 +421,19 @@ private Map<String, String> createSinkProperties() {
379421

380422
private Map<String, String> createSinkProperties(final String connectionString) {
381423
Map<String, String> properties = createProperties(connectionString);
382-
properties.put("topics", "test");
424+
properties.put(MongoSinkConfig.TOPICS_CONFIG, "test");
383425
properties.put(MongoSinkTopicConfig.DATABASE_CONFIG, "test");
384426
properties.put(MongoSinkTopicConfig.COLLECTION_CONFIG, "test");
385427
return properties;
386428
}
387429

430+
private Map<String, String> createSinkRegexProperties(final String connectionString) {
431+
Map<String, String> properties = createSinkProperties(connectionString);
432+
properties.remove(MongoSinkConfig.TOPICS_CONFIG);
433+
properties.put(MongoSinkConfig.TOPICS_REGEX_CONFIG, "topic-(.*)");
434+
return properties;
435+
}
436+
388437
private Map<String, String> createSourceProperties() {
389438
return createSourceProperties(getConnectionString().toString());
390439
}

src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorTest.java

Lines changed: 39 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package com.mongodb.kafka.connect;
1717

18+
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPICS_REGEX_CONFIG;
19+
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
20+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.COLLECTION_CONFIG;
1821
import static java.lang.String.format;
1922
import static java.util.Arrays.asList;
2023
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -23,15 +26,12 @@
2326
import java.util.Properties;
2427
import java.util.stream.Collectors;
2528
import java.util.stream.IntStream;
26-
import java.util.stream.Stream;
2729

2830
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
2931

3032
import org.apache.kafka.clients.producer.KafkaProducer;
3133
import org.apache.kafka.clients.producer.ProducerConfig;
3234
import org.apache.kafka.clients.producer.ProducerRecord;
33-
import org.apache.kafka.common.serialization.IntegerSerializer;
34-
import org.apache.kafka.common.serialization.StringSerializer;
3535
import org.junit.jupiter.api.DisplayName;
3636
import org.junit.jupiter.api.Test;
3737

@@ -41,78 +41,59 @@
4141
class MongoSinkConnectorTest extends MongoKafkaTestCase {
4242

4343
@Test
44-
@DisplayName("Ensure simple producer sends data")
45-
void testASimpleProducerSmokeTest() {
44+
@DisplayName("Ensure sink connect saves data to MongoDB")
45+
void testSinkSavesAvroDataToMongoDB() {
4646
String topicName = getTopicName();
4747
KAFKA.createTopic(topicName);
48+
addSinkConnector(topicName);
4849

49-
Properties props = new Properties();
50-
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
51-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
52-
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
53-
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
54-
props.put(ProducerConfig.ACKS_CONFIG, "all");
55-
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
56-
57-
try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
58-
producer.initTransactions();
59-
producer.beginTransaction();
60-
61-
IntStream.range(0, 10).forEach(i -> {
62-
producer.send(new ProducerRecord<>(topicName, i, "Hello, World!"));
63-
});
64-
producer.commitTransaction();
65-
66-
assertProduced(10, topicName);
67-
}
50+
assertProducesMessages(topicName, getCollectionName());
6851
}
6952

7053
@Test
71-
@DisplayName("Ensure sink connect saves data to MongoDB")
72-
void testSinkSavesAvroDataToMongoDB() {
73-
Stream<TweetMsg> tweets = IntStream.range(0, 100).mapToObj(i ->
74-
TweetMsg.newBuilder().setId$1(i)
75-
.setText(format("test tweet %s end2end testing apache kafka <-> mongodb sink connector is fun!", i))
76-
.setHashtags(asList(format("t%s", i), "kafka", "mongodb", "testing"))
77-
.build()
78-
);
54+
@DisplayName("Ensure sink connect saves data to MongoDB when using regex")
55+
void testSinkSavesAvroDataToMongoDBWhenUsingRegex() {
56+
String topicName1 = "topic-regex-101";
57+
String topicName2 = "topic-regex-202";
7958

80-
String topicName = getTopicName();
81-
KAFKA.createTopic(topicName);
82-
addSinkConnector(topicName);
59+
String collectionName1 = "regexColl1";
60+
String collectionName2 = "regexColl2";
8361

84-
Properties producerProps = new Properties();
85-
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
86-
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
87-
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
88-
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
89-
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, KAFKA.schemaRegistryUrl());
62+
KAFKA.createTopic(topicName1);
63+
KAFKA.createTopic(topicName2);
9064

91-
try (KafkaProducer<String, TweetMsg> producer = new KafkaProducer<>(producerProps)) {
92-
producer.initTransactions();
93-
producer.beginTransaction();
94-
tweets.forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
95-
producer.commitTransaction();
65+
Properties sinkProperties = new Properties();
66+
sinkProperties.put(TOPICS_REGEX_CONFIG, "topic\\-regex\\-(.*)");
67+
sinkProperties.put(format(TOPIC_OVERRIDE_CONFIG, topicName1, COLLECTION_CONFIG), collectionName1);
68+
sinkProperties.put(format(TOPIC_OVERRIDE_CONFIG, topicName2, COLLECTION_CONFIG), collectionName2);
69+
addSinkConnector(sinkProperties);
9670

97-
assertProduced(100, topicName);
98-
assertEquals(100, getCollection().countDocuments());
99-
}
71+
assertProducesMessages(topicName1, collectionName1);
72+
assertProducesMessages(topicName2, collectionName2);
10073
}
10174

10275
@Test
10376
@DisplayName("Ensure sink can survive a restart")
10477
void testSinkSurvivesARestart() {
78+
String topicName = getTopicName();
79+
KAFKA.createTopic(topicName);
80+
addSinkConnector(topicName);
81+
assertProducesMessages(topicName, getCollectionName(), true);
82+
}
83+
84+
private void assertProducesMessages(final String topicName, final String collectionName) {
85+
assertProducesMessages(topicName, collectionName, false);
86+
}
87+
88+
private void assertProducesMessages(final String topicName, final String collectionName, final boolean restartConnector) {
89+
10590
List<TweetMsg> tweets = IntStream.range(0, 100).mapToObj(i ->
10691
TweetMsg.newBuilder().setId$1(i)
10792
.setText(format("test tweet %s end2end testing apache kafka <-> mongodb sink connector is fun!", i))
10893
.setHashtags(asList(format("t%s", i), "kafka", "mongodb", "testing"))
10994
.build()
11095
).collect(Collectors.toList());
11196

112-
String topicName = getTopicName();
113-
KAFKA.createTopic(topicName);
114-
addSinkConnector(topicName);
115-
11697
Properties producerProps = new Properties();
11798
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, topicName);
11899
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
@@ -127,16 +108,18 @@ void testSinkSurvivesARestart() {
127108
producer.commitTransaction();
128109

129110
assertProduced(50, topicName);
130-
assertEquals(50, getCollection().countDocuments());
111+
assertEquals(50, getCollection(collectionName).countDocuments(), collectionName);
131112

132-
restartSinkConnector(topicName);
113+
if (restartConnector) {
114+
restartSinkConnector(topicName);
115+
}
133116

134117
producer.beginTransaction();
135118
tweets.stream().filter(t -> t.getId$1() >= 50).forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
136119
producer.commitTransaction();
137120

138121
assertProduced(100, topicName);
139-
assertEquals(100, getCollection().countDocuments());
122+
assertEquals(100, getCollection(collectionName).countDocuments());
140123
}
141124
}
142125
}

src/main/java/com/mongodb/kafka/connect/MongoSinkConnector.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.mongodb.kafka.connect;
2020

21+
import static com.mongodb.kafka.connect.util.ConnectionValidator.getConfigByName;
2122
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateCanConnect;
2223
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateUserHasActions;
2324
import static java.util.Arrays.asList;
@@ -34,7 +35,6 @@
3435
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
3536
import com.mongodb.kafka.connect.sink.MongoSinkTask;
3637
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
37-
import com.mongodb.kafka.connect.source.MongoSourceConfig;
3838

3939
public class MongoSinkConnector extends SinkConnector {
4040
private static final List<String> REQUIRED_SINK_ACTIONS = asList("insert", "update", "remove");
@@ -83,15 +83,23 @@ public Config validate(final Map<String, String> connectorConfigs) {
8383
validateCanConnect(config, MongoSinkConfig.CONNECTION_URI_CONFIG)
8484
.ifPresent(client -> {
8585
try {
86-
sinkConfig.getTopics().forEach(topic -> {
86+
sinkConfig.getTopics().ifPresent(topics -> topics.forEach(topic -> {
8787
MongoSinkTopicConfig mongoSinkTopicConfig = sinkConfig.getMongoSinkTopicConfig(topic);
8888
validateUserHasActions(client,
8989
sinkConfig.getConnectionString().getCredential(),
9090
REQUIRED_SINK_ACTIONS,
91-
mongoSinkTopicConfig.getString(MongoSourceConfig.DATABASE_CONFIG),
92-
mongoSinkTopicConfig.getString(MongoSourceConfig.COLLECTION_CONFIG),
93-
MongoSourceConfig.CONNECTION_URI_CONFIG, config);
91+
mongoSinkTopicConfig.getString(MongoSinkTopicConfig.DATABASE_CONFIG),
92+
mongoSinkTopicConfig.getString(MongoSinkTopicConfig.COLLECTION_CONFIG),
93+
MongoSinkConfig.CONNECTION_URI_CONFIG, config);
9494

95+
}));
96+
sinkConfig.getTopicRegex().ifPresent(regex -> {
97+
validateUserHasActions(client,
98+
sinkConfig.getConnectionString().getCredential(),
99+
REQUIRED_SINK_ACTIONS,
100+
getConfigByName(config, MongoSinkTopicConfig.DATABASE_CONFIG).map(c -> (String) c.value()).orElse(""),
101+
getConfigByName(config, MongoSinkTopicConfig.COLLECTION_CONFIG).map(c -> (String) c.value()).orElse(""),
102+
MongoSinkConfig.CONNECTION_URI_CONFIG, config);
95103
});
96104
} catch (Exception e) {
97105
// Ignore

0 commit comments

Comments (0)