Skip to content

Commit

Permalink
feat(rocksdb): Adapt prefix bloom filter to speedup scans by hashkey
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Dec 10, 2019
1 parent 59e54d6 commit 444d223
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 20 deletions.
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ function run_build()
echo "INFO: start build rocksdb..."
ROCKSDB_BUILD_DIR="$ROOT/rocksdb/build"
ROCKSDB_BUILD_OUTPUT="$ROCKSDB_BUILD_DIR/output"
CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF"
CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF -DCMAKE_CXX_FLAGS=-g"
if [ "$WARNING_ALL" == "YES" ]
then
echo "WARNING_ALL=YES"
Expand Down
4 changes: 2 additions & 2 deletions src/base/pegasus_key_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void pegasus_generate_next_blob(::dsn::blob &next, const T &hash_key, const T &s
next = buf.range(0, p - (unsigned char *)(buf.data()) + 1);
}

// restore hash_key and sort_key from rocksdb value.
// restore hash_key and sort_key from rocksdb key.
// no data copied.
inline void
pegasus_restore_key(const ::dsn::blob &key, ::dsn::blob &hash_key, ::dsn::blob &sort_key)
Expand All @@ -106,7 +106,7 @@ pegasus_restore_key(const ::dsn::blob &key, ::dsn::blob &hash_key, ::dsn::blob &
}
}

// restore hash_key and sort_key from rocksdb value.
// restore hash_key and sort_key from rocksdb key.
// data is copied into output 'hash_key' and 'sort_key'.
inline void
pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &sort_key)
Expand Down
1 change: 1 addition & 0 deletions src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@
rocksdb_block_cache_capacity = 10737418240
rocksdb_block_cache_num_shard_bits = -1
rocksdb_disable_bloom_filter = false
rocksdb_filter_type = common

checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800
Expand Down
1 change: 1 addition & 0 deletions src/server/config.min.ini
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
[pegasus.server]
perf_counter_cluster_name = onebox
perf_counter_enable_logging = false
rocksdb_filter_type = common

[pegasus.collector]
cluster = onebox
Expand Down
51 changes: 51 additions & 0 deletions src/server/hashkey_transform.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include <rocksdb/slice_transform.h>
#include <rocksdb/slice.h>

#include <dsn/c/api_utilities.h>
#include <dsn/utility/blob.h>

namespace pegasus {
namespace server {

// A rocksdb::SliceTransform that extracts the hashkey part of a key as its prefix.
//
// Transform() interprets the first two bytes of a key as a big-endian uint16
// 'hash_key_len' and returns the leading (2 + hash_key_len) bytes of the key, i.e. the
// length header together with the hash key that follows it. It is intended to be
// installed as the 'prefix_extractor' of rocksdb options so that a prefix bloom filter
// can be built on hashkeys.
class HashkeyTransform : public rocksdb::SliceTransform
{
public:
    HashkeyTransform() = default;

    // NOTE: You must change the name if the Transform() algorithm is changed.
    const char *Name() const override { return "pegasus.HashkeyTransform"; }

    // Returns the "[hash_key_len][hash_key]" prefix of 'src'. No data is copied, the
    // returned slice points into the memory of 'src'.
    rocksdb::Slice Transform(const rocksdb::Slice &src) const override
    {
        // TODO(yingchun): There is a bug in rocksdb 5.9.2, it has been fixed by
        // cca141ecf8634a42b5eb548cb0ac3a6b77d783c1, we can remove this check after
        // upgrading rocksdb.
        if (src.size() < 2) {
            return src;
        }

        // hash_key_len is stored in big endian. Decode it byte by byte instead of
        // dereferencing a casted 16-bit pointer: 'src.data()' carries no alignment
        // guarantee, and reading it through an 'int16_t *' is both a potentially
        // misaligned access and a strict-aliasing violation.
        const unsigned char *header = reinterpret_cast<const unsigned char *>(src.data());
        const uint16_t hash_key_len = static_cast<uint16_t>((header[0] << 8) | header[1]);
        const size_t prefix_len = 2 + static_cast<size_t>(hash_key_len);
        dassert(src.size() >= prefix_len, "key length must be no less than (2 + hash_key_len)");
        return rocksdb::Slice(src.data(), prefix_len);
    }

