Skip to content

Commit

Permalink
[GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max…
Browse files Browse the repository at this point in the history
… attempt number (#3439)

* [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number

* address comments

* adding current attempts in job config and cluster events

* add generation into job status

* address comments

* change comments

* address comments
  • Loading branch information
ZihanLi58 authored Dec 17, 2021
1 parent 27c9e0e commit 59df057
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public class ConfigurationKeys {
public static final String JOB_GROUP_KEY = "job.group";
public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
public static final String JOB_SCHEDULE_KEY = "job.schedule";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,15 @@ private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobPr
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
}

if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false"));
}

metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public static class FlowEventConstants {
public static final String PROCESSED_COUNT_FIELD = "processedCount";
public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
// This value should only ever move forward; see {@link KafkaJobStatusMonitor#addJobStatusToStateStore} for details.
public static final String CURRENT_GENERATION_FIELD = "currentGeneration";
public static final String SHOULD_RETRY_FIELD = "shouldRetry";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,15 @@ private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobPr
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));

if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false"));
}

// use job execution id if flow execution id is not present
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class JobStatus {
private final String highWatermark;
private final int maxAttempts;
private final int currentAttempts;
private final int currentGeneration;
private final boolean shouldRetry;
private final Supplier<List<Issue>> issues;
private final int progressPercentage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ protected JobStatus getJobStatus(State jobState) {
long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
int currentGeneration = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, "1"));
boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
int progressPercentage = jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime = jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
Expand All @@ -146,7 +147,7 @@ protected JobStatus getJobStatus(State jobState) {
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
issues(jobIssues).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ private void beginResumingDag(String dagId) throws IOException {
node.getValue().setExecutionStatus(PENDING_RESUME);
// reset currentAttempts because we do not want to count previous execution's attempts in deciding whether to retry a job
node.getValue().setCurrentAttempts(0);
DagManagerUtils.incrementJobGeneration(node);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gobblin.service.modules.orchestration;

import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -148,7 +150,12 @@ static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> dagNode) {
}

/**
 * Returns a copy of the dag node's {@link JobSpec} whose config additionally carries the node's current
 * attempt count ({@link ConfigurationKeys#JOB_CURRENT_ATTEMPTS}) and current generation
 * ({@link ConfigurationKeys#JOB_CURRENT_GENERATION}).
 *
 * <p>The two counters are layered on top of the spec's existing config, which is used as the fallback.
 * Every other constructor argument, including the properties view, is taken unchanged from the original spec.
 *
 * @param dagNode the dag node whose job spec and counters are read
 * @return a new {@link JobSpec}; the spec held by {@code dagNode} is not modified
 */
public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
  JobExecutionPlan jobExecutionPlan = dagNode.getValue();
  JobSpec jobSpec = jobExecutionPlan.getJobSpec();
  Map<String, Integer> configWithCurrentAttempts = ImmutableMap.of(
      ConfigurationKeys.JOB_CURRENT_ATTEMPTS, jobExecutionPlan.getCurrentAttempts(),
      ConfigurationKeys.JOB_CURRENT_GENERATION, jobExecutionPlan.getCurrentGeneration());
  // Return a new spec with the augmented config rather than changing the spec referenced by dagNode
  return new JobSpec(jobSpec.getUri(), jobSpec.getVersion(), jobSpec.getDescription(),
      ConfigFactory.parseMap(configWithCurrentAttempts).withFallback(jobSpec.getConfig()),
      jobSpec.getConfigAsProperties(), jobSpec.getTemplateURI(), jobSpec.getJobTemplate(), jobSpec.getMetadata());
}

static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {
Expand Down Expand Up @@ -236,6 +243,15 @@ static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() + 1);
}

/**
 * Bumps {@link JobExecutionPlan#currentGeneration} of the given dag node by one.
 * This method is not thread safe; correctness relies on a single dag only ever
 * being handled by the same DagManagerThread.
 */
