From 04aa65ef8ee6b51767c474a0306ad54b6d0d6ba3 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Mon, 15 Apr 2024 18:00:08 -0400 Subject: [PATCH] Fixing some SonarLint findings. Signed-off-by: Greg Schohn --- .../kafkaoffloader/KafkaCaptureFactory.java | 5 +++-- .../StreamChannelConnectionCaptureSerializer.java | 6 ++++-- .../trafficcapture/StreamLifecycleManager.java | 1 - .../tracing/IRootOffloaderContext.java | 11 ----------- ...reamChannelConnectionCaptureSerializerTest.java | 6 ++++-- .../main/java/org/opensearch/migrations/Utils.java | 2 ++ .../ActiveContextTrackerByActivityType.java | 1 - .../migrations/tracing/IContextTracker.java | 2 +- .../migrations/tracing/IInstrumentConstructor.java | 1 - .../tracing/commoncontexts/IConnectionContext.java | 1 - .../tracing/InMemoryInstrumentationBundle.java | 3 ++- .../trafficcapture/netty/LoggingHttpHandler.java | 11 ++++++++++- .../trafficcapture/netty/PassThruHttpHeaders.java | 10 ++++++++++ ...onditionallyReliableLoggingHttpHandlerTest.java | 2 +- .../proxyserver/netty/BacksideHandler.java | 3 +-- .../netty/ExpiringSubstitutableItemPool.java | 14 ++++++-------- ...apturedTrafficToHttpTransactionAccumulator.java | 4 ++-- .../opensearch/migrations/replay/KafkaPrinter.java | 3 ++- .../migrations/replay/TrafficReplayerCore.java | 10 ++++------ .../datahandlers/NettyPacketToHttpConsumer.java | 1 - .../migrations/replay/datatypes/RawPackets.java | 10 +++++++++- .../TimeToResponseFulfillmentFutureMap.java | 6 +++--- .../replay/kafka/TrackingKafkaConsumer.java | 12 ++++++------ .../replay/tracing/TrafficSourceContexts.java | 1 - .../traffic/source/BlockingTrafficSource.java | 3 +-- .../traffic/source/ITrafficCaptureSource.java | 3 +-- .../traffic/source/InputStreamOfTraffic.java | 2 +- .../util/DiagnosticTrackableCompletableFuture.java | 4 ++-- .../replay/util/OrderedWorkerTracker.java | 4 ++-- .../replay/BlockingTrafficSourceTest.java | 2 +- .../replay/CompressedFileTrafficCaptureSource.java | 2 +- .../source/ArrayCursorTrafficCaptureSource.java | 2 +- 32 files changed, 80 insertions(+), 68 deletions(-) diff --git a/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java b/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java index 26402b4b7..9e7feddc7 100644 --- a/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java +++ b/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java @@ -135,12 +135,13 @@ public CodedOutputStreamWrapper createStream() { } // Producer Send will block on actions such as retrieving cluster metadata, allows running fully async - public static CompletableFuture sendFullyAsync(Producer producer, ProducerRecord record) { + public static CompletableFuture sendFullyAsync(Producer producer, + ProducerRecord kafkaRecord) { CompletableFuture completableFuture = new CompletableFuture<>(); ForkJoinPool.commonPool().execute(() -> { try { - producer.send(record, (metadata, exception) -> { + producer.send(kafkaRecord, (metadata, exception) -> { if (exception != null) { completableFuture.completeExceptionally(exception); } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 792eb0400..d8c7804d6 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -4,6 +4,8 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Timestamp; import io.netty.buffer.ByteBuf; + +import java.util.function.IntSupplier; import java.util.function.Supplier; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -131,9 +133,9 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx } } - public CompletableFuture flushIfNeeded(Supplier requiredSize) throws IOException { + public CompletableFuture flushIfNeeded(IntSupplier requiredSize) throws IOException { var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); - if (spaceLeft != -1 && spaceLeft < requiredSize.get()) { + if (spaceLeft != -1 && spaceLeft < requiredSize.getAsInt()) { return flushCommitAndResetStream(false); } return CompletableFuture.completedFuture(null); diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java index 74de0277a..18db43cc4 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java @@ -1,6 +1,5 @@ package org.opensearch.migrations.trafficcapture; -import java.io.IOException; import java.util.concurrent.CompletableFuture; public interface StreamLifecycleManager { diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/tracing/IRootOffloaderContext.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/tracing/IRootOffloaderContext.java index 2b0e6bab0..c087cc7fb 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/tracing/IRootOffloaderContext.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/tracing/IRootOffloaderContext.java @@ -3,16 +3,5 @@ import org.opensearch.migrations.tracing.IRootOtelContext; public interface IRootOffloaderContext extends IRootOtelContext { - //public static final String OFFLOADER_SCOPE_NAME = "Offloader"; ConnectionContext.MetricInstruments getConnectionInstruments(); - -// public RootOffloaderContext(OpenTelemetry openTelemetry) { -// this(openTelemetry, OFFLOADER_SCOPE_NAME); -// } -// -// public RootOffloaderContext(OpenTelemetry openTelemetry, String scopeName) { -// super(scopeName, openTelemetry); -// var meter = openTelemetry.getMeterProvider().get(scopeName); -// connectionInstruments = new ConnectionContext.MetricInstruments(meter, scopeName); -// } } diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index b2bd5d54a..4ea9f551b 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -208,8 +208,10 @@ public void testWithLimitlessCodedOutputStreamHolder() var bb = Unpooled.buffer(0); serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); - var future = serializer.flushCommitAndResetStream(true); - future.get(); + Assertions.assertDoesNotThrow(()-> { + var future = serializer.flushCommitAndResetStream(true); + future.get(); + }); bb.release(); } diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/Utils.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/Utils.java index bb181ff09..a06519e52 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/Utils.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/Utils.java @@ -6,6 +6,8 @@ import java.util.stream.Collectors; public class Utils { + private Utils() {} + /** * See https://en.wikipedia.org/wiki/Fold_(higher-order_function) */ diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java index da1f91de9..c63921c19 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java @@ -1,7 +1,6 @@ package org.opensearch.migrations.tracing; import java.util.Collection; -import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IContextTracker.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IContextTracker.java index 193e8f7d3..a4dd92455 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IContextTracker.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IContextTracker.java @@ -12,5 +12,5 @@ default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) */ default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {} - final static IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {}; + IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {}; } diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IInstrumentConstructor.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IInstrumentConstructor.java index ee6af9e2d..d56f70a76 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IInstrumentConstructor.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/IInstrumentConstructor.java @@ -1,6 +1,5 @@ package org.opensearch.migrations.tracing; -import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; import lombok.NonNull; diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/commoncontexts/IConnectionContext.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/commoncontexts/IConnectionContext.java index 48a02873a..3991dfee2 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/commoncontexts/IConnectionContext.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/commoncontexts/IConnectionContext.java @@ -2,7 +2,6 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; -import org.opensearch.migrations.tracing.IInstrumentationAttributes; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; public interface IConnectionContext extends IScopedInstrumentationAttributes { diff --git a/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java b/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java index ef3f968fc..3b8c2c7aa 100644 --- a/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java +++ b/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java @@ -110,7 +110,8 @@ public List getMetricsUntil(String metricName, IntStream sleepTimes, " did not").log(); Thread.sleep(sleepAmount); } catch (InterruptedException e) { - throw new RuntimeException(e); + Thread.currentThread().interrupt(); + throw Lombok.sneakyThrow(e); } return false; } diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java index ce1fc14f3..164125c00 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java @@ -192,6 +192,15 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved(ctx); } + /** + * This provides a callback that subclasses can use to override the default behavior of cycling the + * instrumentation context and continuing to read. Subclasses may determine if additional processing + * or triggers should occur before proceeding, given the current context. + * @param ctx the instrumentation context for this request + * @param msg the original message, which is likely a ByteBuf, that helped to form the httpRequest + * @param shouldCapture false if the current request has been determined to be ignorable + * @param httpRequest the request that has just been fully received (excluding its body) + */ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, boolean shouldCapture, HttpRequest httpRequest) throws Exception { messageContext = messageContext.createWaitingForResponseContext(); @@ -199,7 +208,7 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception { IWireCaptureContexts.IRequestContext requestContext; if (!(messageContext instanceof IWireCaptureContexts.IRequestContext)) { messageContext = requestContext = messageContext.createNextRequestContext(); diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java index a45f85d30..dede207d7 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java @@ -31,6 +31,16 @@ public PassThruHttpHeaders(@NonNull HttpHeadersToPreserve headersToPreserve) { this.mapWithCaseInsensitiveHeaders = headersToPreserve.caseInsensitiveHeadersMap; } + @Override + public boolean equals(Object o) { + throw new IllegalStateException("equals() is not supported for this stripped-down version of HttpHeaders"); + } + + @Override + public int hashCode() { + throw new IllegalStateException("hashCode() is not supported for this stripped-down version of HttpHeaders"); + } + private boolean headerNameShouldBeTracked(CharSequence name) { return mapWithCaseInsensitiveHeaders.contains(name); } diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java index 26d4d3100..f63358ed9 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java @@ -189,7 +189,7 @@ private static Consumer getSingleByteAtATimeWriter(boolean useP // This test doesn't work yet, but this is an optimization. Getting connections with only a // close observation is already a common occurrence. This is nice to have, so it's good to // keep this warm and ready, but we don't need the feature for correctness. - @Disabled + @Disabled("This is for an optimization that isn't functional yet") @Test @ValueSource(booleans = {false, true}) public void testThatSuppressedCaptureWorks() throws Exception { diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java index 3fb3212cc..d7a4840f4 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java @@ -38,8 +38,7 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.atError().setCause(cause).setMessage("Caught error").log(); - String channelId = ctx.channel().id().asLongText(); + log.atError().setCause(cause).setMessage("Caught error for channel: " + ctx.channel().id().asLongText()).log(); FrontsideHandler.closeAndFlush(ctx.channel()); } } \ No newline at end of file diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java index 6c165e39b..2bbcef31c 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java @@ -41,11 +41,9 @@ public Entry(F future) { @Override public String toString() { - final StringBuilder sb = new StringBuilder("Entry{"); - sb.append("timestamp=").append(timestamp); - sb.append(", value=").append(future); - sb.append('}'); - return sb.toString(); + return "Entry{" + "timestamp=" + timestamp + + ", value=" + future + + '}'; } } @@ -150,8 +148,8 @@ private void addExpiredItem() { private final Consumer onExpirationConsumer; @Getter private final EventLoop eventLoop; - private Duration inactivityTimeout; - private GenericFutureListener shuffleInProgressToReady; + private final Duration inactivityTimeout; + private final GenericFutureListener shuffleInProgressToReady; private final Stats stats; private int poolSize; @@ -305,7 +303,7 @@ private void beginLoadingNewItemIfNecessary() { @Override @SneakyThrows public String toString() { - return eventLoop.submit(() -> toStringOnThread()).get(); + return eventLoop.submit(this::toStringOnThread).get(); } private String toStringOnThread() { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java index 06ffe152c..bfa23f60a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java @@ -128,7 +128,7 @@ public void onTrafficStreamsExpired(RequestResponsePacketPair.ReconstructionStat public void onTrafficStreamIgnored(@NonNull ITrafficStreamKey tsk) { underlying.onTrafficStreamIgnored(tsk.getTrafficStreamsContext()); } - }; + } public int numberOfConnectionsCreated() { return liveStreams.numberOfConnectionsCreated(); } public int numberOfRequestsOnReusedConnections() { return reusedKeepAliveCounter.get(); } @@ -361,7 +361,7 @@ private Optional handleObservationForWriteState(Accumulation private void handleDroppedRequestForAccumulation(Accumulation accum) { if (accum.hasRrPair()) { var rrPair = accum.getRrPair(); - rrPair.getTrafficStreamsHeld().forEach(tsk->listener.onTrafficStreamIgnored(tsk)); + rrPair.getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored); } log.atTrace().setMessage(()->"resetting to forget "+ accum.trafficChannelKey).log(); accum.resetToIgnoreAndForgetCurrentRequest(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java index 044020baf..66c5b32dd 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -194,7 +195,7 @@ public static void main(String[] args) throws FileNotFoundException { } String baseOutputPath = params.outputDirectoryPath == null ? "./" : params.outputDirectoryPath; - baseOutputPath = !baseOutputPath.endsWith("/") ? baseOutputPath + "/" : baseOutputPath; + baseOutputPath = !baseOutputPath.endsWith(File.separator) ? baseOutputPath + File.separator : baseOutputPath; String uuid = UUID.randomUUID().toString(); boolean separatePartitionOutputs = false; Map partitionOutputStreams = new HashMap<>(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java index f7578116e..acdac3203 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java @@ -111,7 +111,7 @@ class TrafficReplayerAccumulationCallbacks implements AccumulationCallbacks { var requestPushFuture = new StringTrackableCompletableFuture( new CompletableFuture<>(), () -> "Waiting to get response from target"); var requestKey = ctx.getReplayerRequestKey(); - liveTrafficStreamLimiter.queueWork(1, ctx, wi -> { + liveTrafficStreamLimiter.queueWork(1, ctx, wi -> transformAndSendRequest(replayEngine, request, ctx).future.whenComplete((v,t)->{ liveTrafficStreamLimiter.doneProcessing(wi); if (t != null) { @@ -119,8 +119,7 @@ class TrafficReplayerAccumulationCallbacks implements AccumulationCallbacks { } else { requestPushFuture.future.complete(v); } - }); - }); + })); if (!allWorkFinishedForTransaction.future.isDone()) { log.trace("Adding " + requestKey + " to targetTransactionInProgressMap"); requestWorkTracker.put(requestKey, allWorkFinishedForTransaction); @@ -222,9 +221,8 @@ public void onConnectionClose(int channelInteractionNum, @NonNull Instant timestamp, @NonNull List trafficStreamKeysBeingHeld) { replayEngine.setFirstTimestamp(timestamp); var cf = replayEngine.closeConnection(channelInteractionNum, ctx, channelSessionNumber, timestamp); - cf.map(f->f.whenComplete((v,t)->{ - commitTrafficStreams(status, trafficStreamKeysBeingHeld); - }), ()->"closing the channel in the ReplayEngine"); + cf.map(f->f.whenComplete((v,t) -> commitTrafficStreams(status, trafficStreamKeysBeingHeld)), + ()->"closing the channel in the ReplayEngine"); } @Override diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index a98a7def5..a2aba11b8 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -304,7 +304,6 @@ private IReplayContexts.IReplayerHttpTransactionContext httpContext() { writePacketAndUpdateFuture(ByteBuf packetData) { final var completableFuture = new DiagnosticTrackableCompletableFuture(new CompletableFuture<>(), ()->"CompletableFuture that will wait for the netty future to fill in the completion value"); - final int readableBytes = packetData.readableBytes(); channel.writeAndFlush(packetData) .addListener((ChannelFutureListener) future -> { Throwable cause = null; diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java index e9e82b9ee..3455b2c07 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java @@ -1,6 +1,5 @@ package org.opensearch.migrations.replay.datatypes; -import lombok.EqualsAndHashCode; import java.util.ArrayList; import java.util.Arrays; @@ -21,4 +20,13 @@ public boolean equals(Object o) { } return true; } + + @Override + public int hashCode() { + int result = 29; + for (byte[] array : this) { + result = 31 * result + Arrays.hashCode(array); + } + return result; + } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java index 7e8f63387..09653587c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java @@ -3,6 +3,7 @@ import java.time.Instant; import java.util.AbstractMap; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Map; import java.util.StringJoiner; import java.util.TreeMap; @@ -45,9 +46,8 @@ public boolean hasPendingTransmissions() { return false; } else { return timeToRunnableMap.values().stream() - .flatMap(d->d.stream()) - .filter(ct->ct.kind==ChannelTaskType.TRANSMIT) - .findAny().isPresent(); + .flatMap(Collection::stream) + .anyMatch(ct->ct.kind==ChannelTaskType.TRANSMIT); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java index d4cde430b..ec50a6a19 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java @@ -197,7 +197,7 @@ public void touch(ITrafficSourceContexts.IBackPressureBlockContext context) { } finally { resume(); } - safeCommit(()->context.createCommitContext()); + safeCommit(context::createCommitContext); lastTouchTimeRef.set(clock.instant()); } } @@ -244,9 +244,9 @@ private Collection getActivePartitions() { public Stream getNextBatchOfRecords(ITrafficSourceContexts.IReadChunkContext context, BiFunction, T> builder) { - safeCommit(()->context.createCommitContext()); + safeCommit(context::createCommitContext); var records = safePollWithSwallowedRuntimeExceptions(context); - safeCommit(()->context.createCommitContext()); + safeCommit(context::createCommitContext); return applyBuilder(builder, records); } @@ -299,7 +299,7 @@ ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey streamKey, K + "). Dropping this commit request since the record would " + "have been handled again by a current consumer within this process or another. Full key=" + kafkaTsk).log(); - return ITrafficCaptureSource.CommitResult.Ignored; + return ITrafficCaptureSource.CommitResult.IGNORED; } var p = kafkaTsk.getPartition(); @@ -315,12 +315,12 @@ ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey streamKey, K addKeyContextForEventualCommit(streamKey, kafkaTsk, k); nextSetOfCommitsMap.put(k, v); } - return ITrafficCaptureSource.CommitResult.AfterNextRead; + return ITrafficCaptureSource.CommitResult.AFTER_NEXT_READ; }).orElseGet(() -> { synchronized (commitDataLock) { addKeyContextForEventualCommit(streamKey, kafkaTsk, k); } - return ITrafficCaptureSource.CommitResult.BlockedByOtherCommits; + return ITrafficCaptureSource.CommitResult.BLOCKED_BY_OTHER_COMMITS; }); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/TrafficSourceContexts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/TrafficSourceContexts.java index 7f6663f21..4bb6e6ca6 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/TrafficSourceContexts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/TrafficSourceContexts.java @@ -4,7 +4,6 @@ import lombok.NonNull; import org.opensearch.migrations.tracing.BaseNestedSpanContext; import org.opensearch.migrations.tracing.CommonScopedMetricInstruments; -import org.opensearch.migrations.tracing.IInstrumentationAttributes; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; public class TrafficSourceContexts { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java index f161d1070..e03e10be1 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -179,7 +178,7 @@ private Void blockIfNeeded(ITrafficSourceContexts.IReadChunkContext readContext) @Override public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) throws IOException { var commitResult = underlyingSource.commitTrafficStream(trafficStreamKey); - if (commitResult == CommitResult.AfterNextRead) { + if (commitResult == CommitResult.AFTER_NEXT_READ) { readGate.drainPermits(); readGate.release(); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java index c0bfaf574..366c34186 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java @@ -3,7 +3,6 @@ import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; import org.opensearch.migrations.replay.tracing.ITrafficSourceContexts; -import java.io.Closeable; import java.io.IOException; import java.time.Instant; import java.util.List; @@ -14,7 +13,7 @@ public interface ITrafficCaptureSource extends AutoCloseable { enum CommitResult { - Immediate, AfterNextRead, BlockedByOtherCommits, Ignored + IMMEDIATE, AFTER_NEXT_READ, BLOCKED_BY_OTHER_COMMITS, IGNORED } CompletableFuture> diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/InputStreamOfTraffic.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/InputStreamOfTraffic.java index 667b18201..401889e00 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/InputStreamOfTraffic.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/InputStreamOfTraffic.java @@ -75,7 +75,7 @@ public IOSTrafficStreamContext(RootReplayerContext rootReplayerContext, public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) { // do nothing - this datasource isn't transactional channelContextManager.releaseContextFor(trafficStreamKey.getTrafficStreamsContext().getLogicalEnclosingScope()); - return CommitResult.Immediate; + return CommitResult.IMMEDIATE; } @Override diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java index ec4c46fa0..6a6fed701 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java @@ -83,7 +83,7 @@ public DiagnosticTrackableCompletableFuture(@NonNull CompletableFuture future return new DiagnosticTrackableCompletableFuture<>(newCf, diagnosticSupplier, this); } - public DiagnosticTrackableCompletableFuture + public DiagnosticTrackableCompletableFuture thenAccept(Consumer fn, @NonNull Supplier diagnosticSupplier) { return this.map(dcf->dcf.thenAccept(fn), diagnosticSupplier); } @@ -95,7 +95,7 @@ public DiagnosticTrackableCompletableFuture(@NonNull CompletableFuture future public DiagnosticTrackableCompletableFuture whenComplete(BiConsumer fn, @NonNull Supplier diagnosticSupplier) { - return map(cf->cf.whenComplete((v,t)->fn.accept(v,t)), diagnosticSupplier); + return map(cf->cf.whenComplete(fn::accept), diagnosticSupplier); } public DiagnosticTrackableCompletableFuture diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OrderedWorkerTracker.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OrderedWorkerTracker.java index d7da7fa7c..304b26b65 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OrderedWorkerTracker.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OrderedWorkerTracker.java @@ -22,12 +22,12 @@ static class TimeKeyAndFuture { ConcurrentHashMap> primaryMap = new ConcurrentHashMap<>(); ConcurrentSkipListSet> orderedSet = new ConcurrentSkipListSet<>(Comparator.comparingLong(TimeKeyAndFuture::getNanoTimeKey) - .thenComparingLong(i->System.identityHashCode(i))); + .thenComparingLong(System::identityHashCode)); @Override public void put(UniqueReplayerRequestKey uniqueReplayerRequestKey, DiagnosticTrackableCompletableFuture completableFuture) { - var timedValue = new TimeKeyAndFuture(System.nanoTime(), completableFuture); + var timedValue = new TimeKeyAndFuture<>(System.nanoTime(), completableFuture); primaryMap.put(uniqueReplayerRequestKey, timedValue); orderedSet.add(timedValue); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/BlockingTrafficSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/BlockingTrafficSourceTest.java index 15277cdda..5f8e756f5 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/BlockingTrafficSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/BlockingTrafficSourceTest.java @@ -115,7 +115,7 @@ public void close() throws IOException {} @Override public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) { // do nothing - return CommitResult.Immediate; + return CommitResult.IMMEDIATE; } } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/CompressedFileTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/CompressedFileTrafficCaptureSource.java index 73be373b2..b73ecb08d 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/CompressedFileTrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/CompressedFileTrafficCaptureSource.java @@ -36,7 +36,7 @@ public CompressedFileTrafficCaptureSource(RootReplayerContext context, String fi @Override public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) { // do nothing - return CommitResult.Immediate; + return CommitResult.IMMEDIATE; } @Override diff --git a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/traffic/source/ArrayCursorTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/traffic/source/ArrayCursorTrafficCaptureSource.java index cb336070b..0a3502023 100644 --- a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/traffic/source/ArrayCursorTrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/traffic/source/ArrayCursorTrafficCaptureSource.java @@ -71,6 +71,6 @@ public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) { } rootContext.channelContextManager.releaseContextFor( ((TrafficStreamCursorKey) trafficStreamKey).trafficStreamsContext.getChannelKeyContext()); - return CommitResult.Immediate; + return CommitResult.IMMEDIATE; } }