Skip to content

Commit

Permalink
merge: #9895 #9897
Browse files Browse the repository at this point in the history
9895: [Backport stable/8.0] test(qa): wait until message is published before restarting the broker r=deepthidevaki a=backport-action

# Description
Backport of #9886 to `stable/8.0`.

relates to #9813

9897: [Backport stable/8.0] fix(engine): add grace period to detect end of processing r=remcowesterhoud a=backport-action

# Description
Backport of #9082 to `stable/8.0`.

relates to #8738

closes #9641

Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
Co-authored-by: pihme <[email protected]>
  • Loading branch information
3 people authored Jul 26, 2022
3 parents d349d79 + a863a22 + 7a434a5 commit d32e2b7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.test.util.bpmn.random.TestDataGenerator.TestDataRecord;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.Collection;
import java.util.Map;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.Before;
Expand All @@ -38,6 +39,9 @@ public class ReplayStateRandomizedPropertyTest {
private static final String PROCESS_COUNT = System.getProperty("processCount", "3");
private static final String EXECUTION_PATH_COUNT =
System.getProperty("replayExecutionCount", "1");
/* Grace period to wait if new records come in after processing has reached end */
private static final long GRACE_PERIOD = 50; // ms

@Parameter public TestDataRecord record;

@Rule
Expand Down Expand Up @@ -101,7 +105,12 @@ public void shouldRestoreStateAtEachStepInExecution() {

private void stopAndRestartEngineAndCompareStates() {
// given
waitForProcessingToStop();
Awaitility.await(
"await the last written record to be processed, then wait a GRACE_PERIOD to make sure no new events are added")
.untilAsserted(
() -> {
processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod();
});

engineRule.pauseProcessing(1);

Expand All @@ -122,51 +131,58 @@ private void stopAndRestartEngineAndCompareStates() {
.untilAsserted(
() -> {
final var replayState = engineRule.collectState();
assertIdenticalStates(processingState, replayState);
});
}

final var softly = new SoftAssertions();

processingState.entrySet().stream()
.filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT)
.forEach(
entry -> {
final var column = entry.getKey();
final var processingEntries = entry.getValue();
final var replayEntries = replayState.get(column);

if (processingEntries.isEmpty()) {
softly
.assertThat(replayEntries)
.describedAs(
"The state column '%s' should be empty after replay", column)
.isEmpty();
} else {
softly
.assertThat(replayEntries)
.describedAs(
"The state column '%s' has different entries after replay",
column)
.containsExactlyInAnyOrderEntriesOf(processingEntries);
}
});

softly.assertAll();
private void assertIdenticalStates(
final Map<ZbColumnFamilies, Map<Object, Object>> expectedState,
final Map<ZbColumnFamilies, Map<Object, Object>> actualState) {
final var softly = new SoftAssertions();
expectedState.entrySet().stream()
.filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT)
.forEach(
entry -> {
final var column = entry.getKey();
final var expectedEntries = entry.getValue();
final var actualEntries = actualState.get(column);

if (expectedEntries.isEmpty()) {
softly
.assertThat(actualEntries)
.describedAs("The state column '%s' should be empty", column)
.isEmpty();
} else {
softly
.assertThat(actualEntries)
.describedAs("The state column '%s' has different entries", column)
.containsExactlyInAnyOrderEntriesOf(expectedEntries);
}
});

softly.assertAll();
}

private void waitForProcessingToStop() {
Awaitility.await("await the last written record to be processed")
.untilAsserted(
() ->
assertThat(engineRule.hasReachedEnd())
.describedAs("Processing has reached end of the log.")
.isTrue());
private void processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod()
throws InterruptedException {
assertThat(engineRule.hasReachedEnd())
.describedAs("Processing has reached end of the log.")
.isTrue();
final var stateBeforeGracePeriod = engineRule.collectState();
Thread.sleep(GRACE_PERIOD);
assertThat(engineRule.hasReachedEnd())
.describedAs("Processing has reached end of the log.")
.isTrue();
final var stateAfterGracePeriod = engineRule.collectState();

assertIdenticalStates(stateBeforeGracePeriod, stateAfterGracePeriod);
}

@Parameters(name = "{0}")
public static Collection<TestDataRecord> getTestRecords() {
// use the following code to rerun a specific test case:
// final var processSeed = 3499044774323385558L;
// final var executionPathSeed = 3627169465144620203L;
// final var processSeed = 6163452194952018956L;
// final var executionPathSeed = 6499103602285813109L;
// return List.of(TestDataGenerator.regenerateTestRecord(processSeed, executionPathSeed));
return TestDataGenerator.generateTestRecords(
Integer.parseInt(PROCESS_COUNT), Integer.parseInt(EXECUTION_PATH_COUNT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static Object[][] actions() {
.newPublishMessageCommand()
.messageName("msg")
.correlationKey("123")
.send(),
.send()
.join(),
(BiConsumer<ClusteringRule, GrpcClientRule>)
(clusteringRule, clientRule) -> {
final var processDefinitionKey =
Expand Down

0 comments on commit d32e2b7

Please sign in to comment.