Skip to content

Commit

Permalink
Merge pull request apache#10 from robertwb/node-runner
Browse files Browse the repository at this point in the history
node runner
  • Loading branch information
robertwb authored Jan 7, 2022
2 parents d49cc3e + b27b2bc commit 3e23533
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 0 deletions.
9 changes: 9 additions & 0 deletions sdks/node-ts/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sdks/node-ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
},
"dependencies": {
"@grpc/grpc-js": "^1.4.6",
"@protobuf-ts/grpc-transport": "^2.1.0",
"@protobuf-ts/plugin": "^2.1.0",
"protobufjs": "^6.10.2",
"typescript-formatter": "^7.2.2",
Expand Down
3 changes: 3 additions & 0 deletions sdks/node-ts/src/apache_beam/options/pipeline_options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import {Struct} from "../proto/google/protobuf/struct";

export type PipelineOptions = Struct;
67 changes: 67 additions & 0 deletions sdks/node-ts/src/apache_beam/runners/node_runner/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import {ChannelCredentials} from '@grpc/grpc-js';
import {GrpcTransport} from '@protobuf-ts/grpc-transport';
import {RpcTransport} from "@protobuf-ts/runtime-rpc";
import {Struct} from '../../proto/google/protobuf/struct';
import {PrepareJobRequest} from '../../proto/beam_job_api';
import {JobServiceClient, IJobServiceClient} from '../../proto/beam_job_api.client'

import * as runnerApiProto from '../../proto/beam_runner_api';

/**
* Wrapper for JobServiceClient.
*/
export class RemoteJobServiceClient {
client: JobServiceClient;

/**
* @param host Host and port of JobService server.
* @param {optional} transport By default, GrpcTransport is used. Supply an RpcTransport to override.
* @param channelCredentials ChannelCredentials.createInsecure() by default. Override with a ChannelCredentials.
*/
constructor(
host: string,
transport?: RpcTransport,
channelCredentials?: ChannelCredentials,
) {
transport = transport || new GrpcTransport({
host,
channelCredentials: channelCredentials || ChannelCredentials.createInsecure(),
});
this.client = new JobServiceClient(transport!);
}

async prepare(pipeline: runnerApiProto.Pipeline, jobName: string, pipelineOptions?: Struct) {
return await this.callPrepare(this.client, pipeline, jobName, pipelineOptions);
}

async run(preparationId: string) {
return await this.callRun(this.client, preparationId);
}

async getState(jobId: string) {
return await this.callGetState(this.client, jobId);
}

private async callPrepare(
client: IJobServiceClient,
pipeline: runnerApiProto.Pipeline,
jobName: string,
pipelineOptions?: Struct) {
let message: PrepareJobRequest = {pipeline, jobName};
if (pipelineOptions) {
message.pipelineOptions = pipelineOptions;
}
const call = client.prepare(message);
return await call.response;
}

private async callRun(client: IJobServiceClient, preparationId: string) {
const call = client.run({preparationId, retrievalToken: ''});
return await call.response;
}

private async callGetState(client: IJobServiceClient, jobId: string) {
const call = client.getState({jobId});
return await call.response;
}
}
90 changes: 90 additions & 0 deletions sdks/node-ts/src/apache_beam/runners/node_runner/runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {RemoteJobServiceClient} from "./client";
import {Pipeline} from "../../base";
import {PipelineOptions} from "../../options/pipeline_options";
import * as runnerApiProto from '../../proto/beam_runner_api';
import {JobState_Enum} from "../../proto/beam_job_api";

const TERMINAL_STATES = [
JobState_Enum.DONE,
JobState_Enum.FAILED,
JobState_Enum.CANCELLED,
JobState_Enum.UPDATED,
JobState_Enum.DRAINED,
]

export class PipelineResult {
jobId: string;
runner: NodeRunner;

constructor(runner: NodeRunner, jobId: string) {
this.runner = runner;
this.jobId = jobId;
}

static isTerminal(state: JobState_Enum) {
return TERMINAL_STATES.includes(state);
}

async getState() {
return await this.runner.getJobState(this.jobId);
}

/**
* Waits until the pipeline finishes and returns the final status.
* @param duration timeout in milliseconds.
*/
async waitUntilFinish(duration?: number) {
let {state} = await this.getState();
const start = Date.now();
while (!PipelineResult.isTerminal(state)) {
const now = Date.now();
if (duration !== undefined && now - start > duration) {
return state;
}

state = (await this.getState()).state;
}
return state;
}
}

export class NodeRunner {
client: RemoteJobServiceClient;

constructor(client: RemoteJobServiceClient) {
this.client = client;
}

async getJobState(jobId: string) {
return this.client.getState(jobId);
}

async runPipeline(pipeline: Pipeline) {
throw new Error('runPipeline not implemented.')
}

async runPipelineWithProto(
pipeline: runnerApiProto.Pipeline,
jobName: string,
options?: PipelineOptions) {
const {preparationId} = await this.client.prepare(pipeline, jobName, options);
const {jobId} = await this.client.run(preparationId);
return new PipelineResult(this, jobId);
}

async runPipelineWithJsonValueProto(json: string, jobName: string, options?: PipelineOptions) {
return this.runPipelineWithProto(
runnerApiProto.Pipeline.fromJsonString(json),
jobName,
options,
);
}

async runPipelineWithJsonStringProto(json: string, jobName: string, options?: PipelineOptions) {
return this.runPipelineWithProto(
runnerApiProto.Pipeline.fromJsonString(json),
jobName,
options,
);
}
}
25 changes: 25 additions & 0 deletions sdks/node-ts/test/node_runner_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import fs from 'fs';
import assert from 'assert';

import { RemoteJobServiceClient } from "../src/apache_beam/runners/node_runner/client";
import { NodeRunner } from "../src/apache_beam/runners/node_runner/runner";
import { JobState_Enum } from "../src/apache_beam/proto/beam_job_api";

const JOB_SERVICE_HOST = process.env['JOB_SERVICE_HOST'];
const JSON_PROTO_PATH = __dirname + '/../../test/testdata/pipeline.json';

describe('node runner', () => {
it('runs', async function () {
if (!JOB_SERVICE_HOST) {
this.skip();
}

const pipelineJson = fs.readFileSync(JSON_PROTO_PATH, 'utf-8');

const runner = new NodeRunner(new RemoteJobServiceClient(JOB_SERVICE_HOST));
const pipelineResult = await runner.runPipelineWithJsonStringProto(pipelineJson, 'pipeline');

const event = await pipelineResult.waitUntilFinish(60000);
assert(event == JobState_Enum.DONE);
});
})
Loading

0 comments on commit 3e23533

Please sign in to comment.