diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java
new file mode 100644
index 000000000000..26f1e60b6773
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Runner validation test that groups several large values under a single key.
+ */
+@RunWith(JUnit4.class)
+public class LargeCommitTest {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  /**
+   * Groups five copies of a string of {@code 50 << 20} characters under the key {@code "a"} and
+   * asserts that the result holds exactly one grouped element carrying all five values.
+   */
+  @Test
+  @Category({ValidatesRunner.class})
+  public void testLargeCommit() {
+    // 5 50MB values shuffling to a single key
+    String value = bigString('a', 50 << 20);
+    KV<String, String> kv = KV.of("a", value);
+    PCollection<KV<String, Iterable<String>>> result =
+        p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create());
+
+    PAssert.that(result)
+        .satisfies(
+            kvs -> {
+              // Use a single iterator: each call to kvs.iterator() creates a new one, so
+              // hasNext() must be asked of the same iterator that next() advanced.
+              Iterator<KV<String, Iterable<String>>> grouped = kvs.iterator();
+              assertTrue(grouped.hasNext());
+              KV<String, Iterable<String>> outputKV = grouped.next();
+              assertFalse(grouped.hasNext());
+              assertEquals("a", outputKV.getKey());
+              assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value));
+              return null;
+            });
+    p.run();
+  }
+
+  /** Returns a string made of {@code size} repetitions of {@code c}. */
+  private static String bigString(char c, int size) {
+    char[] buf = new char[size];
+    Arrays.fill(buf, c);
+    return new String(buf);
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
index 84f41c473fe0..2a42e3cfa3f9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -25,8 +25,6 @@ @Internal public abstract class OperationalLimits { - private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20; - // Maximum size of a commit from a single work item. public abstract long getMaxWorkItemCommitBytes(); // Maximum size of a single output element's serialized key. @@ -48,7 +46,7 @@ public abstract static class Builder { public static OperationalLimits.Builder builder() { return new AutoValue_OperationalLimits.Builder() - .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) + .setMaxWorkItemCommitBytes(Long.MAX_VALUE) .setMaxOutputKeyBytes(Long.MAX_VALUE) .setMaxOutputValueBytes(Long.MAX_VALUE); }