From 66e85dab799963368b9d210ac8b1d1d5635efcda Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 13 May 2022 15:00:42 -0700 Subject: [PATCH] Add some auto-starting runners to the typescript SDK. (#17580) Adds out-of-the-box support for FlinkRunner, DataflowRunner, and the Python Universal Local runner. Also adds a DefaultRunner which chooses between the DirectRunner and the ULR depending on the properties of the pipeline. --- .../extensions/python/bootstrap_beam_venv.py | 2 +- sdks/typescript/package-lock.json | 15 ++- sdks/typescript/package.json | 1 + .../src/apache_beam/examples/wordcount.ts | 20 +++- .../src/apache_beam/runners/dataflow.ts | 44 +++++++ .../src/apache_beam/runners/direct_runner.ts | 47 ++++++++ .../src/apache_beam/runners/flink.ts | 84 ++++++++++++++ .../runners/portable_runner/runner.ts | 85 +++++++++----- .../src/apache_beam/runners/runner.ts | 45 +++++++- .../src/apache_beam/runners/universal.ts | 41 +++++++ .../src/apache_beam/transforms/internal.ts | 2 + .../src/apache_beam/utils/service.ts | 108 +++++++++++++++--- .../typescript/src/apache_beam/worker/data.ts | 3 + .../worker/external_worker_service.ts | 47 +++++--- .../src/apache_beam/worker/worker.ts | 12 +- 15 files changed, 479 insertions(+), 77 deletions(-) create mode 100644 sdks/typescript/src/apache_beam/runners/dataflow.ts create mode 100644 sdks/typescript/src/apache_beam/runners/flink.ts create mode 100644 sdks/typescript/src/apache_beam/runners/universal.ts diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py index 08120b84adba..c113676a8f74 100644 --- a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py +++ b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py @@ -77,7 +77,7 @@ def 
maybe_strict_version(s): or options.beam_version.startswith('https://')): # It's a path to a tarball. beam_version = os.path.basename(options.beam_version) - beam_package = options.beam_version + beam_package = options.beam_version + '[gcp,aws,asure,dataframe]' else: beam_version = options.beam_version beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json index 49e62618f887..c39002e60016 100644 --- a/sdks/typescript/package-lock.json +++ b/sdks/typescript/package-lock.json @@ -1,12 +1,12 @@ { "name": "apache_beam", - "version": "0.37.0.dev", + "version": "0.38.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "apache_beam", - "version": "0.37.0.dev", + "version": "0.38.0", "dependencies": { "@grpc/grpc-js": "^1.4.6", "@protobuf-ts/grpc-transport": "^2.1.0", @@ -16,6 +16,7 @@ "chai": "^4.3.4", "date-fns": "^2.28.0", "fast-deep-equal": "^3.1.3", + "find-git-root": "^1.0.4", "long": "^4.0.0", "protobufjs": "^6.10.2", "queue-typescript": "^1.0.1", @@ -919,6 +920,11 @@ "node": ">=8" } }, + "node_modules/find-git-root": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/find-git-root/-/find-git-root-1.0.4.tgz", + "integrity": "sha512-468fmirKKgcrqfZfPn0xIpwZUUsZQcYXfx0RC2/jX39GPz83TwutQNZZhDrI6HqjO8cRejxQVaUY8GQdXopFfA==" + }, "node_modules/find-up": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz", @@ -2942,6 +2948,11 @@ "to-regex-range": "^5.0.1" } }, + "find-git-root": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/find-git-root/-/find-git-root-1.0.4.tgz", + "integrity": "sha512-468fmirKKgcrqfZfPn0xIpwZUUsZQcYXfx0RC2/jX39GPz83TwutQNZZhDrI6HqjO8cRejxQVaUY8GQdXopFfA==" + }, "find-up": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz", diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index 
1e13de36bfc6..6ab659ccf2e9 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -32,6 +32,7 @@ "chai": "^4.3.4", "date-fns": "^2.28.0", "fast-deep-equal": "^3.1.3", + "find-git-root": "^1.0.4", "long": "^4.0.0", "protobufjs": "^6.10.2", "queue-typescript": "^1.0.1", diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts index d68d0f256008..961afb43e9bd 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts @@ -16,10 +16,24 @@ * limitations under the License. */ -// TODO: Should this be in a top-level examples dir, rather than under apache_beam. +// Run directly with +// +// node dist/src/apache_beam/examples/wordcount.js +// +// A different runner can be chosen via a --runner argument, e.g. +// +// node dist/src/apache_beam/examples/wordcount.js --runner=flink +// +// To run on Dataflow, pass the required arguments: +// +// node dist/src/apache_beam/examples/wordcount.js --runner=dataflow --project=PROJECT_ID --tempLocation=gs://BUCKET/DIR --region=us-central1 + +// TODO: Should this be in a top-level examples dir, rather than under apache_beam? 
+ +import * as yargs from "yargs"; import * as beam from "../../apache_beam"; -import { DirectRunner } from "../runners/direct_runner"; +import { createRunner } from "../runners/runner"; import { count } from "../transforms/combiners"; import { GroupBy } from "../transforms/group_and_combine"; @@ -45,7 +59,7 @@ function wordCount(lines: beam.PCollection): beam.PCollection { } async function main() { - await new DirectRunner().run((root) => { + await createRunner(yargs.argv).run((root) => { const lines = root.apply( new beam.Create([ "In the beginning God created the heaven and the earth.", diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts new file mode 100644 index 000000000000..958eb99c9565 --- /dev/null +++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { Pipeline } from "../internal/pipeline"; +import { PipelineResult, Runner } from "./runner"; +import { PortableRunner } from "./portable_runner/runner"; +import { PythonService } from "../utils/service"; + +export function dataflowRunner(runnerOptions: { + project: string; + tempLocation: string; + region: string; + [others: string]: any; +}): Runner { + return new (class extends Runner { + async runPipeline( + pipeline: Pipeline, + options: Object = {} + ): Promise { + return new PortableRunner( + runnerOptions as any, + new PythonService("apache_beam.runners.dataflow.dataflow_job_service", [ + "--port", + "{{PORT}}", + ]) + ).runPipeline(pipeline, options); + } + })(); +} diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts index a1a10621af98..ff203d06c41d 100644 --- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts +++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts @@ -44,14 +44,61 @@ import { } from "../values"; import { PaneInfoCoder } from "../coders/standard_coders"; import { Coder, Context as CoderContext } from "../coders/coders"; +import * as environments from "../internal/environments"; import { serializeFn, deserializeFn } from "../internal/serialize"; +const SUPPORTED_REQUIREMENTS: string[] = []; + +export function directRunner(options: Object = {}): Runner { + return new DirectRunner(options); +} + export class DirectRunner extends Runner { // All the operators for a given pipeline should share the same state. // This global mapping allows operators to look up a shared state object for // a given pipeline on deserialization. 
static inMemoryStatesRefs: Map = new Map(); + constructor(private options: Object = {}) { + super(); + } + + unsupportedFeatures(pipeline, options: Object = {}): string[] { + return [...this.unsupportedFeaturesIter(pipeline, options)]; + } + + *unsupportedFeaturesIter(pipeline, options: Object = {}) { + const proto: runnerApi.Pipeline = pipeline.proto; + for (const requirement of proto.requirements) { + if (!SUPPORTED_REQUIREMENTS.includes(requirement)) { + yield requirement; + } + } + + for (const env of Object.values(proto.components!.environments)) { + if ( + env.urn && + env.urn != environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN + ) { + yield env.urn; + } + } + + for (const windowing of Object.values( + proto.components!.windowingStrategies + )) { + if ( + ![ + runnerApi.MergeStatus_Enum.UNSPECIFIED, + runnerApi.MergeStatus_Enum.NON_MERGING, + runnerApi.MergeStatus_Enum.ALREADY_MERGED, + ].includes(windowing.mergeStatus) + ) { + yield "MergeStatus=" + windowing.mergeStatus; + } + } + } + async runPipeline(p): Promise { // console.dir(p.proto, { depth: null }); diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts new file mode 100644 index 000000000000..4acb68e642fa --- /dev/null +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const fs = require("fs"); +const os = require("os"); +const path = require("path"); + +import { Pipeline } from "../internal/pipeline"; +import { PipelineResult, Runner } from "./runner"; +import { PortableRunner } from "./portable_runner/runner"; +import { JavaJarService } from "../utils/service"; + +const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; + +// These should stay in sync with gradle.properties. +const PUBLISHED_FLINK_VERSIONS = ["1.12", "1.13", "1.14"]; + +const defaultOptions = { + flinkMaster: "[local]", + flinkVersion: PUBLISHED_FLINK_VERSIONS[PUBLISHED_FLINK_VERSIONS.length - 1], +}; + +export function flinkRunner(runnerOptions: Object = {}): Runner { + return new (class extends Runner { + async runPipeline( + pipeline: Pipeline, + options: Object = {} + ): Promise { + const allOptions = { + ...defaultOptions, + ...runnerOptions, + ...options, + } as any; + if ( + !allOptions.environmentType && + MAGIC_HOST_NAMES.includes(allOptions.flinkMaster) + ) { + allOptions.environmentType = "LOOPBACK"; + } + if (!allOptions.artifactsDir) { + allOptions.artifactsDir = fs.mkdtempSync( + path.join(os.tmpdir(), "flinkArtifactsDir") + ); + } + + const jobServerJar = + allOptions.flinkJobServerJar || + (await JavaJarService.cachedJar( + JavaJarService.gradleToJar( + `runners:flink:${allOptions.flinkVersion}:job-server:shadowJar` + ) + )); + const jobServer = new JavaJarService(jobServerJar, [ + "--flink-master", + allOptions.flinkMaster, + "--artifacts-dir", + allOptions.artifactsDir, + "--job-port", + "{{PORT}}", + "--artifact-port", + "0", + 
"--expansion-port", + "0", + ]); + + return new PortableRunner(allOptions, jobServer).runPipeline(pipeline); + } + })(); +} diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts index 3081249539f1..f5281aa969d4 100644 --- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts @@ -28,11 +28,12 @@ import { ArtifactStagingServiceClient } from "../../proto/beam_artifact_api.clie import { Pipeline } from "../../internal/pipeline"; import { PipelineResult, Runner } from "../runner"; import { PipelineOptions } from "../../options/pipeline_options"; -import { JobState_Enum } from "../../proto/beam_job_api"; +import { JobState_Enum, JobStateEvent } from "../../proto/beam_job_api"; import { ExternalWorkerPool } from "../../worker/external_worker_service"; import * as environments from "../../internal/environments"; import * as artifacts from "../artifacts"; +import { Service as JobService } from "../../utils/service"; const TERMINAL_STATES = [ JobState_Enum.DONE, @@ -42,19 +43,22 @@ const TERMINAL_STATES = [ JobState_Enum.DRAINED, ]; +type completionCallback = (terminalState: JobStateEvent) => Promise; + class PortableRunnerPipelineResult implements PipelineResult { jobId: string; runner: PortableRunner; - workers?: ExternalWorkerPool; + completionCallbacks: completionCallback[]; + terminalState?: JobStateEvent; constructor( runner: PortableRunner, jobId: string, - workers: ExternalWorkerPool | undefined = undefined + completionCallbacks: completionCallback[] ) { this.runner = runner; this.jobId = jobId; - this.workers = workers; + this.completionCallbacks = completionCallbacks; } static isTerminal(state: JobState_Enum) { @@ -62,13 +66,15 @@ class PortableRunnerPipelineResult implements PipelineResult { } async getState() { + if (this.terminalState) { + return this.terminalState; + } const state = 
await this.runner.getJobState(this.jobId); - if ( - this.workers != undefined && - PortableRunnerPipelineResult.isTerminal(state.state) - ) { - this.workers.stop(); - this.workers = undefined; + if (PortableRunnerPipelineResult.isTerminal(state.state)) { + this.terminalState = state; + for (const callback of this.completionCallbacks) { + await callback(state); + } } return state; } @@ -96,11 +102,12 @@ class PortableRunnerPipelineResult implements PipelineResult { } export class PortableRunner extends Runner { - client: JobServiceClient; + client?: JobServiceClient; defaultOptions: any; constructor( - options: string | { jobEndpoint: string; [others: string]: any } + options: string | { jobEndpoint: string; [others: string]: any }, + private jobService: JobService | undefined = undefined ) { super(); if (typeof options == "string") { @@ -108,16 +115,25 @@ export class PortableRunner extends Runner { } else if (options) { this.defaultOptions = options; } - this.client = new JobServiceClient( - new GrpcTransport({ - host: this.defaultOptions?.jobEndpoint, - channelCredentials: ChannelCredentials.createInsecure(), - }) - ); + } + + async getClient(): Promise { + if (!this.client) { + if (this.jobService) { + this.defaultOptions.jobEndpoint = await this.jobService.start(); + } + this.client = new JobServiceClient( + new GrpcTransport({ + host: this.defaultOptions?.jobEndpoint, + channelCredentials: ChannelCredentials.createInsecure(), + }) + ); + } + return this.client; } async getJobState(jobId: string) { - const call = this.client.getState({ jobId }); + const call = (await this.getClient()).getState({ jobId }); return await call.response; } @@ -138,11 +154,18 @@ export class PortableRunner extends Runner { options = { ...this.defaultOptions, ...options }; } - const use_loopback_service = - (options as any)?.environmentType == "LOOPBACK"; - const workers = use_loopback_service ? 
new ExternalWorkerPool() : undefined; - if (use_loopback_service) { - workers!.start(); + const completionCallbacks: completionCallback[] = []; + + if (this.jobService) { + const jobService = this.jobService; + completionCallbacks.push(() => jobService.stop()); + } + + let loopbackAddress: string | undefined = undefined; + if ((options as any)?.environmentType == "LOOPBACK") { + const workers = new ExternalWorkerPool(); + loopbackAddress = await workers.start(); + completionCallbacks.push(() => workers.stop()); } // Replace the default environment according to the pipeline options. @@ -151,9 +174,9 @@ export class PortableRunner extends Runner { pipeline.components!.environments )) { if (env.urn == environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN) { - if (use_loopback_service) { + if (loopbackAddress) { pipeline.components!.environments[envId] = - environments.asExternalEnvironment(env, workers!.address); + environments.asExternalEnvironment(env, loopbackAddress); } else { pipeline.components!.environments[envId] = environments.asDockerEnvironment( @@ -166,6 +189,7 @@ export class PortableRunner extends Runner { } // Inform the runner that we'd like to execute this pipeline. + console.debug("Preparing job."); let message: PrepareJobRequest = { pipeline, jobName: (options as any)?.jobName || "", @@ -182,10 +206,12 @@ export class PortableRunner extends Runner { ) ); } - const prepareResponse = await this.client.prepare(message).response; + const client = await this.getClient(); + const prepareResponse = await client.prepare(message).response; // Allow the runner to fetch any artifacts it can't interpret. if (prepareResponse.artifactStagingEndpoint) { + console.debug("Staging artifacts"); await artifacts.offerArtifacts( new ArtifactStagingServiceClient( new GrpcTransport({ @@ -198,7 +224,8 @@ export class PortableRunner extends Runner { } // Actually kick off the job. 
- const runCall = this.client.run({ + console.debug("Running job."); + const runCall = client.run({ preparationId: prepareResponse.preparationId, retrievalToken: "", }); @@ -208,6 +235,6 @@ export class PortableRunner extends Runner { // If desired, the user can use this handle to await job completion, but // this function returns as soon as the job is successfully started, not // once the job has completed. - return new PortableRunnerPipelineResult(this, jobId, workers); + return new PortableRunnerPipelineResult(this, jobId, completionCallbacks); } } diff --git a/sdks/typescript/src/apache_beam/runners/runner.ts b/sdks/typescript/src/apache_beam/runners/runner.ts index 272ef8407836..7fc2ad794776 100644 --- a/sdks/typescript/src/apache_beam/runners/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/runner.ts @@ -25,11 +25,29 @@ export interface PipelineResult { waitUntilFinish(duration?: number): Promise; } +export function createRunner(options): Runner { + let runnerConstructor: (any) => Runner; + if (options.runner == undefined || options.runner == "default") { + runnerConstructor = defaultRunner; + } else if (options.runner == "direct") { + runnerConstructor = require("./direct_runner").directRunner; + } else if (options.runner == "universal") { + runnerConstructor = require("./universal").universalRunner; + } else if (options.runner == "flink") { + runnerConstructor = require("./flink").flinkRunner; + } else if (options.runner == "dataflow") { + runnerConstructor = require("./dataflow").dataflowRunner; + } else { + throw new Error("Unknown runner: " + options.runner); + } + return runnerConstructor(options); +} + /** * A Runner is the object that takes a pipeline definition and actually * executes, e.g. locally or on a distributed system. */ -export class Runner { +export abstract class Runner { /** * Runs the transform. 
* @@ -64,10 +82,27 @@ export class Runner { return this.runPipeline(p); } - protected async runPipeline( + abstract runPipeline( pipeline: Pipeline, options?: PipelineOptions - ): Promise { - throw new Error("Not implemented."); - } + ): Promise; +} + +export function defaultRunner(defaultOptions: Object): Runner { + return new (class extends Runner { + async runPipeline( + pipeline: Pipeline, + options: Object = {} + ): Promise { + const directRunner = + require("./direct_runner").directRunner(defaultOptions); + if (directRunner.unsupportedFeatures(pipeline, options).length == 0) { + return directRunner.runPipeline(pipeline, options); + } else { + return require("./universal") + .universalRunner(defaultOptions) + .runPipeline(pipeline, options); + } + } + })(); } diff --git a/sdks/typescript/src/apache_beam/runners/universal.ts b/sdks/typescript/src/apache_beam/runners/universal.ts new file mode 100644 index 000000000000..c2c4db14c69f --- /dev/null +++ b/sdks/typescript/src/apache_beam/runners/universal.ts @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { Pipeline } from "../internal/pipeline"; +import { PipelineResult, Runner } from "./runner"; +import { PortableRunner } from "./portable_runner/runner"; +import { PythonService } from "../utils/service"; + +export function universalRunner(runnerOptions: { + [others: string]: any; +}): Runner { + return new (class extends Runner { + async runPipeline( + pipeline: Pipeline, + options: Object = {} + ): Promise { + return new PortableRunner( + runnerOptions as any, + new PythonService( + "apache_beam.runners.portability.local_job_service_main", + ["--port", "{{PORT}}"] + ) + ).runPipeline(pipeline, options); + } + })(); +} diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts index 232cfd2832fe..27f03c836ab7 100644 --- a/sdks/typescript/src/apache_beam/transforms/internal.ts +++ b/sdks/typescript/src/apache_beam/transforms/internal.ts @@ -55,6 +55,7 @@ export class Impulse extends PTransform> { urn: Impulse.urn, payload: urns.IMPULSE_BUFFER, }); + transformProto.environmentId = ""; return pipeline.createPCollectionInternal(new BytesCoder()); } } @@ -134,6 +135,7 @@ export class GroupByKey extends PTransform< urn: GroupByKey.urn, payload: undefined!, }); + transformProto.environmentId = ""; // TODO: (Cleanup) warn about BsonObjectCoder and (non)deterministic key ordering? const keyCoder = pipeline.getCoder(inputCoderProto.componentCoderIds[0]); diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts index 693731dc8655..bb5554e3123e 100644 --- a/sdks/typescript/src/apache_beam/utils/service.ts +++ b/sdks/typescript/src/apache_beam/utils/service.ts @@ -22,6 +22,7 @@ const os = require("os"); const net = require("net"); const path = require("path"); const childProcess = require("child_process"); +const findGitRoot = require("find-git-root"); // TODO: (Typescript) Why can't the var above be used as a namespace? 
import { ChildProcess } from "child_process"; @@ -47,17 +48,35 @@ export class SubprocessService { process: ChildProcess; cmd: string; args: string[]; + name: string; - constructor(cmd: string, args: string[]) { + constructor( + cmd: string, + args: string[], + name: string | undefined = undefined + ) { this.cmd = cmd; this.args = args; + this.name = name || cmd; + } + + static async freePort(): Promise { + return new Promise((resolve) => { + const srv = net.createServer(); + srv.listen(0, () => { + const port = srv.address().port; + srv.close((_) => resolve(port)); + }); + }); } async start() { - // TODO: (Cleanup) Choose a free port. const host = "localhost"; - const port = "7778"; - console.log(this.args.map((arg) => arg.replace("{{PORT}}", port))); + const port = (await SubprocessService.freePort()).toString(); + console.debug( + this.cmd, + this.args.map((arg) => arg.replace("{{PORT}}", port)) + ); this.process = childProcess.spawn( this.cmd, this.args.map((arg) => arg.replace("{{PORT}}", port)), @@ -67,7 +86,11 @@ export class SubprocessService { ); try { + console.debug( + `Waiting for ${this.name} to be available on port ${port}.` + ); await this.portReady(port, host, 10000); + console.debug(`Service ${this.name} available.`); } catch (error) { this.process.kill(); throw error; @@ -77,6 +100,7 @@ export class SubprocessService { } async stop() { + console.log(`Tearing down ${this.name}.`); this.process.kill(); } @@ -91,9 +115,9 @@ export class SubprocessService { try { await new Promise((resolve, reject) => { const socket = net.createConnection(port, host, () => { - resolve(); - socket.end(); connected = true; + socket.end(); + resolve(); }); socket.on("error", (err) => { reject(err); @@ -123,10 +147,12 @@ export function serviceProviderFromJavaGradleTarget( }; } +const BEAM_CACHE = path.join(os.homedir(), ".apache_beam", "cache"); + export class JavaJarService extends SubprocessService { static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2"; 
static BEAM_GROUP_ID = "org.apache.beam"; - static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars"); + static JAR_CACHE = path.join(BEAM_CACHE, "jars"); constructor(jar: string, args: string[] | undefined = undefined) { if (args == undefined) { @@ -185,16 +211,7 @@ export class JavaJarService extends SubprocessService { } const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1]; const artifactId = "beam-" + gradlePackage.replaceAll(":", "-"); - // TODO: Do this more robustly, e.g. use the git root. - const projectRoot = path.resolve( - __dirname, - "..", - "..", - "..", - "..", - "..", - ".." - ); + const projectRoot = path.dirname(findGitRoot(__dirname)); const localPath = path.join( projectRoot, gradlePackage.replaceAll(":", path.sep), @@ -256,3 +273,60 @@ export class JavaJarService extends SubprocessService { ); } } + +export class PythonService extends SubprocessService { + static VENV_CACHE = path.join(BEAM_CACHE, "venvs"); + + static whichPython(): string { + for (const bin of ["python3", "python"]) { + try { + const result = childProcess.spawnSync(bin, ["--version"]); + if (result.status == 0) { + return bin; + } + } catch (err) { + // Try the next one. + } + } + throw new Error("Can't find a Python executable."); + } + + static beamPython(): string { + const projectRoot = path.dirname(findGitRoot(__dirname)); + // TODO: Package this up with the npm. 
+ const bootstrapScript = path.join( + projectRoot, + "sdks", + "java", + "extensions", + "python", + "src", + "main", + "resources", + "org", + "apache", + "beam", + "sdk", + "extensions", + "python", + "bootstrap_beam_venv.py" + ); + console.debug("Invoking Python bootstrap script."); + const result = childProcess.spawnSync( + PythonService.whichPython(), + [bootstrapScript], + { encoding: "latin1" } + ); + if (result.status == 0) { + console.debug(result.stdout); + const lines = result.stdout.trim().split("\n"); + return lines[lines.length - 1]; + } else { + throw new Error(result.output); + } + } + + constructor(module: string, args: string[] = []) { + super(PythonService.beamPython(), ["-u", "-m", module].concat(args)); + } +} diff --git a/sdks/typescript/src/apache_beam/worker/data.ts b/sdks/typescript/src/apache_beam/worker/data.ts index b68ba41ff46a..436010c109a3 100644 --- a/sdks/typescript/src/apache_beam/worker/data.ts +++ b/sdks/typescript/src/apache_beam/worker/data.ts @@ -72,6 +72,9 @@ export class MultiplexingDataChannel { } } }); + this.dataChannel.on("error", (err) => { + console.log("Data channel error", err); + }); } close() { diff --git a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts index 02f77e0d8770..f20b8fb904a7 100644 --- a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts +++ b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts @@ -36,13 +36,12 @@ export class ExternalWorkerPool { server: grpc.Server; workers: Map = new Map(); - // TODO: (Cleanup) Choose a free port. 
- constructor(address: string = "localhost:5555") { + constructor(address: string = "localhost:0") { this.address = address; } - start() { - console.log("Starting the workers at ", this.address); + async start(): Promise { + console.log("Starting loopback workers at ", this.address); const this_ = this; this.server = new grpc.Server(); @@ -87,23 +86,35 @@ export class ExternalWorkerPool { }, }; - this.server.bindAsync( - this.address, - grpc.ServerCredentials.createInsecure(), - (err: Error | null, port: number) => { - if (err) { - console.error(`Server error: ${err.message}`); - } else { - console.log(`Server bound on port: ${port}`); - this_.server.start(); - } - } - ); - this.server.addService(beamFnExternalWorkerPoolDefinition, workerService); + + return new Promise((resolve, reject) => { + this.server.bindAsync( + this.address, + grpc.ServerCredentials.createInsecure(), + (err: Error | null, port: number) => { + if (err) { + reject(`Error starting loopback service: ${err.message}`); + } else { + console.log(`Server bound on port: ${port}`); + this_.address = `localhost:${port}`; + this_.server.start(); + resolve(this_.address); + } + } + ); + }); } - stop() { + async stop(timeoutMs = 100) { + console.debug("Shutting down external workers."); + // Let the runner attempt to gracefully shut these down. 
+ const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (this.workers.size) { + await new Promise((r) => setTimeout(r, timeoutMs / 10)); + } + } this.server.forceShutdown(); } } diff --git a/sdks/typescript/src/apache_beam/worker/worker.ts b/sdks/typescript/src/apache_beam/worker/worker.ts index 722ff6359341..8f30f26126f1 100644 --- a/sdks/typescript/src/apache_beam/worker/worker.ts +++ b/sdks/typescript/src/apache_beam/worker/worker.ts @@ -88,10 +88,18 @@ export class Worker { this.controlChannel.on("end", () => { console.log("Control channel closed."); for (const dataChannel of this.dataChannels.values()) { - dataChannel.close(); + try { + // Best effort. + dataChannel.close(); + } finally { + } } for (const stateChannel of this.stateChannels.values()) { - stateChannel.close(); + try { + // Best effort. + stateChannel.close(); + } finally { + } } }); }