Skip to content

Commit

Permalink
Bugfix, test fix, lint fix.
Browse files Browse the repository at this point in the history
Bugfix is in the capture proxy's channel context's `sendMeterEventsForEnd()` override to call super so that we'll pickup duration, etc metrics too.
The lint fix is in a new python script to facilitate testing from the migration console.
The test fix is to give each TrafficReplayer run a fresh TestContext.   That context includes channel contexts, which should not be reused across process boundaries and likewise shouldn't be getting reused if we're trying to simulate that for repeated runs.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Feb 7, 2024
1 parent 9aa3432 commit 48a45ab
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ protected MetricInstruments(Meter meter, String activityName) {

@Override
public void sendMeterEventsForEnd() {
super.sendMeterEventsForEnd();
meterDeltaEvent(getMetrics().activeConnectionsCounter, -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ COPY showFetchMigrationCommand.sh /root/
COPY setupIntegTests.sh /root/
COPY msk-iam-auth.properties /root/kafka-tools/aws
COPY kafkaCmdRef.md /root/kafka-tools
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN chmod ug+x /root/runTestBenchmarks.sh
RUN chmod ug+x /root/humanReadableLogs.py
RUN chmod ug+x /root/simpleDocumentGenerator.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
from datetime import datetime

# url_base="http://test.elb.us-west-2.amazonaws.com:9200"
username='admin'
password='admin'
username = 'admin'
password = 'admin'

session = requests.Session()
keep_alive_headers = {
'Connection': 'keep-alive'
}


# Function to get current date in a specific format for indexing
def get_current_date_index():
return datetime.now().strftime("%Y-%m-%d")


# Function to send a request
def send_request(index, counter, url_base):
url = f"{url_base}/{index}/_doc/{counter}"
Expand All @@ -24,20 +31,23 @@ def send_request(index, counter, url_base):
}

try:
# a new connection for every request
#response = requests.put(url, json=payload, auth=auth)
response = requests.put(url, auth=auth, json=payload, verify=False)
response = session.put(url, json=payload, auth=auth, headers=keep_alive_headers, verify=False)
print(response.text)
print(f"Request sent at {timestamp}: {response.status_code}")
return response.status_code
except requests.RequestException as e:
print(f"Error sending request: {e}")
return None


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", help="Source cluster endpoint e.g. http://test.elb.us-west-2.amazonaws.com:9200.")
return parser.parse_args()


args = parse_args()
# Main loop
counter = 1
Expand All @@ -58,6 +68,7 @@ def parse_args():
total5xxCount += 1
else:
totalErrorCount += 1
print(f"Summary: 2xx responses = {total2xxCount}, 4xx responses = {total4xxCount}, 5xx responses = {total5xxCount}, Error requests = {totalErrorCount}")
print(f"Summary: 2xx responses = {total2xxCount}, 4xx responses = {total4xxCount}, "
f"5xx responses = {total5xxCount}, Error requests = {totalErrorCount}")
counter += 1
time.sleep(0.1)
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
public abstract class IWireCaptureContexts {

public static class ActivityNames {
private ActivityNames() {}
public static final String BLOCKED = "blocked";
public static final String GATHERING_REQUEST = "gatheringRequest";
public static final String WAITING_FOR_RESPONSE = "waitingForResponse";
public static final String GATHERING_RESPONSE = "gatheringResponse";
}

public static class MetricNames {
private MetricNames() {}
public static final String UNREGISTERED = "unregistered";
public static final String REMOVED = "removed";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
@WrapWithNettyLeakDetection(disableLeakChecks = true)
public class FullReplayerWithTracingChecksTest extends FullTrafficReplayerTest {

protected TestContext makeInstrumentationContext() { return TestContext.withAllTracking(); }

@Test
public void testSingleStreamWithCloseIsCommitted() throws Throwable {
var random = new Random(1);
Expand All @@ -48,17 +46,19 @@ public void testSingleStreamWithCloseIsCommitted() throws Throwable {
.addSubStream(TrafficObservation.newBuilder()
.setClose(CloseObservation.newBuilder().build()).build())
.build();
var trafficSourceSupplier = new FullTrafficReplayerTest.ArrayCursorTrafficSourceFactory(rootContext,
List.of(trafficStreamWithJustClose));
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(rootContext, 0,
httpServer.localhostEndpoint(), new FullTrafficReplayerTest.IndexWatchingListenerFactory(), trafficSourceSupplier);
var trafficSourceSupplier = new FullTrafficReplayerTest.ArrayCursorTrafficSourceFactory(List.of(trafficStreamWithJustClose));
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(0,
httpServer.localhostEndpoint(), new FullTrafficReplayerTest.IndexWatchingListenerFactory(),
() -> TestContext.noOtelTracking(),
trafficSourceSupplier);
Assertions.assertEquals(1, trafficSourceSupplier.nextReadCursor.get());
log.info("done");
}

@ParameterizedTest
@ValueSource(ints = {1,2})
public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) throws Throwable {
var rootContext = TestContext.withAllTracking();
var random = new Random(1);
var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2),
response->TestHttpServerContext.makeResponse(random, response));
Expand Down Expand Up @@ -91,7 +91,7 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro
.build();
var trafficSource =
new ArrayCursorTrafficCaptureSource(rootContext,
new ArrayCursorTrafficSourceFactory(rootContext, List.of(trafficStream)));
new ArrayCursorTrafficSourceFactory(List.of(trafficStream)));
var tr = new TrafficReplayer(rootContext, httpServer.localhostEndpoint(), null,
new StaticAuthTransformerFactory("TEST"), null,
true, 10, 10 * 1024);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -40,7 +41,7 @@
// to the test server, a shutdown will stop those work threads without letting them flush through all of their work
// (since that could take a very long time) and some of the work might have been followed by resource releases.
@WrapWithNettyLeakDetection(disableLeakChecks = true)
public class FullTrafficReplayerTest extends InstrumentationTest {
public class FullTrafficReplayerTest {

public static final int INITIAL_STOP_REPLAYER_REQUEST_COUNT = 1;
public static final String TEST_NODE_ID = "TestNodeId";
Expand Down Expand Up @@ -77,16 +78,19 @@ public void fullTest(int testSize, boolean randomize) throws Throwable {
var random = new Random(1);
var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(200),
response -> TestHttpServerContext.makeResponse(random, response));
var streamAndConsumer =
TrafficStreamGenerator.generateStreamAndSumOfItsTransactions(rootContext, testSize, randomize);
var numExpectedRequests = streamAndConsumer.numHttpTransactions;
var trafficStreams = streamAndConsumer.stream.collect(Collectors.toList());
var streamAndSizes = TrafficStreamGenerator.generateStreamAndSumOfItsTransactions(TestContext.noOtelTracking(),
testSize, randomize);
var numExpectedRequests = streamAndSizes.numHttpTransactions;
var trafficStreams = streamAndSizes.stream.collect(Collectors.toList());
log.atInfo().setMessage(() -> trafficStreams.stream().map(ts -> TrafficStreamUtils.summarizeTrafficStream(ts))
.collect(Collectors.joining("\n"))).log();
var trafficSourceSupplier = new ArrayCursorTrafficSourceFactory(rootContext, trafficStreams);
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(rootContext, numExpectedRequests,
httpServer.localhostEndpoint(), new IndexWatchingListenerFactory(), trafficSourceSupplier);
Assertions.assertEquals(trafficSourceSupplier.trafficStreamsList.size(), trafficSourceSupplier.nextReadCursor.get());
var trafficSourceSupplier = new ArrayCursorTrafficSourceFactory(trafficStreams);
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(
numExpectedRequests, httpServer.localhostEndpoint(), new IndexWatchingListenerFactory(),
() -> TestContext.noOtelTracking(),
trafficSourceSupplier);
Assertions.assertEquals(trafficSourceSupplier.trafficStreamsList.size(),
trafficSourceSupplier.nextReadCursor.get());
log.info("done");
}

Expand Down Expand Up @@ -115,17 +119,15 @@ public int compareTo(TrafficStreamCursorKey other) {
}
}

protected static class ArrayCursorTrafficSourceFactory implements Supplier<ISimpleTrafficCaptureSource> {
private final TestContext rootContext;
protected static class ArrayCursorTrafficSourceFactory implements Function<TestContext, ISimpleTrafficCaptureSource> {
List<TrafficStream> trafficStreamsList;
AtomicInteger nextReadCursor = new AtomicInteger();

public ArrayCursorTrafficSourceFactory(TestContext rootContext, List<TrafficStream> trafficStreamsList) {
this.rootContext = rootContext;
public ArrayCursorTrafficSourceFactory(List<TrafficStream> trafficStreamsList) {
this.trafficStreamsList = trafficStreamsList;
}

public ISimpleTrafficCaptureSource get() {
public ISimpleTrafficCaptureSource apply(TestContext rootContext) {
var rval = new ArrayCursorTrafficCaptureSource(rootContext, this);
log.info("trafficSource="+rval+" readCursor="+rval.readCursor.get()+" nextReadCursor="+ nextReadCursor.get());
return rval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,17 @@ public void fullTest(int testSize, boolean randomize) throws Throwable {
var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2),
response->TestHttpServerContext.makeResponse(random, response));
var streamAndConsumer =
TrafficStreamGenerator.generateStreamAndSumOfItsTransactions(rootContext, testSize, randomize);
TrafficStreamGenerator.generateStreamAndSumOfItsTransactions(TestContext.noOtelTracking(), testSize, randomize);
var trafficStreams = streamAndConsumer.stream.collect(Collectors.toList());
log.atInfo().setMessage(()->trafficStreams.stream().map(TrafficStreamUtils::summarizeTrafficStream)
.collect(Collectors.joining("\n"))).log();

loadStreamsToKafka(buildKafkaConsumer(),
Streams.concat(trafficStreams.stream(), Stream.of(SENTINEL_TRAFFIC_STREAM)));
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(rootContext, streamAndConsumer.numHttpTransactions,
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(streamAndConsumer.numHttpTransactions,
httpServer.localhostEndpoint(), new CounterLimitedReceiverFactory(),
() -> new SentinelSensingTrafficSource(
() -> TestContext.noOtelTracking(),
rootContext -> new SentinelSensingTrafficSource(
new KafkaTrafficCaptureSource(rootContext, buildKafkaConsumer(), TEST_TOPIC_NAME,
Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS))));
log.info("done");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -35,9 +36,10 @@ static class FabricatedErrorToKillTheReplayer extends Error {

private TrafficReplayerRunner() {}

static void runReplayerUntilSourceWasExhausted(TestContext rootContext, int numExpectedRequests, URI endpoint,
static void runReplayerUntilSourceWasExhausted(int numExpectedRequests, URI endpoint,
Supplier<Consumer<SourceTargetCaptureTuple>> tupleListenerSupplier,
Supplier<ISimpleTrafficCaptureSource> trafficSourceSupplier)
Supplier<TestContext> rootContextSupplier,
Function<TestContext, ISimpleTrafficCaptureSource> trafficSourceFactory)
throws Throwable {
AtomicInteger runNumberRef = new AtomicInteger();
var totalUniqueEverReceived = new AtomicInteger();
Expand All @@ -51,7 +53,8 @@ static void runReplayerUntilSourceWasExhausted(TestContext rootContext, int numE
var counter = new AtomicInteger();
var tupleReceiver = tupleListenerSupplier.get();
try {
runTrafficReplayer(rootContext, trafficSourceSupplier, endpoint, (t) -> {
var rootContext = rootContextSupplier.get();
runTrafficReplayer(rootContext, ()->trafficSourceFactory.apply(rootContext), endpoint, (t) -> {
if (runNumber != runNumberRef.get()) {
// for an old replayer. I'm not sure why shutdown isn't blocking until all threads are dead,
// but that behavior only impacts this test as far as I can tell.
Expand Down

0 comments on commit 48a45ab

Please sign in to comment.