Skip to content

Commit 22c5794

Browse files
KAFKA-19159: Removed time based evictions for share sessions (#19500)
Currently the share session cache is desgined like the fetch session cache. If the cache is full and a new share session is trying to get get initialized, then the sessions which haven't been touched for more than 2minutes are evicted. This wouldn't be right for share sessions as the members also hold locks on the acquired records, and session eviction would mean theose locks will need to be dropped and the corresponding records re-delivered. This PR removes the time based eviction logic for share sessions. Refer: [KAFKA-19159](https://issues.apache.org/jira/browse/KAFKA-19159) Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 66147d5 commit 22c5794

File tree

8 files changed

+84
-337
lines changed

8 files changed

+84
-337
lines changed

core/src/main/java/kafka/server/share/SharePartitionManager.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,7 @@ public ShareFetchContext newContext(String groupId, List<TopicIdPartition> share
448448
ImplicitLinkedHashCollection<>(shareFetchData.size());
449449
shareFetchData.forEach(topicIdPartition ->
450450
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, false)));
451-
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
452-
time.milliseconds(), cachedSharePartitions);
451+
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), cachedSharePartitions);
453452
if (responseShareSessionKey == null) {
454453
log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
455454
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
@@ -476,7 +475,7 @@ public ShareFetchContext newContext(String groupId, List<TopicIdPartition> share
476475
}
477476
Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update(
478477
shareFetchData, toForget);
479-
cache.touch(shareSession, time.milliseconds());
478+
cache.updateNumPartitions(shareSession);
480479
shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
481480
log.debug("Created a new ShareSessionContext for session key {}, epoch {}: " +
482481
"added {}, updated {}, removed {}", shareSession.key(), shareSession.epoch,
@@ -517,7 +516,7 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
517516
shareSession.epoch, reqMetadata.epoch());
518517
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
519518
}
520-
cache.touch(shareSession, time.milliseconds());
519+
cache.updateNumPartitions(shareSession);
521520
shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
522521
}
523522
}

core/src/main/scala/kafka/server/BrokerServer.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,8 @@ class BrokerServer(
427427
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))
428428

429429
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
430-
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize,
431-
KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)
430+
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize
431+
)
432432

