Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add Binary support for stream commands, part 2 #2222

53 changes: 36 additions & 17 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ export type DecoderOption = {
decoder?: Decoder;
};

/** A replacement for `Record<GlideString, T>` - array of key-value pairs. */
export type GlideRecord<T> = {
/** The value name. */
key: GlideString;
/** The value itself. */
value: T;
}[];

/**
* This function converts an input from HashDataType or Record types to HashDataType.
*
Expand Down Expand Up @@ -3481,7 +3489,7 @@ export class BaseClient {
* ```
*/
public async xrange(
key: string,
key: GlideString,
start: Boundary<string>,
end: Boundary<string>,
count?: number,
Expand Down Expand Up @@ -3521,7 +3529,7 @@ export class BaseClient {
* ```
*/
public async xrevrange(
key: string,
key: GlideString,
end: Boundary<string>,
start: Boundary<string>,
count?: number,
Expand Down Expand Up @@ -4809,7 +4817,7 @@ export class BaseClient {
* @returns The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
*/
public async xtrim(
key: string,
key: GlideString,
options: StreamTrimOptions,
): Promise<number> {
return this.createWritePromise(createXTrim(key, options));
Expand Down Expand Up @@ -4841,9 +4849,14 @@ export class BaseClient {
* ```
*/
public async xread(
keys_and_ids: Record<string, string>,
keys_and_ids: Record<string, GlideString> | GlideRecord<GlideString>,
options?: StreamReadOptions,
): Promise<Record<string, Record<string, [string, string][]>>> {
if (!Array.isArray(keys_and_ids)) {
keys_and_ids = Object.entries(keys_and_ids).map((e) => {
return {key: e[0], value: e[1]}
})
}
return this.createWritePromise(createXRead(keys_and_ids, options));
}

Expand Down Expand Up @@ -4878,14 +4891,19 @@ export class BaseClient {
* ```
*/
public async xreadgroup(
group: string,
consumer: string,
keys_and_ids: Record<string, string>,
group: GlideString,
consumer: GlideString,
keys_and_ids: Record<string, GlideString> | GlideRecord<GlideString>,
options?: StreamReadGroupOptions,
): Promise<Record<
string,
Record<string, [string, string][] | null>
> | null> {
if (!Array.isArray(keys_and_ids)) {
keys_and_ids = Object.entries(keys_and_ids).map((e) => {
return {key: e[0], value: e[1]}
})
}
return this.createWritePromise(
createXReadGroup(group, consumer, keys_and_ids, options),
);
Expand All @@ -4905,7 +4923,7 @@ export class BaseClient {
* console.log(numEntries); // Output: 2 - "my_stream" contains 2 entries.
* ```
*/
public async xlen(key: string): Promise<number> {
public async xlen(key: GlideString): Promise<number> {
return this.createWritePromise(createXLen(key));
}

Expand All @@ -4932,9 +4950,9 @@ export class BaseClient {
* ```
*/
public async xpending(
key: string,
group: string,
): Promise<[number, string, string, [string, number][]]> {
key: GlideString,
group: GlideString,
): Promise<[number, GlideString, GlideString, [GlideString, number][]]> {
return this.createWritePromise(createXPending(key, group));
}

Expand Down Expand Up @@ -4973,10 +4991,10 @@ export class BaseClient {
* ```
*/
public async xpendingWithOptions(
key: string,
group: string,
key: GlideString,
group: GlideString,
options: StreamPendingOptions,
): Promise<[string, string, number, number][]> {
): Promise<[GlideString, GlideString, number, number][]> {
return this.createWritePromise(createXPending(key, group, options));
}

Expand Down Expand Up @@ -5007,8 +5025,9 @@ export class BaseClient {
* ```
*/
public async xinfoConsumers(
key: string,
group: string,
key: GlideString,
group: GlideString,
// TODO: change return type to be compatible with GlideString
): Promise<Record<string, string | number>[]> {
return this.createWritePromise(createXInfoConsumers(key, group));
}
Expand Down Expand Up @@ -5356,7 +5375,7 @@ export class BaseClient {
* ```
*/
public async xinfoStream(
key: string,
key: GlideString,
fullOptions?: boolean | number,
): Promise<ReturnTypeXinfoStream> {
return this.createWritePromise(
Expand Down
104 changes: 52 additions & 52 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { createLeakedStringVec, MAX_REQUEST_ARGS_LEN } from "glide-rs";
import Long from "long";

/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
import { BaseClient, HashDataType } from "src/BaseClient";
import { BaseClient, GlideRecord, HashDataType } from "src/BaseClient";
/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
import { GlideClient } from "src/GlideClient";
/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
Expand Down Expand Up @@ -141,10 +141,10 @@ export type SetOptions = {
* `KEEPTTL` in the Redis API.
*/
| "keepExisting"
| {
type: TimeUnit;
count: number;
};
| {
type: TimeUnit;
count: number;
};
};

/**
Expand Down Expand Up @@ -1392,7 +1392,7 @@ export function createZAdd(
if (options.conditionalChange) {
if (
options.conditionalChange ===
ConditionalChange.ONLY_IF_DOES_NOT_EXIST &&
ConditionalChange.ONLY_IF_DOES_NOT_EXIST &&
options.updateOptions
) {
throw new Error(
Expand Down Expand Up @@ -1640,15 +1640,15 @@ export type Boundary<T> =
* Represents a specific boundary.
*/
| {
/**
* The comparison value.
*/
value: T;
/**
* Whether the value is inclusive. Defaults to `true`.
*/
isInclusive?: boolean;
};
/**
* The comparison value.
*/
value: T;
/**
* Whether the value is inclusive. Defaults to `true`.
*/
isInclusive?: boolean;
};

/**
* Represents a range by index (rank) in a sorted set.
Expand Down Expand Up @@ -2015,21 +2015,21 @@ export function createZRank(

export type StreamTrimOptions = (
| {
/**
* Trim the stream according to entry ID.
* Equivalent to `MINID` in the Redis API.
*/
method: "minid";
threshold: string;
}
/**
* Trim the stream according to entry ID.
* Equivalent to `MINID` in the Redis API.
*/
method: "minid";
threshold: GlideString;
}
| {
/**
* Trim the stream according to length.
* Equivalent to `MAXLEN` in the Redis API.
*/
method: "maxlen";
threshold: number;
}
/**
* Trim the stream according to length.
* Equivalent to `MAXLEN` in the Redis API.
*/
method: "maxlen";
threshold: number;
}
) & {
/**
* If `true`, the stream will be trimmed exactly. Equivalent to `=` in the
Expand Down Expand Up @@ -2130,7 +2130,7 @@ export function createXDel(
* @internal
*/
export function createXTrim(
key: string,
key: GlideString,
options: StreamTrimOptions,
): command_request.Command {
const args = [key];
Expand All @@ -2142,7 +2142,7 @@ export function createXTrim(
* @internal
*/
export function createXRange(
key: string,
key: GlideString,
start: Boundary<string>,
end: Boundary<string>,
count?: number,
Expand All @@ -2161,7 +2161,7 @@ export function createXRange(
* @internal
*/
export function createXRevRange(
key: string,
key: GlideString,
start: Boundary<string>,
end: Boundary<string>,
count?: number,
Expand Down Expand Up @@ -2538,7 +2538,7 @@ export type StreamReadGroupOptions = StreamReadOptions & {
};

/** @internal */
function addReadOptions(options?: StreamReadOptions): string[] {
function addReadOptions(options?: StreamReadOptions): GlideString[] {
const args = [];

if (options?.count !== undefined) {
Expand All @@ -2555,35 +2555,34 @@ function addReadOptions(options?: StreamReadOptions): string[] {
}

/** @internal */
function addStreamsArgs(keys_and_ids: Record<string, string>): string[] {
function addStreamsArgs(keys_and_ids: GlideRecord<GlideString>): GlideString[] {
return [
"STREAMS",
...Object.keys(keys_and_ids),
...Object.values(keys_and_ids),
...keys_and_ids.map((e) => e.key),
...keys_and_ids.map((e) => e.value)
];
}

/**
* @internal
*/
export function createXRead(
keys_and_ids: Record<string, string>,
keys_and_ids: GlideRecord<GlideString>,
options?: StreamReadOptions,
): command_request.Command {
const args = addReadOptions(options);
args.push(...addStreamsArgs(keys_and_ids));

return createCommand(RequestType.XRead, args);
}

/** @internal */
export function createXReadGroup(
group: string,
consumer: string,
keys_and_ids: Record<string, string>,
group: GlideString,
consumer: GlideString,
keys_and_ids: GlideRecord<GlideString>,
options?: StreamReadGroupOptions,
): command_request.Command {
const args: string[] = ["GROUP", group, consumer];
const args: GlideString[] = ["GROUP", group, consumer];

if (options) {
args.push(...addReadOptions(options));
Expand All @@ -2598,10 +2597,11 @@ export function createXReadGroup(
/**
* Represents a the return type for XInfo Stream in the response
*/
// TODO: change return type to be compatible with GlideString
export type ReturnTypeXinfoStream = {
[key: string]:
| StreamEntries
| Record<string, StreamEntries | Record<string, StreamEntries>[]>[];
| StreamEntries
| Record<string, StreamEntries | Record<string, StreamEntries>[]>[];
};

/**
Expand All @@ -2613,10 +2613,10 @@ export type StreamEntries = string | number | (string | number | string[])[][];
* @internal
*/
export function createXInfoStream(
key: string,
key: GlideString,
options: boolean | number,
): command_request.Command {
const args: string[] = [key];
const args: GlideString[] = [key];

if (options != false) {
args.push("FULL");
Expand All @@ -2638,7 +2638,7 @@ export function createXInfoGroups(key: string): command_request.Command {
/**
* @internal
*/
export function createXLen(key: string): command_request.Command {
export function createXLen(key: GlideString): command_request.Command {
return createCommand(RequestType.XLen, [key]);
}

Expand All @@ -2653,13 +2653,13 @@ export type StreamPendingOptions = {
/** Limit the number of messages returned. */
count: number;
/** Filter pending entries by consumer. */
consumer?: string;
consumer?: GlideString;
};

/** @internal */
export function createXPending(
key: string,
group: string,
key: GlideString,
group: GlideString,
options?: StreamPendingOptions,
): command_request.Command {
const args = [key, group];
Expand All @@ -2680,8 +2680,8 @@ export function createXPending(

/** @internal */
export function createXInfoConsumers(
key: string,
group: string,
key: GlideString,
group: GlideString,
): command_request.Command {
return createCommand(RequestType.XInfoConsumers, [key, group]);
}
Expand Down
Loading
Loading