Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New async bindings #614

Merged
merged 14 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.migrations;

import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

public class NettyToCompletableFutureBinders {
public static CompletableFuture<Void>
bindNettyFutureToCompletableFuture(Future<?> nettyFuture, CompletableFuture<Void> cf) {
nettyFuture.addListener(f -> {
if (!f.isSuccess()) {
cf.completeExceptionally(f.cause());
} else {
cf.complete(null);
}
});
return cf;
}

public static CompletableFuture<Void>
bindNettyFutureToCompletableFuture(Future<?> nettyFuture) {
return bindNettyFutureToCompletableFuture(nettyFuture, new CompletableFuture<>());
}

public static DiagnosticTrackableCompletableFuture<String, Void>
bindNettyFutureToTrackableFuture(Future<?> nettyFuture, String label) {
return new StringTrackableCompletableFuture<>(bindNettyFutureToCompletableFuture(nettyFuture), label);
}

public static DiagnosticTrackableCompletableFuture<String, Void>
bindNettyFutureToTrackableFuture(Future<?> nettyFuture, Supplier<String> labelProvider) {
return new StringTrackableCompletableFuture<>(bindNettyFutureToCompletableFuture(nettyFuture), labelProvider);
}

public static DiagnosticTrackableCompletableFuture<String, Void>
bindNettyFutureToTrackableFuture(Function<Runnable, Future<?>> nettyFutureGenerator, String label) {
return bindNettyFutureToTrackableFuture(nettyFutureGenerator.apply(() -> {
}), label);
}

public static DiagnosticTrackableCompletableFuture<String, Void>
bindNettySubmitToTrackableFuture(EventLoop eventLoop) {
return bindNettyFutureToTrackableFuture(eventLoop::submit, "waiting for event loop submission");
}

public static DiagnosticTrackableCompletableFuture<String, Void>
bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay) {
var delayMs = Math.max(0, delay.toMillis());
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
return bindNettyFutureToTrackableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS),
"scheduling to run next send at " + delay + " in " + delayMs +" ms (clipped)");
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
}