433433
sharePartitionManager = new SharePartitionManager(
434434
replicaManager,

core/src/test/java/kafka/server/share/SharePartitionManagerTest.java

+12-107
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@
121121
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
122122
import static org.junit.jupiter.api.Assertions.assertNotEquals;
123123
import static org.junit.jupiter.api.Assertions.assertNotNull;
124-
import static org.junit.jupiter.api.Assertions.assertNull;
125124
import static org.junit.jupiter.api.Assertions.assertThrows;
126125
import static org.junit.jupiter.api.Assertions.assertTrue;
127126
import static org.mockito.ArgumentMatchers.any;
@@ -185,7 +184,7 @@ public void tearDown() throws Exception {
185184

186185
@Test
187186
public void testNewContextReturnsFinalContextWithoutRequestData() {
188-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
187+
ShareSessionCache cache = new ShareSessionCache(10);
189188
sharePartitionManager = SharePartitionManagerBuilder.builder()
190189
.withCache(cache)
191190
.build();
@@ -212,7 +211,7 @@ public void testNewContextReturnsFinalContextWithoutRequestData() {
212211

213212
@Test
214213
public void testNewContextReturnsFinalContextWithRequestData() {
215-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
214+
ShareSessionCache cache = new ShareSessionCache(10);
216215
sharePartitionManager = SharePartitionManagerBuilder.builder()
217216
.withCache(cache)
218217
.build();
@@ -244,7 +243,7 @@ public void testNewContextReturnsFinalContextWithRequestData() {
244243

245244
@Test
246245
public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
247-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
246+
ShareSessionCache cache = new ShareSessionCache(10);
248247
sharePartitionManager = SharePartitionManagerBuilder.builder()
249248
.withCache(cache)
250249
.build();
@@ -275,7 +274,7 @@ public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ
275274

276275
@Test
277276
public void testNewContext() {
278-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
277+
ShareSessionCache cache = new ShareSessionCache(10);
279278
sharePartitionManager = SharePartitionManagerBuilder.builder()
280279
.withCache(cache)
281280
.build();
@@ -372,100 +371,6 @@ public void testNewContext() {
372371
assertEquals(0, cache.size());
373372
}
374373

375-
@Test
376-
public void testShareSessionExpiration() {
377-
ShareSessionCache cache = new ShareSessionCache(2, 1000);
378-
sharePartitionManager = SharePartitionManagerBuilder.builder()
379-
.withCache(cache)
380-
.withTime(time)
381-
.build();
382-
383-
Map<Uuid, String> topicNames = new HashMap<>();
384-
Uuid fooId = Uuid.randomUuid();
385-
topicNames.put(fooId, "foo");
386-
TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
387-
TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
388-
389-
// Create a new share session, session 1
390-
List<TopicIdPartition> session1req = List.of(foo0, foo1);
391-
392-
String groupId = "grp";
393-
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
394-
395-
ShareFetchContext session1context = sharePartitionManager.newContext(groupId, session1req, EMPTY_PART_LIST, reqMetadata1, false);
396-
assertInstanceOf(ShareSessionContext.class, session1context);
397-
398-
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
399-
respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
400-
respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
401-
402-
ShareFetchResponse session1resp = session1context.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
403-
assertEquals(Errors.NONE, session1resp.error());
404-
assertEquals(2, session1resp.responseData(topicNames).size());
405-
406-
ShareSessionKey session1Key = new ShareSessionKey(groupId, reqMetadata1.memberId());
407-
// check share session entered into cache
408-
assertNotNull(cache.get(session1Key));
409-
410-
time.sleep(500);
411-
412-
// Create a second new share session
413-
List<TopicIdPartition> session2req = List.of(foo0, foo1);
414-
415-
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
416-
417-
ShareFetchContext session2context = sharePartitionManager.newContext(groupId, session2req, EMPTY_PART_LIST, reqMetadata2, false);
418-
assertInstanceOf(ShareSessionContext.class, session2context);
419-
420-
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
421-
respData2.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
422-
respData2.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
423-
424-
ShareFetchResponse session2resp = session2context.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
425-
assertEquals(Errors.NONE, session2resp.error());
426-
assertEquals(2, session2resp.responseData(topicNames).size());
427-
428-
ShareSessionKey session2Key = new ShareSessionKey(groupId, reqMetadata2.memberId());
429-
430-
// both newly created entries are present in cache
431-
assertNotNull(cache.get(session1Key));
432-
assertNotNull(cache.get(session2Key));
433-
434-
time.sleep(500);
435-
436-
// Create a subsequent share fetch context for session 1
437-
ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
438-
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
439-
assertInstanceOf(ShareSessionContext.class, session1context2);
440-
441-
// total sleep time will now be large enough that share session 1 will be evicted if not correctly touched
442-
time.sleep(501);
443-
444-
// create one final share session to test that the least recently used entry is evicted
445-
// the second share session should be evicted because the first share session was incrementally fetched
446-
// more recently than the second session was created
447-
List<TopicIdPartition> session3req = List.of(foo0, foo1);
448-
449-
ShareRequestMetadata reqMetadata3 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
450-
451-
ShareFetchContext session3context = sharePartitionManager.newContext(groupId, session3req, EMPTY_PART_LIST, reqMetadata3, false);
452-
453-
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<>();
454-
respData3.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
455-
respData3.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
456-
457-
ShareFetchResponse session3resp = session3context.updateAndGenerateResponseData(groupId, reqMetadata3.memberId(), respData3);
458-
assertEquals(Errors.NONE, session3resp.error());
459-
assertEquals(2, session3resp.responseData(topicNames).size());
460-
461-
ShareSessionKey session3Key = new ShareSessionKey(groupId, reqMetadata3.memberId());
462-
463-
assertNotNull(cache.get(session1Key));
464-
assertNull(cache.get(session2Key), "share session 2 should have been evicted by latest share session, " +
465-
"as share session 1 was used more recently");
466-
assertNotNull(cache.get(session3Key));
467-
}
468-
469374
@Test
470375
public void testSubsequentShareSession() {
471376
sharePartitionManager = SharePartitionManagerBuilder.builder().build();
@@ -530,7 +435,7 @@ public void testSubsequentShareSession() {
530435

531436
@Test
532437
public void testZeroSizeShareSession() {
533-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
438+
ShareSessionCache cache = new ShareSessionCache(10);
534439
sharePartitionManager = SharePartitionManagerBuilder.builder()
535440
.withCache(cache)
536441
.build();
@@ -576,7 +481,7 @@ public void testZeroSizeShareSession() {
576481
@Test
577482
public void testToForgetPartitions() {
578483
String groupId = "grp";
579-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
484+
ShareSessionCache cache = new ShareSessionCache(10);
580485
sharePartitionManager = SharePartitionManagerBuilder.builder()
581486
.withCache(cache)
582487
.build();
@@ -614,7 +519,7 @@ public void testToForgetPartitions() {
614519
@Test
615520
public void testShareSessionUpdateTopicIdsBrokerSide() {
616521
String groupId = "grp";
617-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
522+
ShareSessionCache cache = new ShareSessionCache(10);
618523
sharePartitionManager = SharePartitionManagerBuilder.builder()
619524
.withCache(cache)
620525
.build();
@@ -665,7 +570,7 @@ public void testShareSessionUpdateTopicIdsBrokerSide() {
665570

666571
@Test
667572
public void testGetErroneousAndValidTopicIdPartitions() {
668-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
573+
ShareSessionCache cache = new ShareSessionCache(10);
669574
sharePartitionManager = SharePartitionManagerBuilder.builder()
670575
.withCache(cache)
671576
.build();
@@ -758,7 +663,7 @@ public void testGetErroneousAndValidTopicIdPartitions() {
758663

759664
@Test
760665
public void testShareFetchContextResponseSize() {
761-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
666+
ShareSessionCache cache = new ShareSessionCache(10);
762667
sharePartitionManager = SharePartitionManagerBuilder.builder()
763668
.withCache(cache)
764669
.build();
@@ -859,7 +764,7 @@ public void testShareFetchContextResponseSize() {
859764

860765
@Test
861766
public void testCachedTopicPartitionsWithNoTopicPartitions() {
862-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
767+
ShareSessionCache cache = new ShareSessionCache(10);
863768
sharePartitionManager = SharePartitionManagerBuilder.builder()
864769
.withCache(cache)
865770
.build();
@@ -870,7 +775,7 @@ public void testCachedTopicPartitionsWithNoTopicPartitions() {
870775

871776
@Test
872777
public void testCachedTopicPartitionsForValidShareSessions() {
873-
ShareSessionCache cache = new ShareSessionCache(10, 1000);
778+
ShareSessionCache cache = new ShareSessionCache(10);
874779
sharePartitionManager = SharePartitionManagerBuilder.builder()
875780
.withCache(cache)
876781
.build();
@@ -3108,7 +3013,7 @@ static class SharePartitionManagerBuilder {
31083013
private final Persister persister = new NoOpStatePersister();
31093014
private ReplicaManager replicaManager = mock(ReplicaManager.class);
31103015
private Time time = new MockTime();
3111-
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
3016+
private ShareSessionCache cache = new ShareSessionCache(10);
31123017
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
31133018
private Timer timer = new MockTimer();
31143019
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+16-20
Original file line numberDiff line numberDiff line change
@@ -4046,9 +4046,8 @@ class KafkaApisTest extends Logging {
40464046

40474047
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenThrow(
40484048
Errors.INVALID_REQUEST.exception()
4049-
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
4050-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2
4051-
)))
4049+
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
4050+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
40524051

40534052
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
40544053
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4299,9 +4298,8 @@ class KafkaApisTest extends Logging {
42994298
new TopicIdPartition(topicId, partitionIndex, topicName), false))
43004299

43014300
when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
4302-
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
4303-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
4304-
)
4301+
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
4302+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
43054303

43064304
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
43074305
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4361,9 +4359,8 @@ class KafkaApisTest extends Logging {
43614359
new TopicIdPartition(topicId, partitionIndex, topicName), false))
43624360

43634361
when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
4364-
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
4365-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
4366-
)
4362+
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
4363+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
43674364

43684365
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
43694366
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4718,10 +4715,10 @@ class KafkaApisTest extends Logging {
47184715
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
47194716
new TopicIdPartition(topicId, partitionIndex, topicName)
47204717
))
4721-
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
4722-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
4723-
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2), new ShareSession(
4724-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 10L, 3))
4718+
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
4719+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
4720+
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
4721+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3))
47254722
)
47264723

47274724
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
@@ -4986,10 +4983,10 @@ class KafkaApisTest extends Logging {
49864983
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
49874984
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
49884985
))
4989-
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
4990-
new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 0L, 0L, 2))
4991-
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2), new ShareSession(
4992-
new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 0L, 0L, 3))
4986+
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
4987+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2))
4988+
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
4989+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3))
49934990
).thenReturn(new FinalContext())
49944991

49954992
when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
@@ -5963,9 +5960,8 @@ class KafkaApisTest extends Logging {
59635960
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
59645961
new TopicIdPartition(topicId, partitionIndex, topicName)
59655962
))
5966-
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
5967-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
5968-
)
5963+
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
5964+
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
59695965

59705966
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
59715967
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)

0 commit comments

Comments
 (0)