Skip to content

Commit

Permalink
remove lib not used, add test case
Browse files Browse the repository at this point in the history
address comments
  • Loading branch information
hanghangliu committed Apr 21, 2022
1 parent b77d68b commit ea22d96
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private void addInstanceTags() {
Set<String> desiredTags = new HashSet<>(
ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
if (!desiredTags.isEmpty()) {
// Remove unrelated tags
// If a helix instance already have tag assigned during last run, it won't be auto removed. Need to remove unwanted tags
for (String tag : existedTags) {
if (!desiredTags.contains(tag))
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName, this.helixInstanceName, tag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private Set<String> getParticipants(String filterString) {
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();
int numPartitions = 0;
int numTargetContainers = 0;
for (Map.Entry<String, WorkflowConfig> workFlowEntry : taskDriver.getWorkflows().entrySet()) {
WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey());

Expand All @@ -244,16 +244,15 @@ void runInternal() {
JobContext jobContext = taskDriver.getJobContext(jobName);
JobConfig jobConfig = taskDriver.getJobConfig(jobName);
Resource resource = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
int partitions = 0;
String jobTag =helixInstanceTags;
int numPartitions = 0;
String jobTag = helixInstanceTags;
if (jobContext != null) {
log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());

inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
.filter(Objects::nonNull).collect(Collectors.toSet()));

numPartitions += jobContext.getPartitionSet().size();
partitions += jobContext.getPartitionSet().size();
numPartitions = jobContext.getPartitionSet().size();
// Respect job level config for helix instance tag, specific resource requirement if there's any
if (jobConfig != null) {
if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
Expand All @@ -268,15 +267,13 @@ void runInternal() {
}
}
}
int containerCount = (int) Math.ceil(((double)partitions / this.partitionsPerContainer) * this.overProvisionFactor);
YarnHelixUtils.ensureResourceFitMaxCapacity(this.yarnService.getMaxResourceCapacity(), resource);
// compute the container count as a ceiling of number of partitions divided by the number of containers
// per partition. Scale the result by a constant overprovision factor.
int containerCount = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
numTargetContainers += containerCount;
yarnContainerRequestBundle.add(jobTag, containerCount, resource);
}
}
log.debug("The yarn container request bundle has config helixTagResourceMap {}, with the helixTagContainerCountMap as {}, "
+ "resourceHelixTagMap as {}",
yarnContainerRequestBundle.getHelixTagResourceMap(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getResourceHelixTagMap());
// Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
Expand All @@ -296,16 +293,12 @@ void runInternal() {
}
}

// compute the target containers as a ceiling of number of partitions divided by the number of containers
// per partition. Scale the result by a constant overprovision factor.
int numTargetContainers = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);

// adjust the number of target containers based on the configured min and max container values.
numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
trimContainerSize(numTargetContainers, yarnContainerRequestBundle);
slidingWindowReservoir.add(yarnContainerRequestBundle);

log.info("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getHelixTagResourceMap());

Expand Down Expand Up @@ -336,7 +329,7 @@ private void trimContainerSize(int numTargetContainers, YarnContainerRequestBund
Preconditions.checkArgument(numTargetContainers >= yarnContainerRequestBundle.getHelixTagContainerCountMap().keySet().size(),
"Num of container requested is less than type of helix tags, which will cause some of job not being processed."
+ "Please change the container configuration");
factor = factor >0 ? 1 : -1;
factor = factor > 0 ? 1 : -1;
List<String> tags = new ArrayList<> (yarnContainerRequestBundle.getHelixTagContainerCountMap().keySet());
int index = 0;
while(yarnContainerRequestBundle.getTotalContainers() != numTargetContainers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.gobblin.yarn;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.yarn.api.records.Resource;

Expand All @@ -48,12 +48,18 @@ public YarnContainerRequestBundle() {

public void add(String helixTag, int containerCount, Resource resource) {
helixTagContainerCountMap.put(helixTag, helixTagContainerCountMap.getOrDefault(helixTag, 0) + containerCount);
if(!helixTagResourceMap.containsKey(helixTag)) {
if(helixTagResourceMap.containsKey(helixTag)) {
Resource existedResource = helixTagResourceMap.get(helixTag);
Preconditions.checkArgument(resource.getMemory() == existedResource.getMemory() &&
resource.getVirtualCores() == existedResource.getVirtualCores(),
"Helix tag need to have consistent resource requirement. Tag " + helixTag
+ " has existed resource require " + existedResource.toString() + " and different require " + resource.toString());
} else {
helixTagResourceMap.put(helixTag, resource);
Set<String> tagSet = resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
tagSet.add(helixTag);
resourceHelixTagMap.put(resource.toString(), tagSet);
}
Set<String> tagSet = resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
tagSet.add(helixTag);
resourceHelixTagMap.put(resource.toString(), tagSet);
totalContainers += containerCount;
}

Expand All @@ -62,17 +68,9 @@ public void add(String helixTag, int containerCount) {
if (!helixTagContainerCountMap.containsKey(helixTag) && !helixTagResourceMap.containsKey(helixTag)) {
log.error("Helix tag {} is not present in the request bundle yet, can't process the request to add {} "
+ "container for it without specifying the resource requirement", helixTag, containerCount);
return;
}
helixTagContainerCountMap.put(helixTag, helixTagContainerCountMap.get(helixTag) + containerCount);
this.totalContainers += containerCount;
}

public void set(String helixTag, int containerCount) {
if (!helixTagContainerCountMap.containsKey(helixTag) && !helixTagResourceMap.containsKey(helixTag)) {
log.error("Helix tag {} is not present in the request bundle yet, can't process the request to set {} "
+ "container for it without specifying the resource requirement", helixTag, containerCount);
}
this.totalContainers += containerCount - helixTagContainerCountMap.get(helixTag);
helixTagContainerCountMap.put(helixTag, helixTagContainerCountMap.get(helixTag) + containerCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.gobblin.yarn;

import com.google.common.base.Optional;
import java.io.File;
import java.io.IOException;
import java.net.URL;
Expand All @@ -39,7 +38,6 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
Expand Down Expand Up @@ -256,14 +254,4 @@ public static String findHelixTagForContainer(Container container,
}
return foundTag;
}

public static void ensureResourceFitMaxCapacity(Optional<Resource> maxResourceCapacity, Resource requestedResource) {
if (maxResourceCapacity == null || !maxResourceCapacity.isPresent()) {
return;
}
int maxMemoryCapacity = maxResourceCapacity.get().getMemory();
requestedResource.setMemory(Math.min(requestedResource.getMemory(), maxMemoryCapacity));
int maxCoreCapacity = maxResourceCapacity.get().getVirtualCores();
requestedResource.setVirtualCores(Math.min(requestedResource.getVirtualCores(), maxCoreCapacity));
}
}
54 changes: 30 additions & 24 deletions gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,11 @@ public class YarnService extends AbstractIdleService {
// instance names get picked up when replacement containers get allocated.
private final Set<String> unusedHelixInstanceNames = ConcurrentHashMap.newKeySet();

private final Map<String, Integer> helixTagRequestedContainerCount = Maps.newConcurrentMap();
private final Map<String, Integer> helixTagAllocatedContainerCount = Maps.newConcurrentMap();
// The map from helix tag to requested container count
private final Map<String, Integer> requestedContainerCountMap = Maps.newConcurrentMap();
// The map from helix tag to allocated container count
private final Map<String, Integer> allocatedContainerCountMap = Maps.newConcurrentMap();

private volatile YarnContainerRequestBundle yarnContainerRequest;
private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
Expand Down Expand Up @@ -454,12 +457,12 @@ public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBun
// so overshooting can occur, but periodic calls to this method will make adjustments towards the target.
for (Map.Entry<String, Integer> entry : yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
String currentHelixTag = entry.getKey();
int desiredContainer = entry.getValue();
int requestedContainerCount = helixTagRequestedContainerCount.getOrDefault(currentHelixTag, 0);
for(; requestedContainerCount < desiredContainer; requestedContainerCount++) {
int desiredContainerCount = entry.getValue();
int requestedContainerCount = requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
for(; requestedContainerCount < desiredContainerCount; requestedContainerCount++) {
requestContainer(Optional.absent(), yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
}
helixTagRequestedContainerCount.put(currentHelixTag, requestedContainerCount);
requestedContainerCountMap.put(currentHelixTag, requestedContainerCount);
}

// If the total desired is lower than the currently allocated amount then release free containers.
Expand All @@ -479,7 +482,7 @@ public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBun
containersToRelease.add(containerInfo.getContainer());
String helixTag = containerInfo.getHelixTag();
if (!Strings.isNullOrEmpty(helixTag)) {
helixTagRequestedContainerCount.put(helixTag, helixTagRequestedContainerCount.get(helixTag) - 1);
requestedContainerCountMap.put(helixTag, requestedContainerCountMap.get(helixTag) - 1);
}
}

Expand All @@ -495,38 +498,42 @@ public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBun
this.yarnContainerRequest = yarnContainerRequestBundle;
this.numRequestedContainers = numTargetContainers;
LOGGER.info("Current tag-container being requested:{}, tag-container allocated: {}",
this.helixTagRequestedContainerCount, this.helixTagAllocatedContainerCount);
this.requestedContainerCountMap, this.allocatedContainerCountMap);
}

// Request initial containers with default resource and helix tag
// Request initial containers with default resource and helix tag
private void requestInitialContainers(int containersRequested) {
YarnContainerRequestBundle initialYarnContainerRequest = new YarnContainerRequestBundle();
Resource capability = Resource.newInstance(this.requestedContainerMemoryMbs, this.requestedContainerCores);
YarnHelixUtils.ensureResourceFitMaxCapacity(this.maxResourceCapacity, capability);
initialYarnContainerRequest.add(this.helixInstanceTags, containersRequested, capability);
requestTargetNumberOfContainers(initialYarnContainerRequest, Collections.EMPTY_SET);
}

private void requestContainer(Optional<String> preferredNode, Optional<Resource> capability) {
Resource desiredResource = capability.or(Resource.newInstance(
private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) {
Resource desiredResource = resourceOptional.or(Resource.newInstance(
this.requestedContainerMemoryMbs, this.requestedContainerCores));
requestContainer(preferredNode, desiredResource);
}

private void requestContainer(Optional<String> preferredNode, Resource capability) {
YarnHelixUtils.ensureResourceFitMaxCapacity(this.maxResourceCapacity, capability);
// Request containers with specific resource requirement
private void requestContainer(Optional<String> preferredNode, Resource resource) {
// Fail if Yarn cannot meet container resource requirements
Preconditions.checkArgument(resource.getMemory() <= this.maxResourceCapacity.get().getMemory() &&
resource.getVirtualCores() <= this.maxResourceCapacity.get().getVirtualCores(),
"Resource requirement must less than the max resource capacity. Requested resource" + resource.toString()
+ " exceed the max resource limit " + this.maxResourceCapacity.get().toString());

// Due to YARN-314, different resource size needs different priority, otherwise Yarn will not allocate container
// Due to YARN-314, different resource capacity needs different priority, otherwise Yarn will not allocate container
Priority priority = Records.newRecord(Priority.class);
if(!resourcePriorityMap.containsKey(capability.toString())) {
resourcePriorityMap.put(capability.toString(), priorityNumGenerator.getAndIncrement());
if(!resourcePriorityMap.containsKey(resource.toString())) {
resourcePriorityMap.put(resource.toString(), priorityNumGenerator.getAndIncrement());
}
int priorityNum = resourcePriorityMap.get(capability.toString());
int priorityNum = resourcePriorityMap.get(resource.toString());
priority.setPriority(priorityNum);

String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null;
this.amrmClientAsync.addContainerRequest(
new AMRMClient.ContainerRequest(capability, preferredNodes, null, priority));
new AMRMClient.ContainerRequest(resource, preferredNodes, null, priority));
}

protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
Expand Down Expand Up @@ -671,7 +678,7 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
//containerId missing from the containersMap.
String completedInstanceName = completedContainerInfo == null? UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag();
helixTagAllocatedContainerCount.put(helixTag, helixTagAllocatedContainerCount.get(helixTag) - 1);
allocatedContainerCountMap.put(helixTag, allocatedContainerCountMap.get(helixTag) - 1);

LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus()));
Expand Down Expand Up @@ -786,8 +793,7 @@ public void onContainersCompleted(List<ContainerStatus> statuses) {
public void onContainersAllocated(List<Container> containers) {
for (final Container container : containers) {
String containerId = container.getId().toString();
String containerHelixTag = YarnHelixUtils.findHelixTagForContainer(container,
helixTagAllocatedContainerCount, yarnContainerRequest);
String containerHelixTag = YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap, yarnContainerRequest);
if (Strings.isNullOrEmpty(containerHelixTag)) {
containerHelixTag = helixInstanceTags;
}
Expand Down Expand Up @@ -818,8 +824,8 @@ public void onContainersAllocated(List<Container> containers) {
instanceName = null;
}
}
helixTagAllocatedContainerCount.put(containerHelixTag,
helixTagAllocatedContainerCount.getOrDefault(containerHelixTag, 0) + 1);
allocatedContainerCountMap.put(containerHelixTag,
allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) + 1);
}

if (Strings.isNullOrEmpty(instanceName)) {
Expand Down
Loading

0 comments on commit ea22d96

Please sign in to comment.