Skip to content

Commit

Permalink
Refactor Worker code to more cleanly toggle between JSON and Proto wo…
Browse files Browse the repository at this point in the history
…rker

protocol.

PiperOrigin-RevId: 331024018
  • Loading branch information
susinmotion authored and copybara-github committed Sep 10, 2020
1 parent 7c3d8c8 commit aa5592e
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import com.google.gson.stream.JsonReader;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

/** An implementation of a Bazel worker using JSON to communicate with the worker process. */
final class JsonWorkerProtocol implements WorkerProtocolImpl {
/** Reader for reading the WorkResponse. */
private final JsonReader reader;
/** Printer for printing the WorkRequest */
private final Printer jsonPrinter;
/** Writer for writing the WorkRequest to the worker */
private final BufferedWriter jsonWriter;

JsonWorkerProtocol(InputStream stdin, OutputStream stdout) {
jsonPrinter = JsonFormat.printer().omittingInsignificantWhitespace();
jsonWriter = new BufferedWriter(new OutputStreamWriter(stdout, UTF_8));
reader = new JsonReader(new BufferedReader(new InputStreamReader(stdin, UTF_8)));
reader.setLenient(true);
}

@Override
public void putRequest(WorkRequest request) throws IOException {
jsonPrinter.appendTo(request, jsonWriter);
jsonWriter.flush();
}

@Override
public WorkResponse getResponse() throws IOException {
Integer exitCode = null;
String output = null;
Integer requestId = null;

reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
switch (name) {
case "exitCode":
if (exitCode != null) {
throw new IOException("Work response cannot have more than one exit code");
}
exitCode = reader.nextInt();
break;
case "output":
if (output != null) {
throw new IOException("Work response cannot have more than one output");
}
output = reader.nextString();
break;
case "requestId":
if (requestId != null) {
throw new IOException("Work response cannot have more than one requestId");
}
requestId = reader.nextInt();
break;
default:
throw new IOException(name + " is an incorrect field in work response");
}
}
reader.endObject();

WorkResponse.Builder responseBuilder = WorkResponse.newBuilder();

if (exitCode != null) {
responseBuilder.setExitCode(exitCode);
}
if (output != null) {
responseBuilder.setOutput(output);
}
if (requestId != null) {
responseBuilder.setRequestId(requestId);
}

return responseBuilder.build();
}

