diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java index dcd7fc6637ecc..43597afd7c1cc 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java @@ -56,6 +56,9 @@ public interface ConnectionManagerWorkflow { @SignalMethod void resetConnection(); + @SignalMethod + void resetConnectionAndSkipNextScheduling(); + /** * If an activity fails the workflow will be stuck. This signal activity can be used to retry the * activity. diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java index b345700fcfa02..e182d1cfd7860 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java @@ -39,6 +39,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan @Deprecated private final boolean resetWithScheduling = false; private boolean doneWaiting = false; + private boolean skipSchedulingNextWorkflow = false; public void setRunning(final boolean running) { final ChangedStateEvent event = new ChangedStateEvent( @@ -128,6 +129,14 @@ public void setDoneWaiting(final boolean doneWaiting) { this.doneWaiting = doneWaiting; } + public void setSkipSchedulingNextWorkflow(final boolean skipSchedulingNextWorkflow) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.SKIP_SCHEDULING_NEXT_WORKFLOW, + skipSchedulingNextWorkflow); + stateChangedListener.addEvent(id, event); + this.skipSchedulingNextWorkflow = skipSchedulingNextWorkflow; + } + // TODO: bmoric -> This is noisy when inpecting the list of event, it should be just a single reset // event. public void reset() { @@ -141,6 +150,7 @@ public void reset() { this.setSuccess(false); this.setQuarantined(false); this.setDoneWaiting(false); + this.setSkipSchedulingNextWorkflow(false); } } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 86866bf62da15..cb302b422c263 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -37,6 +37,7 @@ enum StateField { CANCELLED_FOR_RESET, RESET_WITH_SCHEDULING, DONE_WAITING, + SKIP_SCHEDULING_NEXT_WORKFLOW, } @Value diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 179609466aacc..dfe277c1704e1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -377,7 +377,8 @@ private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOE private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, JsonValidationException, ConfigNotFoundException { final ManualOperationResult resetConnectionResult = eventRunner.resetConnection( connectionId, - configRepository.getAllStreamsForConnection(connectionId)); + configRepository.getAllStreamsForConnection(connectionId), + false); return readJobFromResult(resetConnectionResult); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 28293cfadce13..88f98c05389ea 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -52,7 +52,6 @@ import io.airbyte.server.scheduler.EventRunner; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.helper.ProtocolConverters; -import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -422,12 +421,9 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET) { streamsToReset = configRepository.getAllStreamsForConnection(connectionId); } - ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection( + eventRunner.resetConnection( connectionId, - streamsToReset); - verifyManualOperationResult(manualOperationResult); - manualOperationResult = eventRunner.startNewManualSync(connectionId); - verifyManualOperationResult(manualOperationResult); + streamsToReset, true); // return updated connectionRead after reset return connectionsHandler.getConnection(connectionId); @@ -437,12 +433,6 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web return updatedConnectionRead; } - private void verifyManualOperationResult(final ManualOperationResult manualOperationResult) throws IllegalStateException { - if (manualOperationResult.getFailingReason().isPresent()) { - throw new IllegalStateException(manualOperationResult.getFailingReason().get()); - } - } - private List createOperations(final WebBackendConnectionCreate webBackendConnectionCreate) throws JsonValidationException, ConfigNotFoundException, IOException { if (webBackendConnectionCreate.getOperations() == null) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java index 366c44e935b98..56d8fb8cae448 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/EventRunner.java @@ -18,9 +18,7 @@ public interface EventRunner { ManualOperationResult startNewCancellation(final UUID connectionId); - ManualOperationResult resetConnection(final UUID connectionId, final List streamsToReset); - - ManualOperationResult synchronousResetConnection(final UUID connectionId, final List streamsToReset); + ManualOperationResult resetConnection(final UUID connectionId, final List streamsToReset, final boolean runSyncImmediately); void deleteConnection(final UUID connectionId); diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java index 6d0fb0d91ce4a..1ff3cef84a016 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/TemporalEventRunner.java @@ -33,13 +33,10 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) { } @Override - public ManualOperationResult resetConnection(final UUID connectionId, final List streamsToReset) { - return temporalClient.resetConnection(connectionId, streamsToReset); - } - - @Override - public ManualOperationResult synchronousResetConnection(final UUID connectionId, final List streamsToReset) { - return temporalClient.synchronousResetConnection(connectionId, streamsToReset); + public ManualOperationResult resetConnection(final UUID connectionId, + final List streamsToReset, + final boolean runSyncImmediately) { + return temporalClient.resetConnection(connectionId, streamsToReset, runSyncImmediately); } @Override diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 5dbe9a8149a8c..33a8bae6fd2a0 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -623,7 +623,7 @@ void testResetConnection() throws IOException, JsonValidationException, ConfigNo when(configRepository.getAllStreamsForConnection(connectionId)) .thenReturn(streamDescriptors); - when(eventRunner.resetConnection(connectionId, streamDescriptors)) + when(eventRunner.resetConnection(connectionId, streamDescriptors, false)) .thenReturn(manualOperationResult); doReturn(new JobInfoRead()) @@ -631,7 +631,7 @@ void testResetConnection() throws IOException, JsonValidationException, ConfigNo schedulerHandler.resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - verify(eventRunner).resetConnection(connectionId, streamDescriptors); + verify(eventRunner).resetConnection(connectionId, streamDescriptors, false); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 37e79fed6e93a..f4c3d48652a6e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -8,6 +8,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -642,7 +643,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio when(configRepository.getAllStreamsForConnection(expected.getConnectionId())).thenReturn(connectionStreams); final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build(); - when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult); + when(eventRunner.resetConnection(any(), any(), anyBoolean())).thenReturn(successfulResult); when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult); final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); @@ -654,8 +655,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio verify(schedulerHandler, times(0)).syncConnection(connectionId); verify(connectionsHandler, times(1)).updateConnection(any()); final InOrder orderVerifier = inOrder(eventRunner); - orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(), connectionStreams); - orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId()); + orderVerifier.verify(eventRunner, times(1)).resetConnection(connectionId.getConnectionId(), connectionStreams, true); } @Test @@ -708,7 +708,7 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead); final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build(); - when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult); + when(eventRunner.resetConnection(any(), any(), anyBoolean())).thenReturn(successfulResult); when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult); final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody); @@ -720,12 +720,12 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep verify(schedulerHandler, times(0)).syncConnection(connectionId); verify(connectionsHandler, times(1)).updateConnection(any()); final InOrder orderVerifier = inOrder(eventRunner); - orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(), + orderVerifier.verify(eventRunner, times(1)).resetConnection(connectionId.getConnectionId(), List.of(new io.airbyte.protocol.models.StreamDescriptor().withName("addStream"), new io.airbyte.protocol.models.StreamDescriptor().withName("updateStream"), new io.airbyte.protocol.models.StreamDescriptor().withName("configUpdateStream"), - new io.airbyte.protocol.models.StreamDescriptor().withName("removeStream"))); - orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId()); + new io.airbyte.protocol.models.StreamDescriptor().withName("removeStream")), + true); } @Test @@ -776,7 +776,7 @@ void testUpdateConnectionNoStreamsToReset() throws JsonValidationException, Conf verify(schedulerHandler, times(0)).syncConnection(connectionId); verify(connectionsHandler, times(1)).updateConnection(any()); final InOrder orderVerifier = inOrder(eventRunner); - orderVerifier.verify(eventRunner, times(0)).synchronousResetConnection(eq(connectionId.getConnectionId()), any()); + orderVerifier.verify(eventRunner, times(0)).resetConnection(eq(connectionId.getConnectionId()), any(), anyBoolean()); orderVerifier.verify(eventRunner, times(0)).startNewManualSync(connectionId.getConnectionId()); } @@ -819,8 +819,7 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN verify(schedulerHandler, times(0)).syncConnection(connectionId); verify(connectionsHandler, times(0)).getDiff(any(), any()); verify(connectionsHandler, times(1)).updateConnection(any()); - verify(eventRunner, times(0)).synchronousResetConnection(any(), any()); - verify(eventRunner, times(0)).startNewManualSync(any()); + verify(eventRunner, times(0)).resetConnection(any(), any(), eq(true)); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 1174de133c686..bcb646db3ac42 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -386,7 +386,9 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) { Optional.of(jobId), Optional.empty()); } - public ManualOperationResult resetConnection(final UUID connectionId, final List streamsToReset) { + public ManualOperationResult resetConnection(final UUID connectionId, + final List streamsToReset, + final boolean syncImmediatelyAfter) { log.info("reset sync request"); try { @@ -402,7 +404,11 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List final long oldJobId = connectionManagerUtils.getCurrentJobId(client, connectionId); try { - connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection); + if (syncImmediatelyAfter) { + connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnectionAndSkipNextScheduling); + } else { + connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection); + } } catch (final DeletedWorkflowException e) { log.error("Can't reset a deleted workflow", e); return new ManualOperationResult( @@ -439,36 +445,6 @@ private Optional getNewJobId(final UUID connectionId, final long oldJobId) } } - /** - * This is launching a reset and wait for the reset to be performed. - * - * The way to do so is to wait for the jobId to change, either to a new job id or the default id - * that signal that a workflow is waiting to be submitted - */ - public ManualOperationResult synchronousResetConnection(final UUID connectionId, final List streamsToReset) { - final ManualOperationResult resetResult = resetConnection(connectionId, streamsToReset); - if (resetResult.getFailingReason().isPresent()) { - return resetResult; - } - - final long resetJobId = resetResult.getJobId().get(); - do { - try { - Thread.sleep(DELAY_BETWEEN_QUERY_MS); - } catch (final InterruptedException e) { - return new ManualOperationResult( - Optional.of("Didn't manage to reset a sync for: " + connectionId), - Optional.empty(), Optional.of(ErrorCode.UNKNOWN)); - } - } while (connectionManagerUtils.getCurrentJobId(client, connectionId) == resetJobId); - - log.info("End of reset"); - - return new ManualOperationResult( - Optional.empty(), - Optional.of(resetJobId), Optional.empty()); - } - /** * This should be in the class {@li} * diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 3e263bc9472ed..d35f6c515ed4c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -492,6 +492,18 @@ public void resetConnection() { } } + @Override + public void resetConnectionAndSkipNextScheduling() { + if (workflowState.isDoneWaiting()) { + workflowState.setCancelledForReset(true); + workflowState.setSkipSchedulingNextWorkflow(true); + cancellableSyncWorkflow.cancel(); + } else { + workflowState.setSkipScheduling(true); + workflowState.setSkipSchedulingNextWorkflow(true); + } + } + @Override public void retryFailedActivity() { workflowState.setRetryFailedActivity(true); @@ -537,6 +549,9 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn workflowInternalState.getFailures().clear(); workflowInternalState.setPartialSuccess(null); final boolean isDeleted = workflowState.isDeleted(); + if (workflowState.isSkipSchedulingNextWorkflow()) { + connectionUpdaterInput.setSkipScheduling(true); + } workflowState.reset(); if (!isDeleted) { Workflow.continueAsNew(connectionUpdaterInput); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 37455ce758b99..59b1caa9ee74d 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -258,35 +258,6 @@ void testSubmitSync() { verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC)); } - @Test - void testSynchronousResetConnection() throws IOException { - final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - final WorkflowState mWorkflowState = mock(WorkflowState.class); - when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); - when(mWorkflowState.isDeleted()).thenReturn(false); - final long resetJobId = 1L; - - when(mConnectionManagerWorkflow.getJobInformation()).thenReturn( - new JobInformation(NON_RUNNING_JOB_ID, 0), - new JobInformation(NON_RUNNING_JOB_ID, 0), - new JobInformation(resetJobId, 0), - new JobInformation(resetJobId, 0), - new JobInformation(NON_RUNNING_JOB_ID, 0), - new JobInformation(NON_RUNNING_JOB_ID, 0)); - - doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class)); - - when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow); - - final List streamsToReset = List.of(STREAM_DESCRIPTOR); - final ManualOperationResult manualOperationResult = temporalClient.synchronousResetConnection(CONNECTION_ID, streamsToReset); - - verify(streamResetPersistence).createStreamResets(CONNECTION_ID, streamsToReset); - verify(mConnectionManagerWorkflow).resetConnection(); - - assertEquals(manualOperationResult.getJobId().get(), resetJobId); - } - } @Nested @@ -663,7 +634,7 @@ void testResetConnectionSuccess() throws IOException { when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); final List streamsToReset = List.of(STREAM_DESCRIPTOR); - final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset); + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset, false); verify(streamResetPersistence).createStreamResets(CONNECTION_ID, streamsToReset); @@ -673,6 +644,36 @@ void testResetConnectionSuccess() throws IOException { verify(mConnectionManagerWorkflow).resetConnection(); } + @Test + @DisplayName("Test resetConnection successful") + void testResetConnectionSuccessAndContinue() throws IOException { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final WorkflowState mWorkflowState = mock(WorkflowState.class); + when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); + when(mWorkflowState.isDeleted()).thenReturn(false); + when(mWorkflowState.isRunning()).thenReturn(false); + final long jobId1 = 1; + final long jobId2 = 2; + when(mConnectionManagerWorkflow.getJobInformation()).thenReturn( + new JobInformation(jobId1, 0), + new JobInformation(jobId1, 0), + new JobInformation(NON_RUNNING_JOB_ID, 0), + new JobInformation(NON_RUNNING_JOB_ID, 0), + new JobInformation(jobId2, 0), + new JobInformation(jobId2, 0)); + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + + final List streamsToReset = List.of(STREAM_DESCRIPTOR); + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset, true); + + verify(streamResetPersistence).createStreamResets(CONNECTION_ID, streamsToReset); + + assertTrue(result.getJobId().isPresent()); + assertEquals(jobId2, result.getJobId().get()); + assertFalse(result.getFailingReason().isPresent()); + verify(mConnectionManagerWorkflow).resetConnectionAndSkipNextScheduling(); + } + @Test @DisplayName("Test resetConnection repairs the workflow if it is in a bad state") void testResetConnectionRepairsBadWorkflowState() throws IOException { @@ -699,7 +700,7 @@ void testResetConnectionRepairsBadWorkflowState() throws IOException { mNewConnectionManagerWorkflow); final List streamsToReset = List.of(STREAM_DESCRIPTOR); - final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset); + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset, false); verify(streamResetPersistence).createStreamResets(CONNECTION_ID, streamsToReset); @@ -729,7 +730,7 @@ void testResetConnectionDeletedWorkflow() throws IOException { mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); final List streamsToReset = List.of(STREAM_DESCRIPTOR); - final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset); + final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID, streamsToReset, false); verify(streamResetPersistence).createStreamResets(CONNECTION_ID, streamsToReset); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 37aa510782658..8c91948b05f2b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -717,6 +717,41 @@ void resetStart() throws InterruptedException { } + @Test + @Timeout(value = 10, + unit = TimeUnit.SECONDS) + @DisplayName("Test that resetting a non-running workflow starts a reset job") + void resetAndContinue() throws InterruptedException { + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + + final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() + .connectionId(UUID.randomUUID()) + .jobId(JOB_ID) + .attemptId(ATTEMPT_ID) + .fromFailure(false) + .attemptNumber(1) + .workflowState(workflowState) + .build(); + + startWorkflowAndWaitUntilReady(workflow, input); + testEnv.sleep(Duration.ofMinutes(5L)); + workflow.resetConnectionAndSkipNextScheduling(); + Thread.sleep(500); + + final Queue events = testStateListener.events(testId); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.SKIPPED_SCHEDULING && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.SKIP_SCHEDULING_NEXT_WORKFLOW && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + } + @Test @Timeout(value = 60, unit = TimeUnit.SECONDS)