static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) {
  JobExecutionPlan plan = dagNode.getValue();
  int nextGeneration = plan.getCurrentGeneration() + 1;
  plan.setCurrentGeneration(nextGeneration);
}

/**
* Flow start time is the same as the flow execution id which is the timestamp flow request was received, unless it
* is a resumed flow, in which case it is {@link JobExecutionPlan#getFlowStartTime()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobE
jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, Integer.toString(jobExecutionPlan.getCurrentGeneration()));
jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));

return jobMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class JobExecutionPlan {
private final SpecExecutor specExecutor;
private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
private int currentGeneration = 1;
private int currentAttempts = 0;
private Optional<Future> jobFuture = Optional.absent();
private long flowStartTime = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,25 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS

List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
String previousStatus = states.get(states.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);

// PENDING_RESUME is allowed to override, because it happens when a flow is being resumed from previously being failed
if (previousStatus != null && currentStatus != null && !currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
&& ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
// This keeps the change backward compatible, as cluster events may not carry this info.
// If the info is missing, we treat the event as coming from the same generation and attempt as the previous one.
int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
// We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
// The generation is monotonically increasing, while the attempts may re-initialize back to 0. This two-part form prevents the composite value from ever repeating.
// The job status, in turn, reflects the execution status within a single attempt.
if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration
|| (previousGeneration == currentGeneration && previousAttempts > currentAttempts)
|| (previousGeneration == currentGeneration && previousAttempts == currentAttempts
&& ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
} else {
jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
Expand All @@ -275,7 +286,9 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
if (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()) && currentAttempts < maxAttempts) {
// SHOULD_RETRY_FIELD may be reset by a JOB_COMPLETION_PERCENTAGE event
if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
|| state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())) && currentAttempts < maxAttempts) {
state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RETRY.name());
state.removeProp(TimingEvent.JOB_END_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,21 @@ public abstract class JobStatusRetrieverTest {
abstract void setUp() throws Exception;

/**
 * Convenience overload: delegates to the full {@code addFlowIdJobStatusToStateStore} using the default
 * {@code FLOW_GROUP} / {@code FLOW_NAME}, start and end times of 0, and an empty {@link Properties}.
 */
protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status) throws IOException {
  addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, 0, 0, new Properties());
}

/**
 * Convenience overload for an explicit flow group and name: delegates to the full
 * {@code addFlowIdJobStatusToStateStore} with start and end times of 0 and an empty {@link Properties}.
 */
protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status) throws IOException {
  addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId, jobName, status, 0, 0, new Properties());
}

/**
 * Convenience overload with explicit start and end times: delegates to the full
 * {@code addFlowIdJobStatusToStateStore} using the default {@code FLOW_GROUP} / {@code FLOW_NAME}
 * and an empty {@link Properties}.
 */
protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
  addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime, new Properties());
}
/**
 * Overload that forwards caller-supplied {@link Properties} together with explicit start and end times,
 * using the default {@code FLOW_GROUP} / {@code FLOW_NAME}.
 */
protected void addJobStatusToStateStore(long flowExecutionId, String jobName, String status, long startTime,
    long endTime, Properties properties) throws IOException {
  addFlowIdJobStatusToStateStore(
      FLOW_GROUP, FLOW_NAME, flowExecutionId, jobName, status, startTime, endTime, properties);
}

protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime) throws IOException {
Properties properties = new Properties();
protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName, long flowExecutionId, String jobName, String status, long startTime, long endTime, Properties properties) throws IOException {
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
Expand All @@ -98,6 +100,57 @@ protected void addFlowIdJobStatusToStateStore(String flowGroup, String flowName,
KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
}

/**
 * Creates a {@link Properties} holding the given generation, attempt count and should-retry flag,
 * each stored as a string under its {@link TimingEvent.FlowEventConstants} field name.
 *
 * @param currGen value stored under {@code CURRENT_GENERATION_FIELD}
 * @param currAttempts value stored under {@code CURRENT_ATTEMPTS_FIELD}
 * @param shouldRetry value stored under {@code SHOULD_RETRY_FIELD}
 * @return a fresh {@link Properties} containing exactly those three entries
 */
static Properties createAttemptsProperties(int currGen, int currAttempts, boolean shouldRetry) {
  Properties attemptProps = new Properties();
  attemptProps.setProperty(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(shouldRetry));
  attemptProps.setProperty(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(currAttempts));
  attemptProps.setProperty(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, Integer.toString(currGen));
  return attemptProps;
}
/**
 * Feeds job status events for a retrying job into the state store, some of them out of order or tagged
 * with an older generation/attempt, and checks after each batch which status is reported.
 */
@Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
public void testOutOfOrderJobTimingEventsForRetryingJob() throws IOException {
  final long flowExecutionId = 1240L;

  // Generation 1, attempt 0: RUNNING arrives before ORCHESTRATED, then the attempt fails.
  Properties attemptProps = createAttemptsProperties(1, 0, false);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, attemptProps);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME, attemptProps);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.FAILED.name(), 0, 0, attemptProps);
  JobStatus observed = fetchJobStatusSkippingNaEntry(flowExecutionId);
  // The failure is expected to surface as a pending retry
  Assert.assertEquals(observed.getEventName(), ExecutionStatus.PENDING_RETRY.name());
  Assert.assertTrue(observed.isShouldRetry());

  // Generation 1, attempt 1: again RUNNING arrives before ORCHESTRATED.
  attemptProps = createAttemptsProperties(1, 1, false);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, attemptProps);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME, attemptProps);
  observed = fetchJobStatusSkippingNaEntry(flowExecutionId);
  Assert.assertEquals(observed.getEventName(), ExecutionStatus.RUNNING.name());
  Assert.assertFalse(observed.isShouldRetry());
  Assert.assertEquals(observed.getCurrentAttempts(), 1);

  // Generation 2, attempt 0: PENDING_RESUME, followed by a COMPLETE still tagged with generation 1 / attempt 1.
  Properties resumedProps = createAttemptsProperties(2, 0, false);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.PENDING_RESUME.name(), JOB_START_TIME, JOB_START_TIME, resumedProps);
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, attemptProps);
  observed = fetchJobStatusSkippingNaEntry(flowExecutionId);
  // The COMPLETE from the older generation must not replace the newer PENDING_RESUME
  Assert.assertEquals(observed.getEventName(), ExecutionStatus.PENDING_RESUME.name());

  // A COMPLETE carrying the current generation/attempt is expected to be reported.
  addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, resumedProps);
  observed = fetchJobStatusSkippingNaEntry(flowExecutionId);
  Assert.assertEquals(observed.getEventName(), ExecutionStatus.COMPLETE.name());
}

/**
 * Returns the first job status of the given flow execution, or the second one when the first entry's
 * job name equals {@link JobStatusRetriever#NA_KEY}.
 */
private JobStatus fetchJobStatusSkippingNaEntry(long flowExecutionId) throws IOException {
  Iterator<JobStatus> statuses =
      this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
  JobStatus candidate = statuses.next();
  if (candidate.getJobName().equals(JobStatusRetriever.NA_KEY)) {
    candidate = statuses.next();
  }
  return candidate;
}

@Test
public void testGetJobStatusesForFlowExecution() throws IOException {
long flowExecutionId = 1234L;
Expand Down Expand Up @@ -180,7 +233,6 @@ public void testOutOfOrderJobTimingEvents() throws IOException {
Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
Assert.assertEquals(jobStatus.getOrchestratedTime(), JOB_ORCHESTRATED_TIME);
}

@Test (dependsOnMethods = "testJobTiming")
public void testGetJobStatusesForFlowExecution1() {
long flowExecutionId = 1234L;
Expand Down

0 comments on commit 59df057

Please sign in to comment.