-
Notifications
You must be signed in to change notification settings - Fork 14.4k
MINOR: Simplify OffsetFetchResponse #19642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR: Simplify OffsetFetchResponse #19642
Conversation
…set-fetch-response
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @dajac, I took a first pass on the non-testing files. Very nice approach with the homogenous group DS! makes sense to me. Will take another closer look asap. Thanks!
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
Outdated
Show resolved
Hide resolved
topic.partitions().forEach(partition -> { | ||
OffsetFetchResponsePartition newPartition; | ||
|
||
if (version < 2 && group.errorCode() != Errors.NONE.code()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a bug here. This logic assumes that partitions are given along side a group level error. However, looking at KafkaApis, it is not the case. Hence, a group level error is not returned at all for version 1 of the API. I will file a Jira and address it separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…set-fetch-response
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dajac ! Took a full pass
Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.TOPIC_AUTHORIZATION_FAILED); | ||
Errors.UNKNOWN_TOPIC_OR_PARTITION, | ||
Errors.TOPIC_AUTHORIZATION_FAILED, | ||
Errors.UNSTABLE_OFFSET_COMMIT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we need this error here? this one is expected only if v>7 (requires stable), but PARTITION_ERRORS
is used only if v<2 (when we want to bubble up errors)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really. I was debating whether we should be exhaustive here and list all the partition errors or not. Let me remove it and add a comment.
Map<String, Errors> errorMap, | ||
Map<String, Map<TopicPartition, PartitionData>> responseData | ||
) { | ||
return new OffsetFetchResponse(throttleMs, errorMap, responseData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throttleMs
doesn't seem needed after this (on ln 152)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lianetm I don't see any throttleMs
in this file after the refactor. Could you please elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right (I probably got mixed up with another file). All good.
@@ -1609,15 +1608,19 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |||
// without describe permission on the topic, we shouldn't be able to fetch offsets | |||
val offsetFetchRequest = createOffsetFetchRequestAllPartitions | |||
var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) | |||
assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) | |||
assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why removing this? this check is what ensures that we're not returning the offsets if no auth right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an oversight. Let me bring it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're still missing this assert
it seems
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, forgot to push it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR simplifies the OffsetFetchResponse by consolidating its constructors into a single data‐driven builder and updating its usage in both server and client code. Key changes include updating tests to use the new builder API, adjusting error handling logic based on response version, and refactoring legacy PartitionData usages.
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala | Updated OffsetFetchResponse error checks and ACL validations to use the new builder API |
core/src/main/scala/kafka/server/KafkaApis.scala | Replaced the direct constructor call with the new Builder approach for constructing OffsetFetchResponse |
clients/... | Various test files and client modules have been updated to use the new OffsetFetchResponse.Builder and new error handling methods |
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java | Refactored internal error handling and the group() method logic to support multiple response versions |
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM.
While working on #19515, I came to
the conclusion that the OffsetFetchResponse is quite messy and overall
too complicated. This patch rationalize the constructors.
OffsetFetchResponse has a single constructor accepting the
OffsetFetchResponseData. A builder is introduced to handle the down
conversion. This will also simplify adding the topic ids. All the
changes are mechanical, replacing data structures by others.
Reviewers: Lianet Magrans lmagrans@confluent.io