Skip to content

Commit

Permalink
test(qa-integration-tests): Zeebe Gateway fails to stream out activat…
Browse files Browse the repository at this point in the history
…ed jobs by not respecting the maxMessageSize

The test is added to verify gateway failure with RESOURCE_EXHAUSTED.
In the test case, a process instance with 1MB sized variable is created.
In the broker JobBatchCollector sends jobs that has size (<4MB) to the
gateway but since the size of the response becomes more than 4MB after
converting MessagePack to JSON, gateway fails to stream out jobs with
RESOURCE_EXHAUSTED.

(cherry picked from commit a6a6b95)
  • Loading branch information
berkaycanbc committed Feb 5, 2024
1 parent 5ef860e commit a30eb37
Showing 1 changed file with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,27 @@
*/
package io.camunda.zeebe.it.network;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.worker.JobHandler;
import io.camunda.zeebe.client.api.worker.JobWorker;
import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1.JobWorkerBuilderStep3;
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.it.util.ZeebeAssertHelper;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.util.ByteValue;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Random;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
Expand All @@ -24,6 +37,10 @@

public final class LargeMessageSizeTest {

private static final JobHandler COMPLETING_JOB_HANDLER =
(client, job) -> {
client.newCompleteCommand(job.getKey()).send().join();
};
private static final DataSize MAX_MESSAGE_SIZE = DataSize.ofMegabytes(5);
// only use half of the max message size because some commands produce two events
private static final long LARGE_SIZE = ByteValue.ofMegabytes(1);
Expand Down Expand Up @@ -134,4 +151,64 @@ public void shouldCompleteJobWithLargeVariables() {
// then
ZeebeAssertHelper.assertProcessInstanceCompleted(processInstanceEvent.getProcessInstanceKey());
}

@Test
public void shouldActivateJobsByRespectingMaxMessageSize() {
// given
final var modelInstance =
Bpmn.createExecutableProcess("foo")
.startEvent()
.serviceTask()
.zeebeJobType("foo")
.endEvent()
.done();

CLIENT_RULE
.getClient()
.newDeployResourceCommand()
.addProcessModel(modelInstance, "foo.bpmn")
.send()
.join();

final var byteArray = new byte[1024 * 1024]; // 1 MB
new Random().nextBytes(byteArray);
final var message = new String(byteArray, StandardCharsets.UTF_8);

final int numberOfJobsToActivate = 5;
for (int i = 0; i < numberOfJobsToActivate; i++) {
final ProcessInstanceEvent event =
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("foo")
.latestVersion()
.variables(Map.of("message_content", message))
.send()
.join();

RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(event.getProcessInstanceKey())
.withElementType(BpmnElementType.PROCESS)
.await();
}

// when
final JobWorkerBuilderStep3 builder =
CLIENT_RULE.getClient().newWorker().jobType("foo").handler(COMPLETING_JOB_HANDLER);

// then
try (final JobWorker ignored = builder.open()) {
Awaitility.await("until all jobs are completed")
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofSeconds(5))
.untilAsserted(
() ->
assertThat(
RecordingExporter.jobRecords(JobIntent.COMPLETED)
.withType("foo")
.limit(numberOfJobsToActivate)
.count())
.isEqualTo(numberOfJobsToActivate));
}
}
}

0 comments on commit a30eb37

Please sign in to comment.