Skip to content

Commit

Permalink
Adjust task changes from PR for release.
Browse files Browse the repository at this point in the history
  • Loading branch information
mlev committed Oct 18, 2023
1 parent 4f31c51 commit e207a5c
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 100 deletions.
28 changes: 28 additions & 0 deletions .changeset/polite-walls-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"@mondomob/gae-js-tasks": minor
---

Add support for creating Http Target tasks (thanks @VivekRajagopal!).

Use this to target tasks handlers on any http address - i.e. non app engine handlers, app engine
handlers hosted in a different project from the task queue or app engine handlers
but not via the default appspot domain.

When creating the task service specify the target host and optional authentication configuration.

```typescript
// Create service
const taskQueueService = new TaskQueueService({
httpTargetHost: "https://my-host.com",
oidcToken: {
serviceAccountEmail: "[email protected]",
audience: "my-audience",
},
});

// Create tasks
// e.g. this will result in a task request of: POST https://my-host.com/tasks/example-task
await taskQueueService.enqueue("example-task", { data: { key: "value1" } })
```


35 changes: 16 additions & 19 deletions packages/gae-js-tasks/src/tasks/local-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import { CreateTaskRequest } from "./types";
const taskNames = new Set<string>();
const logger = createLogger("LocalTasks");

export const localTasksServiceAccountEmailKey = "x-local-tasks-service-account-email";
export const createLocalTask = async (targetHost: string, createTaskRequest: CreateTaskRequest) => {
const { parent, task } = createTaskRequest;
if (!parent || !task) throw new BadRequestError("parent and task must be supplied");

const { appEngineHttpRequest, httpRequest } = task;
if (!appEngineHttpRequest && !httpRequest)
throw new BadRequestError("appEngineHttpRequest or httpRequest must be defined");
if (!appEngineHttpRequest && !httpRequest) {
throw new BadRequestError("appEngineHttpRequest or httpRequest must be supplied");
}

const getEndpoint = () => {
const getEndpoint = (): string => {
if (appEngineHttpRequest) {
return `${targetHost}${appEngineHttpRequest.relativeUri}`;
}
Expand All @@ -24,14 +24,12 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre
const url = new URL(httpRequest.url);
return `${targetHost}${url.pathname}`;
}

throw new BadRequestError("endpoint could not be resolved");
};

const endpoint = getEndpoint();

if (!endpoint) {
throw new BadRequestError("endpoint could not be resolved");
}

if (task.name) {
if (taskNames.has(task.name)) {
throw {
Expand All @@ -43,15 +41,9 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre
}

const delayMs = task.scheduleTime?.seconds ? Number(task.scheduleTime?.seconds) * 1000 - new Date().getTime() : 0;
const getBody = () => {
if (appEngineHttpRequest) {
return Buffer.from(appEngineHttpRequest.body as string, "base64").toString("ascii");
}

if (httpRequest) {
return Buffer.from(httpRequest.body as string, "base64").toString("ascii");
}
};
const bodyData = appEngineHttpRequest ? appEngineHttpRequest.body : httpRequest?.body;
const body = bodyData ? Buffer.from(bodyData as string, "base64").toString("ascii") : undefined;

// Intentionally don't return this promise because we want the task to be executed
// asynchronously - i.e. a tiny bit like a task queue would work. Otherwise, if the caller
Expand All @@ -61,11 +53,16 @@ export const createLocalTask = async (targetHost: string, createTaskRequest: Cre
.then(() => {
return fetch(endpoint, {
method: "POST",
body: getBody(),
body,
headers: {
"content-type": "application/json",
"x-appengine-taskname": endpoint,
[localTasksServiceAccountEmailKey]: httpRequest?.oidcToken?.serviceAccountEmail || "",
...(appEngineHttpRequest ? { "x-appengine-taskname": appEngineHttpRequest.relativeUri ?? "" } : {}),
...(httpRequest
? {
"x-local-tasks-oidc-service-account-email": httpRequest.oidcToken?.serviceAccountEmail ?? "",
"x-local-tasks-oidc-audience": httpRequest.oidcToken?.audience ?? "",
}
: {}),
},
});
})
Expand Down
36 changes: 29 additions & 7 deletions packages/gae-js-tasks/src/tasks/task-queue-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,47 @@ describe("TaskQueueService", () => {
);

it(
"creates task params for host override routing",
"creates http target task params for host override routing",
withEnvVars({ [ENV_VAR_RUNTIME_ENVIRONMENT]: "appengine" }, async () => {
tasksProvider.init();
taskQueueService = new TaskQueueService({
appEngineHost: "https://my-host.com",
oidcServiceAccountEmail: "[email protected]",
httpTargetHost: "https://my-host.com",
oidcToken: {
serviceAccountEmail: "[email protected]",
audience: "my-audience",
},
});

await taskQueueService.enqueue("test-task", { data: { key: "value1" } });

expectTaskParams({
httpRequest: {
url: "https://my-host.com/tasks/test-task",
httpMethod: "POST",
headers: {
"Content-Type": "application/json",
},
body: Buffer.from(JSON.stringify({ key: "value1" })).toString("base64"),
oidcToken: { serviceAccountEmail: "[email protected]" },
oidcToken: { serviceAccountEmail: "[email protected]", audience: "my-audience" },
},
});
})
);

it(
"creates http target task params for host override routing without auth",
withEnvVars({ [ENV_VAR_RUNTIME_ENVIRONMENT]: "appengine" }, async () => {
tasksProvider.init();
taskQueueService = new TaskQueueService({ httpTargetHost: "https://my-host.com" });

await taskQueueService.enqueue("test-task", { data: { key: "value1" } });

expectTaskParams({
httpRequest: {
url: "https://my-host.com/tasks/test-task",
headers: {
"Content-Type": "application/json",
},
body: Buffer.from(JSON.stringify({ key: "value1" })).toString("base64"),
},
});
})
Expand Down Expand Up @@ -212,9 +234,9 @@ describe("TaskQueueService", () => {
await waitUntil(() => scope.isDone());
});

it("posts to local task service given appEngineHost override", async () => {
it("posts to local task service given httpTargetHost override", async () => {
const scope = nock("http://127.0.0.1").post("/tasks/local-task").reply(204);
taskQueueService = new TaskQueueService({ appEngineHost: "https://my-host.com" });
taskQueueService = new TaskQueueService({ httpTargetHost: "https://my-host.com" });
await taskQueueService.enqueue("/local-task");
await waitUntil(() => scope.isDone());
});
Expand Down
93 changes: 51 additions & 42 deletions packages/gae-js-tasks/src/tasks/task-queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ import { CloudTasksClient } from "@google-cloud/tasks";
import { Status } from "google-gax";
import { configurationProvider, createLogger, runningOnGcp } from "@mondomob/gae-js-core";
import {
AppEngineTargetOptions,
CreateTaskQueueServiceOptions,
CreateTaskRequest,
HttpTargetOptions,
IAppEngineHttpRequest,
IHttpRequest,
TaskOptions,
TaskQueueServiceOptions,
TaskThrottle,
} from "./types";
import { tasksProvider } from "./tasks-provider";
import { createLocalTask } from "./local-tasks";
import { isGoogleGaxError } from "../utils/errors";
import { google } from "@google-cloud/tasks/build/protos/protos";

export class TaskQueueService {
private logger = createLogger("taskQueueService");
Expand Down Expand Up @@ -73,26 +76,62 @@ export class TaskQueueService {
}
}

private buildTask(path: string, options: TaskOptions): CreateTaskRequest {
protected buildTask(path: string, options: TaskOptions): CreateTaskRequest {
const { projectId, location, queueName } = this.options;
const queuePath = runningOnGcp()
? this.getTasksClient().queuePath(projectId, location, queueName)
: `projects/${projectId}/locations/${location}/queues/${queueName}`;
this.logger.info(`Using queue path: ${queuePath}`);

const { data = {}, inSeconds, throttle } = options;
const body = JSON.stringify(data);
const requestPayload = Buffer.from(body).toString("base64");
const { inSeconds, throttle } = options;
return {
parent: queuePath,
task: {
...this.taskRouting(path, requestPayload),
...this.taskRequest(path, options),
...this.taskSchedule(inSeconds),
...this.taskThrottle(queuePath, throttle),
},
};
}

private taskRequest(path: string, options: TaskOptions) {
return "httpTargetHost" in this.options
? {
httpRequest: this.httpRequest(path, options),
}
: {
appEngineHttpRequest: this.appEngineRequest(path, options),
};
}

private commonRequest({ data = {} }: TaskOptions) {
const body = JSON.stringify(data);
const requestPayload = Buffer.from(body).toString("base64");
return {
headers: {
"Content-Type": "application/json",
},
body: requestPayload,
};
}

private appEngineRequest(path: string, options: TaskOptions): IAppEngineHttpRequest {
return {
...this.commonRequest(options),
relativeUri: `${this.fullTaskPath(path)}`,
...this.appEngineRouting(),
};
}

private httpRequest(path: string, options: TaskOptions): IHttpRequest {
const { httpTargetHost, oidcToken } = this.options as HttpTargetOptions;
return {
...this.commonRequest(options),
url: `${httpTargetHost}${this.fullTaskPath(path)}`,
...(oidcToken && { oidcToken }),
};
}

private taskSchedule(inSeconds?: number) {
return inSeconds
? {
Expand All @@ -103,47 +142,17 @@ export class TaskQueueService {
: {};
}

private taskRouting(path: string, requestPayload?: string) {
const { tasksRoutingService, tasksRoutingVersion, appEngineHost, oidcServiceAccountEmail } = this.options;

if (appEngineHost) {
const httpRequest: google.cloud.tasks.v2.IHttpRequest = {
url: `${appEngineHost}${this.fullTaskPath(path)}`,
httpMethod: "POST",
body: requestPayload,
headers: {
"Content-Type": "application/json",
},
...(oidcServiceAccountEmail ? { oidcToken: { serviceAccountEmail: oidcServiceAccountEmail } } : {}),
};
return { httpRequest };
}

private appEngineRouting() {
const { tasksRoutingService, tasksRoutingVersion } = this.options as AppEngineTargetOptions;
if (tasksRoutingVersion || tasksRoutingService) {
return {
appEngineHttpRequest: {
relativeUri: `${this.fullTaskPath(path)}`,
headers: {
"Content-Type": "application/json",
},
body: requestPayload,
appEngineRouting: {
...(tasksRoutingService ? { service: tasksRoutingService } : {}),
...(tasksRoutingVersion ? { version: tasksRoutingVersion } : {}),
},
appEngineRouting: {
...(tasksRoutingService ? { service: tasksRoutingService } : {}),
...(tasksRoutingVersion ? { version: tasksRoutingVersion } : {}),
},
};
}

return {
appEngineHttpRequest: {
relativeUri: `${this.fullTaskPath(path)}`,
headers: {
"Content-Type": "application/json",
},
body: requestPayload,
},
};
return {};
}

private taskThrottle(queuePath: string, options?: TaskThrottle) {
Expand Down
Loading

0 comments on commit e207a5c

Please sign in to comment.