-
-
Notifications
You must be signed in to change notification settings - Fork 151
/
scheduledJobs.ts
executable file
·148 lines (130 loc) · 3.91 KB
/
scheduledJobs.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env ts-node
// We'll use https://github.com/tejasmanohar/node-schedule for this example,
// but there are many other excellent node scheduling projects
import * as schedule from "node-schedule";
import { Queue, Scheduler, Worker } from "../src";
/* In your projects:
import { Queue, Scheduler, Worker } from "node-resque";
*/
// ////////////////////////
// SET UP THE CONNECTION //
// ////////////////////////
const connectionDetails = {
pkg: "ioredis",
host: "127.0.0.1",
password: null,
port: 6379,
database: 0,
// namespace: 'resque',
// looping: true,
// options: {password: 'abc'},
};
async function boot() {
// ///////////////////////////
// DEFINE YOUR WORKER TASKS //
// ///////////////////////////
const jobs = {
ticktock: (time, callback) => {
console.log(`*** THE TIME IS ${time} ***`);
return true;
},
};
// /////////////////
// START A WORKER //
// /////////////////
const worker = new Worker(
{ connection: connectionDetails, queues: ["time"] },
jobs,
);
await worker.connect();
worker.start();
// ////////////////////
// START A SCHEDULER //
// ////////////////////
const scheduler = new Scheduler({ connection: connectionDetails });
await scheduler.connect();
scheduler.start();
// //////////////////////
// REGESTER FOR EVENTS //
// //////////////////////
worker.on("start", () => {
console.log("worker started");
});
worker.on("end", () => {
console.log("worker ended");
});
worker.on("cleaning_worker", (worker, pid) => {
console.log(`cleaning old worker ${worker}`);
});
worker.on("poll", (queue) => {
console.log(`worker polling ${queue}`);
});
worker.on("job", (queue, job) => {
console.log(`working job ${queue} ${JSON.stringify(job)}`);
});
worker.on("reEnqueue", (queue, job, plugin) => {
console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`);
});
worker.on("success", (queue, job, result) => {
console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`);
});
worker.on("failure", (queue, job, failure) => {
console.log(`job failure ${queue} ${JSON.stringify(job)} >> ${failure}`);
});
worker.on("error", (error, queue, job) => {
console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`);
});
worker.on("pause", () => {
console.log("worker paused");
});
scheduler.on("start", () => {
console.log("scheduler started");
});
scheduler.on("end", () => {
console.log("scheduler ended");
});
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", (error) => {
console.log(`scheduler error >> ${error}`);
});
scheduler.on("workingTimestamp", (timestamp) => {
console.log(`scheduler working timestamp ${timestamp}`);
});
scheduler.on("transferredJob", (timestamp, job) => {
console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`);
});
// //////////////
// DEFINE JOBS //
// //////////////
const queue = new Queue({ connection: connectionDetails }, jobs);
queue.on("error", function (error) {
console.log(error);
});
await queue.connect();
schedule.scheduleJob("0,10,20,30,40,50 * * * * *", async () => {
// do this job every 10 seconds, cron style
// we want to ensure that only one instance of this job is scheduled in our enviornment at once,
// no matter how many schedulers we have running
if (scheduler.leader) {
console.log(">>> enquing a job");
await queue.enqueue("time", "ticktock", [new Date().toString()]);
}
});
// ////////////////////
// SHUTDOWN HELPERS //
// ////////////////////
const shutdown = async () => {
await scheduler.end();
await worker.end();
console.log("bye.");
process.exit();
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}
boot();