Skip to content

Commit 98e535b

Browse files
authored
MINOR: Simplify OffsetFetchResponse (#19642)
While working on #19515, I came to the conclusion that the OffsetFetchResponse is quite messy and overall too complicated. This patch rationalizes the constructors: OffsetFetchResponse now has a single constructor accepting the OffsetFetchResponseData, and a builder is introduced to handle the down-conversion. This will also simplify adding topic ids. All the changes are mechanical, replacing one set of data structures with another. Reviewers: Lianet Magrans <lmagrans@confluent.io>
1 parent 2dd6126 commit 98e535b

File tree

14 files changed

+1332
-1156
lines changed

14 files changed

+1332
-1156
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
2727
import org.apache.kafka.common.requests.OffsetFetchRequest;
2828
import org.apache.kafka.common.requests.OffsetFetchResponse;
29+
import org.apache.kafka.common.requests.RequestUtils;
2930
import org.apache.kafka.common.utils.LogContext;
3031

3132
import org.slf4j.Logger;
@@ -36,7 +37,6 @@
3637
import java.util.HashMap;
3738
import java.util.List;
3839
import java.util.Map;
39-
import java.util.Optional;
4040
import java.util.Set;
4141
import java.util.stream.Collectors;
4242

@@ -139,40 +139,52 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
139139
) {
140140
validateKeys(groupIds);
141141

142-
final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
142+
var response = (OffsetFetchResponse) abstractResponse;
143+
var completed = new HashMap<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>();
144+
var failed = new HashMap<CoordinatorKey, Throwable>();
145+
var unmapped = new ArrayList<CoordinatorKey>();
143146

144-
Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
145-
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
146-
List<CoordinatorKey> unmapped = new ArrayList<>();
147147
for (CoordinatorKey coordinatorKey : groupIds) {
148-
String group = coordinatorKey.idValue;
149-
if (response.groupHasError(group)) {
150-
handleGroupError(CoordinatorKey.byGroupId(group), response.groupLevelError(group), failed, unmapped);
148+
var groupId = coordinatorKey.idValue;
149+
var group = response.group(groupId);
150+
var error = Errors.forCode(group.errorCode());
151+
152+
if (error != Errors.NONE) {
153+
handleGroupError(
154+
coordinatorKey,
155+
error,
156+
failed,
157+
unmapped
158+
);
151159
} else {
152-
final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
153-
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(group);
154-
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
155-
final TopicPartition topicPartition = partitionEntry.getKey();
156-
OffsetFetchResponse.PartitionData partitionData = partitionEntry.getValue();
157-
final Errors error = partitionData.error;
158-
159-
if (error == Errors.NONE) {
160-
final long offset = partitionData.offset;
161-
final String metadata = partitionData.metadata;
162-
final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
163-
// Negative offset indicates that the group has no committed offset for this partition
164-
if (offset < 0) {
165-
groupOffsetsListing.put(topicPartition, null);
160+
var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
161+
162+
group.topics().forEach(topic ->
163+
topic.partitions().forEach(partition -> {
164+
var tp = new TopicPartition(topic.name(), partition.partitionIndex());
165+
var partitionError = Errors.forCode(partition.errorCode());
166+
167+
if (partitionError == Errors.NONE) {
168+
// Negative offset indicates that the group has no committed offset for this partition.
169+
if (partition.committedOffset() < 0) {
170+
offsets.put(tp, null);
171+
} else {
172+
offsets.put(tp, new OffsetAndMetadata(
173+
partition.committedOffset(),
174+
RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()),
175+
partition.metadata()
176+
));
177+
}
166178
} else {
167-
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
179+
log.warn("Skipping return offset for {} due to error {}.", tp, partitionError);
168180
}
169-
} else {
170-
log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
171-
}
172-
}
173-
completed.put(CoordinatorKey.byGroupId(group), groupOffsetsListing);
181+
})
182+
);
183+
184+
completed.put(coordinatorKey, offsets);
174185
}
175186
}
187+
176188
return new ApiResult<>(completed, failed, unmapped);
177189
}
178190

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.common.message.OffsetCommitRequestData;
3636
import org.apache.kafka.common.message.OffsetCommitResponseData;
3737
import org.apache.kafka.common.message.OffsetFetchRequestData;
38+
import org.apache.kafka.common.message.OffsetFetchResponseData;
3839
import org.apache.kafka.common.metrics.Metrics;
3940
import org.apache.kafka.common.protocol.Errors;
4041
import org.apache.kafka.common.record.RecordBatch;
@@ -43,6 +44,7 @@
4344
import org.apache.kafka.common.requests.OffsetCommitResponse;
4445
import org.apache.kafka.common.requests.OffsetFetchRequest;
4546
import org.apache.kafka.common.requests.OffsetFetchResponse;
47+
import org.apache.kafka.common.requests.RequestUtils;
4648
import org.apache.kafka.common.utils.LogContext;
4749
import org.apache.kafka.common.utils.Time;
4850
import org.apache.kafka.common.utils.Timer;
@@ -1012,13 +1014,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
10121014
@Override
10131015
void onResponse(final ClientResponse response) {
10141016
long currentTimeMs = response.receivedTimeMs();
1015-
OffsetFetchResponse fetchResponse = (OffsetFetchResponse) response.responseBody();
1016-
Errors responseError = fetchResponse.groupLevelError(groupId);
1017-
if (responseError != Errors.NONE) {
1018-
onFailure(currentTimeMs, responseError);
1017+
var fetchResponse = (OffsetFetchResponse) response.responseBody();
1018+
var groupResponse = fetchResponse.group(groupId);
1019+
var error = Errors.forCode(groupResponse.errorCode());
1020+
if (error != Errors.NONE) {
1021+
onFailure(currentTimeMs, error);
10191022
return;
10201023
}
1021-
onSuccess(currentTimeMs, fetchResponse);
1024+
onSuccess(currentTimeMs, groupResponse);
10221025
}
10231026

10241027
/**
@@ -1083,53 +1086,58 @@ void removeRequest() {
10831086
* offsets contained in the response, and record a successful request attempt.
10841087
*/
10851088
private void onSuccess(final long currentTimeMs,
1086-
final OffsetFetchResponse response) {
1087-
Set<String> unauthorizedTopics = null;
1088-
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
1089-
response.partitionDataMap(groupId);
1090-
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
1091-
Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
1092-
boolean failedRequestRegistered = false;
1093-
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
1094-
TopicPartition tp = entry.getKey();
1095-
OffsetFetchResponse.PartitionData partitionData = entry.getValue();
1096-
if (partitionData.hasError()) {
1097-
Errors error = partitionData.error;
1098-
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
1089+
final OffsetFetchResponseData.OffsetFetchResponseGroup response) {
1090+
var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
1091+
var unstableTxnOffsetTopicPartitions = new HashSet<TopicPartition>();
1092+
var unauthorizedTopics = new HashSet<String>();
1093+
var failedRequestRegistered = false;
1094+
1095+
for (var topic : response.topics()) {
1096+
for (var partition : topic.partitions()) {
1097+
var tp = new TopicPartition(
1098+
topic.name(),
1099+
partition.partitionIndex()
1100+
);
1101+
var error = Errors.forCode(partition.errorCode());
1102+
if (error != Errors.NONE) {
1103+
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
10991104

1100-
if (!failedRequestRegistered) {
1101-
onFailedAttempt(currentTimeMs);
1102-
failedRequestRegistered = true;
1103-
}
1105+
if (!failedRequestRegistered) {
1106+
onFailedAttempt(currentTimeMs);
1107+
failedRequestRegistered = true;
1108+
}
11041109

1105-
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
1106-
future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
1107-
return;
1108-
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
1109-
if (unauthorizedTopics == null) {
1110-
unauthorizedTopics = new HashSet<>();
1110+
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
1111+
future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
1112+
return;
1113+
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
1114+
unauthorizedTopics.add(tp.topic());
1115+
} else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
1116+
unstableTxnOffsetTopicPartitions.add(tp);
1117+
} else {
1118+
// Fail with a non-retriable KafkaException for all unexpected partition
1119+
// errors (even if they are retriable)
1120+
future.completeExceptionally(new KafkaException("Unexpected error in fetch offset " +
1121+
"response for partition " + tp + ": " + error.message()));
1122+
return;
11111123
}
1112-
unauthorizedTopics.add(tp.topic());
1113-
} else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
1114-
unstableTxnOffsetTopicPartitions.add(tp);
1124+
} else if (partition.committedOffset() >= 0) {
1125+
// record the position with the offset (-1 indicates no committed offset to fetch);
1126+
// if there's no committed offset, record as null
1127+
offsets.put(tp, new OffsetAndMetadata(
1128+
partition.committedOffset(),
1129+
RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()),
1130+
partition.metadata()
1131+
));
11151132
} else {
1116-
// Fail with a non-retriable KafkaException for all unexpected partition
1117-
// errors (even if they are retriable)
1118-
future.completeExceptionally(new KafkaException("Unexpected error in fetch offset " +
1119-
"response for partition " + tp + ": " + error.message()));
1120-
return;
1133+
log.info("Found no committed offset for partition {}", tp);
1134+
offsets.put(tp, null);
11211135
}
1122-
} else if (partitionData.offset >= 0) {
1123-
// record the position with the offset (-1 indicates no committed offset to fetch);
1124-
// if there's no committed offset, record as null
1125-
offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
1126-
} else {
1127-
log.info("Found no committed offset for partition {}", tp);
1128-
offsets.put(tp, null);
1136+
11291137
}
11301138
}
11311139

