Skip to content

Commit

Permalink
Implement backup and load methods
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda authored and gdiazlo committed Mar 14, 2019
1 parent b627664 commit dc377ff
Show file tree
Hide file tree
Showing 14 changed files with 883 additions and 25 deletions.
67 changes: 67 additions & 0 deletions rocksdb/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rocksdb

// #include <stdlib.h>
// #include <rocksdb/c.h>
import "C"
import (
"errors"
"unsafe"
)

// Checkpoint provides Checkpoint functionality.
// A checkpoint is an openable snapshot of a database at a point in time.
type Checkpoint struct {
c *C.rocksdb_checkpoint_t
}

// NewNativeCheckpoint creates a new checkpoint.
func NewNativeCheckpoint(c *C.rocksdb_checkpoint_t) *Checkpoint {
return &Checkpoint{c: c}
}

// CreateCheckpoint builds an openable snapshot of RocksDB on the same disk, which
// accepts an output directory on the same disk, and under the directory
// (1) hard-linked SST files pointing to existing live SST files
// SST files will be copied if output directory is on a different filesystem
// (2) a copied manifest files and other files
// The directory should not already exist and will be created by this API.
// The directory will be an absolute path
// logSizeForFlush: if the total log file size is equal or larger than
// this value, then a flush is triggered for all the column families. The
// default value is 0, which means flush is always triggered. If you move
// away from the default, the checkpoint may not contain up-to-date data
// if WAL writing is not always enabled.
// Flush will always trigger if it is 2PC.
func (cp *Checkpoint) CreateCheckpoint(checkpointDir string, logSizeForFlush uint64) error {
var cErr *C.char
cDir := C.CString(checkpointDir)
defer C.free(unsafe.Pointer(cDir))

C.rocksdb_checkpoint_create(cp.c, cDir, C.uint64_t(logSizeForFlush), &cErr)
if cErr != nil {
defer C.free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}

// Destroy deallocates the Checkpoint object.
func (cp *Checkpoint) Destroy() {
C.rocksdb_checkpoint_object_destroy(cp.c)
cp.c = nil
}
72 changes: 72 additions & 0 deletions rocksdb/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rocksdb

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func TestCheckpoint(t *testing.T) {

checkDir, err := ioutil.TempDir("", "rocksdb-checkpoint")
require.NoError(t, err)
err = os.RemoveAll(checkDir)
require.NoError(t, err)

db := newTestDB(t, "TestCheckpoint", nil)
defer db.Close()

// insert keys
givenKeys := [][]byte{
[]byte("key1"),
[]byte("key2"),
[]byte("key3"),
}
givenValue := []byte("value")
wo := NewDefaultWriteOptions()
for _, k := range givenKeys {
require.NoError(t, db.Put(wo, k, givenValue))
}

checkpoint, err := db.NewCheckpoint()
require.NoError(t, err)
require.NotNil(t, checkpoint)
defer checkpoint.Destroy()

err = checkpoint.CreateCheckpoint(checkDir, 0)
require.NoError(t, err)

opts := NewDefaultOptions()
dbCheck, err := OpenDBForReadOnly(checkDir, opts, true)
require.NoError(t, err)
defer dbCheck.Close()

// test keys
var value *Slice
ro := NewDefaultReadOptions()
for _, k := range givenKeys {
value, err = dbCheck.Get(ro, k)
defer value.Free()
require.NoError(t, err)
require.Equal(t, value.Data(), givenValue)
}

}
42 changes: 41 additions & 1 deletion rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type DB struct {
// OpenDB opens a database with the specified options.
func OpenDB(path string, opts *Options) (*DB, error) {
var cErr *C.char
var cPath = C.CString(path)
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))

db := C.rocksdb_open(opts.opts, cPath, &cErr)
Expand All @@ -48,6 +48,24 @@ func OpenDB(path string, opts *Options) (*DB, error) {
}, nil
}

// OpenDBForReadOnly opens a database with the specified options for readonly usage.
func OpenDBForReadOnly(path string, opts *Options, errorIfLogFileExist bool) (*DB, error) {
var cErr *C.char
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))

db := C.rocksdb_open_for_read_only(opts.opts, cPath, boolToUchar(errorIfLogFileExist), &cErr)
if cErr != nil {
defer C.free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}

return &DB{
db: db,
opts: opts,
}, nil
}

