Skip to content

Commit

Permalink
Optimize ZUNION[STORE] command by removing unnecessary accumulator di…
Browse files Browse the repository at this point in the history
…ct (valkey-io#829)

In the past implementation of `ZUNION` and `ZUNIONSTORE` commands, we
first create a temporary dict called `accumulator`. After adding all
member-score mappings to `accumulator`, we still need to convert
`accumulator` back to the final dict `dstzset->dict`. However, we can
directly use `dstzset->dict` to avoid the additional copy operation.

This PR removes the `accumulator` dict and directly uses` dstzset->dict
`to store the member-score mappings.

- **Test**
First, I added 300 unique elements to two sorted sets called
'zunion_test1' and 'zunion_test2'. Then, I tested `zunion` and
`zunionstore` on these two sorted sets. The test results shown below
indicate that the performance of both zunion and zunionstore improved
about 31%.

### ZUNION
#### unstable
```
./valkey-benchmark -P 10 -n 100000 zunion 2 zunion_test1 zunion_test2

Summary:
  throughput summary: 2713.41 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
      146.252     3.464   153.343   182.015   184.959   192.895
```
#### This PR
```
./valkey-benchmark -P 10 -n 100000 zunion 2 zunion_test1 zunion_test2

Summary:
  throughput summary: 3543.84 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
      108.259     2.984   114.239   141.695   145.151   160.255
```
### ZUNIONSTORE
#### unstable
```
./valkey-benchmark -P 10 -n 100000 zunionstore out 2 zunion_test1 zunion_test2

Summary:
  throughput summary: 3168.07 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
      157.511     3.368   183.167   189.311   193.535   231.679
```
#### This PR
```
./valkey-benchmark -P 10 -n 100000 zunionstore out 2 zunion_test1 zunion_test2

Summary:
  throughput summary: 4144.73 requests per second
  latency summary (msec):
          avg       min       p50       p95       p99       max
      120.374     2.648   141.823   149.119   153.855   183.167
```

---------

Signed-off-by: RayCao <[email protected]>
Signed-off-by: zisong.cw <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
RayaCoo authored and mapleFU committed Aug 22, 2024
1 parent 168f467 commit b0c2cc8
Showing 1 changed file with 5 additions and 21 deletions.
26 changes: 5 additions & 21 deletions src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -2529,15 +2529,6 @@ static void zdiff(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen,
}
}

dictType setAccumulatorDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};

/* The zunionInterDiffGenericCommand() function is called in order to implement the
* following commands: ZUNION, ZINTER, ZDIFF, ZUNIONSTORE, ZINTERSTORE, ZDIFFSTORE,
* ZINTERCARD.
Expand Down Expand Up @@ -2724,15 +2715,14 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in
zuiClearIterator(&src[0]);
}
} else if (op == SET_OP_UNION) {
dict *accumulator = dictCreate(&setAccumulatorDictType);
dictIterator *di;
dictEntry *de, *existing;
double score;

if (setnum) {
/* Our union is at least as large as the largest set.
* Resize the dictionary ASAP to avoid useless rehashing. */
dictExpand(accumulator, zuiLength(&src[setnum - 1]));
dictExpand(dstzset->dict, zuiLength(&src[setnum - 1]));
}

/* Step 1: Create a dictionary of elements -> aggregated-scores
Expand All @@ -2747,7 +2737,7 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in
if (isnan(score)) score = 0;

/* Search for this element in the accumulating dictionary. */
de = dictAddRaw(accumulator, zuiSdsFromValue(&zval), &existing);
de = dictAddRaw(dstzset->dict, zuiSdsFromValue(&zval), &existing);
/* If we don't have it, we need to create a new entry. */
if (!existing) {
tmp = zuiNewSdsFromValue(&zval);
Expand All @@ -2757,7 +2747,7 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in
totelelen += sdslen(tmp);
if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
/* Update the element with its initial score. */
dictSetKey(accumulator, de, tmp);
dictSetKey(dstzset->dict, de, tmp);
dictSetDoubleVal(de, score);
} else {
/* Update the score with the score of the new instance
Expand All @@ -2774,21 +2764,15 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in
}

/* Step 2: convert the dictionary into the final sorted set. */
di = dictGetIterator(accumulator);

/* We now are aware of the final size of the resulting sorted set,
* let's resize the dictionary embedded inside the sorted set to the
* right size, in order to save rehashing time. */
dictExpand(dstzset->dict, dictSize(accumulator));
di = dictGetIterator(dstzset->dict);

while ((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
score = dictGetDoubleVal(de);
znode = zslInsert(dstzset->zsl, score, ele);
dictAdd(dstzset->dict, ele, &znode->score);
dictSetVal(dstzset->dict, de, &znode->score);
}
dictReleaseIterator(di);
dictRelease(accumulator);
} else if (op == SET_OP_DIFF) {
zdiff(src, setnum, dstzset, &maxelelen, &totelelen);
} else {
Expand Down

0 comments on commit b0c2cc8

Please sign in to comment.