Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle errors in legacyMode #2394

Merged
merged 1 commit into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 25 additions & 165 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@ import { strict as assert } from 'assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisClient, { RedisClientType } from '.';
import { RedisClientMultiCommandType } from './multi-command';
import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { defineScript } from '../lua-script';
import { spy } from 'sinon';
import { once } from 'events';
import { ClientKillFilters } from '../commands/CLIENT_KILL';
import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
import { promisify } from 'util';

// We need to use 'require', because it's not possible with Typescript to import
// function that are exported as 'module.exports = function`, without esModuleInterop
// set to true.
const calculateSlot = require('cluster-key-slot');

export const SQUARE_SCRIPT = defineScript({
SCRIPT: 'return ARGV[1] * ARGV[1];',
NUMBER_OF_KEYS: 0,
Expand Down Expand Up @@ -171,6 +165,28 @@ describe('Client', () => {
}
});

testUtils.testWithClient('client.sendCommand should reply with error', async client => {
await assert.rejects(
promisify(client.sendCommand).call(client, '1', '2')
);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
legacyMode: true
}
});

testUtils.testWithClient('client.hGetAll should reply with error', async client => {
await assert.rejects(
promisify(client.hGetAll).call(client)
);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
legacyMode: true
}
});

testUtils.testWithClient('client.v4.sendCommand should return a promise', async client => {
assert.equal(
await client.v4.sendCommand(['PING']),
Expand Down Expand Up @@ -347,19 +363,6 @@ describe('Client', () => {
legacyMode: true
}
});

testUtils.testWithClient('pingInterval', async client => {
assert.deepEqual(
await once(client, 'ping-interval'),
['PONG']
);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
legacyMode: true,
pingInterval: 1
}
});
});

describe('events', () => {
Expand Down Expand Up @@ -823,34 +826,7 @@ describe('Client', () => {
}
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('should be able to PING in PubSub mode', async client => {
await client.connect();

try {
await client.subscribe('channel', () => {
// noop
});

const [string, buffer, customString, customBuffer] = await Promise.all([
client.ping(),
client.ping(client.commandOptions({ returnBuffers: true })),
client.ping('custom'),
client.ping(client.commandOptions({ returnBuffers: true }), 'custom')
]);

assert.equal(string, 'pong');
assert.deepEqual(buffer, Buffer.from('pong'));
assert.equal(customString, 'custom');
assert.deepEqual(customBuffer, Buffer.from('custom'));
} finally {
await client.disconnect();
}
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});

testUtils.testWithClient('should be able to QUIT in PubSub mode', async client => {
testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
await client.subscribe('channel', () => {
// noop
});
Expand All @@ -859,122 +835,6 @@ describe('Client', () => {

assert.equal(client.isOpen, false);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('should reject GET in PubSub mode', async client => {
await client.connect();

try {
await client.subscribe('channel', () => {
// noop
});

await assert.rejects(client.get('key'), ErrorReply);
} finally {
await client.disconnect();
}
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});

describe('shareded PubSub', () => {
testUtils.isVersionGreaterThanHook([7]);

testUtils.testWithClient('should be able to receive messages', async publisher => {
const subscriber = publisher.duplicate();

await subscriber.connect();

try {
const listener = spy();
await subscriber.sSubscribe('channel', listener);

await Promise.all([
waitTillBeenCalled(listener),
publisher.sPublish('channel', 'message')
]);

assert.ok(listener.calledOnceWithExactly('message', 'channel'));

await subscriber.sUnsubscribe();

// should be able to send commands
await assert.doesNotReject(subscriber.ping());
} finally {
await subscriber.disconnect();
}
}, {
...GLOBAL.SERVERS.OPEN
});

testUtils.testWithClient('should emit sharded-channel-moved event', async publisher => {
await publisher.clusterAddSlotsRange({ start: 0, end: 16383 });

const subscriber = publisher.duplicate();

await subscriber.connect();

try {
await subscriber.sSubscribe('channel', () => {});

await Promise.all([
publisher.clusterSetSlot(
calculateSlot('channel'),
ClusterSlotStates.NODE,
await publisher.clusterMyId()
),
once(subscriber, 'sharded-channel-moved')
]);

assert.equal(
await subscriber.ping(),
'PONG'
);
} finally {
await subscriber.disconnect();
}
}, {
serverArguments: ['--cluster-enabled', 'yes']
});
});

testUtils.testWithClient('should handle errors in SUBSCRIBE', async publisher => {
const subscriber = publisher.duplicate();

await subscriber.connect();

try {
const listener1 = spy();
await subscriber.subscribe('1', listener1);

await publisher.aclSetUser('default', 'resetchannels');


const listener2 = spy();
await assert.rejects(subscriber.subscribe('2', listener2));

await Promise.all([
waitTillBeenCalled(listener1),
publisher.aclSetUser('default', 'allchannels'),
publisher.publish('1', 'message'),
]);
assert.ok(listener1.calledOnceWithExactly('message', '1'));

await subscriber.subscribe('2', listener2);

await Promise.all([
waitTillBeenCalled(listener2),
publisher.publish('2', 'message'),
]);
assert.ok(listener2.calledOnceWithExactly('message', '2'));
} finally {
await subscriber.disconnect();
}
}, {
// this test change ACL rules, running in isolated server
serverArguments: [],
minimumDockerVersion: [6 ,2] // ACL PubSub rules were added in Redis 6.2
});
});

testUtils.testWithClient('ConnectionTimeoutError', async client => {
Expand Down
19 changes: 11 additions & 8 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '.
import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { callbackify } from 'util';

export interface RedisClientOptions<
M extends RedisModules = RedisModules,
Expand Down Expand Up @@ -343,7 +344,9 @@ export default class RedisClient<
(this as any).sendCommand = (...args: Array<any>): void => {
const result = this.#legacySendCommand(...args);
if (result) {
result.promise.then(reply => result.callback(null, reply));
result.promise
.then(reply => result.callback(null, reply))
.catch(err => result.callback(err));
}
};

Expand Down Expand Up @@ -380,18 +383,18 @@ export default class RedisClient<
promise.catch(err => this.emit('error', err));
}

#defineLegacyCommand(this: any, name: string, command?: RedisCommand): void {
this.#v4[name] = this[name].bind(this);
this[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
#defineLegacyCommand(name: string, command?: RedisCommand): void {
this.#v4[name] = (this as any)[name].bind(this);
(this as any)[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
(...args: Array<unknown>) => {
const result = this.#legacySendCommand(name, ...args);
if (result) {
result.promise.then((reply: any) => {
result.callback(null, command.transformReply!(reply));
});
result.promise
.then(reply => result.callback(null, command.transformReply!(reply)))
.catch(err => result.callback(err));
}
} :
(...args: Array<unknown>) => this.sendCommand(name, ...args);
(...args: Array<unknown>) => (this as any).sendCommand(name, ...args);
}

#pingTimer?: NodeJS.Timer;
Expand Down