KAFKA-19164: Keep track of groups when deleting transactional offsets #19495
When deleting pending transactional offsets, we must preserve the list of groups associated with the producer ID, otherwise we cannot clean up the list of pending transactions for the group once the transaction is committed or aborted.
if (!preserveGroups && topicOffsets.isEmpty())
    offsetsByGroup.remove(groupId);
We use offsetsByGroup to clean up the list of open transactions per group.
kafka/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
Lines 1046 to 1054 in 6e26ec0
pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
    TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
    if (openTransactions != null) {
        openTransactions.remove(producerId);
        if (openTransactions.isEmpty()) {
            openTransactionsByGroup.remove(groupId);
        }
    }
});
If we do not maintain an entry here, the cleanup will not happen. As a result, the group will not be deleted later when we expire its last offset.
kafka/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
Lines 887 to 888 in 6e26ec0
// We don't want to remove the group if there are ongoing transactions.
return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);
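To make the failure mode concrete, here is a minimal, self-contained sketch of the bookkeeping described above. It is not the OffsetMetadataManager code: plain HashMap/HashSet stand in for the timeline collections, and the class name PendingOffsetsSketch, its method names, and the scenario helper are all hypothetical. Only offsetsByGroup, openTransactionsByGroup, and preserveGroups mirror names from the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model; not the real coordinator classes. */
final class PendingOffsetsSketch {
    // producerId -> groupId -> topic -> pending offset
    // (stands in for the per-producer pendingOffsets.offsetsByGroup).
    final Map<Long, Map<String, Map<String, Long>>> pendingByProducer = new HashMap<>();
    // groupId -> producer IDs that have an open transaction touching the group.
    final Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();

    void commitTransactionalOffset(long producerId, String groupId, String topic, long offset) {
        pendingByProducer.computeIfAbsent(producerId, id -> new HashMap<>())
            .computeIfAbsent(groupId, g -> new HashMap<>())
            .put(topic, offset);
        openTransactionsByGroup.computeIfAbsent(groupId, g -> new HashSet<>()).add(producerId);
    }

    void deletePendingOffset(long producerId, String groupId, String topic, boolean preserveGroups) {
        Map<String, Map<String, Long>> offsetsByGroup = pendingByProducer.get(producerId);
        if (offsetsByGroup == null) return;
        Map<String, Long> topicOffsets = offsetsByGroup.get(groupId);
        if (topicOffsets == null) return;
        topicOffsets.remove(topic);
        // Dropping the group entry here is what loses track of the group.
        if (!preserveGroups && topicOffsets.isEmpty()) {
            offsetsByGroup.remove(groupId);
        }
    }

    /** Called when the transaction commits or aborts. */
    void endTransaction(long producerId) {
        Map<String, Map<String, Long>> offsetsByGroup = pendingByProducer.remove(producerId);
        if (offsetsByGroup == null) return;
        // Only groups still present as keys get their open-transaction entry cleaned up.
        offsetsByGroup.keySet().forEach(groupId -> {
            Set<Long> openTransactions = openTransactionsByGroup.get(groupId);
            if (openTransactions != null) {
                openTransactions.remove(producerId);
                if (openTransactions.isEmpty()) {
                    openTransactionsByGroup.remove(groupId);
                }
            }
        });
    }

    boolean canDeleteGroup(String groupId, boolean allOffsetsExpired) {
        return allOffsetsExpired && !openTransactionsByGroup.containsKey(groupId);
    }

    /** Commit one pending offset, delete it, end the transaction, then ask whether the group can go. */
    static boolean groupDeletableAfterScenario(boolean preserveGroups) {
        PendingOffsetsSketch s = new PendingOffsetsSketch();
        s.commitTransactionalOffset(1L, "g", "t", 42L);
        s.deletePendingOffset(1L, "g", "t", preserveGroups);
        s.endTransaction(1L);
        return s.canDeleteGroup("g", true);
    }

    public static void main(String[] args) {
        System.out.println("preserveGroups=false, deletable: " + groupDeletableAfterScenario(false));
        System.out.println("preserveGroups=true, deletable: " + groupDeletableAfterScenario(true));
    }
}
```

In this model, dropping the group entry leaves a stale producer ID in openTransactionsByGroup after the transaction ends, so the group never becomes deletable. Preserving the (empty) entry lets the end-of-transaction cleanup find the group.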
I guess we could also just remove from openTransactionsByGroup here as well? Not a strong opinion either way. If we do it this way, should we also clean it up when we remove from openTransactionsByGroup? I don't think we want to preserve it forever.
I guess we could. It depends on whether we are trying to track whether a group has an open transaction with offset commits that will take effect when committed or whether a group has an open transaction at all.
We certainly shouldn't delete groups with uncommitted offsets, since they could be committed afterwards and would then not get cleaned up. I'm just not sure about groups with deleted uncommitted offsets that are a no-op when committed.
@dajac What should we be doing in the second case?
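For comparison, the alternative raised in this thread (removing the producer from openTransactionsByGroup as soon as its last pending offset for the group is deleted) could look roughly like the sketch below. It is an illustration of the trade-off only, not the change made in this PR or in #19497. Plain collections replace the timeline ones, and EagerCleanupSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the "clean up eagerly on delete" alternative. */
final class EagerCleanupSketch {
    // producerId -> groupId -> topic -> pending offset.
    final Map<Long, Map<String, Map<String, Long>>> pendingByProducer = new HashMap<>();
    // groupId -> producer IDs with pending offsets that would take effect on commit.
    final Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();

    void commitTransactionalOffset(long producerId, String groupId, String topic, long offset) {
        pendingByProducer.computeIfAbsent(producerId, id -> new HashMap<>())
            .computeIfAbsent(groupId, g -> new HashMap<>())
            .put(topic, offset);
        openTransactionsByGroup.computeIfAbsent(groupId, g -> new HashSet<>()).add(producerId);
    }

    void deletePendingOffset(long producerId, String groupId, String topic) {
        Map<String, Map<String, Long>> offsetsByGroup = pendingByProducer.get(producerId);
        if (offsetsByGroup == null) return;
        Map<String, Long> topicOffsets = offsetsByGroup.get(groupId);
        if (topicOffsets == null) return;
        topicOffsets.remove(topic);
        if (topicOffsets.isEmpty()) {
            // Drop the group entry and, in the same step, the matching open-transaction entry,
            // so the two maps never disagree about which groups the producer touches.
            offsetsByGroup.remove(groupId);
            Set<Long> openTransactions = openTransactionsByGroup.get(groupId);
            if (openTransactions != null) {
                openTransactions.remove(producerId);
                if (openTransactions.isEmpty()) {
                    openTransactionsByGroup.remove(groupId);
                }
            }
        }
    }

    boolean hasOpenTransaction(String groupId) {
        return openTransactionsByGroup.containsKey(groupId);
    }

    public static void main(String[] args) {
        EagerCleanupSketch s = new EagerCleanupSketch();
        s.commitTransactionalOffset(1L, "g", "t1", 10L);
        s.commitTransactionalOffset(1L, "g", "t2", 20L);
        s.deletePendingOffset(1L, "g", "t1");
        System.out.println("after first delete: " + s.hasOpenTransaction("g"));
        s.deletePendingOffset(1L, "g", "t2");
        System.out.println("after last delete: " + s.hasOpenTransaction("g"));
    }
}
```

Under this variant, openTransactionsByGroup means "the group has pending offsets that would take effect on commit" rather than "the group has an open transaction at all". That is exactly the distinction the question above asks about.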
Closing this since #19497 fixes the bug.