From bbff9e123f61168396bc7145635760a3b3aa83f0 Mon Sep 17 00:00:00 2001
From: Szilard Nemeth
Date: Sat, 26 Dec 2020 23:59:30 +0100
Subject: [PATCH 1/2] YARN-10550.001

---
 .../org/apache/hadoop/yarn/sls/AMRunner.java  |  14 +-
 .../org/apache/hadoop/yarn/sls/NMRunner.java  | 234 ++++++++++++++++++
 .../org/apache/hadoop/yarn/sls/SLSRunner.java | 215 ++++------------
 3 files changed, 286 insertions(+), 177 deletions(-)
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
index 301b4260f35cd..1f5c3938255c1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -63,7 +63,6 @@ public class AMRunner {
   private Map<Class, Class> amClassMap;
   private TraceType inputType;
   private String[] inputTraces;
-  private SynthTraceJobProducer stjp;
   private TaskRunner runner;
   private SLSRunner slsRunner;
   private int numAMs, numTasks;
@@ -148,16 +147,17 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
   private void startAMFromSynthGenerator() throws YarnException, IOException {
     Configuration localConf = new Configuration();
     localConf.set("fs.defaultFS", "file:///");
-    // if we use the nodeFile this could have been not initialized yet.
-    if (stjp == null) {
-      stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
-      slsRunner.setStjp(stjp);
-    }
+    //TODO
+    //if we use the nodeFile this could have been not initialized yet.
+//    if (stjp == null) {
+//      stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+//      slsRunner.setStjp(stjp);
+//    }
 
     SynthJob job;
     // we use stjp, a reference to the job producer instantiated during node
     // creation
-    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+    while ((job = (SynthJob) slsRunner.getStjp().getNextJob()) != null) {
       ReservationId reservationId = null;
       if (job.hasDeadline()) {
         reservationId = ReservationId
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
new file mode 100644
index 0000000000000..a1e0055e6a89a
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.sls.SLSRunner.TraceType;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class NMRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(NMRunner.class);
+
+  // other simulation information
+  private int numNMs, numRacks;
+
+  // NM simulator
+  private Map<NodeId, NMSimulator> nmMap;
+  private Resource nodeManagerResource;
+  private String nodeFile;
+  private TaskRunner taskRunner;
+  private Configuration conf;
+  private ResourceManager rm;
+  private String tableMapping;
+  private int thredPoolSize;
+  private TraceType inputType;
+  private String[] inputTraces;
+  private SynthTraceJobProducer stjp;
+
+  public NMRunner(TaskRunner taskRunner, Configuration conf, ResourceManager rm, String tableMapping, int threadPoolSize) {
+    this.taskRunner = taskRunner;
+    this.conf = conf;
+    this.rm = rm;
+    this.tableMapping = tableMapping;
+    this.thredPoolSize = threadPoolSize;
+    this.nmMap = new ConcurrentHashMap<>();
+    this.nodeManagerResource = getNodeManagerResourceFromConf();
+  }
+
+  public void startNM() throws YarnException, IOException,
+      InterruptedException {
+    // nm configuration
+    int heartbeatInterval = conf.getInt(
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
+        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    float resourceUtilizationRatio = conf.getFloat(
+        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
+        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
+    // nm information (fetch from topology file, or from sls/rumen json file)
+    Set<SLSRunner.NodeDetails> nodeSet = null;
+    if (nodeFile.isEmpty()) {
+      for (String inputTrace : inputTraces) {
+        switch (inputType) {
+        case SLS:
+          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
+          break;
+        case RUMEN:
+          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
+          break;
+        case SYNTH:
+          stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
+          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
+              stjp.getNumNodes()/stjp.getNodesPerRack());
+          break;
+        default:
+          throw new YarnException("Input configuration not recognized, "
+              + "trace type should be SLS, RUMEN, or SYNTH");
+        }
+      }
+    } else {
+      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
+          nodeManagerResource);
+    }
+
+    if (nodeSet == null || nodeSet.isEmpty()) {
+      throw new YarnException("No node! Please configure nodes.");
+    }
+
+    SLSUtils.generateNodeTableMapping(nodeSet, tableMapping);
+
+    // create NM simulators
+    Random random = new Random();
+    Set<String> rackSet = ConcurrentHashMap.newKeySet();
+    int threadPoolSize = Math.max(thredPoolSize,
+        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
+    ExecutorService executorService = Executors.
+        newFixedThreadPool(threadPoolSize);
+    for (SLSRunner.NodeDetails nodeDetails : nodeSet) {
+      executorService.submit(new Runnable() {
+        @Override public void run() {
+          try {
+            // we randomize the heartbeat start time from zero to 1 interval
+            NMSimulator nm = new NMSimulator();
+            Resource nmResource = nodeManagerResource;
+            String hostName = nodeDetails.getHostname();
+            if (nodeDetails.getNodeResource() != null) {
+              nmResource = nodeDetails.getNodeResource();
+            }
+            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
+            nm.init(hostName, nmResource,
+                random.nextInt(heartbeatInterval),
+                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
+            nmMap.put(nm.getNode().getNodeID(), nm);
+            taskRunner.schedule(nm);
+            rackSet.add(nm.getNode().getRackName());
+          } catch (IOException | YarnException e) {
+            LOG.error("Got an error while adding node", e);
+          }
+        }
+      });
+    }
+    executorService.shutdown();
+    executorService.awaitTermination(10, TimeUnit.MINUTES);
+    numRacks = rackSet.size();
+    numNMs = nmMap.size();
+  }
+
+  void waitForNodesRunning() throws InterruptedException {
+    long startTimeMS = System.currentTimeMillis();
+    while (true) {
+      int numRunningNodes = 0;
+      for (RMNode node : rm.getRMContext().getRMNodes().values()) {
+        if (node.getState() == NodeState.RUNNING) {
+          numRunningNodes++;
+        }
+      }
+      if (numRunningNodes == numNMs) {
+        break;
+      }
+      LOG.info("SLSRunner is waiting for all nodes RUNNING."
+          + " {} of {} NMs initialized.", numRunningNodes, numNMs);
+      Thread.sleep(1000);
+    }
+    LOG.info("SLSRunner takes {} ms to launch all nodes.",
+        System.currentTimeMillis() - startTimeMS);
+  }
+
+  private Resource getNodeManagerResourceFromConf() {
+    Resource resource = Resources.createResource(0);
+    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+    for (ResourceInformation info : infors) {
+      long value;
+      if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
+        value = conf.getInt(SLSConfiguration.NM_MEMORY_MB,
+            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
+      } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
+        value = conf.getInt(SLSConfiguration.NM_VCORES,
+            SLSConfiguration.NM_VCORES_DEFAULT);
+      } else {
+        value = conf.getLong(SLSConfiguration.NM_PREFIX +
+            info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
+      }
+
+      resource.setResourceValue(info.getName(), value);
+    }
+
+    return resource;
+  }
+
+  public void setNodeFile(String nodeFile) {
+    this.nodeFile = nodeFile;
+  }
+
+  public void setInputType(TraceType inputType) {
+    this.inputType = inputType;
+  }
+
+  public void setInputTraces(String[] inputTraces) {
+    this.inputTraces = inputTraces;
+  }
+
+  public int getNumNMs() {
+    return numNMs;
+  }
+
+  public int getNumRacks() {
+    return numRacks;
+  }
+
+  public Resource getNodeManagerResource() {
+    return nodeManagerResource;
+  }
+
+  public Map<NodeId, NMSimulator> getNmMap() {
+    return nmMap;
+  }
+
+  public SynthTraceJobProducer getStjp() {
+    return stjp;
+  }
+
+  public void setTableMapping(String tableMapping) {
+    this.tableMapping = tableMapping;
+  }
+}
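For orientation, a minimal usage sketch of the new class (illustrative only, not part of the diff). It assumes a Configuration, an already-started ResourceManager and the shared TaskRunner are available; the trace path, table-mapping path and pool size are made up.

    // Hypothetical driver, mirroring what SLSRunner.init()/start() do in the
    // diff below; paths and pool size are placeholders.
    static NMRunner startSimulatedNodes(Configuration conf, ResourceManager rm,
        TaskRunner taskRunner) throws Exception {
      NMRunner nmRunner = new NMRunner(taskRunner, conf, rm,
          "/tmp/tableMapping.csv", 10);
      nmRunner.setInputType(SLSRunner.TraceType.SLS);   // or RUMEN / SYNTH
      nmRunner.setInputTraces(new String[] {"inputs/sls-jobs.json"});
      nmRunner.setNodeFile("");          // empty: derive nodes from the trace
      nmRunner.startNM();                // one NMSimulator per node, scheduled on taskRunner
      nmRunner.waitForNodesRunning();    // block until every NM reports RUNNING
      return nmRunner;
    }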
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index e9ae7f51dbafa..05904de079e2f 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -59,11 +59,9 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
-import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@@ -71,8 +69,6 @@
 import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
 import org.apache.hadoop.yarn.sls.scheduler.Tracker;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
-import org.apache.hadoop.yarn.sls.utils.SLSUtils;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,32 +81,19 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 @Private
 @Unstable
 public class SLSRunner extends Configured implements Tool {
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
-  private int poolSize;
-
-  // NM simulator
-  private Map<NodeId, NMSimulator> nmMap;
-  private Resource nodeManagerResource;
-  private String nodeFile;
 
   // metrics
   private boolean printSimulation;
 
-  // other simulation information
-  private int numNMs, numRacks;
-
-  private final static Map<String, Object> simulateInfoMap = new HashMap<>();
+  private final static Map<String, Object> simulateInfoMap =
+      new HashMap<>();
 
   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
@@ -118,6 +101,9 @@ public class SLSRunner extends Configured implements Tool {
   private static boolean exitAtTheFinish = false;
   private AMRunner amRunner;
   private RMRunner rmRunner;
+  private NMRunner nmRunner;
+
+  private SynthTraceJobProducer stjp;
 
   /**
    * The type of trace in input.
@@ -130,19 +116,16 @@ public enum TraceType {
   public static final String NETWORK_NEGATIVE_CACHE_TTL =
       "networkaddress.cache.negative.ttl";
 
-  private TraceType inputType;
-  private SynthTraceJobProducer stjp;
-
   public static int getRemainingApps() {
     return AMRunner.REMAINING_APPS;
   }
 
-  public SLSRunner() throws ClassNotFoundException {
+  public SLSRunner() throws ClassNotFoundException, YarnException {
     Configuration tempConf = new Configuration(false);
     init(tempConf);
   }
 
-  public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
+  public SLSRunner(Configuration tempConf) throws ClassNotFoundException, YarnException {
     init(tempConf);
   }
 
@@ -156,43 +139,31 @@ public void setConf(Configuration conf) {
     super.setConf(conf);
   }
 
-  private void init(Configuration tempConf) throws ClassNotFoundException {
+  private void init(Configuration tempConf) throws ClassNotFoundException, YarnException {
     // runner configuration
     setConf(tempConf);
-
-    nmMap = new ConcurrentHashMap<>();
-    amRunner = new AMRunner(runner, this);
-    rmRunner = new RMRunner(tempConf, this);
-
-    // runner
-    poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
+
+    int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
 
+    rmRunner = new RMRunner(getConf(), this);
+    nmRunner = new NMRunner(runner, getConf(), rmRunner.getRm(), rmRunner.getTableMapping(), poolSize);
+    amRunner = new AMRunner(runner, this);
     amRunner.init(tempConf);
-    nodeManagerResource = getNodeManagerResource();
   }
 
-  private Resource getNodeManagerResource() {
-    Resource resource = Resources.createResource(0);
-    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
-    for (ResourceInformation info : infors) {
-      long value;
-      if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
-        value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
-            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
-      } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
-        value = getConf().getInt(SLSConfiguration.NM_VCORES,
-            SLSConfiguration.NM_VCORES_DEFAULT);
-      } else {
-        value = getConf().getLong(SLSConfiguration.NM_PREFIX +
-            info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
+  private SynthTraceJobProducer getSynthJobTraceProducer() throws YarnException {
+    // if we use the nodeFile this could have been not initialized yet.
+    if (nmRunner.getStjp() != null) {
+      return nmRunner.getStjp();
+    } else {
+      try {
+        return new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+      } catch (IOException e) {
+        throw new YarnException("Failed to initialize SynthTraceJobProducer", e);
       }
-
-      resource.setResourceValue(info.getName(), value);
     }
-
-    return resource;
   }
 
   /**
@@ -213,29 +184,32 @@ public static Map<String, Object> getSimulateInfoMap() {
    */
   public void setSimulationParams(TraceType inType, String[] inTraces,
       String nodes, String metricsOutputDir, Set<String> trackApps,
-      boolean printsimulation) {
-
-    this.inputType = inType;
+      boolean printsimulation) throws YarnException {
     this.inputTraces = inTraces.clone();
-    this.amRunner.setInputType(this.inputType);
+    this.amRunner.setInputType(inType);
     this.amRunner.setInputTraces(this.inputTraces);
     this.amRunner.setTrackedApps(trackApps);
-    this.nodeFile = nodes;
+    this.nmRunner.setNodeFile(nodes);
+    this.nmRunner.setInputType(inType);
+    this.nmRunner.setInputTraces(this.inputTraces);
     this.printSimulation = printsimulation;
     this.rmRunner.setMetricsOutputDir(metricsOutputDir);
-    this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv");
+    String tableMapping = metricsOutputDir + "/tableMapping.csv";
+    this.rmRunner.setTableMapping(tableMapping);
+    this.nmRunner.setTableMapping(tableMapping);
+    this.stjp = getSynthJobTraceProducer();
   }
 
   public void start() throws IOException, ClassNotFoundException, YarnException,
       InterruptedException {
-
     enableDNSCaching(getConf());
 
     // start resource manager
     rmRunner.startRM();
     amRunner.setResourceManager(rmRunner.getRm());
+
     // start node managers
-    startNM();
+    nmRunner.startNM();
 
     // start application masters
     amRunner.startAM();
@@ -248,7 +222,7 @@ public void start() throws IOException, ClassNotFoundException, YarnException,
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
-    waitForNodesRunning();
+    nmRunner.waitForNodesRunning();
     // starting the runner once everything is ready to go,
     runner.start();
   }
@@ -270,104 +244,6 @@ static void enableDNSCaching(Configuration conf) {
     }
   }
 
-  private void startNM() throws YarnException, IOException,
-      InterruptedException {
-    // nm configuration
-    int heartbeatInterval = getConf().getInt(
-        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
-        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    float resourceUtilizationRatio = getConf().getFloat(
-        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
-        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
-    // nm information (fetch from topology file, or from sls/rumen json file)
-    Set<NodeDetails> nodeSet = null;
-    if (nodeFile.isEmpty()) {
-      for (String inputTrace : inputTraces) {
-        switch (inputType) {
-        case SLS:
-          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
-          break;
-        case RUMEN:
-          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
-          break;
-        case SYNTH:
-          stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
-              stjp.getNumNodes()/stjp.getNodesPerRack());
-          break;
-        default:
-          throw new YarnException("Input configuration not recognized, "
-              + "trace type should be SLS, RUMEN, or SYNTH");
-        }
-      }
-    } else {
-      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
-          nodeManagerResource);
-    }
-
-    if (nodeSet == null || nodeSet.isEmpty()) {
-      throw new YarnException("No node! Please configure nodes.");
-    }
-
-    SLSUtils.generateNodeTableMapping(nodeSet, rmRunner.getTableMapping());
-
-    // create NM simulators
-    Random random = new Random();
-    Set<String> rackSet = ConcurrentHashMap.newKeySet();
-    int threadPoolSize = Math.max(poolSize,
-        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
-    ExecutorService executorService = Executors.
-        newFixedThreadPool(threadPoolSize);
-    for (NodeDetails nodeDetails : nodeSet) {
-      executorService.submit(new Runnable() {
-        @Override public void run() {
-          try {
-            // we randomize the heartbeat start time from zero to 1 interval
-            NMSimulator nm = new NMSimulator();
-            Resource nmResource = nodeManagerResource;
-            String hostName = nodeDetails.getHostname();
-            if (nodeDetails.getNodeResource() != null) {
-              nmResource = nodeDetails.getNodeResource();
-            }
-            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
-            nm.init(hostName, nmResource,
-                random.nextInt(heartbeatInterval),
-                heartbeatInterval, rmRunner.getRm(), resourceUtilizationRatio, nodeLabels);
-            nmMap.put(nm.getNode().getNodeID(), nm);
-            runner.schedule(nm);
-            rackSet.add(nm.getNode().getRackName());
-          } catch (IOException | YarnException e) {
-            LOG.error("Got an error while adding node", e);
-          }
-        }
-      });
-    }
-    executorService.shutdown();
-    executorService.awaitTermination(10, TimeUnit.MINUTES);
-    numRacks = rackSet.size();
-    numNMs = nmMap.size();
-  }
-
-  private void waitForNodesRunning() throws InterruptedException {
-    long startTimeMS = System.currentTimeMillis();
-    while (true) {
-      int numRunningNodes = 0;
-      for (RMNode node : rmRunner.getRm().getRMContext().getRMNodes().values()) {
-        if (node.getState() == NodeState.RUNNING) {
-          numRunningNodes++;
-        }
-      }
-      if (numRunningNodes == numNMs) {
-        break;
-      }
-      LOG.info("SLSRunner is waiting for all nodes RUNNING."
-          + " {} of {} NMs initialized.", numRunningNodes, numNMs);
-      Thread.sleep(1000);
-    }
-    LOG.info("SLSRunner takes {} ms to launch all nodes.",
-        System.currentTimeMillis() - startTimeMS);
-  }
-
   Resource getDefaultContainerResource() {
     int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
         SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
@@ -392,7 +268,7 @@ private void printSimulationInfo() {
       LOG.info("------------------------------------");
       LOG.info("# nodes = {}, # racks = {}, capacity " +
           "of each node {}.",
-          numNMs, numRacks, nodeManagerResource);
+          nmRunner.getNumNMs(), nmRunner.getNumRacks(), nmRunner.getNodeManagerResource());
       LOG.info("------------------------------------");
       // job
       LOG.info("# applications = {}, # total " +
@@ -416,12 +292,12 @@ private void printSimulationInfo() {
       LOG.info("------------------------------------");
     }
     // package these information in the simulateInfoMap used by other places
-    simulateInfoMap.put("Number of racks", numRacks);
-    simulateInfoMap.put("Number of nodes", numNMs);
+    simulateInfoMap.put("Number of racks", nmRunner.getNumRacks());
+    simulateInfoMap.put("Number of nodes", nmRunner.getNumNMs());
     simulateInfoMap.put("Node memory (MB)",
-        nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI));
+        nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.MEMORY_URI));
     simulateInfoMap.put("Node VCores",
-        nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI));
+        nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.VCORES_URI));
     simulateInfoMap.put("Number of applications", numAMs);
     simulateInfoMap.put("Number of tasks", numTasks);
     simulateInfoMap.put("Average tasks per applicaion",
@@ -434,7 +310,7 @@ private void printSimulationInfo() {
   }
 
   public Map<NodeId, NMSimulator> getNmMap() {
-    return nmMap;
+    return nmRunner.getNmMap();
   }
 
   public static void decreaseRemainingApps() {
@@ -458,7 +334,6 @@ public void stop() throws InterruptedException {
   public int run(final String[] argv) throws IOException, InterruptedException,
       ParseException, ClassNotFoundException, YarnException {
-
     Options options = new Options();
 
     // Left for compatibility
@@ -524,7 +399,6 @@ public int run(final String[] argv) throws IOException, InterruptedException,
     case "RUMEN":
       tempTraceType = TraceType.RUMEN;
      break;
-
    case "SYNTH":
      tempTraceType = TraceType.SYNTH;
      break;
@@ -537,7 +411,7 @@ public int run(final String[] argv) throws IOException, InterruptedException,
     setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
         trackedJobSet, cmd.hasOption("printsimulation"));
-
+
     start();
 
     return 0;
@@ -629,9 +503,10 @@ public SynthTraceJobProducer getStjp() {
     return stjp;
   }
 
-  public void setStjp(SynthTraceJobProducer stjp) {
-    this.stjp = stjp;
-  }
+  //TODO
+//  public void setStjp(SynthTraceJobProducer stjp) {
+//    this.stjp = stjp;
+//  }
 
   public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
     return amRunner.getAMSimulator(appId);
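Between the two patches, a condensed sketch of the startup order that SLSRunner.start() converges on (the setRm step is only added in patch 2 below; exception handling and metrics setup are omitted):

    enableDNSCaching(getConf());
    rmRunner.startRM();                            // 1. bring up the ResourceManager
    nmRunner.setRm(rmRunner.getRm());              // 2. hand the live RM to NMRunner (patch 2)
    amRunner.setResourceManager(rmRunner.getRm());
    nmRunner.startNM();                            // 3. register simulated NodeManagers
    amRunner.startAM();                            // 4. submit simulated applications
    printSimulationInfo();
    nmRunner.waitForNodesRunning();                // 5. block until all nodes are RUNNING
    runner.start();                                // 6. start the shared TaskRunner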
From 5780f12441cd71a78166d62ec7cc98d50f512120 Mon Sep 17 00:00:00 2001
From: Szilard Nemeth
Date: Tue, 29 Mar 2022 17:21:37 +0200
Subject: [PATCH 2/2] 002

---
 .../java/org/apache/hadoop/yarn/sls/AMRunner.java  |  8 +++-----
 .../java/org/apache/hadoop/yarn/sls/NMRunner.java  | 10 +++++++---
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java | 14 +++++++++-----
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java      |  1 -
 .../hadoop/yarn/sls/scheduler/RMNodeWrapper.java   |  1 -
 .../yarn/sls/scheduler/SchedulerMetrics.java       | 12 ++++++++----
 6 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
index 1f5c3938255c1..d80337688d5e2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
@@ -147,12 +147,10 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
   private void startAMFromSynthGenerator() throws YarnException, IOException {
     Configuration localConf = new Configuration();
     localConf.set("fs.defaultFS", "file:///");
-    //TODO
     //if we use the nodeFile this could have been not initialized yet.
-//    if (stjp == null) {
-//      stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
-//      slsRunner.setStjp(stjp);
-//    }
+    if (slsRunner.getStjp() == null) {
+      slsRunner.setStjp(new SynthTraceJobProducer(conf, new Path(inputTraces[0])));
+    }
 
     SynthJob job;
     // we use stjp, a reference to the job producer instantiated during node
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
index a1e0055e6a89a..224e1e373ff0b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
@@ -60,7 +60,7 @@ public class NMRunner {
   private Configuration conf;
   private ResourceManager rm;
   private String tableMapping;
-  private int thredPoolSize;
+  private int threadPoolSize;
   private TraceType inputType;
   private String[] inputTraces;
   private SynthTraceJobProducer stjp;
@@ -70,7 +70,7 @@ public NMRunner(TaskRunner taskRunner, Configuration conf, ResourceManager rm, String tableMapping, int threadPoolSize) {
     this.conf = conf;
     this.rm = rm;
     this.tableMapping = tableMapping;
-    this.thredPoolSize = threadPoolSize;
+    this.threadPoolSize = threadPoolSize;
     this.nmMap = new ConcurrentHashMap<>();
     this.nodeManagerResource = getNodeManagerResourceFromConf();
   }
@@ -119,7 +119,7 @@ public void startNM() throws YarnException, IOException,
     // create NM simulators
     Random random = new Random();
     Set<String> rackSet = ConcurrentHashMap.newKeySet();
-    int threadPoolSize = Math.max(thredPoolSize,
+    int threadPoolSize = Math.max(this.threadPoolSize,
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     ExecutorService executorService = Executors.
        newFixedThreadPool(threadPoolSize);
@@ -231,4 +231,8 @@ public SynthTraceJobProducer getStjp() {
   public void setTableMapping(String tableMapping) {
     this.tableMapping = tableMapping;
   }
+
+  public void setRm(ResourceManager rm) {
+    this.rm = rm;
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 05904de079e2f..318476427a69b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -197,7 +197,11 @@ public void setSimulationParams(TraceType inType, String[] inTraces,
     String tableMapping = metricsOutputDir + "/tableMapping.csv";
     this.rmRunner.setTableMapping(tableMapping);
     this.nmRunner.setTableMapping(tableMapping);
-    this.stjp = getSynthJobTraceProducer();
+
+    //We need this.inputTraces to be set before creating the SynthTraceJobProducer
+    if (inType == TraceType.SYNTH) {
+      this.stjp = getSynthJobTraceProducer();
+    }
   }
 
   public void start() throws IOException, ClassNotFoundException, YarnException,
@@ -206,6 +210,7 @@ public void start() throws IOException, ClassNotFoundException, YarnException,
 
     // start resource manager
     rmRunner.startRM();
+    nmRunner.setRm(rmRunner.getRm());
     amRunner.setResourceManager(rmRunner.getRm());
 
     // start node managers
@@ -503,10 +508,9 @@ public SynthTraceJobProducer getStjp() {
     return stjp;
   }
 
-  //TODO
-//  public void setStjp(SynthTraceJobProducer stjp) {
-//    this.stjp = stjp;
-//  }
+  public void setStjp(SynthTraceJobProducer stjp) {
+    this.stjp = stjp;
+  }
 
   public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
     return amRunner.getAMSimulator(appId);
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 32567db666ef3..a22230f86616b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -186,7 +186,6 @@ public Set<String> getNodeLabels() {
 
     @Override
     public List<Container> pullNewlyIncreasedContainers() {
-      // TODO Auto-generated method stub
       return null;
     }
 
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 26d35ac897235..dbbc88fb52d00 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -176,7 +176,6 @@ public Set<String> getNodeLabels() {
 
   @Override
   public List<Container> pullNewlyIncreasedContainers() {
-    // TODO Auto-generated method stub
     return Collections.emptyList();
   }
 
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
index 26a9da4cd8bc8..26fbcd78f3969 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
@@ -316,7 +316,7 @@ private void registerClusterResourceMetrics() {
         new Gauge<Long>() {
           @Override
          public Long getValue() {
-            if (scheduler.getRootQueueMetrics() == null) {
+            if (!isMetricsAvailable()) {
               return 0L;
             } else {
               return scheduler.getRootQueueMetrics().getAllocatedMB();
@@ -328,7 +328,7 @@ public Long getValue() {
         new Gauge<Integer>() {
           @Override
           public Integer getValue() {
-            if (scheduler.getRootQueueMetrics() == null) {
+            if (!isMetricsAvailable()) {
               return 0;
             } else {
               return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
@@ -340,7 +340,7 @@ public Integer getValue() {
         new Gauge<Long>() {
           @Override
           public Long getValue() {
-            if (scheduler.getRootQueueMetrics() == null) {
+            if (!isMetricsAvailable()) {
               return 0L;
             } else {
               return scheduler.getRootQueueMetrics().getAvailableMB();
@@ -352,7 +352,7 @@ public Long getValue() {
         new Gauge<Integer>() {
           @Override
           public Integer getValue() {
-            if (scheduler.getRootQueueMetrics() == null) {
+            if (!isMetricsAvailable()) {
               return 0;
             } else {
               return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
@@ -362,6 +362,10 @@ public Integer getValue() {
     );
   }
 
+  private boolean isMetricsAvailable() {
+    return scheduler.getRootQueueMetrics() != null;
+  }
+
   private void registerContainerAppNumMetrics() {
     metrics.register("variable.running.application",
         new Gauge<Integer>() {
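The SchedulerMetrics hunks above converge on one availability check shared by all four root-queue gauges. A self-contained sketch of that guard pattern against the Codahale metrics API (the Supplier and QueueStats are stand-ins for the real scheduler's root queue metrics, which can still be null while the RM is starting):

    import java.util.function.Supplier;
    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    public class GuardedGaugeSketch {
      private final MetricRegistry metrics = new MetricRegistry();
      // Stand-in for scheduler.getRootQueueMetrics(); null until initialized.
      private final Supplier<QueueStats> rootQueueMetrics;

      GuardedGaugeSketch(Supplier<QueueStats> rootQueueMetrics) {
        this.rootQueueMetrics = rootQueueMetrics;
      }

      private boolean isMetricsAvailable() {
        return rootQueueMetrics.get() != null;
      }

      void register() {
        // Report 0 until the scheduler has published its root queue metrics.
        metrics.register("variable.cluster.allocated.memory", (Gauge<Long>) () ->
            isMetricsAvailable() ? rootQueueMetrics.get().allocatedMB() : 0L);
      }

      // Minimal stand-in for the scheduler's QueueMetrics.
      interface QueueStats {
        long allocatedMB();
      }
    }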