-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persist state to reset connection job config #13867
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import io.airbyte.config.SourceConnection; | ||
import io.airbyte.config.StandardSync; | ||
import io.airbyte.config.StandardSyncOperation; | ||
import io.airbyte.config.State; | ||
import io.airbyte.config.StreamDescriptor; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
|
@@ -24,6 +25,7 @@ | |
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import javax.annotation.Nullable; | ||
|
||
public class DefaultJobCreator implements JobCreator { | ||
|
@@ -76,7 +78,7 @@ public Optional<Long> createSyncJob(final SourceConnection source, | |
workerResourceRequirements, | ||
JobType.SYNC)); | ||
|
||
configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); | ||
getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); | ||
|
||
final JobConfig jobConfig = new JobConfig() | ||
.withConfigType(ConfigType.SYNC) | ||
|
@@ -116,10 +118,18 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti | |
workerResourceRequirements)) | ||
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset)); | ||
|
||
getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(resetConnectionConfig::withState); | ||
|
||
final JobConfig jobConfig = new JobConfig() | ||
.withConfigType(ConfigType.RESET_CONNECTION) | ||
.withResetConnection(resetConnectionConfig); | ||
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig); | ||
} | ||
|
||
// TODO (https://github.com/airbytehq/airbyte/issues/13620): update this method implementation | ||
// to fetch and serialize the new per-stream state format into a State object | ||
private Optional<State> getCurrentConnectionState(final UUID connectionId) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah...I see. Disregard my previous comment unless you also want to pass the function in here and return void. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it felt slightly more clean to just duplicate the |
||
return configRepository.getConnectionState(connectionId); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
import io.airbyte.config.StandardSync; | ||
import io.airbyte.config.StandardSyncOperation; | ||
import io.airbyte.config.StandardSyncOperation.OperatorType; | ||
import io.airbyte.config.State; | ||
import io.airbyte.config.StreamDescriptor; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import io.airbyte.protocol.models.CatalogHelpers; | ||
|
@@ -42,6 +43,7 @@ | |
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import org.junit.jupiter.api.BeforeEach; | ||
|
@@ -333,7 +335,10 @@ void testCreateResetConnectionJob() throws IOException { | |
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE); | ||
}); | ||
|
||
final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig() | ||
final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); | ||
when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState)); | ||
|
||
final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() | ||
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) | ||
.withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) | ||
.withPrefix(STANDARD_SYNC.getPrefix()) | ||
|
@@ -342,22 +347,26 @@ void testCreateResetConnectionJob() throws IOException { | |
.withConfiguredAirbyteCatalog(expectedCatalog) | ||
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) | ||
.withResourceRequirements(workerResourceRequirements) | ||
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))); | ||
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))) | ||
.withState(connectionState); | ||
|
||
final JobConfig jobConfig = new JobConfig() | ||
.withConfigType(ConfigType.RESET_CONNECTION) | ||
.withResetConnection(JobResetConnectionConfig); | ||
.withResetConnection(jobResetConnectionConfig); | ||
|
||
final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); | ||
when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.of(JOB_ID)); | ||
|
||
final long jobId = jobCreator.createResetConnectionJob( | ||
final Optional<Long> jobId = jobCreator.createResetConnectionJob( | ||
DESTINATION_CONNECTION, | ||
STANDARD_SYNC, | ||
DESTINATION_IMAGE_NAME, | ||
List.of(STANDARD_SYNC_OPERATION), | ||
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).orElseThrow(); | ||
assertEquals(JOB_ID, jobId); | ||
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)); | ||
|
||
verify(jobPersistence).enqueueJob(expectedScope, jobConfig); | ||
assertTrue(jobId.isPresent()); | ||
assertEquals(JOB_ID, jobId.get()); | ||
Comment on lines
+367
to
+369
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I updated these tests a bit to actually verify that the JobResetConnectionConfig defined in the test is actually what is being passed to |
||
} | ||
|
||
@Test | ||
|
@@ -369,7 +378,10 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { | |
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE); | ||
}); | ||
|
||
final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig() | ||
final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); | ||
when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState)); | ||
|
||
final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() | ||
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) | ||
.withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) | ||
.withPrefix(STANDARD_SYNC.getPrefix()) | ||
|
@@ -378,21 +390,25 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { | |
.withConfiguredAirbyteCatalog(expectedCatalog) | ||
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) | ||
.withResourceRequirements(workerResourceRequirements) | ||
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))); | ||
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))) | ||
.withState(connectionState); | ||
|
||
final JobConfig jobConfig = new JobConfig() | ||
.withConfigType(ConfigType.RESET_CONNECTION) | ||
.withResetConnection(JobResetConnectionConfig); | ||
.withResetConnection(jobResetConnectionConfig); | ||
|
||
final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); | ||
when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.empty()); | ||
|
||
assertTrue(jobCreator.createResetConnectionJob( | ||
final Optional<Long> jobId = jobCreator.createResetConnectionJob( | ||
DESTINATION_CONNECTION, | ||
STANDARD_SYNC, | ||
DESTINATION_IMAGE_NAME, | ||
List.of(STANDARD_SYNC_OPERATION), | ||
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).isEmpty()); | ||
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)); | ||
|
||
verify(jobPersistence).enqueueJob(expectedScope, jobConfig); | ||
assertTrue(jobId.isEmpty()); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this could be abstracted to a method that takes the function to call if present instead of duplicating between read and reset.