Skip to content

Commit

Permalink
🔪 Get rid of retries within an attempt. (#5570)
Browse files Browse the repository at this point in the history
See https://airbytehq.slack.com/archives/C019WEENQRM/p1629383779144300 for more details.

TLDR:
Retrying within an attempt is confusing for a UX perspective (we already have attempts), does not provides more value from a retries perspective (we already have attempts) and inefficient from a resource perspective (hogs up api limits and compute/memory resources).

Follow up ticket: #5571
  • Loading branch information
davinchia authored Aug 24, 2021
1 parent fe8f7fa commit 9dd05d4
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 411 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ RESOURCE_MEMORY_REQUEST=
RESOURCE_MEMORY_LIMIT=

# Max attempts per sync and max retries per attempt
MAX_RETRIES_PER_ATTEMPT=3
MAX_SYNC_JOB_ATTEMPTS=3

# Time in days to reach a timeout to cancel the synchronization
Expand Down
1 change: 0 additions & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@ HACK_LOCAL_ROOT_PARENT=/tmp
WEBAPP_URL=http://localhost:8000/
API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
MAX_RETRIES_PER_ATTEMPT=3
MAX_SYNC_JOB_ATTEMPTS=3
MAX_SYNC_TIMEOUT_DAYS=3
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public interface Configs {

String getConfigDatabaseUrl();

int getMaxRetriesPerAttempt();

int getMaxSyncJobAttempts();

int getMaxSyncTimeoutDays();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class EnvConfigs implements Configs {
public static final String CONFIG_DATABASE_PASSWORD = "CONFIG_DATABASE_PASSWORD";
public static final String CONFIG_DATABASE_URL = "CONFIG_DATABASE_URL";
public static final String WEBAPP_URL = "WEBAPP_URL";
public static final String MAX_RETRIES_PER_ATTEMPT = "MAX_RETRIES_PER_ATTEMPT";
public static final String MAX_SYNC_JOB_ATTEMPTS = "MAX_SYNC_JOB_ATTEMPTS";
public static final String MAX_SYNC_TIMEOUT_DAYS = "MAX_SYNC_TIMEOUT_DAYS";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
Expand Down Expand Up @@ -147,11 +146,6 @@ public String getDatabaseUrl() {
return getEnsureEnv(DATABASE_URL);
}

@Override
public int getMaxRetriesPerAttempt() {
return Integer.parseInt(getEnvOrDefault(MAX_RETRIES_PER_ATTEMPT, "3"));
}

@Override
public int getMaxSyncJobAttempts() {
return Integer.parseInt(getEnvOrDefault(MAX_SYNC_JOB_ATTEMPTS, "3"));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,17 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ReplicationAttemptSummary;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DbtTransformationRunner;
Expand Down Expand Up @@ -69,9 +65,6 @@
import io.temporal.workflow.WorkflowMethod;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -151,8 +144,6 @@ class ReplicationActivityImpl implements ReplicationActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationActivityImpl.class);

private static final int MAX_RETRIES = new EnvConfigs().getMaxRetriesPerAttempt();

private final ProcessFactory processFactory;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
Expand All @@ -179,60 +170,35 @@ public StandardSyncOutput replicate(JobRunConfig jobRunConfig,
return syncInput;
};

final Predicate<ReplicationOutput> shouldAttemptAgain =
output -> output.getReplicationAttemptSummary().getStatus() != ReplicationStatus.COMPLETED;
final TemporalAttemptExecution<StandardSyncInput, ReplicationOutput> temporalAttempt = new TemporalAttemptExecution<>(
workspaceRoot,
jobRunConfig,
getWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput),
inputSupplier,
new CancellationHandler.TemporalCancellationHandler());

final BiFunction<StandardSyncInput, ReplicationOutput, StandardSyncInput> nextAttemptInput = (input, lastOutput) -> {
final StandardSyncInput newInput = Jsons.clone(input);
newInput.setState(lastOutput.getState());
return newInput;
};
final ReplicationOutput attemptOutput = temporalAttempt.get();
final StandardSyncOutput standardSyncOutput = reduceReplicationOutput(attemptOutput);

final RetryingTemporalAttemptExecution<StandardSyncInput, ReplicationOutput> temporalAttemptExecution =
new RetryingTemporalAttemptExecution<>(
workspaceRoot,
jobRunConfig,
getWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput),
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(),
shouldAttemptAgain,
nextAttemptInput,
MAX_RETRIES);

final List<ReplicationOutput> attemptOutputs = temporalAttemptExecution.get();
final StandardSyncOutput standardSyncOutput = reduceReplicationOutputs(attemptOutputs);

LOGGER.info("attempt summaries: {}", attemptOutputs);
LOGGER.info("sync summary: {}", standardSyncOutput);

return standardSyncOutput;
}

// todo (cgardens) - this operation is lossy (we lose the ability to see the amount of data
// replicated by each attempt). likely in the future, we will want to retain this info and surface
// it.
/**
* aggregate each attempts output into a sync summary.
*/
private static StandardSyncOutput reduceReplicationOutputs(List<ReplicationOutput> attemptOutputs) {
final long totalBytesReplicated = attemptOutputs
.stream()
.map(ReplicationOutput::getReplicationAttemptSummary)
.mapToLong(ReplicationAttemptSummary::getBytesSynced).sum();
final long totalRecordsReplicated = attemptOutputs
.stream()
.map(ReplicationOutput::getReplicationAttemptSummary)
.mapToLong(ReplicationAttemptSummary::getRecordsSynced).sum();
private static StandardSyncOutput reduceReplicationOutput(ReplicationOutput output) {
final long totalBytesReplicated = output.getReplicationAttemptSummary().getBytesSynced();
final long totalRecordsReplicated = output.getReplicationAttemptSummary().getRecordsSynced();

final StandardSyncSummary syncSummary = new StandardSyncSummary();
syncSummary.setBytesSynced(totalBytesReplicated);
syncSummary.setRecordsSynced(totalRecordsReplicated);
syncSummary.setStartTime(attemptOutputs.get(0).getReplicationAttemptSummary().getStartTime());
syncSummary.setEndTime(MoreLists.last(attemptOutputs).orElseThrow().getReplicationAttemptSummary().getEndTime());
syncSummary.setStatus(MoreLists.last(attemptOutputs).orElseThrow().getReplicationAttemptSummary().getStatus());
syncSummary.setStartTime(output.getReplicationAttemptSummary().getStartTime());
syncSummary.setEndTime(output.getReplicationAttemptSummary().getEndTime());
syncSummary.setStatus(output.getReplicationAttemptSummary().getStatus());

final StandardSyncOutput standardSyncOutput = new StandardSyncOutput();
standardSyncOutput.setState(MoreLists.last(attemptOutputs).orElseThrow().getState());
standardSyncOutput.setOutputCatalog(MoreLists.last(attemptOutputs).orElseThrow().getOutputCatalog());
standardSyncOutput.setState(output.getState());
standardSyncOutput.setOutputCatalog(output.getOutputCatalog());
standardSyncOutput.setStandardSyncSummary(syncSummary);

return standardSyncOutput;
Expand Down
Loading

0 comments on commit 9dd05d4

Please sign in to comment.