Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use rocksdb_wrapper::write_batch_put to reimplement check_and_set #656

Merged
merged 13 commits into from
Dec 30, 2020
1 change: 1 addition & 0 deletions src/server/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class meta_store : public dsn::replication::replica_base
std::string *value);

friend class pegasus_write_service;
friend class rocksdb_wrapper;

// Keys of meta data wrote into meta column family.
static const std::string DATA_VERSION;
Expand Down
24 changes: 15 additions & 9 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "base/pegasus_key_schema.h"
#include "meta_store.h"
#include "rocksdb_wrapper.h"
#include "rocksdb_write_batch_cleaner.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
Expand Down Expand Up @@ -338,22 +339,24 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
} else {
set_key = check_key;
}
resp.error = db_write_batch_put(decree,
set_key,
update.set_value,
static_cast<uint32_t>(update.set_expire_ts_seconds));
resp.error = _rocksdb_wrapper->write_batch_put(
decree,
set_key,
update.set_value,
static_cast<uint32_t>(update.set_expire_ts_seconds));
} else {
// check not passed, write empty record to update rocksdb's last flushed decree
resp.error = db_write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
resp.error = _rocksdb_wrapper->write_batch_put(
decree, dsn::string_view(), dsn::string_view(), 0);
}
if (resp.error) {
clear_up_batch_states(decree, resp.error);
rocksdb_write_batch_cleaner cleaner(_rocksdb_wrapper.get());
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
return resp.error;
}

resp.error = db_write(decree);
resp.error = _rocksdb_wrapper->write(decree);
if (resp.error) {
clear_up_batch_states(decree, resp.error);
rocksdb_write_batch_cleaner cleaner(_rocksdb_wrapper.get());
return resp.error;
}

Expand All @@ -363,7 +366,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}

clear_up_batch_states(decree, resp.error);
rocksdb_write_batch_cleaner cleaner(_rocksdb_wrapper.get());
return 0;
}

Expand Down Expand Up @@ -550,10 +553,13 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

void set_default_ttl(uint32_t ttl)
{
// TODO(zlw): remove these lines after the refactor is done
if (_default_ttl != ttl) {
_default_ttl = ttl;
ddebug_replica("update _default_ttl to {}.", ttl);
}

_rocksdb_wrapper->set_default_ttl(ttl);
}

private:
Expand Down
115 changes: 114 additions & 1 deletion src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"
#include "base/pegasus_value_schema.h"

namespace pegasus {
namespace server {
Expand All @@ -31,9 +32,17 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
_rd_opts(server->_data_cf_rd_opts),
_meta_cf(server->_meta_cf),
_pegasus_data_version(server->_pegasus_data_version),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
_pfc_recent_expire_count(server->_pfc_recent_expire_count),
_default_ttl(0)
{
_write_batch = dsn::make_unique<rocksdb::WriteBatch>();
_value_generator = dsn::make_unique<pegasus_value_generator>();

_wt_opts = dsn::make_unique<rocksdb::WriteOptions>();
// disable write ahead logging as replication handles logging instead now
_wt_opts->disableWAL = true;
}

int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
Expand Down Expand Up @@ -66,5 +75,109 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
return s.code();
}

int rocksdb_wrapper::write_batch_put(int64_t decree,
dsn::string_view raw_key,
dsn::string_view value,
uint32_t expire_sec)
{
return write_batch_put_ctx(db_write_context::empty(decree), raw_key, value, expire_sec);
}

int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
dsn::string_view raw_key,
dsn::string_view value,
uint32_t expire_sec)
{
FAIL_POINT_INJECT_F("db_write_batch_put",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });

uint64_t new_timetag = ctx.remote_timetag;
if (!ctx.is_duplicated_write()) { // local write
new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false);
}

if (ctx.verify_timetag && // needs read-before-write
_pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
!raw_key.empty()) { // not an empty write

db_get_context get_ctx;
int err = get(raw_key, &get_ctx);
if (dsn_unlikely(err != 0)) {
return err;
}
// if record exists and is not expired.
if (get_ctx.found && !get_ctx.expired) {
uint64_t local_timetag =
pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);

if (local_timetag >= new_timetag) {
// ignore this stale update with lower timetag,
// and write an empty record instead
raw_key = value = dsn::string_view();
}
}
}

rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue = _value_generator->generate_value(
_pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("WriteBatchPut",
s.ToString(),
"decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}",
ctx.decree,
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key),
expire_sec);
}
return s.code();
}

int rocksdb_wrapper::write(int64_t decree)
{
dassert(_write_batch->Count() != 0, "the number of updates in the batch is 0");

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

rocksdb::Status status =
_write_batch->Put(_meta_cf, meta_store::LAST_FLUSHED_DECREE, std::to_string(decree));
if (dsn_unlikely(!status.ok())) {
derror_rocksdb("Write",
status.ToString(),
"put decree of meta cf into batch error, decree: {}",
decree);
return status.code();
}

status = _db->Write(*_wt_opts, _write_batch.get());
if (dsn_unlikely(!status.ok())) {
derror_rocksdb("Write", status.ToString(), "write rocksdb error, decree: {}", decree);
}
return status.code();
}

void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }

void rocksdb_wrapper::set_default_ttl(uint32_t ttl)
{
if (_default_ttl != ttl) {
_default_ttl = ttl;
ddebug_replica("update _default_ttl to {}", ttl);
}
}

uint32_t rocksdb_wrapper::db_expire_ts(uint32_t expire_ts)
{
// use '_default_ttl' when ttl is not set for this write operation.
if (_default_ttl != 0 && expire_ts == 0) {
return utils::epoch_now() + _default_ttl;
}

return expire_ts;
}
} // namespace server
} // namespace pegasus
33 changes: 33 additions & 0 deletions src/server/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,26 @@
#pragma once

#include <dsn/dist/replication/replica_base.h>
#include <gtest/gtest_prod.h>

namespace rocksdb {
class DB;
class ReadOptions;
class WriteBatch;
class ColumnFamilyHandle;
class WriteOptions;
} // namespace rocksdb

namespace dsn {
class perf_counter_wrapper;
} // namespace dsn

namespace pegasus {
class pegasus_value_generator;

namespace server {
struct db_get_context;
struct db_write_context;
class pegasus_server_impl;

class rocksdb_wrapper : public dsn::replication::replica_base
Expand All @@ -46,11 +53,37 @@ class rocksdb_wrapper : public dsn::replication::replica_base
/// \result ctx.found=false if record is not found. Still 0 is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);

int write_batch_put(int64_t decree,
dsn::string_view raw_key,
dsn::string_view value,
uint32_t expire_sec);
int write_batch_put_ctx(const db_write_context &ctx,
dsn::string_view raw_key,
dsn::string_view value,
uint32_t expire_sec);
int write(int64_t decree);
void clear_up_write_batch();

void set_default_ttl(uint32_t ttl);

private:
uint32_t db_expire_ts(uint32_t expire_ts);

rocksdb::DB *_db;
rocksdb::ReadOptions &_rd_opts;
std::unique_ptr<pegasus_value_generator> _value_generator;
std::unique_ptr<rocksdb::WriteBatch> _write_batch;
std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
rocksdb::ColumnFamilyHandle *_meta_cf;
Comment on lines +74 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::unique_ptr<pegasus_value_generator> _value_generator;
std::unique_ptr<rocksdb::WriteBatch> _write_batch;
std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
rocksdb::ColumnFamilyHandle *_meta_cf;
pegasus_value_generator _value_generator;
rocksdb::WriteBatch _write_batch;
rocksdb::WriteOptions _wt_opts;
rocksdb::ColumnFamilyHandle *_meta_cf;

Why not use value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce file dependencies


const uint32_t _pegasus_data_version;
dsn::perf_counter_wrapper &_pfc_recent_expire_count;
volatile uint32_t _default_ttl;

friend class rocksdb_wrapper_test;
FRIEND_TEST(rocksdb_wrapper_test, put_verify_timetag);
FRIEND_TEST(rocksdb_wrapper_test, verify_timetag_compatible_with_version_0);
FRIEND_TEST(rocksdb_wrapper_test, get);
};
} // namespace server
} // namespace pegasus
37 changes: 37 additions & 0 deletions src/server/rocksdb_write_batch_cleaner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "rocksdb_wrapper.h"

namespace pegasus {
namespace server {
class rocksdb_write_batch_cleaner
{
public:
rocksdb_write_batch_cleaner(rocksdb_wrapper *rocksdb)
{
if (nullptr != rocksdb) {
rocksdb->clear_up_write_batch();
}
}
};
} // namespace server
} // namespace pegasus
Loading