From 4b50c599386bab47f0047548faeb6b68f33070e8 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 22 Jan 2025 10:20:40 -0800 Subject: [PATCH] Split up & use more appropriate timeouts for parallel LAs test --- .../ParallelLocalActivitiesTest.java | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java index 7b873623c..967627095 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java @@ -20,6 +20,7 @@ package io.temporal.workflow.activityTests; +import io.temporal.activity.LocalActivityOptions; import io.temporal.client.WorkflowOptions; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -38,10 +39,20 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ParallelLocalActivitiesTest { private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl(); + static final int TOTAL_LOCAL_ACT_COUNT = 100; + @Parameterized.Parameter public int maxLocalActivityExecutionSize; + + @Parameterized.Parameters + public static Object[] data() { + return new Object[] {50, TOTAL_LOCAL_ACT_COUNT}; + } @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -52,7 +63,9 @@ public class ParallelLocalActivitiesTest { // Use a number lower than the number of concurrent activities to ensure that the // queueing of LAs when task executor is full works .setWorkerOptions( - WorkerOptions.newBuilder().setMaxConcurrentLocalActivityExecutionSize(50).build()) + WorkerOptions.newBuilder() + .setMaxConcurrentLocalActivityExecutionSize(maxLocalActivityExecutionSize) + .build()) .build(); @Test @@ -66,16 +79,18 @@ public void testParallelLocalActivities() { TestWorkflow1 workflowStub = testWorkflowRule.getWorkflowClient().newWorkflowStub(TestWorkflow1.class, options); - String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + String willQueue = maxLocalActivityExecutionSize < TOTAL_LOCAL_ACT_COUNT ? "yes" : ""; + String result = workflowStub.execute(willQueue); Assert.assertEquals("done", result); - Assert.assertEquals(activitiesImpl.toString(), 100, activitiesImpl.invocations.size()); - List expected = new ArrayList(); + Assert.assertEquals( + activitiesImpl.toString(), TOTAL_LOCAL_ACT_COUNT, activitiesImpl.invocations.size()); + List expected = new ArrayList<>(); expected.add("interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP); expected.add("newThread workflow-method"); - for (int i = 0; i < TestParallelLocalActivitiesWorkflowImpl.COUNT; i++) { + for (int i = 0; i < TOTAL_LOCAL_ACT_COUNT; i++) { expected.add("executeLocalActivity SleepActivity"); } - for (int i = 0; i < TestParallelLocalActivitiesWorkflowImpl.COUNT; i++) { + for (int i = 0; i < TOTAL_LOCAL_ACT_COUNT; i++) { expected.add("local activity SleepActivity"); } testWorkflowRule @@ -84,16 +99,21 @@ public void testParallelLocalActivities() { } public static class TestParallelLocalActivitiesWorkflowImpl implements TestWorkflow1 { - static final int COUNT = 100; @Override - public String execute(String taskQueue) { + public String execute(String willQueue) { + LocalActivityOptions laOptions = SDKTestOptions.newLocalActivityOptions(); + // For the case where LAs will be forced to queue, we want to use start-to-close rather + // than schedule-to-start timeouts. + if (!willQueue.isEmpty()) { + laOptions = + LocalActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build(); + } VariousTestActivities localActivities = - Workflow.newLocalActivityStub( - VariousTestActivities.class, SDKTestOptions.newLocalActivityOptions()); + Workflow.newLocalActivityStub(VariousTestActivities.class, laOptions); List> laResults = new ArrayList<>(); Random r = Workflow.newRandom(); - for (int i = 0; i < COUNT; i++) { + for (int i = 0; i < TOTAL_LOCAL_ACT_COUNT; i++) { laResults.add(Async.function(localActivities::sleepActivity, (long) r.nextInt(3000), i)); } Promise.allOf(laResults).get();