Skip to content

Commit 6fe1598

Browse files
authored
KAFKA-18170: Add scheduled job to snapshot cold share partitions. (#19443)
* There could be scenarios where share partition records in `__share_group_state` internal topic are not updated for a while implying these partitions are basically cold. * In this situation, the presence of these holds back the pruner from keeping the topic clean and of manageable size. * To remedy the situation, we have added a periodic `setupSnapshotColdPartitions` in `ShareCoordinatorService` which does a writeAll operation on the associated shards in the coordinator and forces snapshot creation for any cold partitions. In this way the pruner can continue. This job has been added as a timer task. * A new internal config `share.coordinator.cold.partition.snapshot.interval.ms` has been introduced to set the period of the job. * Any failures are logged and ignored. * New tests have been added to verify the feature. Reviewers: PoAn Yang <payang@apache.org>, Andrew Schofield <aschofield@confluent.io>
1 parent 4335027 commit 6fe1598

File tree

7 files changed

+556
-19
lines changed

7 files changed

+556
-19
lines changed

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ public class ShareCoordinatorConfig {
7676
public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
7777
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in share-group state topic.";
7878

79+
public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG = "share.coordinator.cold.partition.snapshot.interval.ms";
80+
public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
81+
public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between force snapshotting share partitions which are not being updated.";
82+
7983
public static final ConfigDef CONFIG_DEF = new ConfigDef()
8084
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
8185
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -87,7 +91,8 @@ public class ShareCoordinatorConfig {
8791
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
8892
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
8993
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
90-
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);
94+
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
95+
.defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
9196

9297
private final int stateTopicNumPartitions;
9398
private final short stateTopicReplicationFactor;
@@ -100,7 +105,7 @@ public class ShareCoordinatorConfig {
100105
private final CompressionType compressionType;
101106
private final int appendLingerMs;
102107
private final int pruneIntervalMs;
103-
108+
private final int coldPartitionSnapshotIntervalMs;
104109

105110
public ShareCoordinatorConfig(AbstractConfig config) {
106111
stateTopicNumPartitions = config.getInt(STATE_TOPIC_NUM_PARTITIONS_CONFIG);
@@ -116,6 +121,7 @@ public ShareCoordinatorConfig(AbstractConfig config) {
116121
.orElse(null);
117122
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
118123
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
124+
coldPartitionSnapshotIntervalMs = config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
119125
validate();
120126
}
121127

@@ -163,6 +169,10 @@ public int shareCoordinatorTopicPruneIntervalMs() {
163169
return pruneIntervalMs;
164170
}
165171

172+
public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
173+
return coldPartitionSnapshotIntervalMs;
174+
}
175+
166176
private void validate() {
167177
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
168178
String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,15 @@ public void startup(
261261

262262
log.info("Starting up.");
263263
numPartitions = shareGroupTopicPartitionCount.getAsInt();
264-
setupRecordPruning();
264+
setupPeriodicJobs();
265265
log.info("Startup complete.");
266266
}
267267

268+
private void setupPeriodicJobs() {
269+
setupRecordPruning();
270+
setupSnapshotColdPartitions();
271+
}
272+
268273
private void setupRecordPruning() {
269274
log.info("Scheduling share-group state topic prune job.");
270275
timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
@@ -341,6 +346,28 @@ private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
341346
return fut;
342347
}
343348

349+
private void setupSnapshotColdPartitions() {
350+
log.info("Scheduling cold share-partition snapshotting.");
351+
timer.add(new TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
352+
@Override
353+
public void run() {
354+
List<CompletableFuture<Void>> futures = runtime.scheduleWriteAllOperation(
355+
"snapshot-cold-partitions",
356+
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
357+
ShareCoordinatorShard::snapshotColdPartitions
358+
);
359+
360+
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[]{}))
361+
.whenComplete((__, exp) -> {
362+
if (exp != null) {
363+
log.error("Received error while snapshotting cold partitions.", exp);
364+
}
365+
setupSnapshotColdPartitions();
366+
});
367+
}
368+
});
369+
}
370+
344371
@Override
345372
public void shutdown() {
346373
if (!isActive.compareAndSet(true, false)) {

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java

+46-6
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666

6767
import org.slf4j.Logger;
6868

69+
import java.util.ArrayList;
6970
import java.util.List;
7071
import java.util.Optional;
7172

@@ -574,6 +575,46 @@ public CoordinatorResult<InitializeShareGroupStateResponseData, CoordinatorRecor
574575
return new CoordinatorResult<>(List.of(record), responseData);
575576
}
576577

578+
/**
579+
* Iterates over the soft state to determine the share partitions whose last snapshot is
580+
* older than the allowed time interval. The candidate share partitions are force snapshotted.
581+
*
582+
* @return A result containing snapshot records, if any, and a void response.
583+
*/
584+
public CoordinatorResult<Void, CoordinatorRecord> snapshotColdPartitions() {
585+
long coldSnapshottedPartitionsCount = shareStateMap.values().stream()
586+
.filter(shareGroupOffset -> shareGroupOffset.createTimestamp() - shareGroupOffset.writeTimestamp() != 0)
587+
.count();
588+
589+
// If all share partitions are snapshotted, it means that
590+
// system is quiet and cold snapshotting will not help much.
591+
if (coldSnapshottedPartitionsCount == shareStateMap.size()) {
592+
log.debug("All share snapshot records already cold snapshotted, skipping.");
593+
return new CoordinatorResult<>(List.of(), null);
594+
}
595+
596+
// Some active partitions are there.
597+
List<CoordinatorRecord> records = new ArrayList<>();
598+
599+
shareStateMap.forEach((sharePartitionKey, shareGroupOffset) -> {
600+
long timeSinceLastSnapshot = time.milliseconds() - shareGroupOffset.writeTimestamp();
601+
if (timeSinceLastSnapshot >= config.shareCoordinatorColdPartitionSnapshotIntervalMs()) {
602+
// We need to force create a snapshot here
603+
log.info("Last snapshot for {} is older than allowed interval.", sharePartitionKey);
604+
records.add(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
605+
sharePartitionKey.groupId(),
606+
sharePartitionKey.topicId(),
607+
sharePartitionKey.partition(),
608+
shareGroupOffset.builderSupplier()
609+
.setSnapshotEpoch(shareGroupOffset.snapshotEpoch() + 1) // We need to increment by one as this is a new snapshot.
610+
.setWriteTimestamp(time.milliseconds())
611+
.build()
612+
));
613+
}
614+
});
615+
return new CoordinatorResult<>(records, null);
616+
}
617+
577618
/**
578619
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
579620
* <p>
@@ -589,6 +630,7 @@ private CoordinatorRecord generateShareStateRecord(
589630
WriteShareGroupStateRequestData.PartitionData partitionData,
590631
SharePartitionKey key
591632
) {
633+
long timestamp = time.milliseconds();
592634
if (!shareStateMap.containsKey(key)) {
593635
// Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
594636
// The incoming partition data could have overlapping state batches, we must merge them
@@ -600,8 +642,8 @@ private CoordinatorRecord generateShareStateRecord(
600642
.setLeaderEpoch(partitionData.leaderEpoch())
601643
.setStateEpoch(partitionData.stateEpoch())
602644
.setStateBatches(mergeBatches(List.of(), partitionData))
603-
.setCreateTimestamp(time.milliseconds())
604-
.setWriteTimestamp(time.milliseconds())
645+
.setCreateTimestamp(timestamp)
646+
.setWriteTimestamp(timestamp)
605647
.build());
606648
} else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
607649
ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
@@ -620,8 +662,8 @@ private CoordinatorRecord generateShareStateRecord(
620662
.setLeaderEpoch(newLeaderEpoch)
621663
.setStateEpoch(newStateEpoch)
622664
.setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset))
623-
.setCreateTimestamp(time.milliseconds())
624-
.setWriteTimestamp(time.milliseconds())
665+
.setCreateTimestamp(timestamp)
666+
.setWriteTimestamp(timestamp)
625667
.build());
626668
} else {
627669
ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true.
@@ -636,8 +678,6 @@ private CoordinatorRecord generateShareStateRecord(
636678
.setStartOffset(partitionData.startOffset())
637679
.setLeaderEpoch(partitionData.leaderEpoch())
638680
.setStateBatches(mergeBatches(List.of(), partitionData))
639-
.setCreateTimestamp(currentState.createTimestamp())
640-
.setWriteTimestamp(currentState.writeTimestamp())
641681
.build());
642682
}
643683
}

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.kafka.server.share.persister.PersisterStateBatch;
2525

2626
import java.util.Collections;
27-
import java.util.LinkedHashSet;
2827
import java.util.List;
2928
import java.util.Objects;
3029

@@ -33,8 +32,8 @@
3332
* This class is immutable (state batches is not modified out of context).
3433
*/
3534
public class ShareGroupOffset {
36-
public static final int NO_TIMESTAMP = -1;
37-
public static final int UNINITIALIZED_EPOCH = -1;
35+
public static final int NO_TIMESTAMP = 0;
36+
public static final int UNINITIALIZED_EPOCH = 0;
3837
public static final int DEFAULT_EPOCH = 0;
3938

4039
private final int snapshotEpoch;
@@ -161,10 +160,6 @@ public static ShareGroupOffset fromRequest(InitializeShareGroupStateRequestData.
161160
);
162161
}
163162

164-
public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
165-
return new LinkedHashSet<>(stateBatches);
166-
}
167-
168163
public static class Builder {
169164
private int snapshotEpoch;
170165
private int stateEpoch;
@@ -195,7 +190,7 @@ public Builder setStartOffset(long startOffset) {
195190
}
196191

197192
public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
198-
this.stateBatches = stateBatches;
193+
this.stateBatches = stateBatches == null ? Collections.emptyList() : stateBatches.stream().toList();
199194
return this;
200195
}
201196

