Skip to content

Commit

Permalink
fix(engine): add grace period to detect and of processing
Browse files Browse the repository at this point in the history
This commit adds a grace period after the end of processing has been reached.
Only when no new records are added to the logstream during the grace period, do
we proceed with the test.

This mechanism is a bit crude, but it has been utilized to great effect in
zeebe-process-test already. There we tried out different implementations and
found that this mechanism works best.

Downside is that this will make the tests run longer.
Also the exact value for GRACE_PERIOD may need to be tuned. The 50ms were
also taken from experiences in zeebe-process-test. But there we are working
with an in memory engine, so the value might not be optimal here.

(cherry picked from commit 0e61a9e)
  • Loading branch information
pihme authored and github-actions[bot] committed Jul 26, 2022
1 parent d349d79 commit 7a434a5
Showing 1 changed file with 53 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.test.util.bpmn.random.TestDataGenerator.TestDataRecord;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.Collection;
import java.util.Map;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.Before;
Expand All @@ -38,6 +39,9 @@ public class ReplayStateRandomizedPropertyTest {
private static final String PROCESS_COUNT = System.getProperty("processCount", "3");
private static final String EXECUTION_PATH_COUNT =
System.getProperty("replayExecutionCount", "1");
/* Grace period to wait if new records come in after processing has reached end */
private static final long GRACE_PERIOD = 50; // ms

@Parameter public TestDataRecord record;

@Rule
Expand Down Expand Up @@ -101,7 +105,12 @@ public void shouldRestoreStateAtEachStepInExecution() {

private void stopAndRestartEngineAndCompareStates() {
// given
waitForProcessingToStop();
Awaitility.await(
"await the last written record to be processed, then wait a GRACE_PERIOD to make sure no new events are added")
.untilAsserted(
() -> {
processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod();
});

engineRule.pauseProcessing(1);

Expand All @@ -122,51 +131,58 @@ private void stopAndRestartEngineAndCompareStates() {
.untilAsserted(
() -> {
final var replayState = engineRule.collectState();
assertIdenticalStates(processingState, replayState);
});
}

final var softly = new SoftAssertions();

processingState.entrySet().stream()
.filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT)
.forEach(
entry -> {
final var column = entry.getKey();
final var processingEntries = entry.getValue();
final var replayEntries = replayState.get(column);

if (processingEntries.isEmpty()) {
softly
.assertThat(replayEntries)
.describedAs(
"The state column '%s' should be empty after replay", column)
.isEmpty();
} else {
softly
.assertThat(replayEntries)
.describedAs(
"The state column '%s' has different entries after replay",
column)
.containsExactlyInAnyOrderEntriesOf(processingEntries);
}
});

softly.assertAll();
private void assertIdenticalStates(
final Map<ZbColumnFamilies, Map<Object, Object>> expectedState,
final Map<ZbColumnFamilies, Map<Object, Object>> actualState) {
final var softly = new SoftAssertions();
expectedState.entrySet().stream()
.filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT)
.forEach(
entry -> {
final var column = entry.getKey();
final var expectedEntries = entry.getValue();
final var actualEntries = actualState.get(column);

if (expectedEntries.isEmpty()) {
softly
.assertThat(actualEntries)
.describedAs("The state column '%s' should be empty", column)
.isEmpty();
} else {
softly
.assertThat(actualEntries)
.describedAs("The state column '%s' has different entries", column)
.containsExactlyInAnyOrderEntriesOf(expectedEntries);
}
});

softly.assertAll();
}

private void waitForProcessingToStop() {
Awaitility.await("await the last written record to be processed")
.untilAsserted(
() ->
assertThat(engineRule.hasReachedEnd())
.describedAs("Processing has reached end of the log.")
.isTrue());
private void processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod()
throws InterruptedException {
assertThat(engineRule.hasReachedEnd())
.describedAs("Processing has reached end of the log.")
.isTrue();
final var stateBeforeGracePeriod = engineRule.collectState();
Thread.sleep(GRACE_PERIOD);
assertThat(engineRule.hasReachedEnd())
.describedAs("Processing has reached end of the log.")
.isTrue();
final var stateAfterGracePeriod = engineRule.collectState();

assertIdenticalStates(stateBeforeGracePeriod, stateAfterGracePeriod);
}

@Parameters(name = "{0}")
public static Collection<TestDataRecord> getTestRecords() {
// use the following code to rerun a specific test case:
// final var processSeed = 3499044774323385558L;
// final var executionPathSeed = 3627169465144620203L;
// final var processSeed = 6163452194952018956L;
// final var executionPathSeed = 6499103602285813109L;
// return List.of(TestDataGenerator.regenerateTestRecord(processSeed, executionPathSeed));
return TestDataGenerator.generateTestRecords(
Integer.parseInt(PROCESS_COUNT), Integer.parseInt(EXECUTION_PATH_COUNT));
Expand Down

0 comments on commit 7a434a5

Please sign in to comment.