Skip to content

Commit

Permalink
libroach: Migrate MVCCIncrementalIterator logic to C++
Browse files Browse the repository at this point in the history
All of the IncrementalIterator logic was previously written in Go.
This change migrates all the required implementations to C++ and
allows the DBExportToSst method to use this iterator to export
the required keys to an SSTable.

Release note: Performance improvement to the BACKUP process.
  • Loading branch information
adityamaru27 committed May 15, 2019
1 parent d3de2b0 commit f508ca2
Show file tree
Hide file tree
Showing 14 changed files with 20,032 additions and 235 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ UI_JS_OSS := pkg/ui/src/js/protos.js
UI_TS_OSS := pkg/ui/src/js/protos.d.ts
UI_PROTOS_OSS := $(UI_JS_OSS) $(UI_TS_OSS)

CPP_PROTOS := $(filter %/roachpb/metadata.proto %/roachpb/data.proto %/roachpb/internal.proto %/engine/enginepb/mvcc.proto %/engine/enginepb/mvcc3.proto %/engine/enginepb/file_registry.proto %/engine/enginepb/rocksdb.proto %/hlc/legacy_timestamp.proto %/hlc/timestamp.proto %/log/log.proto %/unresolved_addr.proto,$(GO_PROTOS))
CPP_PROTOS := $(filter %/roachpb/metadata.proto %/roachpb/data.proto %/roachpb/internal.proto %/roachpb/errors.proto %/engine/enginepb/mvcc.proto %/engine/enginepb/mvcc3.proto %/engine/enginepb/file_registry.proto %/engine/enginepb/rocksdb.proto %/hlc/legacy_timestamp.proto %/hlc/timestamp.proto %/log/log.proto %/unresolved_addr.proto,$(GO_PROTOS))
CPP_HEADERS := $(subst ./pkg,$(CPP_PROTO_ROOT),$(CPP_PROTOS:%.proto=%.pb.h))
CPP_SOURCES := $(subst ./pkg,$(CPP_PROTO_ROOT),$(CPP_PROTOS:%.proto=%.pb.cc))

Expand Down
2 changes: 2 additions & 0 deletions c-deps/libroach/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ add_library(roach
file_registry.cc
getter.cc
godefs.cc
incremental_iterator.cc
iterator.cc
ldb.cc
merge.cc
Expand All @@ -43,6 +44,7 @@ add_library(roach
table_props.cc
utils.cc
protos/roachpb/data.pb.cc
protos/roachpb/errors.pb.cc
protos/roachpb/internal.pb.cc
protos/roachpb/metadata.pb.cc
protos/storage/engine/enginepb/mvcc.pb.cc
Expand Down
8 changes: 6 additions & 2 deletions c-deps/libroach/batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,9 @@ DBIterator* DBBatch::NewIter(DBIterOptions iter_options) {

// GetStats is not supported on a DBBatch: the stats argument is never touched
// and the status built by FmtStatus("unsupported") is always returned.
DBStatus DBBatch::GetStats(DBStatsResult* stats) { return FmtStatus("unsupported"); }

DBStatus DBBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) { return FmtStatus("unsupported"); }
// GetTickersAndHistograms is not supported on a DBBatch: the stats argument is
// never touched and the status built by FmtStatus("unsupported") is always
// returned.
DBStatus DBBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {
return FmtStatus("unsupported");
}

// GetCompactionStats is not supported on a DBBatch: it always returns the
// DBString produced by ToDBString("unsupported").
DBString DBBatch::GetCompactionStats() { return ToDBString("unsupported"); }

Expand Down Expand Up @@ -656,7 +658,9 @@ DBIterator* DBWriteOnlyBatch::NewIter(DBIterOptions) { return NULL; }

// GetStats is not supported on a DBWriteOnlyBatch: the stats argument is never
// touched and the status built by FmtStatus("unsupported") is always returned.
DBStatus DBWriteOnlyBatch::GetStats(DBStatsResult* stats) { return FmtStatus("unsupported"); }

DBStatus DBWriteOnlyBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) { return FmtStatus("unsupported"); }
// GetTickersAndHistograms is not supported on a DBWriteOnlyBatch: the stats
// argument is never touched and the status built by FmtStatus("unsupported")
// is always returned.
DBStatus DBWriteOnlyBatch::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {
return FmtStatus("unsupported");
}

// GetCompactionStats is not supported on a DBWriteOnlyBatch: it always returns
// the DBString produced by ToDBString("unsupported").
DBString DBWriteOnlyBatch::GetCompactionStats() { return ToDBString("unsupported"); }

Expand Down
73 changes: 68 additions & 5 deletions c-deps/libroach/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "fmt.h"
#include "getter.h"
#include "godefs.h"
#include "incremental_iterator.h"
#include "iterator.h"
#include "merge.h"
#include "options.h"
Expand Down Expand Up @@ -131,7 +132,6 @@ DBIterState DBIterGetState(DBIterator* iter) {

return state;
}

} // namespace

