Skip to content

Commit

Permalink
[GOBBLIN-1646] Revert yarn container / helix tag group changes (apach…
Browse files Browse the repository at this point in the history
…e#3507)

Revert "Fix bug when shrinking the container in Yarn service (apache#3504)"
This reverts commit dd6d910.

Revert "[GOBBLIN-1620]Make yarn container allocation group by helix tag (apache#3487)"
This reverts commit 3e87795.
  • Loading branch information
homatthew authored and jack-moseley committed Aug 24, 2022
1 parent dfc2e3c commit 5c39100
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 562 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag";
public static final String HELIX_PLANNING_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags";
public static final String HELIX_DEFAULT_TAG = "GobblinHelixDefaultTag";

// Helix job quota
public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobType";
Expand Down Expand Up @@ -185,13 +184,6 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
public static final boolean DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;

// Config to specify the resource requirement for each Gobblin job run, so that helix tasks within this job will
// be assigned to containers with desired resource. This config need to cooperate with helix job tag, so that helix
// cluster knows how to distribute tasks to correct containers.
public static final String HELIX_JOB_CONTAINER_MEMORY_MBS = GOBBLIN_CLUSTER_PREFIX + "job.container.memory.mbs";
public static final String HELIX_JOB_CONTAINER_CORES = GOBBLIN_CLUSTER_PREFIX + "job.container.cores";



//Config to enable/disable reuse of existing Helix Cluster
public static final String HELIX_CLUSTER_OVERWRITE_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,20 +413,6 @@ JobConfig.Builder translateGobblinJobConfigToHelixJobConfig(JobState gobblinJobS
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));

Map<String, String> jobConfigMap = new HashMap<>();
if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)) {
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
log.info("Job {} has specific memory requirement:{}, add this config to command config map",
this.jobContext.getJobId(), jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
}
if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)) {
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
log.info("Job {} has specific Vcore requirement:{}, add this config to command config map",
this.jobContext.getJobId(), jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
}
jobConfigBuilder.setJobCommandConfigMap(jobConfigMap);
return jobConfigBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -542,25 +540,16 @@ void connectHelixManagerWithRetry() {
* the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance with EXAMPLE_INSTANCE_TAG was found.
*/
private void addInstanceTags() {
List<String> tags = ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
HelixManager receiverManager = getReceiverManager();
if (receiverManager.isConnected()) {
// The helix instance associated with this container should be consistent on helix tag
List<String> existedTags = receiverManager.getClusterManagmentTool()
.getInstanceConfig(this.clusterName, this.helixInstanceName).getTags();
Set<String> desiredTags = new HashSet<>(
ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
if (!desiredTags.isEmpty()) {
// Remove tag assignments for the current Helix instance from a previous run
for (String tag : existedTags) {
if (!desiredTags.contains(tag))
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName, this.helixInstanceName, tag);
logger.info("Removed unrelated helix tag {} for instance {}", tag, this.helixInstanceName);
}
desiredTags.forEach(desiredTag -> receiverManager.getClusterManagmentTool()
.addInstanceTag(this.clusterName, this.helixInstanceName, desiredTag));
if (!tags.isEmpty()) {
logger.info("Adding tags binding " + tags);
tags.forEach(tag -> receiverManager.getClusterManagmentTool()
.addInstanceTag(this.clusterName, this.helixInstanceName, tag));
logger.info("Actual tags binding " + receiverManager.getClusterManagmentTool()
.getInstanceConfig(this.clusterName, this.helixInstanceName).getTags());
}
logger.info("Actual tags binding " + receiverManager.getClusterManagmentTool()
.getInstanceConfig(this.clusterName, this.helixInstanceName).getTags());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,21 @@

package org.apache.gobblin.yarn;

import com.google.common.base.Strings;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
Expand Down Expand Up @@ -72,13 +67,14 @@ public class YarnAutoScalingManager extends AbstractIdleService {
// Only one container will be requested for each N partitions of work
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer";
private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor";
private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0;
// The cluster level default tags for Helix instances
private final String defaultHelixInstanceTags;
private final int defaultContainerMemoryMbs;
private final int defaultContainerCores;

// A rough value of how much containers should be an intolerable number.
private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;

Expand All @@ -91,6 +87,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final ScheduledExecutorService autoScalingExecutor;
private final YarnService yarnService;
private final int partitionsPerContainer;
private final int minContainers;
private final int maxContainers;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
Expand All @@ -105,20 +103,31 @@ public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
Preconditions.checkArgument(this.partitionsPerContainer > 0,
AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");

this.minContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MIN_CONTAINERS,
DEFAULT_AUTO_SCALING_MIN_CONTAINERS);

Preconditions.checkArgument(this.minContainers > 0,
DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");

this.maxContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MAX_CONTAINERS,
DEFAULT_AUTO_SCALING_MAX_CONTAINERS);

this.overProvisionFactor = ConfigUtils.getDouble(this.config, AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);

Preconditions.checkArgument(this.maxContainers > 0,
DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");

Preconditions.checkArgument(this.maxContainers >= this.minContainers,
DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or equal to "
+ DEFAULT_AUTO_SCALING_MIN_CONTAINERS);

this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE), Integer.MAX_VALUE)
: new SlidingWindowReservoir(Integer.MAX_VALUE);
? new SlidingWindowReservoir(maxContainers, config.getInt(AUTO_SCALING_WINDOW_SIZE))
: new SlidingWindowReservoir(maxContainers);

this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("AutoScalingExecutor")));

this.defaultHelixInstanceTags = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
this.defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
this.defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
}

@Override
Expand All @@ -131,10 +140,9 @@ protected void startUp() {
log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);

this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
this.yarnService, this.partitionsPerContainer, this.overProvisionFactor,
this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
this.defaultContainerMemoryMbs, this.defaultContainerCores),
initialDelay, scheduleInterval, TimeUnit.SECONDS);
this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers, this.overProvisionFactor,
this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
TimeUnit.SECONDS);
}

