Skip to content

[fix][broker]Fix deadlock when compaction and topic deletion execute concurrently #24366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.time.Clock;
import java.util.ArrayList;
Expand Down Expand Up @@ -114,6 +115,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {

protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

@VisibleForTesting
@Getter
protected volatile boolean isFenced;

protected final HierarchyTopicPolicies topicPolicies;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ protected TopicStatsHelper initialValue() {
private volatile double lastUpdatedAvgPublishRateInMsg = 0;
private volatile double lastUpdatedAvgPublishRateInByte = 0;

@Getter
private volatile boolean isClosingOrDeleting = false;

private ScheduledFuture<?> fencedTopicMonitoringTask = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@ public interface RawReader {
*/

static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
return create(client, topic, subscription, true);
return create(client, topic, subscription, true, true);
}

static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription,
boolean createTopicIfDoesNotExist) {
boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r =
new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist);
new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist,
retryOnRecoverableErrors);
return future.thenApply(__ -> r);
}

static CompletableFuture<RawReader> create(PulsarClient client,
ConsumerConfigurationData<byte[]> consumerConfiguration,
boolean createTopicIfDoesNotExist) {
boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client,
consumerConfiguration, future, createTopicIfDoesNotExist);
consumerConfiguration, future, createTopicIfDoesNotExist, retryOnRecoverableErrors);
return future.thenApply(__ -> r);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -52,7 +53,7 @@ public class RawReaderImpl implements RawReader {

public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer<byte[]>> consumerFuture,
boolean createTopicIfDoesNotExist) {
boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
Expand All @@ -62,14 +63,16 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);

consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist);
consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist,
retryOnRecoverableErrors);
}

public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> consumerConfiguration,
CompletableFuture<Consumer<byte[]>> consumerFuture,
boolean createTopicIfDoesNotExist) {
boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) {
this.consumerConfiguration = consumerConfiguration;
consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist);
consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist,
retryOnRecoverableErrors);
}


Expand Down Expand Up @@ -117,9 +120,11 @@ public String toString() {
static class RawConsumerImpl extends ConsumerImpl<byte[]> {
final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
final boolean retryOnRecoverableErrors;

RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
CompletableFuture<Consumer<byte[]>> consumerFuture, boolean createTopicIfDoesNotExist) {
CompletableFuture<Consumer<byte[]>> consumerFuture, boolean createTopicIfDoesNotExist,
boolean retryOnRecoverableErrors) {
super(client,
conf.getSingleTopic(),
conf,
Expand All @@ -135,6 +140,14 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
this.retryOnRecoverableErrors = retryOnRecoverableErrors;
}

protected boolean isUnrecoverableError(Throwable t) {
if (!retryOnRecoverableErrors && (t instanceof PulsarClientException.ServiceNotReadyException)) {
return true;
}
return super.isUnrecoverableError(t);
}

void tryCompletePending() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Compactor(ServiceConfiguration conf,
}

public CompletableFuture<Long> compact(String topic) {
return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false).thenComposeAsync(
return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false).thenComposeAsync(
this::compactAndCloseReader, scheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -76,7 +81,7 @@ public void testInterruptedWhenCreateConsumer() throws InterruptedException {
.subscribe();
Assert.fail("Should have thrown an exception");
} catch (PulsarClientException e) {
Assert.assertTrue(e.getCause() instanceof InterruptedException);
assertTrue(e.getCause() instanceof InterruptedException);
}
});
startConsumer.start();
Expand All @@ -88,4 +93,39 @@ public void testInterruptedWhenCreateConsumer() throws InterruptedException {
Assert.assertEquals(clientImpl.consumersCount(), 0);
});
}

@Test
public void testReceiveWillDoneAfterClosedConsumer() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
String subName = "test-sub";
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subName, MessageId.earliest);
ConsumerImpl<byte[]> consumer =
(ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(tpName).subscriptionName(subName).subscribe();
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
consumer.close();
Awaitility.await().untilAsserted(() -> {
assertTrue(future.isDone());
});
}

@Test
public void testReceiveWillDoneAfterTopicDeleted() throws Exception {
String namespace = "public/default";
admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(false).build());
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
String subName = "test-sub";
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subName, MessageId.earliest);
ConsumerImpl<byte[]> consumer =
(ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(tpName).subscriptionName(subName).subscribe();
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
admin.topics().delete(tpName, true);
Awaitility.await().untilAsserted(() -> {
assertTrue(future.isDone());
});
// cleanup.
admin.namespaces().removeAutoTopicCreation(namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -44,6 +49,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand All @@ -54,12 +60,17 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

@Test(groups = "broker-impl")
@Slf4j
Expand Down Expand Up @@ -215,7 +226,7 @@ public void testRawReaderWithConfigurationCreation() throws Exception {
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);
RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true).get();
RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true, true).get();

MessageId lastMessageId = reader.getLastMessageIdAsync().get();
while (true) {
Expand Down Expand Up @@ -547,11 +558,62 @@ public void testAutoCreateTopic() throws ExecutionException, InterruptedExceptio

String topic2 = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");
try {
reader = RawReader.create(pulsarClient, topic2, subscription, false).get();
reader = RawReader.create(pulsarClient, topic2, subscription, false, true).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof PulsarClientException.TopicDoesNotExistException);
}
reader.closeAsync().join();
}

@Test(timeOut = 60000)
public void testReconnectsWhenServiceNotReady() throws Exception {
String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");
String subscriptionName = "s1";
admin.topics().createNonPartitionedTopic(topic);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();

// Inject a delay event for topic close, which leads to that the raw-reader will get a ServiceNotReady error,
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedCursor compactionCursor = ml.openCursor(subscriptionName);
ManagedCursor spyCompactionCursor = spy(compactionCursor);
CountDownLatch delayCloseCursorSignal = new CountDownLatch(1);
Answer answer = new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
delayCloseCursorSignal.await();
return invocationOnMock.callRealMethod();
}
};
doAnswer(answer).when(spyCompactionCursor).asyncClose(any(AsyncCallbacks.CloseCallback.class), any());
ml.getCursors().removeCursor(subscriptionName);
ml.getCursors().add(spyCompactionCursor, ml.getLastConfirmedEntry());

// Unload topic after reader is connected.
// The topic state comes to "fenced", then RawReader will get a ServiceNotReady error,
CompletableFuture<RawMessage> msgFuture = reader.readNextAsync();
CompletableFuture<Void> unloadFuture = admin.topics().unloadAsync(topic);
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(persistentTopic.isFenced());
});

// Verify: RasReader reconnected after that the unloading is finished, and it can consume successfully.
delayCloseCursorSignal.countDown();
unloadFuture.get();
MessageIdImpl msgIdSent = (MessageIdImpl) producer.send("msg");
RawMessage rawMessage = msgFuture.get();
Assert.assertNotNull(rawMessage);
MessageIdImpl msgIdReceived = (MessageIdImpl) rawMessage.getMessageId();
Assert.assertEquals(msgIdSent.getLedgerId(), msgIdReceived.getLedgerId());
Assert.assertEquals(msgIdSent.getEntryId(), msgIdReceived.getEntryId());

// cleanup.
rawMessage.close();;
producer.close();
reader.closeAsync().get();
admin.topics().delete(topic, false);
}
}
Loading
Loading