Skip to content

Commit

Permalink
Fix packetConsumer consumeBytes in RequestSenderOrchestrator to delay and chain completions correctly
Browse files Browse the repository at this point in the history

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Apr 19, 2024
1 parent 3faf480 commit a272676
Showing 1 changed file with 91 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package org.opensearch.migrations.replay;

import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer;
import org.opensearch.migrations.replay.datatypes.ChannelTask;
Expand All @@ -11,21 +21,11 @@
import org.opensearch.migrations.replay.datatypes.IndexedChannelInteraction;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.tracing.IReplayContexts.IReplayerHttpTransactionContext;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;
import org.slf4j.event.Level;

import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

@Slf4j
public class RequestSenderOrchestrator {

Expand Down Expand Up @@ -88,20 +88,17 @@ public StringTrackableCompletableFuture<Void> scheduleClose(IReplayContexts.ICha
Instant timestamp) {
var channelKey = ctx.getChannelKey();
var channelInteraction = new IndexedChannelInteraction(channelKey, channelInteractionNum);
var finalTunneledResponse =
new StringTrackableCompletableFuture<Void>(new CompletableFuture<>(),
()->"waiting for final signal to confirm close has finished");
var finalTunneledResponse = new StringTrackableCompletableFuture<Void>(new CompletableFuture<>(),
() -> "waiting for final signal to confirm close has finished");

Check warning on line 92 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java#L92

Added line #L92 was not covered by tests
log.atDebug().setMessage(() -> "Scheduling CLOSE for " + channelInteraction + " at time " + timestamp).log();
asynchronouslyInvokeRunnable(ctx, sessionNumber, channelInteractionNum, true,
finalTunneledResponse,
channelFutureAndRequestSchedule->
scheduleOnConnectionReplaySession(ctx, channelInteractionNum,
channelFutureAndRequestSchedule, finalTunneledResponse, timestamp,
new ChannelTask(ChannelTaskType.CLOSE, () -> {
log.trace("Closing client connection " + channelInteraction);
clientConnectionPool.closeConnection(ctx, sessionNumber);
finalTunneledResponse.future.complete(null);
})));
asynchronouslyInvokeRunnable(ctx, sessionNumber, channelInteractionNum, true, finalTunneledResponse,
channelFutureAndRequestSchedule -> scheduleOnConnectionReplaySession(ctx, channelInteractionNum,
channelFutureAndRequestSchedule, finalTunneledResponse, timestamp,
new ChannelTask(ChannelTaskType.CLOSE, () -> {
log.trace("Closing client connection " + channelInteraction);
clientConnectionPool.closeConnection(ctx, sessionNumber);
finalTunneledResponse.future.complete(null);
})));
return finalTunneledResponse;
}

Expand Down Expand Up @@ -144,8 +141,8 @@ private void scheduleSendRequestOnConnectionReplaySession(IReplayContexts.IRepla
ctx.getReplayerRequestKey().getSourceRequestIndex(), connectionReplaySession, responseFuture, start,
new ChannelTask(ChannelTaskType.TRANSMIT, ()->{
scheduledContext.close();
sendNextPartAndContinue(new NettyPacketToHttpConsumer(connectionReplaySession, ctx),
eventLoop, packets.iterator(), start, interval, new AtomicInteger(), responseFuture);
sendNextPartAndContinue(ctx, new NettyPacketToHttpConsumer(connectionReplaySession, ctx),
eventLoop, packets.iterator(), start, interval, responseFuture);
}));
}

Expand Down Expand Up @@ -209,32 +206,75 @@ private long getDelayFromNowMs(Instant to) {
return Math.max(0, Duration.between(now(), to).toMillis());
}

