Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the PTransform and associated APIs to be less class-based. #17699

Merged
merged 4 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdks/typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ used in Python. These can be "ordinary" javascript objects (which are passed
as is) or special DoFnParam objects which provide getters to element-specific
information (such as the current timestamp, window, or side input) at runtime.

* Rather than introduce multiple-output complexity into the map/do operations
themselves, producing multiple outputs is done by following with a new
`Split` primitive that takes a
`PCollection<{a?: AType, b: BType, ... }>` and produces an object
`{a: PCollection<AType>, b: PCollection<BType>, ...}`.

* Javascript supports (and encourages) an asynchronous programing model, with
many libraries requiring use of the async/await paradigm.
As there is no way (by design) to go from the asyncronous style back to
Expand Down
28 changes: 24 additions & 4 deletions sdks/typescript/src/apache_beam/coders/coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,36 @@ interface Class<T> {
*/
class CoderRegistry {
internal_registry = {};
get(urn: string): Class<Coder<unknown>> {
const constructor: Class<Coder<unknown>> = this.internal_registry[urn];

getCoder(
urn: string,
payload: Uint8Array | undefined = undefined,
...components: Coder<unknown>[]
) {
const constructor: (...args) => Coder<unknown> =
this.internal_registry[urn];
if (constructor === undefined) {
throw new Error("Could not find coder for URN " + urn);
}
return constructor;
if (payload && payload.length > 0) {
return constructor(payload, ...components);
} else {
return constructor(...components);
}
}

// TODO: Figure out how to branch on constructors (called with new) and
// ordinary functions.
register(urn: string, coderClass: Class<Coder<any>>) {
this.internal_registry[urn] = coderClass;
this.registerClass(urn, coderClass);
}

registerClass(urn: string, coderClass: Class<Coder<any>>) {
this.registerConstructor(urn, (...args) => new coderClass(...args));
}

registerConstructor(urn: string, constructor: (...args) => Coder<any>) {
this.internal_registry[urn] = constructor;
}
}

Expand Down
19 changes: 3 additions & 16 deletions sdks/typescript/src/apache_beam/examples/wordcount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,21 @@ import * as yargs from "yargs";

import * as beam from "../../apache_beam";
import { createRunner } from "../runners/runner";

import { count } from "../transforms/combiners";
import { GroupBy } from "../transforms/group_and_combine";

class CountElements extends beam.PTransform<
beam.PCollection<any>,
beam.PCollection<any>
> {
expand(input: beam.PCollection<any>) {
return input
.map((e) => ({ element: e }))
.apply(new GroupBy("element").combining("element", count, "count"));
}
}
import { countPerElement } from "../transforms/group_and_combine";

function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
return lines
.map((s: string) => s.toLowerCase())
.flatMap(function* (line: string) {
yield* line.split(/[^a-z]+/);
})
.apply(new CountElements("Count"));
.apply(countPerElement());
}

async function main() {
await createRunner(yargs.argv).run((root) => {
const lines = root.apply(
new beam.Create([
beam.create([
"In the beginning God created the heaven and the earth.",
"And the earth was without form, and void; and darkness was upon the face of the deep.",
"And the Spirit of God moved upon the face of the waters.",
Expand Down
10 changes: 4 additions & 6 deletions sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,21 @@ import * as textio from "../io/textio";

import { PortableRunner } from "../runners/portable_runner/runner";

import * as internal from "../../apache_beam/transforms/internal";
import { RowCoder } from "../coders/row_coder";
import { SqlTransform } from "../transforms/sql";
import { sqlTransform } from "../transforms/sql";

async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
await new PortableRunner({
environmentType: "LOOPBACK",
jobEndpoint: "localhost:3333",
}).run(async (root) => {
const lines = root.apply(new beam.Create(["a", "b", "c", "c"]));
const lines = root.apply(beam.create(["a", "b", "c", "c"]));

const filtered = await lines
.map((w) => ({ word: w }))
.apply(new internal.WithCoderInternal(RowCoder.fromJSON({ word: "str" })))
.apply(beam.withRowCoder({ word: "str" }))
.asyncApply(
new SqlTransform(
sqlTransform(
"SELECT word, count(*) as c from PCOLLECTION group by word"
)
);
Expand Down
19 changes: 3 additions & 16 deletions sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,24 @@
import * as beam from "../../apache_beam";
import * as textio from "../io/textio";
import { DirectRunner } from "../runners/direct_runner";

import { count } from "../transforms/combiners";
import { GroupBy } from "../transforms/group_and_combine";

import { PortableRunner } from "../runners/portable_runner/runner";

class CountElements extends beam.PTransform<
beam.PCollection<any>,
beam.PCollection<any>
> {
expand(input: beam.PCollection<any>) {
return input
.map((e) => ({ element: e }))
.apply(new GroupBy("element").combining("element", count, "count"));
}
}
import { countPerElement } from "../transforms/group_and_combine";

function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
return lines
.map((s: string) => s.toLowerCase())
.flatMap(function* (line: string) {
yield* line.split(/[^a-z]+/);
})
.apply(new CountElements("Count"));
.apply(countPerElement());
}

async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
await new PortableRunner("localhost:3333").run(async (root) => {
const lines = await root.asyncApply(
new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
);

lines.apply(wordCount).map(console.log);
Expand Down
36 changes: 16 additions & 20 deletions sdks/typescript/src/apache_beam/internal/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import equal from "fast-deep-equal";
import * as runnerApi from "../proto/beam_runner_api";
import * as fnApi from "../proto/beam_fn_api";
import {
PTransform,
AsyncPTransform,
PTransformClass,
AsyncPTransformClass,
extractName,
} from "../transforms/transform";
import { GlobalWindows } from "../transforms/windowings";
import { globalWindows } from "../transforms/windowings";
import * as pvalue from "../pvalue";
import { WindowInto } from "../transforms/window";
import { createWindowingStrategyProto } from "../transforms/window";
import * as environments from "./environments";
import { Coder, globalRegistry as globalCoderRegistry } from "../coders/coders";

Expand All @@ -45,18 +45,14 @@ export class PipelineContext {
const this_ = this;
if (this.coders[coderId] == undefined) {
const coderProto = this.components.coders[coderId];
const coderConstructor = globalCoderRegistry().get(coderProto.spec!.urn);
const components = (coderProto.componentCoderIds || []).map(
this_.getCoder.bind(this_)
const components: Coder<unknown>[] = (
coderProto.componentCoderIds || []
).map(this_.getCoder.bind(this_));
this.coders[coderId] = globalCoderRegistry().getCoder(
coderProto.spec!.urn,
coderProto.spec!.payload,
...components
);
if (coderProto.spec!.payload?.length) {
this.coders[coderId] = new coderConstructor(
coderProto.spec!.payload,
...components
);
} else {
this.coders[coderId] = new coderConstructor(...components);
}
}
return this.coders[coderId];
}
Expand Down Expand Up @@ -132,13 +128,13 @@ export class Pipeline {
environments.defaultJsEnvironment();
this.context = new PipelineContext(this.proto.components!);
this.proto.components!.windowingStrategies[this.globalWindowing] =
WindowInto.createWindowingStrategy(this, new GlobalWindows());
createWindowingStrategyProto(this, globalWindows());
}

preApplyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
>(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
const this_ = this;
const transformId = this.context.createUniqueName("transform");
let parent: runnerApi.PTransform | undefined = undefined;
Expand Down Expand Up @@ -168,7 +164,7 @@ export class Pipeline {
applyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: PTransform<InputT, OutputT>, input: InputT) {
>(transform: PTransformClass<InputT, OutputT>, input: InputT) {
const { id: transformId, proto: transformProto } = this.preApplyTransform(
transform,
input
Expand All @@ -186,7 +182,7 @@ export class Pipeline {
async asyncApplyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
>(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
const { id: transformId, proto: transformProto } = this.preApplyTransform(
transform,
input
Expand All @@ -205,7 +201,7 @@ export class Pipeline {
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(
transform: AsyncPTransform<InputT, OutputT>,
transform: AsyncPTransformClass<InputT, OutputT>,
transformProto: runnerApi.PTransform,
result: OutputT
) {
Expand Down
1 change: 0 additions & 1 deletion sdks/typescript/src/apache_beam/internal/urns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export const IDENTITY_DOFN_URN = "beam:dofn:identity:0.1";

export const SERIALIZED_JS_DOFN_INFO = "beam:dofn:serialized_js_dofn_info:v1";
export const SPLITTING_JS_DOFN_URN = "beam:dofn:splitting_dofn:v1";
export const SPLITTING2_JS_DOFN_URN = "beam:dofn:splitting2_dofn:v1";
export const JS_WINDOW_INTO_DOFN_URN = "beam:dofn:js_window_into:v1";
export const JS_ASSIGN_TIMESTAMPS_DOFN_URN =
"beam:dofn:js_assign_timestamps:v1";
Expand Down
22 changes: 7 additions & 15 deletions sdks/typescript/src/apache_beam/io/textio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,20 @@
import * as beam from "../../apache_beam";
import * as external from "../transforms/external";

export class ReadFromText extends beam.AsyncPTransform<
beam.Root,
beam.PCollection<string>
> {
constructor(private filePattern: string) {
super();
}

async asyncExpand(root: beam.Root) {
export function readFromText(
filePattern: string
): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
return async function readFromText(root: beam.Root) {
return await root.asyncApply(
new external.RawExternalTransform<
beam.PValue<any>,
beam.PCollection<any>
>(
external.rawExternalTransform<beam.Root, beam.PCollection<any>>(
"beam:transforms:python:fully_qualified_named",
{
constructor: "apache_beam.io.ReadFromText",
kwargs: { file_pattern: this.filePattern },
kwargs: { file_pattern: filePattern },
},
// python apache_beam/runners/portability/expansion_service_main.py --fully_qualified_name_glob='*' --port 4444 --environment_type='beam:env:embedded_python:v1'
"localhost:4444"
)
);
}
};
}
Loading