From 00f760d6164b0fcfe99c097d433e011431d15bd2 Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Wed, 23 Aug 2023 09:46:56 +0200 Subject: [PATCH 1/3] fix(engine): use drg and decision key instead of command key When we write the DRG/Decision CREATED events on the other partitions we use the command key. This is the key from the Deployment.CREATE command. That means the CREATED events on distributed partitions have a different key from the distributing partition. By taking the DRG/Decision keys from the records itself we align these keys. --- .../processing/deployment/DeploymentCreateProcessor.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java index 120a5bb4d6a2..724c38ee81a5 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java @@ -195,14 +195,15 @@ private void createDmnResources( .forEach( decisionRequirementsRecord -> stateWriter.appendFollowUpEvent( - command.getKey(), + decisionRequirementsRecord.getDecisionRequirementsKey(), DecisionRequirementsIntent.CREATED, decisionRequirementsRecord)); deploymentEvent.decisionsMetadata().stream() .filter(not(DecisionRecord::isDuplicate)) .forEach( (record) -> - stateWriter.appendFollowUpEvent(command.getKey(), DecisionIntent.CREATED, record)); + stateWriter.appendFollowUpEvent( + record.getDecisionKey(), DecisionIntent.CREATED, record)); } /** From 212401e0437c9b89ed39fbc49cfcbdf3d5f29676 Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Wed, 23 Aug 2023 09:49:04 +0200 Subject: [PATCH 2/3] refactor(engine): remove unused parameter We no longer use this parameter, thus it can be removed. --- .../processing/deployment/DeploymentCreateProcessor.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java index 724c38ee81a5..563d74d28adb 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java @@ -164,7 +164,7 @@ private void transformAndDistributeDeployment(final TypedRecord command) { final var deploymentEvent = command.getValue(); createBpmnResources(deploymentEvent); - createDmnResources(command, deploymentEvent); + createDmnResources(deploymentEvent); final var recordWithoutResource = createDeploymentWithoutResources(deploymentEvent); stateWriter.appendFollowUpEvent( command.getKey(), DeploymentIntent.CREATED, recordWithoutResource); @@ -187,8 +187,7 @@ private void createBpmnResources(final DeploymentRecord deploymentEvent) { }); } - private void createDmnResources( - final TypedRecord command, final DeploymentRecord deploymentEvent) { + private void createDmnResources(final DeploymentRecord deploymentEvent) { deploymentEvent.decisionRequirementsMetadata().stream() .filter(not(DecisionRequirementsMetadataRecord::isDuplicate)) .map(drg -> createDrgRecord(deploymentEvent, drg)) From 108f062012cfeec1edc38e6b66001542b488ce3c Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Wed, 23 Aug 2023 10:54:00 +0200 Subject: [PATCH 3/3] test(engine): verify created events are written with same key Adds test cases to verify Process.CREATED, DecisionRequirements.CREATED and Decision.CREATED events are written with the same key after distribution. --- ...reateDeploymentMultiplePartitionsTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/deployment/CreateDeploymentMultiplePartitionsTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/deployment/CreateDeploymentMultiplePartitionsTest.java index 6ca3e10e2959..7a1e7a7785c9 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/deployment/CreateDeploymentMultiplePartitionsTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/deployment/CreateDeploymentMultiplePartitionsTest.java @@ -17,12 +17,16 @@ import io.camunda.zeebe.protocol.record.Record; import io.camunda.zeebe.protocol.record.RecordType; import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent; +import io.camunda.zeebe.protocol.record.intent.DecisionIntent; +import io.camunda.zeebe.protocol.record.intent.DecisionRequirementsIntent; import io.camunda.zeebe.protocol.record.intent.DeploymentIntent; +import io.camunda.zeebe.protocol.record.intent.ProcessIntent; import io.camunda.zeebe.protocol.record.value.CommandDistributionRecordValue; import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue; import io.camunda.zeebe.protocol.record.value.deployment.DecisionRecordValue; import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsMetadataValue; import io.camunda.zeebe.protocol.record.value.deployment.ProcessMetadataValue; +import io.camunda.zeebe.test.util.Strings; import io.camunda.zeebe.test.util.record.RecordingExporter; import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher; import java.io.ByteArrayOutputStream; @@ -408,6 +412,60 @@ public void shouldNotFilterDifferentDmnResource() { repeatedDrgs.forEach(r -> assertDifferentDrg(originalDrg.get(0), r)); } + @Test + public void shouldWriteProcessCreatedEventsWithSameKeys() { + // given + final var processId = Strings.newRandomValidBpmnId(); + + // when + ENGINE + .deployment() + .withXmlResource( + "process.bpmn", Bpmn.createExecutableProcess(processId).startEvent().endEvent().done()) + .deploy(); + + // then + assertThat( + RecordingExporter.processRecords() + .withIntents(ProcessIntent.CREATED) + .withBpmnProcessId(processId) + .limit(3) + .map(Record::getKey) + .distinct()) + .describedAs("All created events get the same key") + .hasSize(1); + } + + @Test + public void shouldWriteDrgAndDecisionCreatedEventsWithSameKeys() { + // given + final var drgId = "force_users"; + final var decisionId = "jedi_or_sith"; + + // when + ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy(); + + // then + assertThat( + RecordingExporter.decisionRequirementsRecords() + .withIntents(DecisionRequirementsIntent.CREATED) + .withDecisionRequirementsId(drgId) + .limit(3) + .map(Record::getKey) + .distinct()) + .describedAs("All created events get the same key") + .hasSize(1); + assertThat( + RecordingExporter.decisionRecords() + .withIntents(DecisionIntent.CREATED) + .withDecisionId(decisionId) + .limit(3) + .map(Record::getKey) + .distinct()) + .describedAs("All created events get the same key") + .hasSize(1); + } + private void assertDeploymentRecordWithoutResources( final Record deployment, final Record createdDeployment) {