diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index eccf0c8f33d86..fe36bfd4a7838 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -552,8 +552,14 @@ private void handleTasksWithStateUpdater(final Map> private void handleTasksPendingInitialization() { // All tasks pending initialization are not part of the usual bookkeeping + + final Set tasksToCloseDirty = new HashSet<>(); + for (final Task task : tasks.drainPendingTasksToInit()) { - closeTaskClean(task, Collections.emptySet(), Collections.emptyMap()); + closeTaskClean(task, tasksToCloseDirty, new HashMap<>()); + } + for (final Task task : tasksToCloseDirty) { + closeTaskDirty(task, false); } } @@ -1312,7 +1318,6 @@ private void closeRunningTasksDirty() { private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() { if (stateUpdater != null) { final Map> futures = new LinkedHashMap<>(); - final Map failedTasksDuringCleanClose = new HashMap<>(); final Set tasksToCloseClean = new HashSet<>(tasks.drainPendingActiveTasksToInit()); final Set tasksToCloseDirty = new HashSet<>(); for (final Task restoringTask : stateUpdater.tasks()) { @@ -1323,7 +1328,7 @@ private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() { addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty); for (final Task task : tasksToCloseClean) { - closeTaskClean(task, tasksToCloseDirty, failedTasksDuringCleanClose); + closeTaskClean(task, tasksToCloseDirty, new HashMap<>()); } for (final Task task : tasksToCloseDirty) { closeTaskDirty(task, false);