Skip to content

Commit

Permalink
Merge branch 'master' into client-brokerentrymetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
LeBW committed Aug 11, 2021
2 parents 8be37b1 + efe2d3c commit e5f6a0b
Show file tree
Hide file tree
Showing 70 changed files with 3,250 additions and 536 deletions.
2 changes: 2 additions & 0 deletions docker/pulsar-standalone/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ FROM apachepulsar/pulsar-dashboard:latest as dashboard
# Restart from
FROM ubuntu:20.04

ARG DEBIAN_FRONTEND=noninteractive

# Note that the libpq-dev package is needed here in order to install
# the required python psycopg2 package (for postgresql) later
RUN apt-get update \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public interface ManagedCursorMXBean {
String getLedgerName();

/**
* persist cursor by ledger
* persist cursor by ledger.
* @param success
*/
void persistToLedger(boolean success);

/**
* persist cursor by zookeeper
* persist cursor by zookeeper.
* @param success
*/
void persistToZookeeper(boolean success);
Expand All @@ -70,4 +70,34 @@ public interface ManagedCursorMXBean {
*/
long getPersistZookeeperErrors();

/**
* Add write data to a ledger of a cursor (in bytes).
* This will update writeCursorLedgerLogicalSize and writeCursorLedgerSize.
*
* @param size Size of data written to cursor (in bytes)
*/
void addWriteCursorLedgerSize(long size);

/**
* Add read data from a ledger of a cursor (in bytes).
*
* @param size Size of data read from cursor (in bytes)
*/
void addReadCursorLedgerSize(long size);

/**
* @return the size of data written to cursor (in bytes)
*/
long getWriteCursorLedgerSize();

/**
* @return the size of data written to cursor without replicas (in bytes)
*/
long getWriteCursorLedgerLogicalSize();

/**
* @return the size of data read from cursor (in bytes)
*/
long getReadCursorLedgerSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -160,4 +161,14 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void shutdown() throws InterruptedException, ManagedLedgerException;

/**
* Check managed ledger has been initialized before.
*
* @param ledgerName {@link String}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

LedgerEntry entry = seq.nextElement();
mbean.addReadCursorLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
Expand Down Expand Up @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
Expand All @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.length);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final LongAdder writeCursorLedgerSize = new LongAdder();
private final LongAdder writeCursorLedgerLogicalSize = new LongAdder();
private final LongAdder readCursorLedgerSize = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
Expand Down Expand Up @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}

@Override
public void addWriteCursorLedgerSize(final long size) {
writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize());
writeCursorLedgerLogicalSize.add(size);
}

@Override
public void addReadCursorLedgerSize(final long size) {
readCursorLedgerSize.add(size);
}

@Override
public long getWriteCursorLedgerSize() {
return writeCursorLedgerSize.longValue();
}

@Override
public long getWriteCursorLedgerLogicalSize() {
return writeCursorLedgerLogicalSize.longValue();
}

@Override
public long getReadCursorLedgerSize() {
return readCursorLedgerSize.longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
entryCacheManager.clear();
}

@Override
public CompletableFuture<Boolean> asyncExists(String ledgerName) {
return store.asyncExists(ledgerName);
}

@Override
public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException {
class Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -128,6 +129,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -2304,19 +2306,11 @@ private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
&& TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
}

private boolean isOffloadedNeedsDelete(OffloadContext offload) {
boolean isOffloadedNeedsDelete(OffloadContext offload, Optional<OffloadPolicies> offloadPolicies) {
long elapsedMs = clock.millis() - offload.getTimestamp();

if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies()
.getManagedLedgerOffloadDeletionLagInMillis() != null) {
return offload.getComplete() && !offload.getBookkeeperDeleted()
&& elapsedMs > config.getLedgerOffloader()
.getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
} else {
return false;
}
return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted()
&& policies.getManagedLedgerOffloadDeletionLagInMillis() != null
&& elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent();
}

/**
Expand All @@ -2337,6 +2331,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {

List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
? config.getLedgerOffloader().getOffloadPolicies()
: null);
synchronized (this) {
if (log.isDebugEnabled()) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
Expand Down Expand Up @@ -2420,7 +2418,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
}

for (LedgerInfo ls : ledgers.values()) {
if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) {
if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
&& !ledgersToDelete.contains(ls)) {
log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
ls.getLedgerId());
offloadedLedgersToDelete.add(ls);
Expand Down Expand Up @@ -3205,21 +3204,22 @@ public PositionImpl getPreviousPosition(PositionImpl position) {
// The previous position will be the last position of an earlier ledgers
NavigableMap<Long, LedgerInfo> headMap = ledgers.headMap(position.getLedgerId(), false);

if (headMap.isEmpty()) {
final Map.Entry<Long, LedgerInfo> firstEntry = headMap.firstEntry();
if (firstEntry == null) {
// There is no previous ledger, return an invalid position in the current ledger
return PositionImpl.get(position.getLedgerId(), -1);
}

// We need to find the most recent non-empty ledger
for (long ledgerId : headMap.descendingKeySet()) {
LedgerInfo li = headMap.get(ledgerId);
if (li.getEntries() > 0) {
if (li != null && li.getEntries() > 0) {
return PositionImpl.get(li.getLedgerId(), li.getEntries() - 1);
}
}

// in case there are only empty ledgers, we return a position in the first one
return PositionImpl.get(headMap.firstEntry().getKey(), -1);
return PositionImpl.get(firstEntry.getKey(), -1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
Expand Down Expand Up @@ -129,4 +131,14 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
* @throws MetaStoreException
*/
Iterable<String> getManagedLedgers() throws MetaStoreException;

/**
* Check ledger exists.
*
* @param ledgerName {@link String}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -257,6 +258,11 @@ public Iterable<String> getManagedLedgers() throws MetaStoreException {
}
}

@Override
public CompletableFuture<Boolean> asyncExists(String path) {
return store.exists(PREFIX + path);
}

//
// update timestamp if missing or 0
// 3 cases - timestamp does not exist for ledgers serialized before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand All @@ -105,6 +108,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
Expand Down Expand Up @@ -3173,4 +3177,34 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
cursor3.close();
ledger.close();
}

@Test
public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
config.setMaxSizePerLedgerMb(1);
LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class);
when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3");
config.setLedgerOffloader(ledgerOffloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open(
"testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config);

// Retain the data.
ledger.openCursor("test-cursor");
final int entries = 10;
byte[] data = new byte[1024 * 1024];
for (int i = 0; i < entries; i++) {
ledger.addEntry(data);
}
assertEquals(ledger.ledgers.size(), 10);

// Set a new offloader to cleanup the execution times of getOffloadPolicies()
ledgerOffloader = mock(NullLedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);

ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}
}
Loading

0 comments on commit e5f6a0b

Please sign in to comment.