Skip to content

KAFKA-14691; Add TopicId to OffsetFetch API #19515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds)

public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
// Create a request that only contains the consumer groups owned by the coordinator.
return new OffsetFetchRequest.Builder(
return OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(requireStable)
.setGroups(groupIds.stream().map(groupId -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
.collect(Collectors.toList());

OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
.map(epoch -> new OffsetFetchRequest.Builder(
.map(epoch -> OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand All @@ -997,7 +997,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
throwOnFetchStableOffsetUnsupported))
// Building request without passing member ID/epoch to leave the logic to choose
// default values when not present on the request builder.
.orElseGet(() -> new OffsetFetchRequest.Builder(
.orElseGet(() -> OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchReq
.collect(Collectors.toList())))
.collect(Collectors.toList());

OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(
OffsetFetchRequest.Builder requestBuilder = OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class OffsetFetchRequest extends AbstractRequest {
public static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
public static final short REQUIRE_STABLE_OFFSET_MIN_VERSION = 7;
public static final short BATCH_MIN_VERSION = 8;
public static final short TOPIC_ID_MIN_VERSION = 10;

private final OffsetFetchRequestData data;

Expand All @@ -52,13 +54,37 @@ public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest>
private final OffsetFetchRequestData data;
private final boolean throwOnFetchStableOffsetsUnsupported;

public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) {
super(ApiKeys.OFFSET_FETCH);
public static Builder forTopicIdsOrNames(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported, boolean enableUnstableLastVersion) {
return new Builder(
data,
throwOnFetchStableOffsetsUnsupported,
ApiKeys.OFFSET_FETCH.oldestVersion(),
ApiKeys.OFFSET_FETCH.latestVersion(enableUnstableLastVersion)
);
}

public static Builder forTopicNames(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) {
return new Builder(
data,
throwOnFetchStableOffsetsUnsupported,
ApiKeys.OFFSET_FETCH.oldestVersion(),
(short) (TOPIC_ID_MIN_VERSION - 1)
);
}

private Builder(
OffsetFetchRequestData data,
boolean throwOnFetchStableOffsetsUnsupported,
short oldestAllowedVersion,
short latestAllowedVersion
) {
super(ApiKeys.OFFSET_FETCH, oldestAllowedVersion, latestAllowedVersion);
this.data = data;
this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
}

@Override
@SuppressWarnings("checkstyle:cyclomaticComplexity")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it help if we move the logic to validate topic id and names usage to a different func? (this is wild indeed, maybe we could consider also encapsulating the other validation sections consistently: batching, require stable)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea!

public OffsetFetchRequest build(short version) {
if (data.groups().size() > 1 && version < BATCH_MIN_VERSION) {
throw new NoBatchedOffsetFetchRequestException("Broker does not support"
Expand All @@ -75,6 +101,29 @@ public OffsetFetchRequest build(short version) {
data.setRequireStable(false);
}
}
if (version < TOPIC_ID_MIN_VERSION) {
data.groups().forEach(group -> {
if (group.topics() != null) {
group.topics().forEach(topic -> {
if (topic.name() == null || topic.name().isEmpty()) {
throw new UnsupportedVersionException("The broker offset fetch api version " +
version + " does require usage of topic names.");
}
});
}
});
} else {
data.groups().forEach(group -> {
if (group.topics() != null) {
group.topics().forEach(topic -> {
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
throw new UnsupportedVersionException("The broker offset fetch api version " +
version + " does require usage of topic ids.");
}
});
}
});
}
// convert data to use the appropriate version since version 8 uses different format
if (version < BATCH_MIN_VERSION) {
OffsetFetchRequestData normalizedData;
Expand Down Expand Up @@ -249,6 +298,10 @@ public static OffsetFetchRequest parse(Readable readable, short version) {
return new OffsetFetchRequest(new OffsetFetchRequestData(readable, version), version);
}

public static boolean useTopicIds(short version) {
return version >= TOPIC_ID_MIN_VERSION;
}

@Override
public OffsetFetchRequestData data() {
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds
// the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
"validVersions": "1-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "1-10",
"flexibleVersions": "6+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
"about": "The group to fetch offsets for." },
Expand All @@ -60,8 +63,10 @@
"about": "The member epoch if using the new consumer protocol (KIP-848)." },
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name."},
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
"about": "The partition indexes we would like to fetch offsets for." }
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group
// protocol is used.
"validVersions": "1-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "1-10",
"flexibleVersions": "6+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
Expand All @@ -49,6 +51,7 @@
// - UNSTABLE_OFFSET_COMMIT (version 7+)
// - UNKNOWN_MEMBER_ID (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
// - UNKNOWN_TOPIC_ID (version 10+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
Expand Down Expand Up @@ -78,8 +81,10 @@
"about": "The group ID." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
"about": "The responses per topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
"about": "The responses per partition.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "8+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ public void testOffsetFetchRequestVersions(short version) throws Exception {
.setMemberEpoch(version >= 9 ? 10 : -1)
.setTopics(List.of(
new OffsetFetchRequestTopics()
.setName("foo")
.setName(version < 10 ? "foo" : "")
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
.setPartitionIndexes(List.of(0, 1, 2))
))
));
Expand Down Expand Up @@ -606,7 +607,8 @@ public void testOffsetFetchResponseVersions(short version) throws Exception {
.setErrorCode(Errors.INVALID_GROUP_ID.code())
.setTopics(List.of(
new OffsetFetchResponseTopics()
.setName("foo")
.setName(version < 10 ? "foo" : "")
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
.setPartitions(List.of(
new OffsetFetchResponsePartitions()
.setPartitionIndex(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
Expand All @@ -42,17 +43,21 @@ public void testWithMultipleGroups(short version) {
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(Uuid.randomUuid())
.setTopicId(Uuid.randomUuid())
.setPartitionIndexes(List.of(0, 1, 2))
)),
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("grp2")
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setTopicId(Uuid.randomUuid())
.setTopicId(Uuid.randomUuid())
.setPartitionIndexes(List.of(0, 1, 2))
))
));
var builder = new OffsetFetchRequest.Builder(data, false);
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false, true);

if (version < 8) {
assertThrows(OffsetFetchRequest.NoBatchedOffsetFetchRequestException.class, () -> builder.build(version));
Expand All @@ -64,7 +69,7 @@ public void testWithMultipleGroups(short version) {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
public void testThrowOnFetchStableOffsetsUnsupported(short version) {
var builder = new OffsetFetchRequest.Builder(
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand All @@ -73,9 +78,11 @@ public void testThrowOnFetchStableOffsetsUnsupported(short version) {
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(Uuid.randomUuid())
.setPartitionIndexes(List.of(0, 1, 2))
))
)),
true,
true
);

Expand All @@ -96,10 +103,11 @@ public void testSingleGroup(short version) {
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(Uuid.randomUuid())
.setPartitionIndexes(List.of(0, 1, 2))
))
));
var builder = new OffsetFetchRequest.Builder(data, false);
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false, true);

if (version < 8) {
var expectedRequest = new OffsetFetchRequestData()
Expand All @@ -124,7 +132,7 @@ public void testSingleGroupWithAllTopics(short version) {
.setGroupId("grp1")
.setTopics(null)
));
var builder = new OffsetFetchRequest.Builder(data, false);
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false, true);

if (version < 2) {
assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
Expand All @@ -141,18 +149,21 @@ public void testSingleGroupWithAllTopics(short version) {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
public void testGetErrorResponse(short version) {
var request = new OffsetFetchRequest.Builder(
var request = OffsetFetchRequest.Builder.forTopicIdsOrNames(
new OffsetFetchRequestData()
.setGroups(List.of(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("grp1")
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(Uuid.randomUuid())
.setTopicId(Uuid.randomUuid())
.setPartitionIndexes(List.of(0, 1))
))
)),
false
false,
true
).build(version);

if (version < 2) {
Expand Down Expand Up @@ -197,18 +208,21 @@ public void testGetErrorResponse(short version) {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
public void testGroups(short version) {
var request = new OffsetFetchRequest.Builder(
var request = OffsetFetchRequest.Builder.forTopicIdsOrNames(
new OffsetFetchRequestData()
.setGroups(List.of(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("grp1")
.setTopics(List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(Uuid.randomUuid())
.setTopicId(Uuid.randomUuid())
.setPartitionIndexes(List.of(0, 1, 2))
))
)),
false
false,
true
).build(version);

if (version < 8) {
Expand All @@ -230,14 +244,15 @@ public void testGroups(short version) {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2)
public void testGroupsWithAllTopics(short version) {
var request = new OffsetFetchRequest.Builder(
var request = OffsetFetchRequest.Builder.forTopicIdsOrNames(
new OffsetFetchRequestData()
.setGroups(List.of(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("grp1")
.setTopics(null)
)),
false
false,
true
).build(version);

if (version < 8) {
Expand Down
Loading