// Close closes the database.
func (db *DB) Close() error {
if db.db != nil {
Expand All @@ -58,6 +76,17 @@ func (db *DB) Close() error {
return nil
}

// NewCheckpoint creates a new Checkpoint for this db.
func (db *DB) NewCheckpoint() (*Checkpoint, error) {
var cErr *C.char
cCheckpoint := C.rocksdb_checkpoint_object_create(db.db, &cErr)
if cErr != nil {
defer C.free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return NewNativeCheckpoint(cCheckpoint), nil
}

// Put writes data associated with a key to the database.
func (db *DB) Put(opts *WriteOptions, key, value []byte) error {
cKey := bytesToChar(key)
Expand Down Expand Up @@ -130,3 +159,14 @@ func (db *DB) NewIterator(opts *ReadOptions) *Iterator {
cIter := C.rocksdb_create_iterator(db.db, opts.opts)
return NewNativeIterator(unsafe.Pointer(cIter))
}

// Flush triggers a manuel flush for the database.
func (db *DB) Flush(opts *FlushOptions) error {
var cErr *C.char
C.rocksdb_flush(db.db, opts.opts, &cErr)
if cErr != nil {
defer C.free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}
17 changes: 0 additions & 17 deletions rocksdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package rocksdb

import (
"io/ioutil"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -65,19 +64,3 @@ func TestDBCRUD(t *testing.T) {
require.Nil(t, slice3.Data())

}

func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB {
dir, err := ioutil.TempDir("", "rocksdb-"+name)
require.NoError(t, err)

opts := NewDefaultOptions()
opts.SetCreateIfMissing(true)
if applyOpts != nil {
applyOpts(opts)
}

db, err := OpenDB(dir, opts)
require.NoError(t, err)

return db
}
27 changes: 27 additions & 0 deletions rocksdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package rocksdb

// #include <stdlib.h>
// #include <rocksdb/c.h>
import "C"
import "unsafe"

// CompressionType specifies the block compression.
// DB contents are stored in a set of blocks, each of which holds a
Expand Down Expand Up @@ -92,6 +94,31 @@ func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) {
C.rocksdb_options_set_block_based_table_factory(o.opts, value.opts)
}

// SetDBLogDir specifies the absolute info LOG dir.
//
// If it is empty, the log files will be in the same dir as data.
// If it is non empty, the log files will be in the specified dir,
// and the db data dir's absolute path will be used as the log file
// name's prefix.
// Default: empty
func (o *Options) SetDBLogDir(value string) {
cValue := C.CString(value)
defer C.free(unsafe.Pointer(cValue))
C.rocksdb_options_set_db_log_dir(o.opts, cValue)
}

// SetWalDir specifies the absolute dir path for write-ahead logs (WAL).
//
// If it is empty, the log files will be in the same dir as data.
// If it is non empty, the log files will be in the specified dir,
// When destroying the db, all log files and the dir itopts is deleted.
// Default: empty
func (o *Options) SetWalDir(value string) {
cValue := C.CString(value)
defer C.free(unsafe.Pointer(cValue))
C.rocksdb_options_set_wal_dir(o.opts, cValue)
}

// Destroy deallocates the Options object.
func (o *Options) Destroy() {
C.rocksdb_options_destroy(o.opts)
Expand Down
43 changes: 43 additions & 0 deletions rocksdb/options_flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rocksdb

// #include <rocksdb/c.h>
import "C"

// FlushOptions represent all of the available options when manual flushing the
// database.
type FlushOptions struct {
opts *C.rocksdb_flushoptions_t
}

// NewDefaultFlushOptions creates a default FlushOptions object.
func NewDefaultFlushOptions() *FlushOptions {
return &FlushOptions{C.rocksdb_flushoptions_create()}
}

// SetWait specify if the flush will wait until the flush is done.
// Default: true
func (o *FlushOptions) SetWait(value bool) {
C.rocksdb_flushoptions_set_wait(o.opts, boolToUchar(value))
}

// Destroy deallocates the FlushOptions object.
func (o *FlushOptions) Destroy() {
C.rocksdb_flushoptions_destroy(o.opts)
o.opts = nil
}
8 changes: 8 additions & 0 deletions rocksdb/options_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ func NewDefaultWriteOptions() *WriteOptions {
return &WriteOptions{C.rocksdb_writeoptions_create()}
}

// SetDisableWAL sets whether WAL should be active or not.
// If true, writes will not first go to the write ahead log,
// and the write may got lost after a crash.
// Default: false
func (o *WriteOptions) SetDisableWAL(value bool) {
C.rocksdb_writeoptions_disable_WAL(o.opts, C.int(btoi(value)))
}

// Destroy deallocates the WriteOptions object.
func (o *WriteOptions) Destroy() {
C.rocksdb_writeoptions_destroy(o.opts)
Expand Down
40 changes: 40 additions & 0 deletions rocksdb/test_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rocksdb

import (
"io/ioutil"
"testing"

"github.com/stretchr/testify/require"
)

func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB {
dir, err := ioutil.TempDir("", "rocksdb-"+name)
require.NoError(t, err)

opts := NewDefaultOptions()
opts.SetCreateIfMissing(true)
if applyOpts != nil {
applyOpts(opts)
}

db, err := OpenDB(dir, opts)
require.NoError(t, err)

return db
}
8 changes: 8 additions & 0 deletions rocksdb/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import (
"unsafe"
)

// btoi converts a bool value to int.
func btoi(b bool) int {
if b {
return 1
}
return 0
}

// boolToUchar converts a bool value to C.uchar.
func boolToUchar(b bool) C.uchar {
if b {
Expand Down
Loading

0 comments on commit dc377ff

Please sign in to comment.