Skip to content

Commit

Permalink
merge: camunda#13976
Browse files Browse the repository at this point in the history
13976: Set default stream timeout and enable stream as default in benchmarks r=rodrigo-lourenco-lopes a=rodrigo-lourenco-lopes

## Description

This PR enables streaming as default in all benchmarks (can be changed in application.conf) and sets the default stream timeout as 8 hours.

## Related issues

<!-- Which issues are closed by this PR or are related -->

related camunda#13914



Co-authored-by: rodrigolourencolopes <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and rodrigo-lourenco-lopes authored Aug 23, 2023
2 parents e59b1d5 + 6467014 commit bab683c
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 27 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ on:
default: europe-west1-b
required: false
benchmark-load:
description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitary helm arguments, like --set starter.rate=100'
description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitrary helm arguments, like --set starter.rate=100'
required: false
publish:
description: 'Where to publish the results, can be "slack" or "comment"'
Expand Down Expand Up @@ -58,7 +58,7 @@ on:
type: string
required: false
benchmark-load:
description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitary helm arguments, like --set starter.rate=100'
description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitrary helm arguments, like --set starter.rate=100'
type: string
required: false
measure:
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/project/src/main/java/io/camunda/zeebe/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void run() {
final WorkerCfg workerCfg = appCfg.getWorker();
final String jobType = workerCfg.getJobType();
final long completionDelay = workerCfg.getCompletionDelay().toMillis();
final boolean isStreamEnabled = workerCfg.isStreamEnabled();
final var variables = readVariables(workerCfg.getPayloadPath());
final BlockingQueue<Future<?>> requestFutures = new ArrayBlockingQueue<>(10_000);
final BlockingDeque<DelayedCommand> delayedCommands = new LinkedBlockingDeque<>(10_000);
Expand Down Expand Up @@ -67,6 +68,7 @@ public void run() {
requestFutures.add(command.send());
}
})
.streamEnabled(isStreamEnabled)
.open();

