Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(enhancement): rename is_txn_mode to txn_context_enabled #2644

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

enum class MigrationType {
/// Use Redis commands to migrate data.
/// It will trying to extract commands from existing data and log, then replay
/// It will try to extract commands from existing data and log, then replay
/// them on the destination node.
kRedisCommand = 0,
/// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
Expand Down
4 changes: 4 additions & 0 deletions src/commands/cmd_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class CommandExec : public Commander {
auto s = storage->BeginTxn();
if (s.IsOK()) {
conn->ExecuteCommands(conn->GetMultiExecCommands());
// In Redis, errors happening after EXEC instead are not handled in a special way:
// all the other commands will be executed even if some command fails during
// the transaction.
// So, if conn->IsMultiError(), the transaction should still be committed.
s = storage->CommitTxn();
}
return s;
Expand Down
2 changes: 1 addition & 1 deletion src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ constexpr const size_t GiB = 1024L * MiB;
constexpr const uint32_t kDefaultPort = 6666;

constexpr const char *kDefaultNamespace = "__namespace";
constexpr const size_t KVROCKS_MAX_LSM_LEVEL = 7;
constexpr int KVROCKS_MAX_LSM_LEVEL = 7;

enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC };

Expand Down
2 changes: 1 addition & 1 deletion src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

// We don't execute commands, but queue them, and then execute in EXEC command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) {
multi_cmds_.emplace_back(cmd_tokens);
multi_cmds_.emplace_back(std::move(cmd_tokens));
Reply(redis::SimpleString("QUEUED"));
continue;
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/batch_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@

#include "storage.h"

// WriteBatchIndexer traverses the operations in WriteBatch and appends to the specified WriteBatchWithIndex
/// WriteBatchIndexer traverses the operations in WriteBatch and appends to the
/// specified WriteBatchWithIndex.
class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
public:
explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch,
Expand All @@ -41,6 +42,7 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
}
explicit WriteBatchIndexer(engine::Context& ctx)
: WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}

rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override {
return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value);
}
Expand Down
29 changes: 16 additions & 13 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -597,14 +597,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
std::string *value) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else if (ctx.batch && ctx.is_txn_mode) {
} else if (ctx.batch && ctx.txn_context_enabled) {
s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
Expand All @@ -622,14 +622,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
rocksdb::PinnableSlice *value) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else if (ctx.is_txn_mode && ctx.batch) {
} else if (ctx.txn_context_enabled && ctx.batch) {
s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
Expand All @@ -655,14 +655,14 @@ void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_famil

rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
auto iter = db_->NewIterator(options, column_family);
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->NewIteratorWithBase(column_family, iter, &options);
} else if (ctx.is_txn_mode && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) {
} else if (ctx.txn_context_enabled && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) {
return ctx.batch->NewIteratorWithBase(column_family, iter, &options);
}
return iter;
Expand All @@ -671,14 +671,14 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea
void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const size_t num_keys, const rocksdb::Slice *keys,
rocksdb::PinnableSlice *values, rocksdb::Status *statuses) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
false);
} else if (ctx.is_txn_mode && ctx.batch) {
} else if (ctx.txn_context_enabled && ctx.batch) {
ctx.batch->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses, false);
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
Expand All @@ -700,18 +700,21 @@ rocksdb::Status Storage::Write(engine::Context &ctx, const rocksdb::WriteOptions

rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates) {
// Put replication id logdata at the end of write batch
// Put replication id logdata at the end of `updates`.
if (replid_.length() == kReplIdLength) {
updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode());
}

if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
// Extract writes from the updates and append to the ctx.batch
if (ctx.batch == nullptr) {
ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>();
}
WriteBatchIndexer handle(ctx);
auto s = updates->Iterate(&handle);
if (!s.ok()) return s;
} else {
DCHECK(ctx.batch == nullptr);
}

return db_->Write(options, updates);
Expand Down Expand Up @@ -1278,19 +1281,19 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d

[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
rocksdb::ReadOptions read_options;
if (is_txn_mode) read_options.snapshot = GetSnapshot();
if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}

[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
if (is_txn_mode) read_options.snapshot = GetSnapshot();
if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}

[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
if (is_txn_mode) read_options.snapshot = GetSnapshot();
if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}

Expand Down
49 changes: 33 additions & 16 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,12 @@ class Storage {
rocksdb::ColumnFamilyHandle *column_family);
rocksdb::Iterator *NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options);

