From e99f0b753a19714aa60d5d5d563f6f3cb7e63a09 Mon Sep 17 00:00:00 2001 From: rodrigolourencolopes Date: Tue, 22 Aug 2023 11:55:57 +0200 Subject: [PATCH 1/3] feat: set default stream timeout --- .../api/worker/JobWorkerBuilderStep1.java | 14 ++++++++++++ .../impl/worker/JobWorkerBuilderImpl.java | 8 +++++++ .../impl/worker/JobWorkerBuilderImplTest.java | 22 +++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java b/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java index 398545803b65..f34c3393fb44 100644 --- a/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java +++ b/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java @@ -213,6 +213,20 @@ interface JobWorkerBuilderStep3 { @ExperimentalApi("https://github.com/camunda/zeebe/issues/11231") JobWorkerBuilderStep3 enableStreaming(); + /** + * Opt-in feature flag to disable job streaming. + * + *

If the stream is closed, e.g. the server closed the connection, was restarted, etc., it + * will be immediately recreated as long as the worker is opened. + * + *

NOTE: Job streaming is still under active development, and should be disabled if you + * notice any issues. + * + * @return the builder for this worker + */ + @ExperimentalApi("https://github.com/camunda/zeebe/issues/11231") + JobWorkerBuilderStep3 disableStreaming(); + /** * If streaming is enabled, sets a maximum lifetime for a given stream. Once this timeout is * reached, the stream is closed, such that no more jobs are activated and received. If the diff --git a/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java b/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java index a5fced250c6b..18dbfee65796 100644 --- a/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java +++ b/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java @@ -40,6 +40,7 @@ public final class JobWorkerBuilderImpl public static final BackoffSupplier DEFAULT_BACKOFF_SUPPLIER = BackoffSupplier.newBackoffBuilder().build(); + public static final Duration DEFAULT_STREAMING_TIMEOUT = Duration.ofHours(8); private final JobClient jobClient; private final ScheduledExecutorService executorService; private final List closeables; @@ -71,6 +72,7 @@ public JobWorkerBuilderImpl( pollInterval = configuration.getDefaultJobPollInterval(); requestTimeout = configuration.getDefaultRequestTimeout(); backoffSupplier = DEFAULT_BACKOFF_SUPPLIER; + streamingTimeout = DEFAULT_STREAMING_TIMEOUT; } @Override @@ -143,6 +145,12 @@ public JobWorkerBuilderStep3 enableStreaming() { return this; } + @Override + public JobWorkerBuilderStep3 disableStreaming() { + enableStreaming = false; + return this; + } + @Override public JobWorkerBuilderStep3 streamTimeout(final Duration timeout) { streamingTimeout = timeout; diff --git a/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java b/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java index 14ff40b6c50f..c4745481396f 100644 --- a/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java +++ b/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java @@ -150,4 +150,26 @@ void shouldUseStreamTimeoutInsteadOfRequestTimeout() { // then verify(lastStep, atLeast(1)).requestTimeout(Duration.ofHours(5)); } + + @Test + void shouldTimeoutStreamAfterEightHours() { + // given + final StreamJobsCommandStep3 lastStep = Mockito.mock(Answers.RETURNS_SELF); + Mockito.when(jobClient.newStreamJobsCommand().jobType(anyString()).consumer(any())) + .thenReturn(lastStep); + Mockito.when(lastStep.send()).thenReturn(Mockito.mock()); + + // when + jobWorkerBuilder + .jobType("type") + .handler((c, j) -> {}) + .timeout(1) + .name("test") + .maxJobsActive(30) + .enableStreaming() + .open(); + + // then + verify(lastStep, atLeast(1)).requestTimeout(Duration.ofHours(8)); + } } From 495eac8ae18237721082793abce45b1ca258e71b Mon Sep 17 00:00:00 2001 From: rodrigolourencolopes Date: Tue, 22 Aug 2023 12:29:00 +0200 Subject: [PATCH 2/3] fix: typos --- .github/workflows/benchmark.yml | 4 ++-- .../java/io/camunda/zeebe/client/api/worker/JobWorker.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index fa89ea7e37fe..744fca7ce7dc 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -18,7 +18,7 @@ on: default: europe-west1-b required: false benchmark-load: - description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitary helm arguments, like --set starter.rate=100' + description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitrary helm arguments, like --set starter.rate=100' required: false publish: description: 'Where to publish the results, can be "slack" or "comment"' @@ -58,7 +58,7 @@ on: type: string required: false benchmark-load: - description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitary helm arguments, like --set starter.rate=100' + description: 'Specifies which benchmark components to deploy. `starter`, `timer` and `publisher` can be assigned with the rate at which they publish. Allows arbitrary helm arguments, like --set starter.rate=100' type: string required: false measure: diff --git a/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorker.java b/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorker.java index 92471ecdc210..30176ef37512 100644 --- a/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorker.java +++ b/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorker.java @@ -16,7 +16,7 @@ package io.camunda.zeebe.client.api.worker; /** - * Represents an active job worker that perfors jobs of a certain type. While a registration is + * Represents an active job worker that performs jobs of a certain type. While a registration is * open, the client continuously receives jobs from the broker and hands them to a registered {@link * JobHandler}. */ From 64670143a5e75d92615dffbedb599ad1208063a7 Mon Sep 17 00:00:00 2001 From: rodrigolourencolopes Date: Wed, 23 Aug 2023 10:55:55 +0200 Subject: [PATCH 3/3] feat: enable streaming as default in all benchmarks --- .../main/java/io/camunda/zeebe/Worker.java | 2 ++ .../io/camunda/zeebe/config/WorkerCfg.java | 28 +++++++++++-------- .../src/main/resources/application.conf | 1 + .../api/worker/JobWorkerBuilderStep1.java | 24 ++++------------ .../impl/worker/JobWorkerBuilderImpl.java | 10 ++----- .../impl/worker/JobWorkerBuilderImplTest.java | 6 ++-- .../client/impl/worker/JobWorkerImplTest.java | 2 +- .../it/client/command/JobWorkerTest.java | 6 ++-- 8 files changed, 34 insertions(+), 45 deletions(-) diff --git a/benchmarks/project/src/main/java/io/camunda/zeebe/Worker.java b/benchmarks/project/src/main/java/io/camunda/zeebe/Worker.java index d5ae327f42f0..8179928ef51d 100644 --- a/benchmarks/project/src/main/java/io/camunda/zeebe/Worker.java +++ b/benchmarks/project/src/main/java/io/camunda/zeebe/Worker.java @@ -40,6 +40,7 @@ public void run() { final WorkerCfg workerCfg = appCfg.getWorker(); final String jobType = workerCfg.getJobType(); final long completionDelay = workerCfg.getCompletionDelay().toMillis(); + final boolean isStreamEnabled = workerCfg.isStreamEnabled(); final var variables = readVariables(workerCfg.getPayloadPath()); final BlockingQueue> requestFutures = new ArrayBlockingQueue<>(10_000); final BlockingDeque delayedCommands = new LinkedBlockingDeque<>(10_000); @@ -67,6 +68,7 @@ public void run() { requestFutures.add(command.send()); } }) + .streamEnabled(isStreamEnabled) .open(); final ResponseChecker responseChecker = new ResponseChecker(requestFutures); diff --git a/benchmarks/project/src/main/java/io/camunda/zeebe/config/WorkerCfg.java b/benchmarks/project/src/main/java/io/camunda/zeebe/config/WorkerCfg.java index 5e3b135d615a..fee25c2a643c 100644 --- a/benchmarks/project/src/main/java/io/camunda/zeebe/config/WorkerCfg.java +++ b/benchmarks/project/src/main/java/io/camunda/zeebe/config/WorkerCfg.java @@ -21,21 +21,19 @@ public class WorkerCfg { private String jobType; private String workerName; - private int threads; - private int capacity; private Duration pollingDelay; private Duration completionDelay; private boolean completeJobsAsync; - private String payloadPath; + private boolean isStreamEnabled; public String getJobType() { return jobType; } - public void setJobType(String jobType) { + public void setJobType(final String jobType) { this.jobType = jobType; } @@ -43,7 +41,7 @@ public String getWorkerName() { return workerName; } - public void setWorkerName(String workerName) { + public void setWorkerName(final String workerName) { this.workerName = workerName; } @@ -51,7 +49,7 @@ public int getThreads() { return threads; } - public void setThreads(int threads) { + public void setThreads(final int threads) { this.threads = threads; } @@ -59,7 +57,7 @@ public int getCapacity() { return capacity; } - public void setCapacity(int capacity) { + public void setCapacity(final int capacity) { this.capacity = capacity; } @@ -67,7 +65,7 @@ public Duration getPollingDelay() { return pollingDelay; } - public void setPollingDelay(Duration pollingDelay) { + public void setPollingDelay(final Duration pollingDelay) { this.pollingDelay = pollingDelay; } @@ -75,7 +73,7 @@ public Duration getCompletionDelay() { return completionDelay; } - public void setCompletionDelay(Duration completionDelay) { + public void setCompletionDelay(final Duration completionDelay) { this.completionDelay = completionDelay; } @@ -83,7 +81,7 @@ public String getPayloadPath() { return payloadPath; } - public void setPayloadPath(String payloadPath) { + public void setPayloadPath(final String payloadPath) { this.payloadPath = payloadPath; } @@ -91,7 +89,15 @@ public boolean isCompleteJobsAsync() { return completeJobsAsync; } - public void setCompleteJobsAsync(boolean completeJobsAsync) { + public void setCompleteJobsAsync(final boolean completeJobsAsync) { this.completeJobsAsync = completeJobsAsync; } + + public boolean isStreamEnabled() { + return isStreamEnabled; + } + + public void setStreamEnabled(final boolean isStreamEnabled) { + this.isStreamEnabled = isStreamEnabled; + } } diff --git a/benchmarks/project/src/main/resources/application.conf b/benchmarks/project/src/main/resources/application.conf index 2a5bfc66a2ae..c1bb28e1d563 100644 --- a/benchmarks/project/src/main/resources/application.conf +++ b/benchmarks/project/src/main/resources/application.conf @@ -26,5 +26,6 @@ app { completionDelay = 300ms completeJobsAsync = false payloadPath = "bpmn/big_payload.json" + streamEnabled = true } } diff --git a/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java b/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java index f34c3393fb44..e81bee4f25dc 100644 --- a/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java +++ b/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java @@ -197,10 +197,10 @@ interface JobWorkerBuilderStep3 { JobWorkerBuilderStep3 backoffSupplier(BackoffSupplier backoffSupplier); /** - * Opt-in feature flag to enable job streaming. If called, the job worker will use a mix of - * streaming and polling to activate jobs. A long living stream will be opened onto which jobs - * will be eagerly pushed, and the polling mechanism will be used strictly to fetch jobs created - * before any streams were opened. + * Opt-in feature flag to enable job streaming. If set as enabled, the job worker will use a mix + * of streaming and polling to activate jobs. A long living stream will be opened onto which + * jobs will be eagerly pushed, and the polling mechanism will be used strictly to fetch jobs + * created before any streams were opened. * *

If the stream is closed, e.g. the server closed the connection, was restarted, etc., it * will be immediately recreated as long as the worker is opened. @@ -211,21 +211,7 @@ interface JobWorkerBuilderStep3 { * @return the builder for this worker */ @ExperimentalApi("https://github.com/camunda/zeebe/issues/11231") - JobWorkerBuilderStep3 enableStreaming(); - - /** - * Opt-in feature flag to disable job streaming. - * - *

If the stream is closed, e.g. the server closed the connection, was restarted, etc., it - * will be immediately recreated as long as the worker is opened. - * - *

NOTE: Job streaming is still under active development, and should be disabled if you - * notice any issues. - * - * @return the builder for this worker - */ - @ExperimentalApi("https://github.com/camunda/zeebe/issues/11231") - JobWorkerBuilderStep3 disableStreaming(); + JobWorkerBuilderStep3 streamEnabled(boolean isStreamEnabled); /** * If streaming is enabled, sets a maximum lifetime for a given stream. Once this timeout is diff --git a/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java b/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java index 18dbfee65796..b4d8f0b5e60f 100644 --- a/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java +++ b/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImpl.java @@ -140,14 +140,8 @@ public JobWorkerBuilderStep3 backoffSupplier(final BackoffSupplier backoffSuppli } @Override - public JobWorkerBuilderStep3 enableStreaming() { - enableStreaming = true; - return this; - } - - @Override - public JobWorkerBuilderStep3 disableStreaming() { - enableStreaming = false; + public JobWorkerBuilderStep3 streamEnabled(final boolean isStreamEnabled) { + enableStreaming = isStreamEnabled; return this; } diff --git a/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java b/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java index c4745481396f..2c273d8e2476 100644 --- a/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java +++ b/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerBuilderImplTest.java @@ -121,7 +121,7 @@ void shouldUseStreamingIfOptedIn() { .maxJobsActive(30); // when - builder.enableStreaming().open(); + builder.streamEnabled(true).open(); // then verify(jobClient, atLeast(1)).newStreamJobsCommand(); @@ -144,7 +144,7 @@ void shouldUseStreamTimeoutInsteadOfRequestTimeout() { .timeout(1) .name("test") .maxJobsActive(30) - .enableStreaming() + .streamEnabled(true) .open(); // then @@ -166,7 +166,7 @@ void shouldTimeoutStreamAfterEightHours() { .timeout(1) .name("test") .maxJobsActive(30) - .enableStreaming() + .streamEnabled(true) .open(); // then diff --git a/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerImplTest.java b/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerImplTest.java index 545db72acf39..ec20bae94c5a 100644 --- a/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerImplTest.java +++ b/clients/java/src/test/java/io/camunda/zeebe/client/impl/worker/JobWorkerImplTest.java @@ -125,7 +125,7 @@ public void shouldBackoffWhenGatewayRespondsWithResourceExhausted() { public void shouldOpenStreamIfOptedIn() { // given final JobWorkerBuilderStep3 builder = - client.newWorker().jobType("test").handler(NOOP_JOB_HANDLER).enableStreaming(); + client.newWorker().jobType("test").handler(NOOP_JOB_HANDLER).streamEnabled(true); // when try (final JobWorker ignored = builder.open()) { diff --git a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/JobWorkerTest.java b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/JobWorkerTest.java index 85c27a2ab248..fd7822ae75d3 100644 --- a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/JobWorkerTest.java +++ b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/JobWorkerTest.java @@ -162,7 +162,7 @@ void shouldStreamAndActivateJobs() { // when - create a worker that streams, and create a new job after the stream is registered final var jobHandler = new RecordingJobHandler(); final var builder = - client.getClient().newWorker().jobType(jobType).handler(jobHandler).enableStreaming(); + client.getClient().newWorker().jobType(jobType).handler(jobHandler).streamEnabled(true); try (final var ignored = builder.open()) { awaitStreamRegistered(jobType); client.createSingleJob(jobType, b -> {}); @@ -178,7 +178,7 @@ void shouldRecreateStreamOnGatewayRestart() { // given final var jobHandler = new RecordingJobHandler(); final var builder = - client.getClient().newWorker().jobType(jobType).handler(jobHandler).enableStreaming(); + client.getClient().newWorker().jobType(jobType).handler(jobHandler).streamEnabled(true); // when try (final var ignored = builder.open()) { @@ -209,7 +209,7 @@ void shouldRecreateStreamOnGatewayRestart() { private static JobWorker prepareStreamingWorker( final String jobType, final JobWorkerBuilderStep3 builder) { - final var worker = builder.enableStreaming().open(); + final var worker = builder.streamEnabled(true).open(); awaitStreamRegistered(jobType); return worker; }