-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathautoPipelining.ts
192 lines (171 loc) · 5.4 KB
/
autoPipelining.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import { isArguments, noop } from "./utils/lodash";
import * as calculateSlot from "cluster-key-slot";
import asCallback from "standard-as-callback";
import { ArgumentType } from "./Command";
export const kExec = Symbol("exec");
export const kCallbacks = Symbol("callbacks");
export const notAllowedAutoPipelineCommands = [
"auth",
"info",
"script",
"quit",
"cluster",
"pipeline",
"multi",
"subscribe",
"psubscribe",
"unsubscribe",
"unpsubscribe",
];
function executeAutoPipeline(client, slotKey: string) {
/*
If a pipeline is already executing, keep queueing up commands
since ioredis won't serve two pipelines at the same time
*/
if (client._runningAutoPipelines.has(slotKey)) {
return;
}
if (!client._autoPipelines.has(slotKey)) {
/*
Rare edge case. Somehow, something has deleted this running autopipeline in an immediate
call to executeAutoPipeline.
Maybe the callback in the pipeline.exec is sometimes called in the same tick,
e.g. if redis is disconnected?
*/
return;
}
client._runningAutoPipelines.add(slotKey);
// Get the pipeline and immediately delete it so that new commands are queued on a new pipeline
const pipeline = client._autoPipelines.get(slotKey);
client._autoPipelines.delete(slotKey);
const callbacks = pipeline[kCallbacks];
// Stop keeping a reference to callbacks immediately after the callbacks stop being used.
// This allows the GC to reclaim objects referenced by callbacks, especially with 16384 slots
// in Redis.Cluster
pipeline[kCallbacks] = null;
// Perform the call
pipeline.exec(function (err, results) {
client._runningAutoPipelines.delete(slotKey);
/*
Invoke all callback in nextTick so the stack is cleared
and callbacks can throw errors without affecting other callbacks.
*/
if (err) {
for (let i = 0; i < callbacks.length; i++) {
process.nextTick(callbacks[i], err);
}
} else {
for (let i = 0; i < callbacks.length; i++) {
process.nextTick(callbacks[i], ...results[i]);
}
}
// If there is another pipeline on the same node, immediately execute it without waiting for nextTick
if (client._autoPipelines.has(slotKey)) {
executeAutoPipeline(client, slotKey);
}
});
}
export function shouldUseAutoPipelining(
client,
functionName: string,
commandName: string
): boolean {
return (
functionName &&
client.options.enableAutoPipelining &&
!client.isPipeline &&
!notAllowedAutoPipelineCommands.includes(commandName) &&
!client.options.autoPipeliningIgnoredCommands.includes(commandName)
);
}
export function getFirstValueInFlattenedArray(
args: ArgumentType[]
): string | Buffer | number | null | undefined {
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (typeof arg === "string") {
return arg;
} else if (Array.isArray(arg) || isArguments(arg)) {
if (arg.length === 0) {
continue;
}
return arg[0];
}
const flattened = [arg].flat();
if (flattened.length > 0) {
return flattened[0];
}
}
return undefined;
}
export function executeWithAutoPipelining(
client,
functionName: string,
commandName: string,
args: ArgumentType[],
callback
) {
// On cluster mode let's wait for slots to be available
if (client.isCluster && !client.slots.length) {
if (client.status === "wait") client.connect().catch(noop);
return asCallback(
new Promise(function (resolve, reject) {
client.delayUntilReady((err) => {
if (err) {
reject(err);
return;
}
executeWithAutoPipelining(
client,
functionName,
commandName,
args,
null
).then(resolve, reject);
});
}),
callback
);
}
// If we have slot information, we can improve routing by grouping slots served by the same subset of nodes
// Note that the first value in args may be a (possibly empty) array.
// ioredis will only flatten one level of the array, in the Command constructor.
const prefix = client.options.keyPrefix || "";
const slotKey = client.isCluster
? client.slots[
calculateSlot(`${prefix}${getFirstValueInFlattenedArray(args)}`)
].join(",")
: "main";
if (!client._autoPipelines.has(slotKey)) {
const pipeline = client.pipeline();
pipeline[kExec] = false;
pipeline[kCallbacks] = [];
client._autoPipelines.set(slotKey, pipeline);
}
const pipeline = client._autoPipelines.get(slotKey);
/*
Mark the pipeline as scheduled.
The symbol will make sure that the pipeline is only scheduled once per tick.
New commands are appended to an already scheduled pipeline.
*/
if (!pipeline[kExec]) {
pipeline[kExec] = true;
/*
Deferring with setImmediate so we have a chance to capture multiple
commands that can be scheduled by I/O events already in the event loop queue.
*/
setImmediate(executeAutoPipeline, client, slotKey);
}
// Create the promise which will execute the command in the pipeline.
const autoPipelinePromise = new Promise(function (resolve, reject) {
pipeline[kCallbacks].push(function (err: Error | null, value: any) {
if (err) {
reject(err);
return;
}
resolve(value);
});
pipeline[functionName](...args);
});
return asCallback(autoPipelinePromise, callback);
}