Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist state to reset connection job config #13867

Merged
merged 3 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ properties:
existingJavaType: io.airbyte.config.ResourceRequirements
resetSourceConfiguration:
"$ref": ResetSourceConfiguration.yaml
state:
description: optional current state of the connection
"$ref": State.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -24,6 +25,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;

public class DefaultJobCreator implements JobCreator {
Expand Down Expand Up @@ -76,7 +78,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
workerResourceRequirements,
JobType.SYNC));

configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);
getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.SYNC)
Expand Down Expand Up @@ -116,10 +118,18 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
workerResourceRequirements))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset));

getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(resetConnectionConfig::withState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this could be abstracted to a method that takes the function to call if present instead of duplicating between read and reset.


final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(resetConnectionConfig);
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}

// TODO (https://github.com/airbytehq/airbyte/issues/13620): update this method implementation
// to fetch and serialize the new per-stream state format into a State object
private Optional<State> getCurrentConnectionState(final UUID connectionId) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah...I see. Disregard my previous comment unless you also want to pass the function in here and return void.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it felt slightly more clean to just duplicate the .ifPresent(xConfig::withState) part than to pass that in as a function

return configRepository.getConnectionState(connectionId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -42,6 +43,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -333,6 +335,9 @@ void testCreateResetConnectionJob() throws IOException {
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});

final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState));

final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
.withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat())
Expand All @@ -342,7 +347,8 @@ void testCreateResetConnectionJob() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand All @@ -351,13 +357,16 @@ void testCreateResetConnectionJob() throws IOException {
final String expectedScope = STANDARD_SYNC.getConnectionId().toString();
when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.of(JOB_ID));

final long jobId = jobCreator.createResetConnectionJob(
final Optional<Long> jobId = jobCreator.createResetConnectionJob(
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).orElseThrow();
assertEquals(JOB_ID, jobId);
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isPresent());
assertEquals(JOB_ID, jobId.get());
Comment on lines +367 to +369
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated these tests a bit to actually verify that the JobResetConnectionConfig defined in the test is actually what is being passed to enqueueJob. Without these changes, the tests would still pass regardless of if I set the state or not

}

@Test
Expand All @@ -369,6 +378,9 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});

final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState));

final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
.withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat())
Expand All @@ -378,7 +390,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand All @@ -387,12 +400,15 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
final String expectedScope = STANDARD_SYNC.getConnectionId().toString();
when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.empty());

assertTrue(jobCreator.createResetConnectionJob(
final Optional<Long> jobId = jobCreator.createResetConnectionJob(
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).isEmpty());
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
.withDestinationConfiguration(resetConnection.getDestinationConfiguration())
.withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog())
.withOperationSequence(resetConnection.getOperationSequence())
.withResourceRequirements(resetConnection.getResourceRequirements());
.withResourceRequirements(resetConnection.getResourceRequirements())
.withState(resetConnection.getState());
}

final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt);
Expand Down