Skip to content

Commit

Permalink
fix(gateway): review fixes for gateway max message size assertion
Browse files Browse the repository at this point in the history
* Removed unnecessary if check
* Updated tests to make sure all jobs are created. Otherwise, they might end up with
being flaky tests. Because in the old implementation, when we limit the stream to 5
records, it only throws an exception when the stream has no entries after 5 seconds.
It would wait 5 seconds and then continue if there is only a single job record.
Which can cause flakiness.

fix(gateway): review fixes for gateway max message size assertion

* Removed blocking join() calls
* Removed unused variables
* Added limit to job created assertion to speed up the test execution

test: assert jobs created before worker opened

This helps avoid multiple job activation roundtrips, because the worker
is only opened once all jobs are created.

Otherwise, there's a chance that the worker only receives some of the
jobs and has has to poll for more jobs until the final assertion is met.

(cherry picked from commit d5e3deb)
  • Loading branch information
berkaycanbc committed Feb 5, 2024
1 parent 9359e0e commit 971b017
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,9 @@ public static JobActivationResult toActivateJobsResponse(
// is still exceeding the maximum response size, we remove the last added job from the response
// and add it to the list of jobs to be reactivated.
// We do this until the response size is below the maximum response size.
if (response.getSerializedSize() > maxResponseSize) {
while (!responseJobs.isEmpty() && response.getSerializedSize() > maxResponseSize) {
sizeExceedingJobs.add(responseJobs.removeLast());
response = ActivateJobsResponse.newBuilder().addAllJobs(responseJobs).build();
}
while (!responseJobs.isEmpty() && response.getSerializedSize() > maxResponseSize) {
sizeExceedingJobs.add(responseJobs.removeLast());
response = ActivateJobsResponse.newBuilder().addAllJobs(responseJobs).build();
}

return new JobActivationResult(response, sizeExceedingJobs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.worker.JobHandler;
import io.camunda.zeebe.client.api.worker.JobWorker;
import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1.JobWorkerBuilderStep3;
Expand All @@ -26,6 +25,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Random;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down Expand Up @@ -175,26 +175,26 @@ public void shouldActivateJobsByRespectingMaxMessageSize() {

final int numberOfJobsToActivate = 5;
for (int i = 0; i < numberOfJobsToActivate; i++) {
final ProcessInstanceEvent event =
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("foo")
.latestVersion()
.variables(Map.of("message_content", message))
.send()
.join();
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("foo")
.latestVersion()
.variables(Map.of("message_content", message))
.send();
}

Assertions.assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withType("foo")
.limit(numberOfJobsToActivate))
.describedAs("Expect that all jobs are created.")
.hasSize(numberOfJobsToActivate);

// when
final JobWorkerBuilderStep3 builder =
CLIENT_RULE.getClient().newWorker().jobType("foo").handler(COMPLETING_JOB_HANDLER);

RecordingExporter.jobRecords(JobIntent.CREATED)
.withType("foo")
.limit(numberOfJobsToActivate)
.await();

// then
try (final JobWorker ignored = builder.open()) {
Awaitility.await("until all jobs are completed")
Expand Down Expand Up @@ -246,26 +246,26 @@ public void shouldActivateJobsByRespectingMaxMessageSize() {

final int numberOfJobsToActivate = 5;
for (int i = 0; i < numberOfJobsToActivate; i++) {
final ProcessInstanceEvent event =
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("foo")
.latestVersion()
.variables(Map.of("message_content", message))
.send()
.join();
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("foo")
.latestVersion()
.variables(Map.of("message_content", message))
.send();
}

Assertions.assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withType("foo")
.limit(numberOfJobsToActivate))
.describedAs("Expect that all jobs are created.")
.hasSize(numberOfJobsToActivate);

// when
final JobWorkerBuilderStep3 builder =
CLIENT_RULE.getClient().newWorker().jobType("foo").handler(COMPLETING_JOB_HANDLER);

RecordingExporter.jobRecords(JobIntent.CREATED)
.withType("foo")
.limit(numberOfJobsToActivate)
.await();

// then
try (final JobWorker ignored = builder.open()) {
Awaitility.await("until all jobs are completed")
Expand Down

0 comments on commit 971b017

Please sign in to comment.