diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 9008d848c8..30a371f6cf 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -19,6 +19,7 @@ struct TimeStat { void Reset() { enqueue_ts_ = dequeue_ts_ = 0; process_done_ts_ = 0; + before_queue_ts_ = 0; } uint64_t start_ts() const { @@ -37,8 +38,13 @@ struct TimeStat { return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0; } + uint64_t before_queue_time() const { + return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0; + } + uint64_t enqueue_ts_; uint64_t dequeue_ts_; + uint64_t before_queue_ts_; uint64_t process_done_ts_; }; @@ -67,8 +73,11 @@ class PikaClientConn : public net::RedisConn { const net::HandleType& handle_type, int max_conn_rbuf_size); ~PikaClientConn() = default; + bool IsInterceptedByRTC(std::string& opt); + void ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) override; + bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt); void BatchExecRedisCmd(const std::vector& argvs); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); diff --git a/include/pika_command.h b/include/pika_command.h index 2c1d5c84fb..de06c332c8 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -247,6 +247,7 @@ const std::string kCmdNameXInfo = "xinfo"; const std::string kClusterPrefix = "pkcluster"; + /* * If a type holds a key, a new data structure * that uses the key will use this error @@ -289,7 +290,7 @@ enum CmdFlags { kCmdFlagsOperateKey = (1 << 19), // redis keySpace kCmdFlagsStream = (1 << 20), kCmdFlagsFast = (1 << 21), - kCmdFlagsSlow = (1 << 22), + kCmdFlagsSlow = (1 << 22) }; void inline RedisAppendContent(std::string& str, const std::string& value); @@ -536,6 +537,7 @@ class Cmd : public std::enable_shared_from_this { bool hasFlag(uint32_t flag) const; bool is_read() const; bool is_write() const; + bool isCacheRead() const; bool IsLocal() const; bool IsSuspend() const; @@ -579,6 +581,7 @@ class Cmd : public std::enable_shared_from_this { void ProcessCommand(const HintKeys& hint_key = HintKeys()); void InternalProcessCommand(const HintKeys& hint_key); void DoCommand(const HintKeys& hint_key); + bool DoReadCommandInCache(); void LogCommand() const; std::string name_; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index daff036ceb..7834c057ef 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -18,6 +18,7 @@ #include "include/pika_server.h" #include "net/src/dispatch_thread.h" #include "net/src/worker_thread.h" +#include "src/pstd/include/scope_record_lock.h" extern std::unique_ptr g_pika_conf; extern PikaServer* g_pika_server; @@ -237,6 +238,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log << ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size() << ", total_time(ms): " << time_stat_->total_time() / 1000 + << ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000 << ", queue_time(ms): " << time_stat_->queue_time() / 1000 << ", process_time(ms): " << time_stat_->process_time() / 1000 << ", cmd_time(ms): " << do_duration / 1000; @@ -255,13 +257,24 @@ void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) { g_pika_server->AddMonitorMessage(monitor_message); } +bool PikaClientConn::IsInterceptedByRTC(std::string& opt) { + //currently we only Intercept: Get, HGet + if (opt == kCmdNameGet && g_pika_conf->GetCacheString()) { + return true; + } + if (opt == kCmdNameHGet && g_pika_conf->GetCacheHash()) { + return true; + } + return false; +} + void PikaClientConn::ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) { time_stat_->Reset(); if (async) { auto arg = new BgTaskArg(); arg->redis_cmds = argvs; - time_stat_->enqueue_ts_ = pstd::NowMicros(); + time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); /** * If using the pipeline method to transmit batch commands to Pika, it is unable to @@ -273,6 +286,19 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& pstd::StringToLower(opt); bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); + + //we don't intercept pipeline batch (argvs.size() > 1) + if (argvs.size() == 1 && IsInterceptedByRTC(opt) && + PIKA_CACHE_NONE != g_pika_conf->cache_mode() && + !IsInTxn()) { + // read in cache + if (ReadCmdInCache(argvs[0], opt)) { + delete arg; + return; + } + time_stat_->before_queue_ts_ = pstd::NowMicros(); + } + g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } @@ -308,6 +334,51 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector& TryWriteResp(); } +bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) { + resp_num.store(1); + std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); + if (!c_ptr) { + return false; + } + // Check authed + if (AuthRequired()) { // the user is not authed, need to do auth + if (!(c_ptr->flag() & kCmdFlagsNoAuth)) { + return false; + } + } + // Initial + c_ptr->Initial(argv, current_db_); + //acl check + int8_t subCmdIndex = -1; + std::string errKey; + auto checkRes = user_->CheckUserPermission(c_ptr, argv, subCmdIndex, &errKey); + std::string object; + if (checkRes == AclDeniedCmd::CMD || + checkRes == AclDeniedCmd::KEY || + checkRes == AclDeniedCmd::CHANNEL || + checkRes == AclDeniedCmd::NO_SUB_CMD || + checkRes == AclDeniedCmd::NO_AUTH + ) { + //acl check failed + return false; + } + //only read command(Get, HGet) will reach here, no need of record lock + if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) { + return false; + } + bool read_status = c_ptr->DoReadCommandInCache(); + auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); + resp_num--; + if (read_status) { + time_stat_->process_done_ts_ = pstd::NowMicros(); + (*cmdstat_map)[argv[0]].cmd_count.fetch_add(1); + (*cmdstat_map)[argv[0]].cmd_time_consuming.fetch_add(time_stat_->total_time()); + resp_array.emplace_back(std::make_shared(std::move(c_ptr->res().message()))); + TryWriteResp(); + } + return read_status; +} + void PikaClientConn::TryWriteResp() { int expected = 0; if (resp_num.compare_exchange_strong(expected, -1)) { diff --git a/src/pika_command.cc b/src/pika_command.cc index bae2edd144..b92f75dd22 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -243,7 +243,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameSet, std::move(setptr))); ////GetCmd std::unique_ptr getptr = - std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow); + std::make_unique(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow); cmd_table->insert(std::pair>(kCmdNameGet, std::move(getptr))); ////DelCmd std::unique_ptr delptr = @@ -392,7 +392,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHSet, std::move(hsetptr))); ////HGetCmd std::unique_ptr hgetptr = - std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast); cmd_table->insert(std::pair>(kCmdNameHGet, std::move(hgetptr))); ////HGetallCmd std::unique_ptr hgetallptr = @@ -400,7 +400,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHGetall, std::move(hgetallptr))); ////HExistsCmd std::unique_ptr hexistsptr = - std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameHExists, std::move(hexistsptr))); ////HIncrbyCmd std::unique_ptr hincrbyptr = @@ -420,7 +420,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = - std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast); + std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast); cmd_table->insert(std::pair>(kCmdNameHMget, std::move(hmgetptr))); ////HMsetCmd std::unique_ptr hmsetptr = @@ -736,50 +736,50 @@ void InitCmdTable(CmdTable* cmd_table) { // PubSub ////Publish std::unique_ptr publishptr = - std::make_unique(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast); + std::make_unique(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNamePublish, std::move(publishptr))); ////Subscribe std::unique_ptr subscribeptr = - std::make_unique(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameSubscribe, std::move(subscribeptr))); ////UnSubscribe std::unique_ptr unsubscribeptr = - std::make_unique(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameUnSubscribe, std::move(unsubscribeptr))); ////PSubscribe std::unique_ptr psubscribeptr = - std::make_unique(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePSubscribe, std::move(psubscribeptr))); ////PUnSubscribe std::unique_ptr punsubscribeptr = - std::make_unique(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePUnSubscribe, std::move(punsubscribeptr))); ////PubSub std::unique_ptr pubsubptr = - std::make_unique(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow); + std::make_unique(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNamePubSub, std::move(pubsubptr))); ////ACL - std::unique_ptr aclptr = std::make_unique(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow); + std::unique_ptr aclptr = std::make_unique(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow ); cmd_table->insert(std::pair>(KCmdNameAcl, std::move(aclptr))); // Transaction ////Multi std::unique_ptr multiptr = - std::make_unique(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast); + std::make_unique(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameMulti, std::move(multiptr))); ////Exec std::unique_ptr execptr = std::make_unique( - kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow); + kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow ); cmd_table->insert(std::pair>(kCmdNameExec, std::move(execptr))); ////Discard - std::unique_ptr discardptr = std::make_unique(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr discardptr = std::make_unique(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameDiscard, std::move(discardptr))); ////Watch - std::unique_ptr watchptr = std::make_unique(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr watchptr = std::make_unique(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameWatch, std::move(watchptr))); ////Unwatch - std::unique_ptr unwatchptr = std::make_unique(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast); + std::unique_ptr unwatchptr = std::make_unique(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast ); cmd_table->insert(std::pair>(kCmdNameUnWatch, std::move(unwatchptr))); // Stream @@ -911,6 +911,29 @@ void Cmd::DoCommand(const HintKeys& hint_keys) { } } +bool Cmd::DoReadCommandInCache() { + if (!IsSuspend()) { + db_->DBLockShared(); + } + DEFER { + if (!IsSuspend()) { + db_->DBUnlockShared(); + } + }; + + if (db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { + if (IsNeedReadCache()) { + ReadCache(); + } + // return true only the read command hit + if (is_read() && !res().CacheMiss()) { + return true; + } + } + return false; +} + + void Cmd::DoBinlog() { if (res().ok() && is_write() && g_pika_conf->write_binlog()) { std::shared_ptr conn_ptr = GetConn();