diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index e8875749c4..20b82bf66a 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -5,7 +5,7 @@ on: branches: [ "unstable", "3.5" , "4.0"] pull_request: branches: [ "unstable", "3.5" , "4.0"] - + workflow_dispatch: env: # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) BUILD_TYPE: RelWithDebInfo diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 6b5dbab419..d804a113c0 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -19,6 +19,7 @@ struct TimeStat { void Reset() { enqueue_ts_ = dequeue_ts_ = 0; process_done_ts_ = 0; + before_queue_ts_ = 0; } uint64_t start_ts() const { @@ -37,8 +38,13 @@ struct TimeStat { return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0; } + uint64_t before_queue_time() const { + return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0; + } + uint64_t enqueue_ts_; uint64_t dequeue_ts_; + uint64_t before_queue_ts_; uint64_t process_done_ts_; }; @@ -69,6 +75,7 @@ class PikaClientConn : public net::RedisConn { void ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) override; + bool BatchReadCmdInCache(const std::vector& argvs); void BatchExecRedisCmd(const std::vector& argvs); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); diff --git a/include/pika_command.h b/include/pika_command.h index 2c1d5c84fb..92e314a745 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -247,6 +247,8 @@ const std::string kCmdNameXInfo = "xinfo"; const std::string kClusterPrefix = "pkcluster"; +const std::unordered_set interceptCmds = {kCmdNameGet, kCmdNameHGet, kCmdNameHGetall}; + /* * If a type holds a key, a new data structure * that uses the key will use this error @@ -290,6 +292,7 @@ enum CmdFlags { kCmdFlagsStream = (1 << 20), kCmdFlagsFast = (1 << 21), kCmdFlagsSlow = (1 << 22), + kCmdReadBeforeQueue = (1 << 23), }; void inline RedisAppendContent(std::string& str, const std::string& value); @@ -536,6 +539,7 @@ class Cmd : public std::enable_shared_from_this { bool hasFlag(uint32_t flag) const; bool is_read() const; bool is_write() const; + bool isCacheRead() const; bool IsLocal() const; bool IsSuspend() const; @@ -579,6 +583,7 @@ class Cmd : public std::enable_shared_from_this { void ProcessCommand(const HintKeys& hint_key = HintKeys()); void InternalProcessCommand(const HintKeys& hint_key); void DoCommand(const HintKeys& hint_key); + bool DoReadCommandInCache(const HintKeys& hint_key = HintKeys()); void LogCommand() const; std::string name_; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 1156cc3d95..e3a53f32af 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -18,6 +18,7 @@ #include "include/pika_server.h" #include "net/src/dispatch_thread.h" #include "net/src/worker_thread.h" +#include "src/pstd/include/scope_record_lock.h" extern std::unique_ptr g_pika_conf; extern PikaServer* g_pika_server; @@ -237,6 +238,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log << ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size() << ", total_time(ms): " << time_stat_->total_time() / 1000 + << ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000 << ", queue_time(ms): " << time_stat_->queue_time() / 1000 << ", process_time(ms): " << time_stat_->process_time() / 1000 << ", cmd_time(ms): " << do_duration / 1000; @@ -261,7 +263,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& if (async) { auto arg = new BgTaskArg(); arg->redis_cmds = argvs; - time_stat_->enqueue_ts_ = pstd::NowMicros(); + time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); /** * If using the pipeline method to transmit batch commands to Pika, it is unable to @@ -273,6 +275,18 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& pstd::StringToLower(opt); bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); + bool read_status = false; + + if (PIKA_CACHE_NONE != g_pika_conf->cache_mode() && !IsInTxn() && interceptCmds.find(opt) != interceptCmds.end()){ + // read in cache + if (BatchReadCmdInCache(argvs)){ + delete arg; + arg = nullptr; + return; + } + time_stat_->before_queue_ts_ = pstd::NowMicros(); + } + g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } @@ -308,6 +322,49 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector& TryWriteResp(); } +bool PikaClientConn::BatchReadCmdInCache(const std::vector& argvs){ + resp_num.store(static_cast(argvs.size())); + bool read_status = true; + for (const auto& argv : argvs) { + std::shared_ptr resp_ptr = std::make_shared(); + resp_array.push_back(resp_ptr); + std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(argv[0]); + if (!c_ptr) { + return false; + } + // Check authed + if (AuthRequired()) { // the user is not authed, need to do auth + if (!(c_ptr->flag() & kCmdFlagsNoAuth)) { + c_ptr->res().SetRes(CmdRes::kErrOther, "NOAUTH Authentication required."); + return false; + } + } + // Initial + c_ptr->Initial(argv, current_db_); + pstd::lock::MultiRecordLock record_lock(c_ptr->db_->LockMgr()); + auto cur_keys = c_ptr->current_key(); + if (!cur_keys.empty()){ + record_lock.Lock(cur_keys); + } + + read_status = c_ptr->DoReadCommandInCache(); + time_stat_->process_done_ts_ = pstd::NowMicros(); + auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); + (*cmdstat_map)[argv[0]].cmd_count.fetch_add(1); + (*cmdstat_map)[argv[0]].cmd_time_consuming.fetch_add(time_stat_->total_time()); + *resp_ptr = std::move(c_ptr->res().message()); + resp_num--; + record_lock.Unlock(cur_keys); + } + + if(!read_status){ + return read_status; + }else{ + TryWriteResp(); + return read_status; + } +} + void PikaClientConn::TryWriteResp() { int expected = 0; if (resp_num.compare_exchange_strong(expected, -1)) { diff --git a/src/pika_command.cc b/src/pika_command.cc index bae2edd144..08e67470ab 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -243,7 +243,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameSet, std::move(setptr))); ////GetCmd std::unique_ptr getptr = - std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow); + std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdReadBeforeQueue | kCmdFlagsSlow); cmd_table->insert(std::pair>(kCmdNameGet, std::move(getptr))); ////DelCmd std::unique_ptr delptr = @@ -392,15 +392,15 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHSet, std::move(hsetptr))); ////HGetCmd std::unique_ptr hgetptr = - std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdReadBeforeQueue |kCmdFlagsFast); cmd_table->insert(std::pair>(kCmdNameHGet, std::move(hgetptr))); ////HGetallCmd std::unique_ptr hgetallptr = - std::make_unique(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache); + std::make_unique(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdReadBeforeQueue ); cmd_table->insert(std::pair>(kCmdNameHGetall, std::move(hgetallptr))); ////HExistsCmd std::unique_ptr hexistsptr = - std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameHExists, std::move(hexistsptr))); ////HIncrbyCmd std::unique_ptr hincrbyptr = @@ -420,7 +420,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = - std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast); cmd_table->insert(std::pair>(kCmdNameHMget, std::move(hmgetptr))); ////HMsetCmd std::unique_ptr hmsetptr = @@ -736,50 +736,50 @@ void InitCmdTable(CmdTable* cmd_table) { // PubSub ////Publish std::unique_ptr publishptr = - std::make_unique(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast); + std::make_unique(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNamePublish, std::move(publishptr))); ////Subscribe std::unique_ptr subscribeptr = - std::make_unique(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameSubscribe, std::move(subscribeptr))); ////UnSubscribe std::unique_ptr unsubscribeptr = - std::make_unique(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameUnSubscribe, std::move(unsubscribeptr))); ////PSubscribe std::unique_ptr psubscribeptr = - std::make_unique(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePSubscribe, std::move(psubscribeptr))); ////PUnSubscribe std::unique_ptr punsubscribeptr = - std::make_unique(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePUnSubscribe, std::move(punsubscribeptr))); ////PubSub std::unique_ptr pubsubptr = - std::make_unique(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePubSub, std::move(pubsubptr))); ////ACL - std::unique_ptr aclptr = std::make_unique(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow); + std::unique_ptr aclptr = std::make_unique(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow ); cmd_table->insert(std::pair>(KCmdNameAcl, std::move(aclptr))); // Transaction ////Multi std::unique_ptr multiptr = - std::make_unique(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast); + std::make_unique(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameMulti, std::move(multiptr))); ////Exec std::unique_ptr execptr = std::make_unique( - kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow); + kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameExec, std::move(execptr))); ////Discard - std::unique_ptr discardptr = std::make_unique(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr discardptr = std::make_unique(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameDiscard, std::move(discardptr))); ////Watch - std::unique_ptr watchptr = std::make_unique(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr watchptr = std::make_unique(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameWatch, std::move(watchptr))); ////Unwatch - std::unique_ptr unwatchptr = std::make_unique(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr unwatchptr = std::make_unique(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameUnWatch, std::move(unwatchptr))); // Stream @@ -911,6 +911,31 @@ void Cmd::DoCommand(const HintKeys& hint_keys) { } } +// just read in cache +bool Cmd::DoReadCommandInCache(const HintKeys& hint_keys) { + if (!IsSuspend()) { + db_->DBLockShared(); + } + DEFER { + if (!IsSuspend()) { + db_->DBUnlockShared(); + } + }; + if (IsNeedCacheDo() + && PIKA_CACHE_NONE != g_pika_conf->cache_mode() + && db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { + if (IsNeedReadCache()) { + ReadCache(); + } + // return true only the read command hit + if (is_read() && !res().CacheMiss()) { + return true; + } + } + return false; +} + + void Cmd::DoBinlog() { if (res().ok() && is_write() && g_pika_conf->write_binlog()) { std::shared_ptr conn_ptr = GetConn(); @@ -940,6 +965,7 @@ void Cmd::DoBinlog() { bool Cmd::hasFlag(uint32_t flag) const { return (flag_ & flag); } bool Cmd::is_read() const { return (flag_ & kCmdFlagsRead); } bool Cmd::is_write() const { return (flag_ & kCmdFlagsWrite); } +bool Cmd::isCacheRead() const { return (flag_ & kCmdReadBeforeQueue);} bool Cmd::IsLocal() const { return (flag_ & kCmdFlagsLocal); } int8_t Cmd::SubCmdIndex(const std::string& cmdName) { diff --git a/tests/integration/stream_test.go b/tests/integration/stream_test.go index 79dc99392c..c36962fe59 100644 --- a/tests/integration/stream_test.go +++ b/tests/integration/stream_test.go @@ -120,21 +120,15 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) { var _ = Describe("Stream Commands", func() { ctx := context.TODO() var client *redis.Client - client = redis.NewClient(PikaOption(SINGLEADDR)) - client.FlushDB(ctx) - if GlobalBefore != nil { - GlobalBefore(ctx, client) - } + client = redis.NewClient(PikaOption(SINGLEADDR)) + client.FlushDB(ctx) BeforeEach(func() { - // client = redis.NewClient(pikaOptions1()) - // Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + if GlobalBefore != nil { + GlobalBefore(ctx, client) + } }) - AfterEach(func() { - // Expect(client.Close()).NotTo(HaveOccurred()) - // Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred()) - }) Describe("passed tests", func() { It("should concurrently add and read messages in the stream with separate clients", func() {