@@ -245,4 +240,15 @@ public String toString() {
245240
", stateBatches=" + stateBatches +
246241
'}';
247242
}
243+
244+
public Builder builderSupplier() {
245+
return new Builder()
246+
.setSnapshotEpoch(snapshotEpoch())
247+
.setStateEpoch(stateEpoch())
248+
.setLeaderEpoch(leaderEpoch())
249+
.setStartOffset(startOffset())
250+
.setStateBatches(stateBatches())
251+
.setCreateTimestamp(createTimestamp())
252+
.setWriteTimestamp(writeTimestamp());
253+
}
248254
}

share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java

+109-2
Original file line numberDiff line numberDiff line change
@@ -1458,15 +1458,15 @@ public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {
14581458
any(),
14591459
any());
14601460

1461-
timer.advanceClock(30005L); // prune should be called
1461+
timer.advanceClock(30005L); // Prune should be called.
14621462
verify(runtime, times(1))
14631463
.scheduleWriteOperation(
14641464
eq("write-state-record-prune"),
14651465
any(),
14661466
any(),
14671467
any());
14681468

1469-
timer.advanceClock(30005L); // prune should be called
1469+
timer.advanceClock(30005L); // Prune should be called.
14701470
verify(runtime, times(2))
14711471
.scheduleWriteOperation(
14721472
eq("write-state-record-prune"),
@@ -1871,6 +1871,113 @@ public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exce
18711871
service.shutdown();
18721872
}
18731873

