Skip to content

Commit

Permalink
Cache CLUSTER SLOTS response for improving throughput and reduced lat…
Browse files Browse the repository at this point in the history
…ency. (valkey-io#53)

This commit adds a logic to cache `CLUSTER SLOTS` response for reduced
latency and also updates the cache when a change in the cluster is
detected.

Historically, `CLUSTER SLOTS` command was deprecated, however all the
server clients have been using `CLUSTER SLOTS` and have not migrated to
`CLUSTER SHARDS`. In future this logic can be added to any other
commands to improve the performance of the engine.

---------

Signed-off-by: Roshan Khatri <[email protected]>

convert centos 7 tests to almalinux 8
  • Loading branch information
roshkhatri authored and jonathanspw committed May 23, 2024
1 parent 7253862 commit 185d4fd
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 73 deletions.
50 changes: 22 additions & 28 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
inputs:
skipjobs:
description: 'jobs to skip (delete the ones you wanna keep, do not leave empty)'
default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,centos,malloc,specific,fortify,reply-schema'
default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,almalinux,malloc,specific,fortify,reply-schema'
skiptests:
description: 'tests to skip (delete the ones you wanna keep, do not leave empty)'
default: 'valkey,modules,sentinel,cluster,unittest'
Expand Down Expand Up @@ -672,12 +672,12 @@ jobs:
if: true && !contains(github.event.inputs.skiptests, 'unittest')
run: ./src/valkey-unit-tests --accurate

test-centos7-jemalloc:
test-almalinux8-jemalloc:
runs-on: ubuntu-latest
if: |
(github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) &&
!contains(github.event.inputs.skipjobs, 'centos')
container: centos:7
!contains(github.event.inputs.skipjobs, 'almalinux')
container: almalinux:8
timeout-minutes: 14400
steps:
- name: prep
Expand All @@ -689,18 +689,16 @@ jobs:
echo "skiptests: ${{github.event.inputs.skiptests}}"
echo "test_args: ${{github.event.inputs.test_args}}"
echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}"
# On centos7 actions/checkout@v4 does not work, so we use v3
# ref. https://github.com/actions/checkout/issues/1487
- uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
with:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: |
yum -y install gcc make
dnf -y install gcc make
make SERVER_CFLAGS='-Werror'
- name: testprep
run: yum -y install which tcl tclx
run: dnf -y install which tcl tcltls
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: ./runtest --accurate --verbose --dump-logs ${{github.event.inputs.test_args}}
Expand All @@ -714,12 +712,12 @@ jobs:
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster ${{github.event.inputs.cluster_test_args}}

test-centos7-tls-module:
test-almalinux8-tls-module:
runs-on: ubuntu-latest
if: |
(github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) &&
!contains(github.event.inputs.skipjobs, 'tls')
container: centos:7
container: almalinux:8
timeout-minutes: 14400
steps:
- name: prep
Expand All @@ -731,20 +729,18 @@ jobs:
echo "skiptests: ${{github.event.inputs.skiptests}}"
echo "test_args: ${{github.event.inputs.test_args}}"
echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}"
# On centos7 actions/checkout@v4 does not work, so we use v3
# ref. https://github.com/actions/checkout/issues/1487
- uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
with:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: |
yum -y install centos-release-scl epel-release
yum -y install devtoolset-7 openssl-devel openssl
scl enable devtoolset-7 "make BUILD_TLS=module SERVER_CFLAGS='-Werror'"
dnf -y install epel-release
dnf -y install make gcc openssl-devel openssl
make BUILD_TLS=module SERVER_CFLAGS='-Werror'
- name: testprep
run: |
yum -y install tcl tcltls tclx
dnf -y install tcl tcltls
./utils/gen-test-certs.sh
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
Expand All @@ -763,12 +759,12 @@ jobs:
run: |
./runtest-cluster --tls-module ${{github.event.inputs.cluster_test_args}}
test-centos7-tls-module-no-tls:
test-almalinux8-tls-module-no-tls:
runs-on: ubuntu-latest
if: |
(github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey')) &&
!contains(github.event.inputs.skipjobs, 'tls')
container: centos:7
container: almalinux:8
timeout-minutes: 14400
steps:
- name: prep
Expand All @@ -780,20 +776,18 @@ jobs:
echo "skiptests: ${{github.event.inputs.skiptests}}"
echo "test_args: ${{github.event.inputs.test_args}}"
echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}"
# On centos7 actions/checkout@v4 does not work, so we use v3
# ref. https://github.com/actions/checkout/issues/1487
- uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
with:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: |
yum -y install centos-release-scl epel-release
yum -y install devtoolset-7 openssl-devel openssl
scl enable devtoolset-7 "make BUILD_TLS=module SERVER_CFLAGS='-Werror'"
dnf -y install epel-release
dnf -y install make gcc openssl-devel openssl
make BUILD_TLS=module SERVER_CFLAGS='-Werror'
- name: testprep
run: |
yum -y install tcl tcltls tclx
dnf -y install tcl tcltls
./utils/gen-test-certs.sh
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
Expand Down Expand Up @@ -1074,7 +1068,7 @@ jobs:
notify-about-job-results:
runs-on: ubuntu-latest
if: always() && github.event_name != 'workflow_dispatch' && github.repository == 'valkey-io/valkey'
needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-centos7-jemalloc, test-centos7-tls-module, test-centos7-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator]
needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-almalinux8-jemalloc, test-almalinux8-tls-module, test-almalinux8-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator]
steps:
- name: Collect job status
run: |
Expand Down
106 changes: 72 additions & 34 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1312,24 +1312,6 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
return 0;
}

