From d811c0de95d8d907734118354056d9cd5dcb9e65 Mon Sep 17 00:00:00 2001 From: dustinxie Date: Tue, 16 Apr 2024 16:21:29 -0700 Subject: [PATCH] [db] add KvVersioned interface (#4041) --- db/db_versioned.go | 156 ++++++++++++++++++++++ db/db_versioned_test.go | 27 ++++ db/db_versioned_util.go | 48 +++++++ db/versionpb/version.pb.go | 257 +++++++++++++++++++++++++++++++++++++ db/versionpb/version.proto | 23 ++++ 5 files changed, 511 insertions(+) create mode 100644 db/db_versioned.go create mode 100644 db/db_versioned_test.go create mode 100644 db/db_versioned_util.go create mode 100644 db/versionpb/version.pb.go create mode 100644 db/versionpb/version.proto diff --git a/db/db_versioned.go b/db/db_versioned.go new file mode 100644 index 0000000000..c0c2096b0e --- /dev/null +++ b/db/db_versioned.go @@ -0,0 +1,156 @@ +// Copyright (c) 2024 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "context" + + "github.com/iotexproject/iotex-core/pkg/lifecycle" +) + +type ( + // KvVersioned is a versioned key-value store, where each key has multiple + // versions of value (corresponding to different heights in a blockchain) + // + // Versioning is achieved by using (key + 8-byte version) as the actual + // storage key in the underlying DB. For each bucket, a metadata is stored + // at the special key = []byte{0}. The metadata specifies the bucket's name + // and the key length. + // + // For each versioned key, the special location = key + []byte{0} stores the + // key's metadata, which includes the following info: + // 1. the version when the key is first created + // 2. the version when the key is lastly written + // 3. the version when the key is deleted + // 4. hash of the key's last written value (to detect/avoid same write) + // If the location does not store a value, the key has never been written. + // + // How to use a versioned DB: + // + // db := NewBoltDBVersioned(cfg) // creates a versioned DB + // db.Start(ctx) + // defer func() { db.Stop(ctx) }() + // + // kv := db.SetVersion(5) + // value, err := kv.Get("ns", key) // read 'key' at version 5 + // kv = db.SetVersion(8) + // err := kv.Put("ns", key, value) // write 'key' at version 8 + + KvVersioned interface { + lifecycle.StartStopper + + // Version returns the key's most recent version + Version(string, []byte) (uint64, error) + + // SetVersion sets the version, and returns a KVStore to call Put()/Get() + SetVersion(uint64) KVStoreBasic + } + + // BoltDBVersioned is KvVersioned implementation based on bolt DB + BoltDBVersioned struct { + db *BoltDB + } +) + +// Option sets an option +type Option func(b *BoltDBVersioned) + +// NewBoltDBVersioned instantiates an BoltDB which implements KvVersioned +func NewBoltDBVersioned(cfg Config, opts ...Option) *BoltDBVersioned { + b := &BoltDBVersioned{ + db: NewBoltDB(cfg), + } + for _, opt := range opts { + opt(b) + } + return b +} + +// Start starts the DB +func (b *BoltDBVersioned) Start(ctx context.Context) error { + return b.db.Start(ctx) +} + +// Stop stops the DB +func (b *BoltDBVersioned) Stop(ctx context.Context) error { + return b.db.Stop(ctx) +} + +// Put writes a record +func (b *BoltDBVersioned) Put(ns string, version uint64, key, value []byte) error { + if !b.db.IsReady() { + return ErrDBNotStarted + } + // TODO: implement Put + return nil +} + +// Get retrieves the most recent version +func (b *BoltDBVersioned) Get(ns string, version uint64, key []byte) ([]byte, error) { + if !b.db.IsReady() { + return nil, ErrDBNotStarted + } + // TODO: implement Get + return nil, nil +} + +// Delete deletes a record,if key is nil,this will delete the whole bucket +func (b *BoltDBVersioned) Delete(ns string, key []byte) error { + if !b.db.IsReady() { + return ErrDBNotStarted + } + // TODO: implement Delete + return nil +} + +// Version returns the key's most recent version +func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { + if !b.db.IsReady() { + return 0, ErrDBNotStarted + } + // TODO: implement Version + return 0, nil +} + +// SetVersion sets the version, and returns a KVStore to call Put()/Get() +func (b *BoltDBVersioned) SetVersion(v uint64) KVStoreBasic { + return &KvWithVersion{ + db: b, + version: v, + } +} + +// KvWithVersion wraps the BoltDBVersioned with a certain version +type KvWithVersion struct { + db *BoltDBVersioned + version uint64 // version for Get/Put() +} + +// Start starts the DB +func (b *KvWithVersion) Start(context.Context) error { + panic("should call BoltDBVersioned's Start method") +} + +// Stop stops the DB +func (b *KvWithVersion) Stop(context.Context) error { + panic("should call BoltDBVersioned's Stop method") +} + +// Put writes a record +func (b *KvWithVersion) Put(ns string, key, value []byte) error { + return b.db.Put(ns, b.version, key, value) +} + +// Get retrieves a key's value +func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) { + return b.db.Get(ns, b.version, key) +} + +// Delete deletes a key +func (b *KvWithVersion) Delete(ns string, key []byte) error { + return b.db.Delete(ns, key) +} diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go new file mode 100644 index 0000000000..d132a422b1 --- /dev/null +++ b/db/db_versioned_test.go @@ -0,0 +1,27 @@ +// Copyright (c) 2021 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPb(t *testing.T) { + r := require.New(t) + + vn := &versionedNamespace{ + name: "3jfsp5@(%)EW*#)_#¡ªº–ƒ˚œade∆…", + keyLen: 5} + data := vn.serialize() + r.Equal("0a29336a667370354028252945572a23295f23c2a1c2aac2bae28093c692cb9ac593616465e28886e280a61005", hex.EncodeToString(data)) + vn1, err := deserializeVersionedNamespace(data) + r.NoError(err) + r.Equal(vn, vn1) +} diff --git a/db/db_versioned_util.go b/db/db_versioned_util.go new file mode 100644 index 0000000000..76fd9f51e1 --- /dev/null +++ b/db/db_versioned_util.go @@ -0,0 +1,48 @@ +// Copyright (c) 2023 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "github.com/iotexproject/go-pkgs/byteutil" + "google.golang.org/protobuf/proto" + + "github.com/iotexproject/iotex-core/db/versionpb" +) + +// versionedNamespace is the metadata for versioned namespace +type versionedNamespace struct { + name string + keyLen uint32 +} + +// serialize to bytes +func (vn *versionedNamespace) serialize() []byte { + return byteutil.Must(proto.Marshal(vn.toProto())) +} + +func (vn *versionedNamespace) toProto() *versionpb.VersionedNamespace { + return &versionpb.VersionedNamespace{ + Name: vn.name, + KeyLen: vn.keyLen, + } +} + +func fromProtoVN(pb *versionpb.VersionedNamespace) *versionedNamespace { + return &versionedNamespace{ + name: pb.Name, + keyLen: pb.KeyLen, + } +} + +// deserializeVersionedNamespace deserializes byte-stream to VersionedNamespace +func deserializeVersionedNamespace(buf []byte) (*versionedNamespace, error) { + var vn versionpb.VersionedNamespace + if err := proto.Unmarshal(buf, &vn); err != nil { + return nil, err + } + return fromProtoVN(&vn), nil +} diff --git a/db/versionpb/version.pb.go b/db/versionpb/version.pb.go new file mode 100644 index 0000000000..e15c6531f5 --- /dev/null +++ b/db/versionpb/version.pb.go @@ -0,0 +1,257 @@ +// Copyright (c) 2023 IoTeX +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +// To compile the proto, run: +// protoc --go_out=. *.proto +// option go_package = "../indexpb"; is to specify the package name in the generated go file + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: version.proto + +package versionpb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type VersionedNamespace struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + KeyLen uint32 `protobuf:"varint,2,opt,name=keyLen,proto3" json:"keyLen,omitempty"` +} + +func (x *VersionedNamespace) Reset() { + *x = VersionedNamespace{} + if protoimpl.UnsafeEnabled { + mi := &file_version_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *VersionedNamespace) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VersionedNamespace) ProtoMessage() {} + +func (x *VersionedNamespace) ProtoReflect() protoreflect.Message { + mi := &file_version_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VersionedNamespace.ProtoReflect.Descriptor instead. +func (*VersionedNamespace) Descriptor() ([]byte, []int) { + return file_version_proto_rawDescGZIP(), []int{0} +} + +func (x *VersionedNamespace) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *VersionedNamespace) GetKeyLen() uint32 { + if x != nil { + return x.KeyLen + } + return 0 +} + +type KeyMeta struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + LastWriteHash []byte `protobuf:"bytes,1,opt,name=lastWriteHash,proto3" json:"lastWriteHash,omitempty"` // hash of value that was last written + FirstVersion uint64 `protobuf:"varint,2,opt,name=firstVersion,proto3" json:"firstVersion,omitempty"` + LastVersion uint64 `protobuf:"varint,3,opt,name=lastVersion,proto3" json:"lastVersion,omitempty"` + DeleteVersion uint64 `protobuf:"varint,4,opt,name=deleteVersion,proto3" json:"deleteVersion,omitempty"` +} + +func (x *KeyMeta) Reset() { + *x = KeyMeta{} + if protoimpl.UnsafeEnabled { + mi := &file_version_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *KeyMeta) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KeyMeta) ProtoMessage() {} + +func (x *KeyMeta) ProtoReflect() protoreflect.Message { + mi := &file_version_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KeyMeta.ProtoReflect.Descriptor instead. +func (*KeyMeta) Descriptor() ([]byte, []int) { + return file_version_proto_rawDescGZIP(), []int{1} +} + +func (x *KeyMeta) GetLastWriteHash() []byte { + if x != nil { + return x.LastWriteHash + } + return nil +} + +func (x *KeyMeta) GetFirstVersion() uint64 { + if x != nil { + return x.FirstVersion + } + return 0 +} + +func (x *KeyMeta) GetLastVersion() uint64 { + if x != nil { + return x.LastVersion + } + return 0 +} + +func (x *KeyMeta) GetDeleteVersion() uint64 { + if x != nil { + return x.DeleteVersion + } + return 0 +} + +var File_version_proto protoreflect.FileDescriptor + +var file_version_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x09, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x70, 0x62, 0x22, 0x40, 0x0a, 0x12, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x65, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x65, 0x79, 0x4c, 0x65, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6b, 0x65, 0x79, 0x4c, 0x65, 0x6e, 0x22, 0x9b, 0x01, 0x0a, + 0x07, 0x4b, 0x65, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x24, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x57, 0x72, 0x69, 0x74, 0x65, 0x48, 0x61, 0x73, 0x68, 0x12, 0x22, + 0x0a, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x20, 0x0a, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x56, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x64, 0x65, 0x6c, + 0x65, 0x74, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x70, 0x72, + 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x2d, 0x63, 0x6f, 0x72, 0x65, + 0x2f, 0x64, 0x62, 0x2f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_version_proto_rawDescOnce sync.Once + file_version_proto_rawDescData = file_version_proto_rawDesc +) + +func file_version_proto_rawDescGZIP() []byte { + file_version_proto_rawDescOnce.Do(func() { + file_version_proto_rawDescData = protoimpl.X.CompressGZIP(file_version_proto_rawDescData) + }) + return file_version_proto_rawDescData +} + +var file_version_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_version_proto_goTypes = []interface{}{ + (*VersionedNamespace)(nil), // 0: versionpb.VersionedNamespace + (*KeyMeta)(nil), // 1: versionpb.KeyMeta +} +var file_version_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_version_proto_init() } +func file_version_proto_init() { + if File_version_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_version_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*VersionedNamespace); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_version_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*KeyMeta); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_version_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_version_proto_goTypes, + DependencyIndexes: file_version_proto_depIdxs, + MessageInfos: file_version_proto_msgTypes, + }.Build() + File_version_proto = out.File + file_version_proto_rawDesc = nil + file_version_proto_goTypes = nil + file_version_proto_depIdxs = nil +} diff --git a/db/versionpb/version.proto b/db/versionpb/version.proto new file mode 100644 index 0000000000..69645bed67 --- /dev/null +++ b/db/versionpb/version.proto @@ -0,0 +1,23 @@ +// Copyright (c) 2023 IoTeX +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +// To compile the proto, run: +// protoc --go_out=. *.proto +// option go_package = "../indexpb"; is to specify the package name in the generated go file +syntax = "proto3"; +package versionpb; +option go_package = "github.com/iotexproject/iotex-core/db/versionpb"; + +message VersionedNamespace { + string name = 1; + uint32 keyLen = 2; +} + +message KeyMeta { + bytes lastWriteHash = 1; // hash of value that was last written + uint64 firstVersion = 2; + uint64 lastVersion = 3; + uint64 deleteVersion = 4; +}