Skip to content

Commit be194f5

Browse files
authored
MINOR: Simplify OffsetFetchRequest (#19572)
While working on #19515, I came to the conclusion that the OffsetFetchRequest is quite messy and overall too complicated. This patch rationalize the constructors. OffsetFetchRequest has a single constructor accepting the OffsetFetchRequestData. This will also simplify adding the topic ids. All the changes are mechanical, replacing data structures by others. Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Lianet Magran <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 7293f3a commit be194f5

File tree

11 files changed

+607
-530
lines changed

11 files changed

+607
-530
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2121
import org.apache.kafka.common.Node;
2222
import org.apache.kafka.common.TopicPartition;
23+
import org.apache.kafka.common.message.OffsetFetchRequestData;
2324
import org.apache.kafka.common.protocol.Errors;
2425
import org.apache.kafka.common.requests.AbstractResponse;
2526
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -86,15 +87,32 @@ private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds)
8687
}
8788

8889
public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
89-
// Create a map that only contains the consumer groups owned by the coordinator.
90-
Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions = new HashMap<>(groupIds.size());
91-
groupIds.forEach(g -> {
92-
ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
93-
List<TopicPartition> partitions = spec.topicPartitions() != null ? new ArrayList<>(spec.topicPartitions()) : null;
94-
coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
95-
});
96-
97-
return new OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable, false);
90+
// Create a request that only contains the consumer groups owned by the coordinator.
91+
return new OffsetFetchRequest.Builder(
92+
new OffsetFetchRequestData()
93+
.setRequireStable(requireStable)
94+
.setGroups(groupIds.stream().map(groupId -> {
95+
ListConsumerGroupOffsetsSpec spec = groupSpecs.get(groupId.idValue);
96+
97+
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = null;
98+
if (spec.topicPartitions() != null) {
99+
topics = spec.topicPartitions().stream()
100+
.collect(Collectors.groupingBy(TopicPartition::topic))
101+
.entrySet()
102+
.stream()
103+
.map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics()
104+
.setName(entry.getKey())
105+
.setPartitionIndexes(entry.getValue().stream()
106+
.map(TopicPartition::partition)
107+
.collect(Collectors.toList())))
108+
.collect(Collectors.toList());
109+
}
110+
return new OffsetFetchRequestData.OffsetFetchRequestGroup()
111+
.setGroupId(groupId.idValue)
112+
.setTopics(topics);
113+
}).collect(Collectors.toList())),
114+
false
115+
);
98116
}
99117

100118
@Override

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
3535
import org.apache.kafka.common.message.OffsetCommitRequestData;
3636
import org.apache.kafka.common.message.OffsetCommitResponseData;
37+
import org.apache.kafka.common.message.OffsetFetchRequestData;
3738
import org.apache.kafka.common.metrics.Metrics;
3839
import org.apache.kafka.common.protocol.Errors;
3940
import org.apache.kafka.common.record.RecordBatch;
@@ -970,21 +971,37 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
970971
}
971972

972973
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
974+
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = requestedPartitions.stream()
975+
.collect(Collectors.groupingBy(TopicPartition::topic))
976+
.entrySet()
977+
.stream()
978+
.map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics()
979+
.setName(entry.getKey())
980+
.setPartitionIndexes(entry.getValue().stream()
981+
.map(TopicPartition::partition)
982+
.collect(Collectors.toList())))
983+
.collect(Collectors.toList());
973984

974-
OffsetFetchRequest.Builder builder = memberInfo.memberEpoch.
975-
map(epoch -> new OffsetFetchRequest.Builder(
976-
groupId,
977-
memberInfo.memberId,
978-
epoch,
979-
true,
980-
new ArrayList<>(this.requestedPartitions),
981-
throwOnFetchStableOffsetUnsupported))
985+
OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
986+
.map(epoch -> new OffsetFetchRequest.Builder(
987+
new OffsetFetchRequestData()
988+
.setRequireStable(true)
989+
.setGroups(List.of(
990+
new OffsetFetchRequestData.OffsetFetchRequestGroup()
991+
.setGroupId(groupId)
992+
.setMemberId(memberInfo.memberId)
993+
.setMemberEpoch(epoch)
994+
.setTopics(topics))),
995+
throwOnFetchStableOffsetUnsupported))
982996
// Building request without passing member ID/epoch to leave the logic to choose
983997
// default values when not present on the request builder.
984998
.orElseGet(() -> new OffsetFetchRequest.Builder(
985-
groupId,
986-
true,
987-
new ArrayList<>(this.requestedPartitions),
999+
new OffsetFetchRequestData()
1000+
.setRequireStable(true)
1001+
.setGroups(List.of(
1002+
new OffsetFetchRequestData.OffsetFetchRequestGroup()
1003+
.setGroupId(groupId)
1004+
.setTopics(topics))),
9881005
throwOnFetchStableOffsetUnsupported));
9891006
return buildRequestWithResponseHandling(builder);
9901007
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.kafka.common.message.JoinGroupResponseData;
5050
import org.apache.kafka.common.message.OffsetCommitRequestData;
5151
import org.apache.kafka.common.message.OffsetCommitResponseData;
52+
import org.apache.kafka.common.message.OffsetFetchRequestData;
5253
import org.apache.kafka.common.metrics.Measurable;
5354
import org.apache.kafka.common.metrics.Metrics;
5455
import org.apache.kafka.common.metrics.Sensor;
@@ -1477,9 +1478,27 @@ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchReq
14771478
return RequestFuture.coordinatorNotAvailable();
14781479

14791480
log.debug("Fetching committed offsets for partitions: {}", partitions);
1481+
14801482
// construct the request
1481-
OffsetFetchRequest.Builder requestBuilder =
1482-
new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId, true, new ArrayList<>(partitions), throwOnFetchStableOffsetsUnsupported);
1483+
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = partitions.stream()
1484+
.collect(Collectors.groupingBy(TopicPartition::topic))
1485+
.entrySet()
1486+
.stream()
1487+
.map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics()
1488+
.setName(entry.getKey())
1489+
.setPartitionIndexes(entry.getValue().stream()
1490+
.map(TopicPartition::partition)
1491+
.collect(Collectors.toList())))
1492+
.collect(Collectors.toList());
1493+
1494+
OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(
1495+
new OffsetFetchRequestData()
1496+
.setRequireStable(true)
1497+
.setGroups(List.of(
1498+
new OffsetFetchRequestData.OffsetFetchRequestGroup()
1499+
.setGroupId(this.rebalanceConfig.groupId)
1500+
.setTopics(topics))),
1501+
throwOnFetchStableOffsetsUnsupported);
14831502

14841503
// send the request with a callback
14851504
return client.send(coordinator, requestBuilder)

0 commit comments

Comments
 (0)