Skip to content

KAFKA-14691; Add TopicId to OffsetFetch API #19515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds)

public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
// Create a request that only contains the consumer groups owned by the coordinator.
return new OffsetFetchRequest.Builder(
return OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(requireStable)
.setGroups(groupIds.stream().map(groupId -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
.collect(Collectors.toList());

OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
.map(epoch -> new OffsetFetchRequest.Builder(
.map(epoch -> OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand All @@ -997,7 +997,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
throwOnFetchStableOffsetUnsupported))
// Building request without passing member ID/epoch to leave the logic to choose
// default values when not present on the request builder.
.orElseGet(() -> new OffsetFetchRequest.Builder(
.orElseGet(() -> OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchReq
.collect(Collectors.toList())))
.collect(Collectors.toList());

OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(
OffsetFetchRequest.Builder requestBuilder = OffsetFetchRequest.Builder.forTopicNames(
new OffsetFetchRequestData()
.setRequireStable(true)
.setGroups(List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
Expand Down Expand Up @@ -44,26 +45,58 @@ public class OffsetFetchRequest extends AbstractRequest {
public static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
public static final short REQUIRE_STABLE_OFFSET_MIN_VERSION = 7;
public static final short BATCH_MIN_VERSION = 8;
public static final short TOPIC_ID_MIN_VERSION = 10;

private final OffsetFetchRequestData data;

public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {

private final OffsetFetchRequestData data;
private final boolean throwOnFetchStableOffsetsUnsupported;

public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) {
super(ApiKeys.OFFSET_FETCH);
public static Builder forTopicIdsOrNames(
OffsetFetchRequestData data,
boolean throwOnFetchStableOffsetsUnsupported,
boolean enableUnstableLastVersion
) {
return new Builder(
data,
throwOnFetchStableOffsetsUnsupported,
ApiKeys.OFFSET_FETCH.oldestVersion(),
ApiKeys.OFFSET_FETCH.latestVersion(enableUnstableLastVersion)
);
}

public static Builder forTopicNames(
OffsetFetchRequestData data,
boolean throwOnFetchStableOffsetsUnsupported
) {
return new Builder(
data,
throwOnFetchStableOffsetsUnsupported,
ApiKeys.OFFSET_FETCH.oldestVersion(),
(short) (TOPIC_ID_MIN_VERSION - 1)
);
}

private Builder(
OffsetFetchRequestData data,
boolean throwOnFetchStableOffsetsUnsupported,
short oldestAllowedVersion,
short latestAllowedVersion
) {
super(ApiKeys.OFFSET_FETCH, oldestAllowedVersion, latestAllowedVersion);
this.data = data;
this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
}

@Override
public OffsetFetchRequest build(short version) {
private void throwIfBatchingIsUnsupported(short version) {
if (data.groups().size() > 1 && version < BATCH_MIN_VERSION) {
throw new NoBatchedOffsetFetchRequestException("Broker does not support"
+ " batching groups for fetch offset request on version " + version);
}
}

private void throwIfStableOffsetsUnsupported(short version) {
if (data.requireStable() && version < REQUIRE_STABLE_OFFSET_MIN_VERSION) {
if (throwOnFetchStableOffsetsUnsupported) {
throw new UnsupportedVersionException("Broker unexpectedly " +
Expand All @@ -75,37 +108,77 @@ public OffsetFetchRequest build(short version) {
data.setRequireStable(false);
}
}
// convert data to use the appropriate version since version 8 uses different format
if (version < BATCH_MIN_VERSION) {
OffsetFetchRequestData normalizedData;
if (!data.groups().isEmpty()) {
OffsetFetchRequestGroup group = data.groups().get(0);
String groupName = group.groupId();
List<OffsetFetchRequestTopics> topics = group.topics();
List<OffsetFetchRequestTopic> oldFormatTopics = null;
if (topics != null) {
oldFormatTopics = topics
.stream()
.map(t ->
new OffsetFetchRequestTopic()
.setName(t.name())
.setPartitionIndexes(t.partitionIndexes()))
.collect(Collectors.toList());
}

private void throwIfMissingRequiredTopicIdentifiers(short version) {
if (version < TOPIC_ID_MIN_VERSION) {
data.groups().forEach(group -> {
if (group.topics() != null) {
group.topics().forEach(topic -> {
if (topic.name() == null || topic.name().isEmpty()) {
throw new UnsupportedVersionException("The broker offset fetch api version " +
version + " does require usage of topic names.");
}
});
}
normalizedData = new OffsetFetchRequestData()
.setGroupId(groupName)
.setTopics(oldFormatTopics)
.setRequireStable(data.requireStable());
} else {
normalizedData = data;
}
if (normalizedData.topics() == null && version < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
"v" + version + ", but we need v2 or newer to request all topic partitions.");
}
return new OffsetFetchRequest(normalizedData, version);
});
} else {
data.groups().forEach(group -> {
if (group.topics() != null) {
group.topics().forEach(topic -> {
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
throw new UnsupportedVersionException("The broker offset fetch api version " +
version + " does require usage of topic ids.");
}
});
}
});
}
}

private void throwIfRequestingAllTopicsIsUnsupported(short version) {
if (version < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
data.groups().forEach(group -> {
if (group.topics() == null) {
throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
"v" + version + ", but we need v2 or newer to request all topic partitions.");
}
});
}
return new OffsetFetchRequest(data, version);
}

private OffsetFetchRequestData maybeDowngrade(short version) {
// Convert data to use the appropriate version since version 8
// uses different format.
if (version >= BATCH_MIN_VERSION || data.groups().isEmpty()) return data;

OffsetFetchRequestGroup group = data.groups().get(0);
String groupName = group.groupId();
List<OffsetFetchRequestTopics> topics = group.topics();
List<OffsetFetchRequestTopic> oldFormatTopics = null;

if (topics != null) {
oldFormatTopics = topics
.stream()
.map(t -> new OffsetFetchRequestTopic()
.setName(t.name())
.setPartitionIndexes(t.partitionIndexes()))
.collect(Collectors.toList());
}

return new OffsetFetchRequestData()
.setGroupId(groupName)
.setTopics(oldFormatTopics)
.setRequireStable(data.requireStable());
}

@Override
public OffsetFetchRequest build(short version) {
throwIfBatchingIsUnsupported(version);
throwIfStableOffsetsUnsupported(version);
throwIfMissingRequiredTopicIdentifiers(version);
throwIfRequestingAllTopicsIsUnsupported(version);
return new OffsetFetchRequest(maybeDowngrade(version), version);
}

@Override
Expand Down Expand Up @@ -249,6 +322,10 @@ public static OffsetFetchRequest parse(Readable readable, short version) {
return new OffsetFetchRequest(new OffsetFetchRequestData(readable, version), version);
}

public static boolean useTopicIds(short version) {
return version >= TOPIC_ID_MIN_VERSION;
}

@Override
public OffsetFetchRequestData data() {
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds
// the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
"validVersions": "1-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "1-10",
"flexibleVersions": "6+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
"about": "The group to fetch offsets for." },
Expand All @@ -60,8 +63,10 @@
"about": "The member epoch if using the new consumer protocol (KIP-848)." },
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name."},
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
"about": "The partition indexes we would like to fetch offsets for." }
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group
// protocol is used.
"validVersions": "1-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "1-10",
"flexibleVersions": "6+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
Expand All @@ -49,6 +51,7 @@
// - UNSTABLE_OFFSET_COMMIT (version 7+)
// - UNKNOWN_MEMBER_ID (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
// - UNKNOWN_TOPIC_ID (version 10+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
Expand Down Expand Up @@ -78,8 +81,10 @@
"about": "The group ID." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
"about": "The responses per topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
"about": "The responses per partition.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "8+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ public void testOffsetFetchRequestVersions(short version) throws Exception {
.setMemberEpoch(version >= 9 ? 10 : -1)
.setTopics(List.of(
new OffsetFetchRequestTopics()
.setName("foo")
.setName(version < 10 ? "foo" : "")
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
.setPartitionIndexes(List.of(0, 1, 2))
))
));
Expand Down Expand Up @@ -606,7 +607,8 @@ public void testOffsetFetchResponseVersions(short version) throws Exception {
.setErrorCode(Errors.INVALID_GROUP_ID.code())
.setTopics(List.of(
new OffsetFetchResponseTopics()
.setName("foo")
.setName(version < 10 ? "foo" : "")
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
.setPartitions(List.of(
new OffsetFetchResponsePartitions()
.setPartitionIndex(0)
Expand Down
Loading