Skip to content

MINOR: Simplify OffsetFetchResponse #19642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 8, 2025

Conversation

dajac
Copy link
Member

@dajac dajac commented May 5, 2025

While working on #19515, I came to
the conclusion that the OffsetFetchResponse is quite messy and overall
too complicated. This patch rationalize the constructors.
OffsetFetchResponse has a single constructor accepting the
OffsetFetchResponseData. A builder is introduced to handle the down
conversion. This will also simplify adding the topic ids. All the
changes are mechanical, replacing data structures by others.

Reviewers: Lianet Magrans lmagrans@confluent.io

@github-actions github-actions bot added the core Kafka Broker label May 5, 2025
Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dajac, I took a first pass on the non-testing files. Very nice approach with the homogenous group DS! makes sense to me. Will take another closer look asap. Thanks!

topic.partitions().forEach(partition -> {
OffsetFetchResponsePartition newPartition;

if (version < 2 && group.errorCode() != Errors.NONE.code()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bug here. This logic assumes that partitions are given along side a group level error. However, looking at KafkaApis, it is not the case. Hence, a group level error is not returned at all for version 1 of the API. I will file a Jira and address it separately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dajac ! Took a full pass

Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.TOPIC_AUTHORIZATION_FAILED);
Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.TOPIC_AUTHORIZATION_FAILED,
Errors.UNSTABLE_OFFSET_COMMIT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we need this error here? this one is expected only if v>7 (requires stable), but PARTITION_ERRORS is used only if v<2 (when we want to bubble up errors)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. I was debating whether we should be exhaustive here and list all the partition errors or not. Let me remove it and add a comment.

Map<String, Errors> errorMap,
Map<String, Map<TopicPartition, PartitionData>> responseData
) {
return new OffsetFetchResponse(throttleMs, errorMap, responseData);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throttleMs doesn't seem needed after this (on ln 152)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm I don't see any throttleMs in this file after the refactor. Could you please elaborate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right (I probably got mixed up with another file). All good.

@@ -1609,15 +1608,19 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
// without describe permission on the topic, we shouldn't be able to fetch offsets
val offsetFetchRequest = createOffsetFetchRequestAllPartitions
var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing this? this check is what ensures that we're not returning the offsets if no auth right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an oversight. Let me bring it back.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're still missing this assert it seems

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, forgot to push it.

@dajac dajac requested a review from Copilot May 7, 2025 15:17
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR simplifies the OffsetFetchResponse by consolidating its constructors into a single data‐driven builder and updating its usage in both server and client code. Key changes include updating tests to use the new builder API, adjusting error handling logic based on response version, and refactoring legacy PartitionData usages.

Reviewed Changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.

File Description
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala Updated OffsetFetchResponse error checks and ACL validations to use the new builder API
core/src/main/scala/kafka/server/KafkaApis.scala Replaced the direct constructor call with the new Builder approach for constructing OffsetFetchResponse
clients/... Various test files and client modules have been updated to use the new OffsetFetchResponse.Builder and new error handling methods
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java Refactored internal error handling and the group() method logic to support multiple response versions

@dajac dajac requested a review from lianetm May 7, 2025 15:23
Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM.

@dajac dajac merged commit 98e535b into apache:trunk May 8, 2025
24 checks passed
@dajac dajac deleted the minor-simplify-offset-fetch-response branch May 8, 2025 12:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants