diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index 4ed63b74440..5f5c872c67e 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + +import org.apache.commons.compress.utils.Sets; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -38,6 +40,7 @@ import org.apache.helix.task.JobContext; import org.apache.helix.task.JobDag; import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; @@ -68,6 +71,7 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final String AUTO_SCALING_PREFIX = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling."; private final String AUTO_SCALING_POLLING_INTERVAL_SECS = AUTO_SCALING_PREFIX + "pollingIntervalSeconds"; + private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20; private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60; // Only one container will be requested for each N partitions of work private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer"; @@ -94,6 +98,8 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final double overProvisionFactor; private final SlidingWindowReservoir slidingFixedSizeWindow; private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES; + private static final HashSet + UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.DROPPED); public YarnAutoScalingManager(GobblinApplicationMaster appMaster) { this.config = appMaster.getConfig(); @@ -189,6 +195,21 @@ private Set getParticipants(String filterString) { .keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet()); } + private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) { + if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) { + log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better", + jobContext.getTaskIdForPartition(partition), jobContext.getPartitionNumAttempts(partition)); + } + if (!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(partition))) { + return jobContext.getAssignedParticipant(partition); + } + // adding log here now for debugging + //todo: if this happens frequently, we should reset to status to retriable or at least report the error earlier + log.info("Helix task {} is in {} state which is unexpected, please watch out to see if this get recovered", + jobContext.getTaskIdForPartition(partition), jobContext.getPartitionState(partition)); + return null; + } + /** * Iterate through the workflows configured in Helix to figure out the number of required partitions * and request the {@link YarnService} to scale to the desired number of containers. @@ -220,7 +241,10 @@ void runInternal() { int numPartitions = 0; String jobTag = defaultHelixInstanceTags; if (jobContext != null) { - inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant) + log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size()); + + inUseInstances.addAll(jobContext.getPartitionSet().stream() + .map(i -> getInuseParticipantForHelixPartition(jobContext, i)) .filter(Objects::nonNull).collect(Collectors.toSet())); numPartitions = jobContext.getPartitionSet().size();