Skip to content

Commit

Permalink
feat: add rename and renamenx (#299)
Browse files Browse the repository at this point in the history
* feat: add rename and renamenx

---------

Signed-off-by: zztaki <[email protected]>
  • Loading branch information
zztaki authored May 20, 2024
1 parent 2f3b7fe commit cd5e466
Show file tree
Hide file tree
Showing 14 changed files with 656 additions and 90 deletions.
2 changes: 2 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const std::string kCmdNameExpireat = "expireat";
const std::string kCmdNamePExpireat = "pexpireat";
const std::string kCmdNamePersist = "persist";
const std::string kCmdNameKeys = "keys";
const std::string kCmdNameRename = "rename";
const std::string kCmdNameRenameNX = "renamenx";

// raft cmd
const std::string kCmdNameRaftCluster = "raft.cluster";
Expand Down
41 changes: 41 additions & 0 deletions src/cmd_keys.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,45 @@ void PttlCmd::DoCmd(PClient* client) {
}
}

RenameCmd::RenameCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryKeyspace) {}

bool RenameCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}

void RenameCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

RenameNXCmd::RenameNXCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryKeyspace) {}

bool RenameNXCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}

void RenameNXCmd::DoCmd(PClient* client) {
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Renamenx(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else if (s.IsCorruption()) {
client->AppendInteger(0); // newkey already exists
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

} // namespace pikiwidb
22 changes: 22 additions & 0 deletions src/cmd_keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,26 @@ class PttlCmd : public BaseCmd {
void DoCmd(PClient* client) override;
};

class RenameCmd : public BaseCmd {
public:
RenameCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
};

class RenameNXCmd : public BaseCmd {
public:
RenameNXCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
};

} // namespace pikiwidb
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(Pttl, 2);
ADD_COMMAND(Persist, 2);
ADD_COMMAND(Keys, 2);
ADD_COMMAND(Rename, 3);
ADD_COMMAND(RenameNX, 3);

// kv
ADD_COMMAND(Get, 2);
Expand Down
11 changes: 11 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,17 @@ class Storage {

Status Keys(const DataType& data_type, const std::string& pattern, std::vector<std::string>* keys);

// Return OK if Rename successfully,
// NotFound if key doesn't exist,
// otherwise abort.
Status Rename(const std::string& key, const std::string& newkey);

// Return OK if Renamenx successfully,
// NotFound if key doesn't exist,
// Corruption if newkey already exists,
// otherwise abort.
Status Renamenx(const std::string& key, const std::string& newkey);

// Dynamic switch WAL
void DisableWal(const bool is_wal_disable);

Expand Down
12 changes: 12 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ class Redis {
virtual Status ZsetsTTL(const Slice& key, uint64_t* timestamp);
virtual Status SetsTTL(const Slice& key, uint64_t* timestamp);

virtual Status StringsRename(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status HashesRename(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status ListsRename(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status ZsetsRename(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status SetsRename(const Slice& key, Redis* new_inst, const Slice& newkey);

virtual Status StringsRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status HashesRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status ListsRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status ZsetsRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey);
virtual Status SetsRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey);

// Strings Commands
Status Append(const Slice& key, const Slice& value, int32_t* ret);
Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range);
Expand Down
71 changes: 71 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,77 @@ Status Redis::HashesTTL(const Slice& key, uint64_t* timestamp) {
return s;
}

Status Redis::HashesRename(const Slice& key, Redis* new_inst, const Slice& newkey) {
std::string meta_value;
Status s;
uint64_t statistic = 0;
const std::vector<std::string> keys = {key.ToString(), newkey.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

BaseMetaKey base_meta_key(key);
BaseMetaKey base_meta_newkey(newkey);
s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
return Status::NotFound();
}
// copy a new hash with newkey
statistic = parsed_hashes_meta_value.Count();
s = new_inst->GetDB()->Put(default_write_options_, handles_[kHashesMetaCF], base_meta_newkey.Encode(), meta_value);
new_inst->UpdateSpecificKeyStatistics(DataType::kHashes, newkey.ToString(), statistic);

// HashesDel key
parsed_hashes_meta_value.InitialMetaValue();
s = db_->Put(default_write_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value);
UpdateSpecificKeyStatistics(DataType::kHashes, key.ToString(), statistic);
}
return s;
}

Status Redis::HashesRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey) {
std::string meta_value;
Status s;
uint64_t statistic = 0;
const std::vector<std::string> keys = {key.ToString(), newkey.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

BaseMetaKey base_meta_key(key);
BaseMetaKey base_meta_newkey(newkey);
s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
return Status::NotFound();
}
// check if newkey exists.
std::string new_meta_value;
s = new_inst->GetDB()->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_newkey.Encode(),
&new_meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_new_meta_value(&new_meta_value);
if (!parsed_hashes_new_meta_value.IsStale() && parsed_hashes_new_meta_value.Count() != 0) {
return Status::Corruption(); // newkey already exists.
}
}

// copy a new hash with newkey
statistic = parsed_hashes_meta_value.Count();
s = new_inst->GetDB()->Put(default_write_options_, handles_[kHashesMetaCF], base_meta_newkey.Encode(), meta_value);
new_inst->UpdateSpecificKeyStatistics(DataType::kHashes, newkey.ToString(), statistic);

// HashesDel key
parsed_hashes_meta_value.InitialMetaValue();
s = db_->Put(default_write_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value);
UpdateSpecificKeyStatistics(DataType::kHashes, key.ToString(), statistic);
}
return s;
}