// TODO - rewrite this - the recursion (at least as it is) is terribly confusing
private void sendNextPartAndContinue(NettyPacketToHttpConsumer packetReceiver,
EventLoop eventLoop, Iterator<ByteBuf> iterator,
Instant start, Duration interval, AtomicInteger counter,
StringTrackableCompletableFuture<AggregatedRawResponse> responseFuture) {
final var oldCounter = counter.getAndIncrement();
log.atTrace().setMessage(()->"sendNextPartAndContinue: counter=" + oldCounter).log();
assert iterator.hasNext() : "Should not have called this with no items to send";

packetReceiver.consumeBytes(iterator.next());
if (iterator.hasNext()) {
Runnable packetSender = () -> sendNextPartAndContinue(packetReceiver, eventLoop,
iterator, start, interval, counter, responseFuture);
var delayMs = Duration.between(now(),
start.plus(interval.multipliedBy(counter.get()))).toMillis();
eventLoop.schedule(packetSender, Math.min(0, delayMs), TimeUnit.MILLISECONDS);
} else {
packetReceiver.finalizeRequest().handle((v,t)-> {
if (t != null) {
responseFuture.future.completeExceptionally(t);
// TODO - Rewrite this with DiagnosticTrackableCompletableFutures
private void sendNextPartAndContinue(IReplayerHttpTransactionContext ctx,
NettyPacketToHttpConsumer packetReceiver,
io.netty.channel.EventLoop eventLoop,
Iterator<ByteBuf> iterator,
Instant start,
Duration interval,
StringTrackableCompletableFuture<AggregatedRawResponse> responseFuture) {

IndexedChannelInteraction channelInteraction = new IndexedChannelInteraction(ctx.getChannelKey(),
ctx.getReplayerRequestKey().getReplayerRequestIndex());
assert iterator.hasNext() : "Should not have called this with no items to send for " + channelInteraction;

var delayFutures = new ArrayDeque<CompletableFuture<Void>>();
var workFutures = new ArrayDeque<CompletableFuture<Void>>();

// initial work future completed to tie to future ones
workFutures.add(CompletableFuture.completedFuture(null));
for (var packetCount = 0; iterator.hasNext(); packetCount++) {
delayFutures.addLast(new CompletableFuture<>());
var byteBuf = iterator.next();

// Create a new future that waits for the last delay and work future to complete, or either to complete exceptionally
// Then logs the activity and processes the ByteBuf using `packetReceiver`.
int finalPacketCount = packetCount;
CompletableFuture<Void> newWorkFuture = CompletableFuture.allOf(
delayFutures.getLast(), workFutures.getLast())
.thenCompose(aVoid -> {
log.atInfo().setMessage("consumeBytes running for " + channelInteraction + " and packet counter " + finalPacketCount).log();
return packetReceiver.consumeBytes(byteBuf).future;
});

// Add the new work future to the list of work futures.
workFutures.addLast(newWorkFuture);
}

var finalFuture = workFutures.getLast().handle((aVoid, consumeThrowable) -> {
var completed = packetReceiver.finalizeRequest();
return completed.handle((v, finalizeThrowable) -> {
var firstThrowable = consumeThrowable != null ? consumeThrowable : finalizeThrowable;
if (firstThrowable != null) {
responseFuture.future.completeExceptionally(firstThrowable);
throw Lombok.sneakyThrow(firstThrowable);
} else {
responseFuture.future.complete(v);
return v;
}
return null;
}, ()->"waiting for finalize to send Aggregated Response");
}, () -> "");

Check warning on line 256 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java#L256

Added line #L256 was not covered by tests
});

// Now, Schedule Delay Futures to complete.
int count = 0;
var scheduleFutures = new ArrayDeque<ScheduledFuture<?>>();
while (!delayFutures.isEmpty()) {
var minStartTime = start.plus(interval.multipliedBy(count));
long delay = Duration.between(Instant.now(), minStartTime).toMillis();
var futureToSchedule = delayFutures.pop();
scheduleFutures.addLast(eventLoop.schedule(() -> {
futureToSchedule.complete(null);
}, delay, TimeUnit.MILLISECONDS));
count++;
}
}

// During exceptional completion, execution may be done before scheduled delays are complete.
// In this case we will reduce overhead on the event loop by cancelling them
finalFuture.exceptionally((t) -> {
log.atDebug().setCause(t).setMessage("Execution failed for " + channelInteraction + " cancelling in progress delays.").log();
scheduleFutures.forEach(scheduleFuture -> scheduleFuture.cancel(true));
return null;

Check warning on line 277 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java#L275-L277

Added lines #L275 - L277 were not covered by tests
});
}
}

0 comments on commit a272676

Please sign in to comment.