Skip to content

Commit dc5bc03

Browse files
committed
follow up
1 parent 687cecf commit dc5bc03

File tree

11 files changed

+40
-39
lines changed

11 files changed

+40
-39
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
6262
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
6363
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole}
6464
import org.apache.kafka.server.authorizer._
65-
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion}
65+
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, StreamsVersion, TransactionVersion}
6666
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
6767
import org.apache.kafka.server.share.context.ShareFetchContext
6868
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
@@ -2631,11 +2631,15 @@ class KafkaApis(val requestChannel: RequestChannel,
26312631
}
26322632
}
26332633
}
2634+
}
26342635

2636+
private def streamsGroupVersion(): StreamsVersion = {
2637+
StreamsVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(StreamsVersion.FEATURE_NAME, 0.toShort))
26352638
}
26362639

26372640
private def isStreamsGroupProtocolEnabled: Boolean = {
2638-
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
2641+
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) &&
2642+
streamsGroupVersion().streamsGroupSupported
26392643
}
26402644

26412645
def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
383383
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
384384
"This is part of the early access of KIP-932 and MUST NOT be used in production.")
385385
}
386-
if (protocols.contains(GroupType.STREAMS)) {
387-
warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " +
388-
"This is part of the early access of KIP-1071 and MUST NOT be used in production.")
389-
}
390386
protocols
391387
}
392388

core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
2525
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
2626
import org.apache.kafka.common.test.ClusterInstance
2727
import org.apache.kafka.common.utils.Utils
28-
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsGroupVersion, TransactionVersion}
28+
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
2929
import org.apache.kafka.test.TestUtils
3030
import org.junit.jupiter.api.Assertions._
3131
import org.junit.jupiter.api.Tag
@@ -89,8 +89,8 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
8989
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
9090
assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
9191

92-
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).minVersion())
93-
assertEquals(StreamsGroupVersion.SGV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).maxVersion())
92+
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
93+
assertEquals(StreamsVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
9494
}
9595
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
9696
ApiVersionsResponse.collectApis(

server-common/src/main/java/org/apache/kafka/server/common/Feature.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public enum Feature {
4848
GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION),
4949
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
5050
SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION),
51-
STREAMS_GROUP_VERSION(StreamsGroupVersion.FEATURE_NAME, StreamsGroupVersion.values(), StreamsGroupVersion.LATEST_PRODUCTION),
51+
STREAMS_VERSION(StreamsVersion.FEATURE_NAME, StreamsVersion.values(), StreamsVersion.LATEST_PRODUCTION),
5252

5353
/**
5454
* Features defined only for unit tests and are not used in production.

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,15 @@ public enum MetadataVersion {
130130
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
131131
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
132132
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
133-
IBP_4_2_IV0(29, "4.2", "IV0", false);
133+
IBP_4_2_IV0(29, "4.2", "IV0", false),
134134

135+
// Enables "streams" groups by default for new clusters (KIP-1071).
136+
//
137+
// *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE POINT AT WHICH ***
138+
// *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS ***
139+
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
140+
// *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY. ***
141+
IBP_4_2_IV1(30, "4.2", "IV1", false);
135142

136143
// NOTES when adding a new version:
137144
// Update the default version in @ClusterTest annotation to point to the latest version
@@ -151,7 +158,7 @@ public enum MetadataVersion {
151158
* <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION,
152159
* IT CANNOT BE CHANGED.</strong>
153160
*/
154-
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV3;
161+
public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV3; // when do we update this to `IBP_4_1_IV2`+
155162
// If you change the value above please also update
156163
// LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py
157164

server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java renamed to server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,28 @@
1818

1919
import java.util.Map;
2020

