Skip to content

Commit 4335027

Browse files
KAFKA-19156: Streamlined share group configs, with usage in ShareSessionCache (#19505)
This PR removes the group.share.max.groups config. This config was used to calculate the maximum size of share session cache. But with the new config group.share.max.share.sessions in place with exactly this purpose, the ShareSessionCache initialization has also been passed the new config. Refer: [KAFKA-19156](https://issues.apache.org/jira/browse/KAFKA-19156) Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent b088ba7 commit 4335027

File tree

5 files changed

+16
-42
lines changed

5 files changed

+16
-42
lines changed

core/src/main/scala/kafka/server/BrokerServer.scala

+1-3
Original file line numberDiff line numberDiff line change
@@ -426,9 +426,7 @@ class BrokerServer(
426426
))
427427
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))
428428

429-
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
430-
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize
431-
)
429+
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
432430

433431
sharePartitionManager = new SharePartitionManager(
434432
replicaManager,

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

-1
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,6 @@ class KafkaConfigTest {
10261026
case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10271027
case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10281028
case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1029-
case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10301029
case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10311030
case ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
10321031
case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java

+7-14
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import static org.apache.kafka.common.config.ConfigDef.Range.between;
3333
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
3434
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
35-
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
3635
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
3736

3837
public class ShareGroupConfig {
@@ -51,10 +50,6 @@ public class ShareGroupConfig {
5150
public static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT = 5;
5251
public static final String SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC = "The maximum number of delivery attempts for a record delivered to a share group.";
5352

54-
public static final String SHARE_GROUP_MAX_GROUPS_CONFIG = "group.share.max.groups";
55-
public static final short SHARE_GROUP_MAX_GROUPS_DEFAULT = 10;
56-
public static final String SHARE_GROUP_MAX_GROUPS_DOC = "The maximum number of share groups.";
57-
5853
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.record.lock.duration.ms";
5954
public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT = 30000;
6055
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock duration in milliseconds for share groups.";
@@ -86,7 +81,6 @@ public class ShareGroupConfig {
8681
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
8782
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
8883
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
89-
.define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC)
9084
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
9185
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
9286
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
@@ -95,25 +89,25 @@ public class ShareGroupConfig {
9589
private final boolean isShareGroupEnabled;
9690
private final int shareGroupPartitionMaxRecordLocks;
9791
private final int shareGroupDeliveryCountLimit;
98-
private final short shareGroupMaxGroups;
9992
private final int shareGroupRecordLockDurationMs;
10093
private final int shareGroupMaxRecordLockDurationMs;
10194
private final int shareGroupMinRecordLockDurationMs;
10295
private final int shareFetchPurgatoryPurgeIntervalRequests;
10396
private final int shareGroupMaxShareSessions;
10497
private final String shareGroupPersisterClassName;
98+
private final AbstractConfig config;
10599

106100
public ShareGroupConfig(AbstractConfig config) {
107-
// Share groups are enabled in two cases:
108-
// 1. The internal configuration to enable it is explicitly set
101+
this.config = config;
102+
// Share groups are enabled in either of the two following cases:
103+
// 1. The internal configuration to enable it is explicitly set; or
109104
// 2. the share rebalance protocol is enabled.
110105
Set<String> protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
111106
.stream().map(String::toUpperCase).collect(Collectors.toSet());
112107
isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) ||
113108
protocols.contains(GroupType.SHARE.name());
114109
shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
115110
shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG);
116-
shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG);
117111
shareGroupRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG);
118112
shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
119113
shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
@@ -136,10 +130,6 @@ public int shareGroupDeliveryCountLimit() {
136130
return shareGroupDeliveryCountLimit;
137131
}
138132

139-
public short shareGroupMaxGroups() {
140-
return shareGroupMaxGroups;
141-
}
142-
143133
public int shareGroupRecordLockDurationMs() {
144134
return shareGroupRecordLockDurationMs;
145135
}
@@ -171,6 +161,9 @@ private void validate() {
171161
Utils.require(shareGroupMaxRecordLockDurationMs >= shareGroupRecordLockDurationMs,
172162
String.format("%s must be greater than or equal to %s",
173163
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG));
164+
Utils.require(shareGroupMaxShareSessions >= config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG),
165+
String.format("%s must be greater than or equal to %s",
166+
SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));
174167
}
175168

176169
/**

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public class GroupConfigTest {
4141
private static final boolean SHARE_GROUP_ENABLE = true;
4242
private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
4343
private static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT = 5;
44-
private static final short SHARE_GROUP_MAX_GROUPS = 10;
4544
private static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS = 30000;
4645
private static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS = 15000;
4746
private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
@@ -284,6 +283,6 @@ private GroupCoordinatorConfig createGroupCoordinatorConfig() {
284283

285284
private ShareGroupConfig createShareGroupConfig() {
286285
return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS, SHARE_GROUP_DELIVERY_COUNT_LIMIT,
287-
SHARE_GROUP_MAX_GROUPS, SHARE_GROUP_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
286+
SHARE_GROUP_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
288287
}
289288
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java

+7-22
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public void testConfigs() {
3939
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
4040
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 200);
4141
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 5);
42-
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 10);
4342
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 30000);
4443
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 15000);
4544
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 60000);
@@ -51,7 +50,6 @@ public void testConfigs() {
5150
assertTrue(config.isShareGroupEnabled());
5251
assertEquals(200, config.shareGroupPartitionMaxRecordLocks());
5352
assertEquals(5, config.shareGroupDeliveryCountLimit());
54-
assertEquals(10, config.shareGroupMaxGroups());
5553
assertEquals(30000, config.shareGroupRecordLockDurationMs());
5654
assertEquals(15000, config.shareGroupMinRecordLockDurationMs());
5755
assertEquals(60000, config.shareGroupMaxRecordLockDurationMs());
@@ -89,24 +87,6 @@ public void testInvalidConfigs() {
8987
assertEquals("Invalid value 11 for configuration group.share.delivery.count.limit: Value must be no more than 10",
9088
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
9189

92-
configs.clear();
93-
// test for when SHARE_GROUP_MAX_GROUPS_CONFIG is of incorrect data type
94-
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, 10);
95-
assertEquals("Invalid value 10 for configuration group.share.max.groups: Expected value to be a 16-bit integer (short), but it was a java.lang.Integer",
96-
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
97-
98-
configs.clear();
99-
// test for when SHARE_GROUP_MAX_GROUPS_CONFIG is out of bounds
100-
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 0);
101-
assertEquals("Invalid value 0 for configuration group.share.max.groups: Value must be at least 1",
102-
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
103-
104-
configs.clear();
105-
// test for when SHARE_GROUP_MAX_GROUPS_CONFIG is out of bounds
106-
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 110);
107-
assertEquals("Invalid value 110 for configuration group.share.max.groups: Value must be no more than 100",
108-
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
109-
11090
configs.clear();
11191
// test for when SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG is out of bounds
11292
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 50);
@@ -118,13 +98,19 @@ public void testInvalidConfigs() {
11898
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 20000);
11999
assertEquals("Invalid value 20000 for configuration group.share.partition.max.record.locks: Value must be no more than 10000",
120100
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
101+
102+
configs.clear();
103+
// test for when SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG is less than SHARE_GROUP_MAX_SIZE_CONFIG
104+
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 2000);
105+
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 1000);
106+
assertEquals("Invalid value 2000 for configuration group.share.max.size: Value must be no more than 1000",
107+
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
121108
}
122109

123110
public static ShareGroupConfig createShareGroupConfig(
124111
boolean shareGroupEnable,
125112
int shareGroupPartitionMaxRecordLocks,
126113
int shareGroupDeliveryCountLimit,
127-
short shareGroupsMaxGroups,
128114
int shareGroupRecordLockDurationsMs,
129115
int shareGroupMinRecordLockDurationMs,
130116
int shareGroupMaxRecordLockDurationMs
@@ -133,7 +119,6 @@ public static ShareGroupConfig createShareGroupConfig(
133119
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, shareGroupEnable);
134120
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, shareGroupPartitionMaxRecordLocks);
135121
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, shareGroupDeliveryCountLimit);
136-
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, shareGroupsMaxGroups);
137122
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupRecordLockDurationsMs);
138123
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupMinRecordLockDurationMs);
139124
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupMaxRecordLockDurationMs);

0 commit comments

Comments
 (0)