@@ -88,7 +88,7 @@ import org.apache.kafka.raft.QuorumConfig
88
88
import org.apache.kafka.security.authorizer.AclEntry
89
89
import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
90
90
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
91
- import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
91
+ import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, StreamsVersion, TransactionVersion}
92
92
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
93
93
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
94
94
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
@@ -9902,7 +9902,11 @@ class KafkaApisTest extends Logging {
9902
9902
9903
9903
@Test
9904
9904
def testStreamsGroupHeartbeatRequest(): Unit = {
9905
+ val features = mock(classOf[FinalizedFeatures])
9906
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
9907
+
9905
9908
metadataCache = mock(classOf[KRaftMetadataCache])
9909
+ when(metadataCache.features()).thenReturn(features)
9906
9910
9907
9911
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
9908
9912
@@ -9913,9 +9917,7 @@ class KafkaApisTest extends Logging {
9913
9917
requestChannelRequest.context,
9914
9918
streamsGroupHeartbeatRequest
9915
9919
)).thenReturn(future)
9916
- kafkaApis = createKafkaApis(
9917
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
9918
- )
9920
+ kafkaApis = createKafkaApis()
9919
9921
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
9920
9922
9921
9923
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
@@ -9928,7 +9930,12 @@ class KafkaApisTest extends Logging {
9928
9930
9929
9931
@Test
9930
9932
def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = {
9933
+ val features = mock(classOf[FinalizedFeatures])
9934
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
9935
+
9931
9936
metadataCache = mock(classOf[KRaftMetadataCache])
9937
+ when(metadataCache.features()).thenReturn(features)
9938
+
9932
9939
val groupId = "group"
9933
9940
val fooTopicName = "foo"
9934
9941
val barTopicName = "bar"
@@ -9979,8 +9986,7 @@ class KafkaApisTest extends Logging {
9979
9986
streamsGroupHeartbeatRequest
9980
9987
)).thenReturn(future)
9981
9988
kafkaApis = createKafkaApis(
9982
- authorizer = Some(authorizer),
9983
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
9989
+ authorizer = Some(authorizer)
9984
9990
)
9985
9991
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
9986
9992
@@ -9994,7 +10000,11 @@ class KafkaApisTest extends Logging {
9994
10000
9995
10001
@Test
9996
10002
def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
10003
+ val features = mock(classOf[FinalizedFeatures])
10004
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10005
+
9997
10006
metadataCache = mock(classOf[KRaftMetadataCache])
10007
+ when(metadataCache.features()).thenReturn(features)
9998
10008
9999
10009
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
10000
10010
@@ -10005,9 +10015,7 @@ class KafkaApisTest extends Logging {
10005
10015
requestChannelRequest.context,
10006
10016
streamsGroupHeartbeatRequest
10007
10017
)).thenReturn(future)
10008
- kafkaApis = createKafkaApis(
10009
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10010
- )
10018
+ kafkaApis = createKafkaApis()
10011
10019
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10012
10020
10013
10021
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10017,7 +10025,11 @@ class KafkaApisTest extends Logging {
10017
10025
10018
10026
@Test
10019
10027
def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
10028
+ val features = mock(classOf[FinalizedFeatures])
10029
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10030
+
10020
10031
metadataCache = mock(classOf[KRaftMetadataCache])
10032
+ when(metadataCache.features()).thenReturn(features)
10021
10033
10022
10034
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
10023
10035
@@ -10027,8 +10039,7 @@ class KafkaApisTest extends Logging {
10027
10039
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
10028
10040
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
10029
10041
kafkaApis = createKafkaApis(
10030
- authorizer = Some(authorizer),
10031
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10042
+ authorizer = Some(authorizer)
10032
10043
)
10033
10044
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10034
10045
@@ -10038,7 +10049,12 @@ class KafkaApisTest extends Logging {
10038
10049
10039
10050
@Test
10040
10051
def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
10052
+ val features = mock(classOf[FinalizedFeatures])
10053
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10054
+
10041
10055
metadataCache = mock(classOf[KRaftMetadataCache])
10056
+ when(metadataCache.features()).thenReturn(features)
10057
+
10042
10058
val groupId = "group"
10043
10059
val fooTopicName = "foo"
10044
10060
val barTopicName = "bar"
@@ -10079,8 +10095,7 @@ class KafkaApisTest extends Logging {
10079
10095
}
10080
10096
10081
10097
kafkaApis = createKafkaApis(
10082
- authorizer = Some(authorizer),
10083
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10098
+ authorizer = Some(authorizer)
10084
10099
)
10085
10100
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10086
10101
@@ -10089,7 +10104,7 @@ class KafkaApisTest extends Logging {
10089
10104
}
10090
10105
10091
10106
@Test
10092
- def testStreamsGroupHeartbeatRequestProtocolDisabled (): Unit = {
10107
+ def testStreamsGroupHeartbeatRequestProtocolDisabledViaConfig (): Unit = {
10093
10108
metadataCache = mock(classOf[KRaftMetadataCache])
10094
10109
10095
10110
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10105,9 +10120,32 @@ class KafkaApisTest extends Logging {
10105
10120
assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
10106
10121
}
10107
10122
10123
+ @Test
10124
+ def testStreamsGroupHeartbeatRequestProtocolDisabledViaFeature(): Unit = {
10125
+ val features = mock(classOf[FinalizedFeatures])
10126
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 0.toShort))
10127
+
10128
+ metadataCache = mock(classOf[KRaftMetadataCache])
10129
+ when(metadataCache.features()).thenReturn(features)
10130
+
10131
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
10132
+
10133
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
10134
+
10135
+ kafkaApis = createKafkaApis()
10136
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10137
+
10138
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
10139
+ assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
10140
+ }
10141
+
10108
10142
@Test
10109
10143
def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
10144
+ val features = mock(classOf[FinalizedFeatures])
10145
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10146
+
10110
10147
metadataCache = mock(classOf[KRaftMetadataCache])
10148
+ when(metadataCache.features()).thenReturn(features)
10111
10149
10112
10150
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
10113
10151
new StreamsGroupHeartbeatRequestData.Topology()
@@ -10124,9 +10162,7 @@ class KafkaApisTest extends Logging {
10124
10162
10125
10163
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
10126
10164
10127
- kafkaApis = createKafkaApis(
10128
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10129
- )
10165
+ kafkaApis = createKafkaApis()
10130
10166
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10131
10167
10132
10168
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10136,7 +10172,11 @@ class KafkaApisTest extends Logging {
10136
10172
10137
10173
@Test
10138
10174
def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = {
10175
+ val features = mock(classOf[FinalizedFeatures])
10176
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10177
+
10139
10178
metadataCache = mock(classOf[KRaftMetadataCache])
10179
+ when(metadataCache.features()).thenReturn(features)
10140
10180
10141
10181
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
10142
10182
new StreamsGroupHeartbeatRequestData.Topology()
@@ -10152,9 +10192,7 @@ class KafkaApisTest extends Logging {
10152
10192
10153
10193
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
10154
10194
10155
- kafkaApis = createKafkaApis(
10156
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10157
- )
10195
+ kafkaApis = createKafkaApis()
10158
10196
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10159
10197
10160
10198
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10164,7 +10202,11 @@ class KafkaApisTest extends Logging {
10164
10202
10165
10203
@Test
10166
10204
def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreate(): Unit = {
10205
+ val features = mock(classOf[FinalizedFeatures])
10206
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10207
+
10167
10208
metadataCache = mock(classOf[KRaftMetadataCache])
10209
+ when(metadataCache.features()).thenReturn(features)
10168
10210
10169
10211
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group");
10170
10212
@@ -10176,9 +10218,7 @@ class KafkaApisTest extends Logging {
10176
10218
streamsGroupHeartbeatRequest
10177
10219
)).thenReturn(future)
10178
10220
10179
- kafkaApis = createKafkaApis(
10180
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10181
- )
10221
+ kafkaApis = createKafkaApis()
10182
10222
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10183
10223
10184
10224
val missingTopics = Map("test" -> new CreatableTopic())
@@ -10193,7 +10233,11 @@ class KafkaApisTest extends Logging {
10193
10233
10194
10234
@Test
10195
10235
def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreateMissingCreateACL(): Unit = {
10236
+ val features = mock(classOf[FinalizedFeatures])
10237
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10238
+
10196
10239
metadataCache = mock(classOf[KRaftMetadataCache])
10240
+ when(metadataCache.features()).thenReturn(features)
10197
10241
10198
10242
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group");
10199
10243
@@ -10219,8 +10263,7 @@ class KafkaApisTest extends Logging {
10219
10263
}.asJava
10220
10264
})
10221
10265
kafkaApis = createKafkaApis(
10222
- authorizer = Some(authorizer),
10223
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10266
+ authorizer = Some(authorizer)
10224
10267
)
10225
10268
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10226
10269
@@ -10414,7 +10457,12 @@ class KafkaApisTest extends Logging {
10414
10457
@ParameterizedTest
10415
10458
@ValueSource(booleans = Array(true, false))
10416
10459
def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
10460
+ val features = mock(classOf[FinalizedFeatures])
10461
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10462
+
10417
10463
metadataCache = mock(classOf[KRaftMetadataCache])
10464
+ when(metadataCache.features()).thenReturn(features)
10465
+
10418
10466
val fooTopicName = "foo"
10419
10467
val barTopicName = "bar"
10420
10468
@@ -10429,9 +10477,7 @@ class KafkaApisTest extends Logging {
10429
10477
any[RequestContext],
10430
10478
any[util.List[String]]
10431
10479
)).thenReturn(future)
10432
- kafkaApis = createKafkaApis(
10433
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10434
- )
10480
+ kafkaApis = createKafkaApis()
10435
10481
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10436
10482
10437
10483
val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
@@ -10522,7 +10568,11 @@ class KafkaApisTest extends Logging {
10522
10568
10523
10569
@Test
10524
10570
def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
10571
+ val features = mock(classOf[FinalizedFeatures])
10572
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10573
+
10525
10574
metadataCache = mock(classOf[KRaftMetadataCache])
10575
+ when(metadataCache.features()).thenReturn(features)
10526
10576
10527
10577
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
10528
10578
streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10539,8 +10589,7 @@ class KafkaApisTest extends Logging {
10539
10589
)).thenReturn(future)
10540
10590
future.complete(List().asJava)
10541
10591
kafkaApis = createKafkaApis(
10542
- authorizer = Some(authorizer),
10543
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10592
+ authorizer = Some(authorizer)
10544
10593
)
10545
10594
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10546
10595
@@ -10550,7 +10599,11 @@ class KafkaApisTest extends Logging {
10550
10599
10551
10600
@Test
10552
10601
def testStreamsGroupDescribeFutureFailed(): Unit = {
10602
+ val features = mock(classOf[FinalizedFeatures])
10603
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10604
+
10553
10605
metadataCache = mock(classOf[KRaftMetadataCache])
10606
+ when(metadataCache.features()).thenReturn(features)
10554
10607
10555
10608
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
10556
10609
streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10561,9 +10614,7 @@ class KafkaApisTest extends Logging {
10561
10614
any[RequestContext],
10562
10615
any[util.List[String]]
10563
10616
)).thenReturn(future)
10564
- kafkaApis = createKafkaApis(
10565
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10566
- )
10617
+ kafkaApis = createKafkaApis()
10567
10618
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10568
10619
10569
10620
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10578,7 +10629,11 @@ class KafkaApisTest extends Logging {
10578
10629
val barTopicName = "bar"
10579
10630
val errorMessage = "The described group uses topics that the client is not authorized to describe."
10580
10631
10632
+ val features = mock(classOf[FinalizedFeatures])
10633
+ when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
10634
+
10581
10635
metadataCache = mock(classOf[KRaftMetadataCache])
10636
+ when(metadataCache.features()).thenReturn(features)
10582
10637
10583
10638
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
10584
10639
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10610,8 +10665,7 @@ class KafkaApisTest extends Logging {
10610
10665
any[util.List[String]]
10611
10666
)).thenReturn(future)
10612
10667
kafkaApis = createKafkaApis(
10613
- authorizer = Some(authorizer),
10614
- overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10668
+ authorizer = Some(authorizer)
10615
10669
)
10616
10670
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10617
10671
0 commit comments