Skip to content

Commit

Permalink
SAMZA-1138; Yarn capability check is broken
Browse files Browse the repository at this point in the history
After migration from `yarn.container.*` properties to `cluster-manager.container.*` properties we have to use either of them and `ClusterManagerConfig` provides backward compatibility for these properties. But in `YarnClusterResourceManage`r only old properties are used (from `YarnConfig`), hence if job config migrated to new `cluster-manager.*` properties names then check will be evaluated against default values, not against actual values.

Author: Maksim Logvinenko <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes apache#84 from logarithm/yarn-properties-fix
  • Loading branch information
logarithm authored and jagadish-northguard committed Apr 2, 2017
1 parent 888e061 commit 101ca43
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 42 deletions.
41 changes: 0 additions & 41 deletions samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,16 @@ public class YarnConfig extends MapConfig {
*/
public static final String PACKAGE_PATH = "yarn.package.path";

// Configs related to each yarn container
/**
* Memory, in megabytes, to request from YARN per container
*/
public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
private static final int DEFAULT_CONTAINER_MEM = 1024;

/**
* Name of YARN queue to run jobs on
*/
public static final String QUEUE_NAME = "yarn.queue";

/**
* Number of CPU cores to request from YARN per container
*/
public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
private static final int DEFAULT_CPU_CORES = 1;

/**
* Label to request from YARN for containers
*/
public static final String CONTAINER_LABEL = "yarn.container.label";

/**
* Maximum number of times the AM tries to restart a failed container
*/
public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;

/**
* Determines how frequently a container is allowed to fail before we give up and fail the job
*/
public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;

// Configs related to the Samza Application Master (AM)
/**
* (Optional) JVM options to include in the command line when executing the AM
Expand Down Expand Up @@ -150,26 +125,10 @@ public YarnConfig(Config config) {
super(config);
}

public int getContainerRetryCount() {
return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
}

public int getContainerRetryWindowMs() {
return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS);
}

public int getAMPollIntervalMs() {
return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
}

public int getContainerMaxMemoryMb() {
return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
}

public int getContainerMaxCpuCores() {
return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
}

public String getContainerLabel() {
return get(CONTAINER_LABEL, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.samza.clustermanager.*;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
Expand Down Expand Up @@ -149,7 +150,14 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig);

log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient );
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
this.lifecycle = new SamzaYarnAppMasterLifecycle(
clusterManagerConfig.getContainerMemoryMb(),
clusterManagerConfig.getNumCores(),
samzaAppState,
state,
amClient
);

yarnContainerRunner = new YarnContainerRunner(config, hConfig);
}
Expand Down

0 comments on commit 101ca43

Please sign in to comment.