Skip to content

Commit

Permalink
refactor store methods
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Dec 2, 2019
1 parent 8ad868c commit 1c7fd80
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 89 deletions.
6 changes: 3 additions & 3 deletions store/etcdv3/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (m *Mercury) ContainerStatusStream(ctx context.Context, appname, entrypoint
ch := make(chan *types.ContainerStatus)
go func() {
defer close(ch)
for resp := range m.Watch(ctx, statusKey, clientv3.WithPrefix(), clientv3.WithPrevKV()) {
for resp := range m.watch(ctx, statusKey, clientv3.WithPrefix(), clientv3.WithPrevKV()) {
if resp.Err() != nil {
if !resp.Canceled {
log.Errorf("[ContainerStatusStream] watch failed %v", resp.Err())
Expand Down Expand Up @@ -290,9 +290,9 @@ func (m *Mercury) doOpsContainer(ctx context.Context, container *types.Container

if create {
data[filepath.Join(containerDeployPrefix, appname, entrypoint, container.Nodename, container.ID)] = containerData
_, err = m.BatchCreate(ctx, data)
_, err = m.batchCreate(ctx, data)
} else {
_, err = m.BatchUpdate(ctx, data)
_, err = m.batchUpdate(ctx, data)
}
return err
}
4 changes: 4 additions & 0 deletions store/etcdv3/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,14 @@ func TestContainer(t *testing.T) {
assert.NoError(t, m.RemoveContainer(ctx, newContainer))
// Deployed
container.StatusMeta = &types.StatusMeta{
ID: container.ID,
Running: true,
}
err = m.SetContainerStatus(ctx, container, 0)
assert.NoError(t, err)
rs, err := m.GetContainerStatus(ctx, container.ID)
assert.NoError(t, err)
assert.Equal(t, rs.ID, container.ID)
container2 := &types.Container{
ID: container.ID,
Nodename: "n2",
Expand Down
130 changes: 54 additions & 76 deletions store/etcdv3/mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"path/filepath"
"strings"
"sync"
"time"

Expand All @@ -14,6 +12,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/namespace"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/projecteru2/core/lock"
"github.com/projecteru2/core/lock/etcdlock"
Expand Down Expand Up @@ -77,6 +76,9 @@ func New(config types.Config, embededStorage bool) (*Mercury, error) {
}); err != nil {
return nil, err
}
cliv3.KV = namespace.NewKV(cliv3.KV, config.Etcd.Prefix)
cliv3.Watcher = namespace.NewWatcher(cliv3.Watcher, config.Etcd.Prefix)
cliv3.Lease = namespace.NewLease(cliv3.Lease, config.Etcd.Prefix)
return &Mercury{cliv3: cliv3, config: config}, nil
}

Expand All @@ -94,16 +96,7 @@ func (m *Mercury) CreateLock(key string, ttl time.Duration) (lock.DistributedLoc

// Get get results or noting
func (m *Mercury) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
return m.cliv3.Get(ctx, m.parseKey(key), opts...)
}

func (m *Mercury) batchGet(ctx context.Context, keys []string, opt ...clientv3.OpOption) (txnResponse *clientv3.TxnResponse, err error) {
ops := []clientv3.Op{}
for _, key := range keys {
op := clientv3.OpGet(m.parseKey(key), opt...)
ops = append(ops, op)
}
return m.doBatchOp(ctx, nil, ops, nil)
return m.cliv3.Get(ctx, key, opts...)
}

// GetOne get one result or noting
Expand All @@ -112,11 +105,9 @@ func (m *Mercury) GetOne(ctx context.Context, key string, opts ...clientv3.OpOpt
if err != nil {
return nil, err
}

if resp.Count != 1 {
return nil, types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("key: %s", key))
}

return resp.Kvs[0], nil
}

Expand All @@ -126,67 +117,93 @@ func (m *Mercury) GetMulti(ctx context.Context, keys []string, opts ...clientv3.
if len(keys) == 0 {
return
}

if txnResponse, err = m.batchGet(ctx, keys); err != nil {
return
}

for idx, responseOp := range txnResponse.Responses {
resp := responseOp.GetResponseRange()
if resp.Count != 1 {
err = types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("key: %s", keys[idx]))
return
return nil, types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("key: %s", keys[idx]))
}

kvs = append(kvs, resp.Kvs[0])
}

if len(kvs) != len(keys) {
err = types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("keys: %v", keys))
}

return
}

// Delete delete key
func (m *Mercury) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
return m.cliv3.Delete(ctx, m.parseKey(key), opts...)
return m.cliv3.Delete(ctx, key, opts...)
}

// BatchDelete batch delete keys
func (m *Mercury) batchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
// Put save a key value
func (m *Mercury) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
return m.cliv3.Put(ctx, key, val, opts...)
}

