Skip to content

Commit

Permalink
Merge branch 'master' of github.com:vagetablechicken/pegasus
Browse files Browse the repository at this point in the history
  • Loading branch information
huangwei5 committed Nov 1, 2018
2 parents 703c622 + 7dae1c5 commit 14e078f
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 56 deletions.
5 changes: 5 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY("bottommost_lev
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");

/// default ttl for items in a table. If ttl is not set for
/// * a new writen item, 'default_ttl' will be applied on this item.
/// * an exist item, 'default_ttl' will be applied on this item when it was compacted.
/// <= 0 means no effect
const std::string TABLE_LEVEL_DEFAULT_TTL("default_ttl");
} // namespace pegasus
1 change: 1 addition & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;

extern const std::string TABLE_LEVEL_DEFAULT_TTL;
} // namespace pegasus
2 changes: 1 addition & 1 deletion src/server/config-server.ini
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ max_concurrent_uploading_file_count = 5
rocksdb_verbose_log = false
rocksdb_write_buffer_size = 10485760
updating_rocksdb_sstsize_interval_seconds = 30
manual_compact_min_interval_seconds = 3600
manual_compact_min_interval_seconds = 30

perf_counter_cluster_name = onebox
perf_counter_update_interval_seconds = 10
Expand Down
78 changes: 59 additions & 19 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,76 @@ namespace server {
class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
{
public:
KeyWithTTLCompactionFilter() : _value_schema_version(0), _enabled(false) {}
virtual bool Filter(int /*level*/,
const rocksdb::Slice &key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const override
KeyWithTTLCompactionFilter(uint32_t value_schema_version, uint32_t default_ttl, bool enabled)
: _value_schema_version(value_schema_version), _default_ttl(default_ttl), _enabled(enabled)
{
if (!_enabled.load(std::memory_order_acquire))
}

bool Filter(int /*level*/,
const rocksdb::Slice &key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const override
{
if (!_enabled) {
return false;
}

uint32_t expire_ts =
pegasus_extract_expire_ts(_value_schema_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
// should update ttl
dsn::blob user_data;
pegasus_extract_user_data(_value_schema_version,
std::string(existing_value.data(), existing_value.length()),
user_data);
rocksdb::SliceParts sparts = _gen.generate_value(_value_schema_version,
dsn::string_view(user_data),
utils::epoch_now() + _default_ttl);
for (int i = 0; i < sparts.num_parts; i++) {
*new_value += sparts.parts[i].ToString();
}
*value_changed = true;
return false;
return check_if_record_expired(
_value_schema_version, utils::epoch_now(), utils::to_string_view(existing_value));
}
return check_if_ts_expired(utils::epoch_now(), expire_ts);
}
virtual const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
void SetValueSchemaVersion(uint32_t version) { _value_schema_version = version; }
void EnableFilter() { _enabled.store(true, std::memory_order_release); }

const char *Name() const override { return "KeyWithTTLCompactionFilter"; }

private:
uint32_t _value_schema_version;
std::atomic_bool _enabled; // only process filtering when _enabled == true
uint32_t _default_ttl;
bool _enabled; // only process filtering when _enabled == true
mutable pegasus_value_generator _gen;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
{
public:
KeyWithTTLCompactionFilterFactory() {}
virtual std::unique_ptr<rocksdb::CompactionFilter>
KeyWithTTLCompactionFilterFactory() : _value_schema_version(0), _default_ttl(0), _enabled(false)
{
}
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
return std::unique_ptr<KeyWithTTLCompactionFilter>(new KeyWithTTLCompactionFilter());
return std::unique_ptr<KeyWithTTLCompactionFilter>(new KeyWithTTLCompactionFilter(
_value_schema_version.load(), _default_ttl.load(), _enabled.load()));
}
virtual const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }

void SetValueSchemaVersion(uint32_t version)
{
_value_schema_version.store(version, std::memory_order_release);
}
void EnableFilter() { _enabled.store(true, std::memory_order_release); }
void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, std::memory_order_release); }

private:
std::atomic<uint32_t> _value_schema_version;
std::atomic<uint32_t> _default_ttl;
std::atomic_bool _enabled; // only process filtering when _enabled == true
};
}
} // namespace

} // namespace server
} // namespace pegasus
23 changes: 20 additions & 3 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <rocksdb/filter_policy.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>

