From 5aa204af8e6f3bdb0d23c6341159fb657f6c453b Mon Sep 17 00:00:00 2001 From: Linh Tran Tuan Date: Tue, 12 Dec 2023 14:16:24 +0900 Subject: [PATCH] Adapt RocksDB 8.7.3 (#134) --- build.sh | 2 +- c.h | 42 ++++++++++++++++++ db.go | 18 ++++++++ db_test.go | 6 +++ options_read.go | 18 ++++++++ options_read_test.go | 2 + options_wait_for_compact.go | 75 ++++++++++++++++++++++++++++++++ options_wait_for_compact_test.go | 30 +++++++++++++ 8 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 options_wait_for_compact.go create mode 100644 options_wait_for_compact_test.go diff --git a/build.sh b/build.sh index e5ae151..6b20dc0 100755 --- a/build.sh +++ b/build.sh @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version} # Note: if you don't have a good reason, please do not set -DPORTABLE=ON # This one is set here on purpose of compatibility with github action runtime processor -rocksdb_version="8.6.7" +rocksdb_version="8.7.3" cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \ mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \ -DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \ diff --git a/c.h b/c.h index 6636b59..eecbe1c 100644 --- a/c.h +++ b/c.h @@ -138,6 +138,8 @@ typedef struct rocksdb_memory_consumers_t rocksdb_memory_consumers_t; typedef struct rocksdb_memory_usage_t rocksdb_memory_usage_t; typedef struct rocksdb_statistics_histogram_data_t rocksdb_statistics_histogram_data_t; +typedef struct rocksdb_wait_for_compact_options_t + rocksdb_wait_for_compact_options_t; /* DB operations */ @@ -1944,6 +1946,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_timestamp( rocksdb_readoptions_t*, const char* ts, size_t tslen); extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_iter_start_ts( rocksdb_readoptions_t*, const char* ts, size_t tslen); +extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_auto_readahead_size( + rocksdb_readoptions_t*, unsigned char); /* Write options */ @@ -2927,6 +2931,44 @@ extern ROCKSDB_LIBRARY_API uint64_t rocksdb_statistics_histogram_data_get_sum( extern ROCKSDB_LIBRARY_API double rocksdb_statistics_histogram_data_get_min( rocksdb_statistics_histogram_data_t* data); +extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact( + rocksdb_t* db, rocksdb_wait_for_compact_options_t* options, char** errptr); + +extern ROCKSDB_LIBRARY_API rocksdb_wait_for_compact_options_t* +rocksdb_wait_for_compact_options_create(void); + +extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_destroy( + rocksdb_wait_for_compact_options_t* opt); + +extern ROCKSDB_LIBRARY_API void +rocksdb_wait_for_compact_options_set_abort_on_pause( + rocksdb_wait_for_compact_options_t* opt, unsigned char v); + +extern ROCKSDB_LIBRARY_API unsigned char +rocksdb_wait_for_compact_options_get_abort_on_pause( + rocksdb_wait_for_compact_options_t* opt); + +extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_set_flush( + rocksdb_wait_for_compact_options_t* opt, unsigned char v); + +extern ROCKSDB_LIBRARY_API unsigned char +rocksdb_wait_for_compact_options_get_flush( + rocksdb_wait_for_compact_options_t* opt); + +extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_set_close_db( + rocksdb_wait_for_compact_options_t* opt, unsigned char v); + +extern ROCKSDB_LIBRARY_API unsigned char +rocksdb_wait_for_compact_options_get_close_db( + rocksdb_wait_for_compact_options_t* opt); + +extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_set_timeout( + rocksdb_wait_for_compact_options_t* opt, uint64_t microseconds); + +extern ROCKSDB_LIBRARY_API uint64_t +rocksdb_wait_for_compact_options_get_timeout( + rocksdb_wait_for_compact_options_t* opt); + #ifdef __cplusplus } /* end extern "C" */ #endif diff --git a/db.go b/db.go index d5b98aa..1fa2632 100644 --- a/db.go +++ b/db.go @@ -1850,6 +1850,24 @@ func (db *DB) GetColumnFamilyMetadataCF(cf *ColumnFamilyHandle) (m *ColumnFamily return } +// WaitForCompact waits for all flush and compactions jobs to finish. Jobs to wait +// include the unscheduled (queued, but not scheduled yet). If the db is shutting down, +// Status::ShutdownInProgress will be returned. +// +// NOTE: This may also never return if there's sufficient ongoing writes that +// keeps flush and compaction going without stopping. The user would have to +// cease all the writes to DB to make this eventually return in a stable +// state. The user may also use timeout option in WaitForCompactOptions to +// make this stop waiting and return when timeout expires. +func (db *DB) WaitForCompact(opts *WaitForCompactOptions) (err error) { + var cErr *C.char + + C.rocksdb_wait_for_compact(db.c, opts.p, &cErr) + err = fromCError(cErr) + + return +} + // Close the database. func (db *DB) Close() { C.rocksdb_close(db.c) diff --git a/db_test.go b/db_test.go index eebf580..e7f490b 100644 --- a/db_test.go +++ b/db_test.go @@ -290,6 +290,12 @@ func TestDBMultiGet(t *testing.T) { require.EqualValues(t, values[1].Data(), givenVal1) require.EqualValues(t, values[2].Data(), givenVal2) require.EqualValues(t, values[3].Data(), givenVal3) + + waitForCompactOpts := NewWaitForCompactOptions() + defer waitForCompactOpts.Destroy() + + err = db.WaitForCompact(waitForCompactOpts) + require.NoError(t, err) } func TestLoadLatestOpts(t *testing.T) { diff --git a/options_read.go b/options_read.go index 11cf2da..7e6189b 100644 --- a/options_read.go +++ b/options_read.go @@ -345,3 +345,21 @@ func (opts *ReadOptions) SetIterStartTimestamp(ts []byte) { cTSLen := C.size_t(len(ts)) C.rocksdb_readoptions_set_iter_start_ts(opts.c, cTS, cTSLen) } + +// SetAutoReadaheadSize (Experimental) if auto_readahead_size is set to true, it will auto tune +// the readahead_size during scans internally. +// For this feature to enabled, iterate_upper_bound must also be specified. +// +// NOTE: - Recommended for forward Scans only. +// - In case of backward scans like Prev or SeekForPrev, the +// cost of these backward operations might increase and affect the +// performace. So this option should not be enabled if workload +// contains backward scans. +// - If there is a backward scans, this option will be +// disabled internally and won't be reset if forward scan is done +// again. +// +// Default: false +func (opts *ReadOptions) SetAutoReadaheadSize(enable bool) { + C.rocksdb_readoptions_set_auto_readahead_size(opts.c, boolToChar(enable)) +} diff --git a/options_read_test.go b/options_read_test.go index 418e71d..e56bc00 100644 --- a/options_read_test.go +++ b/options_read_test.go @@ -74,4 +74,6 @@ func TestReadOptions(t *testing.T) { require.False(t, ro.IsAsyncIO()) ro.SetAsyncIO(true) require.True(t, ro.IsAsyncIO()) + + ro.SetAutoReadaheadSize(true) } diff --git a/options_wait_for_compact.go b/options_wait_for_compact.go new file mode 100644 index 0000000..aadf8ff --- /dev/null +++ b/options_wait_for_compact.go @@ -0,0 +1,75 @@ +package grocksdb + +// #include "rocksdb/c.h" +import "C" + +type WaitForCompactOptions struct { + p *C.rocksdb_wait_for_compact_options_t +} + +func NewWaitForCompactOptions() *WaitForCompactOptions { + return &WaitForCompactOptions{ + p: C.rocksdb_wait_for_compact_options_create(), + } +} + +func (w *WaitForCompactOptions) Destroy() { + C.rocksdb_wait_for_compact_options_destroy(w.p) + w.p = nil +} + +// SetAbortOnPause toggles the abort_on_pause flag, to abort waiting in case of +// a pause (PauseBackgroundWork() called). +// +// - If true, Status::Aborted will be returned immediately. +// - If false, ContinueBackgroundWork() must be called to resume the background jobs. +// +// Otherwise, jobs that were queued, but not scheduled yet may never finish +// and WaitForCompact() may wait indefinitely (if timeout is set, it will +// expire and return Status::TimedOut). +func (w *WaitForCompactOptions) SetAbortOnPause(v bool) { + C.rocksdb_wait_for_compact_options_set_abort_on_pause(w.p, boolToChar(v)) +} + +// IsAbortOnPause checks if abort_on_pause flag is on. +func (w *WaitForCompactOptions) AbortOnPause() bool { + return charToBool(C.rocksdb_wait_for_compact_options_get_abort_on_pause(w.p)) +} + +// SetFlush toggles the "flush" flag to flush all column families before starting to wait. +func (w *WaitForCompactOptions) SetFlush(v bool) { + C.rocksdb_wait_for_compact_options_set_flush(w.p, boolToChar(v)) +} + +// IsFlush checks if "flush" flag is on. +func (w *WaitForCompactOptions) Flush() bool { + return charToBool(C.rocksdb_wait_for_compact_options_get_flush(w.p)) +} + +// SetCloseDB toggles the "close_db" flag to call Close() after waiting is done. +// By the time Close() is called here, there should be no background jobs in progress +// and no new background jobs should be added. +// +// DB may not have been closed if Close() returned Aborted status due to unreleased snapshots +// in the system. +func (w *WaitForCompactOptions) SetCloseDB(v bool) { + C.rocksdb_wait_for_compact_options_set_close_db(w.p, boolToChar(v)) +} + +// CloseDB checks if "close_db" flag is on. +func (w *WaitForCompactOptions) CloseDB() bool { + return charToBool(C.rocksdb_wait_for_compact_options_get_close_db(w.p)) +} + +// SetTimeout in microseconds for waiting for compaction to complete. +// Status::TimedOut will be returned if timeout expires. +// when timeout == 0, WaitForCompact() will wait as long as there's background +// work to finish. +func (w *WaitForCompactOptions) SetTimeout(microseconds uint64) { + C.rocksdb_wait_for_compact_options_set_timeout(w.p, C.uint64_t(microseconds)) +} + +// GetTimeout in microseconds. +func (w *WaitForCompactOptions) GetTimeout() uint64 { + return uint64(C.rocksdb_wait_for_compact_options_get_timeout(w.p)) +} diff --git a/options_wait_for_compact_test.go b/options_wait_for_compact_test.go new file mode 100644 index 0000000..a5285c2 --- /dev/null +++ b/options_wait_for_compact_test.go @@ -0,0 +1,30 @@ +package grocksdb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWaitForCompactOptions(t *testing.T) { + t.Parallel() + + opts := NewWaitForCompactOptions() + defer opts.Destroy() + + require.False(t, opts.AbortOnPause()) + opts.SetAbortOnPause(true) + require.True(t, opts.AbortOnPause()) + + require.False(t, opts.Flush()) + opts.SetFlush(true) + require.True(t, opts.Flush()) + + require.False(t, opts.CloseDB()) + opts.SetCloseDB(true) + require.True(t, opts.CloseDB()) + + require.EqualValues(t, 0, opts.GetTimeout()) + opts.SetTimeout(1234) + require.EqualValues(t, 1234, opts.GetTimeout()) +}