diff --git a/README.md b/README.md index 69fdc0cce..419ef51c5 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ Run this command, compare with redis use pipeline commands, try it. - sadd scard srem sismember smembers sdiff sdiffstore sinter sinterstore sunion sunionstore smove spop srandmember sscan #### sorted set commands -- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore +- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore #### pubsub commands - subscribe unsubscribe publish psubscribe punsubscribe pubsub diff --git a/README_CN.md b/README_CN.md index 698e51a62..67d332c87 100644 --- a/README_CN.md +++ b/README_CN.md @@ -103,7 +103,7 @@ RocksDB 可以配置为 PikiwiDB 的持久化存储引擎,可以存储更多 #### sorted set commands -- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore +- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore #### pubsub commands diff --git a/cmake/braft.cmake b/cmake/braft.cmake index ac5d68b94..286663c4b 100644 --- a/cmake/braft.cmake +++ b/cmake/braft.cmake @@ -16,8 +16,12 @@ ExternalProject_Add( extern_braft ${EXTERNAL_PROJECT_LOG_ARGS} DEPENDS brpc + # The pr on braft is not merged, so I am using my own warehouse to run the test for the time being GIT_REPOSITORY https://github.com/panlei-coder/braft.git GIT_TAG merge-master-playback + # GIT_REPOSITORY "https://github.com/pikiwidb/braft.git" + # GIT_TAG master + GIT_SHALLOW true PREFIX ${BRAFT_SOURCES_DIR} UPDATE_COMMAND "" CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} @@ -38,7 +42,7 @@ ExternalProject_Add( -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} BUILD_IN_SOURCE 1 BUILD_COMMAND $(MAKE) -j ${CPU_CORE} braft-static - INSTALL_COMMAND mkdir -p ${BRAFT_INSTALL_DIR}/lib/ COMMAND cp ${BRAFT_SOURCES_DIR}/src/extern_braft/output/lib/libbraft.a ${BRAFT_LIBRARIES} COMMAND cp -r ${BRAFT_SOURCES_DIR}/src/extern_braft/output/include ${BRAFT_INCLUDE_DIR}/ + INSTALL_COMMAND mkdir -p ${BRAFT_INSTALL_DIR}/lib/ COMMAND cp ${BRAFT_SOURCES_DIR}/src/extern_braft/output/lib/libbraft.a ${BRAFT_LIBRARIES} COMMAND rm -rf ${BRAFT_INCLUDE_DIR} COMMAND cp -r ${BRAFT_SOURCES_DIR}/src/extern_braft/output/include ${BRAFT_INCLUDE_DIR} ) ADD_DEPENDENCIES(extern_braft brpc) ADD_LIBRARY(braft STATIC IMPORTED GLOBAL) diff --git a/cmake/brpc.cmake b/cmake/brpc.cmake index 86e97e86e..a161d5614 100644 --- a/cmake/brpc.cmake +++ b/cmake/brpc.cmake @@ -41,7 +41,6 @@ EXTERNALPROJECT_ADD( -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} BUILD_IN_SOURCE 1 BUILD_COMMAND $(MAKE) -j ${CPU_CORE} brpc-static - INSTALL_COMMAND mkdir -p ${BRPC_INSTALL_DIR}/lib/ COMMAND cp ${BRPC_SOURCES_DIR}/src/extern_brpc/output/lib/libbrpc.a ${BRPC_LIBRARIES} COMMAND cp -r ${BRPC_SOURCES_DIR}/src/extern_brpc/output/include ${BRPC_INCLUDE_DIR}/ ) ADD_DEPENDENCIES(extern_brpc ssl crypto zlib protobuf leveldb gflags) ADD_LIBRARY(brpc STATIC IMPORTED GLOBAL) diff --git a/cmake/gflags.cmake b/cmake/gflags.cmake index a144028fc..b9a1345a4 100644 --- a/cmake/gflags.cmake +++ b/cmake/gflags.cmake @@ -5,6 +5,9 @@ INCLUDE_GUARD() +SET(GFLAGS_BUILD_TYPE "Release") +SET(CMAKE_BUILD_TYPE ${GFLAGS_BUILD_TYPE}) + FetchContent_Declare(gflags URL https://github.com/gflags/gflags/archive/v2.2.2.zip URL_HASH SHA256=19713a36c9f32b33df59d1c79b4958434cb005b5b47dc5400a7a4b078111d9b5 @@ -21,6 +24,8 @@ FIND_PACKAGE(Threads REQUIRED) TARGET_LINK_LIBRARIES(gflags_static Threads::Threads) -SET(GFLAGS_INCLUDE_PATH ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/include) -SET(GFLAGS_LIBRARY ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a) -SET(GFLAGS_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a) \ No newline at end of file +SET(GFLAGS_INCLUDE_PATH ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/include CACHE PATH "" FORCE) +SET(GFLAGS_LIBRARY ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a CACHE PATH "" FORCE) +SET(GFLAGS_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a CACHE PATH "" FORCE) + +SET(CMAKE_BUILD_TYPE ${THIRD_PARTY_BUILD_TYPE}) \ No newline at end of file diff --git a/cmake/zlib.cmake b/cmake/zlib.cmake index b1e300009..79274ae69 100644 --- a/cmake/zlib.cmake +++ b/cmake/zlib.cmake @@ -15,6 +15,7 @@ ExternalProject_Add( ${EXTERNAL_PROJECT_LOG_ARGS} GIT_REPOSITORY "https://github.com/madler/zlib.git" GIT_TAG "v1.2.8" + GIT_SHALLOW true PREFIX ${ZLIB_SOURCES_DIR} UPDATE_COMMAND "" CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} diff --git a/src/base_cmd.h b/src/base_cmd.h index 926baf44d..6bb77dfa2 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -142,6 +142,10 @@ const std::string kCmdNameRPoplpush = "rpoplpush"; // zset cmd const std::string kCmdNameZAdd = "zadd"; +const std::string kCmdNameZPopMin = "zpopmin"; +const std::string kCmdNameZPopMax = "zpopmax"; +const std::string kCmdNameZInterstore = "zinterstore"; +const std::string kCmdNameZUnionstore = "zunionstore"; const std::string kCmdNameZRevrange = "zrevrange"; const std::string kCmdNameZRangebyscore = "zrangebyscore"; const std::string kCmdNameZRemrangebyscore = "zremrangebyscore"; diff --git a/src/cmd_kv.cc b/src/cmd_kv.cc index 824ae2ca0..0b241daa0 100644 --- a/src/cmd_kv.cc +++ b/src/cmd_kv.cc @@ -37,15 +37,72 @@ void GetCmd::DoCmd(PClient* client) { SetCmd::SetCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryString) {} +// SET key value [NX | XX] [EX seconds | PX milliseconds] bool SetCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); + + auto argv_ = client->argv_; + value_ = argv_[2]; + condition_ = SetCmd::kNONE; + sec_ = 0; + size_t index = 3; + + while (index != argv_.size()) { + std::string opt = argv_[index]; + if (strcasecmp(opt.data(), "xx") == 0) { + condition_ = SetCmd::kXX; + } else if (strcasecmp(opt.data(), "nx") == 0) { + condition_ = SetCmd::kNX; + } else if ((strcasecmp(opt.data(), "ex") == 0) || (strcasecmp(opt.data(), "px") == 0)) { + condition_ = (condition_ == SetCmd::kNONE) ? SetCmd::kEXORPX : condition_; + index++; + if (index == argv_.size()) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + if (pstd::String2int(argv_[index].data(), argv_[index].size(), &sec_) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return false; + } + + if (strcasecmp(opt.data(), "px") == 0) { + sec_ /= 1000; + } + } else { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + index++; + } + return true; } void SetCmd::DoCmd(PClient* client) { - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Set(client->Key(), client->argv_[2]); - if (s.ok()) { - client->SetRes(CmdRes::kOK); + int32_t res = 1; + storage::Status s; + auto key_ = client->Key(); + switch (condition_) { + case SetCmd::kXX: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setxx(key_, value_, &res, sec_); + break; + case SetCmd::kNX: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setnx(key_, value_, &res, sec_); + break; + case SetCmd::kEXORPX: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setex(key_, value_, sec_); + break; + default: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Set(key_, value_); + break; + } + + if (s.ok() || s.IsNotFound()) { + if (res == 1) { + client->SetRes(CmdRes::kOK); + } else { + client->AppendStringLen(-1); + } } else { client->SetRes(CmdRes::kErrOther, s.ToString()); } diff --git a/src/cmd_kv.h b/src/cmd_kv.h index 1d73582dd..db4fb0bff 100644 --- a/src/cmd_kv.h +++ b/src/cmd_kv.h @@ -24,6 +24,7 @@ class GetCmd : public BaseCmd { class SetCmd : public BaseCmd { public: + enum SetCondition { kNONE, kNX, kXX, kEXORPX }; SetCmd(const std::string &name, int16_t arity); protected: @@ -31,6 +32,11 @@ class SetCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + + std::string value_; + std::string target_; + int64_t sec_ = 0; + SetCmd::SetCondition condition_{kNONE}; }; class BitOpCmd : public BaseCmd { diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index d7eddf50b..5a335465b 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -163,6 +163,10 @@ void CmdTableManager::InitCmdTable() { // zset ADD_COMMAND(ZAdd, -4); + ADD_COMMAND(ZPopMin, -2); + ADD_COMMAND(ZPopMax, -2); + ADD_COMMAND(ZInterstore, -4); + ADD_COMMAND(ZUnionstore, -4); ADD_COMMAND(ZRevrange, -4); ADD_COMMAND(ZRangebyscore, -4); ADD_COMMAND(ZRemrangebyscore, 4); diff --git a/src/cmd_zset.cc b/src/cmd_zset.cc index 8fc8d02fb..50550cf32 100644 --- a/src/cmd_zset.cc +++ b/src/cmd_zset.cc @@ -117,6 +117,187 @@ void ZAddCmd::DoCmd(PClient* client) { } } +ZPopMinCmd::ZPopMinCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {} + +bool ZPopMinCmd::DoInitial(PClient* client) { + if (client->argv_.size() > 3) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return false; + } + + client->SetKey(client->argv_[1]); + return true; +} + +void ZPopMinCmd::DoCmd(PClient* client) { + int32_t count = 1; + if (client->argv_.size() == 3) { + if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return; + } + } + + std::vector score_members; + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMin(client->Key(), count, &score_members); + if (s.ok()) { + char buf[32]; + int64_t len = 0; + client->AppendArrayLen(static_cast(score_members.size()) * 2); + for (auto& score_member : score_members) { + client->AppendStringLenUint64(score_member.member.size()); + client->AppendContent(score_member.member); + len = pstd::D2string(buf, sizeof(buf), score_member.score); + client->AppendStringLen(len); + client->AppendContent(buf); + } + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +ZPopMaxCmd::ZPopMaxCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {} + +bool ZPopMaxCmd::DoInitial(PClient* client) { + if (client->argv_.size() > 3) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return false; + } + + client->SetKey(client->argv_[1]); + return true; +} + +void ZPopMaxCmd::DoCmd(PClient* client) { + int32_t count = 1; + if (client->argv_.size() == 3) { + if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return; + } + } + + std::vector score_members; + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMax(client->Key(), count, &score_members); + if (s.ok()) { + char buf[32]; + int64_t len = 0; + client->AppendArrayLen(static_cast(score_members.size()) * 2); + for (auto& score_member : score_members) { + client->AppendStringLenUint64(score_member.member.size()); + client->AppendContent(score_member.member); + len = pstd::D2string(buf, sizeof(buf), score_member.score); + client->AppendStringLen(len); + client->AppendContent(buf); + } + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +ZsetUIstoreParentCmd::ZsetUIstoreParentCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {} + +// ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] +// ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] +bool ZsetUIstoreParentCmd::DoInitial(PClient* client) { + auto argv_ = client->argv_; + dest_key_ = argv_[1]; + if (pstd::String2int(argv_[2].data(), argv_[2].size(), &num_keys_) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return false; + } + if (num_keys_ < 1) { + client->SetRes(CmdRes::kErrOther, "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); + return false; + } + auto argc = argv_.size(); + if (argc < num_keys_ + 3) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + keys_.assign(argv_.begin() + 3, argv_.begin() + 3 + num_keys_); + weights_.assign(num_keys_, 1); + auto index = num_keys_ + 3; + while (index < argc) { + if (strcasecmp(argv_[index].data(), "weights") == 0) { + index++; + if (argc < index + num_keys_) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + double weight; + auto base = index; + for (; index < base + num_keys_; index++) { + if (pstd::String2d(argv_[index].data(), argv_[index].size(), &weight) == 0) { + client->SetRes(CmdRes::kErrOther, "weight value is not a float"); + return false; + } + weights_[index - base] = weight; + } + } else if (strcasecmp(argv_[index].data(), "aggregate") == 0) { + index++; + if (argc < index + 1) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + if (strcasecmp(argv_[index].data(), "sum") == 0) { + aggregate_ = storage::SUM; + } else if (strcasecmp(argv_[index].data(), "min") == 0) { + aggregate_ = storage::MIN; + } else if (strcasecmp(argv_[index].data(), "max") == 0) { + aggregate_ = storage::MAX; + } else { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + index++; + } else { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + } + return true; +} + +ZInterstoreCmd::ZInterstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {} + +bool ZInterstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); } + +void ZInterstoreCmd::DoCmd(PClient* client) { + int32_t count = 0; + std::vector value_to_dest_; + storage::Status s = PSTORE.GetBackend(client->GetCurrentDB()) + ->GetStorage() + ->ZInterstore(dest_key_, keys_, weights_, aggregate_, value_to_dest_, &count); + if (s.ok()) { + client->AppendInteger(count); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +ZUnionstoreCmd::ZUnionstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {} + +bool ZUnionstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); } + +void ZUnionstoreCmd::DoCmd(PClient* client) { + int32_t count = 0; + std::map value_to_dest; + storage::Status s = PSTORE.GetBackend(client->GetCurrentDB()) + ->GetStorage() + ->ZUnionstore(dest_key_, keys_, weights_, aggregate_, value_to_dest, &count); + if (s.ok()) { + client->AppendInteger(count); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + ZRevrangeCmd::ZRevrangeCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySortedSet) {} diff --git a/src/cmd_zset.h b/src/cmd_zset.h index 13049eaa9..b2b197b29 100644 --- a/src/cmd_zset.h +++ b/src/cmd_zset.h @@ -23,6 +23,64 @@ class ZAddCmd : public BaseCmd { void DoCmd(PClient *client) override; }; +class ZPopMinCmd : public BaseCmd { + public: + ZPopMinCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class ZPopMaxCmd : public BaseCmd { + public: + ZPopMaxCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class ZsetUIstoreParentCmd : public BaseCmd { + public: + ZsetUIstoreParentCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + std::string dest_key_; + int64_t num_keys_ = 0; + storage::AGGREGATE aggregate_{storage::SUM}; + std::vector keys_; + std::vector weights_; +}; + +class ZInterstoreCmd : public ZsetUIstoreParentCmd { + public: + ZInterstoreCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class ZUnionstoreCmd : public ZsetUIstoreParentCmd { + public: + ZUnionstoreCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + class ZRevrangeCmd : public BaseCmd { public: ZRevrangeCmd(const std::string &name, int16_t arity); diff --git a/src/db.cc b/src/db.cc index 44b5430b5..a3c4717d6 100644 --- a/src/db.cc +++ b/src/db.cc @@ -6,6 +6,7 @@ */ #include "db.h" +#include #include "config.h" #include "praft/praft.h" @@ -18,6 +19,8 @@ namespace pikiwidb { DB::DB(int db_index, const std::string& db_path) : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {} +DB::~DB() { INFO("DB{} is closing...", db_index_); } + rocksdb::Status DB::Open() { storage::StorageOptions storage_options; storage_options.options = g_config.GetRocksDBOptions(); @@ -43,6 +46,11 @@ rocksdb::Status DB::Open() { storage_options.db_instance_num = g_config.db_instance_num.load(); storage_options.db_id = db_index_; + std::unique_ptr old_storage = std::move(storage_); + if (old_storage != nullptr) { + old_storage->Close(); + old_storage.reset(); + } storage_ = std::make_unique(); if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { diff --git a/src/db.h b/src/db.h index da4cd8f47..c7508273e 100644 --- a/src/db.h +++ b/src/db.h @@ -19,6 +19,7 @@ namespace pikiwidb { class DB { public: DB(int db_index, const std::string& db_path); + ~DB(); rocksdb::Status Open(); diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index b21e51ee6..8c50f2aba 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -186,6 +186,8 @@ class Storage { Status Open(const StorageOptions& storage_options, const std::string& db_path); + Status Close(); + std::vector> CreateCheckpoint(const std::string& checkpoint_path); Status CreateCheckpointInternal(const std::string& checkpoint_path, int db_index); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 6417f07bb..82ac81db6 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -5,10 +5,12 @@ #include +#include "pstd/log.h" #include "rocksdb/env.h" #include "src/base_filter.h" #include "src/lists_filter.h" +#include "src/mutex.h" #include "src/redis.h" #include "src/strings_filter.h" #include "src/zsets_filter.h" @@ -45,19 +47,20 @@ Redis::Redis(Storage* const s, int32_t index) } Redis::~Redis() { - rocksdb::CancelAllBackgroundWork(db_, true); - std::vector tmp_handles = handles_; - handles_.clear(); - for (auto handle : tmp_handles) { - delete handle; - } - // delete env_; - delete db_; - - if (default_compact_range_options_.canceled) { - delete default_compact_range_options_.canceled; + if (need_close_.load()) { + rocksdb::CancelAllBackgroundWork(db_, true); + std::vector tmp_handles = handles_; + handles_.clear(); + for (auto& handle : tmp_handles) { + delete handle; + } + // delete env_; + delete db_; + db_ = nullptr; } -} + delete default_compact_range_options_.canceled; + default_compact_range_options_.canceled = nullptr; +}; Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) { append_log_function_ = storage_options.append_log_function; diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index af9135a1e..1b3334649 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -100,6 +100,8 @@ class Redis { // Common Commands Status Open(const StorageOptions& storage_options, const std::string& db_path); + void SetNeedClose(bool need_close) { need_close_.store(need_close); } + virtual Status CompactRange(const DataType& option_type, const rocksdb::Slice* begin, const rocksdb::Slice* end, const ColumnFamilyType& type = kMetaAndData); @@ -365,6 +367,7 @@ class Redis { private: int32_t index_ = 0; + std::atomic need_close_ = false; Storage* const storage_; std::shared_ptr lock_mgr_; rocksdb::DB* db_ = nullptr; diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index 44e9afc8e..f9e66c0e2 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -109,7 +109,7 @@ Status Redis::ZsetsPKPatternMatchDel(const std::string& pattern, int32_t* ret) { Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vector* score_members) { uint32_t statistic = 0; score_members->clear(); - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); ScopeRecordLock l(lock_mgr_, key); std::string meta_value; @@ -136,16 +136,16 @@ Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vectorkey()); + batch->Delete(kZsetsDataCF, zsets_member_key.Encode()); + batch->Delete(kZsetsScoreCF, iter->key()); } delete iter; if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); - batch.Put(handles_[kZsetsMetaCF], base_meta_key.Encode(), meta_value); - s = db_->Write(default_write_options_, &batch); + batch->Put(kZsetsMetaCF, base_meta_key.Encode(), meta_value); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, key.ToString(), statistic); return s; } @@ -157,7 +157,7 @@ Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vector* score_members) { uint32_t statistic = 0; score_members->clear(); - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); ScopeRecordLock l(lock_mgr_, key); std::string meta_value; @@ -184,16 +184,16 @@ Status Redis::ZPopMin(const Slice& key, const int64_t count, std::vectorkey()); + batch->Delete(kZsetsDataCF, zsets_member_key.Encode()); + batch->Delete(kZsetsScoreCF, iter->key()); } delete iter; if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); - batch.Put(handles_[kZsetsMetaCF], base_meta_key.Encode(), meta_value); - s = db_->Write(default_write_options_, &batch); + batch->Put(kZsetsMetaCF, base_meta_key.Encode(), meta_value); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, key.ToString(), statistic); return s; } @@ -1090,7 +1090,7 @@ Status Redis::ZUnionstore(const Slice& destination, const std::vector& value_to_dest, int32_t* ret) { *ret = 0; uint32_t statistic = 0; - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot = nullptr; @@ -1158,13 +1158,13 @@ Status Redis::ZUnionstore(const Slice& destination, const std::vector(member_score_map.size())); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kZsetsMetaCF, base_destination.Encode(), meta_value); } else { char buf[4]; EncodeFixed32(buf, member_score_map.size()); ZSetsMetaValue zsets_meta_value(Slice(buf, sizeof(int32_t))); version = zsets_meta_value.UpdateVersion(); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), zsets_meta_value.Encode()); + batch->Put(kZsetsMetaCF, base_destination.Encode(), zsets_meta_value.Encode()); } char score_buf[8]; @@ -1174,14 +1174,14 @@ Status Redis::ZUnionstore(const Slice& destination, const std::vector(&sm.second); EncodeFixed64(score_buf, *reinterpret_cast(ptr_score)); BaseDataValue member_i_val(Slice(score_buf, sizeof(uint64_t))); - batch.Put(handles_[kZsetsDataCF], zsets_member_key.Encode(), member_i_val.Encode()); + batch->Put(kZsetsDataCF, zsets_member_key.Encode(), member_i_val.Encode()); ZSetsScoreKey zsets_score_key(destination, version, sm.second, sm.first); BaseDataValue score_i_val(Slice{}); - batch.Put(handles_[kZsetsScoreCF], zsets_score_key.Encode(), score_i_val.Encode()); + batch->Put(kZsetsScoreCF, zsets_score_key.Encode(), score_i_val.Encode()); } *ret = static_cast(member_score_map.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, destination.ToString(), statistic); value_to_dest = std::move(member_score_map); return s; @@ -1196,7 +1196,7 @@ Status Redis::ZInterstore(const Slice& destination, const std::vector(final_score_members.size())); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kZsetsMetaCF, base_destination.Encode(), meta_value); } else { char buf[4]; EncodeFixed32(buf, final_score_members.size()); ZSetsMetaValue zsets_meta_value(Slice(buf, sizeof(int32_t))); version = zsets_meta_value.UpdateVersion(); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), zsets_meta_value.Encode()); + batch->Put(kZsetsMetaCF, base_destination.Encode(), zsets_meta_value.Encode()); } char score_buf[8]; for (const auto& sm : final_score_members) { @@ -1311,14 +1311,14 @@ Status Redis::ZInterstore(const Slice& destination, const std::vector(&sm.score); EncodeFixed64(score_buf, *reinterpret_cast(ptr_score)); BaseDataValue member_i_val(Slice(score_buf, sizeof(uint64_t))); - batch.Put(handles_[kZsetsDataCF], zsets_member_key.Encode(), member_i_val.Encode()); + batch->Put(kZsetsDataCF, zsets_member_key.Encode(), member_i_val.Encode()); ZSetsScoreKey zsets_score_key(destination, version, sm.score, sm.member); BaseDataValue zsets_score_i_val(Slice{}); - batch.Put(handles_[kZsetsScoreCF], zsets_score_key.Encode(), zsets_score_i_val.Encode()); + batch->Put(kZsetsScoreCF, zsets_score_key.Encode(), zsets_score_i_val.Encode()); } *ret = static_cast(final_score_members.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, destination.ToString(), statistic); value_to_dest = std::move(final_score_members); return s; diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index a8446a319..9e6f76731 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -71,9 +71,11 @@ Storage::Storage() { } Storage::~Storage() { + INFO("Storage begin to clear storage!"); bg_tasks_should_exit_.store(true); bg_tasks_cond_var_.notify_one(); if (is_opened_.load()) { + INFO("Storage begin to clear all instances!"); int ret = 0; if (ret = pthread_join(bg_tasks_thread_id_, nullptr); ret != 0) { ERROR("pthread_join failed with bgtask thread error : {}", ret); @@ -82,6 +84,17 @@ Storage::~Storage() { } } +Status Storage::Close() { + if (!is_opened_.load()) { + return Status::OK(); + } + is_opened_.store(false); + for (auto& inst : insts_) { + inst->SetNeedClose(true); + } + return Status::OK(); +} + static std::string AppendSubDirectory(const std::string& db_path, int index) { if (db_path.back() == '/') { return db_path + std::to_string(index); diff --git a/src/store.cc b/src/store.cc index 835823284..24a0f2ba1 100644 --- a/src/store.cc +++ b/src/store.cc @@ -17,6 +17,8 @@ namespace pikiwidb { +PStore::~PStore() { INFO("STORE is closing..."); } + PStore& PStore::Instance() { static PStore store; return store; diff --git a/src/store.h b/src/store.h index 8e8590adb..e7daf3d46 100644 --- a/src/store.h +++ b/src/store.h @@ -45,6 +45,7 @@ class PStore { PStore(const PStore&) = delete; void operator=(const PStore&) = delete; + ~PStore(); void Init(int db_number); diff --git a/tests/consistency_test.go b/tests/consistency_test.go index c14b33250..c0dbd22b2 100644 --- a/tests/consistency_test.go +++ b/tests/consistency_test.go @@ -626,6 +626,74 @@ var _ = Describe("Consistency", Ordered, func() { } }) + It("ZPopMin & ZPopMax Consistency Test", func() { + const testKey = "ZSetsConsistencyTestKey" + i4 := redis.Z{Score: 4, Member: "z4"} + i5 := redis.Z{Score: 5, Member: "z5"} + i8 := redis.Z{Score: 8, Member: "z8"} + { + zadd, err := leader.ZAdd(ctx, testKey, i4, i5, i8).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zadd).To(Equal(int64(3))) + + vals, err := leader.ZPopMin(ctx, testKey).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{i4})) + + vals, err = leader.ZPopMax(ctx, testKey).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{i8})) + + // read check + readChecker(func(c *redis.Client) { + zrange, err := c.ZRevRangeWithScores(ctx, testKey, 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zrange).To(Equal([]redis.Z{i5})) + }) + } + }) + + It("ZUnionstore & ZInterStore Consistency Test", func() { + i4 := redis.Z{Score: 4, Member: "z4"} + i5 := redis.Z{Score: 5, Member: "z5"} + i8 := redis.Z{Score: 8, Member: "z8"} + { + zadd, err := leader.ZAdd(ctx, "in1", i4, i5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zadd).To(Equal(int64(2))) + + zadd, err = leader.ZAdd(ctx, "in2", i4, i8).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zadd).To(Equal(int64(2))) + + vals, err := leader.ZUnionStore(ctx, "out1", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal(int64(3))) + + vals, err = leader.ZInterStore(ctx, "out2", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal(int64(1))) + + readChecker(func(c *redis.Client) { + zrange, err := c.ZRevRangeWithScores(ctx, "out1", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zrange).To(Equal([]redis.Z{i8, i5, i4})) + + zrange, err = c.ZRevRangeWithScores(ctx, "out2", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zrange).To(Equal([]redis.Z{i4})) + }) + } + }) + It("SetBit Consistency Test", func() { const testKey = "StringsConsistencyTestKey" { diff --git a/tests/key_test.go b/tests/key_test.go index ff457ffc0..97e5b52d9 100644 --- a/tests/key_test.go +++ b/tests/key_test.go @@ -65,6 +65,104 @@ var _ = Describe("Keyspace", Ordered, func() { } }) + It("Set", func() { + { + // set px + res, err := client.Set(ctx, "a", "a", time.Millisecond*1001).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + time.Sleep(time.Millisecond * 2000) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + } + { + // set ex + res, err := client.Set(ctx, "a", "a", time.Second*60).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + } + { + // set xx + res, err := client.SetXX(ctx, "a", "a", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(true)) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + } + { + // set ex xx + res, err := client.SetXX(ctx, "a", "a", time.Second*30).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(true)) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + } + { + // set px xx + res, err := client.SetXX(ctx, "a", "a", time.Millisecond*1001).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(true)) + time.Sleep(time.Millisecond * 2000) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + } + { + // set ex nx + res, err := client.SetNX(ctx, "a", "a", time.Second).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(true)) + time.Sleep(time.Second * 2) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + } + { + // set px nx + res, err := client.SetNX(ctx, "a", "a", time.Millisecond*1001).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(true)) + time.Sleep(time.Millisecond * 2000) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + } + { + // set nx + res, err := client.SetNX(ctx, "a", "a", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(true)) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + } + { + // setex + res, err := client.SetEx(ctx, "a", "a", time.Second).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + time.Sleep(time.Second * 2) + + n, err := client.Exists(ctx, "a").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + } + }) + //TODO(dingxiaoshuai) Add more test cases. It("Exists", func() { n, err := client.Exists(ctx, "key1").Result() @@ -317,28 +415,28 @@ var _ = Describe("Keyspace", Ordered, func() { Expect(client.Do(ctx, "pexpire", DefaultKey, "err").Err()).To(MatchError("ERR value is not an integer or out of range")) }) - It("should Rename", func () { + It("should Rename", func() { client.Set(ctx, "mykey", "hello", 0) client.Rename(ctx, "mykey", "mykey1") client.Rename(ctx, "mykey1", "mykey2") Expect(client.Get(ctx, "mykey2").Val()).To(Equal("hello")) Expect(client.Exists(ctx, "mykey").Val()).To(Equal(int64(0))) - + client.Set(ctx, "mykey", "foo", 0) Expect(client.Rename(ctx, "mykey", "mykey").Val()).To(Equal(OK)) - + client.Del(ctx, "mykey", "mykey2") client.Set(ctx, "mykey", "foo", 0) client.Set(ctx, "mykey2", "bar", 0) - client.Expire(ctx, "mykey2", 100 * time.Second) + client.Expire(ctx, "mykey2", 100*time.Second) Expect(client.TTL(ctx, "mykey").Val()).To(Equal(-1 * time.Nanosecond)) Expect(client.TTL(ctx, "mykey2").Val()).NotTo(Equal(-1 * time.Nanosecond)) client.Rename(ctx, "mykey", "mykey2") Expect(client.TTL(ctx, "mykey2").Val()).To(Equal(-1 * time.Nanosecond)) }) - It("should RenameNX", func () { + It("should RenameNX", func() { client.Del(ctx, "mykey", "mykey1", "mykey2") client.Set(ctx, "mykey", "hello", 0) client.RenameNX(ctx, "mykey", "mykey1") diff --git a/tests/zset_test.go b/tests/zset_test.go index eab3e7838..4a1b7a5b9 100644 --- a/tests/zset_test.go +++ b/tests/zset_test.go @@ -477,4 +477,182 @@ var _ = Describe("Zset", Ordered, func() { Member: "three", }})) }) + + item1 := redis.Z{ + Score: 1, + Member: "one", + } + + item2 := redis.Z{ + Score: 2, + Member: "two", + } + + It("should ZPopMin", func() { + { + err := client.ZAdd(ctx, "ZPopMin", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMin(ctx, "ZPopMin").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item1})) + } + { + err := client.ZAdd(ctx, "ZPopMin", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMin(ctx, "ZPopMin", 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item1, item2})) + } + { + err := client.ZAdd(ctx, "ZPopMin", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMin(ctx, "ZPopMin", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item1, item2})) + } + + }) + + It("should ZPopMax", func() { + + { + err := client.ZAdd(ctx, "ZPopMax", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMax(ctx, "ZPopMax").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item2})) + } + { + err := client.ZAdd(ctx, "ZPopMax", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMax(ctx, "ZPopMax", 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item2, item1})) + } + { + err := client.ZAdd(ctx, "ZPopMax", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMax(ctx, "ZPopMax", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item2, item1})) + } + }) + + item20 := redis.Z{ + Score: 20, + Member: "two", + } + item30 := redis.Z{ + Score: 30, + Member: "three", + } + It("should ZInterstore", func() { + err := client.ZAdd(ctx, "in1", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "in2", item20, item30).Err() + Expect(err).NotTo(HaveOccurred()) + + res, err := client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{2, 3}, + Aggregate: "SUM", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err := client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 64, + Member: "two", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 2, + Member: "two", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{20, 1}, + Aggregate: "MAX", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 40, + Member: "two", + }})) + }) + + It("should ZUnionstore && ZInterStore", func() { + err := client.ZAdd(ctx, "in1", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "in2", item20, item30).Err() + Expect(err).NotTo(HaveOccurred()) + + res, err := client.ZUnionStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "SUM", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(3))) + + vals, err := client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 1, + Member: "one", + }, { + Score: 22, + Member: "two", + }, { + Score: 30, + Member: "three", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 2, + Member: "two", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{2, 3}, + Aggregate: "MAX", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 60, + Member: "two", + }})) + }) })