Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator #17371

Merged
merged 2 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,14 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
}

// shadow topic should be deleted first.
if (t.isShadowReplicated()) {
final List<String> shadowTopics = t.getShadowReplicators().keys();
log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics);
return FutureUtil.failedFuture(new IllegalStateException(
"Delete forbidden. Topic " + topic + " is replicated to shadow topics."));
}

return t.delete();
}

Expand Down Expand Up @@ -2549,6 +2557,8 @@ private void updateReplicatorMessageDispatchRate() {
}
topic.getReplicators().forEach((name, persistentReplicator) ->
persistentReplicator.updateRateLimiter());
topic.getShadowReplicators().forEach((name, persistentReplicator) ->
persistentReplicator.updateRateLimiter());
}
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

boolean isReplicated();

boolean isShadowReplicated();

EntryFilters getEntryFiltersPolicy();

ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters();
Expand All @@ -248,6 +250,8 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();

ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,11 @@ public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
return replicators;
}

@Override
public ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators() {
return ConcurrentOpenHashMap.emptyMap();
}

@Override
public Subscription getSubscription(String subscription) {
return subscriptions.get(subscription);
Expand Down Expand Up @@ -1091,6 +1096,11 @@ public boolean isReplicated() {
return replicators.size() > 1;
}

@Override
public boolean isShadowReplicated() {
return false;
}

@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
// checkInactiveSubscriptions iterates over subscriptions map and removing from the map with the same thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
Expand Down Expand Up @@ -132,7 +133,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
}

@Override
protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
protected void readEntries(Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
cursor.rewind();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -172,7 +173,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;

private final ConcurrentOpenHashMap<String, Replicator> replicators;
private final ConcurrentOpenHashMap<String/*RemoteCluster*/, Replicator> replicators;
private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
@Getter
private volatile List<String> shadowTopics;

static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
Expand Down Expand Up @@ -259,6 +263,10 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
registerTopicPolicyListener();
Expand Down Expand Up @@ -360,6 +368,10 @@ public CompletableFuture<Void> initialize() {
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
Expand Down Expand Up @@ -654,12 +666,14 @@ public CompletableFuture<Void> startReplProducers() {
public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect()));
return FutureUtil.waitForAll(closeFutures);
}

private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true)));
return FutureUtil.waitForAll(closeFutures);
}

Expand Down Expand Up @@ -1145,6 +1159,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
Expand Down Expand Up @@ -1270,6 +1285,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect

futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
topicPublishRateLimiter.close();
Expand Down Expand Up @@ -1387,7 +1403,7 @@ public CompletableFuture<Void> checkReplication() {
}

List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

Expand All @@ -1414,14 +1430,46 @@ public CompletableFuture<Void> checkReplication() {
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});

futures.add(checkShadowReplication());

return FutureUtil.waitForAll(futures);
}

private CompletableFuture<Void> checkShadowReplication() {
if (CollectionUtils.isEmpty(shadowTopics)) {
return CompletableFuture.completedFuture(null);
}
List<String> configuredShadowTopics = shadowTopics;
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

if (log.isDebugEnabled()) {
log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();

// Check for missing replicators
for (String shadowTopic : configuredShadowTopics) {
if (!shadowReplicators.containsKey(shadowTopic)) {
futures.add(startShadowReplicator(shadowTopic));
}
}

// Check for replicators to be stopped
shadowReplicators.forEach((shadowTopic, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!configuredShadowTopics.contains(shadowTopic)) {
futures.add(removeShadowReplicator(shadowTopic));
}
});
return FutureUtil.waitForAll(futures);
}

Expand All @@ -1432,6 +1480,8 @@ public void checkMessageExpiry() {
subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
shadowReplicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
}

Expand Down Expand Up @@ -1578,6 +1628,79 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
log.info("[{}] Starting shadow topic replicator to remote: {}", topic, shadowTopic);

String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
ManagedCursor cursor;
try {
cursor = ledger.newNonDurableCursor(PositionImpl.LATEST, name);
} catch (ManagedLedgerException e) {
log.error("[{}]Open non-durable cursor for shadow replicator failed, name={}", topic, name, e);
return FutureUtil.failedFuture(e);
}
CompletableFuture<Void> future = addShadowReplicationCluster(shadowTopic, cursor);
future.exceptionally(ex -> {
log.error("[{}] Add shadow replication cluster failed, shadowTopic={}", topic, shadowTopic, ex);
return null;
});
return future;
}

protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic, ManagedCursor cursor) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(localCluster)
.thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
.thenAccept(replicationClient -> {
Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
try {
return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
(PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
}
return null;
});

// clean up replicator if startup is failed
if (replicator == null) {
shadowReplicators.removeNullValue(shadowTopic);
}
});
}

CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {

ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
shadowReplicators.remove(shadowTopic);
future.complete(null);
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete shadow topic replication cursor {} {}",
topic, name, exception.getMessage(), exception);
future.completeExceptionally(new PersistenceException(exception));
}
}, null);

}).exceptionally(e -> {
log.error("[{}] Failed to close shadow topic replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});

return future;
}

public boolean isDeduplicationEnabled() {
return messageDeduplication.isEnabled();
}
Expand Down Expand Up @@ -1611,6 +1734,11 @@ public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
return replicators;
}

@Override
public ConcurrentOpenHashMap<String, Replicator> getShadowReplicators() {
return shadowReplicators;
}

public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}
Expand Down Expand Up @@ -2442,6 +2570,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
Expand Down Expand Up @@ -2611,6 +2740,11 @@ public boolean isReplicated() {
return !replicators.isEmpty();
}

@Override
public boolean isShadowReplicated() {
return !shadowReplicators.isEmpty();
}

public CompletableFuture<MessageId> terminate() {
CompletableFuture<MessageId> future = new CompletableFuture<>();
ledger.asyncTerminate(new TerminateCallback() {
Expand Down Expand Up @@ -2666,6 +2800,7 @@ public CompletableFuture<Void> clearBacklog() {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
List<String> cursors = getSubscriptions().keys();
cursors.addAll(getReplicators().keys());
cursors.addAll(getShadowReplicators().keys());
for (String cursor : cursors) {
futures.add(clearBacklog(cursor));
}
Expand Down Expand Up @@ -2693,6 +2828,11 @@ public CompletableFuture<Void> clearBacklog(String cursorName) {
return repl.clearBacklog();
}

repl = (PersistentReplicator) shadowReplicators.get(cursorName);
if (repl != null) {
return repl.clearBacklog();
}

return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}

Expand Down Expand Up @@ -3063,12 +3203,13 @@ public void onUpdate(TopicPolicies policies) {
return;
}
updateTopicPolicy(policies);

shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
checkReplicationAndRetryOnFailure();

Expand Down
Loading