Skip to content

Commit

Permalink
Merge branch 'unstable' into ReadBeforeTaskqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored Jun 25, 2024
2 parents 6e462fc + 3d3c6d1 commit ea811d3
Show file tree
Hide file tree
Showing 20 changed files with 1,947 additions and 1,045 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ name: "CodeQL"

on:
push:
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0"]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0"]
schedule:
- cron: '25 19 * * 6'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/codis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ name: Codis

on:
push:
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0" ]
pull_request:
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0"]

jobs:

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ name: Pika

on:
push:
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0"]
pull_request:
branches: [ "unstable", "3.5" ]
workflow_dispatch:
branches: [ "unstable", "3.5" , "4.0"]
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: RelWithDebInfo
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tools_go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name: Tools_go_build

on:
push:
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0"]
paths:
- 'tools/**'
pull_request:
branches: [ "unstable", "3.5" ]
branches: [ "unstable", "3.5" , "4.0"]
paths:
- 'tools/**'

Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
Expand Down
26 changes: 21 additions & 5 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "src/base_value_format.h"
#include "src/base_meta_value_format.h"
#include "src/lists_meta_value_format.h"
#include "src/pika_stream_meta_value.h"
#include "src/strings_value_format.h"
#include "src/zsets_data_key_format.h"
#include "src/debug.h"
Expand All @@ -36,11 +37,12 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
* The field designs of the remaining zset,set,hash and stream in meta-value
* are the same, so the same filtering strategy is used
*/
ParsedBaseKey parsed_key(key);
auto type = static_cast<enum DataType>(static_cast<uint8_t>(value[0]));
DEBUG("==========================START==========================");
if (type == DataType::kStrings) {
ParsedStringsValue parsed_strings_value(value);
DEBUG("[StringsFilter] key: {}, value = {}, timestamp: {}, cur_time: {}", key.ToString().c_str(),
DEBUG("[string type] key: %s, value = %s, timestamp: %llu, cur_time: %llu", parsed_key.Key().ToString().c_str(),
parsed_strings_value.UserValue().ToString().c_str(), parsed_strings_value.Etime(), cur_time);
if (parsed_strings_value.Etime() != 0 && parsed_strings_value.Etime() < cur_time) {
DEBUG("Drop[Stale]");
Expand All @@ -49,9 +51,17 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
DEBUG("Reserve");
return false;
}
} else if (type == DataType::kStreams) {
ParsedStreamMetaValue parsed_stream_meta_value(value);
DEBUG("[stream meta type], key: %s, entries_added = %llu, first_id: %s, last_id: %s, version: %llu",
parsed_key.Key().ToString().c_str(), parsed_stream_meta_value.entries_added(),
parsed_stream_meta_value.first_id().ToString().c_str(),
parsed_stream_meta_value.last_id().ToString().c_str(),
parsed_stream_meta_value.version());
return false;
} else if (type == DataType::kLists) {
ParsedListsMetaValue parsed_lists_meta_value(value);
DEBUG("[ListMetaFilter], key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
DEBUG("[list meta type], key: %s, count = %d, timestamp: %llu, cur_time: %llu, version: %llu", parsed_key.Key().ToString().c_str(),
parsed_lists_meta_value.Count(), parsed_lists_meta_value.Etime(), cur_time,
parsed_lists_meta_value.Version());

Expand All @@ -68,8 +78,9 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
return false;
} else {
ParsedBaseMetaValue parsed_base_meta_value(value);
DEBUG("[MetaFilter] key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
parsed_base_meta_value.Count(), parsed_base_meta_value.Etime(), cur_time, parsed_base_meta_value.Version());
DEBUG("[%s meta type] key: %s, count = %d, timestamp: %llu, cur_time: %llu, version: %llu",
DataTypeToString(type), parsed_key.Key().ToString().c_str(), parsed_base_meta_value.Count(),
parsed_base_meta_value.Etime(), cur_time, parsed_base_meta_value.Version());

if (parsed_base_meta_value.Etime() != 0 && parsed_base_meta_value.Etime() < cur_time &&
parsed_base_meta_value.Version() < cur_time) {
Expand Down Expand Up @@ -143,7 +154,12 @@ class BaseDataFilter : public rocksdb::CompactionFilter {
auto type = static_cast<enum DataType>(static_cast<uint8_t>(meta_value[0]));
if (type != type_) {
return true;
} else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kStreams || type == DataType::kZSets) {
} else if (type == DataType::kStreams) {
ParsedStreamMetaValue parsed_stream_meta_value(meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_stream_meta_value.version();
cur_meta_etime_ = 0; // stream do not support ttl
} else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kZSets) {
ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_base_meta_value.Version();
Expand Down
15 changes: 9 additions & 6 deletions src/storage/src/pika_stream_meta_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class StreamMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamValueLength);
if (value_.size() != kDefaultStreamValueLength) {
LOG(ERROR) << "Invalid stream meta value length: ";
LOG(ERROR) << "Invalid stream meta value length: " << value_.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = &value_[0];
Expand Down Expand Up @@ -215,7 +216,8 @@ class ParsedStreamMetaValue {
ParsedStreamMetaValue(const Slice& value) {
assert(value.size() == kDefaultStreamValueLength);
if (value.size() != kDefaultStreamValueLength) {
LOG(ERROR) << "Invalid stream meta value length: ";
LOG(ERROR) << "Invalid stream meta value length: " << value.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = const_cast<char*>(value.data());
Expand Down Expand Up @@ -294,7 +296,7 @@ class StreamCGroupMetaValue {
uint64_t needed = kDefaultStreamCGroupValueLength;
assert(value_.size() == 0);
if (value_.size() != 0) {
LOG(FATAL) << "Init on a existed stream cgroup meta value!";
LOG(ERROR) << "Init on a existed stream cgroup meta value!";
return;
}
value_.resize(needed);
Expand All @@ -314,7 +316,8 @@ class StreamCGroupMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamCGroupValueLength);
if (value_.size() != kDefaultStreamCGroupValueLength) {
LOG(FATAL) << "Invalid stream cgroup meta value length: ";
LOG(ERROR) << "Invalid stream cgroup meta value length: " << value_.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
if (value_.size() == kDefaultStreamCGroupValueLength) {
Expand Down Expand Up @@ -373,7 +376,7 @@ class StreamConsumerMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamConsumerValueLength);
if (value_.size() != kDefaultStreamConsumerValueLength) {
LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size()
LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size()
<< " expected: " << kDefaultStreamConsumerValueLength;
return;
}
Expand All @@ -391,7 +394,7 @@ class StreamConsumerMetaValue {
pel_ = pel;
assert(value_.size() == 0);
if (value_.size() != 0) {
LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
return;
}
uint64_t needed = kDefaultStreamConsumerValueLength;
Expand Down
74 changes: 38 additions & 36 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,43 +117,44 @@ class Redis {
Status ScanStreamsKeyNum(KeyInfo* key_info);

// Keys Commands
virtual Status StringsExpire(const Slice& key, int64_t ttl);
virtual Status HashesExpire(const Slice& key, int64_t ttl);
virtual Status ListsExpire(const Slice& key, int64_t ttl);
virtual Status ZsetsExpire(const Slice& key, int64_t ttl);
virtual Status SetsExpire(const Slice& key, int64_t ttl);

virtual Status StringsDel(const Slice& key);
virtual Status HashesDel(const Slice& key);
virtual Status ListsDel(const Slice& key);
virtual Status ZsetsDel(const Slice& key);
virtual Status SetsDel(const Slice& key);
virtual Status StreamsDel(const Slice& key);

virtual Status StringsExpireat(const Slice& key, int64_t timestamp);
virtual Status HashesExpireat(const Slice& key, int64_t timestamp);
virtual Status ListsExpireat(const Slice& key, int64_t timestamp);
virtual Status SetsExpireat(const Slice& key, int64_t timestamp);
virtual Status ZsetsExpireat(const Slice& key, int64_t timestamp);

virtual Status StringsPersist(const Slice& key);
virtual Status HashesPersist(const Slice& key);
virtual Status ListsPersist(const Slice& key);
virtual Status ZsetsPersist(const Slice& key);
virtual Status SetsPersist(const Slice& key);

virtual Status StringsTTL(const Slice& key, int64_t* timestamp);
virtual Status HashesTTL(const Slice& key, int64_t* timestamp);
virtual Status ListsTTL(const Slice& key, int64_t* timestamp);
virtual Status ZsetsTTL(const Slice& key, int64_t* timestamp);
virtual Status SetsTTL(const Slice& key, int64_t* timestamp);
virtual Status StringsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status HashesExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status ListsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status ZsetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});
virtual Status SetsExpire(const Slice& key, int64_t ttl, std::string&& prefetch_meta = {});

virtual Status StringsDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status HashesDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status ListsDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status ZsetsDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status SetsDel(const Slice& key, std::string&& prefetch_meta = {});
virtual Status StreamsDel(const Slice& key, std::string&& prefetch_meta = {});

virtual Status StringsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status HashesExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status ListsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status SetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});
virtual Status ZsetsExpireat(const Slice& key, int64_t timestamp, std::string&& prefetch_meta = {});

virtual Status StringsPersist(const Slice& key, std::string&& prefetch_meta = {});
virtual Status HashesPersist(const Slice& key, std::string&& prefetch_meta = {});
virtual Status ListsPersist(const Slice& key, std::string&& prefetch_meta = {});
virtual Status ZsetsPersist(const Slice& key, std::string&& prefetch_meta = {});
virtual Status SetsPersist(const Slice& key, std::string&& prefetch_meta = {});

virtual Status StringsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});
virtual Status HashesTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});
virtual Status ListsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});
virtual Status ZsetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});
virtual Status SetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});

