Skip to content

Commit

Permalink
feat:zmax && zmin (#2966)
Browse files Browse the repository at this point in the history
* zmax && zmin

* zopmax && zpopmin

---------

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin authored Dec 11, 2024
1 parent 713e6fb commit 0726525
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 4 deletions.
9 changes: 7 additions & 2 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,17 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRemrangebylex(std::string& key, std::string& min, std::string& max, const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);

// Bit Commands
rocksdb::Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
4 changes: 4 additions & 0 deletions include/pika_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class ZPopmaxCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopmaxCmd(*this); }

private:
Expand All @@ -623,6 +625,8 @@ class ZPopminCmd : public Cmd {
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB() override;
void DoUpdateCache() override;
Cmd* Clone() override { return new ZPopminCmd(*this); }

private:
Expand Down
2 changes: 2 additions & 0 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class RedisCache {
std::vector<std::string> *members);
Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len);
Status ZRemrangebylex(std::string& key, std::string &min, std::string &max);
Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);
Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);

// Bit Commands
Status SetBit(std::string& key, size_t offset, int64_t value);
Expand Down
78 changes: 78 additions & 0 deletions src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,5 +405,83 @@ Status RedisCache::ZRemrangebylex(std::string& key, std::string &min, std::strin
return Status::OK();
}


Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = 0; i < to_return; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
DEFER {
FreeObjectList(members_obj, items_size);
};

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
if (C_OK != ret) {
if (REDIS_KEY_NOT_EXIST == ret) {
return Status::NotFound("key not in cache");
}
return Status::Corruption("RcZrange failed");
}

unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size);
for (unsigned long i = items_size - to_return; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
}

robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = items_size - 1; i >= 0; --i) {
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}

DEFER {
FreeObjectList(members_obj, items_size);
};

RcZRem(cache_, kobj, members_obj, to_return);

FreeZitemList(items, items_size);
return Status::OK();
}

} // namespace cache
/* EOF */
31 changes: 31 additions & 0 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,37 @@ Status PikaCache::ZRemrangebylex(std::string& key, std::string &min, std::string
}
}

Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}

Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
return Status::NotFound("key not in cache");
}
}


/*-----------------------------------------------------------------------------
* Bit Commands
*----------------------------------------------------------------------------*/
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRemrangebylex, std::move(zremrangebylexptr)));
////ZPopmax
std::unique_ptr<Cmd> zpopmaxptr = std::make_unique<ZPopmaxCmd>(
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmax, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmax, std::move(zpopmaxptr)));
////ZPopmin
std::unique_ptr<Cmd> zpopminptr = std::make_unique<ZPopminCmd>(
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast);
kCmdNameZPopmin, -2, kCmdFlagsWrite | kCmdFlagsZset | kCmdFlagsFast | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZPopmin, std::move(zpopminptr)));

// Set
Expand Down
22 changes: 22 additions & 0 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,17 @@ void ZPopmaxCmd::Do() {
}
}

void ZPopmaxCmd::DoThroughDB(){
Do();
}

void ZPopmaxCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMax(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameZPopmin);
Expand All @@ -1523,6 +1534,17 @@ void ZPopminCmd::DoInitial() {
}
}

void ZPopminCmd::DoThroughDB(){
Do();
}

void ZPopminCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}

void ZPopminCmd::Do() {
std::vector<storage::ScoreMember> score_members;
rocksdb::Status s = db_->storage()->ZPopMin(key_, count_, &score_members);
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/zset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,38 @@ var _ = Describe("Zset Commands", func() {
}}))
})

It("should Zpopmin test", func() {
err := client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 1,
Member: "m1",
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 3,
Member: "m3",
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAdd(ctx, "zpopzset1", redis.Z{
Score: 4,
Member: "m4",
}).Err()
Expect(err).NotTo(HaveOccurred())

max, err := client.ZPopMax(ctx, "zpopzset1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(max).To(Equal([]redis.Z{{Score: 4, Member: "m4"}}))

min, err := client.ZPopMin(ctx, "zpopzset1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(min).To(Equal([]redis.Z{{Score: 1, Member: "m1"}}))

rangeResult, err := client.ZRange(ctx, "zpopzset1", 0, -1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(rangeResult).To(Equal([]string{"m3"}))
})

It("should ZRemRangeByRank", func() {
err := client.ZAdd(ctx, "zset", redis.Z{Score: 1, Member: "one"}).Err()
Expect(err).NotTo(HaveOccurred())
Expand Down

0 comments on commit 0726525

Please sign in to comment.