    // Keys shorter than the 2-byte 'hash_key_len' header (including empty keys) are not
    // in domain.
    bool InDomain(const rocksdb::Slice &src) const override { return src.size() >= 2; }

    // Every slice is accepted as a possible result of Transform().
    bool InRange(const rocksdb::Slice &dst) const override { return true; }

    // Always false: this transform never claims that appending bytes to 'prefix' keeps
    // the Transform() result unchanged.
    bool SameResultWhenAppended(const rocksdb::Slice &prefix) const override { return false; }
};
} // namespace server
} // namespace pegasus
57 changes: 51 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include "capacity_unit_calculator.h"
#include "hashkey_transform.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"

Expand Down Expand Up @@ -210,11 +211,25 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_tbl_opts.block_cache = _block_cache;
}

if (!dsn_config_get_value_bool("pegasus.server",
"rocksdb_disable_bloom_filter",
false,
"rocksdb tbl_opts.filter_policy")) {
// Bloom filter configurations.
bool disable_bloom_filter = dsn_config_get_value_bool(
"pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
if (!disable_bloom_filter) {
_tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));

std::string filter_type =
dsn_config_get_value_string("pegasus.server",
"rocksdb_filter_type",
"common",
"Bloom filter type, should be either 'common' or 'prefix'");
dassert(filter_type == "common" || filter_type == "prefix",
"[pegasus.server]rocksdb_filter_type should be either 'common' or 'prefix'.");
if (filter_type == "prefix") {
_db_opts.prefix_extractor.reset(new HashkeyTransform());
_db_opts.memtable_prefix_bloom_size_ratio = 0.1;

_rd_opts.prefix_same_as_start = true;
}
}

_db_opts.table_factory.reset(NewBlockBasedTableFactory(_tbl_opts));
Expand Down Expand Up @@ -708,9 +723,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
return;
}

std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
std::unique_ptr<rocksdb::Iterator> it = nullptr;
bool complete = false;
if (!request.reverse) {
it.reset(_db->NewIterator(_rd_opts));
it->Seek(start);
bool first_exclusive = !start_inclusive;
while (count < max_kv_count && size < max_kv_size && it->Valid()) {
Expand Down Expand Up @@ -761,6 +777,18 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
it->Next();
}
} else { // reverse
rocksdb::ReadOptions rd_opts(_rd_opts);
if (_db_opts.prefix_extractor) {
// TODO(yingchun): Prefix bloom filter is not supported in reverse seek mode (see
// https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes#limitation for
// more details), so we have to do a total order seek on rocksdb, which might have
// worse performance. However, we consider reverse scan a rare use case; if your
// workload has many reverse scans, you'd better use the 'common' bloom filter (by
// setting [pegasus.server]rocksdb_filter_type to 'common').
rd_opts.total_order_seek = true;
rd_opts.prefix_same_as_start = false;
}
it.reset(_db->NewIterator(rd_opts));
it->SeekForPrev(stop);
bool first_exclusive = !stop_inclusive;
std::vector<::dsn::apps::key_value> reverse_kvs;
Expand Down Expand Up @@ -1140,6 +1168,17 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
return;
}

rocksdb::ReadOptions rd_opts(_rd_opts);
if (_db_opts.prefix_extractor) {
::dsn::blob start_hash_key, tmp;
pegasus_restore_key(request.start_key, start_hash_key, tmp);
if (start_hash_key.size() == 0) {
// hash_key is not passed; this only happens when doing a full scan (scanners got by
// get_unordered_scanners) on a partition, so we have to do a total order seek on rocksDB.
rd_opts.total_order_seek = true;
rd_opts.prefix_same_as_start = false;
}
}
bool start_inclusive = request.start_inclusive;
bool stop_inclusive = request.stop_inclusive;
rocksdb::Slice start(request.start_key.data(), request.start_key.length());
Expand All @@ -1156,6 +1195,12 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
if (prefix_start.compare(start) > 0) {
start = prefix_start;
start_inclusive = true;
// Now 'start' is generated from 'request.hash_key_filter_pattern', so it may not be a
// real hashkey, and we should not seek this prefix using the prefix bloom filter.
// However, this only happens when doing a full scan (scanners got by
// get_unordered_scanners), in which case the following flags have already been updated.
assert(!_db_opts.prefix_extractor || rd_opts.total_order_seek);
assert(!_db_opts.prefix_extractor || !rd_opts.prefix_same_as_start);
}
}