1874+
@Test
1875+
public void testColdPartitionSnapshotTaskPeriodicityWithAllSuccess() throws Exception {
1876+
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
1877+
MockTime time = new MockTime();
1878+
MockTimer timer = new MockTimer(time);
1879+
PartitionWriter writer = mock(PartitionWriter.class);
1880+
1881+
Metrics metrics = new Metrics();
1882+
1883+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
1884+
new LogContext(),
1885+
ShareCoordinatorTestConfig.testConfig(),
1886+
runtime,
1887+
new ShareCoordinatorMetrics(metrics),
1888+
time,
1889+
timer,
1890+
writer
1891+
));
1892+
1893+
when(runtime.scheduleWriteAllOperation(
1894+
eq("snapshot-cold-partitions"),
1895+
any(),
1896+
any()
1897+
)).thenReturn(List.of(CompletableFuture.completedFuture(null)));
1898+
1899+
service.startup(() -> 1);
1900+
verify(runtime, times(0))
1901+
.scheduleWriteOperation(
1902+
eq("snapshot-cold-partitions"),
1903+
any(),
1904+
any(),
1905+
any());
1906+
1907+
timer.advanceClock(10005L); // Snapshotting should be called.
1908+
verify(runtime, times(1))
1909+
.scheduleWriteAllOperation(
1910+
eq("snapshot-cold-partitions"),
1911+
any(),
1912+
any());
1913+
1914+
timer.advanceClock(10005L); // Snapshotting should be called.
1915+
verify(runtime, times(2))
1916+
.scheduleWriteAllOperation(
1917+
eq("snapshot-cold-partitions"),
1918+
any(),
1919+
any());
1920+
1921+
checkMetrics(metrics);
1922+
1923+
service.shutdown();
1924+
}
1925+
1926+
@Test
1927+
public void testColdPartitionSnapshotTaskPeriodicityWithSomeFailures() throws Exception {
1928+
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
1929+
MockTime time = new MockTime();
1930+
MockTimer timer = new MockTimer(time);
1931+
PartitionWriter writer = mock(PartitionWriter.class);
1932+
1933+
when(runtime.scheduleWriteAllOperation(
1934+
eq("snapshot-cold-partitions"),
1935+
any(),
1936+
any()
1937+
)).thenReturn(
1938+
List.of(CompletableFuture.completedFuture(null), CompletableFuture.failedFuture(new Exception("bad stuff")))
1939+
).thenReturn(
1940+
List.of(CompletableFuture.completedFuture(null), CompletableFuture.completedFuture(null))
1941+
);
1942+
1943+
Metrics metrics = new Metrics();
1944+
1945+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
1946+
new LogContext(),
1947+
ShareCoordinatorTestConfig.testConfig(),
1948+
runtime,
1949+
new ShareCoordinatorMetrics(metrics),
1950+
time,
1951+
timer,
1952+
writer
1953+
));
1954+
1955+
service.startup(() -> 2);
1956+
verify(runtime, times(0))
1957+
.scheduleWriteAllOperation(
1958+
eq("snapshot-cold-partitions"),
1959+
any(),
1960+
any());
1961+
1962+
timer.advanceClock(10005L); // Snapshotting should be called.
1963+
verify(runtime, times(1)) // For 2 topic partitions.
1964+
.scheduleWriteAllOperation(
1965+
eq("snapshot-cold-partitions"),
1966+
any(),
1967+
any());
1968+
1969+
timer.advanceClock(10005L); // Snapshotting should be called (despite previous partial failure).
1970+
verify(runtime, times(2)) // For 2 topic partitions.
1971+
.scheduleWriteAllOperation(
1972+
eq("snapshot-cold-partitions"),
1973+
any(),
1974+
any());
1975+
1976+
checkMetrics(metrics);
1977+
1978+
service.shutdown();
1979+
}
1980+
18741981
@Test
18751982
public void testShareStateTopicConfigs() {
18761983
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

0 commit comments

Comments
 (0)