Skip to content

Commit

Permalink
Update the PTransform and associated APIs to be less class-based. (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored May 19, 2022
1 parent 44274ae commit 6e5ca0c
Show file tree
Hide file tree
Showing 27 changed files with 745 additions and 861 deletions.
6 changes: 6 additions & 0 deletions sdks/typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ used in Python. These can be "ordinary" javascript objects (which are passed
as is) or special DoFnParam objects which provide getters to element-specific
information (such as the current timestamp, window, or side input) at runtime.

* Rather than introduce multiple-output complexity into the map/do operations
themselves, producing multiple outputs is done by following with a new
`Split` primitive that takes a
`PCollection<{a?: AType, b: BType, ... }>` and produces an object
`{a: PCollection<AType>, b: PCollection<BType>, ...}`.

* Javascript supports (and encourages) an asynchronous programing model, with
many libraries requiring use of the async/await paradigm.
As there is no way (by design) to go from the asyncronous style back to
Expand Down
28 changes: 24 additions & 4 deletions sdks/typescript/src/apache_beam/coders/coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,36 @@ interface Class<T> {
*/
class CoderRegistry {
internal_registry = {};
get(urn: string): Class<Coder<unknown>> {
const constructor: Class<Coder<unknown>> = this.internal_registry[urn];

getCoder(
urn: string,
payload: Uint8Array | undefined = undefined,
...components: Coder<unknown>[]
) {
const constructor: (...args) => Coder<unknown> =
this.internal_registry[urn];
if (constructor === undefined) {
throw new Error("Could not find coder for URN " + urn);
}
return constructor;
if (payload && payload.length > 0) {
return constructor(payload, ...components);
} else {
return constructor(...components);
}
}

// TODO: Figure out how to branch on constructors (called with new) and
// ordinary functions.
register(urn: string, coderClass: Class<Coder<any>>) {
this.internal_registry[urn] = coderClass;
this.registerClass(urn, coderClass);
}

registerClass(urn: string, coderClass: Class<Coder<any>>) {
this.registerConstructor(urn, (...args) => new coderClass(...args));
}

registerConstructor(urn: string, constructor: (...args) => Coder<any>) {
this.internal_registry[urn] = constructor;
}
}

Expand Down
19 changes: 3 additions & 16 deletions sdks/typescript/src/apache_beam/examples/wordcount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,21 @@ import * as yargs from "yargs";

import * as beam from "../../apache_beam";
import { createRunner } from "../runners/runner";

import { count } from "../transforms/combiners";
import { GroupBy } from "../transforms/group_and_combine";

class CountElements extends beam.PTransform<
beam.PCollection<any>,
beam.PCollection<any>
> {
expand(input: beam.PCollection<any>) {
return input
.map((e) => ({ element: e }))
.apply(new GroupBy("element").combining("element", count, "count"));
}
}
import { countPerElement } from "../transforms/group_and_combine";

function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
return lines
.map((s: string) => s.toLowerCase())
.flatMap(function* (line: string) {
yield* line.split(/[^a-z]+/);
})
.apply(new CountElements("Count"));
.apply(countPerElement());
}

async function main() {
await createRunner(yargs.argv).run((root) => {
const lines = root.apply(
new beam.Create([
beam.create([
"In the beginning God created the heaven and the earth.",
"And the earth was without form, and void; and darkness was upon the face of the deep.",
"And the Spirit of God moved upon the face of the waters.",
Expand Down
10 changes: 4 additions & 6 deletions sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,21 @@ import * as textio from "../io/textio";

import { PortableRunner } from "../runners/portable_runner/runner";

import * as internal from "../../apache_beam/transforms/internal";
import { RowCoder } from "../coders/row_coder";
import { SqlTransform } from "../transforms/sql";
import { sqlTransform } from "../transforms/sql";

async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
await new PortableRunner({
environmentType: "LOOPBACK",
jobEndpoint: "localhost:3333",
}).run(async (root) => {
const lines = root.apply(new beam.Create(["a", "b", "c", "c"]));
const lines = root.apply(beam.create(["a", "b", "c", "c"]));

const filtered = await lines
.map((w) => ({ word: w }))
.apply(new internal.WithCoderInternal(RowCoder.fromJSON({ word: "str" })))
.apply(beam.withRowCoder({ word: "str" }))
.asyncApply(
new SqlTransform(
sqlTransform(
"SELECT word, count(*) as c from PCOLLECTION group by word"
)
);
Expand Down
19 changes: 3 additions & 16 deletions sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,24 @@
import * as beam from "../../apache_beam";
import * as textio from "../io/textio";
import { DirectRunner } from "../runners/direct_runner";

import { count } from "../transforms/combiners";
import { GroupBy } from "../transforms/group_and_combine";

import { PortableRunner } from "../runners/portable_runner/runner";

class CountElements extends beam.PTransform<
beam.PCollection<any>,
beam.PCollection<any>
> {
expand(input: beam.PCollection<any>) {
return input
.map((e) => ({ element: e }))
.apply(new GroupBy("element").combining("element", count, "count"));
}
}
import { countPerElement } from "../transforms/group_and_combine";

function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
return lines
.map((s: string) => s.toLowerCase())
.flatMap(function* (line: string) {
yield* line.split(/[^a-z]+/);
})
.apply(new CountElements("Count"));
.apply(countPerElement());
}

async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
await new PortableRunner("localhost:3333").run(async (root) => {
const lines = await root.asyncApply(
new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
);

lines.apply(wordCount).map(console.log);
Expand Down
36 changes: 16 additions & 20 deletions sdks/typescript/src/apache_beam/internal/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import equal from "fast-deep-equal";
import * as runnerApi from "../proto/beam_runner_api";
import * as fnApi from "../proto/beam_fn_api";
import {
PTransform,
AsyncPTransform,
PTransformClass,
AsyncPTransformClass,
extractName,
} from "../transforms/transform";
import { GlobalWindows } from "../transforms/windowings";
import { globalWindows } from "../transforms/windowings";
import * as pvalue from "../pvalue";
import { WindowInto } from "../transforms/window";
import { createWindowingStrategyProto } from "../transforms/window";
import * as environments from "./environments";
import { Coder, globalRegistry as globalCoderRegistry } from "../coders/coders";

Expand All @@ -45,18 +45,14 @@ export class PipelineContext {
const this_ = this;
if (this.coders[coderId] == undefined) {
const coderProto = this.components.coders[coderId];
const coderConstructor = globalCoderRegistry().get(coderProto.spec!.urn);
const components = (coderProto.componentCoderIds || []).map(
this_.getCoder.bind(this_)
const components: Coder<unknown>[] = (
coderProto.componentCoderIds || []
).map(this_.getCoder.bind(this_));
this.coders[coderId] = globalCoderRegistry().getCoder(
coderProto.spec!.urn,
coderProto.spec!.payload,
...components
);
if (coderProto.spec!.payload?.length) {
this.coders[coderId] = new coderConstructor(
coderProto.spec!.payload,
...components
);
} else {
this.coders[coderId] = new coderConstructor(...components);
}
}
return this.coders[coderId];
}
Expand Down Expand Up @@ -132,13 +128,13 @@ export class Pipeline {
environments.defaultJsEnvironment();
this.context = new PipelineContext(this.proto.components!);
this.proto.components!.windowingStrategies[this.globalWindowing] =
WindowInto.createWindowingStrategy(this, new GlobalWindows());
createWindowingStrategyProto(this, globalWindows());
}

preApplyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
>(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
const this_ = this;
const transformId = this.context.createUniqueName("transform");
let parent: runnerApi.PTransform | undefined = undefined;
Expand Down Expand Up @@ -168,7 +164,7 @@ export class Pipeline {
applyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: PTransform<InputT, OutputT>, input: InputT) {
>(transform: PTransformClass<InputT, OutputT>, input: InputT) {
const { id: transformId, proto: transformProto } = this.preApplyTransform(
transform,
input
Expand All @@ -186,7 +182,7 @@ export class Pipeline {
async asyncApplyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
>(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
const { id: transformId, proto: transformProto } = this.preApplyTransform(
transform,
input
Expand All @@ -205,7 +201,7 @@ export class Pipeline {
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(
transform: AsyncPTransform<InputT, OutputT>,
transform: AsyncPTransformClass<InputT, OutputT>,
transformProto: runnerApi.PTransform,
result: OutputT
) {
Expand Down
1 change: 0 additions & 1 deletion sdks/typescript/src/apache_beam/internal/urns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export const IDENTITY_DOFN_URN = "beam:dofn:identity:0.1";

export const SERIALIZED_JS_DOFN_INFO = "beam:dofn:serialized_js_dofn_info:v1";
export const SPLITTING_JS_DOFN_URN = "beam:dofn:splitting_dofn:v1";
export const SPLITTING2_JS_DOFN_URN = "beam:dofn:splitting2_dofn:v1";
export const JS_WINDOW_INTO_DOFN_URN = "beam:dofn:js_window_into:v1";
export const JS_ASSIGN_TIMESTAMPS_DOFN_URN =
"beam:dofn:js_assign_timestamps:v1";
Expand Down
22 changes: 7 additions & 15 deletions sdks/typescript/src/apache_beam/io/textio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,20 @@
import * as beam from "../../apache_beam";
import * as external from "../transforms/external";

export class ReadFromText extends beam.AsyncPTransform<
beam.Root,
beam.PCollection<string>
> {
constructor(private filePattern: string) {
super();
}

async asyncExpand(root: beam.Root) {
export function readFromText(
filePattern: string
): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
return async function readFromText(root: beam.Root) {
return await root.asyncApply(
new external.RawExternalTransform<
beam.PValue<any>,
beam.PCollection<any>
>(
external.rawExternalTransform<beam.Root, beam.PCollection<any>>(
"beam:transforms:python:fully_qualified_named",
{
constructor: "apache_beam.io.ReadFromText",
kwargs: { file_pattern: this.filePattern },
kwargs: { file_pattern: filePattern },
},
// python apache_beam/runners/portability/expansion_service_main.py --fully_qualified_name_glob='*' --port 4444 --environment_type='beam:env:embedded_python:v1'
"localhost:4444"
)
);
}
};
}
Loading

0 comments on commit 6e5ca0c

Please sign in to comment.