Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing some SonarLint findings. #580

Merged
merged 3 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ public CodedOutputStreamWrapper createStream() {
}

// Producer Send will block on actions such as retrieving cluster metadata, allows running fully async
public static <K, V> CompletableFuture<RecordMetadata> sendFullyAsync(Producer<K, V> producer, ProducerRecord<K, V> record) {
public static <K, V> CompletableFuture<RecordMetadata> sendFullyAsync(Producer<K, V> producer,
ProducerRecord<K, V> kafkaRecord) {
CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();

ForkJoinPool.commonPool().execute(() -> {
try {
producer.send(record, (metadata, exception) -> {
producer.send(kafkaRecord, (metadata, exception) -> {
if (exception != null) {
completableFuture.completeExceptionally(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;

import java.util.function.IntSupplier;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -131,9 +133,9 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx
}
}

public CompletableFuture<T> flushIfNeeded(Supplier<Integer> requiredSize) throws IOException {
public CompletableFuture<T> flushIfNeeded(IntSupplier requiredSize) throws IOException {
var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
if (spaceLeft != -1 && spaceLeft < requiredSize.get()) {
if (spaceLeft != -1 && spaceLeft < requiredSize.getAsInt()) {
return flushCommitAndResetStream(false);
}
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.trafficcapture;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public interface StreamLifecycleManager<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,5 @@
import org.opensearch.migrations.tracing.IRootOtelContext;

public interface IRootOffloaderContext extends IRootOtelContext {
//public static final String OFFLOADER_SCOPE_NAME = "Offloader";
ConnectionContext.MetricInstruments getConnectionInstruments();

// public RootOffloaderContext(OpenTelemetry openTelemetry) {
// this(openTelemetry, OFFLOADER_SCOPE_NAME);
// }
//
// public RootOffloaderContext(OpenTelemetry openTelemetry, String scopeName) {
// super(scopeName, openTelemetry);
// var meter = openTelemetry.getMeterProvider().get(scopeName);
// connectionInstruments = new ConnectionContext.MetricInstruments(meter, scopeName);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ public void testWithLimitlessCodedOutputStreamHolder()
var bb = Unpooled.buffer(0);
serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb);
serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb);
var future = serializer.flushCommitAndResetStream(true);
future.get();
Assertions.assertDoesNotThrow(()-> {
var future = serializer.flushCommitAndResetStream(true);
future.get();
});
bb.release();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.stream.Collectors;

public class Utils {
private Utils() {}

/**
* See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.tracing;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ default void onContextCreated(IScopedInstrumentationAttributes newScopedContext)
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

final static IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.NonNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import org.opensearch.migrations.tracing.IInstrumentationAttributes;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;

public interface IConnectionContext extends IScopedInstrumentationAttributes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public List<MetricData> getMetricsUntil(String metricName, IntStream sleepTimes,
" did not").log();
Thread.sleep(sleepAmount);
} catch (InterruptedException e) {
throw new RuntimeException(e);
Thread.currentThread().interrupt();
throw Lombok.sneakyThrow(e);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,23 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}

/**
* This provides a callback that subclasses can use to override the default behavior of cycling the
* instrumentation context and continuing to read. Subclasses may determine if additional processing
* or triggers should occur before proceeding, given the current context.
* @param ctx the instrumentation context for this request
* @param msg the original message, which is likely a ByteBuf, that helped to form the httpRequest
* @param shouldCapture false if the current request has been determined to be ignorable
* @param httpRequest the request that has just been fully received (excluding its body)
*/
protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, boolean shouldCapture,
HttpRequest httpRequest) throws Exception {
messageContext = messageContext.createWaitingForResponseContext();
super.channelRead(ctx, msg);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception {
IWireCaptureContexts.IRequestContext requestContext;
if (!(messageContext instanceof IWireCaptureContexts.IRequestContext)) {
messageContext = requestContext = messageContext.createNextRequestContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
this.mapWithCaseInsensitiveHeaders = headersToPreserve.caseInsensitiveHeadersMap;
}

@Override
public boolean equals(Object o) {
throw new IllegalStateException("equals() is not supported for this stripped-down version of HttpHeaders");

Check warning on line 36 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java#L36

Added line #L36 was not covered by tests
}

@Override
public int hashCode() {
throw new IllegalStateException("hashCode() is not supported for this stripped-down version of HttpHeaders");

Check warning on line 41 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java#L41

Added line #L41 was not covered by tests
}

private boolean headerNameShouldBeTracked(CharSequence name) {
return mapWithCaseInsensitiveHeaders.contains(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private static Consumer<EmbeddedChannel> getSingleByteAtATimeWriter(boolean useP
// This test doesn't work yet, but this is an optimization. Getting connections with only a
// close observation is already a common occurrence. This is nice to have, so it's good to
// keep this warm and ready, but we don't need the feature for correctness.
@Disabled
@Disabled("This is for an optimization that isn't functional yet")
@Test
@ValueSource(booleans = {false, true})
public void testThatSuppressedCaptureWorks() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.atError().setCause(cause).setMessage("Caught error").log();
String channelId = ctx.channel().id().asLongText();
log.atError().setCause(cause).setMessage("Caught error for channel: " + ctx.channel().id().asLongText()).log();

Check warning on line 41 in TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java#L41

Added line #L41 was not covered by tests
FrontsideHandler.closeAndFlush(ctx.channel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ public Entry(F future) {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Entry{");
sb.append("timestamp=").append(timestamp);
sb.append(", value=").append(future);
sb.append('}');
return sb.toString();
return "Entry{" + "timestamp=" + timestamp +
", value=" + future +
'}';
}
}

Expand Down Expand Up @@ -150,8 +148,8 @@ private void addExpiredItem() {
private final Consumer<F> onExpirationConsumer;
@Getter
private final EventLoop eventLoop;
private Duration inactivityTimeout;
private GenericFutureListener<F> shuffleInProgressToReady;
private final Duration inactivityTimeout;
private final GenericFutureListener<F> shuffleInProgressToReady;
private final Stats stats;
private int poolSize;

Expand Down Expand Up @@ -305,7 +303,7 @@ private void beginLoadingNewItemIfNecessary() {
@Override
@SneakyThrows
public String toString() {
return eventLoop.submit(() -> toStringOnThread()).get();
return eventLoop.submit(this::toStringOnThread).get();
}

private String toStringOnThread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void onTrafficStreamsExpired(RequestResponsePacketPair.ReconstructionStat
public void onTrafficStreamIgnored(@NonNull ITrafficStreamKey tsk) {
underlying.onTrafficStreamIgnored(tsk.getTrafficStreamsContext());
}
};
}

public int numberOfConnectionsCreated() { return liveStreams.numberOfConnectionsCreated(); }
public int numberOfRequestsOnReusedConnections() { return reusedKeepAliveCounter.get(); }
Expand Down Expand Up @@ -361,7 +361,7 @@ private Optional<CONNECTION_STATUS> handleObservationForWriteState(Accumulation
private void handleDroppedRequestForAccumulation(Accumulation accum) {
if (accum.hasRrPair()) {
var rrPair = accum.getRrPair();
rrPair.getTrafficStreamsHeld().forEach(tsk->listener.onTrafficStreamIgnored(tsk));
rrPair.getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored);
}
log.atTrace().setMessage(()->"resetting to forget "+ accum.trafficChannelKey).log();
accum.resetToIgnoreAndForgetCurrentRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -194,7 +195,7 @@ public static void main(String[] args) throws FileNotFoundException {
}

String baseOutputPath = params.outputDirectoryPath == null ? "./" : params.outputDirectoryPath;
baseOutputPath = !baseOutputPath.endsWith("/") ? baseOutputPath + "/" : baseOutputPath;
baseOutputPath = !baseOutputPath.endsWith(File.separator) ? baseOutputPath + File.separator : baseOutputPath;
String uuid = UUID.randomUUID().toString();
boolean separatePartitionOutputs = false;
Map<Integer, CodedOutputStream> partitionOutputStreams = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,15 @@
var requestPushFuture = new StringTrackableCompletableFuture<TransformedTargetRequestAndResponse>(
new CompletableFuture<>(), () -> "Waiting to get response from target");
var requestKey = ctx.getReplayerRequestKey();
liveTrafficStreamLimiter.queueWork(1, ctx, wi -> {
liveTrafficStreamLimiter.queueWork(1, ctx, wi ->
transformAndSendRequest(replayEngine, request, ctx).future.whenComplete((v,t)->{
liveTrafficStreamLimiter.doneProcessing(wi);
if (t != null) {
requestPushFuture.future.completeExceptionally(t);
} else {
requestPushFuture.future.complete(v);
}
});
});
}));
if (!allWorkFinishedForTransaction.future.isDone()) {
log.trace("Adding " + requestKey + " to targetTransactionInProgressMap");
requestWorkTracker.put(requestKey, allWorkFinishedForTransaction);
Expand Down Expand Up @@ -222,9 +221,8 @@
@NonNull Instant timestamp, @NonNull List<ITrafficStreamKey> trafficStreamKeysBeingHeld) {
replayEngine.setFirstTimestamp(timestamp);
var cf = replayEngine.closeConnection(channelInteractionNum, ctx, channelSessionNumber, timestamp);
cf.map(f->f.whenComplete((v,t)->{
commitTrafficStreams(status, trafficStreamKeysBeingHeld);
}), ()->"closing the channel in the ReplayEngine");
cf.map(f->f.whenComplete((v,t) -> commitTrafficStreams(status, trafficStreamKeysBeingHeld)),
()->"closing the channel in the ReplayEngine");

Check warning on line 225 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java#L225

Added line #L225 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ private IReplayContexts.IReplayerHttpTransactionContext httpContext() {
writePacketAndUpdateFuture(ByteBuf packetData) {
final var completableFuture = new DiagnosticTrackableCompletableFuture<String, Void>(new CompletableFuture<>(),
()->"CompletableFuture that will wait for the netty future to fill in the completion value");
final int readableBytes = packetData.readableBytes();
channel.writeAndFlush(packetData)
.addListener((ChannelFutureListener) future -> {
Throwable cause = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.migrations.replay.datatypes;

import lombok.EqualsAndHashCode;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -21,4 +20,13 @@
}
return true;
}

@Override
public int hashCode() {
int result = 29;

Check warning on line 26 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java#L26

Added line #L26 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love these magic numbers, would prefer some constants here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, i'll leave it up to Greg on whether or not to use Arrays.hashCode(toArray());

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm much more comfortable with 2 primes for a hash function than doing an copy.
Honestly, I'd rather see this as a one-liner map-reduce operation like this, but most hashCode operations, especially those generated by IDEs and code generators (we have a lot of protobuf generated ones in out captureProtobufs package).
return stream().mapToInt(Arrays::hashCode).reduce(29, (a,b) -> 31*a+b);

for (byte[] array : this) {
result = 31 * result + Arrays.hashCode(array);
}
return result;

Check warning on line 30 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java#L28-L30

Added lines #L28 - L30 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;
Expand Down Expand Up @@ -45,9 +46,8 @@
return false;
} else {
return timeToRunnableMap.values().stream()
.flatMap(d->d.stream())
.filter(ct->ct.kind==ChannelTaskType.TRANSMIT)
.findAny().isPresent();
.flatMap(Collection::stream)

Check warning on line 49 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java#L49

Added line #L49 was not covered by tests
.anyMatch(ct->ct.kind==ChannelTaskType.TRANSMIT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void touch(ITrafficSourceContexts.IBackPressureBlockContext context) {
} finally {
resume();
}
safeCommit(()->context.createCommitContext());
safeCommit(context::createCommitContext);
lastTouchTimeRef.set(clock.instant());
}
}
Expand Down Expand Up @@ -244,9 +244,9 @@ private Collection<TopicPartition> getActivePartitions() {
public <T> Stream<T>
getNextBatchOfRecords(ITrafficSourceContexts.IReadChunkContext context,
BiFunction<KafkaCommitOffsetData, ConsumerRecord<String,byte[]>, T> builder) {
safeCommit(()->context.createCommitContext());
safeCommit(context::createCommitContext);
var records = safePollWithSwallowedRuntimeExceptions(context);
safeCommit(()->context.createCommitContext());
safeCommit(context::createCommitContext);
return applyBuilder(builder, records);
}

Expand Down Expand Up @@ -299,7 +299,7 @@ ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey streamKey, K
+ "). Dropping this commit request since the record would " +
"have been handled again by a current consumer within this process or another. Full key=" +
kafkaTsk).log();
return ITrafficCaptureSource.CommitResult.Ignored;
return ITrafficCaptureSource.CommitResult.IGNORED;
}

var p = kafkaTsk.getPartition();
Expand All @@ -315,12 +315,12 @@ ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey streamKey, K
addKeyContextForEventualCommit(streamKey, kafkaTsk, k);
nextSetOfCommitsMap.put(k, v);
}
return ITrafficCaptureSource.CommitResult.AfterNextRead;
return ITrafficCaptureSource.CommitResult.AFTER_NEXT_READ;
}).orElseGet(() -> {
synchronized (commitDataLock) {
addKeyContextForEventualCommit(streamKey, kafkaTsk, k);
}
return ITrafficCaptureSource.CommitResult.BlockedByOtherCommits;
return ITrafficCaptureSource.CommitResult.BLOCKED_BY_OTHER_COMMITS;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import lombok.NonNull;
import org.opensearch.migrations.tracing.BaseNestedSpanContext;
import org.opensearch.migrations.tracing.CommonScopedMetricInstruments;
import org.opensearch.migrations.tracing.IInstrumentationAttributes;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;

public class TrafficSourceContexts {
Expand Down
Loading