Skip to content

Commit 14ceb8a

Browse files
authored
Merge pull request #89 from m8719-github/stop-operation-support
Stop connector support
2 parents e5d5888 + 38ca531 commit 14ceb8a

File tree

3 files changed

+74
-0
lines changed

3 files changed

+74
-0
lines changed

src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorTaskRestart;
5757
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorConfig;
5858
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPause;
59+
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorStop;
5960
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPluginConfigValidate;
6061
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorResume;
6162
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorTopicsReset;
@@ -284,6 +285,17 @@ public Boolean pauseConnector(final String connectorName) {
284285
return submitRequest(new PutConnectorPause(connectorName));
285286
}
286287

288+
/**
289+
* Stop a connector.
290+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-stop
291+
*
292+
* @param connectorName Name of connector to stop.
293+
* @return Boolean true if success.
294+
*/
295+
public Boolean stopConnector(final String connectorName) {
296+
return submitRequest(new PutConnectorStop(connectorName));
297+
}
298+
287299
/**
288300
* Resume a connector.
289301
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-resume
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright 2018, 2019, 2020, 2021 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient.request.put;
19+
20+
import org.sourcelab.kafka.connect.apiclient.util.UrlEscapingUtil;
21+
22+
import java.io.IOException;
23+
import java.util.Objects;
24+
25+
/**
26+
* Defines request to stop a connector.
27+
*/
28+
public final class PutConnectorStop implements PutRequest<Boolean> {
29+
private final String connectorName;
30+
31+
/**
32+
* Constructor.
33+
* @param connectorName Name of connector
34+
*/
35+
public PutConnectorStop(final String connectorName) {
36+
Objects.requireNonNull(connectorName);
37+
this.connectorName = connectorName;
38+
}
39+
40+
@Override
41+
public String getApiEndpoint() {
42+
return "/connectors/" + UrlEscapingUtil.escapePath(connectorName) + "/stop";
43+
}
44+
45+
@Override
46+
public Object getRequestBody() {
47+
return null;
48+
}
49+
50+
@Override
51+
public Boolean parseResponse(final String responseStr) throws IOException {
52+
return true;
53+
}
54+
}

src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,14 @@ public void testPauseConnector() {
192192
logger.info("Result: {}", kafkaConnectClient.pauseConnector(connectorName));
193193
}
194194

195+
/**
196+
* Test stopping a connector.
197+
*/
198+
@Test
199+
public void testStopConnector() {
200+
logger.info("Result: {}", kafkaConnectClient.stopConnector(connectorName));
201+
}
202+
195203
/**
196204
* Test pausing a connector.
197205
*/

0 commit comments

Comments
 (0)