Automatically switch to upload_graph when the graph is large (#28621)
* Automatically switch to upload_graph when the graph is large

* fix the formats

* updated the CHANGES.md

* added one test

* fixed the styles

* fixed the styles

* fixed the styles

* addressed the comments
liferoad authored Oct 27, 2023
1 parent f4574bc commit 3b89039
Showing 3 changed files with 53 additions and 8 deletions.
CHANGES.md (1 addition, 0 deletions)

@@ -71,6 +71,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* The `upload_graph` experiment for DataflowRunner is no longer required in the Java SDK when the job graph is larger than 10MB; it is now added automatically ([PR#28621](https://github.com/apache/beam/pull/28621)).

## Breaking Changes

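For context on the release note above: before this commit, a Java pipeline whose serialized job graph exceeded the CreateJob request limit could not be submitted unless the user enabled the experiment by hand. A minimal sketch of that now-optional opt-in, assuming a standard DataflowRunner setup (the wrapper class and comments are illustrative and not part of this commit; DataflowPipelineOptions, setRunner, and setExperiments are existing Beam APIs):

    import java.util.Collections;
    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UploadGraphOptIn {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        // Before this commit: required by hand for graphs over the 10MB limit.
        options.setExperiments(Collections.singletonList("upload_graph"));
        // After this commit: the runner appends "upload_graph" automatically once
        // the serialized graph reaches CREATE_JOB_REQUEST_LIMIT_BYTES, so the
        // line above is optional.
        Pipeline p = Pipeline.create(options);
        // ... build and run the pipeline ...
      }
    }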
DataflowRunner.java

@@ -168,7 +168,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -1330,15 +1329,26 @@ public DataflowPipelineJob run(Pipeline pipeline) {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
}

+// enable upload_graph when the graph is too large
+byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
+int jobGraphByteSize = jobGraphBytes.length;
+if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
+    && !hasExperiment(options, "upload_graph")) {
+  List<String> experiments = firstNonNull(options.getExperiments(), new ArrayList<>());
+  experiments.add("upload_graph");
+  options.setExperiments(ImmutableList.copyOf(experiments));
+  LOG.info(
+      "The job graph size ({} in bytes) is larger than {}. Automatically add "
+          + "the upload_graph option to experiments.",
+      jobGraphByteSize,
+      CREATE_JOB_REQUEST_LIMIT_BYTES);
+}

// Upload the job to GCS and remove the graph object from the API call. The graph
// will be downloaded from GCS by the service.
if (hasExperiment(options, "upload_graph")) {
DataflowPackage stagedGraph =
-    options
-        .getStager()
-        .stageToFile(
-            DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
-            DATAFLOW_GRAPH_FILE_NAME);
+    options.getStager().stageToFile(jobGraphBytes, DATAFLOW_GRAPH_FILE_NAME);
newJob.getSteps().clear();
newJob.setStepsLocation(stagedGraph.getLocation());
}
@@ -1398,7 +1408,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
} catch (GoogleJsonResponseException e) {
String errorMessages = "Unexpected errors";
if (e.getDetails() != null) {
-if (Utf8.encodedLength(newJob.toString()) >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
+if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
errorMessages =
"The size of the serialized JSON representation of the pipeline "
+ "exceeds the allowable limit. "
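Restating the runner-side change outside the diff: the job is serialized once, the same bytes feed both the size check and the GCS staging, and the experiment is appended only when the graph is at or above the CreateJob request limit and upload_graph is not already set. A condensed sketch of just that decision, assuming the 10MB value that CHANGES.md gives for CREATE_JOB_REQUEST_LIMIT_BYTES (shouldUploadGraph is a hypothetical helper, not a method in this commit):

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    final class UploadGraphDecision {
      // Assumed to mirror DataflowRunner.CREATE_JOB_REQUEST_LIMIT_BYTES (10MB).
      static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;

      // Hypothetical helper: true when the job graph should be staged to GCS
      // instead of being inlined in the CreateJob request body.
      static boolean shouldUploadGraph(String jobJson, List<String> experiments) {
        int jobGraphByteSize = jobJson.getBytes(StandardCharsets.UTF_8).length;
        return jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
            || experiments.contains("upload_graph");
      }
    }

Computing jobGraphBytes once also explains the smaller edits above: the error path reuses jobGraphByteSize instead of re-encoding the job with Utf8.encodedLength, which is why the Utf8 import is removed.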
DataflowRunnerTest.java

@@ -242,7 +242,7 @@ public void setUp() throws IOException {
mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
}

-private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+private static Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
@@ -256,6 +256,22 @@ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
return p;
}

+private static Pipeline buildDataflowPipelineWithLargeGraph(DataflowPipelineOptions options) {
+  options.setStableUniqueNames(CheckEnabled.ERROR);
+  options.setRunner(DataflowRunner.class);
+  Pipeline p = Pipeline.create(options);
+
+  for (int i = 0; i < 100; i++) {
+    p.apply("ReadMyFile_" + i, TextIO.read().from("gs://bucket/object"))
+        .apply("WriteMyFile_" + i, TextIO.write().to("gs://bucket/object"));
+  }
+
+  // Enable the FileSystems API to know about gs:// URIs in this test.
+  FileSystems.setDefaultPipelineOptions(options);
+
+  return p;
+}

private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJobs)
throws IOException {
Dataflow mockDataflowClient = mock(Dataflow.class);
@@ -824,6 +840,24 @@ public void testUploadGraph() throws IOException {
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}

+/** Test for automatically using upload_graph when the job graph is too large (>10MB). */
+@Test
+public void testUploadGraphWithAutoUpload() throws IOException {
+  DataflowPipelineOptions options = buildPipelineOptions();
+  Pipeline p = buildDataflowPipelineWithLargeGraph(options);
+  p.run();
+
+  ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+  Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
+  assertValidJob(jobCaptor.getValue());
+  assertTrue(jobCaptor.getValue().getSteps().isEmpty());
+  assertTrue(
+      jobCaptor
+          .getValue()
+          .getStepsLocation()
+          .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
+}

@Test
public void testUpdateNonExistentPipeline() throws IOException {
thrown.expect(IllegalArgumentException.class);
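The new test exercises the switch end to end: 100 TextIO read/write pairs push the serialized graph past 10MB, the mocked Jobs.create captures the submitted Job, and the assertions confirm that the steps were cleared and that stepsLocation points at the staged graph. One further check that should hold after this change, sketched here with the test's own names but not asserted in the committed test:

    // Sketch: the runner mutates the options in place, so after submission the
    // auto-added experiment is visible to the caller.
    Pipeline p = buildDataflowPipelineWithLargeGraph(options);
    p.run();
    assertTrue(options.getExperiments().contains("upload_graph"));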
