Skip to content

Commit f7e6be2

Browse files
authored
feat: allow specifying a default replication factor (#35)
1 parent aadbfb2 commit f7e6be2

27 files changed

+290
-30
lines changed

docs/specification.md

+9-3
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
This document describes the specification for how to write your Kafka cluster's desired state file. This currently must be a `YAML` file.
44

5-
?> Current version: `1.0.2`
5+
?> Current version: `1.0.3`
66

77
The desired state file consists of:
88

99
- **Settings** [Optional]: Specific settings for configuring `kafka-gitops`.
10-
- **Topics** [Optional]: Topics and topic configuration definitions.
10+
- **Topics** [Optional]: Topic and topic configuration definitions.
1111
- **Services** [Optional]: Service definitions for generating ACLs.
1212
- **Users** [Optional]: User definitions for generating ACLs.
1313
- **Custom Service ACLs** [Optional]: Definitions for custom, non-generated ACLs.
@@ -20,14 +20,18 @@ The desired state file consists of:
2020
**Options**:
2121

2222
- **ccloud** [Optional]: An object which contains an `enabled` field. Set this to true if using a Confluent Cloud cluster.
23-
- **topics** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
23+
- **topics** [Optional]:
24+
- **defaults** [Optional]: Specify topic defaults so you don't need to specify them for every topic in the state file. Currently, only replication is supported.
25+
- **blacklist** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
2426

2527
**Example**:
2628
```yaml
2729
settings:
2830
ccloud:
2931
enabled: true
3032
topics:
33+
defaults:
34+
replication: 3
3135
blacklist:
3236
prefixed:
3337
- _confluent
@@ -51,6 +55,8 @@ topics:
5155
segment.bytes: 1000000
5256
```
5357
58+
If a default `replication` value is supplied in the `settings` block, then the `replication` field can be omitted. If a default `replication` value is provided and the `replication` field in the topic definition is set, the default will be overridden for that topic.
59+
5460
## Services
5561

5662
**Synopsis**: Define the services that will utilize your Kafka cluster. These service definitions allow `kafka-gitops` to generate ACLs for you. Yay!

src/main/java/com/devshawn/kafka/gitops/StateManager.java

+34-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.devshawn.kafka.gitops.domain.state.CustomAclDetails;
1111
import com.devshawn.kafka.gitops.domain.state.DesiredState;
1212
import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
13+
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
1314
import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
1415
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
1516
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
@@ -23,6 +24,7 @@
2324
import com.devshawn.kafka.gitops.service.ParserService;
2425
import com.devshawn.kafka.gitops.service.RoleService;
2526
import com.devshawn.kafka.gitops.util.LogUtil;
27+
import com.devshawn.kafka.gitops.util.StateUtil;
2628
import com.fasterxml.jackson.core.JsonParser;
2729
import com.fasterxml.jackson.databind.DeserializationFeature;
2830
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -65,6 +67,7 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {
6567

6668
public DesiredStateFile getAndValidateStateFile() {
6769
DesiredStateFile desiredStateFile = parserService.parseStateFile();
70+
validateTopics(desiredStateFile);
6871
validateCustomAcls(desiredStateFile);
6972
return desiredStateFile;
7073
}
@@ -131,8 +134,9 @@ private void createServiceAccount(String name, List<ServiceAccount> serviceAccou
131134
private DesiredState getDesiredState() {
132135
DesiredStateFile desiredStateFile = getAndValidateStateFile();
133136
DesiredState.Builder desiredState = new DesiredState.Builder()
134-
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
135-
.putAllTopics(desiredStateFile.getTopics());
137+
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
138+
139+
generateTopicsState(desiredState, desiredStateFile);
136140

137141
if (isConfluentCloudEnabled(desiredStateFile)) {
138142
generateConfluentCloudServiceAcls(desiredState, desiredStateFile);
@@ -145,6 +149,18 @@ private DesiredState getDesiredState() {
145149
return desiredState.build();
146150
}
147151

152+
private void generateTopicsState(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
153+
Optional<Integer> defaultReplication = StateUtil.fetchReplication(desiredStateFile);
154+
if (defaultReplication.isPresent()) {
155+
desiredStateFile.getTopics().forEach((name, details) -> {
156+
Integer replication = details.getReplication().isPresent() ? details.getReplication().get() : defaultReplication.get();
157+
desiredState.putTopics(name, new TopicDetails.Builder().mergeFrom(details).setReplication(replication).build());
158+
});
159+
} else {
160+
desiredState.putAllTopics(desiredStateFile.getTopics());
161+
}
162+
}
163+
148164
private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
149165
List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
150166
desiredStateFile.getServices().forEach((name, service) -> {
@@ -278,6 +294,22 @@ private void validateCustomAcls(DesiredStateFile desiredStateFile) {
278294
});
279295
}
280296

297+
private void validateTopics(DesiredStateFile desiredStateFile) {
298+
Optional<Integer> defaultReplication = StateUtil.fetchReplication(desiredStateFile);
299+
if (!defaultReplication.isPresent()) {
300+
desiredStateFile.getTopics().forEach((name, details) -> {
301+
if (!details.getReplication().isPresent()) {
302+
throw new ValidationException(String.format("Not set: [replication] in state file definition: topics -> %s", name));
303+
}
304+
});
305+
} else {
306+
if (defaultReplication.get() < 1) {
307+
throw new ValidationException("The default replication factor must be a positive integer.");
308+
}
309+
}
310+
311+
}
312+
281313
private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
282314
if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getCcloud().isPresent()) {
283315
return desiredStateFile.getSettings().get().getCcloud().get().isEnabled();

src/main/java/com/devshawn/kafka/gitops/domain/state/TopicDetails.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
import org.inferred.freebuilder.FreeBuilder;
55

66
import java.util.Map;
7+
import java.util.Optional;
78

89
@FreeBuilder
910
@JsonDeserialize(builder = TopicDetails.Builder.class)
1011
public interface TopicDetails {
1112

1213
Integer getPartitions();
1314

14-
Integer getReplication();
15+
Optional<Integer> getReplication();
1516

1617
Map<String, String> getConfigs();
1718

src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopics.java

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
@JsonDeserialize(builder = SettingsTopics.Builder.class)
1010
public interface SettingsTopics {
1111

12+
Optional<SettingsTopicsDefaults> getDefaults();
13+
1214
Optional<SettingsTopicsBlacklist> getBlacklist();
1315

1416
class Builder extends SettingsTopics_Builder {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.domain.state.settings;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsTopicsDefaults.Builder.class)
10+
public interface SettingsTopicsDefaults {
11+
12+
Optional<Integer> getReplication();
13+
14+
class Builder extends SettingsTopicsDefaults_Builder {
15+
}
16+
}

src/main/java/com/devshawn/kafka/gitops/service/KafkaService.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Collections;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.NoSuchElementException;
2829
import java.util.concurrent.ExecutionException;
2930
import java.util.stream.Collectors;
3031

@@ -65,10 +66,10 @@ public void deleteAcl(AclBinding aclBinding) {
6566

6667
public void createTopic(String topicName, TopicDetails topicDetails) {
6768
try (final AdminClient adminClient = buildAdminClient()) {
68-
NewTopic newTopic = new NewTopic(topicName, topicDetails.getPartitions(), topicDetails.getReplication().shortValue());
69+
NewTopic newTopic = new NewTopic(topicName, topicDetails.getPartitions(), topicDetails.getReplication().get().shortValue());
6970
newTopic.configs(topicDetails.getConfigs());
7071
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
71-
} catch (InterruptedException | ExecutionException ex) {
72+
} catch (InterruptedException | ExecutionException | NoSuchElementException ex) {
7273
throw new KafkaExecutionException("Error thrown when attempting to create a Kafka topic", ex.getMessage());
7374
}
7475
}

src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.devshawn.kafka.gitops.util;
22

3-
import com.devshawn.kafka.gitops.domain.plan.*;
3+
import com.devshawn.kafka.gitops.domain.plan.AclPlan;
4+
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
5+
import com.devshawn.kafka.gitops.domain.plan.PlanOverview;
6+
import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
7+
import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
48
import com.devshawn.kafka.gitops.domain.state.AclDetails;
59
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
610
import com.devshawn.kafka.gitops.enums.PlanAction;
7-
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
811
import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
912
import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
1013
import picocli.CommandLine;
@@ -43,6 +46,7 @@ private static void printTopicPlan(TopicPlan topicPlan) {
4346
break;
4447
case UPDATE:
4548
System.out.println(yellow(String.format("~ [TOPIC] %s", topicPlan.getName())));
49+
System.out.println(yellow("\t~ configs:"));
4650
topicPlan.getTopicConfigPlans().forEach(LogUtil::printTopicConfigPlan);
4751
System.out.println("\n");
4852
break;
@@ -54,19 +58,24 @@ private static void printTopicPlan(TopicPlan topicPlan) {
5458
}
5559

5660
private static void printTopicConfigPlanForNewTopics(TopicDetails topicDetails) {
57-
topicDetails.getConfigs().forEach((key, value) -> System.out.println(green(String.format("\t+ %s: %s", key, value))));
61+
System.out.println(green(String.format("\t+ partitions: %s", topicDetails.getPartitions())));
62+
System.out.println(green(String.format("\t+ replication: %s", topicDetails.getReplication().get())));
63+
if (topicDetails.getConfigs().size() > 0) {
64+
System.out.println(green("\t+ configs:"));
65+
topicDetails.getConfigs().forEach((key, value) -> System.out.println(green(String.format("\t\t+ %s: %s", key, value))));
66+
}
5867
}
5968

6069
private static void printTopicConfigPlan(TopicConfigPlan topicConfigPlan) {
6170
switch (topicConfigPlan.getAction()) {
6271
case ADD:
63-
System.out.println(green(String.format("\t+ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
72+
System.out.println(green(String.format("\t\t+ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
6473
break;
6574
case UPDATE:
66-
System.out.println(yellow(String.format("\t~ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
75+
System.out.println(yellow(String.format("\t\t~ %s: %s", topicConfigPlan.getKey(), topicConfigPlan.getValue().get())));
6776
break;
6877
case REMOVE:
69-
System.out.println(red(String.format("\t- %s", topicConfigPlan.getKey())));
78+
System.out.println(red(String.format("\t\t- %s", topicConfigPlan.getKey())));
7079
break;
7180
}
7281
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.util;
2+
3+
import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
4+
5+
import java.util.Optional;
6+
7+
public class StateUtil {
8+
9+
public static Optional<Integer> fetchReplication(DesiredStateFile desiredStateFile) {
10+
if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getTopics().isPresent()
11+
&& desiredStateFile.getSettings().get().getTopics().get().getDefaults().isPresent()) {
12+
return desiredStateFile.getSettings().get().getTopics().get().getDefaults().get().getReplication();
13+
}
14+
return Optional.empty();
15+
}
16+
}

src/test/groovy/com/devshawn/kafka/gitops/PlanCommandIntegrationSpec.groovy

+7-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ class PlanCommandIntegrationSpec extends Specification {
6363
"custom-group-id-connect",
6464
"custom-application-id-streams",
6565
"custom-storage-topic",
66-
"custom-storage-topics"
66+
"custom-storage-topics",
67+
"default-replication",
68+
"default-replication-multiple"
6769
]
6870
}
6971

@@ -101,6 +103,7 @@ class PlanCommandIntegrationSpec extends Specification {
101103
"seed-topic-modification-3" | false
102104
"seed-topic-modification-no-delete" | true
103105
"seed-acl-exists" | true
106+
"seed-blacklist-topics" | false
104107
}
105108

106109
void 'test invalid plans - #planName'() {
@@ -133,7 +136,9 @@ class PlanCommandIntegrationSpec extends Specification {
133136
"unrecognized-property",
134137
"invalid-format",
135138
"invalid-missing-user-principal",
136-
"invalid-storage-topics"
139+
"invalid-storage-topics",
140+
"invalid-default-replication-1",
141+
"invalid-default-replication-2"
137142
]
138143
}
139144

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"topicPlans": [
3+
{
4+
"name": "test-topic",
5+
"action": "ADD",
6+
"topicDetails": {
7+
"partitions": 6,
8+
"replication": 3,
9+
"configs": {}
10+
},
11+
"topicConfigPlans": []
12+
},
13+
{
14+
"name": "another-topic",
15+
"action": "ADD",
16+
"topicDetails": {
17+
"partitions": 3,
18+
"replication": 1,
19+
"configs": {}
20+
},
21+
"topicConfigPlans": []
22+
},
23+
{
24+
"name": "last-topic",
25+
"action": "ADD",
26+
"topicDetails": {
27+
"partitions": 3,
28+
"replication": 4,
29+
"configs": {}
30+
},
31+
"topicConfigPlans": []
32+
}
33+
],
34+
"aclPlans": []
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
settings:
2+
topics:
3+
defaults:
4+
replication: 3
5+
6+
topics:
7+
test-topic:
8+
partitions: 6
9+
10+
another-topic:
11+
partitions: 3
12+
replication: 1
13+
14+
last-topic:
15+
partitions: 3
16+
replication: 4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"topicPlans": [
3+
{
4+
"name": "test-topic",
5+
"action": "ADD",
6+
"topicDetails": {
7+
"partitions": 6,
8+
"replication": 1,
9+
"configs": {}
10+
},
11+
"topicConfigPlans": []
12+
}
13+
],
14+
"aclPlans": []
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
settings:
2+
topics:
3+
defaults:
4+
replication: 1
5+
6+
topics:
7+
test-topic:
8+
partitions: 6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Generating execution plan...
2+
3+
[INVALID] The default replication factor must be a positive integer.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
settings:
2+
topics:
3+
defaults:
4+
replication: -1
5+
6+
topics:
7+
test-topic:
8+
partitions: 6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Generating execution plan...
2+
3+
[INVALID] Not set: [replication] in state file definition: topics -> test-topic
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
topics:
2+
test-topic:
3+
partitions: 6

src/test/resources/plans/multi-file-apply-output.txt

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ Executing apply...
33
Applying: [CREATE]
44

55
+ [TOPIC] test-topic
6+
+ partitions: 6
7+
+ replication: 1
68

79

810
Successfully applied.

0 commit comments

Comments
 (0)