public static CompletableFuture<Void>
bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay, CompletableFuture<Void> cf) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToCompletableFuture(eventLoop.schedule(() -> {
}, delayMs, TimeUnit.MILLISECONDS), cf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha
getCompletedChannelFutureAsCompletableFuture(IReplayContexts.IChannelKeyContext connectionContext,
ChannelFuture channelFuture) {
var clientConnectionChannelCreatedFuture =
new StringTrackableCompletableFuture<ChannelFuture>(new CompletableFuture<>(),
() -> "waiting for createClientConnection to finish");
new StringTrackableCompletableFuture<ChannelFuture>("waiting for createClientConnection to finish");
channelFuture.addListener(f -> {
log.atInfo().setMessage(()->
"New network connection result for " + connectionContext + "=" + f.isSuccess()).log();
Expand Down Expand Up @@ -145,8 +144,7 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN
private DiagnosticTrackableCompletableFuture<String, Channel>
closeClientConnectionChannel(ConnectionReplaySession channelAndFutureWork) {
var channelClosedFuture =
new StringTrackableCompletableFuture<Channel>(new CompletableFuture<>(),
()->"Waiting for closeFuture() on channel");
new StringTrackableCompletableFuture<Channel>("Waiting for closeFuture() on channel");

channelAndFutureWork.getFutureThatReturnsChannelFuture(false)
.thenAccept(channelFuture-> {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -105,44 +105,46 @@ class TrafficReplayerAccumulationCallbacks implements AccumulationCallbacks {
@NonNull HttpMessageAndTimestamp request) {
replayEngine.setFirstTimestamp(request.getFirstPacketTimestamp());

var allWorkFinishedForTransaction =
new StringTrackableCompletableFuture<Void>(new CompletableFuture<>(),
()->"waiting for " + ctx + " to be queued and run through TrafficStreamLimiter");
var requestPushFuture = new StringTrackableCompletableFuture<TransformedTargetRequestAndResponse>(
new CompletableFuture<>(), () -> "Waiting to get response from target");
var requestKey = ctx.getReplayerRequestKey();
liveTrafficStreamLimiter.queueWork(1, ctx, wi ->
transformAndSendRequest(replayEngine, request, ctx).future.whenComplete((v,t)->{
liveTrafficStreamLimiter.doneProcessing(wi);
if (t != null) {
requestPushFuture.future.completeExceptionally(t);
} else {
requestPushFuture.future.complete(v);
}
}));
if (!allWorkFinishedForTransaction.future.isDone()) {
log.trace("Adding " + requestKey + " to targetTransactionInProgressMap");
requestWorkTracker.put(requestKey, allWorkFinishedForTransaction);
if (allWorkFinishedForTransaction.future.isDone()) {
requestWorkTracker.remove(requestKey);
}
}

return rrPair ->
requestPushFuture.map(f -> f.handle((v, t) -> {
log.atInfo().setMessage(() -> "Done receiving captured stream for " + ctx +
":" + rrPair.requestData).log();
log.atTrace().setMessage(() ->
"Summary response value for " + requestKey + " returned=" + v).log();
return handleCompletedTransaction(ctx, rrPair, v, t);
}), () -> "logging summary")
.whenComplete((v,t)->{
if (t != null) {
allWorkFinishedForTransaction.future.completeExceptionally(t);
} else {
allWorkFinishedForTransaction.future.complete(null);
}
}, ()->"");
var finishedAccumulatingResponseFuture =
new StringTrackableCompletableFuture<RequestResponsePacketPair>(
()->"waiting for response to be accumulated for " + ctx);
finishedAccumulatingResponseFuture.future.whenComplete((v,t)-> log.atInfo()
.setMessage(() -> "Done receiving captured stream for " + ctx + ":" + v.requestData).log());

var allWorkFinishedForTransactionFuture = sendRequestAfterGoingThroughWorkQueue(ctx, request, requestKey)
.getDeferredFutureThroughHandle((arr,httpRequestException) -> finishedAccumulatingResponseFuture
.thenCompose(rrPair->
StringTrackableCompletableFuture.completedFuture(
handleCompletedTransaction(ctx, rrPair, arr, httpRequestException),
()->"Synchronously committed results"),
() -> "logging summary"),
()->"waiting for accumulation to combine with target response");

assert !allWorkFinishedForTransactionFuture.future.isDone();
log.trace("Adding " + requestKey + " to targetTransactionInProgressMap");
requestWorkTracker.put(requestKey, allWorkFinishedForTransactionFuture);

return finishedAccumulatingResponseFuture.future::complete;
}

private DiagnosticTrackableCompletableFuture<String, TransformedTargetRequestAndResponse>
sendRequestAfterGoingThroughWorkQueue(IReplayContexts.IReplayerHttpTransactionContext ctx,
HttpMessageAndTimestamp request,
UniqueReplayerRequestKey requestKey) {
var workDequeuedByLimiterFuture =
new StringTrackableCompletableFuture<TrafficStreamLimiter.WorkItem>(
()->"waiting for " + ctx + " to be queued and run through TrafficStreamLimiter");
var wi = liveTrafficStreamLimiter.queueWork(1, ctx, workDequeuedByLimiterFuture.future::complete);
var httpSentRequestFuture = workDequeuedByLimiterFuture
.thenCompose(ignored -> transformAndSendRequest(replayEngine, request, ctx),
()->"Waiting to get response from target")
.whenComplete((v,t)-> liveTrafficStreamLimiter.doneProcessing(wi),
()->"releasing work item for the traffic limiter");
httpSentRequestFuture.future.whenComplete((v,t)->log.atTrace().setMessage(() ->
"Summary response value for " + requestKey + " returned=" + v).log());
return httpSentRequestFuture;
}

Void handleCompletedTransaction(@NonNull IReplayContexts.IReplayerHttpTransactionContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.tracing.IRootReplayerContext;
import org.opensearch.migrations.replay.tracing.RootReplayerContext;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void setupRunAndWaitForReplayToFinish(Duration observedPacketConnectionTi
Consumer<SourceTargetCaptureTuple> resultTupleConsumer)
throws InterruptedException, ExecutionException {

var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool);
var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool, NettyPacketToHttpConsumer::new);
var replayEngine = new ReplayEngine(senderOrchestrator, trafficSource, timeShifter);

CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public NettyPacketToHttpConsumer(ConnectionReplaySession replaySession,
var parentContext = ctx.createTargetRequestContext();
this.setCurrentMessageContext(parentContext.createHttpSendingContext());
responseBuilder = AggregatedRawResponse.builder(Instant.now());
this.activeChannelFuture = new StringTrackableCompletableFuture<>(new CompletableFuture<>(),
this.activeChannelFuture = new StringTrackableCompletableFuture<>(
() -> "incoming connection is ready for " + replaySession);
var initialFuture = this.activeChannelFuture;

Expand Down Expand Up @@ -288,7 +288,7 @@ public DiagnosticTrackableCompletableFuture<String,Void> consumeBytes(ByteBuf pa
System.identityHashCode(packetData) + "): " + httpContext() + ": " +
packetData.toString(StandardCharsets.UTF_8)).log();
return writePacketAndUpdateFuture(packetData).whenComplete((v2,t2)->{
log.atDebug().setMessage(()->"finished writing " + httpContext() + " t=" + t2).log();
log.atTrace().setMessage(()->"finished writing " + httpContext() + " t=" + t2).log();
}, ()->"");
} else {
log.atWarn().setMessage(()-> httpContext().getReplayerRequestKey() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

@AllArgsConstructor
@Getter
public class ChannelTask {
public class ChannelTask<T> {
public final ChannelTaskType kind;
public final Runnable runnable;
public final FutureTransformer<T> runnable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
Expand All @@ -12,8 +13,13 @@
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;

import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
* This class contains everything that is needed to replay packets to a specific channel.
Expand All @@ -34,10 +40,10 @@ public class ConnectionReplaySession {
* EventLoop so that we can route all calls for this object into that loop/thread.
*/
public final EventLoop eventLoop;
public final OnlineRadixSorter scheduleSequencer;
@Getter
private Supplier<DiagnosticTrackableCompletableFuture<String, ChannelFuture>> channelFutureFutureFactory;
private ChannelFuture cachedChannel; // only can be accessed from the eventLoop thread
public final OnlineRadixSorter<Runnable> scheduleSequencer;
public final TimeToResponseFulfillmentFutureMap schedule;
@Getter
private final IReplayContexts.IChannelKeyContext channelKeyContext;
Expand All @@ -49,15 +55,15 @@ public ConnectionReplaySession(EventLoop eventLoop, IReplayContexts.IChannelKeyC
{
this.eventLoop = eventLoop;
this.channelKeyContext = channelKeyContext;
this.scheduleSequencer = new OnlineRadixSorter<>(0);
this.scheduleSequencer = new OnlineRadixSorter(0);
this.schedule = new TimeToResponseFulfillmentFutureMap();
this.channelFutureFutureFactory = channelFutureFutureFactory;
}

public DiagnosticTrackableCompletableFuture<String, ChannelFuture>
getFutureThatReturnsChannelFuture(boolean requireActiveChannel) {
StringTrackableCompletableFuture<ChannelFuture> eventLoopFuture =
new StringTrackableCompletableFuture<>(new CompletableFuture<>(), () -> "procuring a connection");
new StringTrackableCompletableFuture<>("procuring a connection");
eventLoop.submit(() -> {
if (!requireActiveChannel || (cachedChannel != null && cachedChannel.channel().isActive())) {
eventLoopFuture.future.complete(cachedChannel);
Expand Down Expand Up @@ -103,10 +109,10 @@ private static boolean exceptionIsRetryable(@NonNull Throwable t) {
}

public boolean hasWorkRemaining() {
return scheduleSequencer.hasPending() || schedule.hasPendingTransmissions();
return !scheduleSequencer.isEmpty() || schedule.hasPendingTransmissions();
}

public long calculateSizeSlowly() {
return schedule.calculateSizeSlowly() + scheduleSequencer.numPending();
return schedule.calculateSizeSlowly() + scheduleSequencer.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opensearch.migrations.replay.datatypes;

import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;

import java.util.function.Function;

/**
* This is a function rather than just a supplier so that the future returned can be
* chained to its logical parent dependency.
*/
public interface FutureTransformer<U> extends
Function<DiagnosticTrackableCompletableFuture<String,Void>, DiagnosticTrackableCompletableFuture<String,U>> {
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,41 @@
package org.opensearch.migrations.replay.datatypes;

import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Map;
import java.util.Deque;
import java.util.StringJoiner;
import java.util.TreeMap;

import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;

public class TimeToResponseFulfillmentFutureMap {

TreeMap<Instant, ArrayDeque<ChannelTask>> timeToRunnableMap = new TreeMap<>();
public static class FutureWorkPoint {
public final Instant startTime;
public final DiagnosticTrackableCompletableFuture<String, Void> scheduleFuture;
private final ChannelTaskType channelTaskType;
public FutureWorkPoint(Instant forTime, ChannelTaskType taskType) {
startTime = forTime;
scheduleFuture = new StringTrackableCompletableFuture<>("scheduled start for " + forTime);
channelTaskType = taskType;
}
}

Deque<FutureWorkPoint> timeToRunnableMap = new ArrayDeque<>();

public void appendTask(Instant start, ChannelTask task) {
assert timeToRunnableMap.keySet().stream().allMatch(t->!t.isAfter(start));
var existing = timeToRunnableMap.computeIfAbsent(start, k->new ArrayDeque<>());
existing.offer(task);
public FutureWorkPoint appendTaskTrigger(Instant start, ChannelTaskType taskType) {
assert timeToRunnableMap.stream().map(fwp->fwp.startTime).allMatch(t->!t.isAfter(start));
var fpp = new FutureWorkPoint(start, taskType);
timeToRunnableMap.offer(fpp);
return fpp;
}

public Map.Entry<Instant, ChannelTask> peekFirstItem() {
var e = timeToRunnableMap.firstEntry();
return e == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().peek());
public FutureWorkPoint peekFirstItem() {
return timeToRunnableMap.peekFirst();
}

public Instant removeFirstItem() {
var e = timeToRunnableMap.firstEntry();
if (e != null) {
var q = e.getValue();
q.remove();
if (q.isEmpty()) {
timeToRunnableMap.remove(e.getKey());
}
return e.getKey();
} else {
return null;
}
return timeToRunnableMap.isEmpty() ? null : timeToRunnableMap.pop().startTime;
}

public boolean isEmpty() {
Expand All @@ -45,14 +46,12 @@ public boolean hasPendingTransmissions() {
if (timeToRunnableMap.isEmpty()) {
return false;
} else {
return timeToRunnableMap.values().stream()
.flatMap(Collection::stream)
.anyMatch(ct->ct.kind==ChannelTaskType.TRANSMIT);
return timeToRunnableMap.stream().anyMatch(fwp->fwp.channelTaskType==ChannelTaskType.TRANSMIT);
}
}

public long calculateSizeSlowly() {
return timeToRunnableMap.values().stream().map(ArrayDeque::size).mapToInt(x->x).sum();
return timeToRunnableMap.size();
}

@Override
Expand All @@ -64,13 +63,12 @@ private String formatBookends() {
if (timeToRunnableMap.isEmpty()) {
return "";
} else if (timeToRunnableMap.size() == 1) {
return timeToRunnableMap.firstKey().toString();
return timeToRunnableMap.peekFirst().startTime.toString();
} else {
return new StringJoiner("...")
.add(timeToRunnableMap.firstKey().toString())
.add(timeToRunnableMap.lastKey().toString())
.add(timeToRunnableMap.peekFirst().startTime.toString())
.add(timeToRunnableMap.peekLast().toString())
.toString();
}
}

}
Loading