Skip to content

Commit

Permalink
fix(client): Isolate exceptional logic to xAutoClaim
Browse files Browse the repository at this point in the history
  • Loading branch information
Calyhre committed Jul 19, 2023
1 parent 0d8d05e commit 518eb1a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 41 deletions.
57 changes: 56 additions & 1 deletion packages/client/lib/commands/XAUTOCLAIM.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('XAUTOCLAIM', () => {
});
});

testUtils.testWithClient('client.xAutoClaim', async client => {
testUtils.testWithClient('client.xAutoClaim without messages', async client => {
await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
Expand All @@ -39,4 +39,59 @@ describe('XAUTOCLAIM', () => {
}
);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('client.xAutoClaim with messages', async client => {
const [,,id,] = await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xGroupCreateConsumer('key', 'group', 'consumer'),
client.xAdd('key', '*', { foo: 'bar' }),
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' })
]);

assert.deepEqual(
await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'),
{
nextId: '0-0',
messages: [{
id,
message: Object.create(null, { 'foo': {
value: 'bar',
configurable: true,
enumerable: true
} })
}]
}
);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => {
const [,,,,,id2,] = await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xGroupCreateConsumer('key', 'group', 'consumer'),
client.xAdd('key', '*', { foo: 'bar' }),
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
client.xTrim('key', 'MAXLEN', 0),
client.xAdd('key', '*', { bar: 'baz' }),
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
]);

assert.deepEqual(
await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'),
{
nextId: '0-0',
messages: [{
id: id2,
message: Object.create(null, { 'bar': {
value: 'baz',
configurable: true,
enumerable: true
} })
}]
}
);
}, GLOBAL.SERVERS.OPEN);
});
26 changes: 23 additions & 3 deletions packages/client/lib/commands/XAUTOCLAIM.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RedisCommandArgument, RedisCommandArguments } from '.';
import { StreamMessagesReply, transformStreamMessagesReply } from './generic-transformers';
import { StreamMessageReply, transformTuplesReply } from './generic-transformers';

export const FIRST_KEY_INDEX = 1;

Expand Down Expand Up @@ -28,12 +28,32 @@ type XAutoClaimRawReply = [RedisCommandArgument, Array<any>];

interface XAutoClaimReply {
nextId: RedisCommandArgument;
messages: StreamMessagesReply;
messages: XAutoClaimMessagesReply;
}

type XAutoClaimMessagesReply = Array<StreamMessageReply | null>;

function transformXAutoClaimMessagesReply(reply: Array<any>): XAutoClaimMessagesReply {
const messages = [];

for (const tuple of reply) {
if (tuple === null) {
continue;
}

const [id, message] = tuple;
messages.push({
id,
message: transformTuplesReply(message)
});
}

return messages;
}

export function transformReply(reply: XAutoClaimRawReply): XAutoClaimReply {
return {
nextId: reply[0],
messages: transformStreamMessagesReply(reply[1])
messages: transformXAutoClaimMessagesReply(reply[1])
};
}
57 changes: 24 additions & 33 deletions packages/client/lib/commands/generic-transformers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,39 +194,30 @@ describe('Generic Transformers', () => {
);
});

describe('transformStreamMessagesReply', () => {
it('with null', () => {
assert.deepEqual(
transformStreamMessagesReply([null]),
[]
);
})

it('with messages', () => {
assert.deepEqual(
transformStreamMessagesReply([['0-0', ['0key', '0value']], ['1-0', ['1key', '1value']]]),
[{
id: '0-0',
message: Object.create(null, {
'0key': {
value: '0value',
configurable: true,
enumerable: true
}
})
}, {
id: '1-0',
message: Object.create(null, {
'1key': {
value: '1value',
configurable: true,
enumerable: true
}
})
}]
);
})
})
it('transformStreamMessagesReply', () => {
assert.deepEqual(
transformStreamMessagesReply([['0-0', ['0key', '0value']], ['1-0', ['1key', '1value']]]),
[{
id: '0-0',
message: Object.create(null, {
'0key': {
value: '0value',
configurable: true,
enumerable: true
}
})
}, {
id: '1-0',
message: Object.create(null, {
'1key': {
value: '1value',
configurable: true,
enumerable: true
}
})
}]
);
});

describe('transformStreamsMessagesReply', () => {
it('null', () => {
Expand Down
4 changes: 0 additions & 4 deletions packages/client/lib/commands/generic-transformers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ export type StreamMessagesReply = Array<StreamMessageReply>;
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
const messages = [];

if (reply[0] === null) {
return messages;
}

for (const [id, message] of reply) {
messages.push({
id,
Expand Down

0 comments on commit 518eb1a

Please sign in to comment.