Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
002
Browse files Browse the repository at this point in the history
szilard-nemeth committed Mar 29, 2022
1 parent bbff9e1 commit 5780f12
Showing 6 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -147,12 +147,10 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
private void startAMFromSynthGenerator() throws YarnException, IOException {
Configuration localConf = new Configuration();
localConf.set("fs.defaultFS", "file:///");
//TODO
//if we use the nodeFile this could have been not initialized yet.
// if (stjp == null) {
// stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0]));
// slsRunner.setStjp(stjp);
// }
if (slsRunner.getStjp() == null) {
slsRunner.setStjp(new SynthTraceJobProducer(conf, new Path(inputTraces[0])));
}

SynthJob job;
// we use stjp, a reference to the job producer instantiated during node
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ public class NMRunner {
private Configuration conf;
private ResourceManager rm;
private String tableMapping;
private int thredPoolSize;
private int threadPoolSize;
private TraceType inputType;
private String[] inputTraces;
private SynthTraceJobProducer stjp;
@@ -70,7 +70,7 @@ public NMRunner(TaskRunner taskRunner, Configuration conf, ResourceManager rm, S
this.conf = conf;
this.rm = rm;
this.tableMapping = tableMapping;
this.thredPoolSize = threadPoolSize;
this.threadPoolSize = threadPoolSize;
this.nmMap = new ConcurrentHashMap<>();
this.nodeManagerResource = getNodeManagerResourceFromConf();
}
@@ -119,7 +119,7 @@ public void startNM() throws YarnException, IOException,
// create NM simulators
Random random = new Random();
Set<String> rackSet = ConcurrentHashMap.newKeySet();
int threadPoolSize = Math.max(thredPoolSize,
int threadPoolSize = Math.max(this.threadPoolSize,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
ExecutorService executorService = Executors.
newFixedThreadPool(threadPoolSize);
@@ -231,4 +231,8 @@ public SynthTraceJobProducer getStjp() {
public void setTableMapping(String tableMapping) {
this.tableMapping = tableMapping;
}

public void setRm(ResourceManager rm) {
this.rm = rm;
}
}
Original file line number Diff line number Diff line change
@@ -197,7 +197,11 @@ public void setSimulationParams(TraceType inType, String[] inTraces,
String tableMapping = metricsOutputDir + "/tableMapping.csv";
this.rmRunner.setTableMapping(tableMapping);
this.nmRunner.setTableMapping(tableMapping);
this.stjp = getSynthJobTraceProducer();

//We need this.inputTraces to set before creating SynthTraceJobProducer
if (inType == TraceType.SYNTH) {
this.stjp = getSynthJobTraceProducer();
}
}

public void start() throws IOException, ClassNotFoundException, YarnException,
@@ -206,6 +210,7 @@ public void start() throws IOException, ClassNotFoundException, YarnException,

// start resource manager
rmRunner.startRM();
nmRunner.setRm(rmRunner.getRm());
amRunner.setResourceManager(rmRunner.getRm());

// start node managers
@@ -503,10 +508,9 @@ public SynthTraceJobProducer getStjp() {
return stjp;
}

//TODO
// public void setStjp(SynthTraceJobProducer stjp) {
// this.stjp = stjp;
// }
public void setStjp(SynthTraceJobProducer stjp) {
this.stjp = stjp;
}

public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
return amRunner.getAMSimulator(appId);
Original file line number Diff line number Diff line change
@@ -186,7 +186,6 @@ public Set<String> getNodeLabels() {

@Override
public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub
return null;
}

Original file line number Diff line number Diff line change
@@ -176,7 +176,6 @@ public Set<String> getNodeLabels() {

@Override
public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub
return Collections.emptyList();
}

Original file line number Diff line number Diff line change
@@ -316,7 +316,7 @@ private void registerClusterResourceMetrics() {
new Gauge<Long>() {
@Override
public Long getValue() {
if (scheduler.getRootQueueMetrics() == null) {
if (isMetricsAvailable()) {
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedMB();
@@ -328,7 +328,7 @@ public Long getValue() {
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler.getRootQueueMetrics() == null) {
if (isMetricsAvailable()) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
@@ -340,7 +340,7 @@ public Integer getValue() {
new Gauge<Long>() {
@Override
public Long getValue() {
if (scheduler.getRootQueueMetrics() == null) {
if (isMetricsAvailable()) {
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableMB();
@@ -352,7 +352,7 @@ public Long getValue() {
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (scheduler.getRootQueueMetrics() == null) {
if (isMetricsAvailable()) {
return 0;
} else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
@@ -362,6 +362,10 @@ public Integer getValue() {
);
}

private boolean isMetricsAvailable() {
return scheduler.getRootQueueMetrics() == null;
}

private void registerContainerAppNumMetrics() {
metrics.register("variable.running.application",
new Gauge<Integer>() {

0 comments on commit 5780f12

Please sign in to comment.