Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: floyd supports "Within the same Redis database, a single key name can only have one type of data structure" #2609

Merged
merged 16 commits into from
May 20, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ jobs:
sh integrate_test.sh

build_on_macos:
runs-on: macos-latest
runs-on: macos-12

steps:
- uses: actions/checkout@v3
Expand Down
10 changes: 3 additions & 7 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ class CompactCmd : public Cmd {
private:
void DoInitial() override;
void Clear() override {
struct_type_.clear();
compact_dbs_.clear();
}
std::string struct_type_;
std::set<std::string> compact_dbs_;
};

Expand All @@ -127,12 +125,10 @@ class CompactRangeCmd : public Cmd {
private:
void DoInitial() override;
void Clear() override {
struct_type_.clear();
compact_dbs_.clear();
start_key_.clear();
end_key_.clear();
}
std::string struct_type_;
std::set<std::string> compact_dbs_;
std::string start_key_;
std::string end_key_;
Expand Down Expand Up @@ -424,9 +420,9 @@ class ScandbCmd : public Cmd {
Cmd* Clone() override { return new ScandbCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
storage::DataType type_ = storage::DataType::kAll;
void DoInitial() override;
void Clear() override { type_ = storage::kAll; }
void Clear() override { type_ = storage::DataType::kAll; }
};

class SlowlogCmd : public Cmd {
Expand Down Expand Up @@ -473,7 +469,7 @@ class PKPatternMatchDelCmd : public Cmd {
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
storage::DataType type_ = storage::DataType::kAll;
std::string pattern_;
void DoInitial() override;
};
Expand Down
2 changes: 1 addition & 1 deletion include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<

rocksdb::Status Init(uint32_t cache_num, cache::CacheConfig *cache_cfg);
rocksdb::Status Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg = nullptr);
std::map<storage::DataType, int64_t> TTL(std::string &key, std::map<storage::DataType, rocksdb::Status>* type_status);
int64_t TTL(std::string &key);
void ResetConfig(cache::CacheConfig *cache_cfg);
void Destroy(void);
void SetCacheStatus(int status);
Expand Down
12 changes: 11 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ const std::string kCmdNameTtl = "ttl";
const std::string kCmdNamePttl = "pttl";
const std::string kCmdNamePersist = "persist";
const std::string kCmdNameType = "type";
const std::string kCmdNamePType = "ptype";
const std::string kCmdNameScan = "scan";
const std::string kCmdNameScanx = "scanx";
const std::string kCmdNamePKSetexAt = "pksetexat";
Expand Down Expand Up @@ -248,6 +247,12 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";

/*
* If a type holds a key, a new data structure
* that uses the key will use this error
*/
constexpr const char* ErrTypeMessage = "Invalid argument: WRONGTYPE";

using PikaCmdArgsType = net::RedisCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

Expand Down Expand Up @@ -326,6 +331,7 @@ class CmdRes {
kInvalidTransaction,
kTxnQueued,
kTxnAbort,
kMultiKey
};

CmdRes() = default;
Expand Down Expand Up @@ -420,6 +426,10 @@ class CmdRes {
result.append(message_);
result.append(kNewLine);
break;
case kMultiKey:
result = "-WRONGTYPE Operation against a key holding the wrong kind of value";
result.append(kNewLine);
break;
default:
break;
}
Expand Down
8 changes: 0 additions & 8 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,6 @@ const std::string kDBSyncModule = "document";

const std::string kBgsaveInfoFile = "info";

// prefix of pika cache
const std::string PCacheKeyPrefixK = "K";
const std::string PCacheKeyPrefixH = "H";
const std::string PCacheKeyPrefixS = "S";
const std::string PCacheKeyPrefixZ = "Z";
const std::string PCacheKeyPrefixL = "L";


/*
* cache status
*/
Expand Down
22 changes: 1 addition & 21 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,26 +732,6 @@ class TypeCmd : public Cmd {
rocksdb::Status s_;
};

class PTypeCmd : public Cmd {
public:
PTypeCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::KEYSPACE)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PTypeCmd(*this); }

private:
std::string key_;
void DoInitial() override;
rocksdb::Status s_;
};

