Skip to content

KAFKA-19159: Removed time based evictions for share sessions #19500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 22, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,7 @@ public ShareFetchContext newContext(String groupId, List<TopicIdPartition> share
ImplicitLinkedHashCollection<>(shareFetchData.size());
shareFetchData.forEach(topicIdPartition ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, false)));
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
time.milliseconds(), cachedSharePartitions);
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), cachedSharePartitions);
if (responseShareSessionKey == null) {
log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
Expand All @@ -476,7 +475,7 @@ public ShareFetchContext newContext(String groupId, List<TopicIdPartition> share
}
Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update(
shareFetchData, toForget);
cache.touch(shareSession, time.milliseconds());
cache.updateNumPartitions(shareSession);
shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
log.debug("Created a new ShareSessionContext for session key {}, epoch {}: " +
"added {}, updated {}, removed {}", shareSession.key(), shareSession.epoch,
Expand Down Expand Up @@ -517,7 +516,7 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
shareSession.epoch, reqMetadata.epoch());
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
}
cache.touch(shareSession, time.milliseconds());
cache.updateNumPartitions(shareSession);
shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ class BrokerServer(
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))

val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize,
KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will replace this with new configuration in subsequent PRs, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. A PR is already in review to take care of this.
#19505

)

sharePartitionManager = new SharePartitionManager(
replicaManager,
Expand Down
119 changes: 12 additions & 107 deletions core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -185,7 +184,7 @@ public void tearDown() throws Exception {

@Test
public void testNewContextReturnsFinalContextWithoutRequestData() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand All @@ -212,7 +211,7 @@ public void testNewContextReturnsFinalContextWithoutRequestData() {

@Test
public void testNewContextReturnsFinalContextWithRequestData() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -244,7 +243,7 @@ public void testNewContextReturnsFinalContextWithRequestData() {

@Test
public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -275,7 +274,7 @@ public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ

@Test
public void testNewContext() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -372,100 +371,6 @@ public void testNewContext() {
assertEquals(0, cache.size());
}

@Test
public void testShareSessionExpiration() {
ShareSessionCache cache = new ShareSessionCache(2, 1000);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withTime(time)
.build();

Map<Uuid, String> topicNames = new HashMap<>();
Uuid fooId = Uuid.randomUuid();
topicNames.put(fooId, "foo");
TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));

// Create a new share session, session 1
List<TopicIdPartition> session1req = List.of(foo0, foo1);

String groupId = "grp";
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);

ShareFetchContext session1context = sharePartitionManager.newContext(groupId, session1req, EMPTY_PART_LIST, reqMetadata1, false);
assertInstanceOf(ShareSessionContext.class, session1context);

LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));

ShareFetchResponse session1resp = session1context.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
assertEquals(Errors.NONE, session1resp.error());
assertEquals(2, session1resp.responseData(topicNames).size());

ShareSessionKey session1Key = new ShareSessionKey(groupId, reqMetadata1.memberId());
// check share session entered into cache
assertNotNull(cache.get(session1Key));

time.sleep(500);

// Create a second new share session
List<TopicIdPartition> session2req = List.of(foo0, foo1);

ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);

ShareFetchContext session2context = sharePartitionManager.newContext(groupId, session2req, EMPTY_PART_LIST, reqMetadata2, false);
assertInstanceOf(ShareSessionContext.class, session2context);

LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData2.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));

ShareFetchResponse session2resp = session2context.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, session2resp.error());
assertEquals(2, session2resp.responseData(topicNames).size());

ShareSessionKey session2Key = new ShareSessionKey(groupId, reqMetadata2.memberId());

// both newly created entries are present in cache
assertNotNull(cache.get(session1Key));
assertNotNull(cache.get(session2Key));

time.sleep(500);

// Create a subsequent share fetch context for session 1
ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
assertInstanceOf(ShareSessionContext.class, session1context2);

// total sleep time will now be large enough that share session 1 will be evicted if not correctly touched
time.sleep(501);

// create one final share session to test that the least recently used entry is evicted
// the second share session should be evicted because the first share session was incrementally fetched
// more recently than the second session was created
List<TopicIdPartition> session3req = List.of(foo0, foo1);

ShareRequestMetadata reqMetadata3 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);

ShareFetchContext session3context = sharePartitionManager.newContext(groupId, session3req, EMPTY_PART_LIST, reqMetadata3, false);

LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<>();
respData3.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData3.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));

ShareFetchResponse session3resp = session3context.updateAndGenerateResponseData(groupId, reqMetadata3.memberId(), respData3);
assertEquals(Errors.NONE, session3resp.error());
assertEquals(2, session3resp.responseData(topicNames).size());

ShareSessionKey session3Key = new ShareSessionKey(groupId, reqMetadata3.memberId());

assertNotNull(cache.get(session1Key));
assertNull(cache.get(session2Key), "share session 2 should have been evicted by latest share session, " +
"as share session 1 was used more recently");
assertNotNull(cache.get(session3Key));
}

@Test
public void testSubsequentShareSession() {
sharePartitionManager = SharePartitionManagerBuilder.builder().build();
Expand Down Expand Up @@ -530,7 +435,7 @@ public void testSubsequentShareSession() {

@Test
public void testZeroSizeShareSession() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -576,7 +481,7 @@ public void testZeroSizeShareSession() {
@Test
public void testToForgetPartitions() {
String groupId = "grp";
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -614,7 +519,7 @@ public void testToForgetPartitions() {
@Test
public void testShareSessionUpdateTopicIdsBrokerSide() {
String groupId = "grp";
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -665,7 +570,7 @@ public void testShareSessionUpdateTopicIdsBrokerSide() {

@Test
public void testGetErroneousAndValidTopicIdPartitions() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -758,7 +663,7 @@ public void testGetErroneousAndValidTopicIdPartitions() {

@Test
public void testShareFetchContextResponseSize() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -859,7 +764,7 @@ public void testShareFetchContextResponseSize() {

@Test
public void testCachedTopicPartitionsWithNoTopicPartitions() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand All @@ -870,7 +775,7 @@ public void testCachedTopicPartitionsWithNoTopicPartitions() {

@Test
public void testCachedTopicPartitionsForValidShareSessions() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Expand Down Expand Up @@ -3108,7 +3013,7 @@ static class SharePartitionManagerBuilder {
private final Persister persister = new NoOpStatePersister();
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
private ShareSessionCache cache = new ShareSessionCache(10);
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Timer timer = new MockTimer();
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
Expand Down
36 changes: 16 additions & 20 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4046,9 +4046,8 @@ class KafkaApisTest extends Logging {

when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenThrow(
Errors.INVALID_REQUEST.exception()
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2
)))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
Expand Down Expand Up @@ -4299,9 +4298,8 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId, partitionIndex, topicName), false))

when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
)
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
Expand Down Expand Up @@ -4361,9 +4359,8 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId, partitionIndex, topicName), false))

when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
)
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
Expand Down Expand Up @@ -4718,10 +4715,10 @@ class KafkaApisTest extends Logging {
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 10L, 3))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3))
)

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
Expand Down Expand Up @@ -4986,10 +4983,10 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 0L, 0L, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 0L, 0L, 3))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3))
).thenReturn(new FinalContext())

when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
Expand Down Expand Up @@ -5963,9 +5960,8 @@ class KafkaApisTest extends Logging {
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
)
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
Expand Down
Loading