// Create create a key if not exists
func (m *Mercury) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
return m.batchCreate(ctx, map[string]string{key: val}, opts...)
}

// Update update a key if exists
func (m *Mercury) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
return m.batchUpdate(ctx, map[string]string{key: val}, opts...)
}

// GetThenPut if key exists, then put
func (m *Mercury) GetThenPut(ctx context.Context, getKeys []string, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
ops := []clientv3.Op{clientv3.OpPut(key, val, opts...)}
conds := []clientv3.Cmp{}
for _, getKey := range getKeys {
cond := clientv3.Compare(clientv3.Version(getKey), "!=", 0)
conds = append(conds, cond)
}
return m.doBatchOp(ctx, conds, ops, []clientv3.Op{})
}

// Watch wath a key
func (m *Mercury) watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
return m.cliv3.Watch(ctx, key, opts...)
}

func (m *Mercury) batchGet(ctx context.Context, keys []string, opt ...clientv3.OpOption) (txnResponse *clientv3.TxnResponse, err error) {
ops := []clientv3.Op{}
for _, key := range keys {
op := clientv3.OpDelete(m.parseKey(key), opts...)
op := clientv3.OpGet(key, opt...)
ops = append(ops, op)
}

return m.doBatchOp(ctx, nil, ops, nil)
}

// Put save a key value
func (m *Mercury) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
return m.batchPut(ctx, map[string]string{key: val}, nil, opts...)
func (m *Mercury) batchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
ops := []clientv3.Op{}
for _, key := range keys {
op := clientv3.OpDelete(key, opts...)
ops = append(ops, op)
}

return m.doBatchOp(ctx, nil, ops, nil)
}