@Override
Expand All @@ -154,13 +162,11 @@ static class YarnAutoScalingRunnable implements Runnable {
private final TaskDriver taskDriver;
private final YarnService yarnService;
private final int partitionsPerContainer;
private final int minContainers;
private final int maxContainers;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingWindowReservoir;
private final HelixDataAccessor helixDataAccessor;
private final String defaultHelixInstanceTags;
private final int defaultContainerMemoryMbs;
private final int defaultContainerCores;

/**
* A static map that keep track of an idle instance and its latest beginning idle time.
* If an instance is no longer idle when inspected, it will be dropped from this map.
Expand Down Expand Up @@ -196,7 +202,8 @@ private Set<String> getParticipants(String filterString) {
@VisibleForTesting
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();

int numPartitions = 0;
for (Map.Entry<String, WorkflowConfig> workFlowEntry : taskDriver.getWorkflows().entrySet()) {
WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey());

Expand All @@ -210,42 +217,24 @@ void runInternal() {

WorkflowConfig workflowConfig = workFlowEntry.getValue();
JobDag jobDag = workflowConfig.getJobDag();

Set<String> jobs = jobDag.getAllNodes();

// sum up the number of partitions
for (String jobName : jobs) {
JobContext jobContext = taskDriver.getJobContext(jobName);
JobConfig jobConfig = taskDriver.getJobConfig(jobName);
Resource resource = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
int numPartitions = 0;
String jobTag = defaultHelixInstanceTags;

if (jobContext != null) {
log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());

inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
.filter(Objects::nonNull).collect(Collectors.toSet()));

numPartitions = jobContext.getPartitionSet().size();
// Job level config for helix instance tags takes precedence over other tag configurations
if (jobConfig != null) {
if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
jobTag = jobConfig.getInstanceGroupTag();
}
Map<String, String> jobCommandConfigMap = jobConfig.getJobCommandConfigMap();
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
}
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
}
}
.filter(e -> e != null).collect(Collectors.toSet()));

numPartitions += jobContext.getPartitionSet().size();
}
// compute the container count as a ceiling of number of partitions divided by the number of containers
// per partition. Scale the result by a constant overprovision factor.
int containerCount = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
yarnContainerRequestBundle.add(jobTag, containerCount, resource);
}
}

// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
Expand All @@ -264,11 +253,17 @@ void runInternal() {
instanceIdleSince.remove(participant);
}
}
slidingWindowReservoir.add(yarnContainerRequestBundle);

log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getHelixTagResourceMap());
// compute the target containers as a ceiling of number of partitions divided by the number of containers
// per partition. Scale the result by a constant overprovision factor.
int numTargetContainers = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);

// adjust the number of target containers based on the configured min and max container values.
numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));

slidingWindowReservoir.add(numTargetContainers);

log.info("There are {} containers being requested", numTargetContainers);

this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
}
Expand All @@ -295,8 +290,8 @@ boolean isInstanceUnused(String participant){
* which captures max value. It is NOT built for general purpose.
*/
static class SlidingWindowReservoir {
private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
private ArrayDeque<Integer> fifoQueue;
private PriorityQueue<Integer> priorityQueue;

// Queue Size
private int maxSize;
Expand All @@ -311,11 +306,10 @@ public SlidingWindowReservoir(int maxSize, int upperBound) {
this.maxSize = maxSize;
this.upperBound = upperBound;
this.fifoQueue = new ArrayDeque<>(maxSize);
this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<YarnContainerRequestBundle>() {
this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
@Override
public int compare(YarnContainerRequestBundle o1, YarnContainerRequestBundle o2) {
Integer i2 = o2.getTotalContainers();
return i2.compareTo(o1.getTotalContainers());
public int compare(Integer o1, Integer o2) {
return o2.compareTo(o1);
}
});
}
Expand All @@ -329,14 +323,14 @@ public SlidingWindowReservoir(int upperBound) {
* When a new element is larger than upperbound, reject the value since we may request too many Yarn containers.
* When queue is full, evict head of FIFO-queue (In FIFO queue, elements are inserted from tail).
*/
public void add(YarnContainerRequestBundle e) {
if (e.getTotalContainers() > upperBound) {
public void add(int e) {
if (e > upperBound) {
log.error(String.format("Request of getting %s containers seems to be excessive, rejected", e));
return;
}

if (fifoQueue.size() == maxSize) {
YarnContainerRequestBundle removedElement = fifoQueue.remove();
Integer removedElement = fifoQueue.remove();
priorityQueue.remove(removedElement);
}

Expand All @@ -351,7 +345,7 @@ public void add(YarnContainerRequestBundle e) {
/**
* If queue is empty, throw {@link IllegalStateException}.
*/
public YarnContainerRequestBundle getMax() {
public int getMax() {
if (priorityQueue.size() > 0) {
return this.priorityQueue.peek();
} else {
Expand Down
Loading

0 comments on commit 5c39100

Please sign in to comment.