forked from portworx/kvdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kvdb_mgr.go
132 lines (119 loc) · 3.19 KB
/
kvdb_mgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package kvdb
import (
"fmt"
"sync"
)
// Package-level state: the process-wide singleton and the registries of
// datastore backends and wrappers.
var (
	// instance is the value returned by Instance and replaced by SetInstance.
	// It is nil until SetInstance stores something.
	instance Kvdb

	// datastores maps a datastore name to the constructor given to Register.
	datastores = map[string]DatastoreInit{}

	// datastoreVersions maps a datastore name to the version function given
	// to Register.
	datastoreVersions = map[string]DatastoreVersion{}

	// wrappers maps a wrapper name to the constructor given to
	// RegisterWrapper.
	wrappers = map[WrapperName]WrapperInit{}

	// lock is taken by the functions in this file that read or write the
	// three registries above. Instance and SetInstance do not take it.
	lock sync.RWMutex
)
// Instance reports the package-wide Kvdb most recently stored by SetInstance.
// It returns nil if none was set. The read is done without taking the
// package lock.
func Instance() Kvdb {
	current := instance
	return current
}
// SetInstance installs kvdb as the package-wide singleton returned by
// Instance, replacing whatever was stored before. The value is not
// validated (nil is accepted) and the returned error is always nil.
func SetInstance(kvdb Kvdb) error {
	instance = kvdb
	return nil
}
// New returns a new instance of KVDB as specified by datastore name.
//
// name selects the backend constructor previously passed to Register.
// domain, machines, options and errorCB are handed to that constructor
// unchanged: if domain is set all requests to KVDB are prefixed by domain,
// and options is interpreted by the backend KVDB.
//
// If no backend is registered under name, New returns (nil, ErrNotSupported).
// Otherwise it returns whatever the backend constructor returns.
//
// The registry lock is held only for the map lookup and is released before
// the constructor runs. That way a slow constructor does not stall Register
// or RegisterWrapper, and a constructor that calls back into this package
// cannot deadlock on the RWMutex, which does not support recursive read
// locking.
func New(
	name string,
	domain string,
	machines []string,
	options map[string]string,
	errorCB FatalErrorCB,
) (Kvdb, error) {
	lock.RLock()
	dsInit, exists := datastores[name]
	lock.RUnlock()

	if !exists {
		return nil, ErrNotSupported
	}
	return dsInit(domain, machines, options, errorCB)
}
// AddWrapper adds the wrapper named by wrapper to kvdb if it is not already
// present anywhere in kvdb's wrapper chain.
//
// The chain is walked through WrappedKvdb. The constructor registered for
// wrapper through RegisterWrapper is then invoked with options:
//
//   - If the top of the chain is the log wrapper (Wrapper_Log), the new
//     wrapper is created around whatever the log wrapper currently wraps and
//     is installed beneath it, so the log wrapper stays on top. The original
//     kvdb is returned.
//   - Otherwise the new wrapper is created around kvdb and the constructor's
//     result is returned directly.
//
// Errors are returned when the wrapper is already present, when no
// constructor is registered under that name, when kvdb is nil, or when the
// constructor itself fails. In the first three cases, and when the
// constructor fails in the log-wrapper case, kvdb is returned unchanged
// alongside the error.
//
// The package lock is held in write mode for the whole call.
func AddWrapper(
	wrapper WrapperName,
	kvdb Kvdb,
	options map[string]string,
) (Kvdb, error) {
	lock.Lock()
	defer lock.Unlock()

	// Refuse to stack the same wrapper twice anywhere in the chain.
	for w := kvdb; w != nil; w = w.WrappedKvdb() {
		if w.WrapperName() == wrapper {
			return kvdb, fmt.Errorf("wrapper %v already present in kvdb",
				wrapper)
		}
	}

	initFn, ok := wrappers[wrapper]
	if !ok {
		return kvdb, fmt.Errorf("wrapper %v not found", wrapper)
	}
	if kvdb == nil {
		// Calling WrapperName on a nil interface below would panic; report
		// the misuse as an error instead.
		return kvdb, fmt.Errorf("cannot add wrapper %v: kvdb is nil", wrapper)
	}

	// keep log wrapper at the top if it exists
	if kvdb.WrapperName() == Wrapper_Log {
		newWrapper, err := initFn(kvdb.WrappedKvdb(), options)
		if err != nil {
			return kvdb, err
		}
		kvdb.SetWrappedKvdb(newWrapper)
		return kvdb, nil
	}
	return initFn(kvdb, options)
}
// RemoveWrapper removes the first wrapper named wrapper from kvdb's wrapper
// chain, if one is present.
//
// The chain is walked from kvdb through WrappedKvdb. On the first match the
// wrapper's Removed method is called and the wrapper is unlinked:
//
//   - If the match is the top of the chain, the kvdb it wraps is returned as
//     the new top.
//   - Otherwise its predecessor is re-pointed at the kvdb it wraps and the
//     original kvdb is returned.
//
// If no wrapper in the chain has that name (including when kvdb is nil),
// kvdb is returned unchanged. The returned error is always nil.
func RemoveWrapper(
	wrapper WrapperName,
	kvdb Kvdb,
) (Kvdb, error) {
	var parent Kvdb
	cur := kvdb
	for cur != nil {
		if cur.WrapperName() != wrapper {
			// Not the one we want: step down the chain.
			parent, cur = cur, cur.WrappedKvdb()
			continue
		}

		cur.Removed()
		if parent == nil {
			// removing the top-most wrapper
			return cur.WrappedKvdb(), nil
		}
		parent.SetWrappedKvdb(cur.WrappedKvdb())
		return kvdb, nil
	}
	return kvdb, nil // did not find the wrapper to remove
}
// Register adds specified datastore backend to the list of options.
//
// dsInit is stored as the constructor used by New for name, and dsVersion as
// the version function used by Version for name.
//
// An error is returned, and neither registry is modified, if name already
// has a constructor or a version function registered. Both checks run before
// either map is written, so a failed call never leaves a backend
// half-registered.
func Register(name string, dsInit DatastoreInit, dsVersion DatastoreVersion) error {
	lock.Lock()
	defer lock.Unlock()

	if _, exists := datastores[name]; exists {
		return fmt.Errorf("Datastore provider %q is already registered", name)
	}
	if _, exists := datastoreVersions[name]; exists {
		return fmt.Errorf("Datastore provider's %q version function already registered", name)
	}

	datastores[name] = dsInit
	datastoreVersions[name] = dsVersion
	return nil
}
// RegisterWrapper records initFn as the constructor AddWrapper uses for the
// wrapper called name. Unlike Register, an existing entry for name is
// replaced without complaint. The returned error is always nil.
func RegisterWrapper(name WrapperName, initFn WrapperInit) error {
	lock.Lock()
	defer lock.Unlock()

	wrappers[name] = initFn
	return nil
}
// Version returns the supported version for the provided kvdb endpoint.
//
// name selects the version function previously passed to Register; url and
// kvdbOptions are handed to it unchanged. If no backend is registered under
// name, Version returns ("", ErrNotSupported). Otherwise it returns whatever
// the version function returns.
//
// The registry lock is held only for the map lookup and is released before
// the version function runs. That way a slow probe does not stall Register
// or RegisterWrapper, and a version function that calls back into this
// package cannot deadlock on the RWMutex, which does not support recursive
// read locking.
func Version(name string, url string, kvdbOptions map[string]string) (string, error) {
	lock.RLock()
	dsVersion, exists := datastoreVersions[name]
	lock.RUnlock()

	if !exists {
		return "", ErrNotSupported
	}
	return dsVersion(url, kvdbOptions)
}