KAFKA-19159: Removed time based evictions for share sessions #19500

Merged
merged 7 commits into from
Apr 22, 2025

Contributor

@chirag-wadhwa5 chirag-wadhwa5 commented Apr 17, 2025

Currently the share session cache is designed like the fetch session
cache. If the cache is full and a new share session is being
initialized, then sessions which haven't been touched for more than
2 minutes are evicted. This isn't right for share sessions, as the
members also hold locks on the acquired records, and session eviction
would mean those locks need to be dropped and the corresponding
records re-delivered. This PR removes the time-based eviction logic for
share sessions.

Refer: KAFKA-19159
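
For illustration, the behavior described above (no time-based eviction; a full cache simply refuses new sessions) might look roughly like the following. This is a minimal sketch, not the actual ShareSessionCache code: the class name `BoundedSessionCache`, its string keys, and its method signatures are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a share session cache; not Kafka's class. */
final class BoundedSessionCache {
    private final int maxEntries;
    // Session key -> number of partitions cached for that session.
    private final Map<String, Integer> sessions = new HashMap<>();

    BoundedSessionCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    /**
     * Tries to register a new session. If the cache is full (or the key is
     * already present) the request is rejected. No existing session is evicted
     * to make room, however long it has been idle, because its member may
     * still hold locks on acquired records.
     */
    synchronized boolean maybeCreateSession(String key, int partitions) {
        if (sessions.containsKey(key) || sessions.size() >= maxEntries) {
            return false;
        }
        sessions.put(key, partitions);
        return true;
    }

    /** Explicit removal, e.g. when the session is closed by its client. */
    synchronized boolean remove(String key) {
        return sessions.remove(key) != null;
    }

    synchronized int size() {
        return sessions.size();
    }
}
```

In this sketch a slot only frees up through an explicit `remove`, so a third session is rejected from a two-entry cache until one of the first two has been removed.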

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Apr 17, 2025
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Mostly looks good to me, some doubts.

} else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
ShareSession session = lastUsedEntry.getValue();
remove(session);
evictionsMeter.mark();
Contributor

What about the evictionsMeter metric, i.e. where will it get updated in the new design? Should it go in the remove method now? Having said that, you were right that there is now no eviction per se, i.e. no forceful eviction based on size, hence we might not need the metric. What do you think?

Contributor

If not eviction, then should we have a ShareSessionRemovePerSec meter? Would it be of any help?

Contributor Author

Thanks for the review. There are no evictions but sessions are still dropped on client disconnections. I believe ShareSessionsDroppedPerSec would be a better name. We can target the question of where it will go in the PR that introduces the client connection drop logic. Will update that PR once this one gets merged. But yeah, that shouldn't go in the remove method because I don't think we need this metric to also include the sessions dropped due to Final Share Fetch request.

@@ -428,8 +428,8 @@ class BrokerServer(
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))

val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize,
KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize
Contributor

We will replace this with new configuration in subsequent PRs, correct?

Contributor Author

Thanks for the review. A PR is already in review to take care of this.
#19505

Comment on lines -56 to -60
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
"Share session count should be 3.");
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
"Share partition count should be 60.");
assertEquals(0, cache.evictionsMeter().count());
Contributor

Why are the session and partition count metric checks removed from the test?

Contributor Author

Thanks for the review. These haven't been removed. I have put these assertions in a separate method assertMetricsValues because these checks are repeated in the tests.

assertEquals(0, cache.evictionsMeter().count());

// Touch the sessions to update the last used time, so that the key-2 can be evicted.
cache.touch(cache.get(key1), 200);
Contributor

Shouldn't we use cache.updateNumPartitions here?

Contributor Author

Thanks for the review. This test previously touched the sessions so that inactive ones could be evicted and new ones added to the cache. But the eviction logic is removed now, so replacing touch with updateNumPartitions wouldn't pass the test. Also, since the resizing of cached sessions is tested in the next test in this file, I removed the other logic so that this one only tests the addition of sessions to the cache.

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM, can you please create a Jira to track the change for the evictions metric? Either we replace it or drop it, but it should be tracked.

*/
public synchronized void touch(ShareSession session, long now) {
public synchronized void updateNumPartitions(ShareSession session) {
synchronized (session) {
Member

Pardon me, why do we need this sync block? It seems the methods cachedSize and cachedSize are already synchronized.

Contributor Author

Thanks for the review. Didn't notice that before, I have made the required changes in the latest commit.

Contributor

There is a read and an update to the session; the sync block was applied here to make that operation atomic over ShareSession. Synchronizing the individual methods guarantees that an up-to-date value is read, but the overall read-then-update is not atomic if we drop the sync block here. Am I missing something?

Member

There is a read and update to the session

You are right. Maybe we can add a specific method to handle this case and ensure atomicity? For example:

ShareSession

    public synchronized int updateCachedSize() {
        var previousSize = cachedSize;
        cachedSize = partitionMap.size();
        return previousSize != -1 ? cachedSize - previousSize : cachedSize;
    }

ShareSessionCache

    public synchronized void updateNumPartitions(ShareSession session) {
        numPartitions += session.updateCachedSize();
    }
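
To see the delta accounting in isolation, the suggestion can be written out as a self-contained sketch. The class names `SessionSketch` and `CacheSketch`, the string partition keys, and the `addPartition` helper are hypothetical stand-ins for illustration; only `updateCachedSize` and `updateNumPartitions` mirror the snippet above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ShareSession; only the size bookkeeping is modeled. */
final class SessionSketch {
    private final Map<String, Integer> partitionMap = new HashMap<>();
    // -1 means the cache has not counted this session yet.
    private int cachedSize = -1;

    synchronized void addPartition(String topicPartition) {
        partitionMap.put(topicPartition, 0);
    }

    /**
     * Records the current partition count and returns the change relative to
     * the previously recorded count, or the full count on the first call.
     * The read and the write happen under one lock, so no caller can observe
     * a half-finished update.
     */
    synchronized int updateCachedSize() {
        int previousSize = cachedSize;
        cachedSize = partitionMap.size();
        return previousSize != -1 ? cachedSize - previousSize : cachedSize;
    }
}

/** Hypothetical stand-in for ShareSessionCache; tracks only the partition total. */
final class CacheSketch {
    private long numPartitions = 0;

    synchronized void updateNumPartitions(SessionSketch session) {
        // Apply only the delta reported by the session.
        numPartitions += session.updateCachedSize();
    }

    synchronized long numPartitions() {
        return numPartitions;
    }
}
```

Under these assumptions, calling `updateNumPartitions` twice without changing the session leaves the total unchanged, because the second call reports a delta of zero.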

Contributor

@apoorvmittal10 apoorvmittal10 Apr 21, 2025

Yes, the above also works or a sync block as earlier. I incline towards the former, which you suggested.

Contributor Author

I have made the updates to the code
cc - @apoorvmittal10

@@ -122,15 +113,9 @@ public synchronized ShareSession remove(ShareSession session) {
* Update a session's position in the lastUsed tree.
Member

please update the docs

@github-actions github-actions bot removed the triage PRs from the community label Apr 18, 2025
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the changes, minor comment.

@@ -138,6 +118,12 @@ public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> up
return result;
}

public synchronized int updateCachedSize() {
Contributor

This method needs Javadoc comments that specifically describe the return value and what it represents.

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM! I will wait for some time in case others have any further comments.

@apoorvmittal10 apoorvmittal10 merged commit 22c5794 into apache:trunk Apr 22, 2025
20 checks passed
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka
4 participants