From fa8a93ac188d1a96ed0ca95d7a5d17cde7199df8 Mon Sep 17 00:00:00 2001 From: Brent Layne Date: Wed, 10 Jan 2024 10:14:10 -0500 Subject: [PATCH 1/4] Confirm the client isOpen before disconnecting --- packages/client/lib/cluster/cluster-slots.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index b540c2fa85f..b1cc49b4c82 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -562,7 +562,7 @@ export default class RedisClusterSlots< const client = await this.getPubSubClient(); await unsubscribe(client); - if (!client.isPubSubActive) { + if (!client.isPubSubActive && client.isOpen) { await client.disconnect(); this.pubSubNode = undefined; } @@ -613,7 +613,7 @@ export default class RedisClusterSlots< const client = await master.pubSubClient; await unsubscribe(client); - if (!client.isPubSubActive) { + if (!client.isPubSubActive && client.isOpen) { await client.disconnect(); master.pubSubClient = undefined; } From 9720e08d28545a07ab005db666e765665578342a Mon Sep 17 00:00:00 2001 From: Brent Layne Date: Wed, 10 Jan 2024 10:14:43 -0500 Subject: [PATCH 2/4] Write tests --- packages/client/lib/cluster/index.spec.ts | 32 +++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index 8200375056a..425bc29dcf8 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -235,6 +235,20 @@ describe('Cluster', () => { assert.equal(cluster.pubSubNode, undefined); }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithCluster('concurrent unsubscribe does not throw an error', async cluster => { + const listener = spy(); + + await cluster.subscribe('channel', listener); + await cluster.subscribe('channel2', listener); + + await Promise.all([ + cluster.unsubscribe('channel', listener), + cluster.unsubscribe('channel2', listener) + ]); + + assert.equal(cluster.isOpen, false); + }, GLOBAL.CLUSTERS.OPEN); testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => { const listener = spy(); @@ -323,6 +337,24 @@ describe('Cluster', () => { minimumDockerVersion: [7] }); + testUtils.testWithCluster('concurrent sunsubscribe does not throw an error', async cluster => { + const listener = spy(); + + await cluster.sSubscribe('channel', listener); + await cluster.sSubscribe('channel2', listener); + + await Promise.all([ + cluster.sUnsubscribe('channel', listener), + cluster.sUnsubscribe('channel2', listener) + ]) + + + assert.equal(cluster.isOpen, false); + }, { + ...GLOBAL.CLUSTERS.OPEN, + minimumDockerVersion: [7] + }); + testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => { const SLOT = 10328, migrating = cluster.slots[SLOT].master, From db1cd5f8afc623cd6aa13e94c39c2bf4f6320628 Mon Sep 17 00:00:00 2001 From: Leibale Eidelman Date: Tue, 16 Jan 2024 15:12:19 -0500 Subject: [PATCH 3/4] fix tests --- packages/client/lib/cluster/index.spec.ts | 29 ++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index 425bc29dcf8..4ae41c026cb 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -236,18 +236,18 @@ describe('Cluster', () => { assert.equal(cluster.pubSubNode, undefined); }, GLOBAL.CLUSTERS.OPEN); - testUtils.testWithCluster('concurrent unsubscribe does not throw an error', async cluster => { + testUtils.testWithCluster('concurrent UNSUBSCRIBE does not throw an error (#2685)', async cluster => { const listener = spy(); - await cluster.subscribe('channel', listener); - await cluster.subscribe('channel2', listener); + await Promise.all([ + cluster.subscribe('1', listener), + cluster.subscribe('2', listener) + ]); await Promise.all([ - cluster.unsubscribe('channel', listener), - cluster.unsubscribe('channel2', listener) + cluster.unsubscribe('1', listener), + cluster.unsubscribe('2', listener) ]); - - assert.equal(cluster.isOpen, false); }, GLOBAL.CLUSTERS.OPEN); testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => { @@ -337,19 +337,16 @@ describe('Cluster', () => { minimumDockerVersion: [7] }); - testUtils.testWithCluster('concurrent sunsubscribe does not throw an error', async cluster => { + testUtils.testWithCluster('concurrent SUNSUBCRIBE does not throw an error (#2685)', async cluster => { const listener = spy(); - - await cluster.sSubscribe('channel', listener); - await cluster.sSubscribe('channel2', listener); - + await Promise.all([ + await cluster.sSubscribe('channel', listener), + await cluster.sSubscribe('channel2', listener) + ]); await Promise.all([ cluster.sUnsubscribe('channel', listener), cluster.sUnsubscribe('channel2', listener) - ]) - - - assert.equal(cluster.isOpen, false); + ]); }, { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [7] From 4d7d5412113aa91948f8802d7e034a451620837e Mon Sep 17 00:00:00 2001 From: Leibale Eidelman Date: Tue, 16 Jan 2024 15:13:41 -0500 Subject: [PATCH 4/4] fix tests --- packages/client/lib/cluster/index.spec.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index 4ae41c026cb..569d716272a 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -238,12 +238,10 @@ describe('Cluster', () => { testUtils.testWithCluster('concurrent UNSUBSCRIBE does not throw an error (#2685)', async cluster => { const listener = spy(); - await Promise.all([ cluster.subscribe('1', listener), cluster.subscribe('2', listener) ]); - await Promise.all([ cluster.unsubscribe('1', listener), cluster.unsubscribe('2', listener) @@ -340,12 +338,12 @@ describe('Cluster', () => { testUtils.testWithCluster('concurrent SUNSUBCRIBE does not throw an error (#2685)', async cluster => { const listener = spy(); await Promise.all([ - await cluster.sSubscribe('channel', listener), - await cluster.sSubscribe('channel2', listener) + await cluster.sSubscribe('1', listener), + await cluster.sSubscribe('2', listener) ]); await Promise.all([ - cluster.sUnsubscribe('channel', listener), - cluster.sUnsubscribe('channel2', listener) + cluster.sUnsubscribe('1', listener), + cluster.sUnsubscribe('2', listener) ]); }, { ...GLOBAL.CLUSTERS.OPEN,