From c3c6b816eee31043b583bee01d3bec00dffbdc68 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Wed, 29 Dec 2021 17:41:38 +0800 Subject: [PATCH] ETCD/Redis supports "BatchPut" --- 3rdmocks/ServerStream.go | 2 +- cluster/mocks/Cluster.go | 2 +- engine/docker/mocks/APIClient.go | 2 +- engine/factory/factory.go | 11 ++++++--- engine/mocks/API.go | 2 +- lock/mocks/DistributedLock.go | 2 +- rpc/mocks/CoreRPC_RunAndWaitServer.go | 2 +- scheduler/mocks/Scheduler.go | 2 +- source/mocks/Source.go | 2 +- store/etcdv3/meta/etcd.go | 5 ++++ store/etcdv3/meta/meta.go | 1 + store/etcdv3/meta/mocks/ETCDClientV3.go | 2 +- store/etcdv3/meta/mocks/KV.go | 32 ++++++++++++++++++++++++- store/etcdv3/meta/mocks/Txn.go | 2 +- store/etcdv3/node.go | 2 +- store/etcdv3/node_test.go | 2 +- store/mocks/Store.go | 2 +- store/redis/node.go | 2 +- store/redis/node_test.go | 2 +- store/redis/rediaron.go | 22 +++++++++++++++++ 20 files changed, 82 insertions(+), 19 deletions(-) diff --git a/3rdmocks/ServerStream.go b/3rdmocks/ServerStream.go index 547c5a687..3f2cebdfc 100644 --- a/3rdmocks/ServerStream.go +++ b/3rdmocks/ServerStream.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/cluster/mocks/Cluster.go b/cluster/mocks/Cluster.go index a4f8b62a9..aca0d84a8 100644 --- a/cluster/mocks/Cluster.go +++ b/cluster/mocks/Cluster.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/engine/docker/mocks/APIClient.go b/engine/docker/mocks/APIClient.go index e7d756d59..3783fd42e 100644 --- a/engine/docker/mocks/APIClient.go +++ b/engine/docker/mocks/APIClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/engine/factory/factory.go b/engine/factory/factory.go index 64c0767b5..07e2bbdf8 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -43,7 +43,8 @@ func validateEngine(ctx context.Context, engine engine.API, timeout time.Duratio // GetEngineFromCache . func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca, cert, key string) engine.API { - client := engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key)) + cacheKey := getEngineCacheKey(endpoint, ca, cert, key) + client := engineCache.Get(cacheKey) if client == nil { return nil } @@ -57,7 +58,9 @@ func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca, // RemoveEngineFromCache . func RemoveEngineFromCache(endpoint, ca, cert, key string) { - engineCache.Delete(getEngineCacheKey(endpoint, ca, cert, key)) + cacheKey := getEngineCacheKey(endpoint, ca, cert, key) + log.Infof(context.TODO(), "[RemoveEngineFromCache] remove %v, key %v", endpoint, cacheKey) + engineCache.Delete(cacheKey) } // GetEngine get engine @@ -68,7 +71,9 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, defer func() { if err == nil && client != nil { - engineCache.Set(getEngineCacheKey(endpoint, ca, cert, key), client) + cacheKey := getEngineCacheKey(endpoint, ca, cert, key) + log.Infof(ctx, "[GetEngine] store engine of %v in cache, key: %v", endpoint, cacheKey) + engineCache.Set(cacheKey, client) } }() diff --git a/engine/mocks/API.go b/engine/mocks/API.go index 50c2be548..30a0a1ea9 100644 --- a/engine/mocks/API.go +++ b/engine/mocks/API.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/lock/mocks/DistributedLock.go b/lock/mocks/DistributedLock.go index 08af5f60c..4b941e5f1 100644 --- a/lock/mocks/DistributedLock.go +++ b/lock/mocks/DistributedLock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/rpc/mocks/CoreRPC_RunAndWaitServer.go b/rpc/mocks/CoreRPC_RunAndWaitServer.go index 495dd6463..9377f73e7 100644 --- a/rpc/mocks/CoreRPC_RunAndWaitServer.go +++ b/rpc/mocks/CoreRPC_RunAndWaitServer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/scheduler/mocks/Scheduler.go b/scheduler/mocks/Scheduler.go index edeaa01ca..83b4f41ec 100644 --- a/scheduler/mocks/Scheduler.go +++ b/scheduler/mocks/Scheduler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/source/mocks/Source.go b/source/mocks/Source.go index 376ccdec1..ac84f3a54 100644 --- a/source/mocks/Source.go +++ b/source/mocks/Source.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/store/etcdv3/meta/etcd.go b/store/etcdv3/meta/etcd.go index 4053ea582..862a44d11 100644 --- a/store/etcdv3/meta/etcd.go +++ b/store/etcdv3/meta/etcd.go @@ -240,6 +240,11 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ... return e.batchUpdate(ctx, data, opts...) } +// BatchPut . +func (e *ETCD) BatchPut(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + return e.batchPut(ctx, data, nil, opts...) +} + // isTTLChanged returns true if there is a lease with a different ttl bound to the key func (e *ETCD) isTTLChanged(ctx context.Context, key string, ttl int64) (bool, error) { resp, err := e.GetOne(ctx, key) diff --git a/store/etcdv3/meta/meta.go b/store/etcdv3/meta/meta.go index b9546f568..b80795085 100644 --- a/store/etcdv3/meta/meta.go +++ b/store/etcdv3/meta/meta.go @@ -30,6 +30,7 @@ type KV interface { BatchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) BatchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) + BatchPut(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) StartEphemeral(ctx context.Context, path string, heartbeat time.Duration) (<-chan struct{}, func(), error) CreateLock(key string, ttl time.Duration) (lock.DistributedLock, error) diff --git a/store/etcdv3/meta/mocks/ETCDClientV3.go b/store/etcdv3/meta/mocks/ETCDClientV3.go index 36fa091e8..28a4221c8 100644 --- a/store/etcdv3/meta/mocks/ETCDClientV3.go +++ b/store/etcdv3/meta/mocks/ETCDClientV3.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/store/etcdv3/meta/mocks/KV.go b/store/etcdv3/meta/mocks/KV.go index c7aab6092..6125d4552 100644 --- a/store/etcdv3/meta/mocks/KV.go +++ b/store/etcdv3/meta/mocks/KV.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks @@ -95,6 +95,36 @@ func (_m *KV) BatchDelete(ctx context.Context, keys []string, opts ...clientv3.O return r0, r1 } +// BatchPut provides a mock function with given fields: ctx, data, opts +func (_m *KV) BatchPut(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, data) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.TxnResponse + if rf, ok := ret.Get(0).(func(context.Context, map[string]string, ...clientv3.OpOption) *clientv3.TxnResponse); ok { + r0 = rf(ctx, data, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.TxnResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, map[string]string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, data, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // BatchUpdate provides a mock function with given fields: ctx, data, opts func (_m *KV) BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/store/etcdv3/meta/mocks/Txn.go b/store/etcdv3/meta/mocks/Txn.go index 7192aa0ad..15e96cfc3 100644 --- a/store/etcdv3/meta/mocks/Txn.go +++ b/store/etcdv3/meta/mocks/Txn.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index f7f6ca29a..0ea2778cf 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -165,7 +165,7 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error { enginefactory.RemoveEngineFromCache(node.Endpoint, node.Ca, node.Cert, node.Key) } - resp, err := m.BatchUpdate(ctx, data) + resp, err := m.BatchPut(ctx, data) if err != nil { return err } diff --git a/store/etcdv3/node_test.go b/store/etcdv3/node_test.go index 6f8287b60..0307f9e22 100644 --- a/store/etcdv3/node_test.go +++ b/store/etcdv3/node_test.go @@ -207,7 +207,7 @@ func TestUpdateNode(t *testing.T) { Key: "hh", }, } - assert.Error(t, m.UpdateNodes(ctx, fakeNode)) + assert.NoError(t, m.UpdateNodes(ctx, fakeNode)) assert.NoError(t, m.UpdateNodes(ctx, node)) node.Available = false assert.NoError(t, m.UpdateNodes(ctx, node)) diff --git a/store/mocks/Store.go b/store/mocks/Store.go index ff1db6eaa..e01aa70eb 100644 --- a/store/mocks/Store.go +++ b/store/mocks/Store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/store/redis/node.go b/store/redis/node.go index c273203ba..009da07c5 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -161,7 +161,7 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert) addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key) } - return errors.WithStack(r.BatchUpdate(ctx, data)) + return errors.WithStack(r.BatchPut(ctx, data)) } // UpdateNodeResource update cpu and memory on a node, either add or subtract diff --git a/store/redis/node_test.go b/store/redis/node_test.go index b8a5bc26e..e20d0e188 100644 --- a/store/redis/node_test.go +++ b/store/redis/node_test.go @@ -200,7 +200,7 @@ func (s *RediaronTestSuite) TestUpdateNode() { Key: "hh", }, } - s.Error(s.rediaron.UpdateNodes(ctx, fakeNode)) + s.NoError(s.rediaron.UpdateNodes(ctx, fakeNode)) s.NoError(s.rediaron.UpdateNodes(ctx, node)) } diff --git a/store/redis/rediaron.go b/store/redis/rediaron.go index 5c62898ab..588fa09d5 100644 --- a/store/redis/rediaron.go +++ b/store/redis/rediaron.go @@ -238,6 +238,28 @@ func (r *Rediaron) BatchCreate(ctx context.Context, data map[string]string) erro return nil } +// BatchPut is wrapper to adapt etcd batch replace +func (r *Rediaron) BatchPut(ctx context.Context, data map[string]string) error { + replace := func(pipe redis.Pipeliner) error { + for key, value := range data { + pipe.Set(ctx, key, value, 0) + } + return nil + } + + cmds, err := r.cli.TxPipelined(ctx, replace) + if err != nil { + return err + } + + for _, cmd := range cmds { + if err := cmd.Err(); err != nil { + return err + } + } + return nil +} + // BatchCreateAndDecr decr processing and add workload func (r *Rediaron) BatchCreateAndDecr(ctx context.Context, data map[string]string, decrKey string) (err error) { batchCreateAndDecr := func(pipe redis.Pipeliner) error {