class ScanCmd : public Cmd {
public:
ScanCmd(const std::string& name, int arity, uint32_t flag)
Expand Down Expand Up @@ -865,7 +845,7 @@ class PKRScanRangeCmd : public Cmd {
Cmd* Clone() override { return new PKRScanRangeCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
storage::DataType type_ = storage::DataType::kAll;
std::string key_start_;
std::string key_end_;
std::string pattern_ = "*";
Expand Down
1 change: 1 addition & 0 deletions include/pika_migrate_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "pika_client_conn.h"
#include "pika_db.h"
#include "storage/storage.h"
#include "storage/src/base_data_key_format.h"
#include "strings.h"

void WriteDelKeyToBinlog(const std::string& key, const std::shared_ptr<DB>& db);
Expand Down
11 changes: 1 addition & 10 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,12 @@ extern std::unique_ptr<PikaConf> g_pika_conf;

enum TaskType {
kCompactAll,
kCompactStrings,
kCompactHashes,
kCompactSets,
kCompactZSets,
kCompactList,
kResetReplState,
kPurgeLog,
kStartKeyScan,
kStopKeyScan,
kBgSave,
kCompactRangeStrings,
kCompactRangeHashes,
kCompactRangeSets,
kCompactRangeZSets,
kCompactRangeList,
kCompactRangeAll,
};

struct TaskArg {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_slot_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "net/include/net_cli.h"
#include "net/include/net_thread.h"
#include "storage/storage.h"
#include "storage/src/base_data_key_format.h"
#include "strings.h"

const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
Expand Down Expand Up @@ -56,6 +57,7 @@ class PikaMigrate {
int ParseSKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseHKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseLKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseMKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
bool SetTTL(const std::string& key, std::string& wbuf_str, int64_t ttl);
};

Expand Down
8 changes: 4 additions & 4 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ class RedisCache {
Status HExists(std::string& key, std::string &field);
Status HIncrby(std::string& key, std::string &field, int64_t value);
Status HIncrbyfloat(std::string& key, std::string &field, double value);
Status HLen(std::string& key, uint64_t *len);
Status HLen(const std::string& key, uint64_t *len);
Status HStrlen(std::string& key, std::string &field, uint64_t *len);

// List Commands
Status LIndex(std::string& key, int64_t index, std::string *element);
Status LInsert(std::string& key, storage::BeforeOrAfter &before_or_after,
std::string &pivot, std::string &value);
Status LLen(std::string& key, uint64_t *len);
Status LLen(const std::string& key, uint64_t *len);
Status LPop(std::string& key, std::string *element);
Status LPush(std::string& key, std::vector<std::string> &values);
Status LPushx(std::string& key, std::vector<std::string> &values);
Expand All @@ -108,15 +108,15 @@ class RedisCache {

// Set Commands
Status SAdd(std::string& key, std::vector<std::string> &members);
Status SCard(std::string& key, uint64_t *len);
Status SCard(const std::string& key, uint64_t *len);
Status SIsmember(std::string& key, std::string& member);
Status SMembers(std::string& key, std::vector<std::string> *members);
Status SRem(std::string& key, std::vector<std::string> &members);
Status SRandmember(std::string& key, int64_t count, std::vector<std::string> *members);

// Zset Commands
Status ZAdd(std::string& key, std::vector<storage::ScoreMember> &score_members);
Status ZCard(std::string& key, uint64_t *len);
Status ZCard(const std::string& key, uint64_t *len);
Status ZCount(std::string& key, std::string &min, std::string &max, uint64_t *len);
Status ZIncrby(std::string& key, std::string& member, double increment);
Status ZRange(std::string& key,
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Status RedisCache::HIncrbyfloat(std::string& key, std::string &field, double val
return Status::OK();
}

Status RedisCache::HLen(std::string& key, uint64_t *len) {
Status RedisCache::HLen(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status RedisCache::LInsert(std::string& key, storage::BeforeOrAfter &before_or_a
return Status::OK();
}

Status RedisCache::LLen(std::string& key, uint64_t *len) {
Status RedisCache::LLen(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Status RedisCache::SAdd(std::string& key, std::vector<std::string> &members) {
return Status::OK();
}

Status RedisCache::SCard(std::string& key, uint64_t *len) {
Status RedisCache::SCard(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Status RedisCache::ZAdd(std::string& key, std::vector<storage::ScoreMember> &sco
return Status::OK();
}

Status RedisCache::ZCard(std::string& key, uint64_t *len) {
Status RedisCache::ZCard(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
73 changes: 15 additions & 58 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,8 @@ void CompactCmd::DoInitial() {
}

if (argv_.size() == 1) {
struct_type_ = "all";
compact_dbs_ = g_pika_server->GetAllDBName();
} else if (argv_.size() == 2) {
struct_type_ = argv_[1];
compact_dbs_ = g_pika_server->GetAllDBName();
} else if (argv_.size() == 3) {
std::vector<std::string> dbs;
pstd::StringSplit(argv_[1], COMMA, dbs);
for (const auto& db : dbs) {
Expand All @@ -360,27 +356,16 @@ void CompactCmd::DoInitial() {
compact_dbs_.insert(db);
}
}
struct_type_ = argv_[2];
}
}

/*
* Because meta-CF stores the meta information of all data structures,
* the compact operation can only operate on all data types without
* specifying data types
*/
void CompactCmd::Do() {
if (strcasecmp(struct_type_.data(), "all") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactAll});
} else if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactStrings});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactHashes});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactSets});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactZSets});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactList});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactAll});
LogCommand();
res_.SetRes(CmdRes::kOk);
}
Expand All @@ -406,26 +391,12 @@ void CompactRangeCmd::DoInitial() {
compact_dbs_.insert(db);
}
}
struct_type_ = argv_[2];
start_key_ = argv_[3];
end_key_ = argv_[4];
start_key_ = argv_[2];
end_key_ = argv_[3];
}