#include "base/pegasus_key_schema.h"
Expand Down Expand Up @@ -236,6 +237,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
}

_db_opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));
_key_ttl_compaction_filter_factory = std::make_shared<KeyWithTTLCompactionFilterFactory>();
_db_opts.compaction_filter_factory = _key_ttl_compaction_filter_factory;

_db_opts.listeners.emplace_back(new pegasus_event_listener());

Expand Down Expand Up @@ -1375,7 +1378,6 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
rocksdb::Options opts = _db_opts;
opts.create_if_missing = true;
opts.error_if_exists = false;
opts.compaction_filter = &_key_ttl_compaction_filter;
opts.default_value_schema_version = PEGASUS_VALUE_SCHEMA_MAX_VERSION;

// parse envs for parameters
Expand Down Expand Up @@ -1478,8 +1480,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
}

// only enable filter after correct value_schema_version set
_key_ttl_compaction_filter.SetValueSchemaVersion(_value_schema_version);
_key_ttl_compaction_filter.EnableFilter();
_key_ttl_compaction_filter_factory->SetValueSchemaVersion(_value_schema_version);
_key_ttl_compaction_filter_factory->EnableFilter();

// update LastManualCompactFinishTime
_manual_compact_svc.init_last_finish_time_ms(_db->GetLastManualCompactFinishTime());
Expand Down Expand Up @@ -2252,6 +2254,7 @@ pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::s
void pegasus_server_impl::update_app_envs(const std::map<std::string, std::string> &envs)
{
update_usage_scenario(envs);
update_default_ttl(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}

Expand Down Expand Up @@ -2285,6 +2288,20 @@ void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std:
}
}

void pegasus_server_impl::update_default_ttl(const std::map<std::string, std::string> &envs)
{
auto find = envs.find(TABLE_LEVEL_DEFAULT_TTL);
if (find != envs.end()) {
int32_t ttl = 0;
if (!dsn::buf2int32(find->second, ttl) || ttl < 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
_server_write->set_default_ttl(static_cast<uint32_t>(ttl));
_key_ttl_compaction_filter_factory->SetDefaultTTL(static_cast<uint32_t>(ttl));
}
}

bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
{
if (usage_scenario == _usage_scenario)
Expand Down
4 changes: 3 additions & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

void update_usage_scenario(const std::map<std::string, std::string> &envs);

void update_default_ttl(const std::map<std::string, std::string> &envs);

// return finish time recorded in rocksdb
uint64_t do_manual_compact(const rocksdb::CompactRangeOptions &options);

Expand Down Expand Up @@ -247,7 +249,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
uint64_t _abnormal_multi_get_size_threshold;
uint64_t _abnormal_multi_get_iterate_count_threshold;

KeyWithTTLCompactionFilter _key_ttl_compaction_filter;
std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
rocksdb::Options _db_opts;
rocksdb::WriteOptions _wt_opts;
rocksdb::ReadOptions _rd_opts;
Expand Down
5 changes: 5 additions & 0 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return on_batched_writes(requests, count);
}

void pegasus_server_write::set_default_ttl(uint32_t ttl)
{
_write_svc->set_default_ttl(ttl);
}

int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
{
int err = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class pegasus_server_write : public dsn::replication::replica_base
int64_t decree,
uint64_t timestamp);

void set_default_ttl(uint32_t ttl);

private:
/// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn::message_ex **requests, int count);
Expand Down
5 changes: 5 additions & 0 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ void pegasus_write_service::batch_abort(int64_t decree, int err)
clear_up_batch_states();
}

void pegasus_write_service::set_default_ttl(uint32_t ttl)
{
_impl->set_default_ttl(ttl);
}

