diff --git a/sdks/node-ts/src/apache_beam/base.ts b/sdks/node-ts/src/apache_beam/base.ts index 66165998686b..91811e445f72 100644 --- a/sdks/node-ts/src/apache_beam/base.ts +++ b/sdks/node-ts/src/apache_beam/base.ts @@ -15,7 +15,7 @@ export function pcollectionName() { var _transform_counter = -1; export function transformName() { _transform_counter += 1; - return 'transform_' + _transform_counter; + return '(' + _transform_counter + ')'; } /** @@ -37,7 +37,11 @@ class PValue { } // TODO(pabloem): What about multiple outputs? (not strictly necessary ATM. Can do with Filters) - apply(transform: PTransform): PCollection { + apply(transform: PTransform, name?: string): PCollection { + // TODO(pabloem): We need a transform name that we can infer! + if (!name) { + name = transform.type ? transform.type + transformName() : transformName(); + } const outPcoll = transform.expand(this); if (outPcoll.type !== 'pcollection') { throw new Error(util.format('Trahsform %s does not return a PCollection', transform)); @@ -45,8 +49,8 @@ class PValue { return outPcoll as PCollection; } - map(callable: DoFn | GenericCallable): PCollection { - return this.apply(new ParDo(callable)); + map(callable: DoFn | GenericCallable, name?: string): PCollection { + return this.apply(new ParDo(callable), name); } // Top-level functions: @@ -87,6 +91,8 @@ export class PCollection extends PValue { } export class PTransform { + type: string = "ptransform"; + name: string; expand(input: PValue): PValue { throw new Error('Method expand has not been implemented.'); } @@ -112,183 +118,192 @@ export interface GenericCallable { } - export class Impulse extends PTransform { - // static urn: string = runnerApi.StandardPTransforms_Primitives.IMPULSE.urn; - // TODO: use above line, not below line. - static urn: string = "beam:transform:impulse:v1"; - expand(input: Pipeline): PCollection { - if (!input.isPipeline()) { - throw new Error("User is attempting to apply Impulse transform to a non-pipeline object."); - } - const pipeline = input as Pipeline; - - const pcollName = pcollectionName(); - - const coderId = translations.registerPipelineCoder(runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': BytesCoder.URN})}), pipeline.proto.components!); - pipeline.coders[coderId] = new BytesCoder(); - - const outputProto = runnerApi.PCollection.create({ - 'uniqueName': pcollName, - 'coderId': coderId, - 'isBounded': runnerApi.IsBounded_Enum.BOUNDED - }); - - const impulseProto = runnerApi.PTransform.create({ - // TODO(pabloem): Get the name for the PTransform - 'uniqueName': 'todouniquename', - 'spec': runnerApi.FunctionSpec.create({ - 'urn': translations.DATA_INPUT_URN, - 'payload': translations.IMPULSE_BUFFER - }), - 'outputs': {'out': pcollName} - }); - - return new PCollection(impulseProto.outputs.out, outputProto, input.pipeline); +export class Impulse extends PTransform { + // static urn: string = runnerApi.StandardPTransforms_Primitives.IMPULSE.urn; + // TODO: use above line, not below line. + static urn: string = "beam:transform:impulse:v1"; + expand(input: Pipeline): PCollection { + if (!input.isPipeline()) { + throw new Error("User is attempting to apply Impulse transform to a non-pipeline object."); } + const pipeline = input as Pipeline; + + const pcollName = pcollectionName(); + + const coderId = translations.registerPipelineCoder(runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': BytesCoder.URN})}), pipeline.proto.components!); + pipeline.coders[coderId] = new BytesCoder(); + + const outputProto = runnerApi.PCollection.create({ + 'uniqueName': pcollName, + 'coderId': coderId, + 'isBounded': runnerApi.IsBounded_Enum.BOUNDED + }); + + const impulseProto = runnerApi.PTransform.create({ + // TODO(pabloem): Get the name for the PTransform + 'uniqueName': transformName(), + 'spec': runnerApi.FunctionSpec.create({ + 'urn': translations.DATA_INPUT_URN, + 'payload': translations.IMPULSE_BUFFER + }), + 'outputs': {'out': pcollName} + }); + input.proto.components!.transforms[impulseProto.uniqueName] = impulseProto; + + return new PCollection(impulseProto.outputs.out, outputProto, input.pipeline); } - - /** - * @returns true if the input is a `DoFn`. - * - * Since type information is lost at runtime, we check the object's attributes - * to determine whether it's a DoFn or not. - * - * @example - * Prints "true" for a new `DoFn` but "false" for a function: - * ```ts - * console.log(new DoFn()); - * console.log(in => in * 2)); - * ``` - * @param callableOrDoFn - * @returns - */ - function isDoFn(callableOrDoFn: DoFn | GenericCallable) { - const df = (callableOrDoFn as DoFn) - if (df.type !== undefined && df.type === "dofn") { - return true; - } else { - return false; - } +} + +/** + * @returns true if the input is a `DoFn`. + * + * Since type information is lost at runtime, we check the object's attributes + * to determine whether it's a DoFn or not. + * + * @example + * Prints "true" for a new `DoFn` but "false" for a function: + * ```ts + * console.log(new DoFn()); + * console.log(in => in * 2)); + * ``` + * @param callableOrDoFn + * @returns + */ + function isDoFn(callableOrDoFn: DoFn | GenericCallable) { + const df = (callableOrDoFn as DoFn) + if (df.type !== undefined && df.type === "dofn") { + return true; + } else { + return false; } - - export class ParDo extends PTransform { - static _CallableWrapperDoFn = class extends DoFn { - private fn; - constructor(fn: GenericCallable) { - super(); - this.fn = fn; - } - process(element: any) { - return this.fn(element); - } +} + +export class ParDo extends PTransform { + static _CallableWrapperDoFn = class extends DoFn { + private fn; + constructor(fn: GenericCallable) { + super(); + this.fn = fn; } - - private doFn; - // static urn: string = runnerApi.StandardPTransforms_Primitives.PAR_DO.urn; - // TODO: use above line, not below line. - static urn: string = "beam:transform:pardo:v1"; - constructor(callableOrDoFn: DoFn | GenericCallable) { - super() - if (isDoFn(callableOrDoFn)) { - this.doFn = callableOrDoFn; - } else { - this.doFn = new ParDo._CallableWrapperDoFn(callableOrDoFn as GenericCallable); - } + process(element: any) { + return this.fn(element); } - - expand(input: PCollection): PCollection { - - if (input.type !== 'pcollection') { - throw new Error('ParDo received the wrong input.'); - } - - const pcollName = pcollectionName(); - - const outputProto = runnerApi.PCollection.create({ - 'uniqueName': pcollName, - 'coderId': BytesCoder.URN, // TODO: Get coder URN - 'isBounded': runnerApi.IsBounded_Enum.BOUNDED - }); - - const inputPCollName = (input as PCollection).proto.uniqueName; - - // TODO(pabloem): Get the name for the PTransform - const pardoName = 'todouniquename'; - const inputId = pardoName + '1'; - - const pardoProto = runnerApi.PTransform.create({ - 'uniqueName': pardoName, - 'spec': runnerApi.FunctionSpec.create({ - 'urn': ParDo.urn, - 'payload': runnerApi.ParDoPayload.toBinary( - runnerApi.ParDoPayload.create({ - 'doFn': runnerApi.FunctionSpec.create({ - 'urn': translations.SERIALIZED_JS_DOFN_INFO, - 'payload': new Uint8Array() - }) - })) - }), - 'inputs': {inputId: inputPCollName}, - 'outputs': {'out': pcollName} - }); - - // TODO(pablom): Do this properly - return new PCollection(pardoProto.outputs.out, outputProto, input.pipeline); + } + + private doFn; + // static urn: string = runnerApi.StandardPTransforms_Primitives.PAR_DO.urn; + // TODO: use above line, not below line. + static urn: string = "beam:transform:pardo:v1"; + constructor(callableOrDoFn: DoFn | GenericCallable) { + super() + if (isDoFn(callableOrDoFn)) { + this.doFn = callableOrDoFn; + } else { + this.doFn = new ParDo._CallableWrapperDoFn(callableOrDoFn as GenericCallable); } } - - // TODO(pabloem): Consider not exporting the GBK - export class GroupByKey extends PTransform { - // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn; - // TODO: use above line, not below line. - static urn: string = "beam:transform:group_by_key:v1"; - - expand(input: PCollection): PCollection { - const inputPCollectionProto: runnerApi.PCollection = input.type == 'pcollection' ? (input as PCollection).proto : undefined!; - if (inputPCollectionProto === undefined) { - throw new Error('Input is not a PCollection object.'); - } - - const pipelineComponents: runnerApi.Components = input.pipeline.proto.components!; - - const keyCoderId = pipelineComponents.coders[inputPCollectionProto.coderId].componentCoderIds[0]; - const valueCoderId = pipelineComponents.coders[inputPCollectionProto.coderId].componentCoderIds[1]; - - const iterableValueCoderProto = runnerApi.Coder.create({ - 'spec': {'urn': IterableCoder.URN,}, - 'componentCoderIds': [valueCoderId] - }); - const iterableValueCoderId = translations.registerPipelineCoder(iterableValueCoderProto, pipelineComponents)!; - const iterableValueCoder = new IterableCoder(input.pipeline.coders[valueCoderId]); - input.pipeline.coders[iterableValueCoderId] = iterableValueCoder; - - const outputCoderProto = runnerApi.Coder.create({ - 'spec': runnerApi.FunctionSpec.create({'urn': KVCoder.URN}), - 'componentCoderIds': [keyCoderId, iterableValueCoderId] - }) - const outputPcollCoderId = translations.registerPipelineCoder(outputCoderProto, pipelineComponents)!; - - const outputPCollectionProto = runnerApi.PCollection.create({ - 'uniqueName': pcollectionName(), - 'isBounded': inputPCollectionProto.isBounded, - 'coderId': outputPcollCoderId - }); - pipelineComponents.pcollections[outputPCollectionProto.uniqueName] = outputPCollectionProto; - - const ptransformProto = runnerApi.PTransform.create({ - 'uniqueName': 'TODO NAME', - 'spec': runnerApi.FunctionSpec.create({ - 'urn': GroupByKey.urn, - 'payload': null! // TODO(GBK payload????) - }), - 'outputs': {'out': outputPCollectionProto.uniqueName} - }); - pipelineComponents.transforms[ptransformProto.uniqueName] = ptransformProto; - - input.pipeline.coders[outputPcollCoderId] = new KVCoder( - input.pipeline.coders[keyCoderId], - input.pipeline.coders[iterableValueCoderId]); - - return new PCollection(outputPCollectionProto.uniqueName, outputPCollectionProto, input.pipeline); + + expand(input: PCollection): PCollection { + + if (input.type !== 'pcollection') { + throw new Error('ParDo received the wrong input.'); } - } \ No newline at end of file + + const pcollName = pcollectionName(); + // TODO(paboem): How do we infer the proper coder for this transform?. For now we use the same as input. + const inputCoderProto = input.pipeline.proto.components?.coders[input.proto.coderId]!; + const outputCoderId = translations.registerPipelineCoder( + inputCoderProto, + input.pipeline.proto.components! + ); + input.pipeline.coders[outputCoderId] = new BytesCoder(); + + const outputProto = runnerApi.PCollection.create({ + 'uniqueName': pcollName, + 'coderId': outputCoderId, + 'isBounded': runnerApi.IsBounded_Enum.BOUNDED + }); + + const inputPCollName = (input as PCollection).proto.uniqueName; + + // TODO(pabloem): Get the name for the PTransform + const pardoName = transformName(); + const inputId = pardoName + '1'; + + const pardoProto = runnerApi.PTransform.create({ + 'uniqueName': pardoName, + 'spec': runnerApi.FunctionSpec.create({ + 'urn': ParDo.urn, + 'payload': runnerApi.ParDoPayload.toBinary( + runnerApi.ParDoPayload.create({ + 'doFn': runnerApi.FunctionSpec.create({ + 'urn': translations.SERIALIZED_JS_DOFN_INFO, + 'payload': new Uint8Array() + }) + })) + }), + 'inputs': {inputId: inputPCollName}, + 'outputs': {'out': pcollName} + }); + input.pipeline.proto.components!.transforms[pardoProto.uniqueName] = pardoProto; + + // TODO(pablom): Do this properly + return new PCollection(pardoProto.outputs.out, outputProto, input.pipeline); + } +} + +// TODO(pabloem): Consider not exporting the GBK +export class GroupByKey extends PTransform { + // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn; + // TODO: use above line, not below line. + static urn: string = "beam:transform:group_by_key:v1"; + + expand(input: PCollection): PCollection { + const inputPCollectionProto: runnerApi.PCollection = input.type == 'pcollection' ? (input as PCollection).proto : undefined!; + if (inputPCollectionProto === undefined) { + throw new Error('Input is not a PCollection object.'); + } + + const pipelineComponents: runnerApi.Components = input.pipeline.proto.components!; + + const keyCoderId = pipelineComponents.coders[inputPCollectionProto.coderId].componentCoderIds[0]; + const valueCoderId = pipelineComponents.coders[inputPCollectionProto.coderId].componentCoderIds[1]; + + const iterableValueCoderProto = runnerApi.Coder.create({ + 'spec': {'urn': IterableCoder.URN,}, + 'componentCoderIds': [valueCoderId] + }); + const iterableValueCoderId = translations.registerPipelineCoder(iterableValueCoderProto, pipelineComponents)!; + const iterableValueCoder = new IterableCoder(input.pipeline.coders[valueCoderId]); + input.pipeline.coders[iterableValueCoderId] = iterableValueCoder; + + const outputCoderProto = runnerApi.Coder.create({ + 'spec': runnerApi.FunctionSpec.create({'urn': KVCoder.URN}), + 'componentCoderIds': [keyCoderId, iterableValueCoderId] + }) + const outputPcollCoderId = translations.registerPipelineCoder(outputCoderProto, pipelineComponents)!; + + const outputPCollectionProto = runnerApi.PCollection.create({ + 'uniqueName': pcollectionName(), + 'isBounded': inputPCollectionProto.isBounded, + 'coderId': outputPcollCoderId + }); + pipelineComponents.pcollections[outputPCollectionProto.uniqueName] = outputPCollectionProto; + + const ptransformProto = runnerApi.PTransform.create({ + 'uniqueName': transformName(), + 'spec': runnerApi.FunctionSpec.create({ + 'urn': GroupByKey.urn, + 'payload': null! // TODO(GBK payload????) + }), + 'outputs': {'out': outputPCollectionProto.uniqueName} + }); + pipelineComponents.transforms[ptransformProto.uniqueName] = ptransformProto; + + input.pipeline.coders[outputPcollCoderId] = new KVCoder( + input.pipeline.coders[keyCoderId], + input.pipeline.coders[iterableValueCoderId]); + + return new PCollection(outputPCollectionProto.uniqueName, outputPCollectionProto, input.pipeline); + } +} \ No newline at end of file diff --git a/sdks/node-ts/src/apache_beam/transforms/core.ts b/sdks/node-ts/src/apache_beam/transforms/core.ts index 76e26d23bb8e..ceb66a8bcb25 100644 --- a/sdks/node-ts/src/apache_beam/transforms/core.ts +++ b/sdks/node-ts/src/apache_beam/transforms/core.ts @@ -25,11 +25,17 @@ export class GroupBy extends PTransform { const keyCoderId = translations.registerPipelineCoder( runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': BytesCoder.URN}),}), input.pipeline.proto.components!); + input.pipeline.coders[keyCoderId] = new BytesCoder(); - const kvCoderProto = runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': KVCoder.URN})}) + const kvCoderProto = runnerApi.Coder.create({ + 'spec': runnerApi.FunctionSpec.create({'urn': KVCoder.URN}), + 'componentCoderIds': [keyCoderId, inputCoderId] + }) const kvCoderId = translations.registerPipelineCoder(kvCoderProto, input.pipeline.proto.components!); kvPcoll.proto.coderId = kvCoderId; + const kvCoder = new KVCoder(input.pipeline.coders[keyCoderId], input.pipeline.coders[input.proto.coderId]); + input.pipeline.coders[kvCoderId] = kvCoder; return kvPcoll.apply(new GroupByKey()); } diff --git a/sdks/node-ts/test/primitives_test.ts b/sdks/node-ts/test/primitives_test.ts index bf9d6df8003d..0974bffe8113 100644 --- a/sdks/node-ts/test/primitives_test.ts +++ b/sdks/node-ts/test/primitives_test.ts @@ -1,6 +1,6 @@ import * as beam from '../src/apache_beam'; import * as assert from 'assert'; -import { BytesCoder, KVCoder } from '../src/apache_beam/coders/standard_coders'; +import { BytesCoder, IterableCoder, KVCoder } from '../src/apache_beam/coders/standard_coders'; import {GroupBy} from '../src/apache_beam/transforms/core' // TODO(pabloem): Fix installation. @@ -16,19 +16,21 @@ describe("primitives module", function() { it("runs a ParDo expansion", function() { var p = new beam.Pipeline(); var res = p.apply(new beam.Impulse()) - .apply(new beam.ParDo(function(v) {return v*2;})); + .apply(new beam.ParDo(function(v) {return v*2;})) + .apply(new beam.ParDo(function(v) {return v*4;})); + const coder = p.coders[res.proto.coderId]; + assert.deepEqual(coder, new BytesCoder()); assert.equal(res.type, "pcollection"); }); - // it("runs a GroupBy expansion", function() { - // var p = new beam.Pipeline(); - // var res = p.apply(new beam.Impulse()) - // .apply(new beam.ParDo(function(v) {return {"name": "pablo", "lastName": "wat"};})) - // .apply(new GroupBy("lastName")); - - // const coder = p.coders[res.proto.coderId]; + it("runs a GroupBy expansion", function() { + var p = new beam.Pipeline(); + var res = p.apply(new beam.Impulse()) + .apply(new beam.ParDo(function(v) {return {"name": "pablo", "lastName": "wat"};})) + .apply(new GroupBy("lastName")); - // assert.deepEqual(coder, new KVCoder(new BytesCoder(), new BytesCoder())); - // }); + const coder = p.coders[res.proto.coderId]; + assert.deepEqual(coder, new KVCoder(new BytesCoder(), new IterableCoder(new BytesCoder()))); + }); }); }); \ No newline at end of file