From dc377ff3a58ef947dea5654c6eee557f815fec55 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Tue, 5 Mar 2019 11:05:12 +0100 Subject: [PATCH] Implement backup and load methods --- rocksdb/checkpoint.go | 67 ++++++ rocksdb/checkpoint_test.go | 72 ++++++ rocksdb/db.go | 42 +++- rocksdb/db_test.go | 17 -- rocksdb/options.go | 27 +++ rocksdb/options_flush.go | 43 ++++ rocksdb/options_write.go | 8 + rocksdb/test_util.go | 40 ++++ rocksdb/util.go | 8 + storage/pb/backup.pb.go | 353 ++++++++++++++++++++++++++++ storage/pb/backup.proto | 24 ++ storage/pb/gen.go | 19 ++ storage/rocks/rocksdb_store.go | 127 ++++++++++ storage/rocks/rocksdb_store_test.go | 61 ++++- 14 files changed, 883 insertions(+), 25 deletions(-) create mode 100644 rocksdb/checkpoint.go create mode 100644 rocksdb/checkpoint_test.go create mode 100644 rocksdb/options_flush.go create mode 100644 rocksdb/test_util.go create mode 100644 storage/pb/backup.pb.go create mode 100644 storage/pb/backup.proto create mode 100644 storage/pb/gen.go diff --git a/rocksdb/checkpoint.go b/rocksdb/checkpoint.go new file mode 100644 index 000000000..941f11aa0 --- /dev/null +++ b/rocksdb/checkpoint.go @@ -0,0 +1,67 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package rocksdb + +// #include +// #include +import "C" +import ( + "errors" + "unsafe" +) + +// Checkpoint provides Checkpoint functionality. 
+// A checkpoint is an openable snapshot of a database at a point in time.
+type Checkpoint struct {
+	c *C.rocksdb_checkpoint_t
+}
+
+// NewNativeCheckpoint creates a new checkpoint.
+func NewNativeCheckpoint(c *C.rocksdb_checkpoint_t) *Checkpoint {
+	return &Checkpoint{c: c}
+}
+
+// CreateCheckpoint builds an openable snapshot of RocksDB on the same disk, which
+// accepts an output directory on the same disk, and under the directory
+// (1) hard-linked SST files pointing to existing live SST files
+// SST files will be copied if output directory is on a different filesystem
+// (2) copied manifest and other files
+// The directory should not already exist and will be created by this API.
+// The directory will be an absolute path
+// logSizeForFlush: if the total log file size is equal or larger than
+// this value, then a flush is triggered for all the column families. The
+// default value is 0, which means flush is always triggered. If you move
+// away from the default, the checkpoint may not contain up-to-date data
+// if WAL writing is not always enabled.
+// Flush will always trigger if it is 2PC.
+func (cp *Checkpoint) CreateCheckpoint(checkpointDir string, logSizeForFlush uint64) error {
+	var cErr *C.char
+	cDir := C.CString(checkpointDir)
+	defer C.free(unsafe.Pointer(cDir))
+
+	C.rocksdb_checkpoint_create(cp.c, cDir, C.uint64_t(logSizeForFlush), &cErr)
+	if cErr != nil {
+		defer C.free(unsafe.Pointer(cErr))
+		return errors.New(C.GoString(cErr))
+	}
+	return nil
+}
+
+// Destroy deallocates the Checkpoint object.
+func (cp *Checkpoint) Destroy() {
+	C.rocksdb_checkpoint_object_destroy(cp.c)
+	cp.c = nil
+}
diff --git a/rocksdb/checkpoint_test.go b/rocksdb/checkpoint_test.go
new file mode 100644
index 000000000..268ba7382
--- /dev/null
+++ b/rocksdb/checkpoint_test.go
@@ -0,0 +1,72 @@
+/*
+   Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
+ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rocksdb + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCheckpoint(t *testing.T) { + + checkDir, err := ioutil.TempDir("", "rocksdb-checkpoint") + require.NoError(t, err) + err = os.RemoveAll(checkDir) + require.NoError(t, err) + + db := newTestDB(t, "TestCheckpoint", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{ + []byte("key1"), + []byte("key2"), + []byte("key3"), + } + givenValue := []byte("value") + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + require.NoError(t, db.Put(wo, k, givenValue)) + } + + checkpoint, err := db.NewCheckpoint() + require.NoError(t, err) + require.NotNil(t, checkpoint) + defer checkpoint.Destroy() + + err = checkpoint.CreateCheckpoint(checkDir, 0) + require.NoError(t, err) + + opts := NewDefaultOptions() + dbCheck, err := OpenDBForReadOnly(checkDir, opts, true) + require.NoError(t, err) + defer dbCheck.Close() + + // test keys + var value *Slice + ro := NewDefaultReadOptions() + for _, k := range givenKeys { + value, err = dbCheck.Get(ro, k) + defer value.Free() + require.NoError(t, err) + require.Equal(t, value.Data(), givenValue) + } + +} diff --git a/rocksdb/db.go b/rocksdb/db.go index c1c9751c9..42ac28834 100644 --- a/rocksdb/db.go +++ b/rocksdb/db.go @@ -33,7 +33,7 @@ type DB struct { // OpenDB opens a database with the specified options. 
 func OpenDB(path string, opts *Options) (*DB, error) {
 	var cErr *C.char
-	var cPath = C.CString(path)
+	cPath := C.CString(path)
 	defer C.free(unsafe.Pointer(cPath))
 
 	db := C.rocksdb_open(opts.opts, cPath, &cErr)
@@ -48,6 +48,24 @@ func OpenDB(path string, opts *Options) (*DB, error) {
 	}, nil
 }
 
+// OpenDBForReadOnly opens a database with the specified options for readonly usage.
+func OpenDBForReadOnly(path string, opts *Options, errorIfLogFileExist bool) (*DB, error) {
+	var cErr *C.char
+	cPath := C.CString(path)
+	defer C.free(unsafe.Pointer(cPath))
+
+	db := C.rocksdb_open_for_read_only(opts.opts, cPath, boolToUchar(errorIfLogFileExist), &cErr)
+	if cErr != nil {
+		defer C.free(unsafe.Pointer(cErr))
+		return nil, errors.New(C.GoString(cErr))
+	}
+
+	return &DB{
+		db:   db,
+		opts: opts,
+	}, nil
+}
+
 // Close closes the database.
 func (db *DB) Close() error {
 	if db.db != nil {
@@ -58,6 +76,17 @@ func (db *DB) Close() error {
 	return nil
 }
 
+// NewCheckpoint creates a new Checkpoint for this db.
+func (db *DB) NewCheckpoint() (*Checkpoint, error) {
+	var cErr *C.char
+	cCheckpoint := C.rocksdb_checkpoint_object_create(db.db, &cErr)
+	if cErr != nil {
+		defer C.free(unsafe.Pointer(cErr))
+		return nil, errors.New(C.GoString(cErr))
+	}
+	return NewNativeCheckpoint(cCheckpoint), nil
+}
+
 // Put writes data associated with a key to the database.
 func (db *DB) Put(opts *WriteOptions, key, value []byte) error {
 	cKey := bytesToChar(key)
@@ -130,3 +159,14 @@ func (db *DB) NewIterator(opts *ReadOptions) *Iterator {
 	cIter := C.rocksdb_create_iterator(db.db, opts.opts)
 	return NewNativeIterator(unsafe.Pointer(cIter))
 }
+
+// Flush triggers a manual flush for the database.
+func (db *DB) Flush(opts *FlushOptions) error { + var cErr *C.char + C.rocksdb_flush(db.db, opts.opts, &cErr) + if cErr != nil { + defer C.free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} diff --git a/rocksdb/db_test.go b/rocksdb/db_test.go index e58b9341f..9d3c5922a 100644 --- a/rocksdb/db_test.go +++ b/rocksdb/db_test.go @@ -17,7 +17,6 @@ package rocksdb import ( - "io/ioutil" "testing" "github.com/stretchr/testify/require" @@ -65,19 +64,3 @@ func TestDBCRUD(t *testing.T) { require.Nil(t, slice3.Data()) } - -func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB { - dir, err := ioutil.TempDir("", "rocksdb-"+name) - require.NoError(t, err) - - opts := NewDefaultOptions() - opts.SetCreateIfMissing(true) - if applyOpts != nil { - applyOpts(opts) - } - - db, err := OpenDB(dir, opts) - require.NoError(t, err) - - return db -} diff --git a/rocksdb/options.go b/rocksdb/options.go index 9342c48e9..67899371c 100644 --- a/rocksdb/options.go +++ b/rocksdb/options.go @@ -16,8 +16,10 @@ package rocksdb +// #include // #include import "C" +import "unsafe" // CompressionType specifies the block compression. // DB contents are stored in a set of blocks, each of which holds a @@ -92,6 +94,31 @@ func (o *Options) SetBlockBasedTableFactory(value *BlockBasedTableOptions) { C.rocksdb_options_set_block_based_table_factory(o.opts, value.opts) } +// SetDBLogDir specifies the absolute info LOG dir. +// +// If it is empty, the log files will be in the same dir as data. +// If it is non empty, the log files will be in the specified dir, +// and the db data dir's absolute path will be used as the log file +// name's prefix. +// Default: empty +func (o *Options) SetDBLogDir(value string) { + cValue := C.CString(value) + defer C.free(unsafe.Pointer(cValue)) + C.rocksdb_options_set_db_log_dir(o.opts, cValue) +} + +// SetWalDir specifies the absolute dir path for write-ahead logs (WAL). 
+//
+// If it is empty, the log files will be in the same dir as data.
+// If it is non empty, the log files will be in the specified dir,
+// When destroying the db, all log files and the dir itself is deleted.
+// Default: empty
+func (o *Options) SetWalDir(value string) {
+	cValue := C.CString(value)
+	defer C.free(unsafe.Pointer(cValue))
+	C.rocksdb_options_set_wal_dir(o.opts, cValue)
+}
+
 // Destroy deallocates the Options object.
 func (o *Options) Destroy() {
 	C.rocksdb_options_destroy(o.opts)
diff --git a/rocksdb/options_flush.go b/rocksdb/options_flush.go
new file mode 100644
index 000000000..3e67ba498
--- /dev/null
+++ b/rocksdb/options_flush.go
@@ -0,0 +1,43 @@
+/*
+   Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rocksdb
+
+// #include
+import "C"
+
+// FlushOptions represents all of the available options when manual flushing the
+// database.
+type FlushOptions struct {
+	opts *C.rocksdb_flushoptions_t
+}
+
+// NewDefaultFlushOptions creates a default FlushOptions object.
+func NewDefaultFlushOptions() *FlushOptions {
+	return &FlushOptions{C.rocksdb_flushoptions_create()}
+}
+
+// SetWait specifies if the flush will wait until the flush is done.
+// Default: true
+func (o *FlushOptions) SetWait(value bool) {
+	C.rocksdb_flushoptions_set_wait(o.opts, boolToUchar(value))
+}
+
+// Destroy deallocates the FlushOptions object.
+func (o *FlushOptions) Destroy() {
+	C.rocksdb_flushoptions_destroy(o.opts)
+	o.opts = nil
+}
diff --git a/rocksdb/options_write.go b/rocksdb/options_write.go
index de5aced9a..555fd187f 100644
--- a/rocksdb/options_write.go
+++ b/rocksdb/options_write.go
@@ -29,6 +29,14 @@ func NewDefaultWriteOptions() *WriteOptions {
 	return &WriteOptions{C.rocksdb_writeoptions_create()}
 }
 
+// SetDisableWAL sets whether WAL should be active or not.
+// If true, writes will not first go to the write ahead log,
+// and the write may get lost after a crash.
+// Default: false
+func (o *WriteOptions) SetDisableWAL(value bool) {
+	C.rocksdb_writeoptions_disable_WAL(o.opts, C.int(btoi(value)))
+}
+
 // Destroy deallocates the WriteOptions object.
 func (o *WriteOptions) Destroy() {
 	C.rocksdb_writeoptions_destroy(o.opts)
diff --git a/rocksdb/test_util.go b/rocksdb/test_util.go
new file mode 100644
index 000000000..874a9011d
--- /dev/null
+++ b/rocksdb/test_util.go
@@ -0,0 +1,40 @@
+/*
+   Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/ + +package rocksdb + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB { + dir, err := ioutil.TempDir("", "rocksdb-"+name) + require.NoError(t, err) + + opts := NewDefaultOptions() + opts.SetCreateIfMissing(true) + if applyOpts != nil { + applyOpts(opts) + } + + db, err := OpenDB(dir, opts) + require.NoError(t, err) + + return db +} diff --git a/rocksdb/util.go b/rocksdb/util.go index 73405604d..5448a23f5 100644 --- a/rocksdb/util.go +++ b/rocksdb/util.go @@ -23,6 +23,14 @@ import ( "unsafe" ) +// btoi converts a bool value to int. +func btoi(b bool) int { + if b { + return 1 + } + return 0 +} + // boolToUchar converts a bool value to C.uchar. func boolToUchar(b bool) C.uchar { if b { diff --git a/storage/pb/backup.pb.go b/storage/pb/backup.pb.go new file mode 100644 index 000000000..02dfbe169 --- /dev/null +++ b/storage/pb/backup.pb.go @@ -0,0 +1,353 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: backup.proto + +/* + Package pb is a generated protocol buffer package. + + It is generated from these files: + backup.proto + + It has these top-level messages: + KVPair +*/ +package pb + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type KVPair struct { + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *KVPair) Reset() { *m = KVPair{} } +func (m *KVPair) String() string { return proto.CompactTextString(m) } +func (*KVPair) ProtoMessage() {} +func (*KVPair) Descriptor() ([]byte, []int) { return fileDescriptorBackup, []int{0} } + +func (m *KVPair) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *KVPair) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func init() { + proto.RegisterType((*KVPair)(nil), "pb.KVPair") +} +func (m *KVPair) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *KVPair) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintBackup(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Value) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintBackup(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + return i, nil +} + +func encodeVarintBackup(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *KVPair) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovBackup(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovBackup(uint64(l)) + } + return n +} + +func sovBackup(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozBackup(x uint64) (n int) { + return sovBackup(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *KVPair) Unmarshal(dAtA []byte) error 
{ + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: KVPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: KVPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBackup + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...) + if m.Key == nil { + m.Key = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBackup + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) 
+ if m.Value == nil { + m.Value = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBackup(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBackup + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipBackup(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBackup + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBackup + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBackup + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthBackup + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBackup + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipBackup(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + 
case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthBackup = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowBackup = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("backup.proto", fileDescriptorBackup) } + +var fileDescriptorBackup = []byte{ + // 107 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4a, 0x4c, 0xce, + 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x32, 0xe0, 0x62, + 0xf3, 0x0e, 0x0b, 0x48, 0xcc, 0x2c, 0x12, 0x12, 0xe0, 0x62, 0xce, 0x4e, 0xad, 0x94, 0x60, 0x54, + 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31, 0x85, 0x44, 0xb8, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, 0x25, + 0x98, 0xc0, 0x62, 0x10, 0x8e, 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, + 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0xd8, 0x38, 0x63, 0x40, 0x00, 0x00, + 0x00, 0xff, 0xff, 0x4b, 0x58, 0x23, 0x5a, 0x5e, 0x00, 0x00, 0x00, +} diff --git a/storage/pb/backup.proto b/storage/pb/backup.proto new file mode 100644 index 000000000..7ddf3f7e1 --- /dev/null +++ b/storage/pb/backup.proto @@ -0,0 +1,24 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +syntax = "proto3"; + +package pb; + +message KVPair { + bytes key = 1; + bytes value = 2; +} \ No newline at end of file diff --git a/storage/pb/gen.go b/storage/pb/gen.go new file mode 100644 index 000000000..e92ddb911 --- /dev/null +++ b/storage/pb/gen.go @@ -0,0 +1,19 @@ +/* + Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +//go:generate protoc --gofast_out=plugins=grpc:. backup.proto + +package pb diff --git a/storage/rocks/rocksdb_store.go b/storage/rocks/rocksdb_store.go index 6c978313b..81e6b7e72 100644 --- a/storage/rocks/rocksdb_store.go +++ b/storage/rocks/rocksdb_store.go @@ -16,10 +16,16 @@ package rocks import ( + "bufio" "bytes" + "encoding/binary" + "io" + "io/ioutil" + "os" "github.com/bbva/qed/rocksdb" "github.com/bbva/qed/storage" + "github.com/bbva/qed/storage/pb" ) type RocksDBStore struct { @@ -160,3 +166,124 @@ func (s RocksDBStore) Close() error { s.db.Close() return nil } + +func (s RocksDBStore) Delete(prefix byte, key []byte) error { + k := append([]byte{prefix}, key...) + return s.db.Delete(rocksdb.NewDefaultWriteOptions(), k) +} + +// Backup dumps a protobuf-encoded list of all entries in the database into the +// given writer, that are newer than the specified version. 
+func (s *RocksDBStore) Backup(w io.Writer, until uint64) error { + + // create temp directory + checkDir, err := ioutil.TempDir("", "rocksdb-checkpoint") + if err != nil { + return err + } + os.RemoveAll(checkDir) + + // create checkpoint + checkpoint, err := s.db.NewCheckpoint() + if err != nil { + return err + } + defer checkpoint.Destroy() + + checkpoint.CreateCheckpoint(checkDir, 0) + defer os.RemoveAll(checkDir) + + // open db for read-only + opts := rocksdb.NewDefaultOptions() + checkDB, err := rocksdb.OpenDBForReadOnly(checkDir, opts, true) + if err != nil { + return err + } + + // open a new iterator and dump every key + ro := rocksdb.NewDefaultReadOptions() + ro.SetFillCache(false) + it := checkDB.NewIterator(ro) + defer it.Close() + + for it.SeekToFirst(); it.Valid(); it.Next() { + keySlice := it.Key().Data() + valueSlice := it.Value().Data() + key := append(keySlice[:0:0], keySlice...) // See https://github.com/go101/go101/wiki + value := append(valueSlice[:0:0], valueSlice...) + + entry := &pb.KVPair{ + Key: key, + Value: value, + } + + // write entries to disk + if err := writeTo(entry, w); err != nil { + return err + } + } + + return nil +} + +// Load reads a protobuf-encoded list of all entries from a reader and writes +// them to the database. This can be used to restore the database from a backup +// made by calling DB.Backup(). +// +// DB.Load() should be called on a database that is not running any other +// concurrent transactions while it is running. 
+func (s *RocksDBStore) Load(r io.Reader) error { + + br := bufio.NewReaderSize(r, 16<<10) + unmarshalBuf := make([]byte, 1<<10) + batch := rocksdb.NewWriteBatch() + wo := rocksdb.NewDefaultWriteOptions() + wo.SetDisableWAL(true) + + for { + var data uint64 + err := binary.Read(br, binary.LittleEndian, &data) + if err == io.EOF { + break + } else if err != nil { + return err + } + + if cap(unmarshalBuf) < int(data) { + unmarshalBuf = make([]byte, data) + } + + kv := &pb.KVPair{} + if _, err = io.ReadFull(br, unmarshalBuf[:data]); err != nil { + return err + } + if err = kv.Unmarshal(unmarshalBuf[:data]); err != nil { + return err + } + batch.Put(kv.Key, kv.Value) + + if batch.Count() == 1000 { + s.db.Write(wo, batch) + batch.Clear() + continue + } + } + + if batch.Count() > 0 { + return s.db.Write(wo, batch) + } + + return nil +} + +func writeTo(entry *pb.KVPair, w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil { + return err + } + buf, err := entry.Marshal() + if err != nil { + return err + } + _, err = w.Write(buf) + return err +} diff --git a/storage/rocks/rocksdb_store_test.go b/storage/rocks/rocksdb_store_test.go index 7111e0df1..0bc8a23c8 100644 --- a/storage/rocks/rocksdb_store_test.go +++ b/storage/rocks/rocksdb_store_test.go @@ -16,6 +16,7 @@ package rocks import ( + "bytes" "fmt" "io/ioutil" "os" @@ -30,7 +31,7 @@ import ( ) func TestMutate(t *testing.T) { - store, closeF := openRockdsDBStore(t) + store, closeF := openRocksDBStore(t) defer closeF() prefix := byte(0x0) @@ -53,7 +54,7 @@ func TestMutate(t *testing.T) { } func TestGetExistentKey(t *testing.T) { - store, closeF := openRockdsDBStore(t) + store, closeF := openRocksDBStore(t) defer closeF() testCases := []struct { @@ -89,7 +90,7 @@ func TestGetExistentKey(t *testing.T) { } func TestGetRange(t *testing.T) { - store, closeF := openRockdsDBStore(t) + store, closeF := openRocksDBStore(t) defer closeF() var testCases = []struct { @@ -133,7 +134,7 
@@ func TestGetAll(t *testing.T) { {17, 59, 14}, } - store, closeF := openRockdsDBStore(t) + store, closeF := openRocksDBStore(t) defer closeF() // insert @@ -165,7 +166,7 @@ func TestGetAll(t *testing.T) { } func TestGetLast(t *testing.T) { - store, closeF := openRockdsDBStore(t) + store, closeF := openRocksDBStore(t) defer closeF() // insert @@ -187,8 +188,54 @@ func TestGetLast(t *testing.T) { require.Equalf(t, util.Uint64AsBytes(numElems-1), kv.Value, "The value should match the last inserted element") } +func TestBackupLoad(t *testing.T) { + + store, closeF := openRocksDBStore(t) + defer closeF() + + // insert + numElems := uint64(20) + prefixes := [][]byte{{storage.IndexPrefix}, {storage.HistoryCachePrefix}, {storage.HyperCachePrefix}} + for _, prefix := range prefixes { + for i := uint64(0); i < numElems; i++ { + key := util.Uint64AsBytes(i) + store.Mutate([]*storage.Mutation{ + {Prefix: prefix[0], Key: key, Value: key}, + }) + } + } + + // create backup + ioBuf := bytes.NewBufferString("") + require.NoError(t, store.Backup(ioBuf, 0)) + + // restore backup + restore, recloseF := openRocksDBStore(t) + defer recloseF() + require.NoError(t, restore.Load(ioBuf)) + + // check elements + for _, prefix := range prefixes { + reader := store.GetAll(prefix[0]) + for { + entries := make([]*storage.KVPair, 1000) + n, _ := reader.Read(entries) + if n == 0 { + break + } + for i := 0; i < n; i++ { + kv, err := restore.Get(prefix[0], entries[i].Key) + require.NoError(t, err) + require.Equal(t, entries[i].Value, kv.Value, "The values should match") + } + } + reader.Close() + } + +} + func BenchmarkMutate(b *testing.B) { - store, closeF := openRockdsDBStore(b) + store, closeF := openRocksDBStore(b) defer closeF() prefix := byte(0x0) b.N = 100000 @@ -202,7 +249,7 @@ func BenchmarkMutate(b *testing.B) { } -func openRockdsDBStore(t require.TestingT) (*RocksDBStore, func()) { +func openRocksDBStore(t require.TestingT) (*RocksDBStore, func()) { path := mustTempDir() store, err := 
NewRocksDBStore(filepath.Join(path, "rockdsdb_store_test.db")) if err != nil {