diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
index 7298d81c1b44..580c0269d361 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -18,8 +18,8 @@
 import logging
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
 from apache_beam.typehints.row_type import RowTypeConstraint
 """A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
@@ -60,35 +60,39 @@
   --expansion_service_port
 """
 
+# Original Java transform is in ExtractWordsProvider.java
 EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
+# Original Java transform is in JavaCountProvider.java
 COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
+# Original Java transform is in WriteWordsProvider.java
 WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
 
 
 def run(input_path, output_path, expansion_service_port, pipeline_args):
   pipeline_options = PipelineOptions(pipeline_args)
 
+  # Discover and get external transforms from this expansion service
+  provider = ExternalTransformProvider("localhost:" + expansion_service_port)
+  # Get transforms with identifiers, then use them as you would a regular
+  # native PTransform
+  Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+  Count = provider.get_urn(COUNT_IDENTIFIER)
+  Write = provider.get_urn(WRITE_IDENTIFIER)
+
   with beam.Pipeline(options=pipeline_options) as p:
-    expansion_service = BeamJarExpansionService(
-        "examples:multi-language:shadowJar")
-    if expansion_service_port:
-      expansion_service = "localhost:" + expansion_service_port
-
-    provider = ExternalTransformProvider(expansion_service)
-    # Retrieve portable transforms
-    Extract = provider.get_urn(EXTRACT_IDENTIFIER)
-    Count = provider.get_urn(COUNT_IDENTIFIER)
-    Write = provider.get_urn(WRITE_IDENTIFIER)
-
-    _ = (p
-         | 'Read' >> beam.io.ReadFromText(input_path)
-         | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
-         | 'Extract Words' >> Extract(drop=["king", "palace"])
-         | 'Count Words' >> Count()
-         | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
-             row.word, row.count))).with_output_types(
-                 RowTypeConstraint.from_fields([('line', str)]))
-         | 'Write' >> Write(file_path_prefix=output_path))
+    lines = p | 'Read' >> ReadFromText(input_path)
+
+    words = (lines
+             | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+             | 'Extract Words' >> Extract())
+    word_counts = words | 'Count Words' >> Count()
+    formatted_words = (
+        word_counts
+        | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+            row.word, row.count))).with_output_types(
+                RowTypeConstraint.from_fields([('line', str)])))
+
+    formatted_words | 'Write' >> Write(file_path_prefix=output_path)
 
 
 if __name__ == '__main__':
@@ -106,10 +110,8 @@ def run(input_path, output_path, expansion_service_port, pipeline_args):
                       help='Output file')
   parser.add_argument('--expansion_service_port',
                       dest='expansion_service_port',
-                      required=False,
-                      help='Expansion service port. If left empty, the '
-                      'existing multi-language examples service will '
-                      'be used by default.')
+                      required=True,
+                      help='Expansion service port')
 
   known_args, pipeline_args = parser.parse_known_args()
   run(known_args.input, known_args.output, known_args.expansion_service_port,
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index b7224ecec6b4..724dbce276fb 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -21,12 +21,9 @@
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -39,6 +36,7 @@
 /** Splits a line into separate words and returns each word. */
 @AutoService(SchemaTransformProvider.class)
 public class ExtractWordsProvider extends TypedSchemaTransformProvider<ExtractWordsProvider.Configuration> {
+  public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
 
   @Override
   public String identifier() {
@@ -47,60 +45,32 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new ExtractWordsTransform(configuration);
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        return PCollectionRowTuple.of(
+            "output",
+            input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
+      }
+    };
   }
 
-  static class ExtractWordsTransform extends SchemaTransform {
-    private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
-    private final List<String> drop;
+  static class ExtractWordsFn extends DoFn<Row, Row> {
+    @ProcessElement
+    public void processElement(@Element Row element, OutputReceiver<Row> receiver) {
+      // Split the line into words.
+      String line = Preconditions.checkStateNotNull(element.getString("line"));
+      String[] words = line.split("[^\\p{L}]+", -1);
 
-    ExtractWordsTransform(Configuration configuration) {
-      this.drop = configuration.getDrop();
-    }
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      return PCollectionRowTuple.of(
-          "output",
-          input
-              .getSinglePCollection()
-              .apply(
-                  ParDo.of(
-                      new DoFn<Row, Row>() {
-                        @ProcessElement
-                        public void process(@Element Row element, OutputReceiver<Row> receiver) {
-                          // Split the line into words.
-                          String line = Preconditions.checkStateNotNull(element.getString("line"));
-                          String[] words = line.split("[^\\p{L}]+", -1);
-                          Arrays.stream(words)
-                              .filter(w -> !drop.contains(w))
-                              .forEach(
-                                  word ->
-                                      receiver.output(
-                                          Row.withSchema(OUTPUT_SCHEMA)
-                                              .withFieldValue("word", word)
-                                              .build()));
-                        }
-                      }))
-              .setRowSchema(OUTPUT_SCHEMA));
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build());
+        }
+      }
     }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class Configuration {
-    public static Builder builder() {
-      return new AutoValue_ExtractWordsProvider_Configuration.Builder();
-    }
-
-    @SchemaFieldDescription("List of words to drop.")
-    public abstract List<String> getDrop();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setDrop(List<String> foo);
-
-      public abstract Configuration build();
-    }
-  }
+  protected abstract static class Configuration {}
 }
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index 90d02d92c3cb..cabea594ae18 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,37 +44,35 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new JavaCountTransform();
-  }
-
-  static class JavaCountTransform extends SchemaTransform {
-    static final Schema OUTPUT_SCHEMA =
-        Schema.builder().addStringField("word").addInt64Field("count").build();
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        Schema outputSchema =
+            Schema.builder().addStringField("word").addInt64Field("count").build();
 
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      PCollection<Row> wordCounts =
-          input
-              .get("input")
-              .apply(Count.perElement())
-              .apply(
-                  MapElements.into(TypeDescriptors.rows())
-                      .via(
-                          kv ->
-                              Row.withSchema(OUTPUT_SCHEMA)
-                                  .withFieldValue(
-                                      "word",
-                                      Preconditions.checkStateNotNull(
-                                          kv.getKey().getString("word")))
-                                  .withFieldValue("count", kv.getValue())
-                                  .build()))
-              .setRowSchema(OUTPUT_SCHEMA);
+        PCollection<Row> wordCounts =
+            input
+                .get("input")
+                .apply(Count.perElement())
+                .apply(
+                    MapElements.into(TypeDescriptors.rows())
+                        .via(
+                            kv ->
+                                Row.withSchema(outputSchema)
+                                    .withFieldValue(
+                                        "word",
+                                        Preconditions.checkStateNotNull(
+                                            kv.getKey().getString("word")))
+                                    .withFieldValue("count", kv.getValue())
+                                    .build()))
+                .setRowSchema(outputSchema);
 
-      return PCollectionRowTuple.of("output", wordCounts);
-    }
+        return PCollectionRowTuple.of("output", wordCounts);
+      }
+    };
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class Configuration {}
+  protected abstract static class Configuration {}
 }
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index faf9590a7f16..0b2017c5587a 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,32 +42,24 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new WriteWordsTransform(configuration);
-  }
-
-  static class WriteWordsTransform extends SchemaTransform {
-    private final String filePathPrefix;
-
-    WriteWordsTransform(Configuration configuration) {
-      this.filePathPrefix = configuration.getFilePathPrefix();
-    }
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      input
-          .get("input")
-          .apply(
-              MapElements.into(TypeDescriptors.strings())
-                  .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
-          .apply(TextIO.write().to(filePathPrefix));
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        input
+            .get("input")
+            .apply(
+                MapElements.into(TypeDescriptors.strings())
+                    .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
+            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
 
-      return PCollectionRowTuple.empty(input.getPipeline());
-    }
+        return PCollectionRowTuple.empty(input.getPipeline());
+      }
+    };
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class Configuration {
+  protected abstract static class Configuration {
     public static Builder builder() {
       return new AutoValue_WriteWordsProvider_Configuration.Builder();
     }
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 9ca5886f4cc2..fb37a8fd974d 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,8 +239,7 @@ def dict_to_row(schema_proto, py_value):
     extra = set(py_value.keys()) - set(row_type._fields)
     if extra:
       raise ValueError(
-          f"Transform '{self.identifier()}' was configured with unknown "
-          f"fields: {extra}. Valid fields: {set(row_type._fields)}")
+          f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
     return row_type(
         *[
             dict_to_row_recursive(
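Note: the updated wordcount_external.py above resolves the three Java SchemaTransforms by URN from a running expansion service. As a minimal sketch of that pattern in isolation (it assumes the example's Java expansion service is already listening on a hypothetical port 12345, and uses placeholder input and output paths), the same wiring looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint

# URNs registered by the Java providers shown in the diff above.
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"

# Discover the external transforms from the (assumed) local expansion service.
provider = ExternalTransformProvider("localhost:12345")
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

with beam.Pipeline(options=PipelineOptions()) as p:
  _ = (p
       | 'Read' >> beam.io.ReadFromText('/tmp/input.txt')  # placeholder input
       | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
       | 'Extract Words' >> Extract()
       | 'Count Words' >> Count()
       | 'Format Text' >> beam.Map(lambda row: beam.Row(line='%s: %s' % (
           row.word, row.count))).with_output_types(
               RowTypeConstraint.from_fields([('line', str)]))
       | 'Write' >> Write(file_path_prefix='/tmp/counts'))  # placeholder output

Each Java provider reads its input from the "input" tag of a PCollectionRowTuple and emits its result under the "output" tag, which is why the Python side can chain the discovered transforms like ordinary PTransforms as long as the row schemas line up.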