Skip to content

Commit

Permalink
Make WorkRequestHandler do a GC after some amount of CPU time has bee…
Browse files Browse the repository at this point in the history
…n used on requests. For Bazel and Blaze, defaults to 10s based on benchmarking.

Linux benchmarks show no significant regressions in wall or CPU time, and possibly a reduction in system time.

RELNOTES: None.
PiperOrigin-RevId: 362006101
  • Loading branch information
larsrc-google authored and copybara-github committed Mar 10, 2021
1 parent 6c5a3ee commit 94ac839
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.PrintWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

Expand All @@ -46,7 +47,8 @@ public static void main(String[] args) {
new WorkRequestHandler(
builder::parseAndBuild,
System.err,
new ProtoWorkerMessageProcessor(System.in, System.out));
new ProtoWorkerMessageProcessor(System.in, System.out),
Duration.ofSeconds(10));
try {
workerHandler.processRequests();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
// limitations under the License.
package com.google.devtools.build.lib.worker;


import com.google.common.annotations.VisibleForTesting;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

/**
Expand Down Expand Up @@ -53,19 +58,50 @@ public interface WorkerMessageProcessor {

final WorkerMessageProcessor messageProcessor;

private final CpuTimeBasedGcScheduler gcScheduler;

/**
* Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
* received. The first argument to {@code callback} is the set of command-line arguments, the
* second is where all error messages and similar should be written to. The callback should return
* an exit code indicating success (0) or failure (nonzero).
* received.
*
* @param callback Callback method for executing a single WorkRequest in a thread. The first
* argument to {@code callback} is the set of command-line arguments, the second is where all
* error messages and other user-oriented messages should be written to. The callback must
* return an exit code indicating success (zero) or failure (nonzero).
* @param stderr Stream that log messages should be written to, typically the process' stderr.
* @param messageProcessor Object responsible for parsing {@code WorkRequest}s from the server and
* writing {@code WorkResponses} to the server.
*/
public WorkRequestHandler(
BiFunction<List<String>, PrintWriter, Integer> callback,
PrintStream stderr,
WorkerMessageProcessor messageProcessor) {
this(callback, stderr, messageProcessor, Duration.ZERO);
}

/**
* Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
* received.
*
* @param callback Callback method for executing a single WorkRequest in a thread. The first
* argument to {@code callback} is the set of command-line arguments, the second is where all
* error messages and other user-oriented messages should be written to. The callback must
* return an exit code indicating success (zero) or failure (nonzero).
* @param stderr Stream that log messages should be written to, typically the process' stderr.
* @param messageProcessor Object responsible for parsing {@code WorkRequest}s from the server and
* writing {@code WorkResponses} to the server.
* @param cpuUsageBeforeGc The minimum amount of CPU time between explicit garbage collection
* calls.
*/
public WorkRequestHandler(
BiFunction<List<String>, PrintWriter, Integer> callback,
PrintStream stderr,
WorkerMessageProcessor messageProcessor,
Duration cpuUsageBeforeGc) {
this.callback = callback;
this.stderr = stderr;
this.messageProcessor = messageProcessor;
this.gcScheduler = new CpuTimeBasedGcScheduler(cpuUsageBeforeGc);
}

/**
Expand All @@ -77,13 +113,13 @@ public WorkRequestHandler(
public void processRequests() throws IOException {
while (true) {
WorkRequest request = messageProcessor.readWorkRequest();
if (request == null) {
break;
}
if (request.getRequestId() != 0) {
if (request == null) {
break;
}
if (request.getRequestId() != 0) {
Thread t = createResponseThread(request);
t.start();
} else {
t.start();
} else {
respondToRequest(request);
}
}
Expand Down Expand Up @@ -127,11 +163,62 @@ void respondToRequest(WorkRequest request) throws IOException {
synchronized (this) {
messageProcessor.writeWorkResponse(workResponse);
}
gcScheduler.maybePerformGc();
}
}

@Override
public void close() throws IOException {
messageProcessor.close();
}

/**
* Class that performs GC occasionally, based on how much CPU time has passed. This strikes a
* compromise between blindly doing GC after e.g. every request, which takes too much CPU, and not
* doing explicit GC at all, which causes poor garbage collection in some cases.
*/
private static class CpuTimeBasedGcScheduler {
/**
* After this much CPU time has elapsed, we may force a GC run. Set to {@link Duration#ZERO} to
* disable.
*/
private final Duration cpuUsageBeforeGc;

/** The total process CPU time at the last GC run (or from the start of the worker). */
private final AtomicReference<Duration> cpuTimeAtLastGc;

/** Used to get the CPU time used by this process. */
private static final OperatingSystemMXBean bean =
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

/**
* Creates a new {@link CpuTimeBasedGcScheduler} that may perform GC after {@code
* cpuUsageBeforeGc} amount of CPU time has been used.
*/
public CpuTimeBasedGcScheduler(Duration cpuUsageBeforeGc) {
this.cpuUsageBeforeGc = cpuUsageBeforeGc;
this.cpuTimeAtLastGc = new AtomicReference<>(getCpuTime());
}

private Duration getCpuTime() {
return !cpuUsageBeforeGc.isZero()
? Duration.ofNanos(bean.getProcessCpuTime())
: Duration.ZERO;
}

/** Call occasionally to perform a GC if enough CPU time has been used. */
private void maybePerformGc() {
if (!cpuUsageBeforeGc.isZero()) {
Duration currentCpuTime = getCpuTime();
Duration lastCpuTime = cpuTimeAtLastGc.get();
// Do GC when enough CPU time has been used, but only if nobody else beat us to it.
if (currentCpuTime.minus(lastCpuTime).compareTo(cpuUsageBeforeGc) > 0
&& cpuTimeAtLastGc.compareAndSet(lastCpuTime, currentCpuTime)) {
System.gc();
// Avoid counting GC CPU time against CPU time before next GC.
cpuTimeAtLastGc.compareAndSet(currentCpuTime, getCpuTime());
}
}
}
}
}

0 comments on commit 94ac839

Please sign in to comment.