diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java index 86e4ef72a7b30..aa4bc3eeed3ab 100644 --- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java @@ -27,41 +27,16 @@ public class YarnConfig extends MapConfig { */ public static final String PACKAGE_PATH = "yarn.package.path"; - // Configs related to each yarn container - /** - * Memory, in megabytes, to request from YARN per container - */ - public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"; - private static final int DEFAULT_CONTAINER_MEM = 1024; - /** * Name of YARN queue to run jobs on */ public static final String QUEUE_NAME = "yarn.queue"; - /** - * Number of CPU cores to request from YARN per container - */ - public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"; - private static final int DEFAULT_CPU_CORES = 1; - /** * Label to request from YARN for containers */ public static final String CONTAINER_LABEL = "yarn.container.label"; - /** - * Maximum number of times the AM tries to restart a failed container - */ - public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count"; - private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8; - - /** - * Determines how frequently a container is allowed to fail before we give up and fail the job - */ - public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"; - private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000; - // Configs related to the Samza Application Master (AM) /** * (Optional) JVM options to include in the command line when executing the AM @@ -150,26 +125,10 @@ public YarnConfig(Config config) { super(config); } - public int getContainerRetryCount() { - return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT); - } - - public int getContainerRetryWindowMs() { - return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS); - } - public int getAMPollIntervalMs() { return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS); } - public int getContainerMaxMemoryMb() { - return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM); - } - - public int getContainerMaxCpuCores() { - return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES); - } - public String getContainerLabel() { return get(CONTAINER_LABEL, null); } diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 04c78be94b5e1..1fd393984af6c 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -31,6 +31,7 @@ import org.apache.samza.clustermanager.*; import org.apache.samza.clustermanager.SamzaApplicationState; import org.apache.samza.clustermanager.SamzaContainerLaunchException; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.YarnConfig; @@ -149,7 +150,14 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig); log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort}); - this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient ); + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + this.lifecycle = new SamzaYarnAppMasterLifecycle( + clusterManagerConfig.getContainerMemoryMb(), + clusterManagerConfig.getNumCores(), + samzaAppState, + state, + amClient + ); yarnContainerRunner = new YarnContainerRunner(config, hConfig); }