Skip to content

Commit

Permalink
Fixing some SonarLint findings.
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Apr 16, 2024
1 parent 911c9fa commit e5aafc6
Show file tree
Hide file tree
Showing 32 changed files with 80 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ public CodedOutputStreamWrapper createStream() {
}

// Producer Send will block on actions such as retrieving cluster metadata, allows running fully async
public static <K, V> CompletableFuture<RecordMetadata> sendFullyAsync(Producer<K, V> producer, ProducerRecord<K, V> record) {
public static <K, V> CompletableFuture<RecordMetadata> sendFullyAsync(Producer<K, V> producer,
ProducerRecord<K, V> kafkaRecord) {
CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();

ForkJoinPool.commonPool().execute(() -> {
try {
producer.send(record, (metadata, exception) -> {
producer.send(kafkaRecord, (metadata, exception) -> {
if (exception != null) {
completableFuture.completeExceptionally(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;

import java.util.function.IntSupplier;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -131,9 +133,9 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx
}
}

public CompletableFuture<T> flushIfNeeded(Supplier<Integer> requiredSize) throws IOException {
public CompletableFuture<T> flushIfNeeded(IntSupplier requiredSize) throws IOException {
var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
if (spaceLeft != -1 && spaceLeft < requiredSize.get()) {
if (spaceLeft != -1 && spaceLeft < requiredSize.getAsInt()) {
return flushCommitAndResetStream(false);
}
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.trafficcapture;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public interface StreamLifecycleManager<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,5 @@
import org.opensearch.migrations.tracing.IRootOtelContext;

public interface IRootOffloaderContext extends IRootOtelContext {
//public static final String OFFLOADER_SCOPE_NAME = "Offloader";
ConnectionContext.MetricInstruments getConnectionInstruments();

// public RootOffloaderContext(OpenTelemetry openTelemetry) {
// this(openTelemetry, OFFLOADER_SCOPE_NAME);
// }
//
// public RootOffloaderContext(OpenTelemetry openTelemetry, String scopeName) {
// super(scopeName, openTelemetry);
// var meter = openTelemetry.getMeterProvider().get(scopeName);
// connectionInstruments = new ConnectionContext.MetricInstruments(meter, scopeName);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ public void testWithLimitlessCodedOutputStreamHolder()
var bb = Unpooled.buffer(0);
serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb);
serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb);
var future = serializer.flushCommitAndResetStream(true);
future.get();
Assertions.assertDoesNotThrow(()-> {
var future = serializer.flushCommitAndResetStream(true);
future.get();
});
bb.release();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.stream.Collectors;

public class Utils {
private Utils() {}

/**
* See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.tracing;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ default void onContextCreated(IScopedInstrumentationAttributes newScopedContext)
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

final static IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.NonNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import org.opensearch.migrations.tracing.IInstrumentationAttributes;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;

public interface IConnectionContext extends IScopedInstrumentationAttributes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public List<MetricData> getMetricsUntil(String metricName, IntStream sleepTimes,
" did not").log();
Thread.sleep(sleepAmount);
} catch (InterruptedException e) {
throw new RuntimeException(e);
Thread.currentThread().interrupt();
throw Lombok.sneakyThrow(e);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,23 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}

/**
* This provides a callback that subclasses can use to override the default behavior of cycling the
* instrumentation context and continuing to read. Subclasses may determine if additional processing
* or triggers should occur before proceeding, given the current context.
* @param ctx the instrumentation context for this request
* @param msg the original message, which is likely a ByteBuf, that helped to form the httpRequest
* @param shouldCapture false if the current request has been determined to be ignorable
* @param httpRequest the request that has just been fully received (excluding its body)
*/
protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, boolean shouldCapture,
HttpRequest httpRequest) throws Exception {
messageContext = messageContext.createWaitingForResponseContext();
super.channelRead(ctx, msg);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception {
IWireCaptureContexts.IRequestContext requestContext;
if (!(messageContext instanceof IWireCaptureContexts.IRequestContext)) {
messageContext = requestContext = messageContext.createNextRequestContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ public PassThruHttpHeaders(@NonNull HttpHeadersToPreserve headersToPreserve) {
this.mapWithCaseInsensitiveHeaders = headersToPreserve.caseInsensitiveHeadersMap;
}

@Override
public boolean equals(Object o) {
throw new IllegalStateException("equals() is not supported for this stripped-down version of HttpHeaders");
}

@Override
public int hashCode() {
throw new IllegalStateException("hashCode() is not supported for this stripped-down version of HttpHeaders");
}

private boolean headerNameShouldBeTracked(CharSequence name) {
return mapWithCaseInsensitiveHeaders.contains(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private static Consumer<EmbeddedChannel> getSingleByteAtATimeWriter(boolean useP
// This test doesn't work yet, but this is an optimization. Getting connections with only a
// close observation is already a common occurrence. This is nice to have, so it's good to
// keep this warm and ready, but we don't need the feature for correctness.
@Disabled
@Disabled("This is for an optimization that isn't functional yet")
@Test
@ValueSource(booleans = {false, true})
public void testThatSuppressedCaptureWorks() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public void channelInactive(ChannelHandlerContext ctx) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.atError().setCause(cause).setMessage("Caught error").log();
String channelId = ctx.channel().id().asLongText();
log.atError().setCause(cause).setMessage("Caught error for channel: " + ctx.channel().id().asLongText()).log();
FrontsideHandler.closeAndFlush(ctx.channel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ public Entry(F future) {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Entry{");
sb.append("timestamp=").append(timestamp);
sb.append(", value=").append(future);
sb.append('}');
return sb.toString();
return "Entry{" + "timestamp=" + timestamp +
", value=" + future +
'}';
}
}

Expand Down Expand Up @@ -150,8 +148,8 @@ private void addExpiredItem() {
private final Consumer<F> onExpirationConsumer;
@Getter
private final EventLoop eventLoop;
private Duration inactivityTimeout;
private GenericFutureListener<F> shuffleInProgressToReady;
private final Duration inactivityTimeout;
private final GenericFutureListener<F> shuffleInProgressToReady;
private final Stats stats;
private int poolSize;

Expand Down Expand Up @@ -305,7 +303,7 @@ private void beginLoadingNewItemIfNecessary() {
@Override
@SneakyThrows
public String toString() {
return eventLoop.submit(() -> toStringOnThread()).get();
return eventLoop.submit(this::toStringOnThread).get();
}

private String toStringOnThread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void onTrafficStreamsExpired(RequestResponsePacketPair.ReconstructionStat
public void onTrafficStreamIgnored(@NonNull ITrafficStreamKey tsk) {
underlying.onTrafficStreamIgnored(tsk.getTrafficStreamsContext());
}
};
}

public int numberOfConnectionsCreated() { return liveStreams.numberOfConnectionsCreated(); }
public int numberOfRequestsOnReusedConnections() { return reusedKeepAliveCounter.get(); }
Expand Down Expand Up @@ -361,7 +361,7 @@ private Optional<CONNECTION_STATUS> handleObservationForWriteState(Accumulation
private void handleDroppedRequestForAccumulation(Accumulation accum) {
if (accum.hasRrPair()) {
var rrPair = accum.getRrPair();
rrPair.getTrafficStreamsHeld().forEach(tsk->listener.onTrafficStreamIgnored(tsk));
rrPair.getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored);
}
log.atTrace().setMessage(()->"resetting to forget "+ accum.trafficChannelKey).log();
accum.resetToIgnoreAndForgetCurrentRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -194,7 +195,7 @@ public static void main(String[] args) throws FileNotFoundException {
}

String baseOutputPath = params.outputDirectoryPath == null ? "./" : params.outputDirectoryPath;
baseOutputPath = !baseOutputPath.endsWith("/") ? baseOutputPath + "/" : baseOutputPath;
baseOutputPath = !baseOutputPath.endsWith(File.separator) ? baseOutputPath + File.separator : baseOutputPath;
String uuid = UUID.randomUUID().toString();
boolean separatePartitionOutputs = false;
Map<Integer, CodedOutputStream> partitionOutputStreams = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,15 @@ class TrafficReplayerAccumulationCallbacks implements AccumulationCallbacks {
var requestPushFuture = new StringTrackableCompletableFuture<TransformedTargetRequestAndResponse>(
new CompletableFuture<>(), () -> "Waiting to get response from target");
var requestKey = ctx.getReplayerRequestKey();
liveTrafficStreamLimiter.queueWork(1, ctx, wi -> {
liveTrafficStreamLimiter.queueWork(1, ctx, wi ->
transformAndSendRequest(replayEngine, request, ctx).future.whenComplete((v,t)->{
liveTrafficStreamLimiter.doneProcessing(wi);
if (t != null) {
requestPushFuture.future.completeExceptionally(t);
} else {
requestPushFuture.future.complete(v);
}
});
});
}));
if (!allWorkFinishedForTransaction.future.isDone()) {
log.trace("Adding " + requestKey + " to targetTransactionInProgressMap");
requestWorkTracker.put(requestKey, allWorkFinishedForTransaction);
Expand Down Expand Up @@ -222,9 +221,8 @@ public void onConnectionClose(int channelInteractionNum,
@NonNull Instant timestamp, @NonNull List<ITrafficStreamKey> trafficStreamKeysBeingHeld) {
replayEngine.setFirstTimestamp(timestamp);
var cf = replayEngine.closeConnection(channelInteractionNum, ctx, channelSessionNumber, timestamp);
cf.map(f->f.whenComplete((v,t)->{
commitTrafficStreams(status, trafficStreamKeysBeingHeld);
}), ()->"closing the channel in the ReplayEngine");
cf.map(f->f.whenComplete((v,t) -> commitTrafficStreams(status, trafficStreamKeysBeingHeld)),
()->"closing the channel in the ReplayEngine");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ private IReplayContexts.IReplayerHttpTransactionContext httpContext() {
writePacketAndUpdateFuture(ByteBuf packetData) {
final var completableFuture = new DiagnosticTrackableCompletableFuture<String, Void>(new CompletableFuture<>(),
()->"CompletableFuture that will wait for the netty future to fill in the completion value");
final int readableBytes = packetData.readableBytes();
channel.writeAndFlush(packetData)
.addListener((ChannelFutureListener) future -> {
Throwable cause = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.replay.datatypes;

import lombok.EqualsAndHashCode;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -21,4 +20,13 @@ public boolean equals(Object o) {
}
return true;
}

@Override
public int hashCode() {
int result = 29;
for (byte[] array : this) {
result = 31 * result + Arrays.hashCode(array);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;
Expand Down Expand Up @@ -45,9 +46,8 @@ public boolean hasPendingTransmissions() {
return false;
} else {
return timeToRunnableMap.values().stream()
.flatMap(d->d.stream())
.filter(ct->ct.kind==ChannelTaskType.TRANSMIT)
.findAny().isPresent();
.flatMap(Collection::stream)
.anyMatch(ct->ct.kind==ChannelTaskType.TRANSMIT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void touch(ITrafficSourceContexts.IBackPressureBlockContext context) {
} finally {
resume();
}
safeCommit(()->context.createCommitContext());
safeCommit(context::createCommitContext);
lastTouchTimeRef.set(clock.instant());
}
}
Expand Down Expand Up @@ -244,9 +244,9 @@ private Collection<TopicPartition> getActivePartitions() {
public <T> Stream<T>
getNextBatchOfRecords(ITrafficSourceContexts.IReadChunkContext context,
BiFunction<KafkaCommitOffsetData, ConsumerRecord<String,byte[]>, T> builder) {
safeCommit(()->context.createCommitContext());
safeCommit(context::createCommitContext);
var records = safePollWithSwallowedRuntimeExceptions(context);
safeCommit(()->context.createCommitContext());
safeCommit(context::createCommitContext);
return applyBuilder(builder, records);
}

Expand Down Expand Up @@ -299,7 +299,7 @@ ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey streamKey, K
+ "). Dropping this commit request since the record would " +
"have been handled again by a current consumer within this process or another. Full key=" +
kafkaTsk).log();
return ITrafficCaptureSource.CommitResult.Ignored;
return ITrafficCaptureSource.CommitResult.IGNORED;
}

var p = kafkaTsk.getPartition();
Expand All @@ -315,12 +315,12 @@ ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey streamKey, K
addKeyContextForEventualCommit(streamKey, kafkaTsk, k);
nextSetOfCommitsMap.put(k, v);
}
return ITrafficCaptureSource.CommitResult.AfterNextRead;
return ITrafficCaptureSource.CommitResult.AFTER_NEXT_READ;
}).orElseGet(() -> {
synchronized (commitDataLock) {
addKeyContextForEventualCommit(streamKey, kafkaTsk, k);
}
return ITrafficCaptureSource.CommitResult.BlockedByOtherCommits;
return ITrafficCaptureSource.CommitResult.BLOCKED_BY_OTHER_COMMITS;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import lombok.NonNull;
import org.opensearch.migrations.tracing.BaseNestedSpanContext;
import org.opensearch.migrations.tracing.CommonScopedMetricInstruments;
import org.opensearch.migrations.tracing.IInstrumentationAttributes;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;

public class TrafficSourceContexts {
Expand Down
Loading

0 comments on commit e5aafc6

Please sign in to comment.