func (m *Mercury) batchPut(ctx context.Context, data map[string]string, limit map[string]map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
ops := []clientv3.Op{}
failOps := []clientv3.Op{}
conds := []clientv3.Cmp{}
for key, val := range data {
prefixKey := m.parseKey(key)
op := clientv3.OpPut(prefixKey, val, opts...)
op := clientv3.OpPut(key, val, opts...)
ops = append(ops, op)
if v, ok := limit[key]; ok {
for method, condition := range v {
switch method {
case cmpVersion:
cond := clientv3.Compare(clientv3.Version(prefixKey), condition, 0)
cond := clientv3.Compare(clientv3.Version(key), condition, 0)
conds = append(conds, cond)
case cmpValue:
cond := clientv3.Compare(clientv3.Value(prefixKey), condition, val)
failOp := clientv3.OpGet(prefixKey)
failOps = append(failOps, failOp)
cond := clientv3.Compare(clientv3.Value(key), condition, val)
failOps = append(failOps, clientv3.OpGet(key))
conds = append(conds, cond)
}
}
Expand All @@ -195,13 +212,7 @@ func (m *Mercury) batchPut(ctx context.Context, data map[string]string, limit ma
return m.doBatchOp(ctx, conds, ops, failOps)
}

// Create create a key if not exists
func (m *Mercury) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
return m.BatchCreate(ctx, map[string]string{key: val}, opts...)
}

// BatchCreate create key values if not exists
func (m *Mercury) BatchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
func (m *Mercury) batchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
limit := map[string]map[string]string{}
for key := range data {
limit[key] = map[string]string{cmpVersion: "="}
Expand All @@ -216,26 +227,7 @@ func (m *Mercury) BatchCreate(ctx context.Context, data map[string]string, opts
return resp, nil
}

// Update update a key if exists
func (m *Mercury) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
return m.BatchUpdate(ctx, map[string]string{key: val}, opts...)
}

// GetThenPut if key exists, then put
func (m *Mercury) GetThenPut(ctx context.Context, getKeys []string, key, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
prefixKey := m.parseKey(key)
ops := []clientv3.Op{clientv3.OpPut(prefixKey, val, opts...)}
conds := []clientv3.Cmp{}
for _, getKey := range getKeys {
prefixGetKey := m.parseKey(getKey)
cond := clientv3.Compare(clientv3.Version(prefixGetKey), "!=", 0)
conds = append(conds, cond)
}
return m.doBatchOp(ctx, conds, ops, []clientv3.Op{})
}

// BatchUpdate batch update keys
func (m *Mercury) BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
func (m *Mercury) batchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {
limit := map[string]map[string]string{}
for key := range data {
limit[key] = map[string]string{cmpVersion: "!=", cmpValue: "!="} // ignore same data
Expand All @@ -254,20 +246,6 @@ func (m *Mercury) BatchUpdate(ctx context.Context, data map[string]string, opts
return resp, nil
}

// Watch wath a key
func (m *Mercury) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
key = m.parseKey(key)
return m.cliv3.Watch(ctx, key, opts...)
}

func (m *Mercury) parseKey(key string) string {
after := filepath.Join(m.config.Etcd.Prefix, key)
if strings.HasSuffix(key, "/") {
after = after + "/"
}
return after
}

func (m *Mercury) doBatchOp(ctx context.Context, conds []clientv3.Cmp, ops, failOps []clientv3.Op) (*clientv3.TxnResponse, error) {
if len(ops) == 0 {
return nil, types.ErrNoOps
Expand Down
10 changes: 5 additions & 5 deletions store/etcdv3/mercury_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func TestMercury(t *testing.T) {
"k1": "a1",
"k2": "a2",
}
r, err = m.BatchCreate(ctx, data)
r, err = m.batchCreate(ctx, data)
assert.NoError(t, err)
assert.True(t, r.Succeeded)
// BatchCreateFailed
r, err = m.BatchCreate(ctx, data)
r, err = m.batchCreate(ctx, data)
assert.Error(t, err)
assert.False(t, r.Succeeded)
// Update
Expand All @@ -95,20 +95,20 @@ func TestMercury(t *testing.T) {
"k1": "b1",
"k2": "b2",
}
r, err = m.BatchUpdate(ctx, data)
r, err = m.batchUpdate(ctx, data)
assert.NoError(t, err)
assert.True(t, r.Succeeded)
// BatchUpdateFail
data = map[string]string{
"k1": "c1",
"k3": "b2",
}
r, err = m.BatchUpdate(ctx, data)
r, err = m.batchUpdate(ctx, data)
assert.Error(t, err)
assert.False(t, r.Succeeded)
// Watch
ctx2, cancel := context.WithCancel(ctx)
ch := m.Watch(ctx2, "watchkey", clientv3.WithPrefix())
ch := m.watch(ctx2, "watchkey", clientv3.WithPrefix())
go func() {
for r := range ch {
assert.NotEmpty(t, r.Events)
Expand Down
7 changes: 2 additions & 5 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ func (m *Mercury) GetNode(ctx context.Context, podname, nodename string) (*types
podNodes := map[string][]string{podname: []string{nodename}}
nodes, err := m.GetNodes(ctx, podNodes)
if _, ok := nodes[nodename]; !ok {
return nil, types.NewDetailedErr(
types.ErrBadMeta,
fmt.Sprintf("nodename: %s, nodes: %v", nodename, nodes),
)
return nil, types.NewDetailedErr(types.ErrBadMeta, fmt.Sprintf("nodename: %s, nodes: %v", nodename, nodes))
}
return nodes[nodename], err
}
Expand Down Expand Up @@ -334,7 +331,7 @@ func (m *Mercury) doAddNode(ctx context.Context, name, endpoint, podname, ca, ce
data[fmt.Sprintf(nodeInfoKey, podname, name)] = string(bytes)
data[fmt.Sprintf(nodePodKey, name)] = podname

_, err = m.BatchCreate(ctx, data)
_, err = m.batchCreate(ctx, data)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 1c7fd80

Please sign in to comment.