Skip to content

Commit

Permalink
YARN-8234. Improve RM system metrics publisher's performance by pushi…
Browse files Browse the repository at this point in the history
…ng events to timeline server in batch (#3793)

Signed-off-by: Akira Ajisaka <[email protected]>
  • Loading branch information
ashutoshcipher authored Dec 23, 2021
1 parent 97ed029 commit 00e2405
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,20 @@ public static boolean isAclEnabled(Configuration conf) {
/** Default value (10) for the system metrics publisher dispatcher pool size. */
public static final int
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;

/**
 * Configuration key for the batch size used by the timeline server v1
 * publisher when it sends events in batches.
 */
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.batch-size";
/** Default batch size (1000) for the timeline server v1 publisher. */
public static final int
DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
1000;
/**
 * Configuration key for the interval, in seconds, at which the timeline
 * server v1 publisher periodically sends the events held in its buffer.
 */
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.interval-seconds";
/** Default periodic sending interval: 60 seconds. */
public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
60;
/**
 * Configuration key that enables or disables batch publishing in the
 * timeline server v1 publisher.
 */
public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
RM_PREFIX + "system-metrics-publisher.timeline-server-v1.enable-batch";
/** Batch publishing is disabled by default. */
public static final boolean DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
false;

//RM delegation token related keys
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,33 @@
<value>10</value>
</property>

<property>
<description>
This setting enables/disables timeline server v1 publisher to publish timeline events in batch.
</description>
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch</name>
<value>false</value>
</property>

<property>
<description>
The number of events that the timeline server v1 publisher sends in one request.
</description>
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.batch-size</name>
<value>1000</value>
</property>

<property>
<description>
When batch publishing is enabled in timeline server v1, we must prevent the
publisher from waiting for a batch to fill up and holding events in the buffer
for a long time. So we add another thread which sends the events in the buffer
periodically. This config sets the interval of that periodic sending thread.
</description>
<name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.interval-seconds</name>
<value>60</value>
</property>

<property>
<description>Number of diagnostics/failure messages can be saved in RM for
log aggregation. It also defines the number of diagnostics/failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

package org.apache.hadoop.yarn.server.resourcemanager.metrics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
Expand Down Expand Up @@ -59,16 +65,92 @@ public TimelineServiceV1Publisher() {
}

private TimelineClient client;
private LinkedBlockingQueue<TimelineEntity> entityQueue;
private ExecutorService sendEventThreadPool;
private int dispatcherPoolSize;
private int dispatcherBatchSize;
private int putEventInterval;
private boolean isTimeLineServerBatchEnabled;
private volatile boolean stopped = false;
private PutEventThread putEventThread;
private Object sendEntityLock;

/**
 * Reads the batch-publishing settings, validates them, creates the timeline
 * client and registers the event handler with the dispatcher.
 *
 * <p>When batch publishing is enabled, this also creates the periodic flush
 * thread, the thread pool that sends batches, the bounded entity queue and
 * the lock used when draining the queue. The queue capacity is the batch
 * size plus one.
 *
 * @param conf configuration to read the publisher settings from
 * @throws IllegalArgumentException if batching is enabled and the interval,
 *     the dispatcher pool size or the batch size is out of range
 * @throws Exception if the parent service fails to initialize
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  isTimeLineServerBatchEnabled = conf.getBoolean(
      YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
      YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
  if (isTimeLineServerBatchEnabled) {
    int intervalSeconds = conf.getInt(
        YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
        YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL);
    // Validate the value in seconds BEFORE converting to milliseconds: the
    // int multiplication can overflow and wrap to a positive number that
    // would slip past a post-multiplication "<= 0" check.
    if (intervalSeconds <= 0) {
      throw new IllegalArgumentException(
          "RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0");
    }
    if (intervalSeconds > Integer.MAX_VALUE / 1000) {
      throw new IllegalArgumentException(
          "RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL is too large: "
              + intervalSeconds + " seconds");
    }
    putEventInterval = intervalSeconds * 1000;

    // The send thread pool is sized with the dispatcher pool size setting.
    dispatcherPoolSize = conf.getInt(
        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
        YarnConfiguration.
            DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
    if (dispatcherPoolSize <= 0) {
      throw new IllegalArgumentException(
          "RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0");
    }

    dispatcherBatchSize = conf.getInt(
        YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
        YarnConfiguration.
            DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
    if (dispatcherBatchSize <= 1) {
      throw new IllegalArgumentException(
          "RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1");
    }

    putEventThread = new PutEventThread();
    sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
    entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
    sendEntityLock = new Object();
    LOG.info("Timeline service v1 batch publishing enabled");
  } else {
    LOG.info("Timeline service v1 batch publishing disabled");
  }
  client = TimelineClient.createTimelineClient();
  addIfService(client);
  super.serviceInit(conf);
  getDispatcher().register(SystemMetricsEventType.class,
      new TimelineV1EventHandler());
}

/**
 * Starts the periodic flush thread when batch publishing is enabled, then
 * starts the parent service.
 *
 * @throws Exception if the parent service fails to start
 */
@Override
protected void serviceStart() throws Exception {
  if (isTimeLineServerBatchEnabled) {
    // Clear the stop flag before the thread begins polling it.
    stopped = false;
    putEventThread.start();
  }
  super.serviceStart();
}

/**
 * Stops the parent service and, when batch publishing is enabled, stops the
 * periodic flush thread, submits one final batch with whatever is still in
 * the entity queue, and shuts the send thread pool down (waiting up to
 * three seconds before forcing it).
 *
 * <p>NOTE(review): {@code super.serviceStop()} runs before the final flush;
 * if it also stops the timeline client registered through
 * {@code addIfService}, the last batch may fail to publish - confirm.
 *
 * @throws Exception if the parent service fails to stop or the wait for the
 *     flush thread is interrupted
 */
@Override
protected void serviceStop() throws Exception {
  super.serviceStop();
  if (!isTimeLineServerBatchEnabled) {
    return;
  }
  stopped = true;
  try {
    // The null guards cover a serviceInit that threw before these fields
    // were assigned; without them a stop after a failed init would NPE.
    if (putEventThread != null) {
      putEventThread.interrupt();
      putEventThread.join();
    }
    if (entityQueue != null && sendEventThreadPool != null) {
      SendEntity task = new SendEntity();
      if (!task.buffer.isEmpty()) {
        LOG.info("Initiating final putEntities, remaining entities left in entityQueue: {}",
            task.buffer.size());
        sendEventThreadPool.submit(task);
      }
    }
  } finally {
    if (sendEventThreadPool != null) {
      sendEventThreadPool.shutdown();
      try {
        if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
          sendEventThreadPool.shutdownNow();
        }
      } catch (InterruptedException e) {
        // Still force the pool down so its threads do not linger, and keep
        // the interrupt visible to the caller.
        sendEventThreadPool.shutdownNow();
        Thread.currentThread().interrupt();
      }
    }
  }
}

@SuppressWarnings("unchecked")
@Override
public void appCreated(RMApp app, long createdTime) {
Expand Down Expand Up @@ -257,7 +339,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
@SuppressWarnings("unchecked")
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());

Expand All @@ -274,7 +356,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
.createApplicationAttemptState(appAttemptState).toString());
if (appAttempt.getMasterContainer() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
appAttempt.getMasterContainer().getId().toString());
Expand Down Expand Up @@ -374,23 +456,68 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) {
}

