Drop user agent matches and a bugfix for over-committing KafkaRecords too early #468

Merged · 7 commits · Dec 11, 2023
TrafficStreamUtils.java
@@ -1,6 +1,8 @@
package org.opensearch.migrations.trafficcapture.protos;

import com.google.protobuf.Timestamp;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -22,11 +24,32 @@ public static Optional<Instant> getFirstTimestamp(TrafficStream ts) {

public static String summarizeTrafficStream(TrafficStream ts) {
var listSummaryStr = ts.getSubStreamList().stream()
.map(tso->instantFromProtoTimestamp(tso.getTs()) + ": " + captureCaseToString(tso.getCaptureCase()))
.map(tso->instantFromProtoTimestamp(tso.getTs()) + ": " + captureCaseToString(tso.getCaptureCase())
+ getOptionalContext(tso))
.collect(Collectors.joining(", "));
return ts.getConnectionId() + " (#" + getTrafficStreamIndex(ts) + ")[" + listSummaryStr + "]";
}

private static Object getOptionalContext(TrafficObservation tso) {
return Optional.ofNullable(getByteArrayForDataOf(tso))
.map(b->" " + new String(b, 0, Math.min(3, b.length), StandardCharsets.UTF_8))
.orElse("");
}

private static byte[] getByteArrayForDataOf(TrafficObservation tso) {
if (tso.hasRead()) {
return tso.getRead().getData().toByteArray();
} else if (tso.hasReadSegment()) {
return tso.getReadSegment().getData().toByteArray();
} else if (tso.hasWrite()) {
return tso.getWrite().getData().toByteArray();
} else if (tso.hasWriteSegment()) {
return tso.getWriteSegment().getData().toByteArray();
} else {
return null;
}
}

public static int getTrafficStreamIndex(TrafficStream ts) {
return ts.hasNumber() ? ts.getNumber() : ts.getNumberOfThisLastChunk();
}
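
Note for readers skimming the diff: with the new getOptionalContext helper, each observation in a stream summary now carries a preview of at most the first three bytes of its payload, decoded as UTF-8. Below is a minimal, self-contained sketch of that truncation rule; the class and method names are illustrative and not part of the PR.

import java.nio.charset.StandardCharsets;

// Illustrative sketch only: mirrors the preview logic in getOptionalContext/getByteArrayForDataOf.
public class PayloadPreviewSketch {
    // Render at most the first three bytes of a payload, or nothing when there is no payload.
    static String preview(byte[] payload) {
        if (payload == null) {
            return "";                      // observation carried no read/write data
        }
        int len = Math.min(3, payload.length);
        return " " + new String(payload, 0, len, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] httpRead = "GET / HTTP/1.1".getBytes(StandardCharsets.UTF_8);
        System.out.println("[" + preview(httpRead) + "]");   // prints "[ GET]"
        System.out.println("[" + preview(null) + "]");       // prints "[]"
    }
}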
Accumulation.java

@@ -4,6 +4,7 @@
import org.opensearch.migrations.replay.datatypes.ISourceTrafficChannelKey;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,7 +27,12 @@ enum State {
AtomicLong newestPacketTimestampInMillis;
State state;
AtomicInteger numberOfResets;
final int startingSourceRequestIndex;
int startingSourceRequestIndex;

public Accumulation(ITrafficStreamKey key, TrafficStream ts) {
this(key, ts.getPriorRequestsReceived()+(ts.hasLastObservationWasUnterminatedRead()?1:0),
ts.getLastObservationWasUnterminatedRead());
}

public Accumulation(@NonNull ITrafficStreamKey trafficChannelKey, int startingSourceRequestIndex) {
this(trafficChannelKey, startingSourceRequestIndex, false);
@@ -106,4 +112,12 @@ public void resetForNextRequest() {
this.state = State.ACCUMULATING_READS;
this.rrPair = null;
}

public void resetToIgnoreAndForgetCurrentRequest() {
if (state == State.IGNORING_LAST_REQUEST) {
--startingSourceRequestIndex;
}
this.state = State.WAITING_FOR_NEXT_READ_CHUNK;
this.rrPair = null;
}
}
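
A note on the arithmetic in the new Accumulation(ITrafficStreamKey, TrafficStream) constructor and in resetToIgnoreAndForgetCurrentRequest(): the starting source-request index is the number of requests the stream already delivered, bumped by one when the stream flags that its last observation was an unterminated read, and resetToIgnoreAndForgetCurrentRequest() appears to walk that bump back when the carried-over request is being ignored. A minimal sketch of that bookkeeping follows, using stand-in names rather than the real protobuf accessors.

// Illustrative sketch only; priorRequestsReceived / unterminatedReadFlagSet are stand-ins
// for the TrafficStream accessors used in the constructor above.
public class StartingIndexSketch {
    static int startingSourceRequestIndex(int priorRequestsReceived, boolean unterminatedReadFlagSet) {
        // Mirrors: ts.getPriorRequestsReceived() + (ts.hasLastObservationWasUnterminatedRead() ? 1 : 0)
        return priorRequestsReceived + (unterminatedReadFlagSet ? 1 : 0);
    }

    public static void main(String[] args) {
        // A stream that already produced 3 requests and ended mid-read resumes at index 4;
        // ignoring that carried-over request (IGNORING_LAST_REQUEST) steps the index back to 3.
        int resumed = startingSourceRequestIndex(3, true);
        System.out.println(resumed + " -> " + (resumed - 1));   // prints "4 -> 3"
    }
}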
TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java

@@ -87,7 +87,7 @@
public void onExpireAccumulation(String partitionId, Accumulation accumulation) {
connectionsExpiredCounter.incrementAndGet();
log.atTrace().setMessage(()->"firing accumulation for accum=["
+ accumulation.getRequestKey() + "]=" + accumulation)
+ accumulation.getRrPair().getBeginningTrafficStreamKey() + "]=" + accumulation)

Check warning (Codecov / codecov/patch): Added line #L90 was not covered by tests.
.log();
fireAccumulationsCallbacksAndClose(accumulation,
RequestResponsePacketPair.ReconstructionStatus.EXPIRED_PREMATURELY);
@@ -163,10 +163,7 @@
" are encountered. Full stream object=" + stream).log();
}

var startingSourceRequestOffset =
stream.getPriorRequestsReceived()+(stream.hasLastObservationWasUnterminatedRead()?1:0);
return new Accumulation(streamWithKey.getKey(),
startingSourceRequestOffset, stream.getLastObservationWasUnterminatedRead());
return new Accumulation(streamWithKey.getKey(), stream);
}

private enum CONNECTION_STATUS {
@@ -176,7 +173,7 @@
public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accum,
@NonNull ITrafficStreamKey trafficStreamKey,
TrafficObservation observation) {
log.atTrace().setMessage(()->"Adding observation: "+observation).log();
log.atTrace().setMessage(()->"Adding observation: "+observation+" with state="+accum.state).log();
var timestamp = TrafficStreamUtils.instantFromProtoTimestamp(observation.getTs());
liveStreams.expireOldEntries(trafficStreamKey, accum, timestamp);

@@ -185,19 +182,23 @@
.or(() -> handleObservationForReadState(accum, observation, trafficStreamKey, timestamp))
.or(() -> handleObservationForWriteState(accum, observation, trafficStreamKey, timestamp))
.orElseGet(() -> {
log.atWarn().setMessage(()->"unaccounted for observation type " + observation +
" for " + accum.trafficChannelKey).log();

Check warning (Codecov / codecov/patch): Added lines #L185 - L186 were not covered by tests.
return CONNECTION_STATUS.ALIVE;
});
}

private static Optional<CONNECTION_STATUS> handleObservationForSkipState(Accumulation accum, TrafficObservation observation) {
private Optional<CONNECTION_STATUS> handleObservationForSkipState(Accumulation accum, TrafficObservation observation) {
assert !observation.hasClose() : "close will be handled earlier in handleCloseObservationThatAffectEveryState";
if (accum.state == Accumulation.State.IGNORING_LAST_REQUEST) {
if (observation.hasWrite() || observation.hasWriteSegment() || observation.hasEndOfMessageIndicator()) {
if (observation.hasWrite() || observation.hasWriteSegment() ||
observation.hasEndOfMessageIndicator()) {
accum.state = Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK;
} else if (observation.hasRequestDropped()) {
handleDroppedRequestForAccumulation(accum);
}
// ignore everything until we hit an EOM
return Optional.of(observation.hasClose() ? CONNECTION_STATUS.CLOSED : CONNECTION_STATUS.ALIVE);
return Optional.of(CONNECTION_STATUS.ALIVE);
} else if (accum.state == Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK) {
// already processed EOMs above. Be on the lookout to ignore writes
if (!(observation.hasRead() || observation.hasReadSegment())) {
@@ -271,6 +272,11 @@
var rrPair = accum.getRrPair();
assert rrPair.requestData.hasInProgressSegment();
rrPair.requestData.finalizeRequestSegments(timestamp);
} else if (observation.hasRequestDropped()){
requestCounter.decrementAndGet();
handleDroppedRequestForAccumulation(accum);
} else {
return Optional.empty();

Check warning (Codecov / codecov/patch): Added line #L279 was not covered by tests.
}
return Optional.of(CONNECTION_STATUS.ALIVE);
}
@@ -304,19 +310,30 @@
} else if (observation.hasRead() || observation.hasReadSegment()) {
rotateAccumulationOnReadIfNecessary(connectionId, accum);
return handleObservationForReadState(accum, observation, trafficStreamKey, timestamp);
} else {
return Optional.empty();

Check warning (Codecov / codecov/patch): Added line #L314 was not covered by tests.
}
return Optional.of(CONNECTION_STATUS.ALIVE);

}

private void handleDroppedRequestForAccumulation(Accumulation accum) {
if (accum.hasRrPair()) {
accum.getRrPair().getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored);
}
log.atTrace().setMessage(()->"resetting to forget "+ accum.trafficChannelKey).log();
accum.resetToIgnoreAndForgetCurrentRequest();
log.atTrace().setMessage(()->"done resetting to forget and accum="+ accum).log();
}

// This function manages the transition case when an observation comes in that would terminate
// any previous HTTP transaction for the connection. It returns true if there WAS a previous
// transaction that has been reset and false otherwise
private boolean rotateAccumulationIfNecessary(String connectionId, Accumulation accum) {
// If this was brand new, we don't need to care about triggering the callback.
// We only need to worry about this if we have yet to send the RESPONSE.
if (accum.state == Accumulation.State.ACCUMULATING_WRITES) {
log.atDebug().setMessage(()->"Resetting accum[" + connectionId + "]=" + accum).log();
log.atDebug().setMessage(()->"handling EOM for accum[" + connectionId + "]=" + accum).log();
handleEndOfResponse(accum, RequestResponsePacketPair.ReconstructionStatus.COMPLETE);
return true;
}
@@ -357,6 +374,7 @@
var rrPair = accumulation.getRrPair();
rrPair.completionStatus = status;
listener.onFullDataReceived(accumulation.getRequestKey(), rrPair);
log.atTrace().setMessage("resetting for end of response").log();
accumulation.resetForNextRequest();
}
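
One pattern worth calling out from addObservationToAccumulation earlier in this file: the per-state handlers are chained with Optional.or(...), each returning Optional.empty() to hand the observation to the next handler, with orElseGet(...) logging the unaccounted-for case. The small self-contained sketch below shows that dispatch style; the types and handler names are stand-ins, not the replayer's API.

import java.util.Optional;

// Illustrative sketch only: the Optional.or(...) fall-through dispatch used by
// addObservationToAccumulation, with strings standing in for observations and states.
public class HandlerChainSketch {
    static Optional<String> handleSkipState(String obs)  { return "dropped".equals(obs) ? Optional.of("IGNORED") : Optional.empty(); }
    static Optional<String> handleReadState(String obs)  { return "read".equals(obs)    ? Optional.of("ACCUMULATED_READ")  : Optional.empty(); }
    static Optional<String> handleWriteState(String obs) { return "write".equals(obs)   ? Optional.of("ACCUMULATED_WRITE") : Optional.empty(); }

    static String dispatch(String obs) {
        return handleSkipState(obs)
                .or(() -> handleReadState(obs))
                .or(() -> handleWriteState(obs))
                .orElseGet(() -> {
                    System.err.println("unaccounted for observation type " + obs);
                    return "ALIVE";
                });
    }

    public static void main(String[] args) {
        System.out.println(dispatch("read"));     // ACCUMULATED_READ
        System.out.println(dispatch("mystery"));  // warns, then ALIVE
    }
}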

RequestResponsePacketPair.java

@@ -10,7 +10,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

@Slf4j
public class RequestResponsePacketPair {
@@ -23,17 +22,17 @@ public enum ReconstructionStatus {

HttpMessageAndTimestamp requestData;
HttpMessageAndTimestamp responseData;
final ITrafficStreamKey trafficStreamKey;
final ITrafficStreamKey firstTrafficStreamKeyForRequest;
List<ITrafficStreamKey> trafficStreamKeysBeingHeld;
ReconstructionStatus completionStatus;

public RequestResponsePacketPair(@NonNull ITrafficStreamKey startingAtTrafficStreamKey) {
this.trafficStreamKeysBeingHeld = new ArrayList<>();
trafficStreamKey = startingAtTrafficStreamKey;
firstTrafficStreamKeyForRequest = startingAtTrafficStreamKey;
}

@NonNull ITrafficStreamKey getBeginningTrafficStreamKey() {
return trafficStreamKey;
return firstTrafficStreamKeyForRequest;
}

public void addRequestData(Instant packetTimeStamp, byte[] data) {
TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java

@@ -162,7 +162,7 @@
eventLoop.schedule(task.runnable, getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS);
scheduledFuture.addListener(f->{
if (!f.isSuccess()) {
log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task for " + channelKey).log();

Check warning (Codecov / codecov/patch): Added line #L165 was not covered by tests.
} else {
log.atInfo().setMessage(()->"scheduled future has finished for "+channelInteraction).log();
}
@@ -187,7 +187,8 @@
var sf = eventLoop.schedule(runnable, getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS);
sf.addListener(sfp->{
if (!sfp.isSuccess()) {
log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log();
log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful for " +
channelInteraction).log();
}
});
});