Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7 #19035

Merged
merged 4 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,21 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);

/**
* Asynchronously read entries from the ManagedLedger.
*
* @param numberOfEntriesToRead maximum number of entries to return
* @param maxSizeBytes max size in bytes of the entries to return
* @param callback callback object
* @param ctx opaque context
* @param maxPosition max position can read
* @param skipCondition predicate of read filter out
*/
default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition);
}

/**
* Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions.
*
Expand Down Expand Up @@ -264,6 +279,55 @@ void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callb
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
*
* <p/>If no entries are available, the callback will not be triggered. Instead it will be registered to wait until
* a new message will be persisted into the managed ledger
*
* @see #readEntriesOrWait(int, long)
* @param maxEntries
* maximum number of entries to return
* @param callback
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
* @param skipCondition
* predicate of read filter out
*/
default void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
asyncReadEntriesOrWait(maxEntries, callback, ctx, maxPosition);
}

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
*
* <p/>If no entries are available, the callback will not be triggered. Instead it will be registered to wait until
* a new message will be persisted into the managed ledger
*
* @see #readEntriesOrWait(int, long)
* @param maxEntries
* maximum number of entries to return
* @param maxSizeBytes
* max size in bytes of the entries to return
* @param callback
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
* @param skipCondition
* predicate of read filter out
*/
default void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition);
}

/**
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,13 +762,19 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
final Object ctx, PositionImpl maxPosition) {
final Object ctx, PositionImpl maxPosition) {
asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
}

@Override
public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
asyncReadEntriesWithSkip(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition, null);
}

@Override
public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException
Expand All @@ -779,7 +785,8 @@ public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadE
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
ledger.asyncReadEntries(op);
}

Expand Down Expand Up @@ -901,6 +908,20 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac
@Override
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);
}

@Override
public void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
asyncReadEntriesWithSkipOrWait(maxEntries, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition, skipCondition);
}

@Override
public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
checkArgument(maxEntries > 0);
if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
Expand All @@ -914,10 +935,11 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
}
asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition);
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
} else {
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition);
ctx, maxPosition, skipCondition);

if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,39 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)

long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);

// Filer out and skip unnecessary read entry
if (opReadEntry.skipCondition != null) {
long firstValidEntry = -1L;
long lastValidEntry = -1L;
long entryId = firstEntry;
for (; entryId <= lastEntry; entryId++) {
if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
if (firstValidEntry == -1L) {
firstValidEntry = entryId;
}
} else {
if (firstValidEntry != -1L) {
break;
}
}

if (firstValidEntry != -1L) {
lastValidEntry = entryId;
}
}

// If all messages in [firstEntry...lastEntry] are filter out,
// then manual call internalReadEntriesComplete to advance read position.
if (firstValidEntry == -1L) {
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
PositionImpl.get(ledger.getId(), lastEntry));
return;
}

firstEntry = firstValidEntry;
lastEntry = lastValidEntry;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -45,8 +47,10 @@ class OpReadEntry implements ReadEntriesCallback {
private PositionImpl nextReadPosition;
PositionImpl maxPosition;

Predicate<PositionImpl> skipCondition;

public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
Expand All @@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like we forget to reset it when recycle this obj

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
}

@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, PositionImpl lastPosition) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it is good to extend this method because we defined a parameter named context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context already be opReadEntry.ctx passing

// Filter the returned entries for individual deleted messages
int entriesCount = returnedEntries.size();
long entriesSize = 0;
Expand All @@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
}
cursor.updateReadStats(entriesCount, entriesSize);

final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
if (entriesCount != 0) {
lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}",
cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
}
List<Entry> filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);

List<Entry> filteredEntries = Collections.emptyList();
if (entriesCount != 0) {
filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);
}

// if entries have been filtered out then try to skip reading of already deletedMessages in that range
final Position nexReadPosition = entriesCount != filteredEntries.size()
Expand All @@ -87,6 +97,11 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
checkReadCompletion();
}

@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
internalReadEntriesComplete(returnedEntries, ctx, null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
cursor.readOperationCompleted();
Expand Down Expand Up @@ -190,6 +205,7 @@ public void recycle() {
nextReadPosition = null;
maxPosition = null;
recyclerHandle.recycle(this);
skipCondition = null;
}

private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void find() {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
this, OpScan.this.ctx, null);
this, OpScan.this.ctx, null, null);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -4126,7 +4127,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
null, PositionImpl.get(lastPosition.getLedgerId(), -1));
null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionImpl.EARLIEST);
Expand All @@ -4148,7 +4149,7 @@ public void testOpReadEntryRecycle() throws Exception {
};

@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any()))
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any(), any()))
.thenAnswer(__ -> createOpReadEntry.get());

final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
Expand Down Expand Up @@ -4252,5 +4253,67 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti
factory2.shutdown();
}

@Test
public void testReadEntriesWithFilterOut() throws ManagedLedgerException, InterruptedException, ExecutionException {
int readMaxNumber = 10;
int sendNumber = 20;
ManagedLedger ledger = factory.open("testReadEntriesWithFilter");
ManagedCursor cursor = ledger.openCursor("c");
Position position = PositionImpl.EARLIEST;
Position maxCanReadPosition = PositionImpl.EARLIEST;
for (int i = 0; i < sendNumber; i++) {
if (i == readMaxNumber - 1) {
position = ledger.addEntry(new byte[1024]);
} else if (i == sendNumber - 1) {
maxCanReadPosition = ledger.addEntry(new byte[1024]);
} else {
ledger.addEntry(new byte[1024]);
}

}
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
completableFuture.complete(entries.size());
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
completableFuture.completeExceptionally(exception);
}
}, null, (PositionImpl) position, pos -> {
return pos.getEntryId() % 2 != 0;
});

int number = completableFuture.get();
assertEquals(number, readMaxNumber / 2);

assertEquals(cursor.getReadPosition().getEntryId(), 10);

CompletableFuture<Integer> completableFuture2 = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
completableFuture2.complete(entries.size());
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
completableFuture2.completeExceptionally(exception);
}
}, null, (PositionImpl) maxCanReadPosition, pos -> {
return pos.getEntryId() % 2 != 0;
});

int number2 = completableFuture2.get();
assertEquals(number2, readMaxNumber / 2);

assertEquals(cursor.getReadPosition().getEntryId(), 20);

cursor.close();
ledger.close();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}
}, null, maxPosition);
}, null, maxPosition, null);
Assert.assertEquals(opReadEntry.readPosition, position);
}

Expand Down Expand Up @@ -3030,7 +3030,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
responseException2.set(exception);
}

}, null, PositionImpl.LATEST);
}, null, PositionImpl.LATEST, null);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
opReadEntry, ctxStr);
retryStrategically((test) -> {
Expand Down
Loading