final ResponseChecker responseChecker = new ResponseChecker(requestFutures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,77 +21,83 @@ public class WorkerCfg {

private String jobType;
private String workerName;

private int threads;

private int capacity;
private Duration pollingDelay;
private Duration completionDelay;
private boolean completeJobsAsync;

private String payloadPath;
private boolean isStreamEnabled;

public String getJobType() {
return jobType;
}

public void setJobType(String jobType) {
public void setJobType(final String jobType) {
this.jobType = jobType;
}

public String getWorkerName() {
return workerName;
}

public void setWorkerName(String workerName) {
public void setWorkerName(final String workerName) {
this.workerName = workerName;
}

public int getThreads() {
return threads;
}

public void setThreads(int threads) {
public void setThreads(final int threads) {
this.threads = threads;
}

public int getCapacity() {
return capacity;
}

public void setCapacity(int capacity) {
public void setCapacity(final int capacity) {
this.capacity = capacity;
}

public Duration getPollingDelay() {
return pollingDelay;
}

public void setPollingDelay(Duration pollingDelay) {
public void setPollingDelay(final Duration pollingDelay) {
this.pollingDelay = pollingDelay;
}

public Duration getCompletionDelay() {
return completionDelay;
}

public void setCompletionDelay(Duration completionDelay) {
public void setCompletionDelay(final Duration completionDelay) {
this.completionDelay = completionDelay;
}

public String getPayloadPath() {
return payloadPath;
}

public void setPayloadPath(String payloadPath) {
public void setPayloadPath(final String payloadPath) {
this.payloadPath = payloadPath;
}

public boolean isCompleteJobsAsync() {
return completeJobsAsync;
}

public void setCompleteJobsAsync(boolean completeJobsAsync) {
public void setCompleteJobsAsync(final boolean completeJobsAsync) {
this.completeJobsAsync = completeJobsAsync;
}

public boolean isStreamEnabled() {
return isStreamEnabled;
}

public void setStreamEnabled(final boolean isStreamEnabled) {
this.isStreamEnabled = isStreamEnabled;
}
}
1 change: 1 addition & 0 deletions benchmarks/project/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ app {
completionDelay = 300ms
completeJobsAsync = false
payloadPath = "bpmn/big_payload.json"
streamEnabled = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.camunda.zeebe.client.api.worker;

/**
* Represents an active job worker that perfors jobs of a certain type. While a registration is
* Represents an active job worker that performs jobs of a certain type. While a registration is
* open, the client continuously receives jobs from the broker and hands them to a registered {@link
* JobHandler}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ interface JobWorkerBuilderStep3 {
JobWorkerBuilderStep3 backoffSupplier(BackoffSupplier backoffSupplier);

/**
* Opt-in feature flag to enable job streaming. If called, the job worker will use a mix of
* streaming and polling to activate jobs. A long living stream will be opened onto which jobs
* will be eagerly pushed, and the polling mechanism will be used strictly to fetch jobs created
* <em>before</em> any streams were opened.
* Opt-in feature flag to enable job streaming. If set as enabled, the job worker will use a mix
* of streaming and polling to activate jobs. A long living stream will be opened onto which
* jobs will be eagerly pushed, and the polling mechanism will be used strictly to fetch jobs
* created <em>before</em> any streams were opened.
*
* <p>If the stream is closed, e.g. the server closed the connection, was restarted, etc., it
* will be immediately recreated as long as the worker is opened.
Expand All @@ -211,7 +211,7 @@ interface JobWorkerBuilderStep3 {
* @return the builder for this worker
*/
@ExperimentalApi("https://github.com/camunda/zeebe/issues/11231")
JobWorkerBuilderStep3 enableStreaming();
JobWorkerBuilderStep3 streamEnabled(boolean isStreamEnabled);

/**
* If streaming is enabled, sets a maximum lifetime for a given stream. Once this timeout is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class JobWorkerBuilderImpl

public static final BackoffSupplier DEFAULT_BACKOFF_SUPPLIER =
BackoffSupplier.newBackoffBuilder().build();
public static final Duration DEFAULT_STREAMING_TIMEOUT = Duration.ofHours(8);
private final JobClient jobClient;
private final ScheduledExecutorService executorService;
private final List<Closeable> closeables;
Expand Down Expand Up @@ -71,6 +72,7 @@ public JobWorkerBuilderImpl(
pollInterval = configuration.getDefaultJobPollInterval();
requestTimeout = configuration.getDefaultRequestTimeout();
backoffSupplier = DEFAULT_BACKOFF_SUPPLIER;
streamingTimeout = DEFAULT_STREAMING_TIMEOUT;
}

@Override
Expand Down Expand Up @@ -138,8 +140,8 @@ public JobWorkerBuilderStep3 backoffSupplier(final BackoffSupplier backoffSuppli
}

@Override
public JobWorkerBuilderStep3 enableStreaming() {
enableStreaming = true;
public JobWorkerBuilderStep3 streamEnabled(final boolean isStreamEnabled) {
enableStreaming = isStreamEnabled;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void shouldUseStreamingIfOptedIn() {
.maxJobsActive(30);

// when
builder.enableStreaming().open();
builder.streamEnabled(true).open();

// then
verify(jobClient, atLeast(1)).newStreamJobsCommand();
Expand All @@ -144,10 +144,32 @@ void shouldUseStreamTimeoutInsteadOfRequestTimeout() {
.timeout(1)
.name("test")
.maxJobsActive(30)
.enableStreaming()
.streamEnabled(true)
.open();

// then
verify(lastStep, atLeast(1)).requestTimeout(Duration.ofHours(5));
}

@Test
void shouldTimeoutStreamAfterEightHours() {
// given
final StreamJobsCommandStep3 lastStep = Mockito.mock(Answers.RETURNS_SELF);
Mockito.when(jobClient.newStreamJobsCommand().jobType(anyString()).consumer(any()))
.thenReturn(lastStep);
Mockito.when(lastStep.send()).thenReturn(Mockito.mock());

// when
jobWorkerBuilder
.jobType("type")
.handler((c, j) -> {})
.timeout(1)
.name("test")
.maxJobsActive(30)
.streamEnabled(true)
.open();

// then
verify(lastStep, atLeast(1)).requestTimeout(Duration.ofHours(8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void shouldBackoffWhenGatewayRespondsWithResourceExhausted() {
public void shouldOpenStreamIfOptedIn() {
// given
final JobWorkerBuilderStep3 builder =
client.newWorker().jobType("test").handler(NOOP_JOB_HANDLER).enableStreaming();
client.newWorker().jobType("test").handler(NOOP_JOB_HANDLER).streamEnabled(true);

// when
try (final JobWorker ignored = builder.open()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void shouldStreamAndActivateJobs() {
// when - create a worker that streams, and create a new job after the stream is registered
final var jobHandler = new RecordingJobHandler();
final var builder =
client.getClient().newWorker().jobType(jobType).handler(jobHandler).enableStreaming();
client.getClient().newWorker().jobType(jobType).handler(jobHandler).streamEnabled(true);
try (final var ignored = builder.open()) {
awaitStreamRegistered(jobType);
client.createSingleJob(jobType, b -> {});
Expand All @@ -178,7 +178,7 @@ void shouldRecreateStreamOnGatewayRestart() {
// given
final var jobHandler = new RecordingJobHandler();
final var builder =
client.getClient().newWorker().jobType(jobType).handler(jobHandler).enableStreaming();
client.getClient().newWorker().jobType(jobType).handler(jobHandler).streamEnabled(true);

// when
try (final var ignored = builder.open()) {
Expand Down Expand Up @@ -209,7 +209,7 @@ void shouldRecreateStreamOnGatewayRestart() {

private static JobWorker prepareStreamingWorker(
final String jobType, final JobWorkerBuilderStep3 builder) {
final var worker = builder.enableStreaming().open();
final var worker = builder.streamEnabled(true).open();
awaitStreamRegistered(jobType);
return worker;
}
Expand Down

0 comments on commit bab683c

Please sign in to comment.