namespace cockroach {
Expand Down Expand Up @@ -269,11 +269,10 @@ DBStatus DBCreateCheckpoint(DBEngine* db, DBSlice dir) {
// NB: passing 0 for log_size_for_flush forces a WAL sync, i.e. makes sure
// that the checkpoint is up to date.
status = cp_ptr->CreateCheckpoint(cp_dir, 0 /* log_size_for_flush */);
delete(cp_ptr);
delete (cp_ptr);
return ToDBStatus(status);
}


DBStatus DBDestroy(DBSlice dir) {
rocksdb::Options options;
return ToDBStatus(rocksdb::DestroyDB(ToString(dir), options));
Expand Down Expand Up @@ -835,14 +834,20 @@ DBStatus DBSstFileWriterOpen(DBSstFileWriter* fw) {
return kSuccess;
}

DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val) {
rocksdb::Status status = fw->rep.Put(EncodeKey(key), ToSlice(val));
// DBSstFileWriterAddRaw hands an already-encoded key/value pair straight to
// the underlying SST writer. Returns kSuccess when the Put succeeds, otherwise
// the RocksDB status converted with ToDBStatus.
DBStatus DBSstFileWriterAddRaw(DBSstFileWriter* fw, const rocksdb::Slice key,
                               const rocksdb::Slice val) {
  const rocksdb::Status put_result = fw->rep.Put(key, val);
  if (put_result.ok()) {
    return kSuccess;
  }
  return ToDBStatus(put_result);
}

// DBSstFileWriterAdd encodes the DBKey with EncodeKey, converts the value with
// ToSlice, and delegates the write to DBSstFileWriterAddRaw.
DBStatus DBSstFileWriterAdd(DBSstFileWriter* fw, DBKey key, DBSlice val) {
  // The encoded key is a local, so it stays alive for the whole call below.
  const auto encoded_key = EncodeKey(key);
  const auto raw_value = ToSlice(val);
  return DBSstFileWriterAddRaw(fw, encoded_key, raw_value);
}

DBStatus DBSstFileWriterDelete(DBSstFileWriter* fw, DBKey key) {
rocksdb::Status status = fw->rep.Delete(EncodeKey(key));
if (!status.ok()) {
Expand Down Expand Up @@ -909,3 +914,61 @@ DBStatus DBLockFile(DBSlice filename, DBFileLock* lock) {
// DBUnlockFile releases the given lock through the default RocksDB Env and
// returns the outcome converted with ToDBStatus.
DBStatus DBUnlockFile(DBFileLock lock) {
  // The opaque handle is reinterpreted as the RocksDB lock object.
  rocksdb::FileLock* const file_lock = (rocksdb::FileLock*)lock;
  const rocksdb::Status unlock_result = rocksdb::Env::Default()->UnlockFile(file_lock);
  return ToDBStatus(unlock_result);
}

// DBExportToSst walks a DBIncrementalIterator from `start` up to (but not
// including) the encoded `end` key and writes every surviving key/value pair
// into an SST file writer.
//
// Parameters:
//   start, end            - bounds handed to the iterator; iteration stops at
//                           the first key that compares >= EncodeKey(end).
//   export_all_revisions  - when false, the iterator is asked to skip the
//                           remaining versions of the current key on each
//                           advance, and empty-valued (tombstone) records are
//                           dropped if `start` carries a zero timestamp.
//   iter_opts, engine     - forwarded to the DBIncrementalIterator.
//   data                  - receives the output of DBSstFileWriterFinish; left
//                           untouched when no entry was written.
//   entries, data_size    - incremented per exported record (count, and key
//                           plus value bytes). They are not reset here, so the
//                           caller is expected to pass them zero-initialized.
//   write_intent          - forwarded to the DBIncrementalIterator.
//
// Returns kSuccess, or the first error status reported by the writer or the
// iterator. The writer is closed on every exit path.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
                       DBEngine* engine, DBString* data, int64_t* entries, int64_t* data_size,
                       DBString* write_intent) {
  DBSstFileWriter* writer = DBSstFileWriterNew();
  DBStatus status = DBSstFileWriterOpen(writer);
  if (status.data != NULL) {
    // Release the writer here as well; otherwise a failed open leaks it.
    DBSstFileWriterClose(writer);
    return status;
  }

  DBIncrementalIterator iter(engine, iter_opts, start, end, write_intent);

  // The encoded end key does not change during iteration, so build it once
  // instead of re-encoding it for every key visited.
  const auto encoded_end = EncodeKey(end);

  const bool skip_current_key_versions = !export_all_revisions;
  DBIterState state;
  for (state = iter.seek(start);; state = iter.next(skip_current_key_versions)) {
    if (state.status.data != NULL) {
      DBSstFileWriterClose(writer);
      return state.status;
    } else if (!state.valid || kComparator.Compare(iter.key(), encoded_end) >= 0) {
      break;
    }

    // The key is decoded only to validate it; the decoded parts are unused.
    rocksdb::Slice decoded_key;
    int64_t wall_time = 0;
    int32_t logical_time = 0;

    if (!DecodeKey(iter.key(), &decoded_key, &wall_time, &logical_time)) {
      DBSstFileWriterClose(writer);
      return ToDBString("Unable to decode key");
    }

    // Skip tombstone (len=0) records when start time is zero (non-incremental)
    // and we are not exporting all versions.
    if (!export_all_revisions && iter.value().size() == 0 && start.wall_time == 0 &&
        start.logical == 0) {
      continue;
    }

    // Insert key into sst and update statistics.
    status = DBSstFileWriterAddRaw(writer, iter.key(), iter.value());
    if (status.data != NULL) {
      DBSstFileWriterClose(writer);
      return status;
    }
    (*entries)++;
    (*data_size) += iter.key().size() + iter.value().size();
  }

  // Nothing was exported: skip Finish and leave `data` untouched.
  if (*entries == 0) {
    DBSstFileWriterClose(writer);
    return kSuccess;
  }

  auto res = DBSstFileWriterFinish(writer, data);
  DBSstFileWriterClose(writer);

  return res;
}
5 changes: 5 additions & 0 deletions c-deps/libroach/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ void BatchSSTablesForCompaction(const std::vector<rocksdb::SstFileMetaData>& sst
uint64_t target_size, std::vector<rocksdb::Range>* ranges);

} // namespace cockroach

// DBSstFileWriterAddRaw is used internally -- DBSstFileWriterAdd is the
// preferred function for Go callers.
//
// DBSstFileWriterAdd encodes its DBKey with EncodeKey and then delegates to
// this function, which passes key and val to the writer exactly as given.
// Returns kSuccess when the write succeeds, otherwise the writer's error
// converted to a DBStatus.
DBStatus DBSstFileWriterAddRaw(DBSstFileWriter* fw, const ::rocksdb::Slice key,
const ::rocksdb::Slice val);
7 changes: 3 additions & 4 deletions c-deps/libroach/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ DBStatus DBImpl::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {
const std::shared_ptr<rocksdb::Statistics>& s = opts.statistics;
stats->tickers_len = rocksdb::TickersNameMap.size();
// We malloc the result so it can be deallocated by the caller using free().
stats->tickers = static_cast<TickerInfo*>(
malloc(stats->tickers_len * sizeof(TickerInfo)));
stats->tickers = static_cast<TickerInfo*>(malloc(stats->tickers_len * sizeof(TickerInfo)));
if (stats->tickers == nullptr) {
return FmtStatus("malloc failed");
}
Expand All @@ -247,8 +246,8 @@ DBStatus DBImpl::GetTickersAndHistograms(DBTickersAndHistogramsResult* stats) {

stats->histograms_len = rocksdb::HistogramsNameMap.size();
// We malloc the result so it can be deallocated by the caller using free().
stats->histograms = static_cast<HistogramInfo*>(
malloc(stats->histograms_len * sizeof(HistogramInfo)));
stats->histograms =
static_cast<HistogramInfo*>(malloc(stats->histograms_len * sizeof(HistogramInfo)));
if (stats->histograms == nullptr) {
return FmtStatus("malloc failed");
}
Expand Down
6 changes: 6 additions & 0 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,12 @@ DBStatus DBLockFile(DBSlice filename, DBFileLock* lock);
// the lock.
DBStatus DBUnlockFile(DBFileLock lock);

// DBExportToSst exports changes over the keyrange and time interval between the
// start and end DBKeys to an SSTable using an IncrementalIterator.
//
// Iteration stops at the first key that compares greater than or equal to the
// encoded end key. When export_all_revisions is false, the iterator skips the
// remaining versions of each key, and records with an empty value are omitted
// if start has a zero wall_time and logical component.
//
// Outputs:
//   data       - receives the finished SSTable; left untouched when no entry
//                was written.
//   entries    - incremented once per exported record.
//   data_size  - incremented by the key size plus value size of each exported
//                record.
//   entries and data_size are not reset by this function, so callers should
//   pass them zero-initialized.
//   write_intent - forwarded to the IncrementalIterator.
//                  NOTE(review): presumably filled in when an intent is
//                  encountered -- confirm against incremental_iterator.cc.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, int64_t* entries, int64_t* data_size,
DBString* write_intent);

#ifdef __cplusplus
} // extern "C"
#endif
Loading

0 comments on commit f508ca2

Please sign in to comment.