/* Returns an indication if the replica node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = clusterNodeReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
}
return (repl_offset != 0);
}

void addNodeToNodeReply(client *c, clusterNode *node) {
char* hostname = clusterNodeHostname(node);
addReplyArrayLen(c, 4);
Expand Down Expand Up @@ -1381,10 +1363,28 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
serverAssert(length == 0);
}

/* Returns an indication if the node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
int isNodeAvailable(clusterNode *node) {
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = clusterNodeReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = getNodeReplicationOffset(node);
}
return (repl_offset != 0);
}

void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
Expand All @@ -1396,27 +1396,27 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cached_cluster_slot_info[conn_type]) {
sdsfree(server.cached_cluster_slot_info[conn_type]);
server.cached_cluster_slot_info[conn_type] = NULL;
}
}
}

sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
Expand All @@ -1430,14 +1430,52 @@ void clusterCommandSlots(client * c) {
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(c, n, start, i-1);
addNodeReplyForClusterSlot(recording_client, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
sds cluster_slot_response = aggregateClientOutputBuffer(recording_client);
deleteCachedResponseClient(recording_client);
return cluster_slot_response;
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int is_equal = !sdscmp(generated_response, cached_response);
/* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
if (!is_equal) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
sdsfree(generated_response);
return is_equal;
}

void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
connTypeForCaching conn_type = connIsTLS(c->conn);

if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

sds cached_reply = server.cached_cluster_slot_info[conn_type];
if (!cached_reply) {
cached_reply = generateClusterSlotResponse();
server.cached_cluster_slot_info[conn_type] = cached_reply;
} else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
}

addReplyProto(c, cached_reply, sdslen(cached_reply));
}

/* -----------------------------------------------------------------------------
Expand Down
7 changes: 7 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand All @@ -113,4 +117,7 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);
int isNodeAvailable(clusterNode *node);
long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
#endif /* __CLUSTER_H */
41 changes: 33 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ void clusterSaveConfigOrDie(int do_fsync) {
serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
exit(1);
}
clearCachedClusterSlotsResponse();
}

/* Lock the cluster config using flock(), and retain the file descriptor used to
Expand Down Expand Up @@ -1039,6 +1040,9 @@ void clusterInit(void) {

server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cached_cluster_slot_info[conn_type] = NULL;
}
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
Expand Down Expand Up @@ -1363,6 +1367,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->repl_offset_time = 0;
node->repl_offset = 0;
listSetFreeMethod(node->fail_reports,zfree);
node->is_node_healthy = 0;
return node;
}

Expand Down Expand Up @@ -5862,6 +5867,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

long long getNodeReplicationOffset(clusterNode *node) {
if (node->flags & CLUSTER_NODE_MYSELF) {
return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
return node->repl_offset;
}
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
Expand Down Expand Up @@ -5896,12 +5909,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
reply_count++;
}

long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = node->repl_offset;
}
long long node_offset = getNodeReplicationOffset(node);

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
Expand Down Expand Up @@ -6882,9 +6890,26 @@ void clusterPromoteSelfToMaster(void) {
replicationUnsetMaster();
}

int detectAndUpdateCachedNodeHealth(void) {
dictIterator di;
dictInitSafeIterator(&di, server.cluster->nodes);
dictEntry *de;
clusterNode *node;
int overall_health_changed = 0;
while((de = dictNext(&di)) != NULL) {
node = dictGetVal(de);
int present_is_node_healthy = isNodeAvailable(node);
if (present_is_node_healthy != node->is_node_healthy) {
overall_health_changed = 1;
node->is_node_healthy = present_is_node_healthy;
}
}

return overall_health_changed;
}

/* Replicate migrating and importing slot states to all replicas */
void clusterReplicateOpenSlots(void)
{
void clusterReplicateOpenSlots(void) {
if (!server.cluster_enabled) return;

int argc = 5;
Expand Down
Loading

0 comments on commit 185d4fd

Please sign in to comment.