Skip to content

Commit

Permalink
[improve][ml] Do not switch thread to execute asyncAddEntry's core lo…
Browse files Browse the repository at this point in the history
…gic (apache#23940)
  • Loading branch information
BewareMyPower authored Feb 10, 2025
1 parent 7a79c78 commit 215b36d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -802,33 +802,41 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
final var state = STATE_UPDATER.get(this);
beforeAddEntryToQueue(state);
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(state, addOperation);
}
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
}

protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
final State state = STATE_UPDATER.get(this);
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state.isFenced()) {
addOperation.failed(new ManagedLedgerFencedException());
return;
} else if (state == State.Terminated) {
addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
} else if (state == State.Closed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
} else if (state == State.WriteFailed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
return;
throw new ManagedLedgerFencedException();
}
pendingAddEntries.add(addOperation);
switch (state) {
case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated");
case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
}
}

protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,25 +223,23 @@ private void initLastConfirmedEntry() {
}

@Override
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state != State.LedgerOpened) {
addOperation.failed(new ManagedLedgerException("Managed ledger is not opened"));
return;
throw new ManagedLedgerException("Managed ledger is not opened");
}
}

@Override
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
addOperation.failed(new ManagedLedgerException("Illegal addOperation context object."));
return;
pendingAddEntries.poll();
throw new ManagedLedgerException("Illegal addOperation context object.");
}

if (log.isDebugEnabled()) {
log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})",
name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId());
}
pendingAddEntries.add(addOperation);
if (position.getLedgerId() <= currentLedger.getId()) {
// Write into lastLedger
if (position.getLedgerId() == currentLedger.getId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -29,9 +30,12 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import lombok.Cleanup;
Expand Down Expand Up @@ -499,4 +503,53 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}

@Test
public void testBeforeAddEntry() throws Exception {
final var interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
final var config = new ManagedLedgerConfig();
final var numEntries = 100;
config.setMaxEntriesPerLedger(numEntries);
config.setManagedLedgerInterceptor(interceptor);
@Cleanup final var ml = (ManagedLedgerImpl) factory.open("test_concurrent_add_entry", config);

final var indexesBeforeAdd = new ArrayList<Long>();
final var batchSizes = new ArrayList<Long>();
final var random = new Random();
final var latch = new CountDownLatch(numEntries);
final var executor = Executors.newFixedThreadPool(3);
final var lock = new Object(); // make sure `asyncAddEntry` are called in order
for (int i = 0; i < numEntries; i++) {
final var batchSize = random.nextInt(0, 100);
final var msg = "msg-" + i;
final var callback = new AsyncCallbacks.AddEntryCallback() {

@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
latch.countDown();
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Failed to add {}", msg, exception);
latch.countDown();
}
};
executor.execute(() -> {
synchronized (lock) {
batchSizes.add((long) batchSize);
indexesBeforeAdd.add(interceptor.getIndex() + 1); // index is updated in each asyncAddEntry call
ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()), batchSize, callback, null);
}
});
}
assertTrue(latch.await(3, TimeUnit.SECONDS));
synchronized (lock) {
for (int i = 1; i < numEntries; i++) {
final var sum = batchSizes.get(i) + batchSizes.get(i - 1);
batchSizes.set(i, sum);
}
assertEquals(indexesBeforeAdd.subList(1, numEntries), batchSizes.subList(0, numEntries - 1));
}
executor.shutdown();
}
}

0 comments on commit 215b36d

Please sign in to comment.