Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial manual routing #100

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "celery-node",
"version": "0.5.9",
"version": "0.5.10",
"description": "celery written in nodejs",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
17 changes: 10 additions & 7 deletions src/app/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { v4 } from "uuid";
import Base from "./base";
import Task from "./task";
import type { TaskOptions } from "./task";
import { AsyncResult } from "./result";

class TaskMessage {
Expand All @@ -12,6 +13,7 @@ class TaskMessage {
) {}
}


export default class Client extends Base {
private taskProtocols = {
1: this.asTaskV1,
Expand All @@ -22,7 +24,7 @@ export default class Client extends Base {
return this.taskProtocols[this.conf.TASK_PROTOCOL];
}

public sendTaskMessage(taskName: string, message: TaskMessage): void {
public sendTaskMessage(taskName: string, message: TaskMessage, options: TaskOptions = {}): void {
const { headers, properties, body /*, sentEvent */ } = message;

const exchange = "";
Expand All @@ -32,8 +34,8 @@ export default class Client extends Base {
this.isReady().then(() =>
this.broker.publish(
body,
exchange,
this.conf.CELERY_QUEUE,
options?.exchange ?? exchange,
options?.routingKey ?? this.conf.CELERY_QUEUE,
headers,
properties
)
Expand Down Expand Up @@ -119,8 +121,8 @@ export default class Client extends Base {
* @example
* client.createTask('task.add').delay([1, 2])
*/
public createTask(name: string): Task {
return new Task(this, name);
public createTask(name: string, options?: TaskOptions): Task {
return new Task(this, name, options);
}

/**
Expand All @@ -136,11 +138,12 @@ export default class Client extends Base {
taskName: string,
args?: Array<any>,
kwargs?: object,
taskId?: string
taskId?: string,
options?: TaskOptions
): AsyncResult {
taskId = taskId || v4();
const message = this.createTaskMessage(taskId, taskName, args, kwargs);
this.sendTaskMessage(taskName, message);
this.sendTaskMessage(taskName, message, options);

const result = new AsyncResult(taskId, this.backend);
return result;
Expand Down
24 changes: 21 additions & 3 deletions src/app/task.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
import Client from "./client";
import { AsyncResult } from "./result";

/** Task executation options
* Originally allows these keys:
* ['queue', 'routing_key', 'exchange', 'priority', 'expires',
* 'serializer', 'delivery_mode', 'compression', 'time_limit',
* 'soft_time_limit', 'immediate', 'mandatory']
* but now only part of them are supported.
*/
export type TaskOptions = {
exchange?: string;
queue?: string;
routingKey?: string;
}

export default class Task {
client: Client;
name: string;
options?: TaskOptions;

/**
* Asynchronous Task
* @constructor Task
* @param {Client} clinet celery client instance
* @param {string} name celery task name
* @param {Object} [options]
* @param {string} [options.queue] queue name
* @param {string} [options.routingKey] routing key
*/
constructor(client: Client, name: string) {
constructor(client: Client, name: string, options: TaskOptions = {}) {
this.client = client;
this.name = name;
this.options = options;
}

/**
Expand All @@ -28,7 +46,7 @@ export default class Task {
return this.applyAsync([...args]);
}

public applyAsync(args: Array<any>, kwargs?: object): AsyncResult {
public applyAsync(args: Array<any>, kwargs?: object, options?: TaskOptions): AsyncResult {
if (args && !Array.isArray(args)) {
throw new Error("args is not array");
}
Expand All @@ -37,6 +55,6 @@ export default class Task {
throw new Error("kwargs is not object");
}

return this.client.sendTask(this.name, args || [], kwargs || {});
return this.client.sendTask(this.name, args || [], kwargs || {}, undefined, options || this.options);
}
}
37 changes: 34 additions & 3 deletions test/app/test_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ describe("celery functional tests", () => {
"redis://localhost:6379/0"
);

const worker2 = new Worker(
"redis://localhost:6379/0",
"redis://localhost:6379/0",
"my_queue"
);

before(() => {
worker.register("tasks.add", (a, b) => a + b);
worker.register(
Expand All @@ -25,16 +31,21 @@ describe("celery functional tests", () => {
setTimeout(() => resolve(result), delay);
})
);
worker.start();

worker2.register("tasks.multiply", (a, b) => a * b);
Promise.all([worker.start(), worker2.start()]);
});

afterEach(() => {
sinon.restore();
return worker.whenCurrentJobsFinished();
return Promise.all([
worker.whenCurrentJobsFinished(),
worker2.whenCurrentJobsFinished()
]);
});

after(() => {
Promise.all([client.disconnect(), worker.disconnect()]);
Promise.all([client.disconnect(), worker.disconnect(), worker2.disconnect()]);

const redis = new Redis();
redis.flushdb().then(() => redis.quit());
Expand Down Expand Up @@ -112,4 +123,24 @@ describe("celery functional tests", () => {
})
});
});

describe("custom routing key", () => {
it("should create a task with another routing key", done => {
const task = client.createTask("tasks.multiply", { routingKey: "my_queue" });
const result = task.applyAsync([2, 3]);
result.get(500).then((message) => {
assert.equal(message, 6);
})
done();
});

it('should send_task with another routing key', done => {
const task = client.createTask("tasks.multiply");
const result = task.applyAsync([2, 3], undefined, { routingKey: "my_queue" });
result.get(500).then((message) => {
assert.equal(message, 6);
})
done();
})
})
});