Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More context on throws #453

Merged
merged 5 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public MetricsLogBuilder(Logger logger) {
}

/**
 * Records a key/value attribute on the in-progress log event and returns this
 * builder for chaining.
 *
 * Note: the result of addKeyValue must be captured — the fluent
 * LoggingEventBuilder API may return a different builder instance, so
 * discarding the return value would silently drop the attribute.
 */
public MetricsLogBuilder setAttribute(MetricsAttributeKey key, Object value) {
    loggingEventBuilder = loggingEventBuilder.addKeyValue(key.getKeyName(), value);
    return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob
// This is a spot where we would benefit from having a behavioral policy that different users
// could set as needed. Some users may be fine with just logging a failed offloading of a request
// where other users may want to stop entirely. JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276
log.warn("Got error: " + t.getMessage());
log.atWarn().setCause(t).setMessage("Got error").log();
ReferenceCountUtil.release(msg);
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
}
});
outboundChannel.config().setAutoRead(true);
} else { // if the outbound channel has died, so be it... let this frontside finish with it's caller naturally
} else { // if the outbound channel has died, so be it... let this frontside finish with its caller naturally
log.warn("Output channel (" + outboundChannel + ") is NOT active");
ReferenceCountUtil.release(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accu
.or(() -> handleObservationForReadState(accum, observation, trafficStreamKey, timestamp))
.or(() -> handleObservationForWriteState(accum, observation, trafficStreamKey, timestamp))
.orElseGet(() -> {
log.atWarn().setMessage(()->"unaccounted for observation type " + observation).log();
log.atWarn().setMessage(()->"unaccounted for observation type " + observation +
" for " + accum.trafficChannelKey).log();
return CONNECTION_STATUS.ALIVE;
});
}
Expand Down Expand Up @@ -380,8 +381,9 @@ private void fireAccumulationsCallbacksAndClose(Accumulation accumulation,
// It might be advantageous to replicate these to provide stress to the target server, but
// it's a difficult decision and one to be managed with a policy.
// TODO - add Jira/github issue here.
log.warn("Terminating a TrafficStream reconstruction w/out an accumulated value, " +
"assuming an empty server interaction and NOT reproducing this to the target cluster.");
log.atWarn().setMessage("Terminating a TrafficStream reconstruction before data was accumulated " +
"for " + accumulation.trafficChannelKey + " assuming an empty server interaction and NOT " +
"reproducing this to the target cluster.").log();
if (accumulation.hasRrPair()) {
listener.onTrafficStreamsExpired(status,
Collections.unmodifiableList(accumulation.getRrPair().trafficStreamKeysBeingHeld));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ public ConnectionReplaySession getCachedSession(ISourceTrafficChannelKey channel
var schedule = channelAndFutureWork.schedule;
while (channelAndFutureWork.hasWorkRemaining()) {
var scheduledItemToKill = schedule.peekFirstItem();
scheduledItemToKill.getValue();
schedule.removeFirstItem();
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.migrations.coreutils.MetricsEvent;
import org.opensearch.migrations.coreutils.MetricsLogBuilder;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;

import java.time.Duration;
Expand Down Expand Up @@ -46,7 +47,7 @@ public ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple) {

protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple,
Optional<RequestResponsePacketPair> sourcePairOp) {
this(getSourceRequestOp(sourcePairOp),
this(getSourceRequestOp(tuple.uniqueRequestKey, sourcePairOp),
getSourceResponseOp(tuple, sourcePairOp),
getTargetRequestOp(tuple),
getTargetResponseOp(tuple));
Expand All @@ -55,28 +56,30 @@ protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple,
private static Optional<Map<String, Object>> getTargetResponseOp(SourceTargetCaptureTuple tuple) {
return Optional.ofNullable(tuple.targetResponseData)
.filter(r -> !r.isEmpty())
.map(d -> convertResponse(d, tuple.targetResponseDuration));
.map(d -> convertResponse(tuple.uniqueRequestKey, d, tuple.targetResponseDuration));
}

private static Optional<Map<String, Object>> getTargetRequestOp(SourceTargetCaptureTuple tuple) {
return Optional.ofNullable(tuple.targetRequestData)
.map(d -> d.asByteArrayStream())
.map(d -> convertRequest(d.collect(Collectors.toList())));
.map(d -> convertRequest(tuple.uniqueRequestKey, d.collect(Collectors.toList())));
}

private static Optional<Map<String, Object>> getSourceResponseOp(SourceTargetCaptureTuple tuple, Optional<RequestResponsePacketPair> sourcePairOp) {
private static Optional<Map<String, Object>> getSourceResponseOp(SourceTargetCaptureTuple tuple,
Optional<RequestResponsePacketPair> sourcePairOp) {
return sourcePairOp.flatMap(p ->
Optional.ofNullable(p.responseData).flatMap(d -> Optional.ofNullable(d.packetBytes))
.map(d -> convertResponse(d,
.map(d -> convertResponse(tuple.uniqueRequestKey, d,
// TODO: These durations are not measuring the same values!
Duration.between(tuple.sourcePair.requestData.getLastPacketTimestamp(),
tuple.sourcePair.responseData.getLastPacketTimestamp()))));
}

private static Optional<Map<String, Object>> getSourceRequestOp(Optional<RequestResponsePacketPair> sourcePairOp) {
private static Optional<Map<String, Object>> getSourceRequestOp(@NonNull UniqueSourceRequestKey diagnosticKey,
Optional<RequestResponsePacketPair> sourcePairOp) {
return sourcePairOp.flatMap(p ->
Optional.ofNullable(p.requestData).flatMap(d -> Optional.ofNullable(d.packetBytes))
.map(d -> convertRequest(d)));
.map(d -> convertRequest(diagnosticKey, d)));
}

public ParsedHttpMessagesAsDicts(Optional<Map<String, Object>> sourceRequestOp1,
Expand Down Expand Up @@ -137,18 +140,22 @@ private static Map<String, Object> fillMap(LinkedHashMap<String, Object> map,
return map;
}

private static Map<String, Object> makeSafeMap(Callable<Map<String, Object>> c) {
private static Map<String, Object> makeSafeMap(@NonNull UniqueSourceRequestKey diagnosticKey,
Callable<Map<String, Object>> c) {
try {
return c.call();
} catch (Exception e) {
log.warn("Putting what may be a bogus value in the output because transforming it " +
"into json threw an exception");
// TODO - this isn't a good design choice.
// We should follow through with the spirit of this class and leave this as empty optional values
log.atWarn().setMessage(()->"Putting what may be a bogus value in the output because transforming it " +
"into json threw an exception for "+diagnosticKey.toString()).setCause(e).log();
return Map.of("Exception", (Object) e.toString());
}
}

private static Map<String, Object> convertRequest(@NonNull List<byte[]> data) {
return makeSafeMap(() -> {
private static Map<String, Object> convertRequest(@NonNull UniqueSourceRequestKey diagnosticKey,
@NonNull List<byte[]> data) {
return makeSafeMap(diagnosticKey, () -> {
var map = new LinkedHashMap<String, Object>();
var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true);
map.put("Request-URI", message.uri());
Expand All @@ -158,8 +165,9 @@ private static Map<String, Object> convertRequest(@NonNull List<byte[]> data) {
});
}

private static Map<String, Object> convertResponse(@NonNull List<byte[]> data, Duration latency) {
return makeSafeMap(() -> {
private static Map<String, Object> convertResponse(@NonNull UniqueSourceRequestKey diagnosticKey,
@NonNull List<byte[]> data, Duration latency) {
return makeSafeMap(diagnosticKey, () -> {
var map = new LinkedHashMap<String, Object>();
var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true);
map.put("HTTP-Version", message.protocolVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ private <T> void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan
var sf = eventLoop.schedule(kvp.getValue(), getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS);
sf.addListener(sfp->{
if (!sfp.isSuccess()) {
log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log();
log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful for " +
channelInteraction).log();
}
});
});
Expand All @@ -171,7 +172,7 @@ private <T> void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan
eventLoop.schedule(task, getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS);
scheduledFuture.addListener(f->{
if (!f.isSuccess()) {
log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task").log();
log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task for " + channelKey).log();
} else {
log.atInfo().setMessage(()->"scheduled future has finished for "+channelInteraction).log();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout,
} catch (InterruptedException ex) {
throw ex;
} catch (Exception e) {
log.warn("Terminating runReplay due to", e);
log.atWarn().setCause(e).setMessage("Terminating runReplay due to exception").log();
throw e;
} finally {
trafficToHttpTransactionAccumulator.close();
Expand Down Expand Up @@ -645,7 +645,8 @@ Void handleCompletedTransaction(@NonNull UniqueReplayerRequestKey requestKey, Re
commitTrafficStreams(rrPair.trafficStreamKeysBeingHeld, rrPair.completionStatus);
return null;
} else {
log.atError().setCause(t).setMessage(()->"Throwable passed to handle(). Rethrowing.").log();
log.atError().setCause(t).setMessage(()->"Throwable passed to handle() for " + requestKey +
". Rethrowing.").log();
throw Lombok.sneakyThrow(t);
}
} catch (Error error) {
Expand Down Expand Up @@ -922,7 +923,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture<String
}

public @NonNull CompletableFuture<Void> shutdown(Error error) {
log.warn("Shutting down "+this+" because of "+error);
log.atWarn().setCause(error).setMessage(()->"Shutting down " + this + " because of error").log();
shutdownReasonRef.compareAndSet(null, error);
if (!shutdownFutureRef.compareAndSet(null, new CompletableFuture<>())) {
log.atError().setMessage(()->"Shutdown was already signaled by {}. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,7 @@ private static Throwable unwindPossibleCompletionException(Throwable t) {
chunks.stream().collect(
Utils.foldLeft(DiagnosticTrackableCompletableFuture.Factory.
completedFuture(null, ()->"Initial value"),
(dcf, bb) -> dcf.thenCompose(v -> {
var rval = packetConsumer.consumeBytes(bb);
log.error("packetConsumer.consumeBytes()="+rval);
return rval;
},
(dcf, bb) -> dcf.thenCompose(v -> packetConsumer.consumeBytes(bb),
()->"HttpJsonTransformingConsumer.redriveWithoutTransformation collect()")));
DiagnosticTrackableCompletableFuture<String,R> finalizedFuture =
consumptionChainedFuture.thenCompose(v -> packetConsumer.finalizeRequest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void writeHeadersIntoByteBufs(ChannelHandlerContext ctx,
return;
}
} catch (Exception e) {
log.warn("writing headers directly to chunks w/ sizes didn't work: "+e);
log.atWarn().setCause(e).setMessage(()->"writing headers directly to chunks w/ sizes didn't work for " +
httpJson).log();
}

try (var baos = new ByteArrayOutputStream()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.opensearch.migrations.replay.datatypes;

import java.util.StringJoiner;

import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;

@ToString
@EqualsAndHashCode()
public class PojoTrafficStreamKey implements ITrafficStreamKey {
private final String nodeId;
Expand Down Expand Up @@ -36,4 +36,13 @@ public String getConnectionId() {
public int getTrafficStreamIndex() {
return trafficStreamIndex;
}

/**
 * Renders the key as "nodeId.connectionId.trafficStreamIndex" — the three
 * identifying components joined with '.' separators.
 */
@Override
public String toString() {
    return nodeId + "." + connectionId + "." + trafficStreamIndex;
}
}
Loading