void pegasus_write_service::clear_up_batch_states()
{
uint64_t latency = dsn_now_ns() - _batch_start_time;
Expand Down
2 changes: 2 additions & 0 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class pegasus_write_service
// Abort batch write.
void batch_abort(int64_t decree, int err);

void set_default_ttl(uint32_t ttl);

private:
void clear_up_batch_states();

Expand Down
22 changes: 21 additions & 1 deletion src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
_db(server->_db),
_wt_opts(server->_wt_opts),
_rd_opts(server->_rd_opts),
_default_ttl(0),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
{
}
Expand Down Expand Up @@ -485,6 +486,14 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

void batch_abort(int64_t decree, int err) { clear_up_batch_states(decree, err); }

void set_default_ttl(uint32_t ttl)
{
if (_default_ttl != ttl) {
_default_ttl = ttl;
ddebug_replica("update _default_ttl to {}.", ttl);
}
}

private:
int db_write_batch_put(int64_t decree,
dsn::string_view raw_key,
Expand All @@ -497,7 +506,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue =
_value_generator.generate_value(_value_schema_version, value, expire_sec);
_value_generator.generate_value(_value_schema_version, value, db_expire_ts(expire_sec));
rocksdb::Status s = _batch.Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
Expand Down Expand Up @@ -679,6 +688,16 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return false;
}

uint32_t db_expire_ts(uint32_t expire_ts)
{
// use '_default_ttl' when ttl is not set for this write operation.
if (_default_ttl != 0 && expire_ts == 0) {
return utils::epoch_now() + _default_ttl;
}

return expire_ts;
}

private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
Expand All @@ -690,6 +709,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
rocksdb::DB *_db;
rocksdb::WriteOptions &_wt_opts;
rocksdb::ReadOptions &_rd_opts;
volatile uint32_t _default_ttl;
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;

pegasus_value_generator _value_generator;
Expand Down
7 changes: 7 additions & 0 deletions src/test/function_test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
#include <vector>
#include <map>

#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/service_api_c.h>
#include <unistd.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>

using namespace ::dsn;
using namespace ::replication;
using namespace ::pegasus;

pegasus_client *client = nullptr;
std::shared_ptr<replication_ddl_client> ddl_client;

GTEST_API_ int main(int argc, char **argv)
{
Expand All @@ -31,6 +35,9 @@ GTEST_API_ int main(int argc, char **argv)

const char *app_name = argv[2];
client = pegasus_client_factory::get_client("mycluster", app_name);
std::vector<rpc_address> meta_list;
replica_helper::load_meta_servers(meta_list, "uri-resolver.dsn://mycluster", "arguments");
ddl_client = std::make_shared<replication_ddl_client>(meta_list);
ddebug("MainThread: app_name=%s", app_name);

int gargc = argc - 2;
Expand Down
2 changes: 2 additions & 0 deletions src/test/function_test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_n
GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_mutate.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name"
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test slog_lost failed: $test_case $config_file $table_name"
Expand Down
31 changes: 0 additions & 31 deletions src/test/function_test/test_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1366,37 +1366,6 @@ TEST(basic, set_get_del_async)
while (!callbacked.load(std::memory_order_seq_cst))
usleep(100);

// set and ttl
{
dsn::utils::notify_event evt;
client->async_set("basic_test_hash_key_1",
"basic_test_sort_key_1",
"basic_test_value_1",
[&](int err, internal_info &&info) {
ASSERT_EQ(PERR_OK, err);
evt.notify();
},
10,
3);
evt.wait();
std::string value;
int result = client->get("basic_test_hash_key_1", "basic_test_sort_key_1", value);
ASSERT_EQ(PERR_OK, result);
ASSERT_EQ("basic_test_value_1", value);

int ttl_seconds;
result = client->ttl("basic_test_hash_key_1", "basic_test_sort_key_1", ttl_seconds);
ASSERT_EQ(PERR_OK, result);
ASSERT_TRUE(ttl_seconds > 0 && ttl_seconds <= 3) << "ttl is " << ttl_seconds;

std::this_thread::sleep_for(std::chrono::seconds(4));
result = client->ttl("basic_test_hash_key_1", "basic_test_sort_key_1", ttl_seconds);
ASSERT_EQ(PERR_NOT_FOUND, result);

result = client->get("basic_test_hash_key_1", "basic_test_sort_key_1", value);
ASSERT_EQ(PERR_NOT_FOUND, result);
}

// exist
ret = client->exist("basic_test_hash_key_1", "basic_test_sort_key_1");
ASSERT_EQ(PERR_NOT_FOUND, ret);
Expand Down
Loading

0 comments on commit 14e078f

Please sign in to comment.