@@ -186,7 +186,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
186
186
*/
187
187
private class BackgroundEventProcessor implements EventProcessor <BackgroundEvent > {
188
188
189
- private Optional <StreamsRebalanceListener > streamsGroupRebalanceCallbacks = Optional .empty ();
189
+ private Optional <StreamsRebalanceListener > streamsRebalanceListener = Optional .empty ();
190
190
private final Optional <StreamsRebalanceData > streamsRebalanceData ;
191
191
192
192
public BackgroundEventProcessor () {
@@ -202,7 +202,7 @@ private void setStreamsRebalanceListener(final StreamsRebalanceListener streamsR
202
202
throw new IllegalStateException ("Background event processor was not created to be used with Streams " +
203
203
"rebalance protocol events" );
204
204
}
205
- this .streamsGroupRebalanceCallbacks = Optional .of (streamsRebalanceListener );
205
+ this .streamsRebalanceListener = Optional .of (streamsRebalanceListener );
206
206
}
207
207
208
208
@ Override
@@ -277,20 +277,15 @@ private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
277
277
278
278
private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback (final Set <StreamsRebalanceData .TaskId > activeTasksToRevoke ,
279
279
final CompletableFuture <Void > future ) {
280
- final Optional <KafkaException > error ;
281
- final Optional <Exception > exceptionFromCallback = streamsGroupRebalanceCallbacks ().onTasksRevoked (activeTasksToRevoke );
282
- if (exceptionFromCallback .isPresent ()) {
283
- error = Optional .of (ConsumerUtils .maybeWrapAsKafkaException (exceptionFromCallback .get (), "Task revocation callback throws an error" ));
284
- } else {
285
- error = Optional .empty ();
286
- }
280
+ final Optional <Exception > exceptionFromCallback = streamsRebalanceListener ().onTasksRevoked (activeTasksToRevoke );
281
+ final Optional <KafkaException > error = exceptionFromCallback .map (e -> ConsumerUtils .maybeWrapAsKafkaException (e , "Task revocation callback throws an error" ));
287
282
return new StreamsOnTasksRevokedCallbackCompletedEvent (future , error );
288
283
}
289
284
290
285
private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback (final StreamsRebalanceData .Assignment assignment ,
291
286
final CompletableFuture <Void > future ) {
292
287
final Optional <KafkaException > error ;
293
- final Optional <Exception > exceptionFromCallback = streamsGroupRebalanceCallbacks ().onTasksAssigned (assignment );
288
+ final Optional <Exception > exceptionFromCallback = streamsRebalanceListener ().onTasksAssigned (assignment );
294
289
if (exceptionFromCallback .isPresent ()) {
295
290
error = Optional .of (ConsumerUtils .maybeWrapAsKafkaException (exceptionFromCallback .get (), "Task assignment callback throws an error" ));
296
291
} else {
@@ -302,7 +297,7 @@ private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallba
302
297
303
298
private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback (final CompletableFuture <Void > future ) {
304
299
final Optional <KafkaException > error ;
305
- final Optional <Exception > exceptionFromCallback = streamsGroupRebalanceCallbacks ().onAllTasksLost ();
300
+ final Optional <Exception > exceptionFromCallback = streamsRebalanceListener ().onAllTasksLost ();
306
301
if (exceptionFromCallback .isPresent ()) {
307
302
error = Optional .of (ConsumerUtils .maybeWrapAsKafkaException (exceptionFromCallback .get (), "All tasks lost callback throws an error" ));
308
303
} else {
@@ -318,8 +313,8 @@ private StreamsRebalanceData streamsRebalanceData() {
318
313
"rebalance protocol events" ));
319
314
}
320
315
321
- private StreamsRebalanceListener streamsGroupRebalanceCallbacks () {
322
- return streamsGroupRebalanceCallbacks .orElseThrow (
316
+ private StreamsRebalanceListener streamsRebalanceListener () {
317
+ return streamsRebalanceListener .orElseThrow (
323
318
() -> new IllegalStateException ("Background event processor was not created to be used with Streams " +
324
319
"rebalance protocol events" ));
325
320
}
0 commit comments