-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-14691; Add TopicId to OffsetFetch API #19515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dajac: Thanks for the patch!
I have a few minor comments for consideration.
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
Outdated
Show resolved
Hide resolved
...-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
Show resolved
Hide resolved
...-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
Outdated
Show resolved
Hide resolved
This PR is getting out of control because OffsetFetchRequest and OffsetFetchResponse are way too complicated. I will simplify them in separate PRs and come back to this one afterwards. |
Opened #19572. |
Opened #19576. |
While working on #19515, I came to the conclusion that the OffsetFetchRequest is quite messy and overall too complicated. This patch rationalize the constructors. OffsetFetchRequest has a single constructor accepting the OffsetFetchRequestData. This will also simplify adding the topic ids. All the changes are mechanical, replacing data structures by others. Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Lianet Magran <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Opened #19642. |
While working on #19515, I came to the conclusion that the OffsetFetchResponse is quite messy and overall too complicated. This patch rationalize the constructors. OffsetFetchResponse has a single constructor accepting the OffsetFetchResponseData. A builder is introduced to handle the down conversion. This will also simplify adding the topic ids. All the changes are mechanical, replacing data structures by others. Reviewers: Lianet Magrans <lmagrans@confluent.io>
clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes! Just some minor comments.
this.data = data; | ||
this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("checkstyle:cyclomaticComplexity") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it help if we move the logic to validate topic id and names usage to a different func? (this is wild indeed, maybe we could consider also encapsulating the other validation sections consistently: batching, require stable)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea!
.setTopicId(topicId) | ||
.setPartitionIndexes(List[Integer](0, 1, 2).asJava) | ||
).asJava), | ||
requireStable = true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this change to true intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, let me revert it.
@@ -8269,6 +8276,7 @@ class KafkaApisTest extends Logging { | |||
.setGroupId("group-1") | |||
.setTopics(List( | |||
new OffsetFetchRequestData.OffsetFetchRequestTopics() | |||
.setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) | |||
.setName("foo") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we set the name in the request only if v<10?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope because this is the response from the group coordinator service. the topic name is always set but the topic id is only set if version >= 10.
@@ -8320,11 +8347,12 @@ class KafkaApisTest extends Logging { | |||
).asJava) | |||
).asJava) | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra line
// bar does not exist so it must be filtered out. | ||
new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||
.setName(bar) | ||
.setPartitions(List( | ||
new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
.setPartitionIndex(0) | ||
.setCommittedOffset(100) | ||
.setCommittedLeaderEpoch(1) | ||
).asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for my understanding, this would be the case where bar does exist when the coordinator fetched the offsets, but by the time we build the response in KafkaApis it doesn't exist anymore (so filtered out), correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
almost. we requested all topics for group-2
. in this case, we resolve the topic id when the resolve is built. if the topic id does not exist, we drop the topic (it does not exist).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates! LGTM
This patch extends the OffsetFetch API to support topic ids. From
version 10 of the API, topic ids must be used.
The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.
Reviewers: TengYao Chi frankvicky@apache.org, Lianet Magrans
lmagrans@confluent.io