Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add FUNCTION KILL command #2114

Merged
merged 7 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added FUNCTION KILL command ([#2114](https://github.com/valkey-io/valkey-glide/pull/2114))
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
* Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093))
* Node: Added HRANDFIELD command ([#2096](https://github.com/valkey-io/valkey-glide/pull/2096))
Expand Down
4 changes: 2 additions & 2 deletions java/integTest/src/test/java/glide/TestUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ public static GlideString generateLuaLibCodeBinary(
}

/**
* Create a lua lib with a RO function which runs an endless loop up to timeout sec.<br>
* Execution takes at least 5 sec regardless of the timeout configured.<br>
* Create a lua lib with a function which runs an endless loop up to timeout sec.<br>
* Execution takes at least 5 sec regardless of the timeout configured.
*/
public static String createLuaLibWithLongRunningFunction(
String libName, String funcName, int timeout, boolean readOnly) {
Expand Down
5 changes: 5 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2241,6 +2241,11 @@ export function createFunctionStats(): command_request.Command {
return createCommand(RequestType.FunctionStats, []);
}

/** @internal */
export function createFunctionKill(): command_request.Command {
return createCommand(RequestType.FunctionKill, []);
}

/**
* Represents offsets specifying a string interval to analyze in the {@link BaseClient.bitcount|bitcount} command. The offsets are
* zero-based indexes, with `0` being the first index of the string, `1` being the next index and so on.
Expand Down
19 changes: 19 additions & 0 deletions node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
createFlushDB,
createFunctionDelete,
createFunctionFlush,
createFunctionKill,
createFunctionList,
createFunctionLoad,
createFunctionStats,
Expand Down Expand Up @@ -620,6 +621,24 @@ export class GlideClient extends BaseClient {
return this.createWritePromise(createFunctionStats());
}

/**
* Kills a function that is currently executing.
* `FUNCTION KILL` terminates read-only functions only.
*
* See https://valkey.io/commands/function-kill/ for details.
*
* since Valkey version 7.0.0.
*
* @returns `OK` if function is terminated. Otherwise, throws an error.
* @example
* ```typescript
* await client.functionKill();
* ```
*/
public async functionKill(): Promise<"OK"> {
return this.createWritePromise(createFunctionKill());
}

/**
* Deletes all the keys of all the existing databases. This command never fails.
*
Expand Down
24 changes: 24 additions & 0 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
createFlushDB,
createFunctionDelete,
createFunctionFlush,
createFunctionKill,
createFunctionList,
createFunctionLoad,
createFunctionStats,
Expand Down Expand Up @@ -953,6 +954,29 @@ export class GlideClusterClient extends BaseClient {
);
}

/**
* Kills a function that is currently executing.
* `FUNCTION KILL` terminates read-only functions only.
*
* See https://valkey.io/commands/function-kill/ for details.
*
* since Valkey version 7.0.0.
*
* @param route - The client will route the command to the nodes defined by `route`.
* If not defined, the command will be routed to all primary nodes.
* @returns `OK` if function is terminated. Otherwise, throws an error.
* @example
* ```typescript
* await client.functionKill();
* ```
*/
public async functionKill(route?: Routes): Promise<"OK"> {
return this.createWritePromise(
createFunctionKill(),
toProtobufRoute(route),
);
}

/**
* Deletes all the keys of all the existing databases. This command never fails.
*
Expand Down
149 changes: 149 additions & 0 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ListDirection,
ProtocolVersion,
RequestError,
ReturnType,
Transaction,
} from "..";
import { RedisCluster } from "../../utils/TestUtils.js";
Expand All @@ -27,6 +28,7 @@ import {
checkFunctionListResponse,
checkFunctionStatsResponse,
convertStringArrayToBuffer,
createLuaLibWithLongRunningFunction,
flushAndCloseClient,
generateLuaLibCode,
getClientConfigurationOption,
Expand All @@ -35,6 +37,7 @@ import {
parseEndpoints,
transactionTest,
validateTransactionResponse,
waitForNotBusy,
} from "./TestUtilities";

/* eslint-disable @typescript-eslint/no-var-requires */
Expand Down Expand Up @@ -733,6 +736,152 @@ describe("GlideClient", () => {
},
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"function kill RO func %p",
async (protocol) => {
if (cluster.checkIfServerVersionLessThan("7.0.0")) return;

const config = getClientConfigurationOption(
cluster.getAddresses(),
protocol,
10000,
);
const client = await GlideClient.createClient(config);
const testClient = await GlideClient.createClient(config);

try {
const libName = "function_kill_no_write";
const funcName = "deadlock_no_write";
const code = createLuaLibWithLongRunningFunction(
libName,
funcName,
6,
true,
);
expect(await client.functionFlush()).toEqual("OK");
// nothing to kill
await expect(client.functionKill()).rejects.toThrow(/notbusy/i);

// load the lib
expect(await client.functionLoad(code, true)).toEqual(libName);

try {
// call the function without await
testClient
.fcall(funcName, [], [])
.catch((e) =>
expect((e as Error).message).toContain(
"Script killed",
),
);

let killed = false;
let timeout = 4000;
await new Promise((resolve) => setTimeout(resolve, 1000));

while (timeout >= 0) {
try {
expect(await client.functionKill()).toEqual("OK");
killed = true;
break;
} catch {
/* do nothing */
}

await new Promise((resolve) =>
setTimeout(resolve, 500),
);
timeout -= 500;
}

expect(killed).toBeTruthy();
} finally {
waitForNotBusy(client);
}
} finally {
expect(await client.functionFlush()).toEqual("OK");
client.close();
}
},
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"function kill RW func %p",
async (protocol) => {
if (cluster.checkIfServerVersionLessThan("7.0.0")) return;

const config = getClientConfigurationOption(
cluster.getAddresses(),
protocol,
10000,
);
const client = await GlideClient.createClient(config);
const testClient = await GlideClient.createClient(config);

try {
const libName = "function_kill_write";
const key = libName;
const funcName = "deadlock_write";
const code = createLuaLibWithLongRunningFunction(
libName,
funcName,
6,
false,
);
expect(await client.functionFlush()).toEqual("OK");
// nothing to kill
await expect(client.functionKill()).rejects.toThrow(/notbusy/i);

// load the lib
expect(await client.functionLoad(code, true)).toEqual(libName);

let promise = new Promise<ReturnType>(() => null);

try {
// call the function without await
promise = testClient.fcall(funcName, [key], []);

let foundUnkillable = false;
let timeout = 4000;
await new Promise((resolve) => setTimeout(resolve, 1000));

while (timeout >= 0) {
try {
// valkey kills a function with 5 sec delay
// but this will always throw an error in the test
await client.functionKill();
} catch (err) {
// looking for an error with "unkillable" in the message
// at that point we can break the loop
if (
(err as Error).message
.toLowerCase()
.includes("unkillable")
) {
foundUnkillable = true;
break;
}
}

await new Promise((resolve) =>
setTimeout(resolve, 500),
);
timeout -= 500;
}

expect(foundUnkillable).toBeTruthy();
} finally {
// If function wasn't killed, and it didn't time out - it blocks the server and cause rest
// test to fail. Wait for the function to complete (we cannot kill it)
expect(await promise).toContain("Timed out");
}
} finally {
expect(await client.functionFlush()).toEqual("OK");
client.close();
}
},
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"sort sortstore sort_store sortro sort_ro sortreadonly test_%p",
async (protocol) => {
Expand Down
Loading
Loading