-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-19159: Removed time based evictions for share sessions #19500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good to me, some doubts.
} else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) { | ||
ShareSession session = lastUsedEntry.getValue(); | ||
remove(session); | ||
evictionsMeter.mark(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about evictionsMeter metric i.e. where it will get updated in new design? Should it go in remove
method now? Having said that, you were right that now there is actually no eviction per se i.e. no forceful eviction based on size hence we might not require the metrics. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not eviction then should we have ShareSessionRemovePerSec
meter, will it be of any help?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. There are no evictions but sessions are still dropped on client disconnections. I believe ShareSessionsDroppedPerSec
would be a better name. We can target the question of where it will go in the PR that introduces the client connection drop logic. Will update that PR once this one gets merged. But yeah, that shouldn't go in the remove method because I don't think we need this metric to also include the sessions dropped due to Final Share Fetch request.
@@ -428,8 +428,8 @@ class BrokerServer( | |||
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards)) | |||
|
|||
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache( | |||
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize, | |||
KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS) | |||
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will replace this with new configuration in subsequent PRs, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. A PR is already in review to take care of this.
#19505
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3, | ||
"Share session count should be 3."); | ||
TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60, | ||
"Share partition count should be 60."); | ||
assertEquals(0, cache.evictionsMeter().count()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why session and partitions count metric is removed in test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. These haven't been removed. I have put these assertions in a separate method assertMetricsValues
because these checks are repeated in the tests.
assertEquals(0, cache.evictionsMeter().count()); | ||
|
||
// Touch the sessions to update the last used time, so that the key-2 can be evicted. | ||
cache.touch(cache.get(key1), 200); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shaouldn't we use cache.updateNumPartitions here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. This test actually touched the cache so that inactive sessions could be evicted and new ones could be added to the cache. But the eviction logic is removed now so replacing touch with updateNumPartitions wouldn't pass the test. Also since the resizing of cache session is being tested in the next test in this file, I removed the other logic to let this one only test the addition of sessions to the cache
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, can you please create a jira to track the change for evictions
metric. EIther we replace or drop but should be tracked.
*/ | ||
public synchronized void touch(ShareSession session, long now) { | ||
public synchronized void updateNumPartitions(ShareSession session) { | ||
synchronized (session) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pardon me, why to have this sync? it seems the methods cachedSize
and cachedSize
are under synchronized
already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Didn't notice that before, I have made the required changes in the latest commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a read and update to the session
, to make the operation atomic over ShareSession hence the sync block was applied here. The individual methods sync can guarantee an updated value value read but overall the operation is not atomic if we miss the sync block here. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a read and update to the session
You are right. Maybe we can add a specific method to handle the case to ensure the atomicity? for example:
ShareSession
public synchronized int updateCachedSize() {
var previousSize = cachedSize;
cachedSize = partitionMap.size();
return previousSize != -1 ? cachedSize - previousSize : cachedSize;
}
ShareSessionCache
public synchronized void updateNumPartitions(ShareSession session) {
numPartitions += session.updateCachedSize();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the above also works or a sync block as earlier. I incline towards the former, which you suggested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have made the updates to the code
cc - @apoorvmittal10
@@ -122,15 +113,9 @@ public synchronized ShareSession remove(ShareSession session) { | |||
* Update a session's position in the lastUsed tree. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please update the docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, minor comment.
@@ -138,6 +118,12 @@ public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> up | |||
return result; | |||
} | |||
|
|||
public synchronized int updateCachedSize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method needs javadoc comments saying specially about the return data and what the return value represents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I will wait for sometime if others have any further comments.
Currently the share session cache is desgined like the fetch session
cache. If the cache is full and a new share session is trying to get get
initialized, then the sessions which haven't been touched for more than
2minutes are evicted. This wouldn't be right for share sessions as the
members also hold locks on the acquired records, and session eviction
would mean theose locks will need to be dropped and the corresponding
records re-delivered. This PR removes the time based eviction logic for
share sessions.
Refer: KAFKA-19159