From 6e5ca0c0fa450a262fce5fb35dd7c66949e81401 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 19 May 2022 10:46:10 -0600 Subject: [PATCH] Update the PTransform and associated APIs to be less class-based. (#17699) --- sdks/typescript/README.md | 6 + .../src/apache_beam/coders/coders.ts | 28 +- .../src/apache_beam/examples/wordcount.ts | 19 +- .../src/apache_beam/examples/wordcount_sql.ts | 10 +- .../apache_beam/examples/wordcount_textio.ts | 19 +- .../src/apache_beam/internal/pipeline.ts | 36 ++- .../src/apache_beam/internal/urns.ts | 1 - sdks/typescript/src/apache_beam/io/textio.ts | 22 +- sdks/typescript/src/apache_beam/pvalue.ts | 104 +++++--- .../src/apache_beam/runners/direct_runner.ts | 12 +- .../src/apache_beam/testing/assert.ts | 53 ++-- .../src/apache_beam/transforms/create.ts | 29 +-- .../src/apache_beam/transforms/external.ts | 23 +- .../src/apache_beam/transforms/flatten.ts | 26 +- .../transforms/group_and_combine.ts | 161 ++++++------ .../src/apache_beam/transforms/index.ts | 2 +- .../src/apache_beam/transforms/internal.ts | 150 +++++------ .../src/apache_beam/transforms/pardo.ts | 245 ++++++++---------- .../src/apache_beam/transforms/sql.ts | 35 ++- .../src/apache_beam/transforms/transform.ts | 30 ++- .../src/apache_beam/transforms/window.ts | 186 +++++++------ .../src/apache_beam/transforms/windowings.ts | 135 ++++------ .../src/apache_beam/worker/operators.ts | 78 ++---- sdks/typescript/test/combine_test.ts | 56 ++-- sdks/typescript/test/primitives_test.ts | 88 +++---- sdks/typescript/test/standard_coders_test.ts | 24 +- sdks/typescript/test/wordcount.ts | 28 +- 27 files changed, 745 insertions(+), 861 deletions(-) diff --git a/sdks/typescript/README.md b/sdks/typescript/README.md index 7971bf1d5634..745170bba655 100644 --- a/sdks/typescript/README.md +++ b/sdks/typescript/README.md @@ -93,6 +93,12 @@ used in Python. These can be "ordinary" javascript objects (which are passed as is) or special DoFnParam objects which provide getters to element-specific information (such as the current timestamp, window, or side input) at runtime. +* Rather than introduce multiple-output complexity into the map/do operations +themselves, producing multiple outputs is done by following with a new +`Split` primitive that takes a +`PCollection<{a?: AType, b: BType, ... }>` and produces an object +`{a: PCollection, b: PCollection, ...}`. + * Javascript supports (and encourages) an asynchronous programing model, with many libraries requiring use of the async/await paradigm. As there is no way (by design) to go from the asyncronous style back to diff --git a/sdks/typescript/src/apache_beam/coders/coders.ts b/sdks/typescript/src/apache_beam/coders/coders.ts index ecb276a4bb62..3a9547fff348 100644 --- a/sdks/typescript/src/apache_beam/coders/coders.ts +++ b/sdks/typescript/src/apache_beam/coders/coders.ts @@ -38,16 +38,36 @@ interface Class { */ class CoderRegistry { internal_registry = {}; - get(urn: string): Class> { - const constructor: Class> = this.internal_registry[urn]; + + getCoder( + urn: string, + payload: Uint8Array | undefined = undefined, + ...components: Coder[] + ) { + const constructor: (...args) => Coder = + this.internal_registry[urn]; if (constructor === undefined) { throw new Error("Could not find coder for URN " + urn); } - return constructor; + if (payload && payload.length > 0) { + return constructor(payload, ...components); + } else { + return constructor(...components); + } } + // TODO: Figure out how to branch on constructors (called with new) and + // ordinary functions. register(urn: string, coderClass: Class>) { - this.internal_registry[urn] = coderClass; + this.registerClass(urn, coderClass); + } + + registerClass(urn: string, coderClass: Class>) { + this.registerConstructor(urn, (...args) => new coderClass(...args)); + } + + registerConstructor(urn: string, constructor: (...args) => Coder) { + this.internal_registry[urn] = constructor; } } diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts index 961afb43e9bd..e34b8cab3c5c 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts @@ -34,20 +34,7 @@ import * as yargs from "yargs"; import * as beam from "../../apache_beam"; import { createRunner } from "../runners/runner"; - -import { count } from "../transforms/combiners"; -import { GroupBy } from "../transforms/group_and_combine"; - -class CountElements extends beam.PTransform< - beam.PCollection, - beam.PCollection -> { - expand(input: beam.PCollection) { - return input - .map((e) => ({ element: e })) - .apply(new GroupBy("element").combining("element", count, "count")); - } -} +import { countPerElement } from "../transforms/group_and_combine"; function wordCount(lines: beam.PCollection): beam.PCollection { return lines @@ -55,13 +42,13 @@ function wordCount(lines: beam.PCollection): beam.PCollection { .flatMap(function* (line: string) { yield* line.split(/[^a-z]+/); }) - .apply(new CountElements("Count")); + .apply(countPerElement()); } async function main() { await createRunner(yargs.argv).run((root) => { const lines = root.apply( - new beam.Create([ + beam.create([ "In the beginning God created the heaven and the earth.", "And the earth was without form, and void; and darkness was upon the face of the deep.", "And the Spirit of God moved upon the face of the waters.", diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts index f27a08997107..c2453d64b933 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts @@ -24,9 +24,7 @@ import * as textio from "../io/textio"; import { PortableRunner } from "../runners/portable_runner/runner"; -import * as internal from "../../apache_beam/transforms/internal"; -import { RowCoder } from "../coders/row_coder"; -import { SqlTransform } from "../transforms/sql"; +import { sqlTransform } from "../transforms/sql"; async function main() { // python apache_beam/runners/portability/local_job_service_main.py --port 3333 @@ -34,13 +32,13 @@ async function main() { environmentType: "LOOPBACK", jobEndpoint: "localhost:3333", }).run(async (root) => { - const lines = root.apply(new beam.Create(["a", "b", "c", "c"])); + const lines = root.apply(beam.create(["a", "b", "c", "c"])); const filtered = await lines .map((w) => ({ word: w })) - .apply(new internal.WithCoderInternal(RowCoder.fromJSON({ word: "str" }))) + .apply(beam.withRowCoder({ word: "str" })) .asyncApply( - new SqlTransform( + sqlTransform( "SELECT word, count(*) as c from PCOLLECTION group by word" ) ); diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts index 03f7149e285d..686be3b05948 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts @@ -21,22 +21,9 @@ import * as beam from "../../apache_beam"; import * as textio from "../io/textio"; import { DirectRunner } from "../runners/direct_runner"; - -import { count } from "../transforms/combiners"; -import { GroupBy } from "../transforms/group_and_combine"; - import { PortableRunner } from "../runners/portable_runner/runner"; -class CountElements extends beam.PTransform< - beam.PCollection, - beam.PCollection -> { - expand(input: beam.PCollection) { - return input - .map((e) => ({ element: e })) - .apply(new GroupBy("element").combining("element", count, "count")); - } -} +import { countPerElement } from "../transforms/group_and_combine"; function wordCount(lines: beam.PCollection): beam.PCollection { return lines @@ -44,14 +31,14 @@ function wordCount(lines: beam.PCollection): beam.PCollection { .flatMap(function* (line: string) { yield* line.split(/[^a-z]+/); }) - .apply(new CountElements("Count")); + .apply(countPerElement()); } async function main() { // python apache_beam/runners/portability/local_job_service_main.py --port 3333 await new PortableRunner("localhost:3333").run(async (root) => { const lines = await root.asyncApply( - new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt") + textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt") ); lines.apply(wordCount).map(console.log); diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts index 4b9f6a7b52a4..f333bb6c99b5 100644 --- a/sdks/typescript/src/apache_beam/internal/pipeline.ts +++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts @@ -21,13 +21,13 @@ import equal from "fast-deep-equal"; import * as runnerApi from "../proto/beam_runner_api"; import * as fnApi from "../proto/beam_fn_api"; import { - PTransform, - AsyncPTransform, + PTransformClass, + AsyncPTransformClass, extractName, } from "../transforms/transform"; -import { GlobalWindows } from "../transforms/windowings"; +import { globalWindows } from "../transforms/windowings"; import * as pvalue from "../pvalue"; -import { WindowInto } from "../transforms/window"; +import { createWindowingStrategyProto } from "../transforms/window"; import * as environments from "./environments"; import { Coder, globalRegistry as globalCoderRegistry } from "../coders/coders"; @@ -45,18 +45,14 @@ export class PipelineContext { const this_ = this; if (this.coders[coderId] == undefined) { const coderProto = this.components.coders[coderId]; - const coderConstructor = globalCoderRegistry().get(coderProto.spec!.urn); - const components = (coderProto.componentCoderIds || []).map( - this_.getCoder.bind(this_) + const components: Coder[] = ( + coderProto.componentCoderIds || [] + ).map(this_.getCoder.bind(this_)); + this.coders[coderId] = globalCoderRegistry().getCoder( + coderProto.spec!.urn, + coderProto.spec!.payload, + ...components ); - if (coderProto.spec!.payload?.length) { - this.coders[coderId] = new coderConstructor( - coderProto.spec!.payload, - ...components - ); - } else { - this.coders[coderId] = new coderConstructor(...components); - } } return this.coders[coderId]; } @@ -132,13 +128,13 @@ export class Pipeline { environments.defaultJsEnvironment(); this.context = new PipelineContext(this.proto.components!); this.proto.components!.windowingStrategies[this.globalWindowing] = - WindowInto.createWindowingStrategy(this, new GlobalWindows()); + createWindowingStrategyProto(this, globalWindows()); } preApplyTransform< InputT extends pvalue.PValue, OutputT extends pvalue.PValue - >(transform: AsyncPTransform, input: InputT) { + >(transform: AsyncPTransformClass, input: InputT) { const this_ = this; const transformId = this.context.createUniqueName("transform"); let parent: runnerApi.PTransform | undefined = undefined; @@ -168,7 +164,7 @@ export class Pipeline { applyTransform< InputT extends pvalue.PValue, OutputT extends pvalue.PValue - >(transform: PTransform, input: InputT) { + >(transform: PTransformClass, input: InputT) { const { id: transformId, proto: transformProto } = this.preApplyTransform( transform, input @@ -186,7 +182,7 @@ export class Pipeline { async asyncApplyTransform< InputT extends pvalue.PValue, OutputT extends pvalue.PValue - >(transform: AsyncPTransform, input: InputT) { + >(transform: AsyncPTransformClass, input: InputT) { const { id: transformId, proto: transformProto } = this.preApplyTransform( transform, input @@ -205,7 +201,7 @@ export class Pipeline { InputT extends pvalue.PValue, OutputT extends pvalue.PValue >( - transform: AsyncPTransform, + transform: AsyncPTransformClass, transformProto: runnerApi.PTransform, result: OutputT ) { diff --git a/sdks/typescript/src/apache_beam/internal/urns.ts b/sdks/typescript/src/apache_beam/internal/urns.ts index 4121444a3088..e08ac5803bfe 100644 --- a/sdks/typescript/src/apache_beam/internal/urns.ts +++ b/sdks/typescript/src/apache_beam/internal/urns.ts @@ -24,7 +24,6 @@ export const IDENTITY_DOFN_URN = "beam:dofn:identity:0.1"; export const SERIALIZED_JS_DOFN_INFO = "beam:dofn:serialized_js_dofn_info:v1"; export const SPLITTING_JS_DOFN_URN = "beam:dofn:splitting_dofn:v1"; -export const SPLITTING2_JS_DOFN_URN = "beam:dofn:splitting2_dofn:v1"; export const JS_WINDOW_INTO_DOFN_URN = "beam:dofn:js_window_into:v1"; export const JS_ASSIGN_TIMESTAMPS_DOFN_URN = "beam:dofn:js_assign_timestamps:v1"; diff --git a/sdks/typescript/src/apache_beam/io/textio.ts b/sdks/typescript/src/apache_beam/io/textio.ts index adf487d3f227..58cc5f241b48 100644 --- a/sdks/typescript/src/apache_beam/io/textio.ts +++ b/sdks/typescript/src/apache_beam/io/textio.ts @@ -19,28 +19,20 @@ import * as beam from "../../apache_beam"; import * as external from "../transforms/external"; -export class ReadFromText extends beam.AsyncPTransform< - beam.Root, - beam.PCollection -> { - constructor(private filePattern: string) { - super(); - } - - async asyncExpand(root: beam.Root) { +export function readFromText( + filePattern: string +): beam.AsyncPTransform> { + return async function readFromText(root: beam.Root) { return await root.asyncApply( - new external.RawExternalTransform< - beam.PValue, - beam.PCollection - >( + external.rawExternalTransform>( "beam:transforms:python:fully_qualified_named", { constructor: "apache_beam.io.ReadFromText", - kwargs: { file_pattern: this.filePattern }, + kwargs: { file_pattern: filePattern }, }, // python apache_beam/runners/portability/expansion_service_main.py --fully_qualified_name_glob='*' --port 4444 --environment_type='beam:env:embedded_python:v1' "localhost:4444" ) ); - } + }; } diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts index bf4dd4ea9fdf..1313f2241e7b 100644 --- a/sdks/typescript/src/apache_beam/pvalue.ts +++ b/sdks/typescript/src/apache_beam/pvalue.ts @@ -21,10 +21,12 @@ import { Pipeline } from "./internal/pipeline"; import { PTransform, AsyncPTransform, + PTransformClass, + AsyncPTransformClass, extractName, withName, } from "./transforms/transform"; -import { ParDo, DoFn } from "./transforms/pardo"; +import { parDo, DoFn } from "./transforms/pardo"; import * as runnerApi from "./proto/beam_runner_api"; /** @@ -38,20 +40,18 @@ export class Root { this.pipeline = pipeline; } - apply>( - transform: PTransform | ((Root) => OutputT) - ) { - if (!(transform instanceof PTransform)) { - transform = new PTransformFromCallable(transform); + apply>(transform: PTransform) { + if (!(transform instanceof PTransformClass)) { + transform = new PTransformClassFromCallable(transform); } return this.pipeline.applyTransform(transform, this); } async asyncApply>( - transform: AsyncPTransform | ((Root) => Promise) + transform: AsyncPTransform ) { - if (!(transform instanceof AsyncPTransform)) { - transform = new AsyncPTransformFromCallable(transform); + if (!(transform instanceof AsyncPTransformClass)) { + transform = new AsyncPTransformClassFromCallable(transform); } return await this.pipeline.asyncApplyTransform(transform, this); } @@ -83,21 +83,19 @@ export class PCollection { } apply>( - transform: PTransform, OutputT> | ((PCollection) => OutputT) + transform: PTransform, OutputT> ) { - if (!(transform instanceof PTransform)) { - transform = new PTransformFromCallable(transform); + if (!(transform instanceof PTransformClass)) { + transform = new PTransformClassFromCallable(transform); } return this.pipeline.applyTransform(transform, this); } asyncApply>( - transform: - | AsyncPTransform, OutputT> - | ((PCollection) => Promise) + transform: AsyncPTransform, OutputT> ) { - if (!(transform instanceof AsyncPTransform)) { - transform = new AsyncPTransformFromCallable(transform); + if (!(transform instanceof AsyncPTransformClass)) { + transform = new AsyncPTransformClassFromCallable(transform); } return this.pipeline.asyncApplyTransform(transform, this); } @@ -111,7 +109,7 @@ export class PCollection { return this.apply( withName( "map(" + extractName(fn) + ")", - new ParDo( + parDo( { process: function* (element: T, context: ContextT) { // While it's legal to call a function with extra arguments which will @@ -136,7 +134,7 @@ export class PCollection { return this.apply( withName( "flatMap(" + extractName(fn) + ")", - new ParDo( + parDo( { process: function (element: T, context: ContextT) { // While it's legal to call a function with extra arguments which will @@ -158,7 +156,7 @@ export class PCollection { } /** - * The type of object that may be consumed or produced by a PTransform. + * The type of object that may be consumed or produced by a PTransformClass. */ export type PValue = | void @@ -209,7 +207,7 @@ export function flattenPValue( * Wraps a PValue in a single object such that a transform can be applied to it. * * For example, Flatten takes a PCollection[] as input, but Array has no - * apply(PTransform) method, so one writes + * apply(PTransformClass) method, so one writes * * P([pcA, pcB, pcC]).apply(new Flatten()) */ @@ -221,21 +219,21 @@ class PValueWrapper> { constructor(private pvalue: T) {} apply>( - transform: PTransform | ((input: T) => O), + transform: PTransform, root: Root | null = null ) { - if (!(transform instanceof PTransform)) { - transform = new PTransformFromCallable(transform); + if (!(transform instanceof PTransformClass)) { + transform = new PTransformClassFromCallable(transform); } return this.pipeline(root).applyTransform(transform, this.pvalue); } async asyncApply>( - transform: AsyncPTransform | ((input: T) => Promise), + transform: AsyncPTransform, root: Root | null = null ) { - if (!(transform instanceof AsyncPTransform)) { - transform = new AsyncPTransformFromCallable(transform); + if (!(transform instanceof AsyncPTransformClass)) { + transform = new AsyncPTransformClassFromCallable(transform); } return await this.pipeline(root).asyncApplyTransform( transform, @@ -253,35 +251,63 @@ class PValueWrapper> { } } -class PTransformFromCallable< +class PTransformClassFromCallable< InputT extends PValue, OutputT extends PValue -> extends PTransform { - expander: (InputT) => OutputT; +> extends PTransformClass { + expander: ( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => OutputT; - constructor(expander: (InputT) => OutputT) { + constructor( + expander: ( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => OutputT + ) { super(extractName(expander)); this.expander = expander; } - expand(input: InputT) { - return this.expander(input); + expandInternal( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) { + return this.expander(input, pipeline, transformProto); } } -class AsyncPTransformFromCallable< +class AsyncPTransformClassFromCallable< InputT extends PValue, OutputT extends PValue -> extends AsyncPTransform { - expander: (InputT) => Promise; +> extends AsyncPTransformClass { + expander: ( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => Promise; - constructor(expander: (InputT) => Promise) { + constructor( + expander: ( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => Promise + ) { super(extractName(expander)); this.expander = expander; } - async asyncExpand(input: InputT) { - return this.expander(input); + async asyncExpandInternal( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) { + return this.expander(input, pipeline, transformProto); } } diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts index ff203d06c41d..1482995eee57 100644 --- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts +++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts @@ -28,13 +28,13 @@ import { JobState_Enum } from "../proto/beam_job_api"; import { Pipeline } from "../internal/pipeline"; import { Root } from "../pvalue"; -import { Impulse, GroupByKey } from "../transforms/internal"; +import { impulse, groupByKey } from "../transforms/internal"; import { Runner, PipelineResult } from "./runner"; import * as worker from "../worker/worker"; import * as operators from "../worker/operators"; import { createStateKey } from "../worker/pardo_context"; import * as state from "../worker/state"; -import { ParDo } from "../transforms/pardo"; +import { parDo } from "../transforms/pardo"; import { Window, GlobalWindow, @@ -121,7 +121,7 @@ export class DirectRunner extends Runner { descriptor, null!, new state.CachingStateProvider(stateProvider), - [Impulse.urn] + [impulse.urn] ); await processor.process("bundle_id"); @@ -165,7 +165,7 @@ class DirectImpulseOperator implements operators.IOperator { async finishBundle() {} } -operators.registerOperator(Impulse.urn, DirectImpulseOperator); +operators.registerOperator(impulse.urn, DirectImpulseOperator); // Only to be used in direct runner, as this will only group within a single bundle. // TODO: (Extension) This could be used as a base for the PGBKOperation operator, @@ -248,7 +248,7 @@ class DirectGbkOperator implements operators.IOperator { } } -operators.registerOperator(GroupByKey.urn, DirectGbkOperator); +operators.registerOperator(groupByKey.urn, DirectGbkOperator); /** * Rewrites the pipeline to be suitable for running as a single "bundle." @@ -305,7 +305,7 @@ function rewriteSideInputs(p: runnerApi.Pipeline, pipelineStateRef: string) { for (const [transformId, transform] of Object.entries(transforms)) { if ( transform.spec != undefined && - transform.spec.urn == ParDo.urn && + transform.spec.urn == parDo.urn && Object.keys(transform.inputs).length > 1 ) { const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload); diff --git a/sdks/typescript/src/apache_beam/testing/assert.ts b/sdks/typescript/src/apache_beam/testing/assert.ts index 0d6451c5b451..930ffca69314 100644 --- a/sdks/typescript/src/apache_beam/testing/assert.ts +++ b/sdks/typescript/src/apache_beam/testing/assert.ts @@ -17,7 +17,7 @@ */ import * as beam from "../../apache_beam"; -import { GlobalWindows } from "../../apache_beam/transforms/windowings"; +import { globalWindows } from "../../apache_beam/transforms/windowings"; import * as internal from "../transforms/internal"; import * as assert from "assert"; @@ -28,55 +28,39 @@ function callAssertDeepEqual(a, b) { } // TODO: (Naming) -export class AssertDeepEqual extends beam.PTransform< - beam.PCollection, - void -> { - expected: any[]; - - constructor(expected: any[]) { - super("AssertDeepEqual"); - this.expected = expected; - } - - expand(pcoll: beam.PCollection) { - const expected = this.expected; +export function assertDeepEqual( + expected: T[] +): beam.PTransform, void> { + return function assertDeepEqual(pcoll: beam.PCollection) { pcoll.apply( - new Assert("Assert", (actual) => { - const actualArray: any[] = [...actual]; + assertContentsSatisfies((actual: T[]) => { + const actualArray: T[] = [...actual]; expected.sort(); actualArray.sort(); callAssertDeepEqual(actualArray, expected); }) ); - } + }; } -export class Assert extends beam.PTransform, void> { - check: (actual: any[]) => void; - - constructor(name: string, check: (actual: any[]) => void) { - super(name); - this.check = check; - } - - expand(pcoll: beam.PCollection) { - const check = this.check; +export function assertContentsSatisfies( + check: (actual: T[]) => void +): beam.PTransform, void> { + function expand(pcoll: beam.PCollection) { // We provide some value here to ensure there is at least one element // so the DoFn gets invoked. const singleton = pcoll .root() - .apply(new beam.Impulse()) + .apply(beam.impulse()) .map((_) => ({ tag: "expected" })); // CoGBK. const tagged = pcoll .map((e) => ({ tag: "actual", value: e })) - .apply(new beam.WindowInto(new GlobalWindows())); + .apply(beam.windowInto(globalWindows())); beam .P([singleton, tagged]) - .apply(new beam.Flatten()) - .map((e) => ({ key: 0, value: e })) - .apply(new internal.GroupByKey()) // TODO: GroupBy. + .apply(beam.flatten()) + .apply(beam.groupBy((e) => 0)) .map( beam.withName("extractActual", (kv) => { const actual: any[] = @@ -86,6 +70,11 @@ export class Assert extends beam.PTransform, void> { }) ); } + + return beam.withName( + `assertContentsSatisfies(${beam.extractName(check)})`, + expand + ); } import { requireForSerialization } from "../serialization"; diff --git a/sdks/typescript/src/apache_beam/transforms/create.ts b/sdks/typescript/src/apache_beam/transforms/create.ts index e3413d3efb30..bf5eefa673a4 100644 --- a/sdks/typescript/src/apache_beam/transforms/create.ts +++ b/sdks/typescript/src/apache_beam/transforms/create.ts @@ -17,34 +17,19 @@ */ import { PTransform, withName } from "./transform"; -import { Impulse } from "./internal"; +import { impulse } from "./internal"; import { Root, PCollection } from "../pvalue"; /** * A Ptransform that represents a 'static' source with a list of elements passed at construction time. It * returns a PCollection that contains the elements in the input list. - * - * @extends PTransform */ -export class Create extends PTransform> { - elements: T[]; - - /** - * Construct a new Create PTransform. - * @param elements - the list of elements in the PCollection - */ - constructor(elements: T[]) { - super("Create"); - this.elements = elements; +export function create(elements: T[]): PTransform> { + function create(root: Root): PCollection { + return root + .apply(impulse()) + .flatMap(withName("ExtractElements", (_) => elements)); } - expand(root: Root) { - const elements = this.elements; - // TODO: (Cleanup) Store encoded values and conditionally shuffle. - return root.apply(new Impulse()).flatMap( - withName("ExtractElements", function* blarg(_) { - yield* elements; - }) - ); - } + return create; } diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts index 73ba81faca7c..bfb3ebe4182d 100644 --- a/sdks/typescript/src/apache_beam/transforms/external.ts +++ b/sdks/typescript/src/apache_beam/transforms/external.ts @@ -49,10 +49,27 @@ import * as service from "../utils/service"; // TODO: (API) (Types) This class expects PCollections to already have the // correct Coders. It would be great if we could infer coders, or at least have // a cleaner way to specify them than using internal.WithCoderInternal. -export class RawExternalTransform< +export function rawExternalTransform< InputT extends PValue, OutputT extends PValue -> extends transform.AsyncPTransform { +>( + urn: string, + payload: Uint8Array | { [key: string]: any }, + serviceProviderOrAddress: string | (() => Promise), + inferPValueType: boolean = true +): transform.AsyncPTransform { + return new RawExternalTransform( + urn, + payload, + serviceProviderOrAddress, + inferPValueType + ); +} + +class RawExternalTransform< + InputT extends PValue, + OutputT extends PValue +> extends transform.AsyncPTransformClass { static namespaceCounter = 0; static freshNamespace() { return "namespace_" + RawExternalTransform.namespaceCounter++ + "_"; @@ -110,7 +127,7 @@ export class RawExternalTransform< request.components!.transforms[fakeImpulseNamespace + pcId] = runnerApi.PTransform.create({ uniqueName: fakeImpulseNamespace + "_create_" + pcId, - spec: { urn: internal.Impulse.urn, payload: new Uint8Array() }, + spec: { urn: internal.impulse.urn, payload: new Uint8Array() }, outputs: { main: pcId }, }); } diff --git a/sdks/typescript/src/apache_beam/transforms/flatten.ts b/sdks/typescript/src/apache_beam/transforms/flatten.ts index 80e80a98d19c..57d29c236a5a 100644 --- a/sdks/typescript/src/apache_beam/transforms/flatten.ts +++ b/sdks/typescript/src/apache_beam/transforms/flatten.ts @@ -17,28 +17,32 @@ */ import * as runnerApi from "../proto/beam_runner_api"; -import { PTransform } from "./transform"; +import { PTransform, withName } from "./transform"; import { PCollection } from "../pvalue"; import { Pipeline } from "../internal/pipeline"; import { GeneralObjectCoder } from "../coders/js_coders"; -export class Flatten extends PTransform[], PCollection> { - // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn; - // TODO: (Cleanup) use above line, not below line. - static urn: string = "beam:transform:flatten:v1"; - name = "Flatten"; - - expandInternal( +export function flatten(): PTransform[], PCollection> { + function expandInternal( inputs: PCollection[], pipeline: Pipeline, transformProto: runnerApi.PTransform ) { transformProto.spec = runnerApi.FunctionSpec.create({ - urn: Flatten.urn, + urn: flatten.urn, payload: null!, }); - // TODO: Input coder if they're all the same? UnionCoder? - return pipeline.createPCollectionInternal(new GeneralObjectCoder()); + // TODO: UnionCoder if they're not the same? + const coders = new Set( + inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc)) + ); + const coder = + coders.size == 1 ? [...coders][0] : new GeneralObjectCoder(); + return pipeline.createPCollectionInternal(coder); } + + return withName("flatten", expandInternal); } + +flatten.urn = "beam:transform:flatten:v1"; diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts index 6e7072cc74c4..4f7de5a0d47e 100644 --- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts +++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts @@ -17,8 +17,9 @@ */ import { KV } from "../values"; -import { PTransform } from "./transform"; +import { PTransform, PTransformClass, withName } from "./transform"; import { PCollection } from "../pvalue"; +import { PValue } from "../pvalue"; import * as internal from "./internal"; import { count } from "./combiners"; @@ -40,13 +41,13 @@ export interface CombineFn { type Combiner = CombineFn | ((a: any, b: any) => any); /** - * A PTransform that takes a PCollection of elements, and returns a PCollection + * A PTransformClass that takes a PCollection of elements, and returns a PCollection * of elements grouped by a field, multiple fields, an expression that is used * as the grouping key. * - * @extends PTransform + * @extends PTransformClass */ -export class GroupBy extends PTransform< +export class GroupBy extends PTransformClass< PCollection, PCollection>> > { @@ -72,7 +73,7 @@ export class GroupBy extends PTransform< const keyFn = this.keyFn; return input .map((x) => ({ key: keyFn(x), value: x })) - .apply(new internal.GroupByKey()); + .apply(internal.groupByKey()); } combining( @@ -88,6 +89,13 @@ export class GroupBy extends PTransform< } } +export function groupBy( + key: string | string[] | ((element: T) => K), + keyName: string | undefined = undefined +): GroupBy { + return new GroupBy(key, keyName); +} + /** * Groups all elements of the input PCollection together. * @@ -95,7 +103,7 @@ export class GroupBy extends PTransform< * loses parallelization benefits in bringing all elements of a distributed * PCollection together on a single machine. */ -export class GroupGlobally extends PTransform< +export class GroupGlobally extends PTransformClass< PCollection, PCollection> > { @@ -120,7 +128,11 @@ export class GroupGlobally extends PTransform< } } -class GroupByAndCombine extends PTransform< +export function groupGlobally() { + return new GroupGlobally(); +} + +class GroupByAndCombine extends PTransformClass< PCollection, PCollection > { @@ -167,8 +179,8 @@ class GroupByAndCombine extends PTransform< }; }) .apply( - new internal.CombinePerKey( - new MultiCombineFn(this_.combiners.map((c) => c.combineFn)) + internal.combinePerKey( + multiCombineFn(this_.combiners.map((c) => c.combineFn)) ) ) .map(function constructResult(kv) { @@ -190,34 +202,30 @@ class GroupByAndCombine extends PTransform< } } -// TODO: (API) Does this carry its weight as a top-level built-in function? -// Cons: It's just a combine. Pros: It's kind of a non-obvious one. -// NOTE: The encoded form of the elements will be used for equality checking. -export class CountPerElement extends PTransform< +export function countPerElement(): PTransform< PCollection, PCollection<{ element: T; count: number }> > { - expand(input) { - return input.apply( - new GroupBy((e) => e, "element").combining((e) => e, count, "count") - ); - } + return withName( + "countPerElement", + groupBy((e) => e, "element").combining((e) => e, count, "count") + ); } -export class CountGlobally extends PTransform< +export function countGlobally(): PTransform< PCollection, PCollection > { - expand(input) { - return input + return withName("countGlobally", (input) => + input .apply(new GroupGlobally().combining((e) => e, count, "count")) - .map((o) => o.count); - } + .map((o) => o.count) + ); } function toCombineFn(combiner: Combiner): CombineFn { if (typeof combiner == "function") { - return new BinaryCombineFn(combiner); + return binaryCombineFn(combiner); } else { return combiner; } @@ -229,71 +237,62 @@ interface CombineSpec { resultName: string; } -class BinaryCombineFn implements CombineFn { - constructor(private combiner: (a: I, b: I) => I) {} - createAccumulator() { - return undefined; - } - addInput(a, b) { - if (a == undefined) { - return b; - } else { - return this.combiner(a, b); - } - } - mergeAccumulators(accs) { - return accs.filter((a) => a != undefined).reduce(this.combiner, undefined); - } - extractOutput(a) { - return a; - } +function binaryCombineFn( + combiner: (a: I, b: I) => I +): CombineFn { + return { + createAccumulator: () => undefined, + addInput: (a, b) => (a === undefined ? b : combiner(a, b)), + mergeAccumulators: (accs) => + [...accs].filter((a) => a != undefined).reduce(combiner, undefined), + extractOutput: (a) => a, + }; } -class MultiCombineFn implements CombineFn { - batchSize: number = 100; - // TODO: (Typescript) Is there a way to indicate type parameters match the above? - constructor(private combineFns: CombineFn[]) {} +// TODO: (Typescript) Is there a way to indicate type parameters match the above? +function multiCombineFn( + combineFns: CombineFn[], + batchSize: number = 100 +): CombineFn { + return { + createAccumulator: () => combineFns.map((fn) => fn.createAccumulator()), - createAccumulator() { - return this.combineFns.map((fn) => fn.createAccumulator()); - } - - addInput(accumulators: any[], inputs: any[]) { - // TODO: (Cleanup) Does javascript have a clean zip? - let result: any[] = []; - for (let i = 0; i < this.combineFns.length; i++) { - result.push(this.combineFns[i].addInput(accumulators[i], inputs[i])); - } - return result; - } + addInput: (accumulators: any[], inputs: any[]) => { + // TODO: (Cleanup) Does javascript have a clean zip? + let result: any[] = []; + for (let i = 0; i < combineFns.length; i++) { + result.push(combineFns[i].addInput(accumulators[i], inputs[i])); + } + return result; + }, - mergeAccumulators(accumulators: Iterable) { - const combineFns = this.combineFns; - let batches = combineFns.map((fn) => [fn.createAccumulator()]); - for (let acc of accumulators) { + mergeAccumulators: (accumulators: Iterable) => { + let batches = combineFns.map((fn) => [fn.createAccumulator()]); + for (let acc of accumulators) { + for (let i = 0; i < combineFns.length; i++) { + batches[i].push(acc[i]); + if (batches[i].length > batchSize) { + batches[i] = [combineFns[i].mergeAccumulators(batches[i])]; + } + } + } for (let i = 0; i < combineFns.length; i++) { - batches[i].push(acc[i]); - if (batches[i].length > this.batchSize) { + if (batches[i].length > 1) { batches[i] = [combineFns[i].mergeAccumulators(batches[i])]; } } - } - for (let i = 0; i < combineFns.length; i++) { - if (batches[i].length > 1) { - batches[i] = [combineFns[i].mergeAccumulators(batches[i])]; - } - } - return batches.map((batch) => batch[0]); - } + return batches.map((batch) => batch[0]); + }, - extractOutput(accumulators: any[]) { - // TODO: (Cleanup) Does javascript have a clean zip? - let result: any[] = []; - for (let i = 0; i < this.combineFns.length; i++) { - result.push(this.combineFns[i].extractOutput(accumulators[i])); - } - return result; - } + extractOutput: (accumulators: any[]) => { + // TODO: (Cleanup) Does javascript have a clean zip? + let result: any[] = []; + for (let i = 0; i < combineFns.length; i++) { + result.push(combineFns[i].extractOutput(accumulators[i])); + } + return result; + }, + }; } // TODO: (Typescript) Can I type T as "something that has this key" and/or, @@ -330,7 +329,5 @@ function extractFn(extractor: string | string[] | ((T) => K)) { import { requireForSerialization } from "../serialization"; requireForSerialization("apache_beam.transforms.pardo", exports); requireForSerialization("apache_beam.transforms.pardo", { - BinaryCombineFn: BinaryCombineFn, GroupByAndCombine: GroupByAndCombine, - MultiCombineFn: MultiCombineFn, }); diff --git a/sdks/typescript/src/apache_beam/transforms/index.ts b/sdks/typescript/src/apache_beam/transforms/index.ts index c659f87204f5..f0d49a6f5c34 100644 --- a/sdks/typescript/src/apache_beam/transforms/index.ts +++ b/sdks/typescript/src/apache_beam/transforms/index.ts @@ -23,7 +23,7 @@ export * from "./group_and_combine"; export * from "./pardo"; export * from "./transform"; export * from "./window"; -export { Impulse } from "./internal"; +export { impulse, withRowCoder } from "./internal"; import { requireForSerialization } from "../serialization"; requireForSerialization("apache_beam.transforms", exports); diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts index 27f03c836ab7..1e257704cd13 100644 --- a/sdks/typescript/src/apache_beam/transforms/internal.ts +++ b/sdks/typescript/src/apache_beam/transforms/internal.ts @@ -19,75 +19,82 @@ import * as runnerApi from "../proto/beam_runner_api"; import * as urns from "../internal/urns"; -import { PTransform, withName } from "./transform"; +import { + PTransform, + PTransformClass, + withName, + extractName, +} from "./transform"; import { PCollection, Root } from "../pvalue"; import { Pipeline } from "../internal/pipeline"; import { Coder } from "../coders/coders"; import { BytesCoder, KVCoder, IterableCoder } from "../coders/required_coders"; -import { ParDo } from "./pardo"; +import { parDo } from "./pardo"; import { GeneralObjectCoder } from "../coders/js_coders"; +import { RowCoder } from "../coders/row_coder"; import { KV } from "../values"; import { CombineFn } from "./group_and_combine"; /** - * `Impulse` is the basic *source* primitive `PTransform`. It receives a Beam + * `Impulse` is the basic *source* primitive `PTransformClass`. It receives a Beam * Root as input, and returns a `PCollection` of `Uint8Array` with a single * element with length=0 (i.e. the empty byte array: `new Uint8Array("")`). * * `Impulse` is used to start the execution of a pipeline with a single element * that can trigger execution of a source or SDF. */ -export class Impulse extends PTransform> { - // static urn: string = runnerApi.StandardPTransforms_Primitives.IMPULSE.urn; - // TODO: (Cleanup) use above line, not below line. - static urn: string = "beam:transform:impulse:v1"; - - constructor() { - super("Impulse"); // TODO: (Unique names) pass null/nothing and get from reflection - } - - expandInternal( +export function impulse(): PTransform> { + function expandInternal( input: Root, pipeline: Pipeline, transformProto: runnerApi.PTransform - ): PCollection { + ) { transformProto.spec = runnerApi.FunctionSpec.create({ - urn: Impulse.urn, + urn: impulse.urn, payload: urns.IMPULSE_BUFFER, }); transformProto.environmentId = ""; return pipeline.createPCollectionInternal(new BytesCoder()); } + + return withName("impulse", expandInternal); } +impulse.urn = "beam:transform:impulse:v1"; + // TODO: (API) Should we offer a method on PCollection to do this? -export class WithCoderInternal extends PTransform< - PCollection, - PCollection -> { - constructor(private coder: Coder) { - super("WithCoderInternal(" + coder + ")"); - } - expandInternal( - input: PCollection, - pipeline: Pipeline, - transformProto: runnerApi.PTransform - ) { - // IDENTITY rather than Flatten for better fusion. - transformProto.spec = { - urn: ParDo.urn, - payload: runnerApi.ParDoPayload.toBinary( - runnerApi.ParDoPayload.create({ - doFn: runnerApi.FunctionSpec.create({ - urn: urns.IDENTITY_DOFN_URN, - payload: undefined!, - }), - }) - ), - }; - - return pipeline.createPCollectionInternal(this.coder); - } +export function withCoderInternal( + coder: Coder +): PTransform, PCollection> { + return withName( + `withCoderInternal(${extractName(coder)})`, + ( + input: PCollection, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => { + // IDENTITY rather than Flatten for better fusion. + transformProto.spec = { + urn: parDo.urn, + payload: runnerApi.ParDoPayload.toBinary( + runnerApi.ParDoPayload.create({ + doFn: runnerApi.FunctionSpec.create({ + urn: urns.IDENTITY_DOFN_URN, + payload: undefined!, + }), + }) + ), + }; + + return pipeline.createPCollectionInternal(coder); + } + ); +} + +export function withRowCoder( + exemplar: T +): PTransform, PCollection> { + return withCoderInternal(RowCoder.fromJSON(exemplar)); } /** @@ -101,15 +108,11 @@ export class WithCoderInternal extends PTransform< * `GroupByKey` operations are used under the hood to execute combines, * streaming triggers, stateful transforms, etc. */ -export class GroupByKey extends PTransform< +export function groupByKey(): PTransform< PCollection>, PCollection>> > { - // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn; - // TODO: (Cleanup) use above line, not below line. - static urn: string = "beam:transform:group_by_key:v1"; - - expandInternal( + function expandInternal( input: PCollection>, pipeline: Pipeline, transformProto: runnerApi.PTransform @@ -124,15 +127,15 @@ export class GroupByKey extends PTransform< if (inputCoderProto.spec!.urn != KVCoder.URN) { return input .apply( - new WithCoderInternal( + withCoderInternal( new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder()) ) ) - .apply(new GroupByKey()); + .apply(groupByKey()); } transformProto.spec = runnerApi.FunctionSpec.create({ - urn: GroupByKey.urn, + urn: groupByKey.urn, payload: undefined!, }); transformProto.environmentId = ""; @@ -144,8 +147,13 @@ export class GroupByKey extends PTransform< const outputCoder = new KVCoder(keyCoder, iterableValueCoder); return pipeline.createPCollectionInternal(outputCoder); } + + return withName("groupByKey", expandInternal); } +// TODO: (Cleanup) runnerApi.StandardPTransformClasss_Primitives.GROUP_BY_KEY.urn. +groupByKey.urn = "beam:transform:group_by_key:v1"; + /** * This transform is used to perform aggregations over groups of elements. * @@ -159,28 +167,24 @@ export class GroupByKey extends PTransform< * before a `GroupByKey` and after the `GroupByKey`. The partial aggregations * help reduce the original data into a single aggregator per key per worker. */ -export class CombinePerKey extends PTransform< - PCollection>, - PCollection> -> { - constructor(private combineFn: CombineFn) { - super(); +export function combinePerKey( + combineFn: CombineFn +): PTransform>, PCollection>> { + function expandInternal(input: PCollection>) { + return input // + .apply(groupByKey()) + .map( + withName("applyCombine", (kv) => ({ + key: kv.key, + value: combineFn.extractOutput( + kv.value.reduce( + combineFn.addInput.bind(combineFn), + combineFn.createAccumulator() + ) + ), + })) + ); } - // Let the runner do the combiner lifting, when possible, handling timestamps, - // windowing, and triggering as needed. - expand(input: PCollection>) { - const combineFn = this.combineFn; - return input.apply(new GroupByKey()).map( - withName("applyCombine", (kv) => ({ - key: kv.key, - value: combineFn.extractOutput( - kv.value.reduce( - combineFn.addInput.bind(combineFn), - combineFn.createAccumulator() - ) - ), - })) - ); - } + return withName(`combinePerKey(${extractName(combineFn)})`, expandInternal); } diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts index d6c550299b9a..e05c0cb14f5b 100644 --- a/sdks/typescript/src/apache_beam/transforms/pardo.ts +++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts @@ -23,7 +23,12 @@ import { GeneralObjectCoder } from "../coders/js_coders"; import { PCollection } from "../pvalue"; import { Pipeline } from "../internal/pipeline"; import { serializeFn } from "../internal/serialize"; -import { PTransform, extractName } from "./transform"; +import { + PTransform, + PTransformClass, + withName, + extractName, +} from "./transform"; import { PaneInfo, Instant, Window, WindowedValue } from "../values"; export interface DoFn { @@ -40,46 +45,36 @@ export interface DoFn { // TODO: (API) Do we need an AsyncDoFn (and async[Flat]Map) to be able to call // async functions in the body of the fns. Or can they always be Async? // The latter seems to have perf issues. -// (For PTransforms, it's a major usability issue, but maybe we can always +// (For PTransformClasss, it's a major usability issue, but maybe we can always // await when calling user code. OTOH, I don't know what the performance // impact would be for creating promises for every element of every operation // which is typically a very performance critical spot to optimize.) -export class ParDo extends PTransform< - PCollection, - PCollection -> { - private doFn: DoFn; - private context: ContextT; - // static urn: string = runnerApi.StandardPTransforms_Primitives.PAR_DO.urn; - // TODO: (Cleanup) use above line, not below line. - static urn: string = "beam:transform:pardo:v1"; - // TODO: (Typescript) Can the arg be optional iff ContextT is undefined? - constructor( - doFn: DoFn, - contextx: ContextT = undefined! - ) { - super(() => "ParDo(" + extractName(doFn) + ")"); - this.doFn = doFn; - this.context = contextx; - } - - expandInternal( +// TODO: (Typescript) Can the context arg be optional iff ContextT is undefined? +export function parDo< + InputT, + OutputT, + ContextT extends Object | undefined = undefined +>( + doFn: DoFn, + context: ContextT = undefined! +): PTransform, PCollection> { + function expandInternal( input: PCollection, pipeline: Pipeline, transformProto: runnerApi.PTransform ) { // Extract and populate side inputs from the context. - var context; const sideInputs = {}; - if (typeof this.context == "object") { - context = Object.create(this.context as Object); + var contextCopy; + if (typeof context == "object") { + contextCopy = Object.create(context as Object) as any; const components = pipeline.context.components; - for (const [name, value] of Object.entries(this.context)) { + for (const [name, value] of Object.entries(context)) { if (value instanceof SideInputParam) { const inputName = "side." + name; transformProto.inputs[inputName] = value.pcoll.getId(); - context[name] = copySideInputWithId(value, inputName); + contextCopy[name] = copySideInputWithId(value, inputName); const mainWindowingStrategyId = components.pcollections[input.getId()].windowingStrategyId; const sideWindowingStrategyId = @@ -109,23 +104,23 @@ export class ParDo extends PTransform< }, }; } else { - context[name] = value; + contextCopy[name] = value; } } } else { - context = this.context; + contextCopy = context; } // Now finally construct the proto. transformProto.spec = runnerApi.FunctionSpec.create({ - urn: ParDo.urn, + urn: parDo.urn, payload: runnerApi.ParDoPayload.toBinary( runnerApi.ParDoPayload.create({ doFn: runnerApi.FunctionSpec.create({ urn: urns.SERIALIZED_JS_DOFN_INFO, payload: serializeFn({ - doFn: this.doFn, - context: context, + doFn: doFn, + context: contextCopy, }), }), sideInputs: sideInputs, @@ -140,89 +135,68 @@ export class ParDo extends PTransform< new GeneralObjectCoder() ); } + + return withName(`parDo(${extractName(doFn)})`, expandInternal); } +// TODO: (Cleanup) use runnerApi.StandardPTransformClasss_Primitives.PAR_DO.urn. +parDo.urn = "beam:transform:pardo:v1"; + +export type SplitOptions = { + knownTags?: string[]; + unknownTagBehavior?: "error" | "ignore" | "rename" | undefined; + unknownTagName?: string; + exclusive?: boolean; +}; + +/** + * Splits a single PCollection of objects, with keys k, into an object of + * PCollections, with the same keys k, where each PCollection consists of the + * values associated with that key. That is, + * + * PCollection<{a: T, b: U, ...}> maps to {a: PCollection, b: PCollection, ...} + */ // TODO: (API) Consider as top-level method. // TODO: Naming. -// TODO: Allow default? Technically splitter can be implemented/wrapped to produce such. -// TODO: Can we enforce splitter's output with the typing system to lie in targets? -// TODO: (Optionally?) delete the switched-on field. -// TODO: (API) Consider doing -// [{a: aValue}, {g: bValue}, ...] => a: [aValue, ...], b: [bValue, ...] -// instead of -// [{key: 'a', aValueFields}, {key: 'b', bValueFields}, ...] => -// a: [{key: 'a', aValueFields}, ...], b: [{key: 'b', aValueFields}, ...], -// (implemented below as Split2). -export class Split extends PTransform< - PCollection, - { [key: string]: PCollection } -> { - private tags: string[]; - constructor(private splitter: (T) => string, ...tags: string[]) { - super("Split(" + tags + ")"); - this.tags = tags; - } - expandInternal( +export function split( + tags: string[], + options: SplitOptions = {} +): PTransform, { [P in keyof T]: PCollection }> { + function expandInternal( input: PCollection, pipeline: Pipeline, transformProto: runnerApi.PTransform ) { - transformProto.spec = runnerApi.FunctionSpec.create({ - urn: ParDo.urn, - payload: runnerApi.ParDoPayload.toBinary( - runnerApi.ParDoPayload.create({ - doFn: runnerApi.FunctionSpec.create({ - urn: urns.SPLITTING_JS_DOFN_URN, - payload: serializeFn({ splitter: this.splitter }), - }), - }) - ), - }); - - const this_ = this; - return Object.fromEntries( - this_.tags.map((tag) => [ - tag, - pipeline.createPCollectionInternal( - pipeline.context.getPCollectionCoderId(input) - ), - ]) - ); - } -} + if (options.exclusive === undefined) { + options.exclusive = true; + } + if (options.unknownTagBehavior === undefined) { + options.unknownTagBehavior = "error"; + } + if ( + options.unknownTagBehavior == "rename" && + !tags.includes(options.unknownTagName!) + ) { + tags.push(options.unknownTagName!); + } + if (options.knownTags === undefined) { + options.knownTags = tags; + } -// TODO: (Typescript) Is it possible to type that this takes -// PCollection<{a: T, b: U, ...}> to {a: PCollection, b: PCollection, ...} -// Seems to requires a cast inside expandInternal. But at least the cast is contained there. -export class Split2 extends PTransform< - PCollection, - { [P in keyof T]: PCollection } -> { - private tags: string[]; - constructor(...tags: string[]) { - super("Split2(" + tags + ")"); - this.tags = tags; - } - expandInternal( - input: PCollection, - pipeline: Pipeline, - transformProto: runnerApi.PTransform - ) { transformProto.spec = runnerApi.FunctionSpec.create({ - urn: ParDo.urn, + urn: parDo.urn, payload: runnerApi.ParDoPayload.toBinary( runnerApi.ParDoPayload.create({ doFn: runnerApi.FunctionSpec.create({ - urn: urns.SPLITTING2_JS_DOFN_URN, - payload: new Uint8Array(), + urn: urns.SPLITTING_JS_DOFN_URN, + payload: serializeFn(options), }), }) ), }); - const this_ = this; return Object.fromEntries( - this_.tags.map((tag) => [ + tags.map((tag) => [ tag, pipeline.createPCollectionInternal( pipeline.context.getPCollectionCoderId(input) @@ -230,6 +204,8 @@ export class Split2 extends PTransform< ]) ) as { [P in keyof T]: PCollection }; } + + return withName(`Split(${tags})`, expandInternal); } /* @@ -238,15 +214,11 @@ export class Split2 extends PTransform< * special `lookup` method to retrieve the relevant value associated with the * currently-being-processed element. */ -export abstract class ParDoParam { - readonly parDoParamName: string; - +export class ParDoParam { // Provided externally. private provider: ParamProvider | undefined; - constructor(parDoParamName: string) { - this.parDoParamName = parDoParamName; - } + constructor(readonly parDoParamName: string) {} // TODO: Nameing "get" seems to be special. lookup(): T { @@ -266,23 +238,17 @@ export interface ParamProvider { provide(param: ParDoParam): T; } -export class WindowParam extends ParDoParam { - constructor() { - super("window"); - } +export function windowParam(): ParDoParam { + return new ParDoParam("window"); } -export class TimestampParam extends ParDoParam { - constructor() { - super("timestamp"); - } +export function timestampParam(): ParDoParam { + return new ParDoParam("timestamp"); } // TODO: Naming. Should this be PaneParam? -export class PaneInfoParam extends ParDoParam { - constructor() { - super("paneinfo"); - } +export function paneInfoParam(): ParDoParam { + return new ParDoParam("paneinfo"); } interface SideInputAccessor { @@ -330,35 +296,32 @@ function copySideInputWithId( return copy; } -export class IterableSideInput extends SideInputParam< - T, - Iterable, - Iterable -> { - constructor(pcoll: PCollection) { - super(pcoll, { - accessPattern: "beam:side_input:iterable:v1", - toValue: (iter: Iterable) => iter, - }); - } +export function iterableSideInput( + pcoll: PCollection +): SideInputParam, Iterable> { + return new SideInputParam, Iterable>(pcoll, { + accessPattern: "beam:side_input:iterable:v1", + toValue: (iter: Iterable) => iter, + }); } -export class SingletonSideInput extends SideInputParam, T> { - constructor(pcoll: PCollection, defaultValue: T | undefined = undefined) { - super(pcoll, { - accessPattern: "beam:side_input:iterable:v1", - toValue: (iter: Iterable) => { - const asArray = Array.from(iter); - if (asArray.length == 0 && defaultValue != undefined) { - return defaultValue; - } else if (asArray.length == 1) { - return asArray[0]; - } else { - throw new Error("Expected a single element, got " + asArray.length); - } - }, - }); - } +export function singletonSideInput( + pcoll: PCollection, + defaultValue: T | undefined = undefined +): SideInputParam, T> { + return new SideInputParam, T>(pcoll, { + accessPattern: "beam:side_input:iterable:v1", + toValue: (iter: Iterable) => { + const asArray = Array.from(iter); + if (asArray.length == 0 && defaultValue != undefined) { + return defaultValue; + } else if (asArray.length == 1) { + return asArray[0]; + } else { + throw new Error("Expected a single element, got " + asArray.length); + } + }, + }); } // TODO: (Extension) Map side inputs. diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts index 33a92423c58c..417a277609ce 100644 --- a/sdks/typescript/src/apache_beam/transforms/sql.ts +++ b/sdks/typescript/src/apache_beam/transforms/sql.ts @@ -31,25 +31,23 @@ import { serviceProviderFromJavaGradleTarget } from "../utils/service"; * corresponding names can be used in the sql statement. * * The input(s) must be schema'd (i.e. use the RowCoder). This can be done - * by explicitly setting the schema with internal.WithCoderInternal or passing + * by explicitly setting the schema with external.withRowCoder or passing * a prototype element in as a second argument, e.g. * * pcoll.applyAsync( - * new SqlTransform( + * sqlTransform( * "select a, b from PCOLLECTION", * {a: 0, b: "string"}, * )); */ -export class SqlTransform< +export function sqlTransform< InputT extends PCollection | { [key: string]: PCollection } -> extends transform.AsyncPTransform> { +>( + query: string, + inputTypes = null +): transform.AsyncPTransform> { // TOOD: (API) (Typescript): How to infer input_types, or at least make it optional. - constructor(private query: string, private inputTypes = null) { - // TODO: Unique names. Should we truncate/omit the full SQL statement? - super("Sql(" + query + ")"); - } - - async asyncExpand(input: InputT): Promise> { + async function expandInternal(input: InputT): Promise> { function withCoder(pcoll: PCollection, type): PCollection { if (type == null) { if ( @@ -67,33 +65,30 @@ export class SqlTransform< } return pcoll; } - return pcoll.apply( - new internal.WithCoderInternal(row_coder.RowCoder.fromJSON(type)) - ); + return pcoll.apply(internal.withRowCoder(type)); } if (input instanceof PCollection) { - input = withCoder(input, this.inputTypes) as InputT; + input = withCoder(input, inputTypes) as InputT; } else { input = Object.fromEntries( Object.keys(input).map((tag) => [ tag, - withCoder( - input[tag], - this.inputTypes == null ? null : this.inputTypes[tag] - ), + withCoder(input[tag], inputTypes == null ? null : inputTypes[tag]), ]) ) as InputT; } return await P(input).asyncApply( - new external.RawExternalTransform( + external.rawExternalTransform( "beam:external:java:sql:v1", - { query: this.query }, + { query: query }, serviceProviderFromJavaGradleTarget( "sdks:java:extensions:sql:expansion-service:shadowJar" ) ) ); } + + return transform.withName(`sqlTransform(${query})`, expandInternal); } diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts index 4d7fcfcdc02b..55b1da53a75b 100644 --- a/sdks/typescript/src/apache_beam/transforms/transform.ts +++ b/sdks/typescript/src/apache_beam/transforms/transform.ts @@ -63,7 +63,7 @@ export function extractName(withName: T): string { // call rather than forcing the asynchronous nature all the way up the call // hierarchy). -export class AsyncPTransform< +export class AsyncPTransformClass< InputT extends PValue, OutputT extends PValue > { @@ -86,10 +86,10 @@ export class AsyncPTransform< } } -export class PTransform< +export class PTransformClass< InputT extends PValue, OutputT extends PValue -> extends AsyncPTransform { +> extends AsyncPTransformClass { expand(input: InputT): OutputT { throw new Error("Method expand has not been implemented."); } @@ -114,3 +114,27 @@ export class PTransform< return this.expandInternal(input, pipeline, transformProto); } } + +export type AsyncPTransform< + InputT extends PValue, + OutputT extends PValue +> = + | AsyncPTransformClass + | ((input: InputT) => Promise) + | (( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => Promise); + +export type PTransform< + InputT extends PValue, + OutputT extends PValue +> = + | PTransformClass + | ((input: InputT) => OutputT) + | (( + input: InputT, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => OutputT); diff --git a/sdks/typescript/src/apache_beam/transforms/window.ts b/sdks/typescript/src/apache_beam/transforms/window.ts index 0d66bcbb949c..57a335ba580d 100644 --- a/sdks/typescript/src/apache_beam/transforms/window.ts +++ b/sdks/typescript/src/apache_beam/transforms/window.ts @@ -19,12 +19,12 @@ import * as runnerApi from "../proto/beam_runner_api"; import * as urns from "../internal/urns"; -import { PTransform } from "./transform"; +import { PTransform, withName, extractName } from "./transform"; import { Coder } from "../coders/coders"; import { Window } from "../values"; import { PCollection } from "../pvalue"; import { Pipeline } from "../internal/pipeline"; -import { ParDo } from "./pardo"; +import { parDo } from "./pardo"; import { serializeFn } from "../internal/serialize"; export interface WindowFn { @@ -35,108 +35,96 @@ export interface WindowFn { assignsToOneWindow: () => boolean; } -export class WindowInto extends PTransform< - PCollection, - PCollection -> { - static createWindowingStrategy( - pipeline: Pipeline, - windowFn: WindowFn, - windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined - ): runnerApi.WindowingStrategy { - let result: runnerApi.WindowingStrategy; - if (windowingStrategyBase == undefined) { - result = { - windowFn: undefined!, - windowCoderId: undefined!, - mergeStatus: undefined!, - assignsToOneWindow: undefined!, - trigger: { trigger: { oneofKind: "default", default: {} } }, - accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING, - outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW, - closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS, - onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS, - allowedLateness: BigInt(0), - environmentId: pipeline.defaultEnvironment, - }; - } else { - result = runnerApi.WindowingStrategy.clone(windowingStrategyBase); - } - result.windowFn = windowFn.toProto(); - result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder()); - result.mergeStatus = windowFn.isMerging() - ? runnerApi.MergeStatus_Enum.NEEDS_MERGE - : runnerApi.MergeStatus_Enum.NON_MERGING; - result.assignsToOneWindow = windowFn.assignsToOneWindow(); - return result; - } - - constructor( - private windowFn: WindowFn, - private windowingStrategyBase: - | runnerApi.WindowingStrategy - | undefined = undefined - ) { - super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")"); +export function createWindowingStrategyProto( + pipeline: Pipeline, + windowFn: WindowFn, + windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined +): runnerApi.WindowingStrategy { + let result: runnerApi.WindowingStrategy; + if (windowingStrategyBase == undefined) { + result = { + windowFn: undefined!, + windowCoderId: undefined!, + mergeStatus: undefined!, + assignsToOneWindow: undefined!, + trigger: { trigger: { oneofKind: "default", default: {} } }, + accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING, + outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW, + closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS, + onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS, + allowedLateness: BigInt(0), + environmentId: pipeline.defaultEnvironment, + }; + } else { + result = runnerApi.WindowingStrategy.clone(windowingStrategyBase); } + result.windowFn = windowFn.toProto(); + result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder()); + result.mergeStatus = windowFn.isMerging() + ? runnerApi.MergeStatus_Enum.NEEDS_MERGE + : runnerApi.MergeStatus_Enum.NON_MERGING; + result.assignsToOneWindow = windowFn.assignsToOneWindow(); + return result; +} - expandInternal( - input: PCollection, - pipeline: Pipeline, - transformProto: runnerApi.PTransform - ) { - transformProto.spec = runnerApi.FunctionSpec.create({ - urn: ParDo.urn, - payload: runnerApi.ParDoPayload.toBinary( - runnerApi.ParDoPayload.create({ - doFn: runnerApi.FunctionSpec.create({ - urn: urns.JS_WINDOW_INTO_DOFN_URN, - payload: serializeFn({ windowFn: this.windowFn }), - }), - }) - ), - }); +export function windowInto( + windowFn: WindowFn, + windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined +): PTransform, PCollection> { + return withName( + `WindowInto(${extractName(windowFn)}, ${windowingStrategyBase})`, + ( + input: PCollection, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => { + transformProto.spec = runnerApi.FunctionSpec.create({ + urn: parDo.urn, + payload: runnerApi.ParDoPayload.toBinary( + runnerApi.ParDoPayload.create({ + doFn: runnerApi.FunctionSpec.create({ + urn: urns.JS_WINDOW_INTO_DOFN_URN, + payload: serializeFn({ windowFn: windowFn }), + }), + }) + ), + }); - const inputCoder = pipeline.context.getPCollectionCoderId(input); - return pipeline.createPCollectionInternal( - inputCoder, - WindowInto.createWindowingStrategy( - pipeline, - this.windowFn, - this.windowingStrategyBase - ) - ); - } + const inputCoder = pipeline.context.getPCollectionCoderId(input); + return pipeline.createPCollectionInternal( + inputCoder, + createWindowingStrategyProto(pipeline, windowFn, windowingStrategyBase) + ); + } + ); } // TODO: (Cleanup) Add restrictions on moving backwards? -export class AssignTimestamps extends PTransform< - PCollection, - PCollection -> { - constructor(private func: (T, Instant) => typeof Instant) { - super(); - } - - expandInternal( - input: PCollection, - pipeline: Pipeline, - transformProto: runnerApi.PTransform - ) { - transformProto.spec = runnerApi.FunctionSpec.create({ - urn: ParDo.urn, - payload: runnerApi.ParDoPayload.toBinary( - runnerApi.ParDoPayload.create({ - doFn: runnerApi.FunctionSpec.create({ - urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN, - payload: serializeFn({ func: this.func }), - }), - }) - ), - }); +export function assignTimestamps( + timestampFn: (T, Instant) => typeof Instant +): PTransform, PCollection> { + return withName( + `assignTimestamp(${extractName(timestampFn)})`, + ( + input: PCollection, + pipeline: Pipeline, + transformProto: runnerApi.PTransform + ) => { + transformProto.spec = runnerApi.FunctionSpec.create({ + urn: parDo.urn, + payload: runnerApi.ParDoPayload.toBinary( + runnerApi.ParDoPayload.create({ + doFn: runnerApi.FunctionSpec.create({ + urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN, + payload: serializeFn({ func: timestampFn }), + }), + }) + ), + }); - return pipeline.createPCollectionInternal( - pipeline.context.getPCollectionCoderId(input) - ); - } + return pipeline.createPCollectionInternal( + pipeline.context.getPCollectionCoderId(input) + ); + } + ); } diff --git a/sdks/typescript/src/apache_beam/transforms/windowings.ts b/sdks/typescript/src/apache_beam/transforms/windowings.ts index 2350e417e047..89c4941ed078 100644 --- a/sdks/typescript/src/apache_beam/transforms/windowings.ts +++ b/sdks/typescript/src/apache_beam/transforms/windowings.ts @@ -30,106 +30,70 @@ import { } from "../coders/standard_coders"; import { GlobalWindow, Instant, IntervalWindow } from "../values"; -export class GlobalWindows implements WindowFn { - assignWindows(Instant) { - return [new GlobalWindow()]; - } - windowCoder() { - return new GlobalWindowCoder(); - } - toProto() { - return { +export function globalWindows(): WindowFn { + return { + assignWindows: (Instant) => [new GlobalWindow()], + windowCoder: () => new GlobalWindowCoder(), + isMerging: () => false, + assignsToOneWindow: () => true, + toProto: () => ({ urn: "beam:window_fn:global_windows:v1", payload: new Uint8Array(), - }; - } - isMerging() { - return false; - } - assignsToOneWindow() { - return true; - } + }), + }; } -export class FixedWindows implements WindowFn { - size: Long; - offset: Instant; // TODO: (Cleanup) Or should this be a long as well? - +export function fixedWindows( + sizeSeconds: number | Long, + offsetSeconds: Instant = Long.fromValue(0) +): WindowFn { // TODO: (Cleanup) Use a time library? - constructor( - sizeSeconds: number | Long, - offsetSeconds: Instant = Long.fromValue(0) - ) { - if (typeof sizeSeconds == "number") { - this.size = Long.fromValue(sizeSeconds).mul(1000); - } else { - this.size = sizeSeconds.mul(1000); - } - this.offset = offsetSeconds.mul(1000); - } + const sizeMillis = secsToMillisLong(sizeSeconds); + const offsetMillis = secsToMillisLong(offsetSeconds); - assignWindows(t: Instant) { - const start = t.sub(t.sub(this.offset).mod(this.size)); - return [new IntervalWindow(start, start.add(this.size))]; - } + return { + assignWindows: (t: Instant) => { + const start = t.sub(t.sub(offsetMillis).mod(sizeMillis)); + return [new IntervalWindow(start, start.add(sizeMillis))]; + }, - windowCoder() { - return new IntervalWindowCoder(); - } + windowCoder: () => new IntervalWindowCoder(), + isMerging: () => false, + assignsToOneWindow: () => true, - toProto() { - return { + toProto: () => ({ urn: "beam:window_fn:fixed_windows:v1", payload: FixedWindowsPayload.toBinary({ - size: millisToProto(this.size), - offset: millisToProto(this.offset), + size: millisToProto(sizeMillis), + offset: millisToProto(offsetMillis), }), - }; - } - - isMerging() { - return false; - } - - assignsToOneWindow() { - return true; - } + }), + }; } -export class Sessions implements WindowFn { - gap: Long; - - constructor(gapSeconds: number | Long) { - if (typeof gapSeconds == "number") { - this.gap = Long.fromValue(gapSeconds).mul(1000); - } else { - this.gap = gapSeconds.mul(1000); - } - } +export function sessions(gapSeconds: number | Long): WindowFn { + const gapMillis = secsToMillisLong(gapSeconds); - assignWindows(t: Instant) { - return [new IntervalWindow(t, t.add(this.gap))]; - } + return { + assignWindows: (t: Instant) => [new IntervalWindow(t, t.add(gapMillis))], + windowCoder: () => new IntervalWindowCoder(), + isMerging: () => true, + assignsToOneWindow: () => true, - windowCoder() { - return new IntervalWindowCoder(); - } - - toProto() { - return { + toProto: () => ({ urn: "beam:window_fn:session_windows:v1", payload: SessionWindowsPayload.toBinary({ - gapSize: millisToProto(this.gap), + gapSize: millisToProto(gapMillis), }), - }; - } - - isMerging() { - return true; - } + }), + }; +} - assignsToOneWindow() { - return true; +function secsToMillisLong(secs: number | Long): Long { + if (typeof secs == "number") { + return Long.fromValue(secs * 1000); + } else { + return secs.mul(1000); } } @@ -139,3 +103,12 @@ function millisToProto(t: Long) { import { requireForSerialization } from "../serialization"; requireForSerialization("apache_beam.transforms.windowings", exports); +requireForSerialization("apache_beam.transforms.windowings", millisToProto); +requireForSerialization( + "apache_beam.transforms.windowings", + FixedWindowsPayload +); +requireForSerialization( + "apache_beam.transforms.windowings", + SessionWindowsPayload +); diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts index f7bfa71038bf..366e794c0d6c 100644 --- a/sdks/typescript/src/apache_beam/worker/operators.ts +++ b/sdks/typescript/src/apache_beam/worker/operators.ts @@ -30,7 +30,7 @@ import { PipelineContext } from "../internal/pipeline"; import { deserializeFn } from "../internal/serialize"; import { Coder, Context as CoderContext } from "../coders/coders"; import { Window, Instant, PaneInfo, WindowedValue } from "../values"; -import { ParDo, DoFn, ParDoParam } from "../transforms/pardo"; +import { parDo, DoFn, ParDoParam, SplitOptions } from "../transforms/pardo"; import { WindowFn } from "../transforms/window"; import { @@ -445,42 +445,37 @@ class IdentityParDoOperator implements IOperator { class SplittingDoFnOperator implements IOperator { constructor( - private splitter: (any) => string, - private receivers: { [key: string]: Receiver } + private receivers: { [key: string]: Receiver }, + private options: SplitOptions ) {} async startBundle() {} process(wvalue: WindowedValue) { - const tag = this.splitter(wvalue.value); - const receiver = this.receivers[tag]; - if (receiver) { - return receiver.receive(wvalue); - } else { - // TODO: (API) Make this configurable. + const result = new ProcessResultBuilder(); + const keys = Object.keys(wvalue.value as object); + if (this.options.exclusive && keys.length != 1) { throw new Error( - "Unexpected tag '" + - tag + - "' for " + - wvalue.value + - " not in " + - [...Object.keys(this.receivers)] + "Multiple keys for exclusively split element: " + wvalue.value ); } - } - - async finishBundle() {} -} - -class Splitting2DoFnOperator implements IOperator { - constructor(private receivers: { [key: string]: Receiver }) {} - - async startBundle() {} - - process(wvalue: WindowedValue) { - const result = new ProcessResultBuilder(); - // TODO: (API) Should I exactly one instead of allowing a union? - for (const tag of Object.keys(wvalue.value as object)) { + for (let tag of keys) { + if (!this.options.knownTags!.includes(tag)) { + if (this.options.unknownTagBehavior == "rename") { + tag = this.options.unknownTagName!; + } else if (this.options.unknownTagBehavior == "ignore") { + continue; + } else { + throw new Error( + "Unexpected tag '" + + tag + + "' for " + + wvalue.value + + " not in " + + this.options.knownTags + ); + } + } const receiver = this.receivers[tag]; if (receiver) { result.add( @@ -491,16 +486,6 @@ class Splitting2DoFnOperator implements IOperator { pane: wvalue.pane, }) ); - } else { - // TODO: (API) Make this configurable. - throw new Error( - "Unexpected tag '" + - tag + - "' for " + - wvalue.value + - " not in " + - [...Object.keys(this.receivers)] - ); } } return result.build(); @@ -558,7 +543,7 @@ class AssignTimestampsParDoOperator implements IOperator { } registerOperatorConstructor( - ParDo.urn, + parDo.urn, (transformId: string, transform: PTransform, context: OperatorContext) => { const receiver = context.getReceiver( onlyElement(Object.values(transform.outputs)) @@ -590,22 +575,13 @@ registerOperatorConstructor( ); } else if (spec.doFn?.urn == urns.SPLITTING_JS_DOFN_URN) { return new SplittingDoFnOperator( - deserializeFn(spec.doFn.payload!).splitter, - Object.fromEntries( - Object.entries(transform.outputs).map(([tag, pcId]) => [ - tag, - context.getReceiver(pcId), - ]) - ) - ); - } else if (spec.doFn?.urn == urns.SPLITTING2_JS_DOFN_URN) { - return new Splitting2DoFnOperator( Object.fromEntries( Object.entries(transform.outputs).map(([tag, pcId]) => [ tag, context.getReceiver(pcId), ]) - ) + ), + deserializeFn(spec.doFn.payload!) ); } else { throw new Error("Unknown DoFn type: " + spec); diff --git a/sdks/typescript/test/combine_test.ts b/sdks/typescript/test/combine_test.ts index c98e420ee17f..f54f0a90e801 100644 --- a/sdks/typescript/test/combine_test.ts +++ b/sdks/typescript/test/combine_test.ts @@ -25,17 +25,17 @@ import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runne import * as combiners from "../src/apache_beam/transforms/combiners"; import { CombineFn, - GroupBy, - GroupGlobally, - CountPerElement, - CountGlobally, + groupBy, + groupGlobally, + countPerElement, + countGlobally, } from "../src/apache_beam/transforms/group_and_combine"; describe("Apache Beam combiners", function () { it("runs wordcount with a countPerKey transform and asserts the result", async function () { await new DirectRunner().run((root) => { const lines = root.apply( - new beam.Create([ + beam.create([ "In the beginning God created the heaven and the earth.", "And the earth was without form, and void; and darkness was upon the face of the deep.", "And the Spirit of God moved upon the face of the waters.", @@ -49,9 +49,9 @@ describe("Apache Beam combiners", function () { yield* line.split(/[^a-z]+/); }) .map((elm) => ({ key: elm, value: 1 })) - .apply(new GroupBy("key").combining("value", combiners.sum, "value")) + .apply(groupBy("key").combining("value", combiners.sum, "value")) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { key: "in", value: 1 }, { key: "the", value: 9 }, { key: "beginning", value: 1 }, @@ -86,9 +86,7 @@ describe("Apache Beam combiners", function () { it("runs wordcount with a countGlobally transform and asserts the result", async function () { await new DirectRunner().run((root) => { const lines = root.apply( - new beam.Create([ - "And God said, Let there be light: and there was light", - ]) + beam.create(["And God said, Let there be light: and there was light"]) ); lines @@ -96,8 +94,8 @@ describe("Apache Beam combiners", function () { .flatMap(function* splitWords(line: string) { yield* line.split(/[^a-z]+/); }) - .apply(new CountGlobally()) - .apply(new testing.AssertDeepEqual([11])); + .apply(countGlobally()) + .apply(testing.assertDeepEqual([11])); }); }); @@ -138,7 +136,7 @@ describe("Apache Beam combiners", function () { await new DirectRunner().run((root) => { const lines = root.apply( - new beam.Create([ + beam.create([ "In the beginning God created the heaven and the earth.", "And the earth was without form, and void; and darkness was upon the face of the deep.", "And the Spirit of God moved upon the face of the waters.", @@ -153,12 +151,12 @@ describe("Apache Beam combiners", function () { }) .map((word) => word.length) .apply( - new GroupGlobally() + groupGlobally() .combining((c) => c, combiners.mean, "mean") .combining((c) => c, unstableStdDevCombineFn(), "stdDev") ) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { mean: 3.611111111111111, stdDev: 3.2746913580246897 }, ]) ); @@ -168,7 +166,7 @@ describe("Apache Beam combiners", function () { it("test GroupBy with combining", async function () { await new DirectRunner().run((root) => { const inputs = root.apply( - new beam.Create([ + beam.create([ { k: "k1", a: 1, b: 100 }, { k: "k1", a: 2, b: 200 }, { k: "k2", a: 9, b: 1000 }, @@ -177,13 +175,13 @@ describe("Apache Beam combiners", function () { inputs .apply( - new GroupBy("k") + groupBy("k") .combining("a", combiners.max, "aMax") .combining("a", combiners.sum, "aSum") .combining("b", combiners.mean, "mean") ) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { k: "k1", aMax: 2, aSum: 3, mean: 150 }, { k: "k2", aMax: 9, aSum: 9, mean: 1000 }, ]) @@ -194,7 +192,7 @@ describe("Apache Beam combiners", function () { it("test GroupBy list with combining", async function () { await new DirectRunner().run((root) => { const inputs = root.apply( - new beam.Create([ + beam.create([ { a: 1, b: 10, c: 100 }, { a: 2, b: 10, c: 100 }, { a: 1, b: 10, c: 400 }, @@ -202,18 +200,18 @@ describe("Apache Beam combiners", function () { ); inputs - .apply(new GroupBy(["a", "b"]).combining("c", combiners.sum, "sum")) + .apply(groupBy(["a", "b"]).combining("c", combiners.sum, "sum")) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { a: 1, b: 10, sum: 500 }, { a: 2, b: 10, sum: 100 }, ]) ); inputs - .apply(new GroupBy(["b", "c"]).combining("a", combiners.sum, "sum")) + .apply(groupBy(["b", "c"]).combining("a", combiners.sum, "sum")) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { b: 10, c: 100, sum: 3 }, { b: 10, c: 400, sum: 1 }, ]) @@ -224,7 +222,7 @@ describe("Apache Beam combiners", function () { it("test GroupBy expr with combining", async function () { await new DirectRunner().run((root) => { const inputs = root.apply( - new beam.Create([ + beam.create([ { a: 1, b: 10 }, { a: 0, b: 20 }, { a: -1, b: 30 }, @@ -233,14 +231,14 @@ describe("Apache Beam combiners", function () { inputs .apply( - new GroupBy((element: any) => element.a * element.a).combining( + groupBy((element: any) => element.a * element.a).combining( "b", combiners.sum, "sum" ) ) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { key: 1, sum: 40 }, { key: 0, sum: 20 }, ]) @@ -251,7 +249,7 @@ describe("Apache Beam combiners", function () { it("test GroupBy with binary combinefn", async function () { await new DirectRunner().run((root) => { const inputs = root.apply( - new beam.Create([ + beam.create([ { key: 0, value: 10 }, { key: 1, value: 20 }, { key: 0, value: 30 }, @@ -260,12 +258,12 @@ describe("Apache Beam combiners", function () { inputs .apply( - new GroupBy("key") + groupBy("key") .combining("value", (x, y) => x + y, "sum") .combining("value", (x, y) => Math.max(x, y), "max") ) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { key: 0, sum: 40, max: 30 }, { key: 1, sum: 20, max: 20 }, ]) diff --git a/sdks/typescript/test/primitives_test.ts b/sdks/typescript/test/primitives_test.ts index 0840ca7c93c4..c95c2a2633e5 100644 --- a/sdks/typescript/test/primitives_test.ts +++ b/sdks/typescript/test/primitives_test.ts @@ -25,10 +25,6 @@ import { IterableCoder, KVCoder, } from "../src/apache_beam/coders/standard_coders"; -import { - GroupBy, - GroupGlobally, -} from "../src/apache_beam/transforms/group_and_combine"; import * as combiners from "../src/apache_beam/transforms/combiners"; import { GeneralObjectCoder } from "../src/apache_beam/coders/js_coders"; @@ -45,84 +41,74 @@ describe("primitives module", function () { it("runs a map", async function () { await new DirectRunner().run((root) => { const pcolls = root - .apply(new beam.Create([1, 2, 3])) + .apply(beam.create([1, 2, 3])) .map((x) => x * x) - .apply(new testing.AssertDeepEqual([1, 4, 9])); + .apply(testing.assertDeepEqual([1, 4, 9])); }); }); it("runs a flatmap", async function () { await new DirectRunner().run((root) => { const pcolls = root - .apply(new beam.Create(["a b", "c"])) + .apply(beam.create(["a b", "c"])) .flatMap((s) => s.split(/ +/)) - .apply(new testing.AssertDeepEqual(["a", "b", "c"])); + .apply(testing.assertDeepEqual(["a", "b", "c"])); }); }); it("runs a Splitter", async function () { await new DirectRunner().run((root) => { const pcolls = root - .apply(new beam.Create(["apple", "apricot", "banana"])) - .apply(new beam.Split((e) => e[0], "a", "b")); - pcolls.a.apply(new testing.AssertDeepEqual(["apple", "apricot"])); - pcolls.b.apply(new testing.AssertDeepEqual(["banana"])); - }); - }); - - it("runs a Splitter2", async function () { - await new DirectRunner().run((root) => { - const pcolls = root - .apply(new beam.Create([{ a: 1 }, { b: 10 }, { a: 2, b: 20 }])) - .apply(new beam.Split2("a", "b")); - pcolls.a.apply(new testing.AssertDeepEqual([1, 2])); - pcolls.b.apply(new testing.AssertDeepEqual([10, 20])); + .apply(beam.create([{ a: 1 }, { b: 10 }, { a: 2, b: 20 }])) + .apply(beam.split(["a", "b"], { exclusive: false })); + pcolls.a.apply(testing.assertDeepEqual([1, 2])); + pcolls.b.apply(testing.assertDeepEqual([10, 20])); }); }); it("runs a map with context", async function () { await new DirectRunner().run((root) => { root - .apply(new beam.Create([1, 2, 3])) + .apply(beam.create([1, 2, 3])) .map((a: number, b: number) => a + b, 100) - .apply(new testing.AssertDeepEqual([101, 102, 103])); + .apply(testing.assertDeepEqual([101, 102, 103])); }); }); it("runs a map with singleton side input", async function () { await new DirectRunner().run((root) => { - const input = root.apply(new beam.Create([1, 2, 1])); - const sideInput = root.apply(new beam.Create([4])); + const input = root.apply(beam.create([1, 2, 1])); + const sideInput = root.apply(beam.create([4])); input .map((e, context) => e / context.side.lookup(), { - side: new pardo.SingletonSideInput(sideInput), + side: pardo.singletonSideInput(sideInput), }) - .apply(new testing.AssertDeepEqual([0.25, 0.5, 0.25])); + .apply(testing.assertDeepEqual([0.25, 0.5, 0.25])); }); }); it("runs a map with a side input sharing input root", async function () { await new DirectRunner().run((root) => { - const input = root.apply(new beam.Create([1, 2, 1])); + const input = root.apply(beam.create([1, 2, 1])); // TODO: Can this type be inferred? const sideInput: beam.PCollection<{ sum: number }> = input.apply( - new GroupGlobally().combining((e) => e, combiners.sum, "sum") + beam.groupGlobally().combining((e) => e, combiners.sum, "sum") ); input .map((e, context) => e / context.side.lookup().sum, { - side: new pardo.SingletonSideInput(sideInput), + side: pardo.singletonSideInput(sideInput), }) - .apply(new testing.AssertDeepEqual([0.25, 0.5, 0.25])); + .apply(testing.assertDeepEqual([0.25, 0.5, 0.25])); }); }); it("runs a map with window-sensitive context", async function () { await new DirectRunner().run((root) => { root - .apply(new beam.Create([1, 2, 3, 4, 5, 10, 11, 12])) - .apply(new beam.AssignTimestamps((t) => Long.fromValue(t * 1000))) - .apply(new beam.WindowInto(new windowings.FixedWindows(10))) - .apply(new beam.GroupBy((e: number) => "")) + .apply(beam.create([1, 2, 3, 4, 5, 10, 11, 12])) + .apply(beam.assignTimestamps((t) => Long.fromValue(t * 1000))) + .apply(beam.windowInto(windowings.fixedWindows(10))) + .apply(beam.groupBy((e: number) => "")) .map( withName( "MapWithContext", @@ -138,10 +124,10 @@ describe("primitives module", function () { ), // This is the context to pass as the second argument. // At each element, window.get() will return the associated window. - { window: new pardo.WindowParam(), other: "A" } + { window: pardo.windowParam(), other: "A" } ) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { key: "", value: [1, 2, 3, 4, 5], window_start_ms: 0, a: "A" }, { key: "", value: [10, 11, 12], window_start_ms: 10000, a: "A" }, ]) @@ -152,11 +138,11 @@ describe("primitives module", function () { it("runs a WindowInto", async function () { await new DirectRunner().run((root) => { root - .apply(new beam.Create(["apple", "apricot", "banana"])) - .apply(new beam.WindowInto(new windowings.GlobalWindows())) - .apply(new beam.GroupBy((e: string) => e[0])) + .apply(beam.create(["apple", "apricot", "banana"])) + .apply(beam.windowInto(windowings.globalWindows())) + .apply(beam.groupBy((e: string) => e[0])) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { key: "a", value: ["apple", "apricot"] }, { key: "b", value: ["banana"] }, ]) @@ -167,12 +153,12 @@ describe("primitives module", function () { it("runs a WindowInto IntervalWindow", async function () { await new DirectRunner().run((root) => { root - .apply(new beam.Create([1, 2, 3, 4, 5, 10, 11, 12])) - .apply(new beam.AssignTimestamps((t) => Long.fromValue(t * 1000))) - .apply(new beam.WindowInto(new windowings.FixedWindows(10))) - .apply(new beam.GroupBy((e: number) => "")) + .apply(beam.create([1, 2, 3, 4, 5, 10, 11, 12])) + .apply(beam.assignTimestamps((t) => Long.fromValue(t * 1000))) + .apply(beam.windowInto(windowings.fixedWindows(10))) + .apply(beam.groupBy((e: number) => "")) .apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { key: "", value: [1, 2, 3, 4, 5] }, { key: "", value: [10, 11, 12] }, ]) @@ -185,7 +171,7 @@ describe("primitives module", function () { // TODO: test output with direct runner. it("runs a basic Impulse expansion", function () { var p = new Pipeline(); - var res = new beam.Root(p).apply(new beam.Impulse()); + var res = new beam.Root(p).apply(beam.impulse()); assert.equal(res.type, "pcollection"); assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder()); @@ -193,7 +179,7 @@ describe("primitives module", function () { it("runs a ParDo expansion", function () { var p = new Pipeline(); var res = new beam.Root(p) - .apply(new beam.Impulse()) + .apply(beam.impulse()) .map(function (v: any) { return v * 2; }) @@ -211,11 +197,11 @@ describe("primitives module", function () { it("runs a GroupBy expansion", function () { var p = new Pipeline(); var res = new beam.Root(p) - .apply(new beam.Impulse()) + .apply(beam.impulse()) .map(function createElement(v) { return { name: "pablo", lastName: "wat" }; }) - .apply(new GroupBy("lastName")); + .apply(beam.groupBy("lastName")); assert.deepEqual( p.context.getPCollectionCoder(res), diff --git a/sdks/typescript/test/standard_coders_test.ts b/sdks/typescript/test/standard_coders_test.ts index dd80ce12d667..3d34199fccb7 100644 --- a/sdks/typescript/test/standard_coders_test.ts +++ b/sdks/typescript/test/standard_coders_test.ts @@ -137,29 +137,23 @@ describe("standard Beam coders on Javascript", function () { contexts.forEach((context) => { describe("in Context " + context, function () { const spec = doc; - - const coderConstructor = globalRegistry().get(urn); - var coder; + var components; if (spec.coder.components) { - var components; - try { - components = spec.coder.components.map( - (c) => new (globalRegistry().get(c.urn))() - ); - } catch (Error) { - return; - } - coder = new coderConstructor(...components); + components = spec.coder.components.map( + // Second level coders have neither payloads nor components. + (c) => globalRegistry().getCoder(c.urn) + ); } else { - coder = new coderConstructor(); + components = []; } - describeCoder(coder, urn, context, spec); + const coder = globalRegistry().getCoder(urn, undefined, ...components); + runCoderTest(coder, urn, context, spec); }); }); }); }); -function describeCoder(coder: Coder, urn, context, spec: CoderSpec) { +function runCoderTest(coder: Coder, urn, context, spec: CoderSpec) { describe( util.format( "coder %s (%s)", diff --git a/sdks/typescript/test/wordcount.ts b/sdks/typescript/test/wordcount.ts index f13655616e32..b7267ef55899 100644 --- a/sdks/typescript/test/wordcount.ts +++ b/sdks/typescript/test/wordcount.ts @@ -19,9 +19,6 @@ import * as beam from "../src/apache_beam"; import { DirectRunner } from "../src/apache_beam/runners/direct_runner"; import * as testing from "../src/apache_beam/testing/assert"; -import { KV } from "../src/apache_beam/values"; -import { GroupBy } from "../src/apache_beam/transforms/group_and_combine"; -import * as combiners from "../src/apache_beam/transforms/combiners"; import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runner"; @@ -33,29 +30,14 @@ function wordCount( .flatMap(function* splitWords(line: string) { yield* line.split(/[^a-z]+/); }) - .apply(new CountElements("Count")); -} - -class CountElements extends beam.PTransform< - beam.PCollection, - beam.PCollection> -> { - expand(input: beam.PCollection) { - return input.apply( - new GroupBy((e) => e, "element").combining( - (e) => 1, - combiners.sum, - "count" - ) - ); - } + .apply(beam.countPerElement()); } describe("wordcount", function () { it("wordcount", async function () { await new DirectRunner().run((root) => { const lines = root.apply( - new beam.Create([ + beam.create([ "In the beginning God created the heaven and the earth.", "And the earth was without form, and void; and darkness was upon the face of the deep.", "And the Spirit of God moved upon the face of the waters.", @@ -70,13 +52,11 @@ describe("wordcount", function () { it("wordcount assert", async function () { await new DirectRunner().run((root) => { const lines = root.apply( - new beam.Create([ - "And God said, Let there be light: and there was light", - ]) + beam.create(["And God said, Let there be light: and there was light"]) ); lines.apply(wordCount).apply( - new testing.AssertDeepEqual([ + testing.assertDeepEqual([ { element: "and", count: 2 }, { element: "god", count: 1 }, { element: "said", count: 1 },