Expand All @@ -1180,7 +1225,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
return;
}

std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(_rd_opts));
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rd_opts));
it->Seek(start);
bool complete = false;
bool first_exclusive = !start_inclusive;
Expand Down
60 changes: 60 additions & 0 deletions src/server/test/hashkey_transform_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "server/hashkey_transform.h"

#include <gtest/gtest.h>
#include <rocksdb/comparator.h>

#include "base/pegasus_key_schema.h"

// A user-defined SliceTransform must obey the 4 rules of ColumnFamilyOptions.prefix_extractor.
TEST(HashkeyTransformTest, Basic)
{
    pegasus::server::HashkeyTransform transform;
    const rocksdb::Comparator *cmp = rocksdb::BytewiseComparator();

    // Encoded keys, named after the (hash_key, sort_key) pair they are generated from.
    dsn::blob blob_h1s1, blob_h2s1, blob_h1s2, blob_h1;
    pegasus::pegasus_generate_key(blob_h1s1, std::string("h1"), std::string("s1"));
    pegasus::pegasus_generate_key(blob_h2s1, std::string("h2"), std::string("s1"));
    pegasus::pegasus_generate_key(blob_h1s2, std::string("h1"), std::string("s2"));
    pegasus::pegasus_generate_key(blob_h1, std::string("h1"), std::string(""));

    const rocksdb::Slice h1s1(blob_h1s1.data(), blob_h1s1.size());
    const rocksdb::Slice h2s1(blob_h2s1.data(), blob_h2s1.size());
    const rocksdb::Slice h1s2(blob_h1s2.data(), blob_h1s2.size());
    const rocksdb::Slice h1(blob_h1.data(), blob_h1.size());

    // Rule 1: key.starts_with(prefix(key))
    for (const rocksdb::Slice &key : {h1s1, h2s1, h1s2, h1}) {
        ASSERT_TRUE(key.starts_with(transform.Transform(key)));
    }

    // Rule 2: Compare(prefix(key), key) <= 0.
    // The prefix is strictly smaller when the sort key is non-empty ...
    ASSERT_LT(cmp->Compare(transform.Transform(h1s1), h1s1), 0); // h1 < h1s1
    ASSERT_LT(cmp->Compare(transform.Transform(h2s1), h2s1), 0); // h2 < h2s1
    ASSERT_LT(cmp->Compare(transform.Transform(h1s2), h1s2), 0); // h1 < h1s2
    // ... and equal to the key itself when the sort key is empty.
    ASSERT_EQ(cmp->Compare(transform.Transform(h1), h1), 0); // h1 == h1

    // Rule 3: if Compare(k1, k2) <= 0, then Compare(prefix(k1), prefix(k2)) <= 0
    // Different hash keys.
    ASSERT_LT(cmp->Compare(h1s1, h2s1), 0); // h1s1 < h2s1
    ASSERT_LT(cmp->Compare(transform.Transform(h1s1), transform.Transform(h2s1)),
              0); // h1 < h2
    // Same hash key, different sort keys.
    ASSERT_LT(cmp->Compare(h1s1, h1s2), 0); // h1s1 < h1s2
    ASSERT_EQ(cmp->Compare(transform.Transform(h1s1), transform.Transform(h1s2)),
              0); // h1 == h1
    // Same hash key, one of the sort keys is empty.
    ASSERT_GT(cmp->Compare(h1s1, h1), 0); // h1s1 > h1
    ASSERT_EQ(cmp->Compare(transform.Transform(h1s1), transform.Transform(h1)),
              0); // h1 == h1

    // Rule 4: prefix(prefix(key)) == prefix(key)
    for (const rocksdb::Slice &key : {h1s1, h2s1, h1s2, h1}) {
        ASSERT_EQ(transform.Transform(transform.Transform(key)), transform.Transform(key));
    }
}
Loading

0 comments on commit 444d223

Please sign in to comment.