Skip to content

Commit

Permalink
Merge pull request #614 from gregschohn/NewAsyncBindings
Browse files Browse the repository at this point in the history
  • Loading branch information
gregschohn authored Apr 30, 2024
2 parents 60cd859 + 4994898 commit a94008c
Show file tree
Hide file tree
Showing 42 changed files with 1,367 additions and 825 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations;

import lombok.AllArgsConstructor;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.opensearch.migrations.utils;

import lombok.AllArgsConstructor;

/**
* This class can be used to reduce a stream of Integers into a string (calling getFinalAccumulation()) To use
* ```
* Stream<Integer>...reduce(new SequentialSpanCompressingReducer(-1), SequentialSpanCompressingReducer::addNext,
* (c, d) -> { throw new IllegalStateException("parallel streams aren't allowed"); })
* ```
*/
@AllArgsConstructor
public class SequentialSpanCompressingReducer {
private static final int IGNORED_SENTINEL_VALUE = -1;
private static final char RUN_CHARACTER = '-';

private final int shift;
private final int last;
private final StringBuilder accumulatedText;

public SequentialSpanCompressingReducer(int shift) {
this.shift = shift;
this.last = IGNORED_SENTINEL_VALUE;
this.accumulatedText = new StringBuilder();
}

private boolean lastWasSpan() {
var len = accumulatedText.length();
return len > 0 && accumulatedText.charAt(len-1) == RUN_CHARACTER;
}

public SequentialSpanCompressingReducer addNext(int b) {
if (last+shift == b) {
if (lastWasSpan()) {
return new SequentialSpanCompressingReducer(shift, b,
accumulatedText);
} else {
return new SequentialSpanCompressingReducer(shift, b,
accumulatedText.append(RUN_CHARACTER));
}
} else {
if (lastWasSpan()) {
return new SequentialSpanCompressingReducer(shift, b,
accumulatedText.append(last).append(",").append(b));
} else {
return new SequentialSpanCompressingReducer(shift, b,
accumulatedText.append(last == IGNORED_SENTINEL_VALUE ? "" : ",").append(b));
}
}
}

public String getFinalAccumulation() {
if (lastWasSpan()) {
accumulatedText.append(last);
}
return accumulatedText.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.junit.jupiter.api.Test;
import org.opensearch.migrations.replay.datahandlers.http.HttpJsonTransformingConsumer;
import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.TrackedFuture;
import org.opensearch.migrations.tracing.InstrumentationTest;
import org.opensearch.migrations.transform.JsonJoltTransformBuilder;
import org.opensearch.migrations.transform.JsonJoltTransformer;
Expand Down Expand Up @@ -49,10 +49,9 @@ public void addingCompressionRequestHeaderCompressesPayload() throws ExecutionEx
"host: localhost\n" +
"content-length: " + (numParts * payloadPartSize) + "\n";

DiagnosticTrackableCompletableFuture<String, Void> tail =
compressingTransformer.consumeBytes(sourceHeaders.getBytes(StandardCharsets.UTF_8))
.thenCompose(v -> compressingTransformer.consumeBytes("\n".getBytes(StandardCharsets.UTF_8)),
() -> "AddCompressionEncodingTest.compressingTransformer");
var tail = compressingTransformer.consumeBytes(sourceHeaders.getBytes(StandardCharsets.UTF_8))
.thenCompose(v -> compressingTransformer.consumeBytes("\n".getBytes(StandardCharsets.UTF_8)),
() -> "AddCompressionEncodingTest.compressingTransformer");
final byte[] payloadPart = new byte[payloadPartSize];
Arrays.fill(payloadPart, BYTE_FILL_VALUE);
for (var i = new AtomicInteger(numParts); i.get() > 0; i.decrementAndGet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.opensearch.migrations;

import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import org.opensearch.migrations.replay.util.TrackedFuture;
import org.opensearch.migrations.replay.util.TextTrackedFuture;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

public class NettyFutureBinders {

private NettyFutureBinders() {}

public static CompletableFuture<Void>
bindNettyFutureToCompletableFuture(Future<?> nettyFuture, CompletableFuture<Void> cf) {
nettyFuture.addListener(f -> {
if (!f.isSuccess()) {
cf.completeExceptionally(f.cause());
} else {
cf.complete(null);
}
});
return cf;
}

public static CompletableFuture<Void>
bindNettyFutureToCompletableFuture(Future<?> nettyFuture) {
return bindNettyFutureToCompletableFuture(nettyFuture, new CompletableFuture<>());
}

public static TrackedFuture<String, Void>
bindNettyFutureToTrackableFuture(Future<?> nettyFuture, String label) {
return new TextTrackedFuture<>(bindNettyFutureToCompletableFuture(nettyFuture), label);
}

public static TrackedFuture<String, Void>
bindNettyFutureToTrackableFuture(Future<?> nettyFuture, Supplier<String> labelProvider) {
return new TextTrackedFuture<>(bindNettyFutureToCompletableFuture(nettyFuture), labelProvider);
}

public static TrackedFuture<String, Void>
bindNettyFutureToTrackableFuture(Function<Runnable, Future<?>> nettyFutureGenerator, String label) {
return bindNettyFutureToTrackableFuture(nettyFutureGenerator.apply(() -> {
}), label);
}

public static TrackedFuture<String, Void>
bindNettySubmitToTrackableFuture(EventLoop eventLoop) {
return bindNettyFutureToTrackableFuture(eventLoop::submit, "waiting for event loop submission");
}

public static TrackedFuture<String, Void>
bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToTrackableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS),
"scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)");
}

public static CompletableFuture<Void>
bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay, CompletableFuture<Void> cf) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToCompletableFuture(eventLoop.schedule(() -> {
}, delayMs, TimeUnit.MILLISECONDS), cf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.TrackedFuture;

import java.time.Duration;
import java.util.function.Supplier;
Expand All @@ -22,8 +22,8 @@
*/
public class AdaptiveRateLimiter<D,T> {

public DiagnosticTrackableCompletableFuture<D,T>
get(Supplier<DiagnosticTrackableCompletableFuture<D,T>> producer) {
public TrackedFuture<D,T>
get(Supplier<TrackedFuture<D,T>> producer) {
var intervalFunction = IntervalFunction.ofExponentialBackoff(Duration.ofMillis(1),2,Duration.ofSeconds(1));
RetryConfig retryConfig = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,22 @@
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.NettyFutureBinders;
import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer;
import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.TextTrackedFuture;
import org.opensearch.migrations.replay.util.TrackedFuture;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Slf4j
public class ClientConnectionPool {
Expand Down Expand Up @@ -78,45 +75,27 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha
()->getResilientClientChannelProducer(eventLoop, channelKeyCtx));
}

private DiagnosticTrackableCompletableFuture<String, ChannelFuture>
private TrackedFuture<String, ChannelFuture>
getResilientClientChannelProducer(EventLoop eventLoop, IReplayContexts.IChannelKeyContext connectionContext) {
return new AdaptiveRateLimiter<String, ChannelFuture>()
.get(() -> {
var channelFuture = NettyPacketToHttpConsumer.createClientConnection(eventLoop,
sslContext, serverUri, connectionContext, timeout);
return getCompletedChannelFutureAsCompletableFuture(connectionContext, channelFuture);
});
}

public static StringTrackableCompletableFuture<ChannelFuture>
getCompletedChannelFutureAsCompletableFuture(IReplayContexts.IChannelKeyContext connectionContext,
ChannelFuture channelFuture) {
var clientConnectionChannelCreatedFuture =
new StringTrackableCompletableFuture<ChannelFuture>(new CompletableFuture<>(),
() -> "waiting for createClientConnection to finish");
channelFuture.addListener(f -> {
log.atInfo().setMessage(()->
"New network connection result for " + connectionContext + "=" + f.isSuccess()).log();
if (f.isSuccess()) {
clientConnectionChannelCreatedFuture.future.complete(channelFuture);
} else {
clientConnectionChannelCreatedFuture.future.completeExceptionally(f.cause());
}
});
return clientConnectionChannelCreatedFuture;
.get(() ->
NettyPacketToHttpConsumer.createClientConnection(eventLoop,
sslContext, serverUri, connectionContext, timeout)
.whenComplete((v,t)-> {
if (t == null) {
log.atDebug().setMessage(() -> "New network connection result for " +
connectionContext + " =" + v).log();
} else {
log.atInfo().setMessage(() -> "got exception for " + connectionContext)
.setCause(t).log();
}
}, () -> "waiting for createClientConnection to finish"));
}

public CompletableFuture<Void> shutdownNow() {
CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
connectionId2ChannelCache.invalidateAll();
eventLoopGroup.shutdownGracefully().addListener(f->{
if (f.isSuccess()) {
shutdownFuture.complete(null);
} else {
shutdownFuture.completeExceptionally(f.cause());
}
});
return shutdownFuture;
return NettyFutureBinders.bindNettyFutureToCompletableFuture(eventLoopGroup.shutdownGracefully());
}

public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) {
Expand All @@ -142,28 +121,24 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN
return crs;
}

private DiagnosticTrackableCompletableFuture<String, Channel>
private TrackedFuture<String, Channel>
closeClientConnectionChannel(ConnectionReplaySession channelAndFutureWork) {
var channelClosedFuture =
new StringTrackableCompletableFuture<Channel>(new CompletableFuture<>(),
()->"Waiting for closeFuture() on channel");

channelAndFutureWork.getFutureThatReturnsChannelFuture(false)
.thenAccept(channelFuture-> {
return channelAndFutureWork.getFutureThatReturnsChannelFutureInAnyState(false)
.thenCompose(channelFuture-> {
if (channelFuture == null) {
return;
log.atTrace().setMessage(() -> "Asked to close channel for " +
channelAndFutureWork.getChannelKeyContext() + " but the channel wasn't found. " +
"It may have already been reset.").log();
return TextTrackedFuture.completedFuture(null, ()->"");
}
log.atTrace().setMessage(() -> "closing channel " + channelFuture.channel() +
"(" + channelAndFutureWork.getChannelKeyContext() + ")...").log();
channelFuture.channel().close()
.addListener(closeFuture -> {
return NettyFutureBinders.bindNettyFutureToTrackableFuture(
channelFuture.channel().close(),
"calling channel.close()")
.thenApply(v -> {
log.atTrace().setMessage(() -> "channel.close() has finished for " +
channelAndFutureWork.getChannelKeyContext()).log();
if (closeFuture.isSuccess()) {
channelClosedFuture.future.complete(channelFuture.channel());
} else {
channelClosedFuture.future.completeExceptionally(closeFuture.cause());
}
channelAndFutureWork.getChannelKeyContext() + " with value=" + v).log();
if (channelAndFutureWork.hasWorkRemaining()) {
log.atWarn().setMessage(() ->
"Work items are still remaining for this connection session" +
Expand All @@ -172,19 +147,9 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN
"). " + channelAndFutureWork.calculateSizeSlowly() +
" requests that were enqueued won't be run").log();
}
var schedule = channelAndFutureWork.schedule;
while (channelAndFutureWork.schedule.hasPendingTransmissions()) {
var scheduledItemToKill = schedule.peekFirstItem();
schedule.removeFirstItem();
}
});
}, () -> "calling channel.close()")
.exceptionally(t->{
log.atWarn().setMessage(()->"client connection encountered an exception while closing")
.setCause(t).log();
channelClosedFuture.future.completeExceptionally(t);
return null;
}, () -> "handling any potential exceptions");
return channelClosedFuture;
channelAndFutureWork.schedule.clear();
return channelFuture.channel();
}, () -> "clearing work");
}, () -> "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.opensearch.migrations.replay.datatypes.IndexedChannelInteraction;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.traffic.source.BufferedFlowController;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.TrackedFuture;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -101,8 +101,8 @@ public boolean isWorkOutstanding() {
return totalCountOfScheduledTasksOutstanding.get() > 0;
}

private <T> DiagnosticTrackableCompletableFuture<String, T>
hookWorkFinishingUpdates(DiagnosticTrackableCompletableFuture<String, T> future, Instant timestamp,
private <T> TrackedFuture<String, T>
hookWorkFinishingUpdates(TrackedFuture<String, T> future, Instant timestamp,
Object stringableKey, String taskDescription) {
return future.map(f->f
.whenComplete((v,t)->Utils.setIfLater(lastCompletedSourceTimeEpochMs, timestamp.toEpochMilli()))
Expand All @@ -124,9 +124,9 @@ private static void logStartOfWork(Object stringableKey, long newCount, Instant
") to run at " + start + " incremented tasksOutstanding to "+ newCount).log();
}

public <T> DiagnosticTrackableCompletableFuture<String, T>
public <T> TrackedFuture<String, T>
scheduleTransformationWork(IReplayContexts.IReplayerHttpTransactionContext requestCtx, Instant originalStart,
Supplier<DiagnosticTrackableCompletableFuture<String,T>> task) {
Supplier<TrackedFuture<String,T>> task) {
var newCount = totalCountOfScheduledTasksOutstanding.incrementAndGet();
final String label = "processing";
var start = timeShifter.transformSourceTimeToRealTime(originalStart);
Expand All @@ -136,7 +136,7 @@ private static void logStartOfWork(Object stringableKey, long newCount, Instant
return hookWorkFinishingUpdates(result, originalStart, requestCtx, label);
}

public DiagnosticTrackableCompletableFuture<String, AggregatedRawResponse>
public TrackedFuture<String, AggregatedRawResponse>
scheduleRequest(IReplayContexts.IReplayerHttpTransactionContext ctx,
Instant originalStart, Instant originalEnd,
int numPackets, Stream<ByteBuf> packets) {
Expand All @@ -154,7 +154,7 @@ private static void logStartOfWork(Object stringableKey, long newCount, Instant
return hookWorkFinishingUpdates(sendResult, originalStart, requestKey, label);
}

public DiagnosticTrackableCompletableFuture<String, Void>
public TrackedFuture<String, Void>
closeConnection(int channelInteractionNum,
IReplayContexts.IChannelKeyContext ctx,
int channelSessionNumber, Instant timestamp) {
Expand Down
Loading

0 comments on commit a94008c

Please sign in to comment.