@Override
public void close() throws IOException {
reader.close();
jsonWriter.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;

import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** An implementation of a Bazel worker using Proto to communicate with the worker process. */
final class ProtoWorkerProtocol implements WorkerProtocolImpl {

/** The worker process's stdin */
private final InputStream stdin;

/** The worker process's stdout. */
private final OutputStream stdout;

public ProtoWorkerProtocol(InputStream stdin, OutputStream stdout) {
this.stdin = stdin;
this.stdout = stdout;
}

@Override
public void putRequest(WorkRequest request) throws IOException {
request.writeDelimitedTo(stdout);
stdout.flush();
}

@Override
public WorkResponse getResponse() throws IOException {
return WorkResponse.parseDelimitedFrom(stdin);
}

@Override
public void close() {}
}
144 changes: 26 additions & 118 deletions src/main/java/com/google/devtools/build/lib/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@
// limitations under the License.
package com.google.devtools.build.lib.worker;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashCode;
import com.google.devtools.build.lib.actions.ExecutionRequirements.WorkerProtocolFormat;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs;
import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs;
import com.google.devtools.build.lib.shell.Subprocess;
Expand All @@ -28,15 +24,8 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import com.google.gson.stream.JsonReader;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -64,14 +53,13 @@ class Worker {
protected final Path workDir;
/** The path of the log file for this worker. */
private final Path logFile;
/** Stream for reading the protobuf WorkResponse. */
@Nullable protected RecordingInputStream protoRecordingStream;
/** Reader for reading the JSON WorkResponse. */
@Nullable protected JsonReader jsonReader;
/** Printer for writing the JSON WorkRequest bytes */
@Nullable protected Printer jsonPrinter;
/** BufferedWriter for the JSON WorkRequest bytes */
@Nullable protected BufferedWriter jsonWriter;
/**
* Stream for recording the WorkResponse as it's read, so that it can be printed in the case of
* parsing failures.
*/
@Nullable private RecordingInputStream recordingInputStream;
/** The implementation of the worker protocol (JSON or Proto). */
@Nullable private WorkerProtocolImpl workerProtocol;

private Subprocess process;
private Thread shutdownHook;
Expand Down Expand Up @@ -118,12 +106,11 @@ void destroy() throws IOException {
if (shutdownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
if (workerProtocol != null) {
workerProtocol.close();
workerProtocol = null;
}
if (process != null) {
if (workerKey.getProtocolFormat() == WorkerProtocolFormat.JSON) {
jsonReader.close();
jsonWriter.close();
}

wasDestroyed = true;
process.destroyAndWait();
}
Expand Down Expand Up @@ -168,114 +155,35 @@ public Optional<Integer> getExitValue() {
: Optional.empty();
}

// TODO(karlgray): Create wrapper class that handles writing and reading JSON worker protocol to
// and from stream.
void putRequest(WorkRequest request) throws IOException {
switch (workerKey.getProtocolFormat()) {
case JSON:
checkNotNull(jsonWriter, "Did prepareExecution get called before putRequest?");
checkNotNull(jsonPrinter, "Did prepareExecution get called before putRequest?");

jsonPrinter.appendTo(request, jsonWriter);
jsonWriter.flush();
break;

case PROTO:
request.writeDelimitedTo(process.getOutputStream());
process.getOutputStream().flush();
break;
}
workerProtocol.putRequest(request);
}

WorkResponse getResponse() throws IOException {
switch (workerKey.getProtocolFormat()) {
case JSON:
checkNotNull(jsonReader, "Did prepareExecution get called before putRequest?");

return readResponse(jsonReader);

case PROTO:
protoRecordingStream = new RecordingInputStream(process.getInputStream());
protoRecordingStream.startRecording(4096);
// response can be null when the worker has already closed
// stdout at this point and thus the InputStream is at EOF.
return WorkResponse.parseDelimitedFrom(protoRecordingStream);
}

throw new IllegalStateException(
"Invalid protocol format; protocol formats are currently proto or json");
}

private static WorkResponse readResponse(JsonReader reader) throws IOException {
Integer exitCode = null;
String output = null;
Integer requestId = null;

reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
switch (name) {
case "exitCode":
if (exitCode != null) {
throw new IOException("Work response cannot have more than one exit code");
}
exitCode = reader.nextInt();
break;
case "output":
if (output != null) {
throw new IOException("Work response cannot have more than one output");
}
output = reader.nextString();
break;
case "requestId":
if (requestId != null) {
throw new IOException("Work response cannot have more than one requestId");
}
requestId = reader.nextInt();
break;
default:
throw new IOException(name + " is an incorrect field in work response");
}
}
reader.endObject();

WorkResponse.Builder responseBuilder = WorkResponse.newBuilder();

if (exitCode != null) {
responseBuilder.setExitCode(exitCode);
}
if (output != null) {
responseBuilder.setOutput(output);
}
if (requestId != null) {
responseBuilder.setRequestId(requestId);
}

return responseBuilder.build();
recordingInputStream.startRecording(4096);
return workerProtocol.getResponse();
}

String getRecordingStreamMessage() {
protoRecordingStream.readRemaining();
return protoRecordingStream.getRecordedDataAsString();
recordingInputStream.readRemaining();
return recordingInputStream.getRecordedDataAsString();
}

public void prepareExecution(
SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
throws IOException {
if (process == null) {
process = createProcess();

if (workerKey.getProtocolFormat() == WorkerProtocolFormat.JSON) {
checkState(jsonReader == null, "JSON streams inconsistent with process status");
checkState(jsonPrinter == null, "JSON streams inconsistent with process status");
checkState(jsonWriter == null, "JSON streams inconsistent with process status");

jsonReader =
new JsonReader(
new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8)));
jsonReader.setLenient(true);
jsonPrinter = JsonFormat.printer().omittingInsignificantWhitespace();
jsonWriter = new BufferedWriter(new OutputStreamWriter(process.getOutputStream(), UTF_8));
recordingInputStream = new RecordingInputStream(process.getInputStream());
}
if (workerProtocol == null) {
switch (workerKey.getProtocolFormat()) {
case JSON:
workerProtocol = new JsonWorkerProtocol(recordingInputStream, process.getOutputStream());
break;
case PROTO:
workerProtocol = new ProtoWorkerProtocol(recordingInputStream, process.getOutputStream());
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;

import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.Closeable;
import java.io.IOException;

/** Represents the communication between Bazel and a persistent worker. */
interface WorkerProtocolImpl extends Closeable {
/** Writes the provided work request to the worker. */
void putRequest(WorkRequest request) throws IOException;

/** Reads a response from the worker. */
WorkResponse getResponse() throws IOException;
}

0 comments on commit aa5592e

Please sign in to comment.