1132-
if (unauthorizedTopics != null) {
1140+
if (!unauthorizedTopics.isEmpty()) {
11331141
future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
11341142
} else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
11351143
// TODO: Optimization question: Do we need to retry all partitions upon a single partition error?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.kafka.common.requests.OffsetCommitResponse;
6464
import org.apache.kafka.common.requests.OffsetFetchRequest;
6565
import org.apache.kafka.common.requests.OffsetFetchResponse;
66+
import org.apache.kafka.common.requests.RequestUtils;
6667
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
6768
import org.apache.kafka.common.utils.LogContext;
6869
import org.apache.kafka.common.utils.Time;
@@ -1512,64 +1513,71 @@ private OffsetFetchResponseHandler() {
15121513

15131514
@Override
15141515
public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
1515-
Errors responseError = response.groupLevelError(rebalanceConfig.groupId);
1516-
if (responseError != Errors.NONE) {
1517-
log.debug("Offset fetch failed: {}", responseError.message());
1516+
var group = response.group(rebalanceConfig.groupId);
1517+
var groupError = Errors.forCode(group.errorCode());
15181518

1519-
if (responseError == Errors.COORDINATOR_NOT_AVAILABLE ||
1520-
responseError == Errors.NOT_COORDINATOR) {
1519+
if (groupError != Errors.NONE) {
1520+
log.debug("Offset fetch failed: {}", groupError.message());
1521+
1522+
if (groupError == Errors.COORDINATOR_NOT_AVAILABLE ||
1523+
groupError == Errors.NOT_COORDINATOR) {
15211524
// re-discover the coordinator and retry
1522-
markCoordinatorUnknown(responseError);
1523-
future.raise(responseError);
1524-
} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
1525+
markCoordinatorUnknown(groupError);
1526+
future.raise(groupError);
1527+
} else if (groupError == Errors.GROUP_AUTHORIZATION_FAILED) {
15251528
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
1526-
} else if (responseError.exception() instanceof RetriableException) {
1529+
} else if (groupError.exception() instanceof RetriableException) {
15271530
// retry
1528-
future.raise(responseError);
1531+
future.raise(groupError);
15291532
} else {
1530-
future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
1533+
future.raise(new KafkaException("Unexpected error in fetch offset response: " + groupError.message()));
15311534
}
15321535
return;
15331536
}
15341537

1535-
Set<String> unauthorizedTopics = null;
1536-
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
1537-
response.partitionDataMap(rebalanceConfig.groupId);
1538-
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
1539-
Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
1540-
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
1541-
TopicPartition tp = entry.getKey();
1542-
OffsetFetchResponse.PartitionData partitionData = entry.getValue();
1543-
if (partitionData.hasError()) {
1544-
Errors error = partitionData.error;
1545-
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
1546-
1547-
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
1548-
future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
1549-
return;
1550-
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
1551-
if (unauthorizedTopics == null) {
1552-
unauthorizedTopics = new HashSet<>();
1538+
var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
1539+
var unstableTxnOffsetTopicPartitions = new HashSet<TopicPartition>();
1540+
var unauthorizedTopics = new HashSet<String>();
1541+
1542+
for (var topic : group.topics()) {
1543+
for (var partition : topic.partitions()) {
1544+
var tp = new TopicPartition(
1545+
topic.name(),
1546+
partition.partitionIndex()
1547+
);
1548+
var error = Errors.forCode(partition.errorCode());
1549+
1550+
if (error != Errors.NONE) {
1551+
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
1552+
1553+
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
1554+
future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
1555+
return;
1556+
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
1557+
unauthorizedTopics.add(tp.topic());
1558+
} else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
1559+
unstableTxnOffsetTopicPartitions.add(tp);
1560+
} else {
1561+
future.raise(new KafkaException("Unexpected error in fetch offset response for partition " +
1562+
tp + ": " + error.message()));
1563+
return;
15531564
}
1554-
unauthorizedTopics.add(tp.topic());
1555-
} else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
1556-
unstableTxnOffsetTopicPartitions.add(tp);
1565+
} else if (partition.committedOffset() >= 0) {
1566+
// record the position with the offset (-1 indicates no committed offset to fetch);
1567+
// if there's no committed offset, record as null
1568+
offsets.put(tp, new OffsetAndMetadata(
1569+
partition.committedOffset(),
1570+
RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()),
1571+
partition.metadata()
1572+
));
15571573
} else {
1558-
future.raise(new KafkaException("Unexpected error in fetch offset response for partition " +
1559-
tp + ": " + error.message()));
1560-
return;
1574+
log.info("Found no committed offset for partition {}", tp);
1575+
offsets.put(tp, null);
15611576
}
1562-
} else if (partitionData.offset >= 0) {
1563-
// record the position with the offset (-1 indicates no committed offset to fetch);
1564-
// if there's no committed offset, record as null
1565-
offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
1566-
} else {
1567-
log.info("Found no committed offset for partition {}", tp);
1568-
offsets.put(tp, null);
15691577
}
15701578
}
15711579

1572-
if (unauthorizedTopics != null) {
1580+
if (!unauthorizedTopics.isEmpty()) {
15731581
future.raise(new TopicAuthorizationException(unauthorizedTopics));
15741582
} else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
15751583
// just retry

0 commit comments

Comments
 (0)