// Strings Commands
Status Append(const Slice& key, const Slice& value, int32_t* ret);
Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range);
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Status Get(const Slice& key, std::string* value);
Status HyperloglogGet(const Slice& key, std::string* value);
Status MGet(const Slice& key, std::string* value);
Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Expand All @@ -167,6 +168,7 @@ class Redis {
Status MSet(const std::vector<KeyValue>& kvs);
Status MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret);
Status Set(const Slice& key, const Slice& value);
Status HyperloglogSet(const Slice& key, const Slice& value);
Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0);
Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret);
Status Setex(const Slice& key, const Slice& value, int64_t ttl);
Expand Down Expand Up @@ -200,7 +202,7 @@ class Redis {
Status HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret);
Status HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value);
Status HKeys(const Slice& key, std::vector<std::string>* fields);
Status HLen(const Slice& key, int32_t* ret);
Status HLen(const Slice& key, int32_t* ret, std::string&& prefetch_meta = {});
Status HMGet(const Slice& key, const std::vector<std::string>& fields, std::vector<ValueStatus>* vss);
Status HMSet(const Slice& key, const std::vector<FieldValue>& fvs);
Status HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res);
Expand Down Expand Up @@ -246,7 +248,7 @@ class Redis {

// Sets Commands
Status SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret);
Status SCard(const Slice& key, int32_t* ret);
Status SCard(const Slice& key, int32_t* ret, std::string&& prefetch_meta = {});
Status SDiff(const std::vector<std::string>& keys, std::vector<std::string>* members);
Status SDiffstore(const Slice& destination, const std::vector<std::string>& keys, std::vector<std::string>& value_to_dest, int32_t* ret);
Status SInter(const std::vector<std::string>& keys, std::vector<std::string>* members);
Expand All @@ -269,7 +271,7 @@ class Redis {
Status LIndex(const Slice& key, int64_t index, std::string* element);
Status LInsert(const Slice& key, const BeforeOrAfter& before_or_after, const std::string& pivot,
const std::string& value, int64_t* ret);
Status LLen(const Slice& key, uint64_t* len);
Status LLen(const Slice& key, uint64_t* len, std::string&& prefetch_meta = {});
Status LPop(const Slice& key, int64_t count, std::vector<std::string>* elements);
Status LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret);
Status LPushx(const Slice& key, const std::vector<std::string>& values, uint64_t* len);
Expand All @@ -285,7 +287,7 @@ class Redis {

// Zsets Commands
Status ZAdd(const Slice& key, const std::vector<ScoreMember>& score_members, int32_t* ret);
Status ZCard(const Slice& key, int32_t* card);
Status ZCard(const Slice& key, int32_t* card, std::string&& prefetch_meta = {});
Status ZCount(const Slice& key, double min, double max, bool left_close, bool right_close, int32_t* ret);
Status ZIncrby(const Slice& key, const Slice& member, double increment, double* ret);
Status ZRange(const Slice& key, int32_t start, int32_t stop, std::vector<ScoreMember>* score_members);
Expand Down Expand Up @@ -323,7 +325,7 @@ class Redis {
Status XAdd(const Slice& key, const std::string& serialized_message, StreamAddTrimArgs& args);
Status XDel(const Slice& key, const std::vector<streamID>& ids, int32_t& count);
Status XTrim(const Slice& key, StreamAddTrimArgs& args, int32_t& count);
Status XRange(const Slice& key, const StreamScanArgs& args, std::vector<IdMessage>& id_messages);
Status XRange(const Slice& key, const StreamScanArgs& args, std::vector<IdMessage>& id_messages, std::string&& prefetch_meta = {});
Status XRevrange(const Slice& key, const StreamScanArgs& args, std::vector<IdMessage>& id_messages);
Status XLen(const Slice& key, int32_t& len);
Status XRead(const StreamReadGroupReadArgs& args, std::vector<std::vector<IdMessage>>& results,
Expand All @@ -333,7 +335,7 @@ class Redis {
rocksdb::ReadOptions& read_options);
// get and parse the stream meta if found
// @return ok only when the stream meta exists
Status GetStreamMeta(StreamMetaValue& tream_meta, const rocksdb::Slice& key, rocksdb::ReadOptions& read_options);
Status GetStreamMeta(StreamMetaValue& tream_meta, const rocksdb::Slice& key, rocksdb::ReadOptions& read_options, std::string&& prefetch_meta = {});

// Before calling this function, the caller should ensure that the ids are valid
Status DeleteStreamMessages(const rocksdb::Slice& key, const StreamMetaValue& stream_meta,
Expand Down
Loading

0 comments on commit ea811d3

Please sign in to comment.