Skip to content

Commit d154a31

Browse files
authored
KAFKA-14691; Add TopicId to OffsetFetch API (#19515)
This patch extends the OffsetFetch API to support topic ids. From version 10 of the API, topic ids must be used. The patch only contains the server side changes and it keeps the version 10 as unstable for now. We will mark the version as stable when the client side changes are merged in. Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans <lmagrans@confluent.io>
1 parent b4b73c6 commit d154a31

File tree

19 files changed

+750
-283
lines changed

19 files changed

+750
-283
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds)
8888

8989
public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
9090
// Create a request that only contains the consumer groups owned by the coordinator.
91-
return new OffsetFetchRequest.Builder(
91+
return OffsetFetchRequest.Builder.forTopicNames(
9292
new OffsetFetchRequestData()
9393
.setRequireStable(requireStable)
9494
.setGroups(groupIds.stream().map(groupId -> {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -985,7 +985,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
985985
.collect(Collectors.toList());
986986

987987
OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
988-
.map(epoch -> new OffsetFetchRequest.Builder(
988+
.map(epoch -> OffsetFetchRequest.Builder.forTopicNames(
989989
new OffsetFetchRequestData()
990990
.setRequireStable(true)
991991
.setGroups(List.of(
@@ -997,7 +997,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
997997
throwOnFetchStableOffsetUnsupported))
998998
// Building request without passing member ID/epoch to leave the logic to choose
999999
// default values when not present on the request builder.
1000-
.orElseGet(() -> new OffsetFetchRequest.Builder(
1000+
.orElseGet(() -> OffsetFetchRequest.Builder.forTopicNames(
10011001
new OffsetFetchRequestData()
10021002
.setRequireStable(true)
10031003
.setGroups(List.of(

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1494,7 +1494,7 @@ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchReq
14941494
.collect(Collectors.toList())))
14951495
.collect(Collectors.toList());
14961496

1497-
OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(
1497+
OffsetFetchRequest.Builder requestBuilder = OffsetFetchRequest.Builder.forTopicNames(
14981498
new OffsetFetchRequestData()
14991499
.setRequireStable(true)
15001500
.setGroups(List.of(

clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java

Lines changed: 111 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.common.requests;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.Uuid;
2021
import org.apache.kafka.common.errors.UnsupportedVersionException;
2122
import org.apache.kafka.common.message.OffsetFetchRequestData;
2223
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
@@ -44,26 +45,58 @@ public class OffsetFetchRequest extends AbstractRequest {
4445
public static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
4546
public static final short REQUIRE_STABLE_OFFSET_MIN_VERSION = 7;
4647
public static final short BATCH_MIN_VERSION = 8;
48+
public static final short TOPIC_ID_MIN_VERSION = 10;
4749

4850
private final OffsetFetchRequestData data;
4951

5052
public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
51-
5253
private final OffsetFetchRequestData data;
5354
private final boolean throwOnFetchStableOffsetsUnsupported;
5455

55-
public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) {
56-
super(ApiKeys.OFFSET_FETCH);
56+
public static Builder forTopicIdsOrNames(
57+
OffsetFetchRequestData data,
58+
boolean throwOnFetchStableOffsetsUnsupported,
59+
boolean enableUnstableLastVersion
60+
) {
61+
return new Builder(
62+
data,
63+
throwOnFetchStableOffsetsUnsupported,
64+
ApiKeys.OFFSET_FETCH.oldestVersion(),
65+
ApiKeys.OFFSET_FETCH.latestVersion(enableUnstableLastVersion)
66+
);
67+
}
68+
69+
public static Builder forTopicNames(
70+
OffsetFetchRequestData data,
71+
boolean throwOnFetchStableOffsetsUnsupported
72+
) {
73+
return new Builder(
74+
data,
75+
throwOnFetchStableOffsetsUnsupported,
76+
ApiKeys.OFFSET_FETCH.oldestVersion(),
77+
(short) (TOPIC_ID_MIN_VERSION - 1)
78+
);
79+
}
80+
81+
private Builder(
82+
OffsetFetchRequestData data,
83+
boolean throwOnFetchStableOffsetsUnsupported,
84+
short oldestAllowedVersion,
85+
short latestAllowedVersion
86+
) {
87+
super(ApiKeys.OFFSET_FETCH, oldestAllowedVersion, latestAllowedVersion);
5788
this.data = data;
5889
this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
5990
}
6091

61-
@Override
62-
public OffsetFetchRequest build(short version) {
92+
private void throwIfBatchingIsUnsupported(short version) {
6393
if (data.groups().size() > 1 && version < BATCH_MIN_VERSION) {
6494
throw new NoBatchedOffsetFetchRequestException("Broker does not support"
6595
+ " batching groups for fetch offset request on version " + version);
6696
}
97+
}
98+
99+
private void throwIfStableOffsetsUnsupported(short version) {
67100
if (data.requireStable() && version < REQUIRE_STABLE_OFFSET_MIN_VERSION) {
68101
if (throwOnFetchStableOffsetsUnsupported) {
69102
throw new UnsupportedVersionException("Broker unexpectedly " +
@@ -75,37 +108,77 @@ public OffsetFetchRequest build(short version) {
75108
data.setRequireStable(false);
76109
}
77110
}
78-
// convert data to use the appropriate version since version 8 uses different format
79-
if (version < BATCH_MIN_VERSION) {
80-
OffsetFetchRequestData normalizedData;
81-
if (!data.groups().isEmpty()) {
82-
OffsetFetchRequestGroup group = data.groups().get(0);
83-
String groupName = group.groupId();
84-
List<OffsetFetchRequestTopics> topics = group.topics();
85-
List<OffsetFetchRequestTopic> oldFormatTopics = null;
86-
if (topics != null) {
87-
oldFormatTopics = topics
88-
.stream()
89-
.map(t ->
90-
new OffsetFetchRequestTopic()
91-
.setName(t.name())
92-
.setPartitionIndexes(t.partitionIndexes()))
93-
.collect(Collectors.toList());
111+
}
112+
113+
private void throwIfMissingRequiredTopicIdentifiers(short version) {
114+
if (version < TOPIC_ID_MIN_VERSION) {
115+
data.groups().forEach(group -> {
116+
if (group.topics() != null) {
117+
group.topics().forEach(topic -> {
118+
if (topic.name() == null || topic.name().isEmpty()) {
119+
throw new UnsupportedVersionException("The broker offset fetch api version " +
120+
version + " does require usage of topic names.");
121+
}
122+
});
94123
}
95-
normalizedData = new OffsetFetchRequestData()
96-
.setGroupId(groupName)
97-
.setTopics(oldFormatTopics)
98-
.setRequireStable(data.requireStable());
99-
} else {
100-
normalizedData = data;
101-
}
102-
if (normalizedData.topics() == null && version < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
103-
throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
104-
"v" + version + ", but we need v2 or newer to request all topic partitions.");
105-
}
106-
return new OffsetFetchRequest(normalizedData, version);
124+
});
125+
} else {
126+
data.groups().forEach(group -> {
127+
if (group.topics() != null) {
128+
group.topics().forEach(topic -> {
129+
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
130+
throw new UnsupportedVersionException("The broker offset fetch api version " +
131+
version + " does require usage of topic ids.");
132+
}
133+
});
134+
}
135+
});
136+
}
137+
}
138+
139+
private void throwIfRequestingAllTopicsIsUnsupported(short version) {
140+
if (version < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
141+
data.groups().forEach(group -> {
142+
if (group.topics() == null) {
143+
throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
144+
"v" + version + ", but we need v2 or newer to request all topic partitions.");
145+
}
146+
});
107147
}
108-
return new OffsetFetchRequest(data, version);
148+
}
149+
150+
private OffsetFetchRequestData maybeDowngrade(short version) {
151+
// Convert data to use the appropriate version since version 8
152+
// uses different format.
153+
if (version >= BATCH_MIN_VERSION || data.groups().isEmpty()) return data;
154+
155+
OffsetFetchRequestGroup group = data.groups().get(0);
156+
String groupName = group.groupId();
157+
List<OffsetFetchRequestTopics> topics = group.topics();
158+
List<OffsetFetchRequestTopic> oldFormatTopics = null;
159+
160+
if (topics != null) {
161+
oldFormatTopics = topics
162+
.stream()
163+
.map(t -> new OffsetFetchRequestTopic()
164+
.setName(t.name())
165+
.setPartitionIndexes(t.partitionIndexes()))
166+
.collect(Collectors.toList());
167+
}
168+
169+
return new OffsetFetchRequestData()
170+
.setGroupId(groupName)
171+
.setTopics(oldFormatTopics)
172+
.setRequireStable(data.requireStable());
173+
}
174+
175+
@Override
176+
public OffsetFetchRequest build(short version) {
177+
throwIfBatchingIsUnsupported(version);
178+
throwIfStableOffsetsUnsupported(version);
179+
throwIfMissingRequiredTopicIdentifiers(version);
180+
throwIfRequestingAllTopicsIsUnsupported(version);
181+
return new OffsetFetchRequest(maybeDowngrade(version), version);
109182
}
110183

111184
@Override
@@ -249,6 +322,10 @@ public static OffsetFetchRequest parse(Readable readable, short version) {
249322
return new OffsetFetchRequest(new OffsetFetchRequestData(readable, version), version);
250323
}
251324

325+
public static boolean useTopicIds(short version) {
326+
return version >= TOPIC_ID_MIN_VERSION;
327+
}
328+
252329
@Override
253330
public OffsetFetchRequestData data() {
254331
return data;

clients/src/main/resources/common/message/OffsetFetchRequest.json

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@
3838
//
3939
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds
4040
// the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
41-
"validVersions": "1-9",
41+
//
42+
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
43+
"validVersions": "1-10",
4244
"flexibleVersions": "6+",
45+
"latestVersionUnstable": true,
4346
"fields": [
4447
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
4548
"about": "The group to fetch offsets for." },
@@ -60,8 +63,10 @@
6063
"about": "The member epoch if using the new consumer protocol (KIP-848)." },
6164
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
6265
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
63-
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
66+
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
6467
"about": "The topic name."},
68+
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
69+
"about": "The topic ID." },
6570
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
6671
"about": "The partition indexes we would like to fetch offsets for." }
6772
]}

clients/src/main/resources/common/message/OffsetFetchResponse.json

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
3939
// the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group
4040
// protocol is used.
41-
"validVersions": "1-9",
41+
//
42+
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
43+
"validVersions": "1-10",
4244
"flexibleVersions": "6+",
4345
// Supported errors:
4446
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -49,6 +51,7 @@
4951
// - UNSTABLE_OFFSET_COMMIT (version 7+)
5052
// - UNKNOWN_MEMBER_ID (version 9+)
5153
// - STALE_MEMBER_EPOCH (version 9+)
54+
// - UNKNOWN_TOPIC_ID (version 10+)
5255
"fields": [
5356
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
5457
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -78,8 +81,10 @@
7881
"about": "The group ID." },
7982
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
8083
"about": "The responses per topic.", "fields": [
81-
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
84+
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
8285
"about": "The topic name." },
86+
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
87+
"about": "The topic ID." },
8388
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
8489
"about": "The responses per partition.", "fields": [
8590
{ "name": "PartitionIndex", "type": "int32", "versions": "8+",

clients/src/test/java/org/apache/kafka/common/message/MessageTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,8 @@ public void testOffsetFetchRequestVersions(short version) throws Exception {
567567
.setMemberEpoch(version >= 9 ? 10 : -1)
568568
.setTopics(List.of(
569569
new OffsetFetchRequestTopics()
570-
.setName("foo")
570+
.setName(version < 10 ? "foo" : "")
571+
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
571572
.setPartitionIndexes(List.of(0, 1, 2))
572573
))
573574
));
@@ -606,7 +607,8 @@ public void testOffsetFetchResponseVersions(short version) throws Exception {
606607
.setErrorCode(Errors.INVALID_GROUP_ID.code())
607608
.setTopics(List.of(
608609
new OffsetFetchResponseTopics()
609-
.setName("foo")
610+
.setName(version < 10 ? "foo" : "")
611+
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
610612
.setPartitions(List.of(
611613
new OffsetFetchResponsePartitions()
612614
.setPartitionIndex(0)

0 commit comments

Comments
 (0)