From 1dfd39bc6d01678babebf5d7c35f9d5e91dff0a2 Mon Sep 17 00:00:00 2001 From: Jeff Kinard Date: Fri, 5 Apr 2024 18:27:55 -0400 Subject: [PATCH 01/18] Fix merging with missing environments. (#30864) --- sdks/python/apache_beam/runners/common.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 630ed7910c8d..721bee4fceb5 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1981,27 +1981,34 @@ def env_key(env): base_env_key(e) for e in environments.expand_anyof_environments(env))) - cannonical_enviornments = collections.defaultdict(list) + canonical_environments = collections.defaultdict(list) for env_id, env in pipeline_proto.components.environments.items(): - cannonical_enviornments[env_key(env)].append(env_id) + canonical_environments[env_key(env)].append(env_id) - if len(cannonical_enviornments) == len( - pipeline_proto.components.environments): + if len(canonical_environments) == len(pipeline_proto.components.environments): # All environments are already sufficiently distinct. return pipeline_proto environment_remappings = { e: es[0] - for es in cannonical_enviornments.values() for e in es + for es in canonical_environments.values() for e in es } if not inplace: pipeline_proto = copy.copy(pipeline_proto) for t in pipeline_proto.components.transforms.values(): + if t.environment_id not in pipeline_proto.components.environments: + # TODO(https://github.com/apache/beam/issues/30876): Remove this + # workaround. + continue if t.environment_id: t.environment_id = environment_remappings[t.environment_id] for w in pipeline_proto.components.windowing_strategies.values(): + if w.environment_id not in pipeline_proto.components.environments: + # TODO(https://github.com/apache/beam/issues/30876): Remove this + # workaround. + continue if w.environment_id: w.environment_id = environment_remappings[w.environment_id] for e in set(pipeline_proto.components.environments.keys()) - set( From 4452a6c8d9758c9cd3ea39d37aaf149927c633ce Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Fri, 5 Apr 2024 16:43:54 -0700 Subject: [PATCH 02/18] Adds a bound on pydantic to exclude incompatible versions in the compat test. (#30863) * Add a lower bound on pydantic * Add an upper bound on pydantic in the respective test suite. * Restrict deps some more. --- sdks/python/tox.ini | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 86d12f31a18e..153b6acde8e7 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -102,7 +102,7 @@ passenv = GIT_*,BUILD_*,ghprb*,CHANGE_ID,BRANCH_NAME,JENKINS_*,CODECOV_*,GITHUB_ extras = test,gcp,interactive,dataframe,aws commands = bash {toxinidir}/scripts/run_pytest.sh {envname} "{posargs}" "--cov-report=xml --cov=. --cov-append" - + [testenv:py38-lint] # Don't set TMPDIR to avoid "AF_UNIX path too long" errors in pylint. 
setenv = @@ -314,6 +314,8 @@ commands = [testenv:py{38,39}-tft-{113,114}] deps = 113: tensorflow_transform>=1.13.0,<1.14.0 + # Help pip resolve conflict with typing-extensions due to an old version of tensorflow https://github.com/apache/beam/issues/30852 + 113: pydantic<2.0 114: tensorflow_transform>=1.14.0,<1.15.0 commands = bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py' @@ -373,6 +375,8 @@ commands = [testenv:py{38,39,310}-tensorflow-212] deps = 212: tensorflow>=2.12rc1,<2.13 + # Help pip resolve conflict with typing-extensions for old version of TF https://github.com/apache/beam/issues/30852 + 212: pydantic<2.7 extras = test,gcp commands = # Log tensorflow version for debugging @@ -444,6 +448,8 @@ commands = deps = 014: tensorflow-hub>=0.14.0,<0.15.0 015: tensorflow-hub>=0.15.0,<0.16.0 + # Help pip resolve conflict with typing-extensions due to an old version of tensorboard https://github.com/apache/beam/issues/30852 + 015: pydantic<2.7 tensorflow-text # required to register ops for text embedding models. extras = test,gcp From 3c9e9c80e3badd6af5d2b06d0b9523fabf332760 Mon Sep 17 00:00:00 2001 From: Hai Joey Tran Date: Fri, 5 Apr 2024 19:46:08 -0400 Subject: [PATCH 03/18] Add identity function as default to FlatMap (#30744) * Add identity function as default to FlatMap * Update sdks/python/apache_beam/transforms/core_test.py Co-authored-by: tvalentyn * update docstring * yapf * Add flatmap with no function example * yapf * Update flatmp_nofunction metadata * Update website/www/site/content/en/documentation/transforms/python/elementwise/flatmap.md Co-authored-by: tvalentyn * Update sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_nofunction.py Co-authored-by: tvalentyn * isort --------- Co-authored-by: tvalentyn --- .../elementwise/flatmap_nofunction.py | 54 +++++++++++++++++++ .../transforms/elementwise/flatmap_test.py | 7 +++ sdks/python/apache_beam/transforms/core.py | 10 +++- .../apache_beam/transforms/core_test.py | 11 ++++ .../transforms/python/elementwise/flatmap.md | 22 +++++--- 5 files changed, 95 insertions(+), 9 deletions(-) create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_nofunction.py diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_nofunction.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_nofunction.py new file mode 100644 index 000000000000..868172699e29 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_nofunction.py @@ -0,0 +1,54 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: FlatMapNoFunction +# description: Demonstration of FlatMap transform usage without a function. +# multifile: false +# default_example: false +# context_line: 44 +# categories: +# - Core Transforms +# complexity: BASIC +# tags: +# - transforms +# - strings +# - map + + +def flatmap_nofunction(test=None): + # [START flatmap_nofunction] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + plants = ( + pipeline + | 'Gardening plants' >> beam.Create( + [['๐Ÿ“Strawberry', '๐Ÿฅ•Carrot', '๐Ÿ†Eggplant'], ['๐Ÿ…Tomato', '๐Ÿฅ”Potato']]) + | 'Split words' >> beam.FlatMap() + | beam.Map(print)) + # [END flatmap_nofunction] + if test: + test(plants) + + +if __name__ == '__main__': + flatmap_nofunction() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py index 793c12466562..6dd02a208a74 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py @@ -30,6 +30,7 @@ from . import flatmap_generator from . import flatmap_lambda from . import flatmap_multiple_arguments +from . import flatmap_nofunction from . import flatmap_side_inputs_dict from . import flatmap_side_inputs_iter from . import flatmap_side_inputs_singleton @@ -65,6 +66,9 @@ def check_valid_plants(actual): @mock.patch( 'apache_beam.examples.snippets.transforms.elementwise.flatmap_function.print', str) +@mock.patch( + 'apache_beam.examples.snippets.transforms.elementwise.flatmap_nofunction.print', + str) @mock.patch( 'apache_beam.examples.snippets.transforms.elementwise.flatmap_lambda.print', str) @@ -93,6 +97,9 @@ def test_flatmap_simple(self): def test_flatmap_function(self): flatmap_function.flatmap_function(check_plants) + def test_flatmap_nofunction(self): + flatmap_nofunction.flatmap_nofunction(check_plants) + def test_flatmap_lambda(self): flatmap_lambda.flatmap_lambda(check_plants) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 297915023c5c..cb1521176e4e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1899,13 +1899,19 @@ def to_runner_api(self, unused_context): return beam_runner_api_pb2.FunctionSpec(urn=self._urn) -def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name +def identity(x: T) -> T: + return x + + +def FlatMap(fn=identity, *args, **kwargs): # pylint: disable=invalid-name """:func:`FlatMap` is like :class:`ParDo` except it takes a callable to specify the transformation. The callable must return an iterable for each element of the input :class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will - be flattened into the output :class:`~apache_beam.pvalue.PCollection`. + be flattened into the output :class:`~apache_beam.pvalue.PCollection`. If + no callable is given, then all elements of the input PCollection must already + be iterables themselves and will be flattened into the output PCollection. Args: fn (callable): a callable object. 
diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 4fbeaa2ee97a..b0f54b8bb66d 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -170,6 +170,17 @@ def test_flatten_mismatched_windows(self): _ = (source1, source2, source3) | "flatten" >> beam.Flatten() +class FlatMapTest(unittest.TestCase): + def test_default(self): + + with beam.Pipeline() as pipeline: + letters = ( + pipeline + | beam.Create(['abc', 'def'], reshuffle=False) + | beam.FlatMap()) + assert_that(letters, equal_to(['a', 'b', 'c', 'd', 'e', 'f'])) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/elementwise/flatmap.md b/website/www/site/content/en/documentation/transforms/python/elementwise/flatmap.md index 0a4f5e7fae07..3b0164193b1a 100644 --- a/website/www/site/content/en/documentation/transforms/python/elementwise/flatmap.md +++ b/website/www/site/content/en/documentation/transforms/python/elementwise/flatmap.md @@ -49,7 +49,15 @@ We define a function `split_words` which splits an input `str` element using the {{< playground_snippet language="py" path="SDK_PYTHON_FlatMapFunction" show="flatmap_function" >}} {{< /playground >}} -### Example 3: FlatMap with a lambda function +### Example 3: FlatMap without a function + +A common use case of `FlatMap` is to flatten a `PCollection` of iterables into a `PCollection` of elements. To do that, don't specify the function argument to `FlatMap`, which uses the identity mapping function. + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapNoFunction" show="flatmap_no_function" >}} +{{< /playground >}} + +### Example 4: FlatMap with a lambda function For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s. Each input element is already an `iterable`, where each element is what we want in the resulting `PCollection`. @@ -59,7 +67,7 @@ We use a lambda function that returns the same input element it received. {{< playground_snippet language="py" path="SDK_PYTHON_FlatMapLambda" show="flatmap_lambda" >}} {{< /playground >}} -### Example 4: FlatMap with a generator +### Example 5: FlatMap with a generator For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s. We use a generator to iterate over the input list and yield each of the elements. @@ -69,7 +77,7 @@ Each yielded result in the generator is an element in the resulting `PCollection {{< playground_snippet language="py" path="SDK_PYTHON_FlatMapGenerator" show="flatmap_generator" >}} {{< /playground >}} -### Example 5: FlatMapTuple for key-value pairs +### Example 6: FlatMapTuple for key-value pairs If your `PCollection` consists of `(key, value)` pairs, you can use `FlatMapTuple` to unpack them into different function arguments. @@ -78,7 +86,7 @@ you can use `FlatMapTuple` to unpack them into different function arguments. {{< playground_snippet language="py" path="SDK_PYTHON_FlatMapTuple" show="flatmap_tuple" >}} {{< /playground >}} -### Example 6: FlatMap with multiple arguments +### Example 7: FlatMap with multiple arguments You can pass functions with multiple arguments to `FlatMap`. They are passed as additional positional arguments or keyword arguments to the function. 
@@ -89,7 +97,7 @@ In this example, `split_words` takes `text` and `delimiter` as arguments. {{< playground_snippet language="py" path="SDK_PYTHON_FlatMapMultipleArguments" show="flatmap_multiple_arguments" >}} {{< /playground >}} -### Example 7: FlatMap with side inputs as singletons +### Example 8: FlatMap with side inputs as singletons If the `PCollection` has a single value, such as the average from another computation, passing the `PCollection` as a *singleton* accesses that value. @@ -101,7 +109,7 @@ We then use that value as the delimiter for the `str.split` method. {{< playground_snippet language="py" path="SDK_PYTHON_FlatMapSideInputSingleton" show="flatmap_side_inputs_singleton" >}} {{< /playground >}} -### Example 8: FlatMap with side inputs as iterators +### Example 9: FlatMap with side inputs as iterators If the `PCollection` has multiple values, pass the `PCollection` as an *iterator*. This accesses elements lazily as they are needed, @@ -114,7 +122,7 @@ so it is possible to iterate over large `PCollection`s that won't fit into memor > **Note**: You can pass the `PCollection` as a *list* with `beam.pvalue.AsList(pcollection)`, > but this requires that all the elements fit into memory. -### Example 9: FlatMap with side inputs as dictionaries +### Example 10: FlatMap with side inputs as dictionaries If a `PCollection` is small enough to fit into memory, then that `PCollection` can be passed as a *dictionary*. Each element must be a `(key, value)` pair. From 54673996c9bf2ee076b04833bbae2729d6cebbaf Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Mon, 8 Apr 2024 06:55:17 -0400 Subject: [PATCH 04/18] [Java] ManagedIO (#30808) * managed api for java * yaml utils --- build.gradle.kts | 1 + sdks/java/core/build.gradle | 1 + .../beam/sdk/schemas/utils/YamlUtils.java | 171 +++++++++++++ .../apache/beam/sdk/util/YamlUtilsTest.java | 228 ++++++++++++++++++ sdks/java/managed/build.gradle | 37 +++ .../org/apache/beam/sdk/managed/Managed.java | 195 +++++++++++++++ .../ManagedSchemaTransformProvider.java | 183 ++++++++++++++ .../apache/beam/sdk/managed/package-info.java | 20 ++ .../ManagedSchemaTransformProviderTest.java | 103 ++++++++ .../apache/beam/sdk/managed/ManagedTest.java | 114 +++++++++ .../managed/TestSchemaTransformProvider.java | 98 ++++++++ .../src/test/resources/test_config.yaml | 21 ++ settings.gradle.kts | 2 + 13 files changed, 1174 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java create mode 100644 sdks/java/managed/build.gradle create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java create mode 100644 sdks/java/managed/src/test/resources/test_config.yaml diff --git a/build.gradle.kts b/build.gradle.kts index ded692677b53..9c42ffdc8cea 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -303,6 +303,7 @@ 
tasks.register("javaPreCommit") { dependsOn(":sdks:java:io:synthetic:build") dependsOn(":sdks:java:io:xml:build") dependsOn(":sdks:java:javadoc:allJavadoc") + dependsOn(":sdks:java:managed:build") dependsOn(":sdks:java:testing:expansion-service:build") dependsOn(":sdks:java:testing:jpms-tests:build") dependsOn(":sdks:java:testing:load-tests:build") diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 438a3fb1806c..5a47cb5237ea 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,6 +98,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation "org.yaml:snakeyaml:2.0" shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java new file mode 100644 index 000000000000..5c05b2bed396 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.utils; + +import static org.apache.beam.sdk.values.Row.toRow; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.yaml.snakeyaml.Yaml; + +public class YamlUtils { + private static final Map> YAML_VALUE_PARSERS = + ImmutableMap + .> + builder() + .put(Schema.TypeName.BYTE, Byte::valueOf) + .put(Schema.TypeName.INT16, Short::valueOf) + .put(Schema.TypeName.INT32, Integer::valueOf) + .put(Schema.TypeName.INT64, Long::valueOf) + .put(Schema.TypeName.FLOAT, Float::valueOf) + .put(Schema.TypeName.DOUBLE, Double::valueOf) + .put(Schema.TypeName.DECIMAL, BigDecimal::new) + .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) + .put(Schema.TypeName.STRING, str -> str) + .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) + .build(); + + public static Row toBeamRow(@Nullable String yamlString, Schema schema) { + return toBeamRow(yamlString, schema, false); + } + + public static Row toBeamRow( + @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) { + if (yamlString == null || yamlString.isEmpty()) { + List requiredFields = + schema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); + if (requiredFields.isEmpty()) { + return Row.nullRow(schema); + } else { + throw new IllegalArgumentException( + String.format( + "Received an empty YAML string, but output schema contains required fields: %s", + requiredFields)); + } + } + Yaml yaml = new Yaml(); + Object yamlMap = yaml.load(yamlString); + + Preconditions.checkArgument( + yamlMap instanceof Map, + "Expected a YAML mapping but got type '%s' instead.", + Preconditions.checkNotNull(yamlMap).getClass()); + + return toBeamRow( + (Map) Preconditions.checkNotNull(yamlMap), schema, convertNamesToCamelCase); + } + + private static @Nullable Object toBeamValue( + Field field, @Nullable Object yamlValue, boolean convertNamesToCamelCase) { + FieldType fieldType = field.getType(); + + if (yamlValue == null) { + if (fieldType.getNullable()) { + return null; + } else { + throw new IllegalArgumentException( + "Received null value for non-nullable field \"" + field.getName() + "\""); + } + } + + if (yamlValue instanceof String + || yamlValue instanceof Number + || yamlValue instanceof Boolean) { + String yamlStringValue = yamlValue.toString(); + if (YAML_VALUE_PARSERS.containsKey(fieldType.getTypeName())) { + return YAML_VALUE_PARSERS.get(fieldType.getTypeName()).apply(yamlStringValue); + } + } + + if (yamlValue instanceof byte[] && fieldType.getTypeName() == Schema.TypeName.BYTES) { + return yamlValue; + } + + if (yamlValue instanceof List) { + FieldType innerType = + Preconditions.checkNotNull( + fieldType.getCollectionElementType(), + "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not 
define a collection type.", + yamlValue.getClass(), + fieldType); + return ((List) yamlValue) + .stream() + .map( + v -> + Preconditions.checkNotNull( + toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) + .collect(Collectors.toList()); + } + + if (yamlValue instanceof Map) { + if (fieldType.getTypeName() == Schema.TypeName.ROW) { + Schema nestedSchema = + Preconditions.checkNotNull( + fieldType.getRowSchema(), + "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", + yamlValue.getClass(), + fieldType); + return toBeamRow((Map) yamlValue, nestedSchema, convertNamesToCamelCase); + } else if (fieldType.getTypeName() == Schema.TypeName.MAP) { + return yamlValue; + } + } + + throw new UnsupportedOperationException( + String.format( + "Converting YAML type '%s' to '%s' is not supported", yamlValue.getClass(), fieldType)); + } + + @SuppressWarnings("nullness") + public static Row toBeamRow(Map yamlMap, Schema rowSchema, boolean toCamelCase) { + return rowSchema.getFields().stream() + .map( + field -> + toBeamValue( + field, + yamlMap.get(maybeGetSnakeCase(field.getName(), toCamelCase)), + toCamelCase)) + .collect(toRow(rowSchema)); + } + + private static String maybeGetSnakeCase(String str, boolean getSnakeCase) { + return getSnakeCase ? CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str) : str; + } + + public static String yamlStringFromMap(@Nullable Map map) { + if (map == null || map.isEmpty()) { + return ""; + } + return new Yaml().dumpAsMap(map); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java new file mode 100644 index 000000000000..6e6984dde3a6 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class YamlUtilsTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + public String makeNested(String input) { + return Arrays.stream(input.split("\n")) + .map(str -> " " + str) + .collect(Collectors.joining("\n")); + } + + @Test + public void testEmptyYamlString() { + Schema schema = Schema.builder().build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema)); + } + + @Test + public void testInvalidEmptyYamlWithNonEmptySchema() { + Schema schema = Schema.builder().addStringField("dummy").build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Received an empty YAML string, but output schema contains required fields"); + thrown.expectMessage("dummy"); + + YamlUtils.toBeamRow("", schema); + } + + @Test + public void testNullableValues() { + String yamlString = "nullable_string:\n" + "nullable_integer:\n" + "nullable_boolean:\n"; + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testMissingNullableValues() { + String yamlString = "nullable_string:"; + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testInvalidNullableValues() { + String yamlString = "nullable_string:\n" + "integer:"; + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received null value for non-nullable field \"integer\""); + YamlUtils.toBeamRow(yamlString, schema); + } + + @Test + public void testInvalidMissingRequiredValues() { + String yamlString = "nullable_string:"; + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received null value for non-nullable field \"integer\""); + + YamlUtils.toBeamRow(yamlString, schema); + } + + @Test + public void testExtraFieldsAreIgnored() { + String yamlString = "field1: val1\n" + "field2: val2"; + Schema schema = Schema.builder().addStringField("field1").build(); + Row expectedRow = Row.withSchema(schema).withFieldValue("field1", "val1").build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testInvalidTopLevelArray() { + String invalidYaml = "- top_level_list" + "- another_list"; + Schema schema = Schema.builder().build(); + + thrown.expect(IllegalArgumentException.class); + 
thrown.expectMessage("Expected a YAML mapping"); + YamlUtils.toBeamRow(invalidYaml, schema); + } + + private static final Schema FLAT_SCHEMA = + Schema.builder() + .addByteField("byte_field") + .addInt16Field("int16_field") + .addInt32Field("int32_field") + .addInt64Field("int64_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("decimal_field") + .addBooleanField("boolean_field") + .addStringField("string_field") + .addByteArrayField("bytes_field") + .build(); + + private static final Row FLAT_ROW = + Row.withSchema(FLAT_SCHEMA) + .withFieldValue("byte_field", Byte.valueOf("123")) + .withFieldValue("int16_field", Short.valueOf("16")) + .withFieldValue("int32_field", 32) + .withFieldValue("int64_field", 64L) + .withFieldValue("float_field", 123.456F) + .withFieldValue("double_field", 456.789) + .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) + .withFieldValue("boolean_field", true) + .withFieldValue("string_field", "some string") + .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) + .build(); + + private static final String FLAT_YAML = + "byte_field: 123\n" + + "int16_field: 16\n" + + "int32_field: 32\n" + + "int64_field: 64\n" + + "float_field: 123.456\n" + + "double_field: 456.789\n" + + "decimal_field: 789.123\n" + + "boolean_field: true\n" + + "string_field: some string\n" + + "bytes_field: abc"; + + @Test + public void testAllTypesFlat() { + assertEquals(FLAT_ROW, YamlUtils.toBeamRow(FLAT_YAML, FLAT_SCHEMA)); + } + + @Test + public void testAllTypesNested() { + String nestedFlatTypes = makeNested(FLAT_YAML); + String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes; + + Schema schema = + Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("top_string", "abc") + .withFieldValue("nested", FLAT_ROW) + .build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema)); + } + + private static final String INT_ARRAY_YAML = + "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; + + private static final Schema INT_ARRAY_SCHEMA = + Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); + + private static final Row INT_ARRAY_ROW = + Row.withSchema(INT_ARRAY_SCHEMA) + .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) + .build(); + + @Test + public void testArray() { + assertEquals(INT_ARRAY_ROW, YamlUtils.toBeamRow(INT_ARRAY_YAML, INT_ARRAY_SCHEMA)); + } + + @Test + public void testNestedArray() { + String nestedArray = makeNested(INT_ARRAY_YAML); + String yamlString = "str_field: some string\n" + "nested: \n" + nestedArray; + + Schema schema = + Schema.builder() + .addStringField("str_field") + .addRowField("nested", INT_ARRAY_SCHEMA) + .build(); + + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("str_field", "some string") + .withFieldValue("nested", INT_ARRAY_ROW) + .build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); + } +} diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle new file mode 100644 index 000000000000..88e537d66f8c --- /dev/null +++ b/sdks/java/managed/build.gradle @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.managed', +) + + +description = "Apache Beam :: SDKs :: Java :: Managed" +ext.summary = """Library that provides managed IOs.""" + + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre +// implementation library.java.vendored_grpc_1_60_1 + + testImplementation library.java.junit + testRuntimeOnly "org.yaml:snakeyaml:2.0" + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java new file mode 100644 index 000000000000..b2b010b1e434 --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey + * transforms. + * + *

+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
+ *       Managed.read(ICEBERG)
+ *           .withConfig(ImmutableMap.<String, Object>builder()
+ *               .put("foo", "abc")
+ *               .put("bar", 123)
+ *               .build()));
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple input = PCollectionRowTuple.of("input", pipeline.apply(Create.of(...)))
+ *
+ * PCollectionRowTuple output = input.apply(
+ *     Managed.write(ICEBERG)
+ *         .withConfigUrl(<config path>));
+ * }</pre>
+ */ +public class Managed { + + // TODO: Dynamically generate a list of supported transforms + public static final String ICEBERG = "iceberg"; + + public static final Map READ_TRANSFORMS = + ImmutableMap.builder() + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1") + .build(); + public static final Map WRITE_TRANSFORMS = + ImmutableMap.builder() + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1") + .build(); + + /** + * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed + * sources are: + * + *
+ * <ul>
+ *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+ * </ul>
+ */ + public static ManagedTransform read(String source) { + + return new AutoValue_Managed_ManagedTransform.Builder() + .setIdentifier( + Preconditions.checkNotNull( + READ_TRANSFORMS.get(source.toLowerCase()), + "An unsupported source was specified: '%s'. Please specify one of the following sources: %s", + source, + READ_TRANSFORMS.keySet())) + .setSupportedIdentifiers(new ArrayList<>(READ_TRANSFORMS.values())) + .build(); + } + + /** + * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed + * sinks are: + * + *
+ * <ul>
+ *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+ * </ul>
+ */ + public static ManagedTransform write(String sink) { + return new AutoValue_Managed_ManagedTransform.Builder() + .setIdentifier( + Preconditions.checkNotNull( + WRITE_TRANSFORMS.get(sink.toLowerCase()), + "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s", + sink, + WRITE_TRANSFORMS.keySet())) + .setSupportedIdentifiers(new ArrayList<>(WRITE_TRANSFORMS.values())) + .build(); + } + + @AutoValue + public abstract static class ManagedTransform extends SchemaTransform { + abstract String getIdentifier(); + + abstract @Nullable String getConfig(); + + abstract @Nullable String getConfigUrl(); + + @VisibleForTesting + abstract List getSupportedIdentifiers(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setIdentifier(String identifier); + + abstract Builder setConfig(@Nullable String config); + + abstract Builder setConfigUrl(@Nullable String configUrl); + + @VisibleForTesting + abstract Builder setSupportedIdentifiers(List supportedIdentifiers); + + abstract ManagedTransform build(); + } + + /** + * Use the input Map of configuration arguments to build and instantiate the underlying + * transform. The map can ignore nullable parameters, but needs to include all required + * parameters. Check the underlying transform's schema ({@link + * SchemaTransformProvider#configurationSchema()}) to see which parameters are available. + */ + public ManagedTransform withConfig(Map config) { + return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); + } + + /** + * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a + * specified YAML file location. + */ + public ManagedTransform withConfigUrl(String configUrl) { + return toBuilder().setConfigUrl(configUrl).build(); + } + + @VisibleForTesting + ManagedTransform withSupportedIdentifiers(List supportedIdentifiers) { + return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + ManagedSchemaTransformProvider.ManagedConfig managedConfig = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setTransformIdentifier(getIdentifier()) + .setConfig(getConfig()) + .setConfigUrl(getConfigUrl()) + .build(); + + SchemaTransform underlyingTransform = + new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); + + return input.apply(underlyingTransform); + } + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java new file mode 100644 index 000000000000..1ee2b11a90ff --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; + +@AutoService(SchemaTransformProvider.class) +public class ManagedSchemaTransformProvider + extends TypedSchemaTransformProvider { + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:managed:v1"; + } + + private final Map schemaTransformProviders = new HashMap<>(); + + public ManagedSchemaTransformProvider() {} + + ManagedSchemaTransformProvider(Collection supportedIdentifiers) { + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { + if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { + throw new IllegalArgumentException( + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); + } + schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + + schemaTransformProviders.entrySet().removeIf(e -> !supportedIdentifiers.contains(e.getKey())); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + @VisibleForTesting + abstract static class ManagedConfig { + public static Builder builder() { + return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); + } + + @SchemaFieldDescription("Identifier of the underlying IO to instantiate.") + public abstract String getTransformIdentifier(); + + @SchemaFieldDescription("URL path to the YAML config file used to build the underlying IO.") + public abstract @Nullable String getConfigUrl(); + + @SchemaFieldDescription("YAML string config used to build the underlying IO.") + public abstract 
@Nullable String getConfig(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTransformIdentifier(String identifier); + + public abstract Builder setConfigUrl(@Nullable String configUrl); + + public abstract Builder setConfig(@Nullable String config); + + public abstract ManagedConfig build(); + } + + protected void validate() { + boolean configExists = !Strings.isNullOrEmpty(getConfig()); + boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl()); + checkArgument( + !(configExists && configUrlExists) && (configExists || configUrlExists), + "Please specify a config or a config URL, but not both."); + } + } + + @Override + protected SchemaTransform from(ManagedConfig managedConfig) { + managedConfig.validate(); + SchemaTransformProvider schemaTransformProvider = + Preconditions.checkNotNull( + schemaTransformProviders.get(managedConfig.getTransformIdentifier()), + "Could not find transform with identifier %s, or it may not be supported", + managedConfig.getTransformIdentifier()); + + // parse config before expansion to check if it matches underlying transform's config schema + Schema transformConfigSchema = schemaTransformProvider.configurationSchema(); + Row transformConfig; + try { + transformConfig = getRowConfig(managedConfig, transformConfigSchema); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Specified configuration does not align with the underlying transform's configuration schema [%s].", + transformConfigSchema), + e); + } + + return new ManagedSchemaTransform(transformConfig, schemaTransformProvider); + } + + private static class ManagedSchemaTransform extends SchemaTransform { + private final Row transformConfig; + private final SchemaTransformProvider underlyingTransformProvider; + + ManagedSchemaTransform( + Row transformConfig, SchemaTransformProvider underlyingTransformProvider) { + this.transformConfig = transformConfig; + this.underlyingTransformProvider = underlyingTransformProvider; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + SchemaTransform underlyingTransform = underlyingTransformProvider.from(transformConfig); + + return input.apply(underlyingTransform); + } + } + + @VisibleForTesting + static Row getRowConfig(ManagedConfig config, Schema transformSchema) { + String transformYamlConfig; + if (!Strings.isNullOrEmpty(config.getConfigUrl())) { + try { + MatchResult.Metadata fileMetaData = + FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())); + ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes()); + FileSystems.open(fileMetaData.resourceId()).read(buffer); + transformYamlConfig = new String(buffer.array(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + transformYamlConfig = config.getConfig(); + } + + return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true); + } + + @VisibleForTesting + Map getAllProviders() { + return schemaTransformProviders; + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java new file mode 100644 index 000000000000..d129e4a7a225 --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Managed reads and writes. */ +package org.apache.beam.sdk.managed; diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java new file mode 100644 index 000000000000..0c495d0d2c5c --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ManagedSchemaTransformProviderTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testFailWhenNoConfigSpecified() { + ManagedSchemaTransformProvider.ManagedConfig config = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setTransformIdentifier("some identifier") + .build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Please specify a config or a config URL, but not both"); + config.validate(); + } + + @Test + public void testGetRowFromYamlConfig() { + String yamlString = "extra_string: abc\n" + "extra_integer: 123"; + ManagedConfig config = + ManagedConfig.builder() + .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .setConfig(yamlString) + .build(); + Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); + Row expectedRow = + Row.withSchema(configSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); + Row configRow = + ManagedSchemaTransformProvider.getRowConfig( + config, new TestSchemaTransformProvider().configurationSchema()); + + assertEquals(expectedRow, configRow); + } + + @Test + public void testGetRowFromConfigUrl() throws URISyntaxException { + String yamlConfigPath = + Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) + .toFile() + .getAbsolutePath(); + ManagedConfig config = + ManagedConfig.builder() + .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .setConfigUrl(yamlConfigPath) + .build(); + Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); + Row expectedRow = + Row.withSchema(configSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); + Row configRow = + ManagedSchemaTransformProvider.getRowConfig( + config, new TestSchemaTransformProvider().configurationSchema()); + + assertEquals(expectedRow, configRow); + } + + @Test + public void testDiscoverTestProvider() { + ManagedSchemaTransformProvider provider = + new ManagedSchemaTransformProvider(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)); + + assertTrue(provider.getAllProviders().containsKey(TestSchemaTransformProvider.IDENTIFIER)); + } +} diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java new file mode 100644 index 000000000000..ceb71a06f33c --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ManagedTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testInvalidTransform() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("An unsupported source was specified"); + Managed.read("nonexistent-source"); + + thrown.expect(NullPointerException.class); + thrown.expectMessage("An unsupported sink was specified"); + Managed.write("nonexistent-sink"); + } + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private static final Schema SCHEMA = + Schema.builder().addStringField("str").addInt32Field("int").build(); + private static final List ROWS = + Arrays.asList( + Row.withSchema(SCHEMA).withFieldValue("str", "a").withFieldValue("int", 1).build(), + Row.withSchema(SCHEMA).withFieldValue("str", "b").withFieldValue("int", 2).build(), + Row.withSchema(SCHEMA).withFieldValue("str", "c").withFieldValue("int", 3).build()); + + public void runTestProviderTest(Managed.ManagedTransform writeOp) { + PCollection rows = + PCollectionRowTuple.of("input", pipeline.apply(Create.of(ROWS)).setRowSchema(SCHEMA)) + .apply(writeOp) + .get("output"); + + Schema outputSchema = rows.getSchema(); + PAssert.that(rows) + .containsInAnyOrder( + ROWS.stream() + .map( + row -> + Row.withSchema(outputSchema) + .addValues(row.getValues()) + .addValue("abc") + .addValue(123) + .build()) + .collect(Collectors.toList())); + pipeline.run(); + } + + @Test + public void testManagedTestProviderWithConfigMap() { + Managed.ManagedTransform writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + .setIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123)); + + runTestProviderTest(writeOp); + } + + @Test + public void testManagedTestProviderWithConfigFile() throws Exception { + String yamlConfigPath = + Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) + .toFile() + .getAbsolutePath(); + + Managed.ManagedTransform writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + 
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfigUrl(yamlConfigPath); + + runTestProviderTest(writeOp); + } +} diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java new file mode 100644 index 000000000000..136d98d468d0 --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +@AutoService(SchemaTransformProvider.class) +public class TestSchemaTransformProvider + extends TypedSchemaTransformProvider { + static final String IDENTIFIER = "beam:schematransform:org.apache.beam:test_transform:v1"; + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Config { + public static Builder builder() { + return new AutoValue_TestSchemaTransformProvider_Config.Builder(); + } + + @SchemaFieldDescription("String to add to each row element.") + public abstract String getExtraString(); + + @SchemaFieldDescription("Integer to add to each row element.") + public abstract Integer getExtraInteger(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setExtraString(String extraString); + + public abstract Builder setExtraInteger(Integer extraInteger); + + public abstract Config build(); + } + } + + @Override + public SchemaTransform from(Config config) { + String extraString = config.getExtraString(); + Integer extraInteger = config.getExtraInteger(); + return new SchemaTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = + Schema.builder() + .addFields(input.get("input").getSchema().getFields()) + .addStringField("extra_string") + 
.addInt32Field("extra_integer") + .build(); + PCollection rows = + input + .get("input") + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + row -> + Row.withSchema(schema) + .addValues(row.getValues()) + .addValue(extraString) + .addValue(extraInteger) + .build())) + .setRowSchema(schema); + return PCollectionRowTuple.of("output", rows); + } + }; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml new file mode 100644 index 000000000000..7725c32b348e --- /dev/null +++ b/sdks/java/managed/src/test/resources/test_config.yaml @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +extra_string: "abc" +extra_integer: 123 \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index ec11fd32fdd3..1e52e425b215 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -353,3 +353,5 @@ include("sdks:java:io:kafka:kafka-100") findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100" include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" +include("sdks:java:managed") +findProject(":sdks:java:managed")?.name = "managed" From 58e4f8325fc0a2f7cd71bf9ccefea06a5ac6f75b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 8 Apr 2024 10:25:17 -0400 Subject: [PATCH 05/18] Patch release website changes (#30839) * Patch release website changes * Known issue * Mark as released --- CHANGES.md | 8 ++++++++ website/www/site/config.toml | 2 +- .../www/site/content/en/get-started/downloads.md | 13 ++++++++++--- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 170d8d08da74..5824c71a98dc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -52,6 +52,7 @@ * ([#X](https://github.com/apache/beam/issues/X)). --> + # [2.56.0] - Unreleased ## Highlights @@ -88,6 +89,12 @@ * ([#X](https://github.com/apache/beam/issues/X)). +# [2.55.1] + +## Bugfixes + +* Fixed issue that broke WriteToJson in languages other than Java (X-lang) ([#30776](https://github.com/apache/beam/issues/30776)). + # [2.55.0] - 2024-03-25 ## Highlights @@ -133,6 +140,7 @@ ## Known Issues * In Python pipelines, when shutting down inactive bundle processors, shutdown logic can overaggressively hold the lock, blocking acceptance of new work. Symptoms of this issue include slowness or stuckness in long-running jobs. Fixed in 2.56.0 ([#30679](https://github.com/apache/beam/pull/30679)). +* WriteToJson broken in languages other than Java (X-lang) ([#30776](https://github.com/apache/beam/issues/30776)). 
# [2.54.0] - 2024-02-14 diff --git a/website/www/site/config.toml b/website/www/site/config.toml index e3e2638825f0..9e29281dd795 100644 --- a/website/www/site/config.toml +++ b/website/www/site/config.toml @@ -104,7 +104,7 @@ github_project_repo = "https://github.com/apache/beam" [params] description = "Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes." -release_latest = "2.55.0" +release_latest = "2.55.1" # The repository and branch where the files live in Github or Colab. This is used # to serve and stage from your local branch, but publish to the master branch. # e.g. https://github.com/{{< param branch_repo >}}/path/to/notebook.ipynb diff --git a/website/www/site/content/en/get-started/downloads.md b/website/www/site/content/en/get-started/downloads.md index 8b11768d4ded..d5e04b33403d 100644 --- a/website/www/site/content/en/get-started/downloads.md +++ b/website/www/site/content/en/get-started/downloads.md @@ -96,10 +96,17 @@ versions denoted `0.x.y`. ## Releases +### 2.55.1 (2024-03-25) +Official [source code download](https://downloads.apache.org/beam/2.55.1/apache-beam-2.55.1-source-release.zip). +[SHA-512](https://downloads.apache.org/beam/2.55.1/apache-beam-2.55.1-source-release.zip.sha512). +[signature](https://downloads.apache.org/beam/2.55.1/apache-beam-2.55.1-source-release.zip.asc). + +[Release notes](https://github.com/apache/beam/releases/tag/v2.55.1) + ### 2.55.0 (2024-03-25) -Official [source code download](https://downloads.apache.org/beam/2.55.0/apache-beam-2.55.0-source-release.zip). -[SHA-512](https://downloads.apache.org/beam/2.55.0/apache-beam-2.55.0-source-release.zip.sha512). -[signature](https://downloads.apache.org/beam/2.55.0/apache-beam-2.55.0-source-release.zip.asc). +Official [source code download](https://archive.apache.org/beam/2.55.0/apache-beam-2.55.0-source-release.zip). +[SHA-512](https://archive.apache.org/beam/2.55.0/apache-beam-2.55.0-source-release.zip.sha512). +[signature](https://archive.apache.org/beam/2.55.0/apache-beam-2.55.0-source-release.zip.asc). [Release notes](https://github.com/apache/beam/releases/tag/v2.55.0) [Blog post](/blog/beam-2.55.0). 
From 1c962a5ec7ac80eb1165a7fe8fc9c362b94dcd0f Mon Sep 17 00:00:00 2001 From: Daria Bezkorovaina <99457894+dariabezkorovaina@users.noreply.github.com> Date: Mon, 8 Apr 2024 21:14:09 +0100 Subject: [PATCH 06/18] Duet AI Prompts - Documentation Lookup Without Links (#30873) * Create 53_dataflow_runner.md * Create 54_spark_runner.md * Create 55_direct_runner.md * Create 29_advanced_side_inputs.md * Create 30_advanced_additional_outputs.md * Create 31_advanced_batched_dofn.md * Update 31_advanced_batched_dofn.md * Update 30_advanced_additional_outputs.md * Create 01_basic_learning_apache_beam.md * Create 02_basic_pipelines.md * Create 03_basic_configuring_pipelines.md * Update 04_basic_pcollections.md * Create 04_basic_pcollections.md * Update 05_basic_ptransforms.md * Create 05_basic_ptransforms.md * Update 06_basic_schema.md * Create 06_basic_schema.md * Update 07_basic_runners.md * Update 07_basic_runners.md * Create 07_basic_runners.md * Update 07_basic_runners.md * Update 08_basic_windowing.md * Create 08_basic_windowing.md * Update 09_basic_triggers.md * Create 09_basic_triggers.md * Update 10_basic_metrics.md * Update 10_basic_metrics.md * Create 10_basic_metrics.md * Update 11_basic_state.md * Create 11_basic_state.md * Update 12_basic_timers.md * Update 12_basic_timers.md * Create 12_basic_timers.md * Update 13_advanced_splittable_dofn.md * Update 13_advanced_splittable_dofn.md * Create 13_advanced_splittable_dofn.md * Update 13_advanced_splittable_dofn.md * Update 14_advanced_pipeline_patterns.md * Update 14_advanced_pipeline_patterns.md * Create 14_advanced_pipeline_patterns.md * Update 15_advanced_xlang.md * Create 15_advanced_xlang.md * Update 16_advanced_pipeline_lifecycle.md * Update 16_advanced_pipeline_lifecycle.md * Create 16_advanced_pipeline_lifecycle.md * Update 17_advanced_ai_ml.md * Update 17_advanced_ai_ml.md * Update 17_advanced_ai_ml.md * Create 17_advanced_ai_ml.md * Update 18_io_kafka.md * Create 18_io_kafka.md * Update 19_io_pubsub.md * Create 19_io_pubsub.md * Update 20_io_biguery.md * Create 20_io_biguery.md * Update 21_io_bigtable.md * Create 21_io_bigtable.md * Update 22_io_spanner.md * Create 22_io_spanner.md * Update 23_io_gcs.md * Create 23_io_gcs.md * Update 24_io_json.md * Create 24_io_json.md * Update 25_io_csv.md * Create 25_io_csv.md * Update 26_io_avro.md * Create 26_io_avro.md * Update 27_io_parquet.md * Create 27_io_parquet.md * Update 28_io_jdbc.md * Create 28_io_jdbc.md * Update learning/prompts/documentation-lookup-nolinks/08_basic_windowing.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/09_basic_triggers.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/18_io_kafka.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/19_io_pubsub.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/20_io_biguery.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/20_io_biguery.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/21_io_bigtable.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/22_io_spanner.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/24_io_json.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/26_io_avro.md Co-authored-by: Andrey Devyatkin * Update 
learning/prompts/documentation-lookup-nolinks/27_io_parquet.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/25_io_csv.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/28_io_jdbc.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/29_advanced_side_inputs.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/53_dataflow_runner.md Co-authored-by: Andrey Devyatkin * Update learning/prompts/documentation-lookup-nolinks/54_spark_runner.md Co-authored-by: Andrey Devyatkin * Update 53_dataflow_runner.md * Update 07_basic_runners.md * Update 53_dataflow_runner.md --------- Co-authored-by: Andrey Devyatkin --- .../01_basic_learning_apache_beam.md | 16 ++ .../02_basic_pipelines.md | 18 +++ .../03_basic_configuring_pipelines.md | 31 ++++ .../04_basic_pcollections.md | 25 +++ .../05_basic_ptransforms.md | 34 ++++ .../06_basic_schema.md | 25 +++ .../07_basic_runners.md | 25 +++ .../08_basic_windowing.md | 31 ++++ .../09_basic_triggers.md | 31 ++++ .../10_basic_metrics.md | 25 +++ .../11_basic_state.md | 19 +++ .../12_basic_timers.md | 11 ++ .../13_advanced_splittable_dofn.md | 18 +++ .../14_advanced_pipeline_patterns.md | 14 ++ .../15_advanced_xlang.md | 11 ++ .../16_advanced_pipeline_lifecycle.md | 31 ++++ .../17_advanced_ai_ml.md | 18 +++ .../18_io_kafka.md | 29 ++++ .../19_io_pubsub.md | 23 +++ .../20_io_biguery.md | 42 +++++ .../21_io_bigtable.md | 28 ++++ .../22_io_spanner.md | 33 ++++ .../documentation-lookup-nolinks/23_io_gcs.md | 15 ++ .../24_io_json.md | 29 ++++ .../documentation-lookup-nolinks/25_io_csv.md | 30 ++++ .../26_io_avro.md | 25 +++ .../27_io_parquet.md | 21 +++ .../28_io_jdbc.md | 42 +++++ .../29_advanced_side_inputs.md | 36 +++++ .../30_advanced_additional_outputs.md | 100 ++++++++++++ .../31_advanced_batched_dofn.md | 39 +++++ .../53_dataflow_runner.md | 80 ++++++++++ .../54_spark_runner.md | 148 ++++++++++++++++++ .../55_direct_runner.md | 68 ++++++++ .../04_basic_pcollections.md | 6 +- .../05_basic_ptransforms.md | 4 +- .../documentation-lookup/06_basic_schema.md | 8 +- .../documentation-lookup/07_basic_runners.md | 10 +- .../08_basic_windowing.md | 14 +- .../documentation-lookup/09_basic_triggers.md | 14 +- .../documentation-lookup/10_basic_metrics.md | 9 +- .../documentation-lookup/11_basic_state.md | 23 +-- .../documentation-lookup/12_basic_timers.md | 7 +- .../13_advanced_splittable_dofn.md | 12 +- .../14_advanced_pipeline_patterns.md | 5 +- .../documentation-lookup/15_advanced_xlang.md | 10 +- .../16_advanced_pipeline_lifecycle.md | 9 +- .../documentation-lookup/17_advanced_ai_ml.md | 5 +- .../documentation-lookup/18_io_kafka.md | 11 +- .../documentation-lookup/19_io_pubsub.md | 13 +- .../documentation-lookup/20_io_biguery.md | 18 ++- .../documentation-lookup/21_io_bigtable.md | 9 +- .../documentation-lookup/22_io_spanner.md | 6 +- .../prompts/documentation-lookup/23_io_gcs.md | 15 +- .../documentation-lookup/24_io_json.md | 8 +- 
.../prompts/documentation-lookup/25_io_csv.md | 7 +- .../documentation-lookup/26_io_avro.md | 8 +- .../documentation-lookup/27_io_parquet.md | 6 +- .../documentation-lookup/28_io_jdbc.md | 9 +- .../30_advanced_additional_outputs.md | 12 +- .../31_advanced_batched_dofn.md | 27 ++-- 61 files changed, 1314 insertions(+), 142 deletions(-) create mode 100644 learning/prompts/documentation-lookup-nolinks/01_basic_learning_apache_beam.md create mode 100644 learning/prompts/documentation-lookup-nolinks/02_basic_pipelines.md create mode 100644 learning/prompts/documentation-lookup-nolinks/03_basic_configuring_pipelines.md create mode 100644 learning/prompts/documentation-lookup-nolinks/04_basic_pcollections.md create mode 100644 learning/prompts/documentation-lookup-nolinks/05_basic_ptransforms.md create mode 100644 learning/prompts/documentation-lookup-nolinks/06_basic_schema.md create mode 100644 learning/prompts/documentation-lookup-nolinks/07_basic_runners.md create mode 100644 learning/prompts/documentation-lookup-nolinks/08_basic_windowing.md create mode 100644 learning/prompts/documentation-lookup-nolinks/09_basic_triggers.md create mode 100644 learning/prompts/documentation-lookup-nolinks/10_basic_metrics.md create mode 100644 learning/prompts/documentation-lookup-nolinks/11_basic_state.md create mode 100644 learning/prompts/documentation-lookup-nolinks/12_basic_timers.md create mode 100644 learning/prompts/documentation-lookup-nolinks/13_advanced_splittable_dofn.md create mode 100644 learning/prompts/documentation-lookup-nolinks/14_advanced_pipeline_patterns.md create mode 100644 learning/prompts/documentation-lookup-nolinks/15_advanced_xlang.md create mode 100644 learning/prompts/documentation-lookup-nolinks/16_advanced_pipeline_lifecycle.md create mode 100644 learning/prompts/documentation-lookup-nolinks/17_advanced_ai_ml.md create mode 100644 learning/prompts/documentation-lookup-nolinks/18_io_kafka.md create mode 100644 learning/prompts/documentation-lookup-nolinks/19_io_pubsub.md create mode 100644 learning/prompts/documentation-lookup-nolinks/20_io_biguery.md create mode 100644 learning/prompts/documentation-lookup-nolinks/21_io_bigtable.md create mode 100644 learning/prompts/documentation-lookup-nolinks/22_io_spanner.md create mode 100644 learning/prompts/documentation-lookup-nolinks/23_io_gcs.md create mode 100644 learning/prompts/documentation-lookup-nolinks/24_io_json.md create mode 100644 learning/prompts/documentation-lookup-nolinks/25_io_csv.md create mode 100644 learning/prompts/documentation-lookup-nolinks/26_io_avro.md create mode 100644 learning/prompts/documentation-lookup-nolinks/27_io_parquet.md create mode 100644 learning/prompts/documentation-lookup-nolinks/28_io_jdbc.md create mode 100644 learning/prompts/documentation-lookup-nolinks/29_advanced_side_inputs.md create mode 100644 learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md create mode 100644 learning/prompts/documentation-lookup-nolinks/31_advanced_batched_dofn.md create mode 100644 learning/prompts/documentation-lookup-nolinks/53_dataflow_runner.md create mode 100644 learning/prompts/documentation-lookup-nolinks/54_spark_runner.md create mode 100644 learning/prompts/documentation-lookup-nolinks/55_direct_runner.md diff --git a/learning/prompts/documentation-lookup-nolinks/01_basic_learning_apache_beam.md b/learning/prompts/documentation-lookup-nolinks/01_basic_learning_apache_beam.md new file mode 100644 index 000000000000..e28d44b1dfd7 --- /dev/null +++ 
b/learning/prompts/documentation-lookup-nolinks/01_basic_learning_apache_beam.md
@@ -0,0 +1,16 @@
+Prompt:
+What is Apache Beam?
+
+Response:
+Apache Beam is an open-source unified programming model for data processing pipelines. It provides a set of language-specific SDKs that allow you to define and execute data processing workflows.
+Apache Beam allows you to write pipelines in one of the supported languages (Java, Python, Go, and Scala). The pipelines can be executed on one of the supported runners (Apache Flink, Apache Spark, and Google Cloud Dataflow).
+
+Learning Apache Beam is a valuable skill for data processing and analysis. Here are some resources to get you started:
+* The 'Get Started' section of the Apache Beam website is a perfect place to start. It provides an overview of Apache Beam and its key concepts.
+* For a hands-on introduction, you can start by exploring the WordCount example provided in the 'Get Started' section, which demonstrates the basic concepts of Apache Beam.
+* Tour of Beam is an interactive tour that teaches you core Beam concepts. It provides a sandbox environment to write and run pipelines while exploring different topics.
+* Beam Playground offers an interactive space to experiment with the Apache Beam transforms and examples without installing anything on your system.
+
+After you have a basic understanding of Apache Beam, you can start to build your own pipelines. The Apache Beam website has a number of resources that can help you get started, including Apache Beam Documentation and Apache Beam Community. For those interested in the source code, you can explore the Apache Beam repository on GitHub.
+
+By leveraging these resources, you can build a strong foundation in Apache Beam and start constructing your own data processing pipelines. Additionally, Apache Beam is an open-source project, and contributions are welcome. Whether you're fixing bugs, improving documentation, or adding new features, your work can benefit the entire Beam community. To learn more about how you can contribute, visit the 'Contribute' section of the Apache Beam website.
diff --git a/learning/prompts/documentation-lookup-nolinks/02_basic_pipelines.md b/learning/prompts/documentation-lookup-nolinks/02_basic_pipelines.md
new file mode 100644
index 000000000000..29620aad6fd4
--- /dev/null
+++ b/learning/prompts/documentation-lookup-nolinks/02_basic_pipelines.md
@@ -0,0 +1,18 @@
+Prompt:
+What is a pipeline in Apache Beam?
+
+Response:
+In Apache Beam, a pipeline encapsulates the entirety of a data processing task, including all the data and each step of the process. Essentially, it's a Directed Acyclic Graph of transformations (known as `PTransform`) applied to data collections (`PCollection`).
+
+The simplest pipelines in Apache Beam follow a linear flow of operations, typically adhering to a read-process-write pattern. However, pipelines can also be significantly more complex, featuring multiple input sources, output sinks, and operations (known as a `PTransform`) that can both read from and output to multiple `PCollection` objects. For more information about pipeline design and best practices, see the 'Common Pipeline Patterns' section in the Apache Beam documentation.
+
+To use Apache Beam, your driver program must first create an instance of the Beam SDK class `Pipeline` (typically in the `main()` function).
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  pass  # Build your pipeline here.
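+
+# A minimal illustrative sketch (the transforms and sample values below are
+# assumptions added for illustration, not part of the original skeleton):
+# chain a few transforms onto the pipeline object to build a tiny pipeline.
+with beam.Pipeline() as pipeline:
+  (pipeline
+   | beam.Create(['hello', 'apache', 'beam'])  # create an in-memory PCollection
+   | beam.Map(str.title)                       # apply a simple element-wise transform
+   | beam.Map(print))                          # write each element to stdout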
+```
+
+When you create your pipeline, you need to set configuration options. You can set your pipeline's configuration options programmatically. You can also set the options ahead of time, or read them from the command line, and then pass them to the `Pipeline` object when you create the object. Reading the options from the command line is often easier than setting them programmatically.
diff --git a/learning/prompts/documentation-lookup-nolinks/03_basic_configuring_pipelines.md b/learning/prompts/documentation-lookup-nolinks/03_basic_configuring_pipelines.md
new file mode 100644
index 000000000000..664b14df87dc
--- /dev/null
+++ b/learning/prompts/documentation-lookup-nolinks/03_basic_configuring_pipelines.md
@@ -0,0 +1,31 @@
+Prompt:
+How do I configure a pipeline in Apache Beam?
+
+Response:
+Configuring pipeline options in Apache Beam is crucial for specifying the execution environment, managing resources, and tailoring the pipeline's behavior to meet specific data processing needs and performance requirements.
+
+You can set pipeline options programmatically or pass them in using the command line. These options include runner-specific settings, job name, project ID (for cloud runners), machine types, number of workers, and more.
+
+Apache Beam offers a variety of standard pipeline options that allow you to customize and optimize your data processing pipelines.
+
+Beam SDKs include a command-line parser that you can use to set pipeline options. Use command-line arguments in the format `--