Skip to content

Commit

Permalink
feat: Improve the RTC process of Read/Write model (OpenAtomFoundation…
Browse files Browse the repository at this point in the history
…#2629)

* (Demo) Do read cmd before task queue. && add workflow_dispatch for manual action

* Check authed and write lock
,fix go test error in MacOS and cache mode judge

* fix some ut error by  commands filter  and return logic

* rollback some flag,but add kCmdReadBeforeQueuefor get mget hget hget hgetall,hmget

* fix mem error in macos

* move mget and hmget;add before_queue_time metrics

* fix cost to copy cmd_table by remove c_ptr
  • Loading branch information
chenbt-hz authored and cheniujh committed Jul 30, 2024
1 parent bd1d7af commit fd078b8
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches: [ "unstable", "3.5" , "4.0"]
pull_request:
branches: [ "unstable", "3.5" , "4.0"]

workflow_dispatch:
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: RelWithDebInfo
Expand Down
7 changes: 7 additions & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct TimeStat {
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
before_queue_ts_ = 0;
}

uint64_t start_ts() const {
Expand All @@ -37,8 +38,13 @@ struct TimeStat {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}

uint64_t before_queue_time() const {
return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0;
}

uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t before_queue_ts_;
uint64_t process_done_ts_;
};

Expand Down Expand Up @@ -69,6 +75,7 @@ class PikaClientConn : public net::RedisConn {

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool BatchReadCmdInCache(const std::vector<net::RedisCmdArgsType>& argvs);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
Expand Down
5 changes: 5 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";

const std::unordered_set<std::string> interceptCmds = {kCmdNameGet, kCmdNameHGet, kCmdNameHGetall};

/*
* If a type holds a key, a new data structure
* that uses the key will use this error
Expand Down Expand Up @@ -290,6 +292,7 @@ enum CmdFlags {
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22),
kCmdReadBeforeQueue = (1 << 23),
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -536,6 +539,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool hasFlag(uint32_t flag) const;
bool is_read() const;
bool is_write() const;
bool isCacheRead() const;

bool IsLocal() const;
bool IsSuspend() const;
Expand Down Expand Up @@ -579,6 +583,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void ProcessCommand(const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(const HintKeys& hint_key);
void DoCommand(const HintKeys& hint_key);
bool DoReadCommandInCache(const HintKeys& hint_key = HintKeys());
void LogCommand() const;

std::string name_;
Expand Down
59 changes: 58 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "include/pika_server.h"
#include "net/src/dispatch_thread.h"
#include "net/src/worker_thread.h"
#include "src/pstd/include/scope_record_lock.h"

extern std::unique_ptr<PikaConf> g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -237,6 +238,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur
LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log
<< ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size()
<< ", total_time(ms): " << time_stat_->total_time() / 1000
<< ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000
<< ", queue_time(ms): " << time_stat_->queue_time() / 1000
<< ", process_time(ms): " << time_stat_->process_time() / 1000
<< ", cmd_time(ms): " << do_duration / 1000;
Expand All @@ -261,7 +263,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
if (async) {
auto arg = new BgTaskArg();
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = pstd::NowMicros();
time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
/**
* If using the pipeline method to transmit batch commands to Pika, it is unable to
Expand All @@ -273,6 +275,18 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);
bool read_status = false;

if (PIKA_CACHE_NONE != g_pika_conf->cache_mode() && !IsInTxn() && interceptCmds.find(opt) != interceptCmds.end()){
// read in cache
if (BatchReadCmdInCache(argvs)){
delete arg;
arg = nullptr;
return;
}
time_stat_->before_queue_ts_ = pstd::NowMicros();
}

g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
Expand Down Expand Up @@ -308,6 +322,49 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>&
TryWriteResp();
}

bool PikaClientConn::BatchReadCmdInCache(const std::vector<net::RedisCmdArgsType>& argvs){
resp_num.store(static_cast<int32_t>(argvs.size()));
bool read_status = true;
for (const auto& argv : argvs) {
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
resp_array.push_back(resp_ptr);
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(argv[0]);
if (!c_ptr) {
return false;
}
// Check authed
if (AuthRequired()) { // the user is not authed, need to do auth
if (!(c_ptr->flag() & kCmdFlagsNoAuth)) {
c_ptr->res().SetRes(CmdRes::kErrOther, "NOAUTH Authentication required.");
return false;
}
}
// Initial
c_ptr->Initial(argv, current_db_);
pstd::lock::MultiRecordLock record_lock(c_ptr->db_->LockMgr());
auto cur_keys = c_ptr->current_key();
if (!cur_keys.empty()){
record_lock.Lock(cur_keys);
}

read_status = c_ptr->DoReadCommandInCache();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[argv[0]].cmd_count.fetch_add(1);
(*cmdstat_map)[argv[0]].cmd_time_consuming.fetch_add(time_stat_->total_time());
*resp_ptr = std::move(c_ptr->res().message());
resp_num--;
record_lock.Unlock(cur_keys);
}

if(!read_status){
return read_status;
}else{
TryWriteResp();
return read_status;
}
}

void PikaClientConn::TryWriteResp() {
int expected = 0;
if (resp_num.compare_exchange_strong(expected, -1)) {
Expand Down
60 changes: 43 additions & 17 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSet, std::move(setptr)));
////GetCmd
std::unique_ptr<Cmd> getptr =
std::make_unique<GetCmd>(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow);
std::make_unique<GetCmd>(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdReadBeforeQueue | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameGet, std::move(getptr)));
////DelCmd
std::unique_ptr<Cmd> delptr =
Expand Down Expand Up @@ -392,15 +392,15 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHSet, std::move(hsetptr)));
////HGetCmd
std::unique_ptr<Cmd> hgetptr =
std::make_unique<HGetCmd>(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HGetCmd>(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdReadBeforeQueue |kCmdFlagsFast);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGet, std::move(hgetptr)));
////HGetallCmd
std::unique_ptr<Cmd> hgetallptr =
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdReadBeforeQueue );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGetall, std::move(hgetallptr)));
////HExistsCmd
std::unique_ptr<Cmd> hexistsptr =
std::make_unique<HExistsCmd>(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HExistsCmd>(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHExists, std::move(hexistsptr)));
////HIncrbyCmd
std::unique_ptr<Cmd> hincrbyptr =
Expand All @@ -420,7 +420,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHLen, std::move(hlenptr)));
////HMgetCmd
std::unique_ptr<Cmd> hmgetptr =
std::make_unique<HMgetCmd>(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HMgetCmd>(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHMget, std::move(hmgetptr)));
////HMsetCmd
std::unique_ptr<Cmd> hmsetptr =
Expand Down Expand Up @@ -736,50 +736,50 @@ void InitCmdTable(CmdTable* cmd_table) {
// PubSub
////Publish
std::unique_ptr<Cmd> publishptr =
std::make_unique<PublishCmd>(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast);
std::make_unique<PublishCmd>(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePublish, std::move(publishptr)));
////Subscribe
std::unique_ptr<Cmd> subscribeptr =
std::make_unique<SubscribeCmd>(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<SubscribeCmd>(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSubscribe, std::move(subscribeptr)));
////UnSubscribe
std::unique_ptr<Cmd> unsubscribeptr =
std::make_unique<UnSubscribeCmd>(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<UnSubscribeCmd>(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameUnSubscribe, std::move(unsubscribeptr)));
////PSubscribe
std::unique_ptr<Cmd> psubscribeptr =
std::make_unique<PSubscribeCmd>(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PSubscribeCmd>(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePSubscribe, std::move(psubscribeptr)));
////PUnSubscribe
std::unique_ptr<Cmd> punsubscribeptr =
std::make_unique<PUnSubscribeCmd>(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PUnSubscribeCmd>(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePUnSubscribe, std::move(punsubscribeptr)));
////PubSub
std::unique_ptr<Cmd> pubsubptr =
std::make_unique<PubSubCmd>(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PubSubCmd>(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePubSub, std::move(pubsubptr)));

////ACL
std::unique_ptr<Cmd> aclptr = std::make_unique<PikaAclCmd>(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow);
std::unique_ptr<Cmd> aclptr = std::make_unique<PikaAclCmd>(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(KCmdNameAcl, std::move(aclptr)));

// Transaction
////Multi
std::unique_ptr<Cmd> multiptr =
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast);
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameMulti, std::move(multiptr)));
////Exec
std::unique_ptr<Cmd> execptr = std::make_unique<ExecCmd>(
kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow);
kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameExec, std::move(execptr)));
////Discard
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameDiscard, std::move(discardptr)));
////Watch
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameWatch, std::move(watchptr)));
////Unwatch
std::unique_ptr<Cmd> unwatchptr = std::make_unique<UnwatchCmd>(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> unwatchptr = std::make_unique<UnwatchCmd>(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameUnWatch, std::move(unwatchptr)));

// Stream
Expand Down Expand Up @@ -911,6 +911,31 @@ void Cmd::DoCommand(const HintKeys& hint_keys) {
}
}

// just read in cache
bool Cmd::DoReadCommandInCache(const HintKeys& hint_keys) {
if (!IsSuspend()) {
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DBUnlockShared();
}
};
if (IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (IsNeedReadCache()) {
ReadCache();
}
// return true only the read command hit
if (is_read() && !res().CacheMiss()) {
return true;
}
}
return false;
}


void Cmd::DoBinlog() {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
Expand Down Expand Up @@ -940,6 +965,7 @@ void Cmd::DoBinlog() {
bool Cmd::hasFlag(uint32_t flag) const { return (flag_ & flag); }
bool Cmd::is_read() const { return (flag_ & kCmdFlagsRead); }
bool Cmd::is_write() const { return (flag_ & kCmdFlagsWrite); }
bool Cmd::isCacheRead() const { return (flag_ & kCmdReadBeforeQueue);}
bool Cmd::IsLocal() const { return (flag_ & kCmdFlagsLocal); }

int8_t Cmd::SubCmdIndex(const std::string& cmdName) {
Expand Down
16 changes: 5 additions & 11 deletions tests/integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,15 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) {
var _ = Describe("Stream Commands", func() {
ctx := context.TODO()
var client *redis.Client
client = redis.NewClient(PikaOption(SINGLEADDR))
client.FlushDB(ctx)
if GlobalBefore != nil {
GlobalBefore(ctx, client)
}
client = redis.NewClient(PikaOption(SINGLEADDR))
client.FlushDB(ctx)

BeforeEach(func() {
// client = redis.NewClient(pikaOptions1())
// Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
if GlobalBefore != nil {
GlobalBefore(ctx, client)
}
})

AfterEach(func() {
// Expect(client.Close()).NotTo(HaveOccurred())
// Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred())
})

Describe("passed tests", func() {
It("should concurrently add and read messages in the stream with separate clients", func() {
Expand Down

0 comments on commit fd078b8

Please sign in to comment.