Skip to content

Commit

Permalink
More context on throws (#453)
Browse files Browse the repository at this point in the history
* Add more context to a number of warn/error log statements and tighten RequestKey.toString().
Also pass the request key through ParsedHttpMessagesAsDicts so that the request key is present in the logged output (which I've just realized should be sending back optionals or sneaky throws & letting the callers do this conversion... next change).
* Delint - Fix a bug in metrics logging where a fluent API's response was being dropped.
This can create an error down the line if the underlying implementation changes, even if it works now.
I've also removed a getValue() that didn't do anything.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn authored Dec 8, 2023
1 parent 908f360 commit 44ec296
Show file tree
Hide file tree
Showing 16 changed files with 380 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public MetricsLogBuilder(Logger logger) {
}

public MetricsLogBuilder setAttribute(MetricsAttributeKey key, Object value) {
loggingEventBuilder.addKeyValue(key.getKeyName(), value);
loggingEventBuilder = loggingEventBuilder.addKeyValue(key.getKeyName(), value);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob
// This is a spot where we would benefit from having a behavioral policy that different users
// could set as needed. Some users may be fine with just logging a failed offloading of a request
// where other users may want to stop entirely. JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276
log.warn("Got error: " + t.getMessage());
log.atWarn().setCause(t).setMessage("Got error").log();
ReferenceCountUtil.release(msg);
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
}
});
outboundChannel.config().setAutoRead(true);
} else { // if the outbound channel has died, so be it... let this frontside finish with it's caller naturally
} else { // if the outbound channel has died, so be it... let this frontside finish with its call naturally
log.warn("Output channel (" + outboundChannel + ") is NOT active");
ReferenceCountUtil.release(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accu
.or(() -> handleObservationForReadState(accum, observation, trafficStreamKey, timestamp))
.or(() -> handleObservationForWriteState(accum, observation, trafficStreamKey, timestamp))
.orElseGet(() -> {
log.atWarn().setMessage(()->"unaccounted for observation type " + observation).log();
log.atWarn().setMessage(()->"unaccounted for observation type " + observation +
" for " + accum.trafficChannelKey).log();
return CONNECTION_STATUS.ALIVE;
});
}
Expand Down Expand Up @@ -380,8 +381,9 @@ private void fireAccumulationsCallbacksAndClose(Accumulation accumulation,
// It might be advantageous to replicate these to provide stress to the target server, but
// it's a difficult decision and one to be managed with a policy.
// TODO - add Jira/github issue here.
log.warn("Terminating a TrafficStream reconstruction w/out an accumulated value, " +
"assuming an empty server interaction and NOT reproducing this to the target cluster.");
log.atWarn().setMessage("Terminating a TrafficStream reconstruction before data was accumulated " +
"for " + accumulation.trafficChannelKey + " assuming an empty server interaction and NOT " +
"reproducing this to the target cluster.").log();
if (accumulation.hasRrPair()) {
listener.onTrafficStreamsExpired(status,
Collections.unmodifiableList(accumulation.getRrPair().trafficStreamKeysBeingHeld));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ public ConnectionReplaySession getCachedSession(ISourceTrafficChannelKey channel
var schedule = channelAndFutureWork.schedule;
while (channelAndFutureWork.hasWorkRemaining()) {
var scheduledItemToKill = schedule.peekFirstItem();
scheduledItemToKill.getValue();
schedule.removeFirstItem();
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.migrations.coreutils.MetricsEvent;
import org.opensearch.migrations.coreutils.MetricsLogBuilder;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;

import java.time.Duration;
Expand Down Expand Up @@ -46,7 +47,7 @@ public ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple) {

protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple,
Optional<RequestResponsePacketPair> sourcePairOp) {
this(getSourceRequestOp(sourcePairOp),
this(getSourceRequestOp(tuple.uniqueRequestKey, sourcePairOp),
getSourceResponseOp(tuple, sourcePairOp),
getTargetRequestOp(tuple),
getTargetResponseOp(tuple));
Expand All @@ -55,28 +56,30 @@ protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple,
private static Optional<Map<String, Object>> getTargetResponseOp(SourceTargetCaptureTuple tuple) {
return Optional.ofNullable(tuple.targetResponseData)
.filter(r -> !r.isEmpty())
.map(d -> convertResponse(d, tuple.targetResponseDuration));
.map(d -> convertResponse(tuple.uniqueRequestKey, d, tuple.targetResponseDuration));
}

private static Optional<Map<String, Object>> getTargetRequestOp(SourceTargetCaptureTuple tuple) {
return Optional.ofNullable(tuple.targetRequestData)
.map(d -> d.asByteArrayStream())
.map(d -> convertRequest(d.collect(Collectors.toList())));
.map(d -> convertRequest(tuple.uniqueRequestKey, d.collect(Collectors.toList())));
}

private static Optional<Map<String, Object>> getSourceResponseOp(SourceTargetCaptureTuple tuple, Optional<RequestResponsePacketPair> sourcePairOp) {
private static Optional<Map<String, Object>> getSourceResponseOp(SourceTargetCaptureTuple tuple,
Optional<RequestResponsePacketPair> sourcePairOp) {
return sourcePairOp.flatMap(p ->
Optional.ofNullable(p.responseData).flatMap(d -> Optional.ofNullable(d.packetBytes))
.map(d -> convertResponse(d,
.map(d -> convertResponse(tuple.uniqueRequestKey, d,
// TODO: These durations are not measuring the same values!
Duration.between(tuple.sourcePair.requestData.getLastPacketTimestamp(),
tuple.sourcePair.responseData.getLastPacketTimestamp()))));
}

private static Optional<Map<String, Object>> getSourceRequestOp(Optional<RequestResponsePacketPair> sourcePairOp) {
private static Optional<Map<String, Object>> getSourceRequestOp(@NonNull UniqueSourceRequestKey diagnosticKey,
Optional<RequestResponsePacketPair> sourcePairOp) {
return sourcePairOp.flatMap(p ->
Optional.ofNullable(p.requestData).flatMap(d -> Optional.ofNullable(d.packetBytes))
.map(d -> convertRequest(d)));
.map(d -> convertRequest(diagnosticKey, d)));
}

public ParsedHttpMessagesAsDicts(Optional<Map<String, Object>> sourceRequestOp1,
Expand Down Expand Up @@ -137,18 +140,22 @@ private static Map<String, Object> fillMap(LinkedHashMap<String, Object> map,
return map;
}

private static Map<String, Object> makeSafeMap(Callable<Map<String, Object>> c) {
private static Map<String, Object> makeSafeMap(@NonNull UniqueSourceRequestKey diagnosticKey,
Callable<Map<String, Object>> c) {
try {
return c.call();
} catch (Exception e) {
log.warn("Putting what may be a bogus value in the output because transforming it " +
"into json threw an exception");
// TODO - this isn't a good design choice.
// We should follow through with the spirit of this class and leave this as empty optional values
log.atWarn().setMessage(()->"Putting what may be a bogus value in the output because transforming it " +
"into json threw an exception for "+diagnosticKey.toString()).setCause(e).log();
return Map.of("Exception", (Object) e.toString());
}
}

private static Map<String, Object> convertRequest(@NonNull List<byte[]> data) {
return makeSafeMap(() -> {
private static Map<String, Object> convertRequest(@NonNull UniqueSourceRequestKey diagnosticKey,
@NonNull List<byte[]> data) {
return makeSafeMap(diagnosticKey, () -> {
var map = new LinkedHashMap<String, Object>();
var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true);
map.put("Request-URI", message.uri());
Expand All @@ -158,8 +165,9 @@ private static Map<String, Object> convertRequest(@NonNull List<byte[]> data) {
});
}

private static Map<String, Object> convertResponse(@NonNull List<byte[]> data, Duration latency) {
return makeSafeMap(() -> {
private static Map<String, Object> convertResponse(@NonNull UniqueSourceRequestKey diagnosticKey,
@NonNull List<byte[]> data, Duration latency) {
return makeSafeMap(diagnosticKey, () -> {
var map = new LinkedHashMap<String, Object>();
var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true);
map.put("HTTP-Version", message.protocolVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ private <T> void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan
var sf = eventLoop.schedule(kvp.getValue(), getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS);
sf.addListener(sfp->{
if (!sfp.isSuccess()) {
log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log();
log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful for " +
channelInteraction).log();
}
});
});
Expand All @@ -171,7 +172,7 @@ private <T> void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan
eventLoop.schedule(task, getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS);
scheduledFuture.addListener(f->{
if (!f.isSuccess()) {
log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task").log();
log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task for " + channelKey).log();
} else {
log.atInfo().setMessage(()->"scheduled future has finished for "+channelInteraction).log();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout,
} catch (InterruptedException ex) {
throw ex;
} catch (Exception e) {
log.warn("Terminating runReplay due to", e);
log.atWarn().setCause(e).setMessage("Terminating runReplay due to exception").log();
throw e;
} finally {
trafficToHttpTransactionAccumulator.close();
Expand Down Expand Up @@ -645,7 +645,8 @@ Void handleCompletedTransaction(@NonNull UniqueReplayerRequestKey requestKey, Re
commitTrafficStreams(rrPair.trafficStreamKeysBeingHeld, rrPair.completionStatus);
return null;
} else {
log.atError().setCause(t).setMessage(()->"Throwable passed to handle(). Rethrowing.").log();
log.atError().setCause(t).setMessage(()->"Throwable passed to handle() for " + requestKey +
". Rethrowing.").log();
throw Lombok.sneakyThrow(t);
}
} catch (Error error) {
Expand Down Expand Up @@ -922,7 +923,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture<String
}

public @NonNull CompletableFuture<Void> shutdown(Error error) {
log.warn("Shutting down "+this+" because of "+error);
log.atWarn().setCause(error).setMessage(()->"Shutting down " + this + " because of error").log();
shutdownReasonRef.compareAndSet(null, error);
if (!shutdownFutureRef.compareAndSet(null, new CompletableFuture<>())) {
log.atError().setMessage(()->"Shutdown was already signaled by {}. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,7 @@ private static Throwable unwindPossibleCompletionException(Throwable t) {
chunks.stream().collect(
Utils.foldLeft(DiagnosticTrackableCompletableFuture.Factory.
completedFuture(null, ()->"Initial value"),
(dcf, bb) -> dcf.thenCompose(v -> {
var rval = packetConsumer.consumeBytes(bb);
log.error("packetConsumer.consumeBytes()="+rval);
return rval;
},
(dcf, bb) -> dcf.thenCompose(v -> packetConsumer.consumeBytes(bb),
()->"HttpJsonTransformingConsumer.redriveWithoutTransformation collect()")));
DiagnosticTrackableCompletableFuture<String,R> finalizedFuture =
consumptionChainedFuture.thenCompose(v -> packetConsumer.finalizeRequest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void writeHeadersIntoByteBufs(ChannelHandlerContext ctx,
return;
}
} catch (Exception e) {
log.warn("writing headers directly to chunks w/ sizes didn't work: "+e);
log.atWarn().setCause(e).setMessage(()->"writing headers directly to chunks w/ sizes didn't work for " +
httpJson).log();
}

try (var baos = new ByteArrayOutputStream()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.opensearch.migrations.replay.datatypes;

import java.util.StringJoiner;

import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;

@ToString
@EqualsAndHashCode()
public class PojoTrafficStreamKey implements ITrafficStreamKey {
private final String nodeId;
Expand Down Expand Up @@ -36,4 +36,13 @@ public String getConnectionId() {
public int getTrafficStreamIndex() {
return trafficStreamIndex;
}

@Override
public String toString() {
    // Render as "nodeId.connectionId.trafficStreamIndex" — a compact,
    // dot-delimited identifier suitable for log correlation.
    return nodeId + "." + connectionId + "." + trafficStreamIndex;
}
}
Loading

0 comments on commit 44ec296

Please sign in to comment.