Skip to content

Commit

Permalink
Merge #37496
Browse files Browse the repository at this point in the history
37496: libroach: Migrate IncrementalIterator logic to C++ r=dt a=adityamaru27

Export was previously executed using an `MVCCIncrementalIterator`
where the logic handling iteration over the diff of the key
range `[startKey, endKey)` and time range `(startTime, endTime]`
was written in Go. This iteration involved making a cgo call
to find every key, along with another cgo call for writing
each key to the sstable.
This migration resolves the aforementioned performance
bottleneck (#18884), by pushing all the required logic to C++
and exposing a single export method.

Based on a performance benchmark by running BACKUP on a tpcc database with 1000 warehouses we observe the following:

- Over an average of 3 runs we see a 1.1x improvement in time performance. While the original binary took ~32m04s the changed implementation took ~28m55s. This is due to the elimination of a cgo call per key.

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru27 committed Jun 4, 2019
2 parents ae9ddfe + 959f577 commit 9521979
Show file tree
Hide file tree
Showing 19 changed files with 20,415 additions and 1,011 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ UI_JS_OSS := pkg/ui/src/js/protos.js
UI_TS_OSS := pkg/ui/src/js/protos.d.ts
UI_PROTOS_OSS := $(UI_JS_OSS) $(UI_TS_OSS)

CPP_PROTOS := $(filter %/roachpb/metadata.proto %/roachpb/data.proto %/roachpb/internal.proto %/engine/enginepb/mvcc.proto %/engine/enginepb/mvcc3.proto %/engine/enginepb/file_registry.proto %/engine/enginepb/rocksdb.proto %/hlc/legacy_timestamp.proto %/hlc/timestamp.proto %/log/log.proto %/unresolved_addr.proto,$(GO_PROTOS))
CPP_PROTOS := $(filter %/roachpb/metadata.proto %/roachpb/data.proto %/roachpb/internal.proto %/roachpb/errors.proto %/engine/enginepb/mvcc.proto %/engine/enginepb/mvcc3.proto %/engine/enginepb/file_registry.proto %/engine/enginepb/rocksdb.proto %/hlc/legacy_timestamp.proto %/hlc/timestamp.proto %/log/log.proto %/unresolved_addr.proto,$(GO_PROTOS))
CPP_HEADERS := $(subst ./pkg,$(CPP_PROTO_ROOT),$(CPP_PROTOS:%.proto=%.pb.h))
CPP_SOURCES := $(subst ./pkg,$(CPP_PROTO_ROOT),$(CPP_PROTOS:%.proto=%.pb.cc))

Expand Down
2 changes: 2 additions & 0 deletions c-deps/libroach/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ add_library(roach
file_registry.cc
getter.cc
godefs.cc
incremental_iterator.cc
iterator.cc
ldb.cc
merge.cc
Expand All @@ -43,6 +44,7 @@ add_library(roach
table_props.cc
utils.cc
protos/roachpb/data.pb.cc
protos/roachpb/errors.pb.cc
protos/roachpb/internal.pb.cc
protos/roachpb/metadata.pb.cc
protos/storage/engine/enginepb/mvcc.pb.cc
Expand Down
8 changes: 6 additions & 2 deletions c-deps/libroach/batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ DBIterator* DBBatch::NewIter(DBIterOptions iter_options) {

DBStatus DBBatch::GetStats(DBStatsResult* stats) { return FmtStatus("unsupported"); }

DBStatus DBBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) { return FmtStatus("unsupported"); }
DBStatus DBBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {
return FmtStatus("unsupported");
}

DBString DBBatch::GetCompactionStats() { return ToDBString("unsupported"); }

Expand Down Expand Up @@ -654,7 +656,9 @@ DBIterator* DBWriteOnlyBatch::NewIter(DBIterOptions) { return NULL; }

DBStatus DBWriteOnlyBatch::GetStats(DBStatsResult* stats) { return FmtStatus("unsupported"); }

DBStatus DBWriteOnlyBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) { return FmtStatus("unsupported"); }
DBStatus DBWriteOnlyBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {
return FmtStatus("unsupported");
}

DBString DBWriteOnlyBatch::GetCompactionStats() { return ToDBString("unsupported"); }

Expand Down
80 changes: 75 additions & 5 deletions c-deps/libroach/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "db.h"
#include <algorithm>
#include <iostream>
#include <rocksdb/convenience.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/sst_file_writer.h>
Expand All @@ -29,6 +30,7 @@
#include "fmt.h"
#include "getter.h"
#include "godefs.h"
#include "incremental_iterator.h"
#include "iterator.h"
#include "merge.h"
#include "options.h"
Expand Down Expand Up @@ -129,7 +131,6 @@ DBIterState DBIterGetState(DBIterator* iter) {

return state;
}

} // namespace

namespace cockroach {
Expand Down Expand Up @@ -267,11 +268,10 @@ DBStatus DBCreateCheckpoint(DBEngine* db, DBSlice dir) {
// NB: passing 0 for log_size_for_flush forces a WAL sync, i.e. makes sure
// that the checkpoint is up to date.
status = cp_ptr->CreateCheckpoint(cp_dir, 0 /* log_size_for_flush */);
delete(cp_ptr);
delete (cp_ptr);
return ToDBStatus(status);
}


DBStatus DBDestroy(DBSlice dir) {
rocksdb::Options options;
return ToDBStatus(rocksdb::DestroyDB(ToString(dir), options));
Expand Down Expand Up @@ -833,13 +833,21 @@ DBStatus DBSstFileWriterOpen(DBSstFileWriter* fw) {
return kSuccess;
}

DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val) {
rocksdb::Status status = fw->rep.Put(EncodeKey(key), ToSlice(val));
namespace {
DBStatus DBSstFileWriterAddRaw(DBSstFileWriter* fw, const rocksdb::Slice key,
const rocksdb::Slice val) {
rocksdb::Status status = fw->rep.Put(key, val);
if (!status.ok()) {
return ToDBStatus(status);
}

return kSuccess;
}
} // namespace

DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val) {
return DBSstFileWriterAddRaw(fw, EncodeKey(key), ToSlice(val));
}

DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key) {
rocksdb::Status status = fw->rep.Delete(EncodeKey(key));
Expand Down Expand Up @@ -907,3 +915,65 @@ DBStatus DBLockFile(DBSlice filename, DBFileLock* lock) {
DBStatus DBUnlockFile(DBFileLock lock) {
return ToDBStatus(rocksdb::Env::Default()->UnlockFile((rocksdb::FileLock*)lock));
}

DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, int64_t* entries, int64_t* data_size,
DBString* write_intent) {
DBSstFileWriter* writer = DBSstFileWriterNew();
DBStatus status = DBSstFileWriterOpen(writer);
if (status.data != NULL) {
return status;
}

*entries = 0;
*data_size = 0;

DBIncrementalIterator iter(engine, iter_opts, start, end, write_intent);

bool skip_current_key_versions = !export_all_revisions;
DBIterState state;
const std::string end_key = EncodeKey(end);
for (state = iter.seek(start);; state = iter.next(skip_current_key_versions)) {
if (state.status.data != NULL) {
DBSstFileWriterClose(writer);
return state.status;
} else if (!state.valid || kComparator.Compare(iter.key(), end_key) >= 0) {
break;
}

rocksdb::Slice decoded_key;
int64_t wall_time = 0;
int32_t logical_time = 0;

if (!DecodeKey(iter.key(), &decoded_key, &wall_time, &logical_time)) {
DBSstFileWriterClose(writer);
return ToDBString("Unable to decode key");
}

// Skip tombstone (len=0) records when start time is zero (non-incremental)
// and we are not exporting all versions.
bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions;
if (is_skipping_deletes && iter.value().size() == 0) {
continue;
}

// Insert key into sst and update statistics.
status = DBSstFileWriterAddRaw(writer, iter.key(), iter.value());
if (status.data != NULL) {
DBSstFileWriterClose(writer);
return status;
}
(*entries)++;
(*data_size) += iter.key().size() + iter.value().size();
}

if (*entries == 0) {
DBSstFileWriterClose(writer);
return kSuccess;
}

auto res = DBSstFileWriterFinish(writer, data);
DBSstFileWriterClose(writer);

return res;
}
7 changes: 3 additions & 4 deletions c-deps/libroach/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,7 @@ DBStatus DBImpl::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {
const std::shared_ptr<rocksdb::Statistics>& s = opts.statistics;
stats->tickers_len = rocksdb::TickersNameMap.size();
// We malloc the result so it can be deallocated by the caller using free().
stats->tickers = static_cast<TickerInfo*>(
malloc(stats->tickers_len * sizeof(TickerInfo)));
stats->tickers = static_cast<TickerInfo*>(malloc(stats->tickers_len * sizeof(TickerInfo)));
if (stats->tickers == nullptr) {
return FmtStatus("malloc failed");
}
Expand All @@ -245,8 +244,8 @@ DBStatus DBImpl::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {

stats->histograms_len = rocksdb::HistogramsNameMap.size();
// We malloc the result so it can be deallocated by the caller using free().
stats->histograms = static_cast<HistogramInfo*>(
malloc(stats->histograms_len * sizeof(HistogramInfo)));
stats->histograms =
static_cast<HistogramInfo*>(malloc(stats->histograms_len * sizeof(HistogramInfo)));
if (stats->histograms == nullptr) {
return FmtStatus("malloc failed");
}
Expand Down
6 changes: 6 additions & 0 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,12 @@ DBStatus DBLockFile(DBSlice filename, DBFileLock* lock);
// the lock.
DBStatus DBUnlockFile(DBFileLock lock);

// DBExportToSst exports changes over the keyrange and time interval between the
// start and end DBKeys to an SSTable using an IncrementalIterator.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, int64_t* entries, int64_t* data_size,
DBString* write_intent);

#ifdef __cplusplus
} // extern "C"
#endif
Loading

0 comments on commit 9521979

Please sign in to comment.