From 17390383b58e7e894e9a754132509def2ce0d913 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sat, 14 Sep 2024 11:49:49 +0800 Subject: [PATCH] Replica flush the old data after RDB file is ok in disk-based replication (#926) Call emptyData right before rdbLoad, so that an error earlier in the process cannot make us drop the replication stream and leave the replica with an empty database. The real change is in the disk-based part; the rest is just code movement. Signed-off-by: Binbin --- src/replication.c | 29 +++++++++++++++++----- tests/integration/replication.tcl | 40 +++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/src/replication.c b/src/replication.c index 3b7b3b1709..e49c767881 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2180,11 +2180,6 @@ void readSyncBulkPayload(connection *conn) { temp_functions_lib_ctx = functionsLibCtxCreate(); moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL); - } else { - replicationAttachToNewPrimary(); - - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } /* Before loading the DB into memory we need to delete the readable @@ -2193,7 +2188,6 @@ void readSyncBulkPayload(connection *conn) { * time for non blocking loading. */ connSetReadHandler(conn, NULL); - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (use_diskless_load) { rio rdb; @@ -2213,6 +2207,14 @@ void readSyncBulkPayload(connection *conn) { dbarray = diskless_load_tempDb; functions_lib_ctx = temp_functions_lib_ctx; } else { + /* We will soon start loading the RDB from socket, the replication history is changed, + * we must discard the cached primary structure and force resync of sub-replicas. */ + replicationAttachToNewPrimary(); + + /* Even though we are on-empty-db and the database is empty, we still call emptyData. 
*/ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + dbarray = server.db; functions_lib_ctx = functionsLibCtxGetCurrent(); functionsLibCtxClear(functions_lib_ctx); @@ -2224,6 +2226,8 @@ void readSyncBulkPayload(connection *conn) { * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout * 1000); + + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); int loadingFailed = 0; @@ -2256,6 +2260,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); } else { /* Remove the half-loaded data in case we started with an empty replica. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } @@ -2332,6 +2337,17 @@ void readSyncBulkPayload(connection *conn) { return; } + /* We will soon start loading the RDB from disk, the replication history is changed, + * we must discard the cached primary structure and force resync of sub-replicas. */ + replicationAttachToNewPrimary(); + + /* Empty the databases only after the RDB file is ok, that is, before the RDB file + * is actually loaded, in case we encounter an error and drop the replication stream + * and leave an empty database. 
*/ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) { serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " "DB from disk, check server logs."); @@ -2344,6 +2360,7 @@ void readSyncBulkPayload(connection *conn) { } /* If disk-based RDB loading fails, remove the half-loaded dataset. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); /* Note that there's no point in restarting the AOF on sync failure, diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index b175f4ff34..203574e391 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -1477,3 +1477,43 @@ start_server {tags {"repl external:skip"}} { } } } + +start_server {tags {"repl external:skip"}} { + set replica [srv 0 client] + $replica set replica_key replica_value + + start_server {} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + $primary set primary_key primary_value + + test {Replica keep the old data if RDB file save fails in disk-based replication} { + # Create a folder called 'dump.rdb' to trigger temp-rdb rename failure + # and it will cause RDB file save to fail at the rename. + set dump_rdb [file join [lindex [$replica config get dir] 1] dump.rdb] + if {[file exists $dump_rdb]} { exec rm -f $dump_rdb } + exec mkdir -p $dump_rdb + + $replica replicaof $primary_host $primary_port + + # Waiting for the rename to fail. + wait_for_log_messages -1 {"*Failed trying to rename the temp DB into dump.rdb*"} 0 1000 10 + + # Make sure the replica has not completed sync and keep the old data. 
+ assert_equal {} [$replica get primary_key] + assert_equal {replica_value} [$replica get replica_key] + + # Remove the test folder and let the rename succeed + exec rm -rf $dump_rdb + wait_for_condition 500 100 { + [$replica get primary_key] == {primary_value} && + [$replica get replica_key] == {} + } else { + puts [$primary keys *] + puts [$replica keys *] + fail "Replication failed." + } + } + } +}