void CompactRangeCmd::Do() {
if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeStrings, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeHashes, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeSets, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeZSets, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeList, {start_key_, end_key_}});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeAll, {start_key_, end_key_}});
LogCommand();
res_.SetRes(CmdRes::kOk);
}
Expand Down Expand Up @@ -2934,18 +2905,18 @@ void ScandbCmd::DoInitial() {
return;
}
if (argv_.size() == 1) {
type_ = storage::kAll;
type_ = storage::DataType::kAll;
} else {
if (strcasecmp(argv_[1].data(), "string") == 0) {
type_ = storage::kStrings;
type_ = storage::DataType::kStrings;
} else if (strcasecmp(argv_[1].data(), "hash") == 0) {
type_ = storage::kHashes;
type_ = storage::DataType::kHashes;
} else if (strcasecmp(argv_[1].data(), "set") == 0) {
type_ = storage::kSets;
type_ = storage::DataType::kSets;
} else if (strcasecmp(argv_[1].data(), "zset") == 0) {
type_ = storage::kZSets;
type_ = storage::DataType::kZSets;
} else if (strcasecmp(argv_[1].data(), "list") == 0) {
type_ = storage::kLists;
type_ = storage::DataType::kLists;
} else {
res_.SetRes(CmdRes::kInvalidDbType);
}
Expand Down Expand Up @@ -3027,20 +2998,6 @@ void PKPatternMatchDelCmd::DoInitial() {
return;
}
pattern_ = argv_[1];
if (strcasecmp(argv_[2].data(), "set") == 0) {
type_ = storage::kSets;
} else if (strcasecmp(argv_[2].data(), "list") == 0) {
type_ = storage::kLists;
} else if (strcasecmp(argv_[2].data(), "string") == 0) {
type_ = storage::kStrings;
} else if (strcasecmp(argv_[2].data(), "zset") == 0) {
type_ = storage::kZSets;
} else if (strcasecmp(argv_[2].data(), "hash") == 0) {
type_ = storage::kHashes;
} else {
res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel);
return;
}
}

void PKPatternMatchDelCmd::Do() {
Expand Down
Loading
Loading