Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP Perf Test Framework support (Batch and Event Processor APIs) #26031

Merged
merged 13 commits into from
Mar 22, 2022
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.perf.test.core;

import reactor.core.publisher.Mono;

/**
* Represents the abstraction of a Performance test class running operations in batches.
*
* <p>
* The performance test class needs to extend this class. The test class should override {@link BatchPerfTest#runBatch()}
* and {@link BatchPerfTest#runBatchAsync()} methods and the synchronous and asynchronous test logic respectively.
* To add any test setup and logic the test class should override {@link BatchPerfTest#globalSetupAsync()}
* and {@link BatchPerfTest#globalCleanupAsync()} methods .
* </p>
* @param <TOptions> the options configured for the test.
*/
public abstract class BatchPerfTest<TOptions extends PerfStressOptions> extends HttpPerfTestBase<TOptions> {
g2vinay marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates an instance of Batch performance test.
* @param options the options configured for the test.
* @throws IllegalStateException if SSL context cannot be created.
*/
public BatchPerfTest(TOptions options) {
super(options);
}



public abstract int runBatch();

public abstract Mono<Integer> runBatchAsync();

@Override
int runTest() {
return runBatch();
}

@Override
Mono<Integer> runTestAsync() {
return runBatchAsync();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.perf.test.core;

import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Represents the abstraction of a Performance test class.
*
* <p>
* The performance test class needs to extend this class.
* To add any test setup and logic the test class should override {@link EventPerfTest#globalSetupAsync()}
* and {@link EventPerfTest#globalCleanupAsync()} methods .
* </p>
* @param <TOptions> the options configured for the test.
*/
public abstract class EventPerfTest<TOptions extends PerfStressOptions> extends PerfTestBase<TOptions> {
g2vinay marked this conversation as resolved.
Show resolved Hide resolved

private AtomicInteger completedOps;

private volatile boolean errorRaised;

private long startTime;

/**
* Creates an instance of performance test.
* @param options the options configured for the test.
* @throws IllegalStateException if SSL context cannot be created.
*/
public EventPerfTest(TOptions options) {
super(options);
if (options.getTestProxies() != null && options.getTestProxies().size() > 0) {
throw new IllegalStateException("Test Proxies are not supported for Event Perf Tests.");
}
completedOps = new AtomicInteger(0);
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Indicates an event was raised, and records its count internally.
*/
public void eventRaised() {
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
completedOps.getAndIncrement();
lastCompletionNanoTime = System.nanoTime() - startTime;
}
/**
* Indicates an error was raised, and stops the performance test flow.
*/
public void errorRaised() {
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
errorRaised = true;
lastCompletionNanoTime = System.nanoTime() - startTime;
}

@Override
public void runAll(long endNanoTime) {
startTime = System.nanoTime();
completedOps.set(0);
errorRaised = false;
lastCompletionNanoTime = 0;
while (System.nanoTime() < endNanoTime) {
if (errorRaised) {
break;
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

@Override
public Mono<Void> runAllAsync(long endNanoTime) {
return Mono.fromCallable(() -> {
runAll(endNanoTime);
return Mono.empty();
}).then();
}

@Override
long getCompletedOperations() {
return completedOps.longValue();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package com.azure.perf.test.core;
g2vinay marked this conversation as resolved.
Show resolved Hide resolved

import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.net.ssl.SSLException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The Base Performance Test class for Http based Perf Tests.
* @param <TOptions> the performance test options to use while running the test.
*/
public abstract class HttpPerfTestBase<TOptions extends PerfStressOptions> extends PerfTestBase<TOptions> {
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
private final reactor.netty.http.client.HttpClient recordPlaybackHttpClient;
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
private final URI testProxy;
private final TestProxyPolicy testProxyPolicy;
private String recordingId;

protected final TOptions options;

// Derived classes should use the ConfigureClientBuilder() method by default. If a ClientBuilder does not
// follow the standard convention, it can be configured manually using these fields.
protected final HttpClient httpClient;
protected final Iterable<HttpPipelinePolicy> policies;

private static final AtomicInteger GLOBAL_PARALLEL_INDEX = new AtomicInteger();
protected final int parallelIndex;

/**
* Creates an instance of the Http Based Performance test.
* @param options the performance test options to use while running the test.
*/
public HttpPerfTestBase(TOptions options) {
super(options);
this.options = options;
this.parallelIndex = GLOBAL_PARALLEL_INDEX.getAndIncrement();

final SslContext sslContext;

if (options.isInsecure()) {
try {
sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} catch (SSLException e) {
throw new IllegalStateException(e);
}

reactor.netty.http.client.HttpClient nettyHttpClient = reactor.netty.http.client.HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));

httpClient = new NettyAsyncHttpClientBuilder(nettyHttpClient).build();
} else {
sslContext = null;
httpClient = null;
}

if (options.getTestProxies() != null && !options.getTestProxies().isEmpty()) {
if (options.isInsecure()) {
recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
} else {
recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create();
}

testProxy = options.getTestProxies().get(parallelIndex % options.getTestProxies().size());
testProxyPolicy = new TestProxyPolicy(testProxy);
policies = Arrays.asList(testProxyPolicy);
} else {
recordPlaybackHttpClient = null;
testProxy = null;
testProxyPolicy = null;
policies = null;
}
}

/**
* Attempts to configure a ClientBuilder using reflection. If a ClientBuilder does not follow the standard convention,
* it can be configured manually using the "httpClient" and "policies" fields.
* @param clientBuilder The client builder.
* @throws IllegalStateException If reflective access to get httpClient or addPolicy methods fail.
*/
protected void configureClientBuilder(Object clientBuilder) {
if (httpClient != null || policies != null) {
Class<?> clientBuilderClass = clientBuilder.getClass();

try {
if (httpClient != null) {
Method httpClientMethod = clientBuilderClass.getMethod("httpClient", HttpClient.class);
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
httpClientMethod.invoke(clientBuilder, httpClient);
}

if (policies != null) {
Method addPolicyMethod = clientBuilderClass.getMethod("addPolicy", HttpPipelinePolicy.class);
for (HttpPipelinePolicy policy : policies) {
addPolicyMethod.invoke(clientBuilder, policy);
}
}
} catch (ReflectiveOperationException e) {
throw new IllegalStateException(e);
}
}
}

@Override
public void runAll(long endNanoTime) {
completedOperations = 0;
lastCompletionNanoTime = 0;
long startNanoTime = System.nanoTime();
while (System.nanoTime() < endNanoTime) {
completedOperations += runTest();
lastCompletionNanoTime = System.nanoTime() - startNanoTime;
}
}

@Override
public Mono<Void> runAllAsync(long endNanoTime) {
completedOperations = 0;
lastCompletionNanoTime = 0;
long startNanoTime = System.nanoTime();

return Flux.just(1)
.repeat()
.flatMap(i -> runTestAsync(), 1)
.doOnNext(v -> {
completedOperations +=v;
lastCompletionNanoTime = System.nanoTime() - startNanoTime;
})
.takeWhile(i -> System.nanoTime() < endNanoTime)
.then();
}

/**
* Indicates how many operations were completed in a single run of the test.
* Good to be used for batch operations.
*
* @return the number of successful operations completed.
*/
abstract int runTest();

/**
* Indicates how many operations were completed in a single run of the async test.
* Good to be used for batch operations.
*
* @return the number of successful operations completed.
*/
abstract Mono<Integer> runTestAsync();

/**
* Stops playback tests.
* @return An empty {@link Mono}.
*/
@Override
public Mono<Void> stopPlaybackAsync() {
return recordPlaybackHttpClient
.headers(h -> {
h.set("x-recording-id", recordingId);
h.set("x-purge-inmemory-recording", Boolean.toString(true));
})
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
.post()
.uri(testProxy.resolve("/playback/stop"))
.response()
.doOnSuccess(response -> {
testProxyPolicy.setMode(null);
testProxyPolicy.setRecordingId(null);
})
.then();
}

private Mono<Void> startRecordingAsync() {
return recordPlaybackHttpClient
.post()
.uri(testProxy.resolve("/record/start"))
.response()
.doOnNext(response -> {
recordingId = response.responseHeaders().get("x-recording-id");
})
.then();
}

private Mono<Void> stopRecordingAsync() {
return recordPlaybackHttpClient
.headers(h -> h.set("x-recording-id", recordingId))
.post()
.uri(testProxy.resolve("/record/stop"))
.response()
.then();
}

private Mono<Void> startPlaybackAsync() {
return recordPlaybackHttpClient
.headers(h -> h.set("x-recording-id", recordingId))
.post()
.uri(testProxy.resolve("/playback/start"))
.response()
.doOnNext(response -> {
recordingId = response.responseHeaders().get("x-recording-id");
})
.then();
}


/**
* Records responses and starts tests in playback mode.
*/
@Override
public void postSetup() {
if (testProxyPolicy != null) {

// Make one call to Run() before starting recording, to avoid capturing one-time setup like authorization requests.
runSyncOrAsync();

startRecordingAsync().block();

testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("record");

runSyncOrAsync();
stopRecordingAsync().block();
startPlaybackAsync().block();

testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("playback");
}
}

private void runSyncOrAsync() {
if (options.isSync()) {
runTest();
} else {
runTestAsync().block();
}
}
}
Loading