[[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates);
const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; }
const rocksdb::WriteOptions &DefaultWriteOptions() const { return default_write_opts_; }
rocksdb::ReadOptions DefaultScanOptions() const;
rocksdb::ReadOptions DefaultMultiGetOptions() const;

[[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates);
[[nodiscard]] rocksdb::Status Delete(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key);
[[nodiscard]] rocksdb::Status DeleteRange(engine::Context &ctx, const rocksdb::WriteOptions &options,
Expand Down Expand Up @@ -336,6 +337,9 @@ class Storage {
void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; }
bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; }

/// Redis PSYNC relies on a Unique Replication Sequence Id when use-rsid-psync
/// enabled.
/// ShiftReplId would generate an Id and write it to propagate cf.
Status ShiftReplId(engine::Context &ctx);
std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
std::string GetReplIdFromDbEngine();
Expand Down Expand Up @@ -363,6 +367,8 @@ class Storage {

std::atomic<bool> db_in_retryable_io_error_{false};

// is_txn_mode_ is used to determine whether the current Storage is in transactional mode,
// .i.e, in "EXEC" command(CommandExec).
std::atomic<bool> is_txn_mode_ = false;
// txn_write_batch_ is used as the global write batch for the transaction mode,
// all writes will be grouped in this write batch when entering the transaction mode,
Expand All @@ -380,39 +386,48 @@ class Storage {

/// Context passes fixed snapshot and batch between APIs
///
/// Limitations: Performing a large number of writes on the same Context may reduce performance.
/// Please choose to use the same Context or create a new Context based on the actual situation.
/// Limitations: Performing a large number of writes or apply operations like DeleteRange
/// on the same Context may reduce performance.
/// Please choose to use the same Context or create a new Context based on the actual
/// situation.
///
/// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs.
struct Context {
engine::Storage *storage = nullptr;

/// batch can be nullptr if
/// 1. The Context is not in transactional mode.
/// 2. The Context is in transactional mode, but no write operation is performed.
std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;

/// is_txn_mode is used to determine whether the current Context is in transactional mode,
/// txn_context_enabled is used to determine whether the current Context is in transactional mode,
/// if it is not transactional mode, then Context is equivalent to a Storage.
/// If the configuration of txn-context-enabled is no, it is false.
bool is_txn_mode = true;
bool txn_context_enabled = true;

/// NoTransactionContext returns a Context with a is_txn_mode of false
static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, false); }
static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, /*txn_mode=*/false); }

/// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, then its snapshot is specified by the
/// Context
/// GetReadOptions returns a default ReadOptions, and if txn_context_enabled = true,
/// then its snapshot is specified by the Context.
/// Otherwise it is the same as Storage::DefaultReadOptions().
[[nodiscard]] rocksdb::ReadOptions GetReadOptions();
/// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = true, then its snapshot is specified by the
/// Context. Otherwise it is the same as Storage::DefaultScanOptions
/// DefaultScanOptions returns a DefaultScanOptions, and if txn_context_enabled = true,
/// then its snapshot is specified by the Context.
/// Otherwise it is the same as Storage::DefaultScanOptions().
[[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
/// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if is_txn_mode = true, then its snapshot is specified
/// by the Context. Otherwise it is the same as Storage::DefaultMultiGetOptions
/// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if txn_context_enabled = true,
/// then its snapshot is specified by the Context.
/// Otherwise it is the same as Storage::DefaultMultiGetOptions
[[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();

void RefreshLatestSnapshot();

/// TODO: Change it to defer getting the context, and the snapshot is pinned after the first read operation
explicit Context(engine::Storage *storage)
: storage(storage), is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
: storage(storage), txn_context_enabled(storage->GetConfig()->txn_context_enabled) {}
~Context() {
// A moved-from object doesn't have `storage`.
if (storage) {
if (snapshot_ && storage->GetDB()) {
storage->GetDB()->ReleaseSnapshot(snapshot_);
Expand Down Expand Up @@ -441,14 +456,16 @@ struct Context {
// and it's not a thread-safe operation.
const rocksdb::Snapshot *GetSnapshot() {
if (snapshot_ == nullptr) {
// Should not acquire a snapshot_ on a moved-from object.
DCHECK(storage != nullptr);
snapshot_ = storage->GetDB()->GetSnapshot(); // NOLINT
}
return snapshot_;
}

private:
/// It is only used by NonTransactionContext
explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), is_txn_mode(txn_mode) {}
explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), txn_context_enabled(txn_mode) {}

/// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
/// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
Expand Down
Loading