private void putEntity(TimelineEntity entity) {
try {
if (isTimeLineServerBatchEnabled) {
try {
entityQueue.put(entity);
if (entityQueue.size() > dispatcherBatchSize) {
SendEntity task = null;
synchronized (sendEntityLock) {
if (entityQueue.size() > dispatcherBatchSize) {
task = new SendEntity();
}
}
if (task != null) {
sendEventThreadPool.submit(task);
}
}
} catch (Exception e) {
LOG.error("Error when publishing entity batch [ " + entity.getEntityType() + ","
+ entity.getEntityId() + " ] ", e);
}
} else {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId()
+ ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
client.putEntities(entity);
} catch (Exception e) {
LOG.error("Error when publishing entity [ " + entity.getEntityType() + ","
+ entity.getEntityId() + " ] ", e);
}
}
}

private class SendEntity implements Runnable {

private ArrayList<TimelineEntity> buffer;

SendEntity() {
buffer = new ArrayList();
entityQueue.drainTo(buffer);
}

@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId()
+ ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
LOG.debug("Number of timeline entities being sent in batch: {}", buffer.size());
}
if (buffer.isEmpty()) {
return;
}
try {
client.putEntities(buffer.toArray(new TimelineEntity[0]));
} catch (Exception e) {
LOG.error("Error when publishing entity: ", e);
}
client.putEntities(entity);
} catch (Exception e) {
LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+ entity.getEntityId() + "]", e);
}
}

private class TimelineV1PublishEvent extends TimelinePublishEvent {
private TimelineEntity entity;

public TimelineV1PublishEvent(SystemMetricsEventType type,
TimelineV1PublishEvent(SystemMetricsEventType type,
TimelineEntity entity, ApplicationId appId) {
super(type, appId);
this.entity = entity;
Expand All @@ -408,4 +535,46 @@ public void handle(TimelineV1PublishEvent event) {
putEntity(event.getEntity());
}
}
}

/**
 * Background thread that periodically drains the entity queue into a
 * {@code SendEntity} task, so that buffered entities are published even
 * when a batch never fills up.
 *
 * <p>A flush is triggered while the wall-clock time modulo
 * {@code putEventInterval} is below 1000 ms; outside that window the thread
 * polls again after 500 ms.
 */
private class PutEventThread extends Thread {
  PutEventThread() {
    super("PutEventThread");
  }

  @Override
  public void run() {
    LOG.info("System metrics publisher will put events every " +
        putEventInterval + " milliseconds");
    while (!(stopped || Thread.currentThread().isInterrupted())) {
      final boolean insideFlushWindow =
          System.currentTimeMillis() % putEventInterval < 1000;
      if (insideFlushWindow) {
        final SendEntity flushTask;
        synchronized (sendEntityLock) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Creating SendEntity task in PutEventThread");
          }
          flushTask = new SendEntity();
        }
        sendEventThreadPool.submit(flushTask);
      }
      // After a flush, wait a full second so the same window cannot trigger
      // a second task; otherwise poll again in half a second.
      if (!pause(insideFlushWindow ? 1000 : 500)) {
        return;
      }
    }
  }

  /**
   * Sleeps for the given time.
   *
   * @param millis how long to sleep, in milliseconds
   * @return {@code false} if the sleep was interrupted (already logged),
   *     {@code true} otherwise
   */
  private boolean pause(long millis) {
    try {
      Thread.sleep(millis);
      return true;
    } catch (InterruptedException e) {
      LOG.warn(SystemMetricsPublisher.class.getName()
          + " is interrupted. Exiting.");
      return false;
    }
  }
}
}
Loading

0 comments on commit 00e2405

Please sign in to comment.