diff --git a/.changeset/polite-walls-glow.md b/.changeset/polite-walls-glow.md new file mode 100644 index 0000000..0b4b719 --- /dev/null +++ b/.changeset/polite-walls-glow.md @@ -0,0 +1,28 @@ +--- +"@mondomob/gae-js-tasks": minor +--- + +Add support for creating Http Target tasks (thanks @VivekRajagopal!). + +Use this to target tasks handlers on any http address - i.e. non app engine handlers, app engine +handlers hosted in a different project from the task queue or app engine handlers +but not via the default appspot domain. + +When creating the task service specify the target host and optional authentication configuration. + +```typescript +// Create service +const taskQueueService = new TaskQueueService({ + httpTargetHost: "https://my-host.com", + oidcToken: { + serviceAccountEmail: "my-service-account@example.com", + audience: "my-audience", + }, +}); + +// Create tasks +// e.g. this will result in a task request of: POST https://my-host.com/tasks/example-task +await taskQueueService.enqueue("example-task", { data: { key: "value1" } }) +``` + + diff --git a/packages/gae-js-tasks/src/tasks/local-tasks.ts b/packages/gae-js-tasks/src/tasks/local-tasks.ts index 2fe7d5b..f15de8a 100644 --- a/packages/gae-js-tasks/src/tasks/local-tasks.ts +++ b/packages/gae-js-tasks/src/tasks/local-tasks.ts @@ -6,16 +6,16 @@ import { CreateTaskRequest } from "./types"; const taskNames = new Set(); const logger = createLogger("LocalTasks"); -export const localTasksServiceAccountEmailKey = "x-local-tasks-service-account-email"; export const createLocalTask = async (targetHost: string, createTaskRequest: CreateTaskRequest) => { const { parent, task } = createTaskRequest; if (!parent || !task) throw new BadRequestError("parent and task must be supplied"); const { appEngineHttpRequest, httpRequest } = task; - if (!appEngineHttpRequest && !httpRequest) - throw new BadRequestError("appEngineHttpRequest or httpRequest must be defined"); + if (!appEngineHttpRequest && !httpRequest) { + throw new BadRequestError("appEngineHttpRequest or httpRequest must be supplied"); + } - const getEndpoint = () => { + const getEndpoint = (): string => { if (appEngineHttpRequest) { return `${targetHost}${appEngineHttpRequest.relativeUri}`; } @@ -24,14 +24,12 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre const url = new URL(httpRequest.url); return `${targetHost}${url.pathname}`; } + + throw new BadRequestError("endpoint could not be resolved"); }; const endpoint = getEndpoint(); - if (!endpoint) { - throw new BadRequestError("endpoint could not be resolved"); - } - if (task.name) { if (taskNames.has(task.name)) { throw { @@ -43,15 +41,9 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre } const delayMs = task.scheduleTime?.seconds ? Number(task.scheduleTime?.seconds) * 1000 - new Date().getTime() : 0; - const getBody = () => { - if (appEngineHttpRequest) { - return Buffer.from(appEngineHttpRequest.body as string, "base64").toString("ascii"); - } - if (httpRequest) { - return Buffer.from(httpRequest.body as string, "base64").toString("ascii"); - } - }; + const bodyData = appEngineHttpRequest ? appEngineHttpRequest.body : httpRequest?.body; + const body = bodyData ? Buffer.from(bodyData as string, "base64").toString("ascii") : undefined; // Intentionally don't return this promise because we want the task to be executed // asynchronously - i.e. a tiny bit like a task queue would work. Otherwise, if the caller @@ -61,11 +53,16 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre .then(() => { return fetch(endpoint, { method: "POST", - body: getBody(), + body, headers: { "content-type": "application/json", - "x-appengine-taskname": endpoint, - [localTasksServiceAccountEmailKey]: httpRequest?.oidcToken?.serviceAccountEmail || "", + ...(appEngineHttpRequest ? { "x-appengine-taskname": appEngineHttpRequest.relativeUri ?? "" } : {}), + ...(httpRequest + ? { + "x-local-tasks-oidc-service-account-email": httpRequest.oidcToken?.serviceAccountEmail ?? "", + "x-local-tasks-oidc-audience": httpRequest.oidcToken?.audience ?? "", + } + : {}), }, }); }) diff --git a/packages/gae-js-tasks/src/tasks/task-queue-service.test.ts b/packages/gae-js-tasks/src/tasks/task-queue-service.test.ts index 1f026ba..d89a194 100644 --- a/packages/gae-js-tasks/src/tasks/task-queue-service.test.ts +++ b/packages/gae-js-tasks/src/tasks/task-queue-service.test.ts @@ -95,12 +95,15 @@ describe("TaskQueueService", () => { ); it( - "creates task params for host override routing", + "creates http target task params for host override routing", withEnvVars({ [ENV_VAR_RUNTIME_ENVIRONMENT]: "appengine" }, async () => { tasksProvider.init(); taskQueueService = new TaskQueueService({ - appEngineHost: "https://my-host.com", - oidcServiceAccountEmail: "sacount@gnet.com", + httpTargetHost: "https://my-host.com", + oidcToken: { + serviceAccountEmail: "sacount@gnet.com", + audience: "my-audience", + }, }); await taskQueueService.enqueue("test-task", { data: { key: "value1" } }); @@ -108,12 +111,31 @@ describe("TaskQueueService", () => { expectTaskParams({ httpRequest: { url: "https://my-host.com/tasks/test-task", - httpMethod: "POST", headers: { "Content-Type": "application/json", }, body: Buffer.from(JSON.stringify({ key: "value1" })).toString("base64"), - oidcToken: { serviceAccountEmail: "sacount@gnet.com" }, + oidcToken: { serviceAccountEmail: "sacount@gnet.com", audience: "my-audience" }, + }, + }); + }) + ); + + it( + "creates http target task params for host override routing without auth", + withEnvVars({ [ENV_VAR_RUNTIME_ENVIRONMENT]: "appengine" }, async () => { + tasksProvider.init(); + taskQueueService = new TaskQueueService({ httpTargetHost: "https://my-host.com" }); + + await taskQueueService.enqueue("test-task", { data: { key: "value1" } }); + + expectTaskParams({ + httpRequest: { + url: "https://my-host.com/tasks/test-task", + headers: { + "Content-Type": "application/json", + }, + body: Buffer.from(JSON.stringify({ key: "value1" })).toString("base64"), }, }); }) @@ -212,9 +234,9 @@ describe("TaskQueueService", () => { await waitUntil(() => scope.isDone()); }); - it("posts to local task service given appEngineHost override", async () => { + it("posts to local task service given httpTargetHost override", async () => { const scope = nock("http://127.0.0.1").post("/tasks/local-task").reply(204); - taskQueueService = new TaskQueueService({ appEngineHost: "https://my-host.com" }); + taskQueueService = new TaskQueueService({ httpTargetHost: "https://my-host.com" }); await taskQueueService.enqueue("/local-task"); await waitUntil(() => scope.isDone()); }); diff --git a/packages/gae-js-tasks/src/tasks/task-queue-service.ts b/packages/gae-js-tasks/src/tasks/task-queue-service.ts index 2de74b6..2254bd0 100644 --- a/packages/gae-js-tasks/src/tasks/task-queue-service.ts +++ b/packages/gae-js-tasks/src/tasks/task-queue-service.ts @@ -3,8 +3,12 @@ import { CloudTasksClient } from "@google-cloud/tasks"; import { Status } from "google-gax"; import { configurationProvider, createLogger, runningOnGcp } from "@mondomob/gae-js-core"; import { + AppEngineTargetOptions, CreateTaskQueueServiceOptions, CreateTaskRequest, + HttpTargetOptions, + IAppEngineHttpRequest, + IHttpRequest, TaskOptions, TaskQueueServiceOptions, TaskThrottle, @@ -12,7 +16,6 @@ import { import { tasksProvider } from "./tasks-provider"; import { createLocalTask } from "./local-tasks"; import { isGoogleGaxError } from "../utils/errors"; -import { google } from "@google-cloud/tasks/build/protos/protos"; export class TaskQueueService { private logger = createLogger("taskQueueService"); @@ -73,26 +76,62 @@ export class TaskQueueService { } } - private buildTask(path: string, options: TaskOptions): CreateTaskRequest { + protected buildTask(path: string, options: TaskOptions): CreateTaskRequest { const { projectId, location, queueName } = this.options; const queuePath = runningOnGcp() ? this.getTasksClient().queuePath(projectId, location, queueName) : `projects/${projectId}/locations/${location}/queues/${queueName}`; this.logger.info(`Using queue path: ${queuePath}`); - const { data = {}, inSeconds, throttle } = options; - const body = JSON.stringify(data); - const requestPayload = Buffer.from(body).toString("base64"); + const { inSeconds, throttle } = options; return { parent: queuePath, task: { - ...this.taskRouting(path, requestPayload), + ...this.taskRequest(path, options), ...this.taskSchedule(inSeconds), ...this.taskThrottle(queuePath, throttle), }, }; } + private taskRequest(path: string, options: TaskOptions) { + return "httpTargetHost" in this.options + ? { + httpRequest: this.httpRequest(path, options), + } + : { + appEngineHttpRequest: this.appEngineRequest(path, options), + }; + } + + private commonRequest({ data = {} }: TaskOptions) { + const body = JSON.stringify(data); + const requestPayload = Buffer.from(body).toString("base64"); + return { + headers: { + "Content-Type": "application/json", + }, + body: requestPayload, + }; + } + + private appEngineRequest(path: string, options: TaskOptions): IAppEngineHttpRequest { + return { + ...this.commonRequest(options), + relativeUri: `${this.fullTaskPath(path)}`, + ...this.appEngineRouting(), + }; + } + + private httpRequest(path: string, options: TaskOptions): IHttpRequest { + const { httpTargetHost, oidcToken } = this.options as HttpTargetOptions; + return { + ...this.commonRequest(options), + url: `${httpTargetHost}${this.fullTaskPath(path)}`, + ...(oidcToken && { oidcToken }), + }; + } + private taskSchedule(inSeconds?: number) { return inSeconds ? { @@ -103,47 +142,17 @@ export class TaskQueueService { : {}; } - private taskRouting(path: string, requestPayload?: string) { - const { tasksRoutingService, tasksRoutingVersion, appEngineHost, oidcServiceAccountEmail } = this.options; - - if (appEngineHost) { - const httpRequest: google.cloud.tasks.v2.IHttpRequest = { - url: `${appEngineHost}${this.fullTaskPath(path)}`, - httpMethod: "POST", - body: requestPayload, - headers: { - "Content-Type": "application/json", - }, - ...(oidcServiceAccountEmail ? { oidcToken: { serviceAccountEmail: oidcServiceAccountEmail } } : {}), - }; - return { httpRequest }; - } - + private appEngineRouting() { + const { tasksRoutingService, tasksRoutingVersion } = this.options as AppEngineTargetOptions; if (tasksRoutingVersion || tasksRoutingService) { return { - appEngineHttpRequest: { - relativeUri: `${this.fullTaskPath(path)}`, - headers: { - "Content-Type": "application/json", - }, - body: requestPayload, - appEngineRouting: { - ...(tasksRoutingService ? { service: tasksRoutingService } : {}), - ...(tasksRoutingVersion ? { version: tasksRoutingVersion } : {}), - }, + appEngineRouting: { + ...(tasksRoutingService ? { service: tasksRoutingService } : {}), + ...(tasksRoutingVersion ? { version: tasksRoutingVersion } : {}), }, }; } - - return { - appEngineHttpRequest: { - relativeUri: `${this.fullTaskPath(path)}`, - headers: { - "Content-Type": "application/json", - }, - body: requestPayload, - }, - }; + return {}; } private taskThrottle(queuePath: string, options?: TaskThrottle) { diff --git a/packages/gae-js-tasks/src/tasks/types.ts b/packages/gae-js-tasks/src/tasks/types.ts index ab3ac75..04704d4 100644 --- a/packages/gae-js-tasks/src/tasks/types.ts +++ b/packages/gae-js-tasks/src/tasks/types.ts @@ -3,38 +3,52 @@ import { CloudTasksClient } from "@google-cloud/tasks"; type Mandatory = Pick, K> & Omit; export type CreateTaskRequest = Parameters[0]; +export type ITask = NonNullable; +export type IHttpRequest = NonNullable; +export type IAppEngineHttpRequest = NonNullable; +export type IOidcToken = NonNullable; -export type CreateTaskQueueServiceRouting = - | { - /** - * The specific App Engine version to dispatch requests to. - */ - tasksRoutingVersion?: string; - /** - * The specific App Engine service to dispatch requests to. - */ - tasksRoutingService?: string; - appEngineHost?: never; - oidcServiceAccountEmail?: never; - } - | { - tasksRoutingVersion?: never; - tasksRoutingService?: never; +/** + * Options specific to creation of App Engine Tasks. + */ +export type AppEngineTargetOptions = { + /** + * The specific App Engine version to dispatch requests to. + */ + tasksRoutingVersion?: string; + /** + * The specific App Engine service to dispatch requests to. + */ + tasksRoutingService?: string; + + httpTargetHost?: never; + oidcToken?: never; +}; + +/** + * Options specific to creation of Http Target Tasks + * NOTE: When using http targets - there is no built-in authentication and task handlers + * must perform their own auth validation. + */ +export type HttpTargetOptions = { + /** + * Override the httpTargetHost when creating HTTP Target tasks. This will create a task with `httpRequest` params. + * Use this when you want the request to be routed to a different host than the default GAE appspot domain. + */ + httpTargetHost: string; + /** + * Should be the email of an existing Service Account in the same project. + * Authorizes the request with a Bearer JWT id token. + */ + oidcToken?: IOidcToken; - /** - * Override the appEngineHost when using a push queue. This will create a task with `httpRequest` params. - * Use this when you want the request to be routed to a different host than the default GAE appspot domain. - */ - appEngineHost?: string; + tasksRoutingVersion?: never; + tasksRoutingService?: never; +}; - /** - * Should be the email of an existing Service Account in the same project. - * Authorizes the request with a Bearer JWT id token. - */ - oidcServiceAccountEmail?: string; - }; +export type TaskQueueTargetOptions = AppEngineTargetOptions | HttpTargetOptions; -export type CreateTaskQueueServiceOptions = { +export type CommonTaskQueueServiceOptions = { /** * Tasks projectId - most likely the same project as your application. * Defaults to application projectId configuration @@ -62,17 +76,19 @@ export type CreateTaskQueueServiceOptions = { * Defaults to application "host" configuration. */ localBaseUrl?: string; - /** * Tasks client to use (if not using tasksProvider) */ tasksClient?: CloudTasksClient; -} & CreateTaskQueueServiceRouting; +}; + +export type CreateTaskQueueServiceOptions = CommonTaskQueueServiceOptions & TaskQueueTargetOptions; export type TaskQueueServiceOptions = Mandatory< - CreateTaskQueueServiceOptions, + CommonTaskQueueServiceOptions, "projectId" | "location" | "queueName" | "pathPrefix" ->; +> & + TaskQueueTargetOptions; export interface TaskOptions

{ /**