void Redis::ScanHashes() {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
Expand Down
69 changes: 69 additions & 0 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,75 @@ Status Redis::ListsTTL(const Slice& key, uint64_t* timestamp) {
return s;
}

Status Redis::ListsRename(const Slice& key, Redis* new_inst, const Slice& newkey) {
std::string meta_value;
uint32_t statistic = 0;
const std::vector<std::string> keys = {key.ToString(), newkey.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

BaseMetaKey base_meta_key(key);
BaseMetaKey base_meta_newkey(newkey);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (parsed_lists_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_lists_meta_value.Count() == 0) {
return Status::NotFound();
}
// copy a new list with newkey
statistic = parsed_lists_meta_value.Count();
s = new_inst->GetDB()->Put(default_write_options_, handles_[kListsMetaCF], base_meta_newkey.Encode(), meta_value);
new_inst->UpdateSpecificKeyStatistics(DataType::kLists, newkey.ToString(), statistic);

// ListsDel key
parsed_lists_meta_value.InitialMetaValue();
s = db_->Put(default_write_options_, handles_[kListsMetaCF], base_meta_key.Encode(), meta_value);
UpdateSpecificKeyStatistics(DataType::kLists, key.ToString(), statistic);
}
return s;
}

Status Redis::ListsRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey) {
std::string meta_value;
uint32_t statistic = 0;
const std::vector<std::string> keys = {key.ToString(), newkey.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

BaseMetaKey base_meta_key(key);
BaseMetaKey base_meta_newkey(newkey);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (parsed_lists_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_lists_meta_value.Count() == 0) {
return Status::NotFound();
}
// check if newkey exists.
std::string new_meta_value;
s = new_inst->GetDB()->Get(default_read_options_, handles_[kListsMetaCF], base_meta_newkey.Encode(),
&new_meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_lists_new_meta_value(&new_meta_value);
if (!parsed_lists_new_meta_value.IsStale() && parsed_lists_new_meta_value.Count() != 0) {
return Status::Corruption(); // newkey already exists.
}
}

// copy a new list with newkey
statistic = parsed_lists_meta_value.Count();
s = new_inst->GetDB()->Put(default_write_options_, handles_[kListsMetaCF], base_meta_newkey.Encode(), meta_value);
new_inst->UpdateSpecificKeyStatistics(DataType::kLists, newkey.ToString(), statistic);

// ListsDel key
parsed_lists_meta_value.InitialMetaValue();
s = db_->Put(default_write_options_, handles_[kListsMetaCF], base_meta_key.Encode(), meta_value);
UpdateSpecificKeyStatistics(DataType::kLists, key.ToString(), statistic);
}
return s;
}

void Redis::ScanLists() {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
Expand Down
70 changes: 70 additions & 0 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <map>
#include <memory>
#include <random>
#include <vector>

#include <fmt/core.h>

Expand Down Expand Up @@ -1316,6 +1317,75 @@ rocksdb::Status Redis::SetsTTL(const Slice& key, uint64_t* timestamp) {
return s;
}

Status Redis::SetsRename(const Slice& key, Redis* new_inst, const Slice& newkey) {
std::string meta_value;
uint32_t statistic = 0;
const std::vector<std::string> keys = {key.ToString(), newkey.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

BaseMetaKey base_meta_key(key);
BaseMetaKey base_meta_newkey(newkey);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (parsed_sets_meta_value.IsStale()) {
return rocksdb::Status::NotFound("Stale");
} else if (parsed_sets_meta_value.Count() == 0) {
return rocksdb::Status::NotFound();
}
// copy a new set with newkey
statistic = parsed_sets_meta_value.Count();
s = new_inst->GetDB()->Put(default_write_options_, handles_[kSetsMetaCF], base_meta_newkey.Encode(), meta_value);
new_inst->UpdateSpecificKeyStatistics(DataType::kSets, newkey.ToString(), statistic);

// SetsDel key
parsed_sets_meta_value.InitialMetaValue();
s = db_->Put(default_write_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), meta_value);
UpdateSpecificKeyStatistics(DataType::kSets, key.ToString(), statistic);
}
return s;
}

Status Redis::SetsRenamenx(const Slice& key, Redis* new_inst, const Slice& newkey) {
std::string meta_value;
uint32_t statistic = 0;
const std::vector<std::string> keys = {key.ToString(), newkey.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

BaseMetaKey base_meta_key(key);
BaseMetaKey base_meta_newkey(newkey);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (parsed_sets_meta_value.IsStale()) {
return rocksdb::Status::NotFound("Stale");
} else if (parsed_sets_meta_value.Count() == 0) {
return rocksdb::Status::NotFound();
}
// check if newkey exists.
std::string new_meta_value;
s = new_inst->GetDB()->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_newkey.Encode(),
&new_meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_new_meta_value(&new_meta_value);
if (!parsed_sets_new_meta_value.IsStale() && parsed_sets_new_meta_value.Count() != 0) {
return Status::Corruption(); // newkey already exists.
}
}

// copy a new set with newkey
statistic = parsed_sets_meta_value.Count();
s = new_inst->GetDB()->Put(default_write_options_, handles_[kSetsMetaCF], base_meta_newkey.Encode(), meta_value);
new_inst->UpdateSpecificKeyStatistics(DataType::kSets, newkey.ToString(), statistic);

// SetsDel key
parsed_sets_meta_value.InitialMetaValue();
s = db_->Put(default_write_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), meta_value);
UpdateSpecificKeyStatistics(DataType::kSets, key.ToString(), statistic);
}
return s;
}

void Redis::ScanSets() {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
Expand Down
Loading

0 comments on commit cd5e466

Please sign in to comment.