From bd1d7af5e6179df136cdb9800832fbbf2f2982f8 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Mon, 29 Jul 2024 20:15:49 +0800 Subject: [PATCH 1/7] feat: add a new test env (rocky linux) in github ci. (#2806) (#2823) * add a new test env (rocky linux) in github ci. --- .github/workflows/pika.yml | 66 +++++++++++++++++++++++++++++++++++ .github/workflows/release.yml | 40 ++++++++++++++++++++- CMakeLists.txt | 15 ++++---- ci/release-build.sh | 9 +++++ codis/Makefile | 10 +++--- utils/Get_OS_Version.sh | 3 ++ 6 files changed, 130 insertions(+), 13 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index 49756598ad..e8875749c4 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -78,6 +78,72 @@ jobs: chmod +x integrate_test.sh sh integrate_test.sh + build_on_rocky: + runs-on: ubuntu-latest + container: + image: rockylinux:9 + + steps: + - name: Install deps + run: | + dnf update -y + dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel gcc-toolset-13 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.19 + + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Configure CMake + run: | + source /opt/rh/gcc-toolset-13/enable + cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address . 
+ + - uses: actions/cache@v3 + with: + path: ${{ github.workspace }}/deps + key: ${{ runner.os }}-rocky-deps-${{ hashFiles('**/CMakeLists.txt') }} + + - uses: actions/cache@v3 + with: + path: ${{ github.workspace }}/buildtrees + key: ${{ runner.os }}-rocky-buildtrees-${{ hashFiles('**/CMakeLists.txt') }} + + - name: Build + run: | + source /opt/rh/gcc-toolset-13/enable + cmake --build build --config ${{ env.BUILD_TYPE }} + + - name: Test + working-directory: ${{ github.workspace }}/build + run: ctest -C ${{ env.BUILD_TYPE }} + + - name: Unit Test + working-directory: ${{ github.workspace }} + run: ./pikatests.sh all + + - name: Start codis, pika master and pika slave + working-directory: ${{ github.workspace }}/build + run: | + chmod +x ../tests/integration/start_master_and_slave.sh + ../tests/integration/start_master_and_slave.sh + chmod +x ../tests/integration/start_codis.sh + ../tests/integration/start_codis.sh + + - name: Run Go E2E Tests + working-directory: ${{ github.workspace }}/build + run: | + cd ../tools/pika_keys_analysis/ + go test -v ./... 
+ cd ../../tests/integration/ + chmod +x integrate_test.sh + sh integrate_test.sh + build_on_macos: runs-on: macos-12 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 23145491ec..8d794a45a5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -47,9 +47,47 @@ jobs: name: ${{ matrix.name }}.sha256sum path: build/${{ matrix.name }}.sha256sum + rocky: + runs-on: ubuntu-latest + container: + image: rockylinux:9 + env: + name: ${{ github.event.repository.name }}-${{ github.ref_name }}-rocky-amd64.tar.gz + steps: + - name: Install deps + run: | + dnf update -y + dnf install -y bash cmake wget git autoconf gcc perl-Digest-SHA tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel gcc-toolset-13 + + - name: Checkout sources + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Release build os - rocky + run: | + chmod +x ci/release-build.sh + ./ci/release-build.sh install rocky ${{ env.BUILD_TYPE }} -xe + + - name: Calculate checksum and rename binary + shell: bash + run: ./ci/release-build.sh checksum ${{ github.event.repository.name }} ${{ env.name }} + + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: ${{ env.name }} + path: build/${{ env.name }} + + - name: Upload checksum of artifacts + uses: actions/upload-artifact@v3 + with: + name: ${{ env.name }}.sha256sum + path: build/${{ env.name }}.sha256sum + release: name: Release artifacts - needs: [ build ] + needs: [ build, rocky ] runs-on: ubuntu-latest steps: - name: Download artifacts diff --git a/CMakeLists.txt b/CMakeLists.txt index 4fe49879c0..79e1cc0418 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,7 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") endif() endif() +link_directories("/opt/rh/gcc-toolset-13/root/lib/gcc/x86_64-redhat-linux/13") ############# You should enable sanitizer if you are developing pika ############# # Uncomment the following two lines to enable AddressSanitizer to detect 
memory leaks and other memory-related bugs. @@ -159,7 +160,7 @@ ExternalProject_Add(gtest make -j${CPU_CORE} ) -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(GTEST_LIBRARY ${INSTALL_LIBDIR_64}/libgtest.a) set(GTEST_MAIN_LIBRARY ${INSTALL_LIBDIR_64}/libgtest_main.a) set(GMOCK_LIBRARY ${INSTALL_LIBDIR_64}/libgmock.a) @@ -282,7 +283,7 @@ else() set(LIB_GLOG libglog.a) endif() -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(GLOG_LIBRARY ${INSTALL_LIBDIR_64}/${LIB_GLOG}) else() set(GLOG_LIBRARY ${INSTALL_LIBDIR}/${LIB_GLOG}) @@ -317,7 +318,7 @@ ExternalProject_Add(snappy make -j${CPU_CORE} ) -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(SNAPPY_LIBRARY ${INSTALL_LIBDIR_64}/libsnappy.a) else() set(SNAPPY_LIBRARY ${INSTALL_LIBDIR}/libsnappy.a) @@ -355,7 +356,7 @@ ExternalProject_Add(zstd make -j${CPU_CORE} ) -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(ZSTD_LIBRARY ${INSTALL_LIBDIR_64}/libzstd.a) else() set(ZSTD_LIBRARY ${INSTALL_LIBDIR}/libzstd.a) @@ -394,7 +395,7 @@ else() set(LIB_FMT libfmt.a) endif() -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(FMT_LIBRARY ${INSTALL_LIBDIR_64}/${LIB_FMT}) else() set(FMT_LIBRARY ${INSTALL_LIBDIR}/${LIB_FMT}) @@ -432,7 +433,7 @@ ExternalProject_Add(lz4 make -j${CPU_CORE} ) -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(LZ4_LIBRARY ${INSTALL_LIBDIR_64}/liblz4.a) else() set(LZ4_LIBRARY ${INSTALL_LIBDIR}/liblz4.a) @@ -718,7 +719,7 @@ if (USE_PIKA_TOOLS) set(BZ2_LIBRARY ${INSTALL_LIBDIR}/libbz2.a) endif() -if(${OS_VERSION} MATCHES "CentOS") +if(${OS_VERSION} MATCHES "Rocky") set(ROCKSDB_LIBRARY ${INSTALL_LIBDIR_64}/librocksdb.a) else() set(ROCKSDB_LIBRARY ${INSTALL_LIBDIR}/librocksdb.a) diff --git a/ci/release-build.sh b/ci/release-build.sh index df8262aad9..d7a61012c6 100644 --- a/ci/release-build.sh +++ b/ci/release-build.sh @@ -9,6 +9,12 @@ function install_deps() { 
elif [[ $OS == *"ubuntu"* ]]; then sudo apt-get install -y autoconf libprotobuf-dev protobuf-compiler sudo apt-get install -y clang-tidy-12 + elif [[ $OS == *"rocky"* ]]; then + sudo dnf update -y + sudo dnf install -y bash cmake + sudo dnf install -y wget git autoconf gcc perl-Digest-SHA + sudo dnf install -y tcl which tar g++ tar epel-release gcc-c++ libstdc++-devel + sudo dnf install -y gcc-toolset-13 else echo "not support $OS" fi @@ -22,6 +28,9 @@ function configure_cmake() { cmake -B build -DCMAKE_C_COMPILER=/usr/local/opt/gcc@10/bin/gcc-10 -DUSE_PIKA_TOOLS=ON -DCMAKE_BUILD_TYPE=$BUILD_TYPE elif [[ $OS == *"ubuntu"* ]]; then cmake -B build -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS="-s" -DCMAKE_EXE_LINKER_FLAGS="-s" + elif [[ $OS == *"rocky"* ]]; then + source /opt/rh/gcc-toolset-13/enable + cmake -B build -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address fi echo "configure cmake after ..." } diff --git a/codis/Makefile b/codis/Makefile index ef20162da3..c3dd8851d9 100644 --- a/codis/Makefile +++ b/codis/Makefile @@ -12,25 +12,25 @@ codis-deps: codis-dashboard: codis-deps $(info build codis-dashboard) - @cd ${PRJ_ROOT}/cmd/dashboard && go mod tidy && go build -o ${PRJ_ROOT}/bin/codis-dashboard . + @cd ${PRJ_ROOT}/cmd/dashboard && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-dashboard . @${PRJ_ROOT}/bin/codis-dashboard --default-config > ${PRJ_ROOT}/config/dashboard.toml codis-proxy: codis-deps $(info build codis-proxy) - @cd ${PRJ_ROOT}/cmd/proxy && go mod tidy && go build -o ${PRJ_ROOT}/bin/codis-proxy . + @cd ${PRJ_ROOT}/cmd/proxy && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-proxy . @${PRJ_ROOT}/bin/codis-proxy --default-config > ${PRJ_ROOT}/config/proxy.toml codis-admin: codis-deps $(info build codis-admin) - @cd ${PRJ_ROOT}/cmd/admin && go mod tidy && go build -o ${PRJ_ROOT}/bin/codis-admin . 
+ @cd ${PRJ_ROOT}/cmd/admin && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-admin . codis-ha: codis-deps $(info build codis-ha) - @cd ${PRJ_ROOT}/cmd/ha && go mod tidy && go build -o ${PRJ_ROOT}/bin/codis-ha . + @cd ${PRJ_ROOT}/cmd/ha && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-ha . codis-fe: codis-deps $(info build codis-fe) - @cd ${PRJ_ROOT}/cmd/fe && go mod tidy && go build -o ${PRJ_ROOT}/bin/codis-fe . + @cd ${PRJ_ROOT}/cmd/fe && go mod tidy && go build -buildvcs=false -o ${PRJ_ROOT}/bin/codis-fe . @rm -rf ${PRJ_ROOT}/bin/assets && cp -rf ${PRJ_ROOT}/cmd/fe/assets ./bin/ clean: diff --git a/utils/Get_OS_Version.sh b/utils/Get_OS_Version.sh index 25811ce57b..0393ba6dec 100644 --- a/utils/Get_OS_Version.sh +++ b/utils/Get_OS_Version.sh @@ -8,6 +8,9 @@ Get_Dist_Name() elif grep -Eqii "CentOS" /etc/issue || grep -Eq "CentOS" /etc/*-release; then DISTRO='CentOS' PM='yum' + elif grep -Eqii "Rocky" /etc/issue || grep -Eq "Rocky" /etc/*-release; then + DISTRO='Rocky' + PM='dnf' elif grep -Eqi "Red Hat Enterprise Linux Server" /etc/issue || grep -Eq "Red Hat Enterprise Linux Server" /etc/*-release; then DISTRO='RHEL' PM='yum' From f2d8e9c5cb10e777d584efe285d44d08b86db25e Mon Sep 17 00:00:00 2001 From: JayLiu <38887641+luky116@users.noreply.github.com> Date: Tue, 30 Jul 2024 19:49:22 +0800 Subject: [PATCH 2/7] test: fix multi bug and compatible ACL test (#2815) * add multi.tcl * Commented test cases in Tcl that cannot pass * fix multi bug and compatible ACL test --------- Co-authored-by: saz97 Co-authored-by: liuyuecai --- include/pika_client_conn.h | 3 +- pikatests.sh | 2 +- src/pika_client_conn.cc | 64 +-- src/pika_transaction.cc | 6 +- tests/assets/default.conf | 27 +- tests/conf/pika.conf | 99 +++- tests/test_helper.tcl | 24 +- tests/unit/multi.tcl | 969 ++++++++++++++++++++++++++++++------- 8 files changed, 979 insertions(+), 215 deletions(-) diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 
6b5dbab419..9008d848c8 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -99,8 +99,7 @@ class PikaClientConn : public net::RedisConn { void AddKeysToWatch(const std::vector& db_keys); void RemoveWatchedKeys(); void SetTxnFailedFromKeys(const std::vector& db_keys); - void SetAllTxnFailed(); - void SetTxnFailedFromDBs(std::string db_name); + void SetTxnFailedIfKeyExists(const std::string target_db_name = ""); void ExitTxn(); bool IsInTxn(); bool IsTxnInitFailed(); diff --git a/pikatests.sh b/pikatests.sh index 9e1df19563..7d3163c40e 100755 --- a/pikatests.sh +++ b/pikatests.sh @@ -45,7 +45,7 @@ function setup_pika_bin { exit 1 fi cp $PIKA_BIN src/redis-server - cp conf/pika.conf tests/assets/default.conf + cp tests/conf/pika.conf tests/assets/default.conf } diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 1156cc3d95..daff036ceb 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -186,28 +186,28 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st } } - // Process Command - c_ptr->Execute(); - time_stat_->process_done_ts_ = pstd::NowMicros(); - auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); - (*cmdstat_map)[opt].cmd_count.fetch_add(1); - (*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time()); - if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) { if (c_ptr->name() == kCmdNameFlushdb) { auto flushdb = std::dynamic_pointer_cast(c_ptr); - SetTxnFailedFromDBs(flushdb->GetFlushDBname()); + SetTxnFailedIfKeyExists(flushdb->GetFlushDBname()); } else if (c_ptr->name() == kCmdNameFlushall) { - SetAllTxnFailed(); + SetTxnFailedIfKeyExists(); } else { auto table_keys = c_ptr->current_key(); for (auto& key : table_keys) { - key = c_ptr->db_name().append(key); + key = c_ptr->db_name().append("_").append(key); } SetTxnFailedFromKeys(table_keys); } } + // Process Command + c_ptr->Execute(); + time_stat_->process_done_ts_ = 
pstd::NowMicros(); + auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); + (*cmdstat_map)[opt].cmd_count.fetch_add(1); + (*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time()); + if (g_pika_conf->slowlog_slower_than() >= 0) { ProcessSlowlog(argv, c_ptr->GetDoDuration()); } @@ -387,32 +387,42 @@ void PikaClientConn::SetTxnFailedFromKeys(const std::vector& db_key auto involved_conns = std::vector>{}; involved_conns = dispatcher->GetInvolvedTxn(db_keys); for (auto& conn : involved_conns) { - if (auto c = std::dynamic_pointer_cast(conn); c != nullptr && c.get() != this) { + if (auto c = std::dynamic_pointer_cast(conn); c != nullptr) { c->SetTxnWatchFailState(true); } } } } -void PikaClientConn::SetAllTxnFailed() { +// if key in target_db exists, then the key been watched multi will be failed +void PikaClientConn::SetTxnFailedIfKeyExists(std::string target_db_name) { auto dispatcher = dynamic_cast(server_thread()); - if (dispatcher != nullptr) { - auto involved_conns = dispatcher->GetAllTxns(); - for (auto& conn : involved_conns) { - if (auto c = std::dynamic_pointer_cast(conn); c != nullptr && c.get() != this) { - c->SetTxnWatchFailState(true); - } - } + if (dispatcher == nullptr) { + return; } -} + auto involved_conns = dispatcher->GetAllTxns(); + for (auto& conn : involved_conns) { + std::shared_ptr c; + if (c = std::dynamic_pointer_cast(conn); c == nullptr) { + continue; + } -void PikaClientConn::SetTxnFailedFromDBs(std::string db_name) { - auto dispatcher = dynamic_cast(server_thread()); - if (dispatcher != nullptr) { - auto involved_conns = dispatcher->GetDBTxns(db_name); - for (auto& conn : involved_conns) { - if (auto c = std::dynamic_pointer_cast(conn); c != nullptr && c.get() != this) { - c->SetTxnWatchFailState(true); + for (const auto& db_key : c->watched_db_keys_) { + size_t pos = db_key.find('_'); + if (pos == std::string::npos) { + continue; + } + + auto db_name = db_key.substr(0, pos); + auto key = db_key.substr(pos + 
1); + + if (target_db_name == "" || target_db_name == "all" || target_db_name == db_name) { + auto db = g_pika_server->GetDB(db_name); + // if watched key exists, set watch state to failed + if (db->storage()->Exists({key}) > 0) { + c->SetTxnWatchFailState(true); + break; + } } } } diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index 29fbac4754..1db66c874f 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -57,14 +57,14 @@ void ExecCmd::Do() { if (cmd->name() == kCmdNameFlushall) { auto flushall = std::dynamic_pointer_cast(cmd); flushall->FlushAllWithoutLock(); - client_conn->SetAllTxnFailed(); + client_conn->SetTxnFailedIfKeyExists(); } else if (cmd->name() == kCmdNameFlushdb) { auto flushdb = std::dynamic_pointer_cast(cmd); flushdb->DoWithoutLock(); if (cmd->res().ok()) { cmd->res().SetRes(CmdRes::kOk); } - client_conn->SetTxnFailedFromDBs(each_cmd_info.db_->GetDBName()); + client_conn->SetTxnFailedIfKeyExists(each_cmd_info.db_->GetDBName()); } else { cmd->Do(); if (cmd->res().ok() && cmd->is_write()) { @@ -258,7 +258,7 @@ void WatchCmd::DoInitial() { size_t pos = 1; while (pos < argv_.size()) { keys_.emplace_back(argv_[pos]); - db_keys_.push_back(db_name() + argv_[pos++]); + db_keys_.push_back(db_name() + "_" + argv_[pos++]); } } diff --git a/tests/assets/default.conf b/tests/assets/default.conf index 468d253e89..1a7b815885 100644 --- a/tests/assets/default.conf +++ b/tests/assets/default.conf @@ -34,10 +34,17 @@ slow-cmd-thread-pool-size : 1 # Slow cmd list e.g. hgetall, mset slow-cmd-list : -# The number of sync-thread for data replication from master, those are the threads work on slave nodes -# and are used to execute commands sent from master node when replicating. +# The number of threads to write DB in slaveNode when replicating. +# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size. 
sync-thread-num : 6 +# The num of threads to write binlog in slaveNode when replicating, +# each DB could only bind to one sync-binlog-thread to write binlog in maximum +#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB could have an exclusive thread to write binlog), +# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8 +# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases) +sync-binlog-thread-num : 1 + # Directory to store log files of Pika, which contains multiple types of logs, # Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which # is used for replication. @@ -101,6 +108,8 @@ instance-mode : classic # The default database id is DB 0. You can select a different one on # a per-connection by using SELECT. The db id range is [0, 'databases' value -1]. # The value range of this parameter is [1, 8]. +# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases), +# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper. databases : 1 # The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present. @@ -308,6 +317,11 @@ max-write-buffer-num : 2 # whether the key exists. Setting this value too high may hurt performance. min-write-buffer-number-to-merge : 1 +# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families +# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when +# process restart. +max-total-wal-size : 1073741824 + # rocksdb level0_stop_writes_trigger level0-stop-writes-trigger : 36 @@ -466,9 +480,14 @@ default-slot-num : 1024 # The cache will be sharded into 2^blob-num-shard-bits shards. 
# blob-num-shard-bits : -1 -# Rsync Rate limiting configuration 200MB/s +# Rsync Rate limiting configuration [Default value is 200MB/s] +# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit. +# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes). throttle-bytes-per-second : 207200000 - +# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small. +# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command +# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust. +rsync-timeout-ms : 1000 # The valid range for max-rsync-parallel-num is [1, 4]. # If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4. max-rsync-parallel-num : 4 diff --git a/tests/conf/pika.conf b/tests/conf/pika.conf index e3966c4925..2a2d3dbac5 100644 --- a/tests/conf/pika.conf +++ b/tests/conf/pika.conf @@ -7,7 +7,9 @@ # Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221. port : 9221 -db-instance-num : 3 +db-instance-num : 3 +rocksdb-ttl-second : 86400 * 7; +rocksdb-periodic-second : 86400 * 3; # Random value identifying the Pika server, its string length must be 40. # If not set, Pika will generate a random string with a length of 40 random characters. @@ -25,10 +27,24 @@ thread-num : 1 # are dedicated to handling user requests. 
thread-pool-size : 12 -# The number of sync-thread for data replication from master, those are the threads work on slave nodes -# and are used to execute commands sent from master node when replicating. +# Size of the low level thread pool, The threads within this pool +# are dedicated to handling slow user requests. +slow-cmd-thread-pool-size : 1 + +# Slow cmd list e.g. hgetall, mset +slow-cmd-list : + +# The number of threads to write DB in slaveNode when replicating. +# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size. sync-thread-num : 6 +# The num of threads to write binlog in slaveNode when replicating, +# each DB could only bind to one sync-binlog-thread to write binlog in maximum +#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB could have an exclusive thread to write binlog), +# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8 +# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases) +sync-binlog-thread-num : 1 + # Directory to store log files of Pika, which contains multiple types of logs, # Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which # is used for replication. @@ -70,7 +86,7 @@ requirepass : # [NOTICE] The value of this parameter must match the "requirepass" setting on the master. masterauth : -# The [password of user], which is empty by default.(Deprecated) +# The [password of user], which is empty by default. # [NOTICE] If this user password is the same as admin password (including both being empty), # the value of this parameter will be ignored and all users are considered as administrators, # in this scenario, users are not subject to the restrictions imposed by the userblacklist. @@ -92,7 +108,9 @@ instance-mode : classic # The default database id is DB 0. 
You can select a different one on # a per-connection by using SELECT. The db id range is [0, 'databases' value -1]. # The value range of this parameter is [1, 8]. -databases : 1 +# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases), +# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper. +databases : 3 # The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present. # By default, this num is set to 0, which means this feature is [not enabled] @@ -219,6 +237,11 @@ slave-priority : 100 # [NOTICE]: compact-interval is prior than compact-cron. #compact-interval : +# The disable_auto_compactions option is [true | false] +disable_auto_compactions : false + +# Rocksdb max_subcompactions +max-subcompactions : 1 # The minimum disk usage ratio for checking resume. # If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume. # Its default value is 0.7. @@ -269,6 +292,7 @@ max-cache-statistic-keys : 0 # a small compact is triggered automatically if the small compaction feature is enabled. # small-compaction-threshold default value is 5000 and the value range is [1, 100000]. small-compaction-threshold : 5000 +small-compaction-duration-threshold : 10000 # The maximum total size of all live memtables of the RocksDB instance that owned by Pika. # Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB @@ -283,6 +307,30 @@ max-write-buffer-size : 10737418240 # If max-write-buffer-num > 3, writing will be slowed down. max-write-buffer-num : 2 +# `min_write_buffer_number_to_merge` is the minimum number of memtables +# that need to be merged before placing the order. For example, if the +# option is set to 2, immutable memtables will only be flushed if there +# are two of them - a single immutable memtable will never be flushed. 
+# If multiple memtables are merged together, less data will be written +# to storage because the two updates are merged into a single key. However, +# each Get() must linearly traverse all unmodifiable memtables and check +# whether the key exists. Setting this value too high may hurt performance. +min-write-buffer-number-to-merge : 1 + +# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families +# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when +# process restart. +max-total-wal-size : 1073741824 + +# rocksdb level0_stop_writes_trigger +level0-stop-writes-trigger : 36 + +# rocksdb level0_slowdown_writes_trigger +level0-slowdown-writes-trigger : 20 + +# rocksdb level0_file_num_compaction_trigger +level0-file-num-compaction-trigger : 4 + # The maximum size of the response package to client to prevent memory # exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response. # Supported Units [K|M|G]. The default unit is in [bytes]. @@ -328,6 +376,12 @@ max-bytes-for-level-multiplier : 10 # slotmigrate [yes | no] slotmigrate : no +# slotmigrate thread num +slotmigrate-thread-num : 1 + +# thread-migrate-keys-num 1/8 of the write_buffer_size_ +thread-migrate-keys-num : 64 + # BlockBasedTable block_size, default 4k # block-size: 4096 @@ -346,6 +400,12 @@ slotmigrate : no # The slot number of pika when used with codis. 
default-slot-num : 1024 +# enable-partitioned-index-filters [yes | no] +# When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache` +# and `cache-index-and-filter-blocks` is suggested to be enabled +# https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters +# enable-partitioned-index-filters: default no + # whether or not index and filter blocks is stored in block cache # cache-index-and-filter-blocks: no @@ -364,6 +424,10 @@ default-slot-num : 1024 # https://github.com/EighteenZi/rocksdb_wiki/blob/master/Rate-Limiter.md #######################################################################E####### +# rate limiter mode +# 0: Read 1: Write 2: ReadAndWrite +# rate-limiter-mode : default 1 + # rate limiter bandwidth, default 2000MB/s #rate-limiter-bandwidth : 2097152000 @@ -416,8 +480,16 @@ default-slot-num : 1024 # The cache will be sharded into 2^blob-num-shard-bits shards. # blob-num-shard-bits : -1 -# Rsync Rate limiting configuration 200MB/s +# Rsync Rate limiting configuration [Default value is 200MB/s] +# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit. +# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes). throttle-bytes-per-second : 207200000 +# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small. +# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command +# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust. 
+rsync-timeout-ms : 1000 +# The valid range for max-rsync-parallel-num is [1, 4]. +# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4. max-rsync-parallel-num : 4 # The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same @@ -432,7 +504,7 @@ cache-num : 16 # cache-model 0:cache_none 1:cache_read cache-model : 1 # cache-type: string, set, zset, list, hash, bit -cache-type: string, set, zset, list, hash +cache-type: string, set, zset, list, hash, bit # Maximum number of keys in the zset redis cache # On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum @@ -498,18 +570,19 @@ cache-lfu-decay-time: 1 # # aclfile : ../conf/users.acl +# (experimental) # It is possible to change the name of dangerous commands in a shared environment. # For instance the CONFIG command may be renamed into something Warning: To prevent # data inconsistency caused by different configuration files, do not use the rename # command to modify write commands on the primary and secondary servers. If necessary, # ensure that the configuration files of the primary and secondary servers are consistent # In addition, when using the command rename, you must not use "" to modify the command, -# for example, rename-command: FLUSHALL "360flushall" is incorrect; instead, use -# rename-command: FLUSHALL 360flushall is correct. After the rename command is executed, +# for example, rename-command: FLUSHDB "360flushdb" is incorrect; instead, use +# rename-command: FLUSHDB 360flushdb is correct. 
After the rename command is executed, # it is most appropriate to use a numeric string with uppercase or lowercase letters -# for example: rename-command : FLUSHALL joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR +# for example: rename-command : FLUSHDB joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR +# Warning: Currently only applies to flushdb, slaveof, bgsave, shutdown, config command +# Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent # # Example: -# -# rename-command : FLUSHALL 360flushall -# rename-command : FLUSHDB 360flushdb \ No newline at end of file +# rename-command : FLUSHDB 360flushdb diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 6f363654c4..2dc7499837 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -15,7 +15,6 @@ set ::all_tests { unit/printver unit/basic unit/scan - unit/multi unit/quit unit/pubsub unit/slowlog @@ -32,6 +31,7 @@ set ::all_tests { unit/type/zset unit/type/string unit/type/hash + unit/multi unit/type/stream # unit/expire # unit/protocol @@ -79,7 +79,7 @@ set ::force_failure 0 set ::timeout 600; # 10 minutes without progresses will quit the test. set ::last_progress [clock seconds] set ::active_servers {} ; # Pids of active Redis instances. - +set ::tls 0 # Set to 1 when we are running in client mode. The Redis test uses a # server-client model to run tests simultaneously. The server instance # runs the specified number of client instances that will actually run tests. @@ -179,6 +179,26 @@ proc cleanup {} { if {!$::quiet} {puts "OK"} } +proc redis_client {args} { + set level 0 + if {[llength $args] > 0 && [string is integer [lindex $args 0]]} { + set level [lindex $args 0] + set args [lrange $args 1 end] + } + + # create client that won't defers reading reply + set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls] + + # select the right db and read the response (OK), or at least ping + # the server if we're in a singledb mode. 
+ if {$::singledb} { + $client ping + } else { + $client select 9 + } + return $client +} + proc test_server_main {} { cleanup set tclsh [info nameofexecutable] diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 9e7102f708..5ebd1cbfca 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -1,5 +1,15 @@ +proc wait_for_dbsize {size} { + set r2 [redis_client] + wait_for_condition 50 100 { + [$r2 dbsize] == $size + } else { + fail "Target dbsize not reached" + } + $r2 close +} + start_server {tags {"multi"}} { - test {MUTLI / EXEC basics} { + test {MULTI / EXEC basics} { r del mylist r rpush mylist a r rpush mylist b @@ -47,83 +57,144 @@ start_server {tags {"multi"}} { } {*ERR WATCH*} test {EXEC fails if there are errors while queueing commands #1} { - r del foo1 foo2 + r del foo1{t} foo2{t} r multi - r set foo1 bar1 + r set foo1{t} bar1 catch {r non-existing-command} - r set foo2 bar2 + r set foo2{t} bar2 catch {r exec} e assert_match {EXECABORT*} $e - list [r exists foo1] [r exists foo2] + list [r exists foo1{t}] [r exists foo2{t}] } {0 0} -# This parameter is not available in Pika -# test {EXEC fails if there are errors while queueing commands #2} { -# set rd [redis_deferring_client] -# r del foo1 foo2 -# r multi -# r set foo1 bar1 -# $rd config set maxmemory 1 -# assert {[$rd read] eq {OK}} -# catch {r lpush mylist myvalue} -# $rd config set maxmemory 0 -# assert {[$rd read] eq {OK}} -# r set foo2 bar2 -# catch {r exec} e -# assert_match {EXECABORT*} $e -# $rd close -# list [r exists foo1] [r exists foo2] -# } {0 0} + # Pika not support parameter maxmemory + # test {EXEC fails if there are errors while queueing commands #2} { + # set rd [redis_deferring_client] + # r del foo1{t} foo2{t} + # r multi + # r set foo1{t} bar1 + # $rd config set maxmemory 1 + # assert {[$rd read] eq {OK}} + # catch {r lpush mylist{t} myvalue} + # $rd config set maxmemory 0 + # assert {[$rd read] eq {OK}} + # r set foo2{t} bar2 + # catch {r exec} e + # assert_match 
{EXECABORT*} $e + # $rd close + # list [r exists foo1{t}] [r exists foo2{t}] + # } {0 0} {needs:config-maxmemory} test {If EXEC aborts, the client MULTI state is cleared} { - r del foo1 foo2 + r del foo1{t} foo2{t} r multi - r set foo1 bar1 + r set foo1{t} bar1 catch {r non-existing-command} - r set foo2 bar2 + r set foo2{t} bar2 catch {r exec} e assert_match {EXECABORT*} $e r ping } {PONG} test {EXEC works on WATCHed key not modified} { - r watch x y z - r watch k + r watch x{t} y{t} z{t} + r watch k{t} r multi r ping r exec } {PONG} -# The return value of Pika is inconsistent with Redis -# test {EXEC fail on WATCHed key modified (1 key of 1 watched)} { -# r set x 30 -# r watch x -# r set x 40 -# r multi -# r ping -# r exec -# } {} - -# The return value of Pika is inconsistent with Redis -# test {EXEC fail on WATCHed key modified (1 key of 5 watched)} { -# r set x 30 -# r watch a b x k z -# r set x 40 -# r multi -# r ping -# r exec -# } {} - -# The return value of Pika is inconsistent with Redis -# test {EXEC fail on WATCHed key modified by SORT with STORE even if the result is empty} { -# r flushdb -# r lpush foo barsync" -# r watch foo -# r sort emptylist store foo -# r multi -# r ping -# r exec -# } {} + test {EXEC fail on WATCHed key modified (1 key of 1 watched)} { + r set x 30 + r watch x + r set x 40 + r multi + r ping + r exec + } {} + + test {EXEC fail on WATCHed key modified (1 key of 5 watched)} { + r set x{t} 30 + r watch a{t} b{t} x{t} k{t} z{t} + r set x{t} 40 + r multi + r ping + r exec + } {} + + # Pika does not support the sort command + # test {EXEC fail on WATCHed key modified by SORT with STORE even if the result is empty} { + # r flushdb + # r lpush foo bar + # r watch foo + # r sort emptylist store foo + # r multi + # r ping + # r exec + # } {} + + # Pika does not support the debug command + # test {EXEC fail on lazy expired WATCHed key} { + # r del key + # r debug set-active-expire 0 + + # for {set j 0} {$j < 10} {incr j} { + # r set key 1 
px 100 + # r watch key + # after 101 + # r multi + # r incr key + + # set res [r exec] + # if {$res eq {}} break + # } + # if {$::verbose} { puts "EXEC fail on lazy expired WATCHed key attempts: $j" } + + # r debug set-active-expire 1 + # set _ $res + # } {} {needs:debug} + + # Pika does not support the debug command + # test {WATCH stale keys should not fail EXEC} { + # r del x + # r debug set-active-expire 0 + # r set x foo px 1 + # after 2 + # r watch x + # r multi + # r ping + # assert_equal {PONG} [r exec] + # r debug set-active-expire 1 + # } {OK} {needs:debug} + + # Pika does not support the debug command + # test {Delete WATCHed stale keys should not fail EXEC} { + # r del x + # r debug set-active-expire 0 + # r set x foo px 1 + # after 2 + # r watch x + # # EXISTS triggers lazy expiry/deletion + # assert_equal 0 [r exists x] + # r multi + # r ping + # assert_equal {PONG} [r exec] + # r debug set-active-expire 1 + # } {OK} {needs:debug} + + # Pika does not support the debug command + # test {FLUSHDB while watching stale keys should not fail EXEC} { + # r del x + # r debug set-active-expire 0 + # r set x foo px 1 + # after 2 + # r watch x + # r flushdb + # r multi + # r ping + # assert_equal {PONG} [r exec] + # r debug set-active-expire 1 + # } {OK} {needs:debug} test {After successful EXEC key is no longer watched} { r set x 30 @@ -164,15 +235,14 @@ start_server {tags {"multi"}} { r unwatch } {OK} -# The return value of Pika is inconsistent with Redis -# test {FLUSHALL is able to touch the watched keys} { -# r set x 30 -# r watch x -# r flushall -# r multi -# r ping -# r exec -# } {} + test {FLUSHALL is able to touch the watched keys} { + r set x 30 + r watch x + r flushall + r multi + r ping + r exec + } {} test {FLUSHALL does not touch non affected keys} { r del x @@ -183,15 +253,14 @@ start_server {tags {"multi"}} { r exec } {PONG} -# The return value of Pika is inconsistent with Redis -# test {FLUSHDB is able to touch the watched keys} { -# r set x 30 
-# r watch x -# r flushdb -# r multi -# r ping -# r exec -# } {} + test {FLUSHDB is able to touch the watched keys} { + r set x 30 + r watch x + r flushdb + r multi + r ping + r exec + } {} test {FLUSHDB does not touch non affected keys} { r del x @@ -202,43 +271,119 @@ start_server {tags {"multi"}} { r exec } {PONG} -# The return value of Pika is inconsistent with Redis -# test {WATCH is able to remember the DB a key belongs to} { -# r select 5 -# r set x 30 -# r watch x -# r select 1 -# r set x 10 -# r select 5 -# r multi -# r ping -# set res [r exec] -# # Restore original DB -# r select 9 -# set res -# } {PONG} - -# The return value of Pika is inconsistent with Redis -# test {WATCH will consider touched keys target of EXPIRE} { -# r del x -# r set x foo -# r watch x -# r expire x 10 -# r multi -# r ping -# r exec -# } {} - - test {WATCH will not consider touched expired keys} { + # # Pika does not support the swapdb command + # test {SWAPDB is able to touch the watched keys that exist} { + # r flushall + # r select 0 + # r set x 30 + # r watch x ;# make sure x (set to 30) doesn't change (SWAPDB will "delete" it) + # r swapdb 0 1 + # r multi + # r ping + # r exec + # } {} {singledb:skip} + + # # Pika does not support the swapdb command + # test {SWAPDB is able to touch the watched keys that do not exist} { + # r flushall + # r select 1 + # r set x 30 + # r select 0 + # r watch x ;# make sure the key x (currently missing) doesn't change (SWAPDB will create it) + # r swapdb 0 1 + # r multi + # r ping + # r exec + # } {} {singledb:skip} + + # # Pika does not support the swapdb command + # test {SWAPDB does not touch watched stale keys} { + # r flushall + # r select 1 + # r debug set-active-expire 0 + # r set x foo px 1 + # after 2 + # r watch x + # r swapdb 0 1 ; # expired key replaced with no key => no change + # r multi + # r ping + # assert_equal {PONG} [r exec] + # r debug set-active-expire 1 + # } {OK} {singledb:skip needs:debug} + + # # Pika does not support 
the swapdb command + # test {SWAPDB does not touch non-existing key replaced with stale key} { + # r flushall + # r select 0 + # r debug set-active-expire 0 + # r set x foo px 1 + # after 2 + # r select 1 + # r watch x + # r swapdb 0 1 ; # no key replaced with expired key => no change + # r multi + # r ping + # assert_equal {PONG} [r exec] + # r debug set-active-expire 1 + # } {OK} {singledb:skip needs:debug} + + # Pika does not support the swapdb command + # test {SWAPDB does not touch stale key replaced with another stale key} { + # r flushall + # r debug set-active-expire 0 + # r select 1 + # r set x foo px 1 + # r select 0 + # r set x bar px 1 + # after 2 + # r select 1 + # r watch x + # r swapdb 0 1 ; # no key replaced with expired key => no change + # r multi + # r ping + # assert_equal {PONG} [r exec] + # r debug set-active-expire 1 + # } {OK} {singledb:skip needs:debug} + + test {WATCH is able to remember the DB a key belongs to} { + r select 0 + r set x 30 + r watch x + r select 1 + r set x 10 + r select 0 + r multi + r ping + set res [r exec] + r select 2 + set res + } {PONG} + + test {WATCH will consider touched keys target of EXPIRE} { r del x r set x foo - r expire x 1 r watch x - after 1100 + r expire x 10 r multi r ping r exec - } {PONG} + } {} + + # wait_for_dbsize command not support + # test {WATCH will consider touched expired keys} { + # r flushall + # r del x + # r set x foo + # r expire x 1 + # r watch x + + # # Wait for the keys to expire. 
+ # wait_for_dbsize 0 + + # r multi + # r ping + # r exec + # } {} test {DISCARD should clear the WATCH dirty flag on the client} { r watch x @@ -261,61 +406,559 @@ start_server {tags {"multi"}} { r exec } {11} -# Pika does not support the sync command -# test {MULTI / EXEC is propagated correctly (single write command)} { -# set repl [attach_to_replication_stream] -# r multi -# r set foo bar -# r exec -# assert_replication_stream $repl { -# {select *} -# {multi} -# {set foo bar} -# {exec} -# } -# close_replication_stream $repl -# } - -# Pika does not support the sync command -# test {MULTI / EXEC is propagated correctly (empty transaction)} { -# set repl [attach_to_replication_stream] -# r multi -# r exec -# r set foo bar -# assert_replication_stream $repl { -# {select *} -# {set foo bar} -# } -# close_replication_stream $repl -# } - -# Pika does not support the sync command -# test {MULTI / EXEC is propagated correctly (read-only commands)} { -# r set foo value1 -# set repl [attach_to_replication_stream] -# r multi -# r get foo -# r exec -# r set foo value2 -# assert_replication_stream $repl { -# {select *} -# {set foo value2} -# } -# close_replication_stream $repl -# } - -# Pika does not support the sync command -# test {MULTI / EXEC is propagated correctly (write command, no effect)} { -# r del bar foo bar -# set repl [attach_to_replication_stream] -# r multi -# r del foo -# r exec -# assert_replication_stream $repl { -# {select *} -# {multi} -# {exec} -# } -# close_replication_stream $repl -# } + # Pika does not support the sync command + # test {MULTI / EXEC is not propagated (single write command)} { + # set repl [attach_to_replication_stream] + # r multi + # r set foo bar + # r exec + # r set foo2 bar + # assert_replication_stream $repl { + # {select *} + # {set foo bar} + # {set foo2 bar} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the sync command + # test {MULTI / EXEC is propagated correctly (multiple 
commands)} { + # set repl [attach_to_replication_stream] + # r multi + # r set foo{t} bar + # r get foo{t} + # r set foo2{t} bar2 + # r get foo2{t} + # r set foo3{t} bar3 + # r get foo3{t} + # r exec + + # assert_replication_stream $repl { + # {multi} + # {select *} + # {set foo{t} bar} + # {set foo2{t} bar2} + # {set foo3{t} bar3} + # {exec} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the sync command + # test {MULTI / EXEC is propagated correctly (multiple commands with SELECT)} { + # set repl [attach_to_replication_stream] + # r multi + # r select 1 + # r set foo{t} bar + # r get foo{t} + # r select 2 + # r set foo2{t} bar2 + # r get foo2{t} + # r select 3 + # r set foo3{t} bar3 + # r get foo3{t} + # r exec + + # assert_replication_stream $repl { + # {multi} + # {select *} + # {set foo{t} bar} + # {select *} + # {set foo2{t} bar2} + # {select *} + # {set foo3{t} bar3} + # {exec} + # } + # close_replication_stream $repl + # } {} {needs:repl singledb:skip} + + # Pika does not support the sync command + # test {MULTI / EXEC is propagated correctly (empty transaction)} { + # set repl [attach_to_replication_stream] + # r multi + # r exec + # r set foo bar + # assert_replication_stream $repl { + # {select *} + # {set foo bar} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the sync command + # test {MULTI / EXEC is propagated correctly (read-only commands)} { + # r set foo value1 + # set repl [attach_to_replication_stream] + # r multi + # r get foo + # r exec + # r set foo value2 + # assert_replication_stream $repl { + # {select *} + # {set foo value2} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the sync command + # test {MULTI / EXEC is propagated correctly (write command, no effect)} { + # r del bar + # r del foo + # set repl [attach_to_replication_stream] + # r multi + # r del foo + # r exec + + # # add another command so that 
when we see it we know multi-exec wasn't + # # propagated + # r incr foo + + # assert_replication_stream $repl { + # {select *} + # {incr foo} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the sync command + # test {MULTI / EXEC with REPLICAOF} { + # # This test verifies that if we demote a master to replica inside a transaction, the + # # entire transaction is not propagated to the already-connected replica + # set repl [attach_to_replication_stream] + # r set foo bar + # r multi + # r set foo2 bar + # r replicaof localhost 9999 + # r set foo3 bar + # r exec + # catch {r set foo4 bar} e + # assert_match {READONLY*} $e + # assert_replication_stream $repl { + # {select *} + # {set foo bar} + # } + # r replicaof no one + # } {OK} {needs:repl cluster:skip} + + # Pika does not support the "config set maxmemory" command + # test {DISCARD should not fail during OOM} { + # set rd [redis_deferring_client] + # $rd config set maxmemory 1 + # assert {[$rd read] eq {OK}} + # r multi + # catch {r set x 1} e + # assert_match {OOM*} $e + # r discard + # $rd config set maxmemory 0 + # assert {[$rd read] eq {OK}} + # $rd close + # r ping + # } {PONG} {needs:config-maxmemory} + + # Pika does not support the "config set lua-time-limit" command + # test {MULTI and script timeout} { + # # check that if MULTI arrives during timeout, it is either refused, or + # # allowed to pass, and we don't end up executing half of the transaction + # set rd1 [redis_deferring_client] + # set r2 [redis_client] + # r config set lua-time-limit 10 + # r set xx 1 + # $rd1 eval {while true do end} 0 + # after 200 + # catch { $r2 multi; } e + # catch { $r2 incr xx; } e + # r script kill + # after 200 ; # Give some time to Lua to call the hook again... 
+ # catch { $r2 incr xx; } e + # catch { $r2 exec; } e + # assert_match {EXECABORT*previous errors*} $e + # set xx [r get xx] + # # make sure that either the whole transcation passed or none of it (we actually expect none) + # assert { $xx == 1 || $xx == 3} + # # check that the connection is no longer in multi state + # set pong [$r2 ping asdf] + # assert_equal $pong "asdf" + # $rd1 close; $r2 close + # } + + # Pika does not support the "config set lua-time-limit" command + # test {EXEC and script timeout} { + # # check that if EXEC arrives during timeout, we don't end up executing + # # half of the transaction, and also that we exit the multi state + # set rd1 [redis_deferring_client] + # set r2 [redis_client] + # r config set lua-time-limit 10 + # r set xx 1 + # catch { $r2 multi; } e + # catch { $r2 incr xx; } e + # $rd1 eval {while true do end} 0 + # after 200 + # catch { $r2 incr xx; } e + # catch { $r2 exec; } e + # assert_match {EXECABORT*BUSY*} $e + # r script kill + # after 200 ; # Give some time to Lua to call the hook again... + # set xx [r get xx] + # # make sure that either the whole transcation passed or none of it (we actually expect none) + # assert { $xx == 1 || $xx == 3} + # # check that the connection is no longer in multi state + # set pong [$r2 ping asdf] + # assert_equal $pong "asdf" + # $rd1 close; $r2 close + # } + + # Pika does not support the "config set lua-time-limit" command + # test {MULTI-EXEC body and script timeout} { + # # check that we don't run an incomplete transaction due to some commands + # # arriving during busy script + # set rd1 [redis_deferring_client] + # set r2 [redis_client] + # r config set lua-time-limit 10 + # r set xx 1 + # catch { $r2 multi; } e + # catch { $r2 incr xx; } e + # $rd1 eval {while true do end} 0 + # after 200 + # catch { $r2 incr xx; } e + # r script kill + # after 200 ; # Give some time to Lua to call the hook again... 
+ # catch { $r2 exec; } e + # assert_match {EXECABORT*previous errors*} $e + # set xx [r get xx] + # # make sure that either the whole transcation passed or none of it (we actually expect none) + # assert { $xx == 1 || $xx == 3} + # # check that the connection is no longer in multi state + # set pong [$r2 ping asdf] + # assert_equal $pong "asdf" + # $rd1 close; $r2 close + # } + + # Pika does not support the "config set lua-time-limit" command + # test {just EXEC and script timeout} { + # # check that if EXEC arrives during timeout, we don't end up executing + # # actual commands during busy script, and also that we exit the multi state + # set rd1 [redis_deferring_client] + # set r2 [redis_client] + # r config set lua-time-limit 10 + # r set xx 1 + # catch { $r2 multi; } e + # catch { $r2 incr xx; } e + # $rd1 eval {while true do end} 0 + # after 200 + # catch { $r2 exec; } e + # assert_match {EXECABORT*BUSY*} $e + # r script kill + # after 200 ; # Give some time to Lua to call the hook again... 
+ # set xx [r get xx] + # # make we didn't execute the transaction + # assert { $xx == 1} + # # check that the connection is no longer in multi state + # set pong [$r2 ping asdf] + # assert_equal $pong "asdf" + # $rd1 close; $r2 close + # } + + # Pika does not support the "config set min-replicas-to-write" command + # test {exec with write commands and state change} { + # # check that exec that contains write commands fails if server state changed since they were queued + # set r1 [redis_client] + # r set xx 1 + # r multi + # r incr xx + # $r1 config set min-replicas-to-write 2 + # catch {r exec} e + # assert_match {*EXECABORT*NOREPLICAS*} $e + # set xx [r get xx] + # # make sure that the INCR wasn't executed + # assert { $xx == 1} + # $r1 config set min-replicas-to-write 0 + # $r1 close + # } {0} {needs:repl} + + # Pika does not support the "config set replica-serve-stale-data" command + # test {exec with read commands and stale replica state change} { + # # check that exec that contains read commands fails if server state changed since they were queued + # r config set replica-serve-stale-data no + # set r1 [redis_client] + # r set xx 1 + + # # check that GET and PING are disallowed on stale replica, even if the replica becomes stale only after queuing. 
+ # r multi + # r get xx + # $r1 replicaof localhsot 0 + # catch {r exec} e + # assert_match {*EXECABORT*MASTERDOWN*} $e + + # # reset + # $r1 replicaof no one + + # r multi + # r ping + # $r1 replicaof localhsot 0 + # catch {r exec} e + # assert_match {*EXECABORT*MASTERDOWN*} $e + + # # check that when replica is not stale, GET is allowed + # # while we're at it, let's check that multi is allowed on stale replica too + # r multi + # $r1 replicaof no one + # r get xx + # set xx [r exec] + # # make sure that the INCR was executed + # assert { $xx == 1 } + # $r1 close + # } {0} {needs:repl cluster:skip} + + # Pika does not support the "config set maxmemory" command + # test {EXEC with only read commands should not be rejected when OOM} { + # set r2 [redis_client] + + # r set x value + # r multi + # r get x + # r ping + + # # enforcing OOM + # $r2 config set maxmemory 1 + + # # finish the multi transaction with exec + # assert { [r exec] == {value PONG} } + + # # releasing OOM + # $r2 config set maxmemory 0 + # $r2 close + # } {0} {needs:config-maxmemory} + + # Pika does not support the "config set maxmemory" command + # test {EXEC with at least one use-memory command should fail} { + # set r2 [redis_client] + + # r multi + # r set x 1 + # r get x + + # # enforcing OOM + # $r2 config set maxmemory 1 + + # # finish the multi transaction with exec + # catch {r exec} e + # assert_match {EXECABORT*OOM*} $e + + # # releasing OOM + # $r2 config set maxmemory 0 + # $r2 close + # } {0} {needs:config-maxmemory} + + # Pika does not support the xgroup command + # test {Blocking commands ignores the timeout} { + # r xgroup create s{t} g $ MKSTREAM + + # set m [r multi] + # r blpop empty_list{t} 0 + # r brpop empty_list{t} 0 + # r brpoplpush empty_list1{t} empty_list2{t} 0 + # r blmove empty_list1{t} empty_list2{t} LEFT LEFT 0 + # r bzpopmin empty_zset{t} 0 + # r bzpopmax empty_zset{t} 0 + # r xread BLOCK 0 STREAMS s{t} $ + # r xreadgroup group g c BLOCK 0 STREAMS s{t} > + # set 
res [r exec] + + # list $m $res + # } {OK {{} {} {} {} {} {} {} {}}} + + # Pika does not support the SYNC command + # test {MULTI propagation of PUBLISH} { + # set repl [attach_to_replication_stream] + + # r multi + # r publish bla bla + # r exec + + # assert_replication_stream $repl { + # {select *} + # {publish bla bla} + # } + # close_replication_stream $repl + # } {} {needs:repl cluster:skip} + + # Pika does not support the SYNC command + # test {MULTI propagation of SCRIPT LOAD} { + # set repl [attach_to_replication_stream] + + # # make sure that SCRIPT LOAD inside MULTI isn't propagated + # r multi + # r script load {redis.call('set', KEYS[1], 'foo')} + # r set foo bar + # set res [r exec] + # set sha [lindex $res 0] + + # assert_replication_stream $repl { + # {select *} + # {set foo bar} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the SYNC command + # test {MULTI propagation of EVAL} { + # set repl [attach_to_replication_stream] + + # # make sure that EVAL inside MULTI is propagated in a transaction in effects + # r multi + # r eval {redis.call('set', KEYS[1], 'bar')} 1 bar + # r exec + + # assert_replication_stream $repl { + # {select *} + # {set bar bar} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the SYNC command + # test {MULTI propagation of SCRIPT FLUSH} { + # set repl [attach_to_replication_stream] + + # # make sure that SCRIPT FLUSH isn't propagated + # r multi + # r script flush + # r set foo bar + # r exec + + # assert_replication_stream $repl { + # {select *} + # {set foo bar} + # } + # close_replication_stream $repl + # } {} {needs:repl} + + # Pika does not support the SYNC command + # tags {"stream"} { + # test {MULTI propagation of XREADGROUP} { + # set repl [attach_to_replication_stream] + + # r XADD mystream * foo bar + # r XADD mystream * foo2 bar2 + # r XADD mystream * foo3 bar3 + # r XGROUP CREATE mystream mygroup 0 + + # # make sure the XCALIM 
(propagated by XREADGROUP) is indeed inside MULTI/EXEC + # r multi + # r XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream ">" + # r XREADGROUP GROUP mygroup consumer1 STREAMS mystream ">" + # r exec + + # assert_replication_stream $repl { + # {select *} + # {xadd *} + # {xadd *} + # {xadd *} + # {xgroup CREATE *} + # {multi} + # {xclaim *} + # {xclaim *} + # {xgroup SETID * ENTRIESREAD *} + # {xclaim *} + # {xgroup SETID * ENTRIESREAD *} + # {exec} + # } + # close_replication_stream $repl + # } {} {needs:repl} + # } + + # Pika does not support the SAVE command + foreach {cmd} {SAVE SHUTDOWN} { + # The return value of Pika is inconsistent with Redis + # test "MULTI with $cmd" { + # r del foo + # r multi + # r set foo bar + # catch {r $cmd} e1 + # catch {r exec} e2 + # assert_match {*Command not allowed inside a transaction*} $e1 + # assert_match {EXECABORT*} $e2 + # r get foo + # } {} + } + + # Pika does not support the BGREWRITEAOF command + # test "MULTI with BGREWRITEAOF" { + # set forks [s total_forks] + # r multi + # r set foo bar + # r BGREWRITEAOF + # set res [r exec] + # assert_match "*rewriting scheduled*" [lindex $res 1] + # wait_for_condition 50 100 { + # [s total_forks] > $forks + # } else { + # fail "aofrw didn't start" + # } + # waitForBgrewriteaof r + # } {} {external:skip} + + # Pika does not support the "config set appendonly" command + # test "MULTI with config set appendonly" { + # set lines [count_log_lines 0] + # set forks [s total_forks] + # r multi + # r set foo bar + # r config set appendonly yes + # r exec + # verify_log_message 0 "*AOF background was scheduled*" $lines + # wait_for_condition 50 100 { + # [s total_forks] > $forks + # } else { + # fail "aofrw didn't start" + # } + # waitForBgrewriteaof r + # } {} {external:skip} + + # Pika does not support the "config set maxmemory" command + # test "MULTI with config error" { + # r multi + # r set foo bar + # r config set maxmemory bla + + # # letting the redis parser read it, 
it'll throw an exception instead of + # # reply with an array that contains an error, so we switch to reading + # # raw RESP instead + # r readraw 1 + + # set res [r exec] + # assert_equal $res "*2" + # set res [r read] + # assert_equal $res "+OK" + # set res [r read] + # r readraw 0 + # set _ $res + # } {*CONFIG SET failed*} + + test "Flushall while watching several keys by one client" { + r flushall + r mset a{t} a b{t} b + r watch b{t} a{t} + r flushall + r ping + } } + +# Pika does not support AOF +# start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} appendfsync always} tags {external:skip}} { +# test {MULTI with FLUSHALL and AOF} { +# set aof [get_last_incr_aof_path r] +# r multi +# r set foo bar +# r flushall +# r exec +# assert_aof_content $aof { +# {multi} +# {select *} +# {set *} +# {flushall} +# {exec} +# } +# r get foo +# } {} +# } From 578fe41d7c2d986fe9da93ff5b1efd2e2bd3434c Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Wed, 31 Jul 2024 15:37:30 +0800 Subject: [PATCH 3/7] feat: use RTC(Run-to-completion) model to speed up cache read (#2837) * feat: Improve the RTC process of Read/Write model (#2629) * (Demo) Do read cmd before task queue. 
&& add workflow_dispatch for manual action * Check authed and write lock, fix go test error in MacOS and cache mode judge * fix some ut error by commands filter and return logic * rollback some flag,but add kCmdReadBeforeQueuefor get mget hget hget hgetall,hmget * move mget and hmget;add before_queue_time metrics * fix cost to copy cmd_table by remove c_ptr --------- Co-authored-by: chenbt <34958405+chenbt-hz@users.noreply.github.com> --- include/pika_client_conn.h | 9 +++++ include/pika_command.h | 5 ++- src/pika_client_conn.cc | 73 +++++++++++++++++++++++++++++++++++++- src/pika_command.cc | 55 +++++++++++++++++++--------- 4 files changed, 124 insertions(+), 18 deletions(-) diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 9008d848c8..30a371f6cf 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -19,6 +19,7 @@ struct TimeStat { void Reset() { enqueue_ts_ = dequeue_ts_ = 0; process_done_ts_ = 0; + before_queue_ts_ = 0; } uint64_t start_ts() const { @@ -37,8 +38,13 @@ struct TimeStat { return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0; } + uint64_t before_queue_time() const { + return process_done_ts_ > dequeue_ts_ ? 
before_queue_ts_ - enqueue_ts_ : 0; + } + uint64_t enqueue_ts_; uint64_t dequeue_ts_; + uint64_t before_queue_ts_; uint64_t process_done_ts_; }; @@ -67,8 +73,11 @@ class PikaClientConn : public net::RedisConn { const net::HandleType& handle_type, int max_conn_rbuf_size); ~PikaClientConn() = default; + bool IsInterceptedByRTC(std::string& opt); + void ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) override; + bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt); void BatchExecRedisCmd(const std::vector& argvs); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); diff --git a/include/pika_command.h b/include/pika_command.h index 2c1d5c84fb..de06c332c8 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -247,6 +247,7 @@ const std::string kCmdNameXInfo = "xinfo"; const std::string kClusterPrefix = "pkcluster"; + /* * If a type holds a key, a new data structure * that uses the key will use this error @@ -289,7 +290,7 @@ enum CmdFlags { kCmdFlagsOperateKey = (1 << 19), // redis keySpace kCmdFlagsStream = (1 << 20), kCmdFlagsFast = (1 << 21), - kCmdFlagsSlow = (1 << 22), + kCmdFlagsSlow = (1 << 22) }; void inline RedisAppendContent(std::string& str, const std::string& value); @@ -536,6 +537,7 @@ class Cmd : public std::enable_shared_from_this { bool hasFlag(uint32_t flag) const; bool is_read() const; bool is_write() const; + bool isCacheRead() const; bool IsLocal() const; bool IsSuspend() const; @@ -579,6 +581,7 @@ class Cmd : public std::enable_shared_from_this { void ProcessCommand(const HintKeys& hint_key = HintKeys()); void InternalProcessCommand(const HintKeys& hint_key); void DoCommand(const HintKeys& hint_key); + bool DoReadCommandInCache(); void LogCommand() const; std::string name_; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index daff036ceb..7834c057ef 100644 --- 
a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -18,6 +18,7 @@ #include "include/pika_server.h" #include "net/src/dispatch_thread.h" #include "net/src/worker_thread.h" +#include "src/pstd/include/scope_record_lock.h" extern std::unique_ptr g_pika_conf; extern PikaServer* g_pika_server; @@ -237,6 +238,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log << ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size() << ", total_time(ms): " << time_stat_->total_time() / 1000 + << ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000 << ", queue_time(ms): " << time_stat_->queue_time() / 1000 << ", process_time(ms): " << time_stat_->process_time() / 1000 << ", cmd_time(ms): " << do_duration / 1000; @@ -255,13 +257,24 @@ void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) { g_pika_server->AddMonitorMessage(monitor_message); } +bool PikaClientConn::IsInterceptedByRTC(std::string& opt) { + //currently we only Intercept: Get, HGet + if (opt == kCmdNameGet && g_pika_conf->GetCacheString()) { + return true; + } + if (opt == kCmdNameHGet && g_pika_conf->GetCacheHash()) { + return true; + } + return false; +} + void PikaClientConn::ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) { time_stat_->Reset(); if (async) { auto arg = new BgTaskArg(); arg->redis_cmds = argvs; - time_stat_->enqueue_ts_ = pstd::NowMicros(); + time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); /** * If using the pipeline method to transmit batch commands to Pika, it is unable to @@ -273,6 +286,19 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& pstd::StringToLower(opt); bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); + + //we don't intercept pipeline 
batch (argvs.size() > 1) + if (argvs.size() == 1 && IsInterceptedByRTC(opt) && + PIKA_CACHE_NONE != g_pika_conf->cache_mode() && + !IsInTxn()) { + // read in cache + if (ReadCmdInCache(argvs[0], opt)) { + delete arg; + return; + } + time_stat_->before_queue_ts_ = pstd::NowMicros(); + } + g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } @@ -308,6 +334,51 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector& TryWriteResp(); } +bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) { + resp_num.store(1); + std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); + if (!c_ptr) { + return false; + } + // Check authed + if (AuthRequired()) { // the user is not authed, need to do auth + if (!(c_ptr->flag() & kCmdFlagsNoAuth)) { + return false; + } + } + // Initial + c_ptr->Initial(argv, current_db_); + //acl check + int8_t subCmdIndex = -1; + std::string errKey; + auto checkRes = user_->CheckUserPermission(c_ptr, argv, subCmdIndex, &errKey); + std::string object; + if (checkRes == AclDeniedCmd::CMD || + checkRes == AclDeniedCmd::KEY || + checkRes == AclDeniedCmd::CHANNEL || + checkRes == AclDeniedCmd::NO_SUB_CMD || + checkRes == AclDeniedCmd::NO_AUTH + ) { + //acl check failed + return false; + } + //only read command(Get, HGet) will reach here, no need of record lock + if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) { + return false; + } + bool read_status = c_ptr->DoReadCommandInCache(); + auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); + resp_num--; + if (read_status) { + time_stat_->process_done_ts_ = pstd::NowMicros(); + (*cmdstat_map)[argv[0]].cmd_count.fetch_add(1); + (*cmdstat_map)[argv[0]].cmd_time_consuming.fetch_add(time_stat_->total_time()); + resp_array.emplace_back(std::make_shared(std::move(c_ptr->res().message()))); + TryWriteResp(); + } + return read_status; +} + void PikaClientConn::TryWriteResp() { int expected = 0; if 
(resp_num.compare_exchange_strong(expected, -1)) { diff --git a/src/pika_command.cc b/src/pika_command.cc index bae2edd144..b92f75dd22 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -243,7 +243,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameSet, std::move(setptr))); ////GetCmd std::unique_ptr getptr = - std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow); + std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow); cmd_table->insert(std::pair>(kCmdNameGet, std::move(getptr))); ////DelCmd std::unique_ptr delptr = @@ -392,7 +392,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHSet, std::move(hsetptr))); ////HGetCmd std::unique_ptr hgetptr = - std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast); cmd_table->insert(std::pair>(kCmdNameHGet, std::move(hgetptr))); ////HGetallCmd std::unique_ptr hgetallptr = @@ -400,7 +400,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHGetall, std::move(hgetallptr))); ////HExistsCmd std::unique_ptr hexistsptr = - std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameHExists, std::move(hexistsptr))); ////HIncrbyCmd std::unique_ptr hincrbyptr = @@ -420,7 +420,7 @@ void InitCmdTable(CmdTable* cmd_table) { 
cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = - std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast); cmd_table->insert(std::pair>(kCmdNameHMget, std::move(hmgetptr))); ////HMsetCmd std::unique_ptr hmsetptr = @@ -736,50 +736,50 @@ void InitCmdTable(CmdTable* cmd_table) { // PubSub ////Publish std::unique_ptr publishptr = - std::make_unique(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast); + std::make_unique(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNamePublish, std::move(publishptr))); ////Subscribe std::unique_ptr subscribeptr = - std::make_unique(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameSubscribe, std::move(subscribeptr))); ////UnSubscribe std::unique_ptr unsubscribeptr = - std::make_unique(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameUnSubscribe, std::move(unsubscribeptr))); ////PSubscribe std::unique_ptr psubscribeptr = - std::make_unique(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePSubscribe, std::move(psubscribeptr))); ////PUnSubscribe std::unique_ptr punsubscribeptr = - std::make_unique(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + 
std::make_unique(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePUnSubscribe, std::move(punsubscribeptr))); ////PubSub std::unique_ptr pubsubptr = - std::make_unique(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePubSub, std::move(pubsubptr))); ////ACL - std::unique_ptr aclptr = std::make_unique(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow); + std::unique_ptr aclptr = std::make_unique(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow ); cmd_table->insert(std::pair>(KCmdNameAcl, std::move(aclptr))); // Transaction ////Multi std::unique_ptr multiptr = - std::make_unique(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast); + std::make_unique(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameMulti, std::move(multiptr))); ////Exec std::unique_ptr execptr = std::make_unique( - kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow); + kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameExec, std::move(execptr))); ////Discard - std::unique_ptr discardptr = std::make_unique(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr discardptr = std::make_unique(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameDiscard, std::move(discardptr))); ////Watch - std::unique_ptr watchptr = std::make_unique(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr watchptr = std::make_unique(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameWatch, std::move(watchptr))); ////Unwatch - std::unique_ptr unwatchptr = std::make_unique(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr unwatchptr = 
std::make_unique(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameUnWatch, std::move(unwatchptr))); // Stream @@ -911,6 +911,29 @@ void Cmd::DoCommand(const HintKeys& hint_keys) { } } +bool Cmd::DoReadCommandInCache() { + if (!IsSuspend()) { + db_->DBLockShared(); + } + DEFER { + if (!IsSuspend()) { + db_->DBUnlockShared(); + } + }; + + if (db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { + if (IsNeedReadCache()) { + ReadCache(); + } + // return true only the read command hit + if (is_read() && !res().CacheMiss()) { + return true; + } + } + return false; +} + + void Cmd::DoBinlog() { if (res().ok() && is_write() && g_pika_conf->write_binlog()) { std::shared_ptr conn_ptr = GetConn(); From 2f3bedb86343fabea1759bb6d333852adc162a8c Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Wed, 31 Jul 2024 16:16:20 +0800 Subject: [PATCH 4/7] fix: flushdb may cause master-slave inconsistency (#2808) * add comment to explain the idea * add mtx and waiting for test * use lockfree implementation, some logs are waiting to remove * change the call_back to an optional dtor call back of BGItem * revised based on reviewer's opinion * removed some comments * change the declared position of async_write_db_task_count --------- Co-authored-by: Xin.Zh --- include/pika_consensus.h | 3 ++- include/pika_repl_bgworker.h | 6 +++--- include/pika_repl_client.h | 36 +++++++++++++++++++++++++++++------- include/pika_rm.h | 6 +++++- src/net/include/bg_thread.h | 17 +++++++++++++---- src/net/src/bg_thread.cc | 27 +++++++++++++++++---------- src/pika_consensus.cc | 27 ++++++++++++++++++++++----- src/pika_repl_bgworker.cc | 10 +++++++--- src/pika_repl_client.cc | 17 ++++++++++++----- src/pika_rm.cc | 5 ++--- 10 files changed, 112 insertions(+), 42 deletions(-) diff --git a/include/pika_consensus.h b/include/pika_consensus.h index b289b425b7..bb774b5e3b 100644 --- a/include/pika_consensus.h +++ 
b/include/pika_consensus.h @@ -170,7 +170,7 @@ class ConsensusCoordinator { pstd::Status InternalAppendLog(const std::shared_ptr& cmd_ptr); pstd::Status InternalAppendBinlog(const std::shared_ptr& cmd_ptr); void InternalApply(const MemLog::LogItem& log); - void InternalApplyFollower(const MemLog::LogItem& log); + void InternalApplyFollower(const std::shared_ptr& cmd_ptr); pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset); pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset, @@ -182,6 +182,7 @@ class ConsensusCoordinator { pstd::Status FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset); pstd::Status GetLogsBefore(const BinlogOffset& start_offset, std::vector* hints); + private: // keep members in this class works in order pstd::Mutex order_mu_; diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h index e9d6a1b034..dcf59c4b94 100644 --- a/include/pika_repl_bgworker.h +++ b/include/pika_repl_bgworker.h @@ -8,7 +8,7 @@ #include #include - +#include #include "net/include/bg_thread.h" #include "net/include/pb_conn.h" #include "net/include/thread_pool.h" @@ -25,13 +25,13 @@ class PikaReplBgWorker { int StartThread(); int StopThread(); void Schedule(net::TaskFunc func, void* arg); - void QueueClear(); + void Schedule(net::TaskFunc func, void* arg, std::function& call_back); static void HandleBGWorkerWriteBinlog(void* arg); static void HandleBGWorkerWriteDB(void* arg); + static void WriteDBInSyncWay(const std::shared_ptr& c_ptr); void SetThreadName(const std::string& thread_name) { bg_thread_.set_thread_name(thread_name); } - BinlogItem binlog_item_; net::RedisParser redis_parser_; std::string ip_port_; diff --git a/include/pika_repl_client.h b/include/pika_repl_client.h index 49eb2c9b82..73fb897a62 100644 --- a/include/pika_repl_client.h +++ b/include/pika_repl_client.h @@ -44,12 +44,8 @@ struct ReplClientWriteBinlogTaskArg { 
struct ReplClientWriteDBTaskArg { const std::shared_ptr cmd_ptr; - LogOffset offset; - std::string db_name; - ReplClientWriteDBTaskArg(std::shared_ptr _cmd_ptr, const LogOffset& _offset, std::string _db_name) - : cmd_ptr(std::move(_cmd_ptr)), - offset(_offset), - db_name(std::move(_db_name)) {} + explicit ReplClientWriteDBTaskArg(std::shared_ptr _cmd_ptr) + : cmd_ptr(std::move(_cmd_ptr)) {} ~ReplClientWriteDBTaskArg() = default; }; @@ -68,7 +64,7 @@ class PikaReplClient { void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name); void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr& res, const std::shared_ptr& conn, void* res_private_data); - void ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const LogOffset& offset, const std::string& db_name); + void ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const std::string& db_name); pstd::Status SendMetaSync(); pstd::Status SendDBSync(const std::string& ip, uint32_t port, const std::string& db_name, @@ -80,6 +76,24 @@ class PikaReplClient { const std::string& local_ip, bool is_first_send); pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip); + void IncrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) { + int32_t db_index = db_name.back() - '0'; + assert(db_index >= 0 && db_index <= 7); + async_write_db_task_counts_[db_index].fetch_add(incr_step, std::memory_order::memory_order_seq_cst); + } + + void DecrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) { + int32_t db_index = db_name.back() - '0'; + assert(db_index >= 0 && db_index <= 7); + async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst); + } + + int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) { + int32_t db_index = db_name.back() - '0'; + assert(db_index >= 0 && db_index <= 7); + return 
async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst); + } + private: size_t GetBinlogWorkerIndexByDBName(const std::string &db_name); size_t GetHashIndexByKey(const std::string& key); @@ -88,6 +102,14 @@ class PikaReplClient { std::unique_ptr client_thread_; int next_avail_ = 0; std::hash str_hash; + + // async_write_db_task_counts_ is used when consuming binlog, which indicates the nums of async write-DB tasks that are + // queued or being executing by WriteDBWorkers. If a flushdb-binlog need to apply DB, it must wait + // util this count drop to zero. you can also check pika discussion #2807 to know more + // it is only used in slaveNode when consuming binlog + std::atomic async_write_db_task_counts_[MAX_DB_NUM]; + // [NOTICE] write_db_workers_ must be declared after async_write_db_task_counts_ to ensure write_db_workers_ will be destroyed before async_write_db_task_counts_ + // when PikaReplClient is de-constructing, because some of the async task that exec by write_db_workers_ will manipulate async_write_db_task_counts_ std::vector> write_binlog_workers_; std::vector> write_db_workers_; }; diff --git a/include/pika_rm.h b/include/pika_rm.h index 8dcb7a3504..ec80c1ff58 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -184,7 +184,7 @@ class PikaReplicaManager { void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr& res, const std::shared_ptr& conn, void* res_private_data); - void ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const LogOffset& offset, const std::string& db_name); + void ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const std::string& db_name); void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name); void ReplServerRemoveClientConn(int fd); void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd); @@ -205,6 +205,10 @@ class PikaReplicaManager { return sync_slave_dbs_; } + int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& 
db_name) { + return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name); + } + private: void InitDB(); pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip); diff --git a/src/net/include/bg_thread.h b/src/net/include/bg_thread.h index 5da80e1d69..b9c5259273 100644 --- a/src/net/include/bg_thread.h +++ b/src/net/include/bg_thread.h @@ -8,7 +8,7 @@ #include #include - +#include #include "net/include/net_thread.h" #include "pstd/include/pstd_mutex.h" @@ -41,7 +41,7 @@ class BGThread final : public Thread { } void Schedule(void (*function)(void*), void* arg); - + void Schedule(void (*function)(void*), void* arg, std::function& call_back); /* * timeout is in millionsecond */ @@ -52,13 +52,22 @@ class BGThread final : public Thread { void SwallowReadyTasks(); private: - struct BGItem { + class BGItem { + public: void (*function)(void*); void* arg; + //dtor_call_back is an optional call back fun + std::function dtor_call_back; BGItem(void (*_function)(void*), void* _arg) : function(_function), arg(_arg) {} + BGItem(void (*_function)(void*), void* _arg, std::function& _dtor_call_back) : function(_function), arg(_arg), dtor_call_back(_dtor_call_back) {} + ~BGItem() { + if (dtor_call_back) { + dtor_call_back(); + } + } }; - std::queue queue_; + std::queue> queue_; std::priority_queue timer_queue_; size_t full_; diff --git a/src/net/src/bg_thread.cc b/src/net/src/bg_thread.cc index 49a0c519e9..b0835330f9 100644 --- a/src/net/src/bg_thread.cc +++ b/src/net/src/bg_thread.cc @@ -4,13 +4,9 @@ // of patent rights can be found in the PATENTS file in the same directory. 
#include "net/include/bg_thread.h" -#include #include #include -#include "pstd/include/pstd_mutex.h" -#include "pstd/include/xdebug.h" - namespace net { void BGThread::Schedule(void (*function)(void*), void* arg) { @@ -19,11 +15,22 @@ void BGThread::Schedule(void (*function)(void*), void* arg) { wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); }); if (!should_stop()) { - queue_.emplace(function, arg); + queue_.emplace(std::make_unique(function, arg)); rsignal_.notify_one(); } } +void BGThread::Schedule(void (*function)(void*), void* arg, std::function& call_back) { + std::unique_lock lock(mu_); + + wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); }); + + if (!should_stop()) { + queue_.emplace(std::make_unique(function, arg, call_back)); + rsignal_.notify_one(); + } +}; + void BGThread::QueueSize(int* pri_size, int* qu_size) { std::lock_guard lock(mu_); *pri_size = static_cast(timer_queue_.size()); @@ -32,7 +39,7 @@ void BGThread::QueueSize(int* pri_size, int* qu_size) { void BGThread::QueueClear() { std::lock_guard lock(mu_); - std::queue().swap(queue_); + std::queue>().swap(queue_); std::priority_queue().swap(timer_queue_); wsignal_.notify_one(); } @@ -42,10 +49,10 @@ void BGThread::SwallowReadyTasks() { // while the schedule function would stop to add any tasks. 
mu_.lock(); while (!queue_.empty()) { - auto [function, arg] = queue_.front(); + std::unique_ptr task_item = std::move(queue_.front()); queue_.pop(); mu_.unlock(); - (*function)(arg); + task_item->function(task_item->arg); mu_.lock(); } mu_.unlock(); @@ -96,11 +103,11 @@ void* BGThread::ThreadMain() { } if (!queue_.empty()) { - auto [function, arg] = queue_.front(); + std::unique_ptr task_item = std::move(queue_.front()); queue_.pop(); wsignal_.notify_one(); lock.unlock(); - (*function)(arg); + task_item->function(task_item->arg); } } // swalloc all the remain tasks in ready and timer queue diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 05cf535b48..89f10e0317 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -346,9 +346,26 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_pt return Status::OK(); } - Status s = InternalAppendLog(cmd_ptr); - - InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr)); + auto opt = cmd_ptr->argv()[0]; + if (pstd::StringToLower(opt) != kCmdNameFlushdb) { + // apply binlog in sync way + Status s = InternalAppendLog(cmd_ptr); + // apply db in async way + InternalApplyFollower(cmd_ptr); + } else { + // this is a flushdb-binlog, both apply binlog and apply db are in sync way + // ensure all writeDB task that submitted before has finished before we exec this flushdb + int32_t wait_ms = 250; + while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); + wait_ms *= 2; + wait_ms = wait_ms < 3000 ? 
wait_ms : 3000; + } + // apply flushdb-binlog in sync way + Status s = InternalAppendLog(cmd_ptr); + // applyDB in sync way + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); + } return Status::OK(); } @@ -406,8 +423,8 @@ uint32_t ConsensusCoordinator::term() { return term_; } -void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) { - g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, db_name_); +void ConsensusCoordinator::InternalApplyFollower(const std::shared_ptr& cmd_ptr) { + g_pika_rm->ScheduleWriteDBTask(cmd_ptr, db_name_); } int ConsensusCoordinator::InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv) { diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index b9a88e4536..05a7915a0f 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -32,7 +32,9 @@ int PikaReplBgWorker::StopThread() { return bg_thread_.StopThread(); } void PikaReplBgWorker::Schedule(net::TaskFunc func, void* arg) { bg_thread_.Schedule(func, arg); } -void PikaReplBgWorker::QueueClear() { bg_thread_.QueueClear(); } +void PikaReplBgWorker::Schedule(net::TaskFunc func, void* arg, std::function& call_back) { + bg_thread_.Schedule(func, arg, call_back); +} void PikaReplBgWorker::ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset) { offset->b_offset.filenum = pb_offset.filenum(); @@ -207,9 +209,11 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr c_ptr = task_arg->cmd_ptr; + WriteDBInSyncWay(c_ptr); +} + +void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr& c_ptr) { const PikaCmdArgsType& argv = c_ptr->argv(); - LogOffset offset = task_arg->offset; - std::string db_name = task_arg->db_name; uint64_t start_us = 0; if (g_pika_conf->slowlog_slower_than() >= 0) { diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index 
2d53be265c..77e3a60f78 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -24,7 +24,10 @@ using pstd::Status; extern PikaServer* g_pika_server; extern std::unique_ptr g_pika_rm; -PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) { +PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) { + for (int i = 0; i < MAX_DB_NUM; i++) { + async_write_db_task_counts_[i].store(0, std::memory_order::memory_order_seq_cst); + } client_thread_ = std::make_unique(cron_interval, keepalive_timeout); client_thread_->set_thread_name("PikaReplClient"); for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) { @@ -98,13 +101,17 @@ void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name, write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast(task_arg)); } -void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const LogOffset& offset, - const std::string& db_name) { +void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const std::string& db_name) { const PikaCmdArgsType& argv = cmd_ptr->argv(); std::string dispatch_key = argv.size() >= 2 ? 
argv[1] : argv[0]; size_t index = GetHashIndexByKey(dispatch_key); - auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name); - write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast(task_arg)); + auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr); + + IncrAsyncWriteDBTaskCount(db_name, 1); + std::function task_finish_call_back = [this, db_name]() { this->DecrAsyncWriteDBTaskCount(db_name, 1); }; + + write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast(task_arg), + task_finish_call_back); } size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) { diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 2cadef2407..9df7b82101 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -683,9 +683,8 @@ void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db, pika_repl_client_->ScheduleWriteBinlogTask(db, res, conn, res_private_data); } -void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const LogOffset& offset, - const std::string& db_name) { - pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, db_name); +void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const std::string& db_name) { + pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, db_name); } void PikaReplicaManager::ReplServerRemoveClientConn(int fd) { pika_repl_server_->RemoveClientConn(fd); } From 061edba70357aee1c8f8e1e7ef11ae011c153adf Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Fri, 2 Aug 2024 09:58:33 +0800 Subject: [PATCH 5/7] fix:delete logs (#2840) Co-authored-by: chejinge --- src/pika_cache_load_thread.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/pika_cache_load_thread.cc b/src/pika_cache_load_thread.cc index 5e4a050706..b2205a7d49 100644 --- a/src/pika_cache_load_thread.cc +++ b/src/pika_cache_load_thread.cc @@ -69,8 +69,6 @@ bool PikaCacheLoadThread::LoadHash(std::string& key, const 
std::shared_ptr& int32_t len = 0; db->storage()->HLen(key, &len); if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) { - LOG(WARNING) << "can not load key, because item size:" << len - << " beyond max item size:" << CACHE_VALUE_ITEM_MAX_SIZE; return false; } @@ -205,8 +203,6 @@ void *PikaCacheLoadThread::ThreadMain() { for (auto & load_key : load_keys) { if (LoadKey(std::get<0>(load_key), std::get<1>(load_key), std::get<2>(load_key))) { ++async_load_keys_num_; - } else { - LOG(WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << std::get<1>(load_key) << " failed !!!"; } std::unique_lock lm(loadkeys_map_mutex_); From 1e71ff0b36fa797ce219cec13d357a34034acaa5 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Fri, 2 Aug 2024 15:35:10 +0800 Subject: [PATCH 6/7] fix: execute hincrby cmd more than one times after delete a field which is existing (#2836) * fix hincrby cmd * add test for hincrby cmd --- src/storage/src/redis_hashes.cc | 3 ++- tests/integration/hash_test.go | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index 03a3c1c9b8..9193dddd1c 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -308,7 +308,8 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64 batch.Put(handles_[kMetaCF], base_meta_key.Encode(), meta_value); HashesDataKey hashes_data_key(key, version, field); Int64ToStr(value_buf, 32, value); - batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), value_buf); + BaseDataValue internal_value(value_buf); + batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode()); *ret = value; } else { version = parsed_hashes_meta_value.Version(); diff --git a/tests/integration/hash_test.go b/tests/integration/hash_test.go index cf9448f75e..0ee0dccf1b 100644 --- a/tests/integration/hash_test.go +++ b/tests/integration/hash_test.go @@ -140,6 +140,27 @@ 
var _ = Describe("Hash Commands", func() { Expect(hIncrBy.Val()).To(Equal(int64(-5))) }) + It("should HIncrBy against wrong metadata", func() { + hSet := client.HSet(ctx, "hash", "key", "5") + Expect(hSet.Err()).NotTo(HaveOccurred()) + + hIncrBy := client.HIncrBy(ctx, "hash", "key", 1) + Expect(hIncrBy.Err()).NotTo(HaveOccurred()) + Expect(hIncrBy.Val()).To(Equal(int64(6))) + + hDel := client.HDel(ctx, "hash", "key") + Expect(hDel.Err()).NotTo(HaveOccurred()) + Expect(hDel.Val()).To(Equal(int64(1))) + + hIncrBy = client.HIncrBy(ctx, "hash", "key", 1) + Expect(hIncrBy.Err()).NotTo(HaveOccurred()) + Expect(hIncrBy.Val()).To(Equal(int64(1))) + + hIncrBy = client.HIncrBy(ctx, "hash", "key", 2) + Expect(hIncrBy.Err()).NotTo(HaveOccurred()) + Expect(hIncrBy.Val()).To(Equal(int64(3))) + }) + It("should HIncrByFloat", func() { hSet := client.HSet(ctx, "hash", "field", "10.50") Expect(hSet.Err()).NotTo(HaveOccurred()) From a2acd88fbb7c8d1d1d3a5a3320c5e6c4b8f8dcc7 Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Sat, 3 Aug 2024 22:26:55 +0800 Subject: [PATCH 7/7] fix: add switch for RTC cache read (#2841) * 1 add a switch for RTC feature 2 avoid unnecessary cache read if rtc is already cache missed * revised --- conf/pika.conf | 7 ++++++- include/pika_client_conn.h | 7 ++++--- include/pika_command.h | 4 ++++ include/pika_conf.h | 2 ++ src/pika_client_conn.cc | 24 +++++++++++++----------- src/pika_command.cc | 10 +++++++--- src/pika_conf.cc | 5 +++++ 7 files changed, 41 insertions(+), 18 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 496d974174..60772e4c29 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -18,11 +18,16 @@ rocksdb-periodic-second : 86400 * 3; # Master's run-id # master-run-id : -# The number of threads for running Pika. +# The number of Net-worker threads in Pika. # It's not recommended to set this value exceeds # the number of CPU cores on the deployment server. 
thread-num : 1 +# use Net worker thread to read redis Cache for [Get, HGet] command, +# which can significantly improve QPS and reduce latency when cache hit rate is high +# default value is "yes", set it to "no" if you wanna disable it +rtc-cache-read : yes + # Size of the thread pool, The threads within this pool # are dedicated to handling user requests. thread-pool-size : 12 diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 30a371f6cf..5b912592ab 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -59,6 +59,7 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr resp_ptr; LogOffset offset; std::string db_name; + bool cache_miss_in_rtc_; }; struct TxnStateBitMask { @@ -78,7 +79,7 @@ class PikaClientConn : public net::RedisConn { void ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) override; bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt); - void BatchExecRedisCmd(const std::vector& argvs); + void BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); @@ -136,12 +137,12 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr user_; std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr); + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration); void ProcessMonitor(const PikaCmdArgsType& argv); - void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr); + void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void TryWriteResp(); }; diff --git a/include/pika_command.h b/include/pika_command.h index de06c332c8..c9e924eb8c 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -575,6 
+575,9 @@ class Cmd : public std::enable_shared_from_this { uint32_t GetCmdId() const { return cmdId_; }; bool CheckArg(uint64_t num) const; + bool IsCacheMissedInRtc() const; + void SetCacheMissedInRtc(bool value); + protected: // enable copy, used default copy // Cmd(const Cmd&); @@ -603,6 +606,7 @@ class Cmd : public std::enable_shared_from_this { uint64_t do_duration_ = 0; uint32_t cmdId_ = 0; uint32_t aclCategory_ = 0; + bool cache_missed_in_rtc_{false}; private: virtual void DoInitial() = 0; diff --git a/include/pika_conf.h b/include/pika_conf.h index d85b7550dd..6638622ed2 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -468,6 +468,7 @@ class PikaConf : public pstd::BaseConf { // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } + bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; } std::string pidfile() { return pidfile_; } int binlog_file_size() { return binlog_file_size_; } std::vector compression_per_level(); @@ -930,6 +931,7 @@ class PikaConf : public pstd::BaseConf { int level0_file_num_compaction_trigger_ = 4; int64_t max_client_response_size_ = 0; bool daemonize_ = false; + bool rtc_cache_read_enabled_ = false; int timeout_ = 0; std::string server_id_; std::string run_id_; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 7834c057ef..85e740de1a 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -36,7 +36,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread* } std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr) { + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc) { // Get command info std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); if (!c_ptr) { @@ -47,6 +47,7 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st } return tmp_ptr; } + c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc); 
c_ptr->SetConn(shared_from_this()); c_ptr->SetResp(resp_ptr); @@ -273,6 +274,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& time_stat_->Reset(); if (async) { auto arg = new BgTaskArg(); + arg->cache_miss_in_rtc_ = false; arg->redis_cmds = argvs; time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); @@ -288,7 +290,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); //we don't intercept pipeline batch (argvs.size() > 1) - if (argvs.size() == 1 && IsInterceptedByRTC(opt) && + if (g_pika_conf->rtc_cache_read_enabled() && + argvs.size() == 1 && IsInterceptedByRTC(opt) && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && !IsInTxn()) { // read in cache @@ -296,13 +299,14 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& delete arg; return; } + arg->cache_miss_in_rtc_ = true; time_stat_->before_queue_ts_ = pstd::NowMicros(); } g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } - BatchExecRedisCmd(argvs); + BatchExecRedisCmd(argvs, false); } void PikaClientConn::DoBackgroundTask(void* arg) { @@ -320,15 +324,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) { } } - conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); + conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds, bg_arg->cache_miss_in_rtc_); } -void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs) { +void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc) { resp_num.store(static_cast(argvs.size())); for (const auto& argv : argvs) { std::shared_ptr resp_ptr = std::make_shared(); resp_array.push_back(resp_ptr); - ExecRedisCmd(argv, resp_ptr); + ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc); } time_stat_->process_done_ts_ = pstd::NowMicros(); TryWriteResp(); @@ -363,9 +367,6 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std return 
false; } //only read command(Get, HGet) will reach here, no need of record lock - if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) { - return false; - } bool read_status = c_ptr->DoReadCommandInCache(); auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); resp_num--; @@ -508,7 +509,8 @@ void PikaClientConn::ExitTxn() { } } -void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr) { +void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, + bool cache_miss_in_rtc) { // get opt std::string opt = argv[0]; pstd::StringToLower(opt); @@ -519,7 +521,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr); + std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); *resp_ptr = std::move(cmd_ptr->res().message()); resp_num--; } diff --git a/src/pika_command.cc b/src/pika_command.cc index b92f75dd22..bab8dd93f6 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -828,7 +828,7 @@ Cmd* GetCmdFromDB(const std::string& opt, const CmdTable& cmd_table) { bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); } Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory) - : name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) { + : name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory), cache_missed_in_rtc_(false) { } void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) { @@ -891,10 +891,12 @@ void Cmd::DoCommand(const HintKeys& hint_keys) { if (IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { - if (IsNeedReadCache()) { + if (!cache_missed_in_rtc_ + && IsNeedReadCache()) { ReadCache(); } - if (is_read() && res().CacheMiss()) { + if (is_read() + && (res().CacheMiss() || 
cache_missed_in_rtc_)) { pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key()); DoThroughDB(); if (IsNeedUpdateCache()) { @@ -1064,3 +1066,5 @@ void Cmd::SetResp(const std::shared_ptr& resp) { resp_ = resp; } std::shared_ptr Cmd::GetResp() { return resp_.lock(); } void Cmd::SetStage(CmdStage stage) { stage_ = stage; } +bool Cmd::IsCacheMissedInRtc() const { return cache_missed_in_rtc_; } +void Cmd::SetCacheMissedInRtc(bool value) { cache_missed_in_rtc_ = value; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 913f669880..741168be94 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -541,6 +541,11 @@ int PikaConf::Load() { GetConfStr("daemonize", &dmz); daemonize_ = dmz == "yes"; + // read redis cache in Net worker threads + std::string rtc_enabled; + GetConfStr("rtc-cache-read", &rtc_enabled); + rtc_cache_read_enabled_ = rtc_enabled != "no"; + // binlog std::string wb; GetConfStr("write-binlog", &wb);