21-
public enum StreamsGroupVersion implements FeatureVersion {
21+
public enum StreamsVersion implements FeatureVersion {
2222

23-
// Version 0 does disable "streams" groups (KIP-1071).
24-
SGV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
23+
// Version 0 keeps "streams" groups disabled (KIP-1071).
24+
SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
2525

2626
// Version 1 enables "streams" groups (KIP-1071).
27-
SGV_1(1, MetadataVersion.IBP_4_1_IV2, Map.of());
27+
// Using metadata version IBP_4_2_IV1 disables it in AK 4.1 release, and enables it in AK 4.2 release.
28+
// - in AK 4.1, this can be enabled as "early access [unstable]"
29+
// - in AK 4.2, it is planned to go GA (cf `LATEST_PRODUCTION`)
30+
SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of());
2831

29-
public static final String FEATURE_NAME = "streams.group.version";
32+
public static final String FEATURE_NAME = "streams.version";
3033

31-
// Disabled by default in 4.1 (early access only).
32-
public static final StreamsGroupVersion LATEST_PRODUCTION = SGV_0;
34+
// Mark "streams" group as unstable in AK 4.1 release
35+
// Needs to be updated to SV_1 in AK 4.2, to mark as stable
36+
public static final StreamsVersion LATEST_PRODUCTION = SV_0;
3337

3438
private final short featureLevel;
3539
private final MetadataVersion bootstrapMetadataVersion;
3640
private final Map<String, Short> dependencies;
3741

38-
StreamsGroupVersion(
42+
StreamsVersion(
3943
int featureLevel,
4044
MetadataVersion bootstrapMetadataVersion,
4145
Map<String, Short> dependencies
@@ -66,13 +70,13 @@ public Map<String, Short> dependencies() {
6670
}
6771

6872
public boolean streamsGroupSupported() {
69-
return featureLevel >= SGV_1.featureLevel;
73+
return featureLevel >= SV_1.featureLevel;
7074
}
7175

72-
public static StreamsGroupVersion fromFeatureLevel(short version) {
76+
public static StreamsVersion fromFeatureLevel(short version) {
7377
return switch (version) {
74-
case 0 -> SGV_0;
75-
case 1 -> SGV_1;
78+
case 0 -> SV_0;
79+
case 1 -> SV_1;
7680
default -> throw new RuntimeException("Unknown streams group feature level: " + (int) version);
7781
};
7882
}

server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
3131
import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
3232
import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
33-
import static org.apache.kafka.server.common.Feature.STREAMS_GROUP_VERSION;
33+
import static org.apache.kafka.server.common.Feature.STREAMS_VERSION;
3434
import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
3535
import static org.junit.jupiter.api.Assertions.assertEquals;
3636
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -100,7 +100,7 @@ public void testDefaultFinalizedFeatures() {
100100
GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
101101
ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
102102
SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
103-
STREAMS_GROUP_VERSION.featureName(), STREAMS_GROUP_VERSION.latestTesting(),
103+
STREAMS_VERSION.featureName(), STREAMS_VERSION.latestTesting(),
104104
"kraft.version", (short) 0,
105105
"test_feature_1", (short) 4,
106106
"test_feature_2", (short) 3,

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
@Timeout(600)
7676
@Tag("integration")
7777
public class InternalTopicIntegrationTest {
78-
public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
78+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
7979

8080
@BeforeAll
8181
public static void startCluster() throws IOException, InterruptedException {

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
@Timeout(600)
5252
@Tag("integration")
5353
public class SmokeTestDriverIntegrationTest {
54-
public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3);
54+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
5555
public TestInfo testInfo;
5656

5757
@BeforeAll

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
public class StandbyTaskCreationIntegrationTest {
5858
private static final int NUM_BROKERS = 1;
5959

60-
public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS);
60+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
6161

6262
private String safeTestName;
6363

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,6 @@ public EmbeddedKafkaCluster(final int numBrokers,
143143
this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
144144
}
145145

146-
public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers) {
147-
return withStreamsRebalanceProtocol(numBrokers, new Properties());
148-
}
149-
150-
public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers, final Properties props) {
151-
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
152-
props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
153-
return new EmbeddedKafkaCluster(numBrokers, props);
154-
}
155-
156146
public void start() {
157147
try {
158148
cluster.format();

0 commit comments

Comments
 (0)