Automatically switch to upload_graph when the graph is large #28621

Merged: 13 commits, Oct 27, 2023
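For context: before this change, pipelines whose serialized job graph exceeded Dataflow's create-job request limit (10MB) had to opt in to GCS graph staging by hand. A minimal sketch of that manual opt-in, assuming the standard `DataflowPipelineOptions` API (the job setup itself is hypothetical):

import java.util.Arrays;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ManualUploadGraph {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    // Before this PR: explicitly request that the job graph be staged to GCS
    // instead of being inlined in the CreateJob API request.
    options.setExperiments(Arrays.asList("upload_graph"));

    Pipeline p = Pipeline.create(options);
    // ... build a pipeline large enough to exceed the request limit ...
    p.run();
  }
}

This PR makes the runner add the experiment itself when it detects an oversized graph, as the diff below shows.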
CHANGES.md (1 addition, 0 deletions)
@@ -71,6 +71,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* The `upload_graph` experiment option for DataflowRunner is no longer required for the Java SDK when the job graph is larger than 10MB; it is now enabled automatically ([PR#28621](https://github.com/apache/beam/pull/28621)).

## Breaking Changes

DataflowRunner.java
@@ -168,7 +168,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
- import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -1330,15 +1329,26 @@ public DataflowPipelineJob run(Pipeline pipeline) {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
}

      // Enable upload_graph when the serialized job graph is too large.
      byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
      int jobGraphByteSize = jobGraphBytes.length;
      if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
          && !hasExperiment(options, "upload_graph")) {
        List<String> experiments = firstNonNull(options.getExperiments(), new ArrayList<>());
        experiments.add("upload_graph");
        options.setExperiments(ImmutableList.copyOf(experiments));
        LOG.info(
            "The job graph size ({} bytes) is larger than {} bytes. Automatically adding "
                + "the upload_graph option to experiments.",
            jobGraphByteSize,
            CREATE_JOB_REQUEST_LIMIT_BYTES);
      }

// Upload the job to GCS and remove the graph object from the API call. The graph
// will be downloaded from GCS by the service.
if (hasExperiment(options, "upload_graph")) {
        DataflowPackage stagedGraph =
-           options
-               .getStager()
-               .stageToFile(
-                   DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
-                   DATAFLOW_GRAPH_FILE_NAME);
+           options.getStager().stageToFile(jobGraphBytes, DATAFLOW_GRAPH_FILE_NAME);
newJob.getSteps().clear();
newJob.setStepsLocation(stagedGraph.getLocation());
}
@@ -1398,7 +1408,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
} catch (GoogleJsonResponseException e) {
String errorMessages = "Unexpected errors";
if (e.getDetails() != null) {
-         if (Utf8.encodedLength(newJob.toString()) >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
+         if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
errorMessages =
"The size of the serialized JSON representation of the pipeline "
+ "exceeds the allowable limit. "
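A note on the error-path change above: the size check now reuses `jobGraphByteSize`, computed once from the same `jobToString(newJob)` form that gets staged, instead of re-measuring `newJob.toString()` with `Utf8.encodedLength` (which also lets the now-unused `Utf8` import be dropped). For any well-formed string, Guava's `Utf8.encodedLength(s)` equals `s.getBytes(UTF_8).length`, so the comparison itself works the same way. A minimal sketch of that equivalence, assuming Beam's vendored Guava:

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8;

public class EncodedLengthDemo {
  public static void main(String[] args) {
    String json = "{\"name\":\"pipeline-\u03c0\"}"; // non-ASCII char: 2 bytes in UTF-8
    // Utf8.encodedLength counts UTF-8 bytes without building the array;
    // getBytes(UTF_8).length encodes first, then measures. They agree for
    // any well-formed string, so a byte array computed once can serve both
    // the staging upload and the size-limit check.
    System.out.println(Utf8.encodedLength(json) == json.getBytes(UTF_8).length); // true
  }
}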
DataflowRunnerTest.java
@@ -242,7 +242,7 @@ public void setUp() throws IOException {
mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
}

-   private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+   private static Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
@@ -256,6 +256,22 @@ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
return p;
}

private static Pipeline buildDataflowPipelineWithLargeGraph(DataflowPipelineOptions options) {
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);

    // Each TextIO read/write pair expands to several steps in the serialized
    // job, so 100 pairs are intended to push the graph past the 10MB limit
    // and trigger the automatic upload_graph behavior.
    for (int i = 0; i < 100; i++) {
      p.apply("ReadMyFile_" + i, TextIO.read().from("gs://bucket/object"))
          .apply("WriteMyFile_" + i, TextIO.write().to("gs://bucket/object"));
    }

// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultPipelineOptions(options);

return p;
}

private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJobs)
throws IOException {
Dataflow mockDataflowClient = mock(Dataflow.class);
@@ -824,6 +840,24 @@ public void testUploadGraph() throws IOException {
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}

/** Test for automatically using upload_graph when the job graph is too large (>10MB). */
@Test
public void testUploadGraphWithAutoUpload() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipelineWithLargeGraph(options);
p.run();

ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
assertTrue(jobCaptor.getValue().getSteps().isEmpty());
assertTrue(
jobCaptor
.getValue()
.getStepsLocation()
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}

@Test
public void testUpdateNonExistentPipeline() throws IOException {
thrown.expect(IllegalArgumentException.class);
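One assertion the new test could additionally make (a sketch, not part of this PR; it reuses the test file's existing helpers and assumes the JUnit `assertTrue`/`assertNotNull` imports already present there): after `run()`, the runner should have injected the experiment into the options it was given.

@Test
public void testAutoUploadAddsExperimentToOptions() throws IOException {
  DataflowPipelineOptions options = buildPipelineOptions();
  Pipeline p = buildDataflowPipelineWithLargeGraph(options);
  p.run();

  // DataflowRunner.run() mutates the options in place when the serialized
  // graph exceeds CREATE_JOB_REQUEST_LIMIT_BYTES.
  assertNotNull(options.getExperiments());
  assertTrue(options.getExperiments().contains("upload_graph"));
}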