diff --git a/store/etcdv3/meta/etcd.go b/store/etcdv3/meta/etcd.go index 6c0c491ba..a2b28409a 100644 --- a/store/etcdv3/meta/etcd.go +++ b/store/etcdv3/meta/etcd.go @@ -74,11 +74,6 @@ func NewETCD(config types.EtcdConfig, embeddedStorage bool) (*ETCD, error) { return &ETCD{cliv3: cliv3, config: config}, nil } -// ClientV3 gets the raw ETCD client v3. -func (e *ETCD) ClientV3() *clientv3.Client { - return e.cliv3.(*clientv3.Client) -} - // TerminateEmbededStorage terminate embedded storage func (e *ETCD) TerminateEmbededStorage() { embedded.TerminateCluster() @@ -87,7 +82,7 @@ func (e *ETCD) TerminateEmbededStorage() { // CreateLock create a lock instance func (e *ETCD) CreateLock(key string, ttl time.Duration) (lock.DistributedLock, error) { lockKey := fmt.Sprintf("%s/%s", e.config.LockPrefix, key) - mutex, err := etcdlock.New(e.ClientV3(), lockKey, ttl) + mutex, err := etcdlock.New(e.cliv3.(*clientv3.Client), lockKey, ttl) return mutex, err } @@ -233,6 +228,21 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ... return e.batchUpdate(ctx, data, opts...) } +// Grant creates a new lease. +func (e *ETCD) Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { + return e.cliv3.Grant(ctx, ttl) +} + +// KeepAliveOnce keeps on a lease alive. +func (e *ETCD) KeepAliveOnce(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) { + return e.cliv3.KeepAliveOnce(ctx, id) +} + +// Txn creates a new Txn +func (e *ETCD) Txn(ctx context.Context) clientv3.Txn { + return e.cliv3.Txn(ctx) +} + func (e *ETCD) batchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { limit := map[string]map[string]string{} for key := range data { diff --git a/store/etcdv3/meta/etcd_test.go b/store/etcdv3/meta/etcd_test.go index 36a0e7527..95ae42e7d 100644 --- a/store/etcdv3/meta/etcd_test.go +++ b/store/etcdv3/meta/etcd_test.go @@ -52,6 +52,31 @@ func TestGetMultiFailedAsBatchGetError(t *testing.T) { require.Nil(t, kvs) } +func TestGrant(t *testing.T) { + e := NewMockedETCD(t) + expErr := fmt.Errorf("exp") + e.cliv3.(*mocks.ETCDClientV3).On("Grant", mock.Anything, mock.Anything).Return(nil, expErr) + resp, err := e.Grant(context.Background(), 1) + require.Equal(t, expErr, err) + require.Nil(t, resp) +} + +func TestKeepAliveOnce(t *testing.T) { + e := NewMockedETCD(t) + expErr := fmt.Errorf("exp") + e.cliv3.(*mocks.ETCDClientV3).On("KeepAliveOnce", mock.Anything, mock.Anything).Return(nil, expErr) + resp, err := e.KeepAliveOnce(context.Background(), 1) + require.Equal(t, expErr, err) + require.Nil(t, resp) +} + +func TestTxn(t *testing.T) { + e := NewMockedETCD(t) + expTxn := &mocks.Txn{} + e.cliv3.(*mocks.ETCDClientV3).On("Txn", mock.Anything).Return(expTxn) + require.Equal(t, expTxn, e.Txn(context.Background())) +} + func NewMockedETCD(t *testing.T) *ETCD { e := NewEmbeddedETCD(t) e.cliv3 = &mocks.ETCDClientV3{} diff --git a/store/etcdv3/meta/meta.go b/store/etcdv3/meta/meta.go index 76f869d72..4a9062fec 100644 --- a/store/etcdv3/meta/meta.go +++ b/store/etcdv3/meta/meta.go @@ -12,7 +12,9 @@ import ( // KV . type KV interface { - ClientV3() *clientv3.Client + Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) + KeepAliveOnce(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) + Txn(context.Context) clientv3.Txn Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) GetOne(ctx context.Context, key string, opts ...clientv3.OpOption) (*mvccpb.KeyValue, error) diff --git a/store/etcdv3/meta/mocks/KV.go b/store/etcdv3/meta/mocks/KV.go new file mode 100644 index 000000000..785daf5b9 --- /dev/null +++ b/store/etcdv3/meta/mocks/KV.go @@ -0,0 +1,483 @@ +// Code generated by mockery v2.3.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + clientv3 "go.etcd.io/etcd/v3/clientv3" + + lock "github.com/projecteru2/core/lock" + + mock "github.com/stretchr/testify/mock" + + mvccpb "go.etcd.io/etcd/v3/mvcc/mvccpb" + + time "time" +) + +// KV is an autogenerated mock type for the KV type +type KV struct { + mock.Mock +} + +// BatchCreate provides a mock function with given fields: ctx, data, opts +func (_m *KV) BatchCreate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, data) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.TxnResponse + if rf, ok := ret.Get(0).(func(context.Context, map[string]string, ...clientv3.OpOption) *clientv3.TxnResponse); ok { + r0 = rf(ctx, data, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.TxnResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, map[string]string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, data, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// BatchDelete provides a mock function with given fields: ctx, keys, opts +func (_m *KV) BatchDelete(ctx context.Context, keys []string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, keys) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.TxnResponse + if rf, ok := ret.Get(0).(func(context.Context, []string, ...clientv3.OpOption) *clientv3.TxnResponse); ok { + r0 = rf(ctx, keys, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.TxnResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, []string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, keys, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// BatchUpdate provides a mock function with given fields: ctx, data, opts +func (_m *KV) BatchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, data) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.TxnResponse + if rf, ok := ret.Get(0).(func(context.Context, map[string]string, ...clientv3.OpOption) *clientv3.TxnResponse); ok { + r0 = rf(ctx, data, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.TxnResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, map[string]string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, data, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ClientV3 provides a mock function with given fields: +func (_m *KV) ClientV3() *clientv3.Client { + ret := _m.Called() + + var r0 *clientv3.Client + if rf, ok := ret.Get(0).(func() *clientv3.Client); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.Client) + } + } + + return r0 +} + +// Create provides a mock function with given fields: ctx, key, val, opts +func (_m *KV) Create(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key, val) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.TxnResponse + if rf, ok := ret.Get(0).(func(context.Context, string, string, ...clientv3.OpOption) *clientv3.TxnResponse); ok { + r0 = rf(ctx, key, val, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.TxnResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, key, val, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateLock provides a mock function with given fields: key, ttl +func (_m *KV) CreateLock(key string, ttl time.Duration) (lock.DistributedLock, error) { + ret := _m.Called(key, ttl) + + var r0 lock.DistributedLock + if rf, ok := ret.Get(0).(func(string, time.Duration) lock.DistributedLock); ok { + r0 = rf(key, ttl) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(lock.DistributedLock) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, time.Duration) error); ok { + r1 = rf(key, ttl) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Delete provides a mock function with given fields: ctx, key, opts +func (_m *KV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.DeleteResponse + if rf, ok := ret.Get(0).(func(context.Context, string, ...clientv3.OpOption) *clientv3.DeleteResponse); ok { + r0 = rf(ctx, key, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.DeleteResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, key, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Get provides a mock function with given fields: ctx, key, opts +func (_m *KV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.GetResponse + if rf, ok := ret.Get(0).(func(context.Context, string, ...clientv3.OpOption) *clientv3.GetResponse); ok { + r0 = rf(ctx, key, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.GetResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, key, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMulti provides a mock function with given fields: ctx, keys, opts +func (_m *KV) GetMulti(ctx context.Context, keys []string, opts ...clientv3.OpOption) ([]*mvccpb.KeyValue, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, keys) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 []*mvccpb.KeyValue + if rf, ok := ret.Get(0).(func(context.Context, []string, ...clientv3.OpOption) []*mvccpb.KeyValue); ok { + r0 = rf(ctx, keys, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*mvccpb.KeyValue) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, []string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, keys, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetOne provides a mock function with given fields: ctx, key, opts +func (_m *KV) GetOne(ctx context.Context, key string, opts ...clientv3.OpOption) (*mvccpb.KeyValue, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *mvccpb.KeyValue + if rf, ok := ret.Get(0).(func(context.Context, string, ...clientv3.OpOption) *mvccpb.KeyValue); ok { + r0 = rf(ctx, key, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*mvccpb.KeyValue) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, key, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Grant provides a mock function with given fields: ctx, ttl +func (_m *KV) Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { + ret := _m.Called(ctx, ttl) + + var r0 *clientv3.LeaseGrantResponse + if rf, ok := ret.Get(0).(func(context.Context, int64) *clientv3.LeaseGrantResponse); ok { + r0 = rf(ctx, ttl) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.LeaseGrantResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, ttl) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// KeepAliveOnce provides a mock function with given fields: ctx, id +func (_m *KV) KeepAliveOnce(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) { + ret := _m.Called(ctx, id) + + var r0 *clientv3.LeaseKeepAliveResponse + if rf, ok := ret.Get(0).(func(context.Context, clientv3.LeaseID) *clientv3.LeaseKeepAliveResponse); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.LeaseKeepAliveResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, clientv3.LeaseID) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Put provides a mock function with given fields: ctx, key, val, opts +func (_m *KV) Put(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key, val) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.PutResponse + if rf, ok := ret.Get(0).(func(context.Context, string, string, ...clientv3.OpOption) *clientv3.PutResponse); ok { + r0 = rf(ctx, key, val, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.PutResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, key, val, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StartEphemeral provides a mock function with given fields: ctx, path, heartbeat +func (_m *KV) StartEphemeral(ctx context.Context, path string, heartbeat time.Duration) (<-chan struct{}, func(), error) { + ret := _m.Called(ctx, path, heartbeat) + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func(context.Context, string, time.Duration) <-chan struct{}); ok { + r0 = rf(ctx, path, heartbeat) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + var r1 func() + if rf, ok := ret.Get(1).(func(context.Context, string, time.Duration) func()); ok { + r1 = rf(ctx, path, heartbeat) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(func()) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, string, time.Duration) error); ok { + r2 = rf(ctx, path, heartbeat) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// TerminateEmbededStorage provides a mock function with given fields: +func (_m *KV) TerminateEmbededStorage() { + _m.Called() +} + +// Txn provides a mock function with given fields: _a0 +func (_m *KV) Txn(_a0 context.Context) clientv3.Txn { + ret := _m.Called(_a0) + + var r0 clientv3.Txn + if rf, ok := ret.Get(0).(func(context.Context) clientv3.Txn); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.Txn) + } + } + + return r0 +} + +// Update provides a mock function with given fields: ctx, key, val, opts +func (_m *KV) Update(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key, val) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *clientv3.TxnResponse + if rf, ok := ret.Get(0).(func(context.Context, string, string, ...clientv3.OpOption) *clientv3.TxnResponse); ok { + r0 = rf(ctx, key, val, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clientv3.TxnResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, ...clientv3.OpOption) error); ok { + r1 = rf(ctx, key, val, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Watch provides a mock function with given fields: ctx, key, opts +func (_m *KV) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(context.Context, string, ...clientv3.OpOption) clientv3.WatchChan); ok { + r0 = rf(ctx, key, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index ba7e1bae8..8965194f3 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -311,8 +311,7 @@ func (m *Mercury) SetNodeStatus(ctx context.Context, node *types.Node, ttl int64 return err } - cliv3 := m.ClientV3() - lease, err := cliv3.Grant(ctx, ttl) + lease, err := m.Grant(ctx, ttl) if err != nil { return err } diff --git a/store/etcdv3/workload.go b/store/etcdv3/workload.go index 3123b9eae..230a4addd 100644 --- a/store/etcdv3/workload.go +++ b/store/etcdv3/workload.go @@ -78,14 +78,13 @@ func (m *Mercury) SetWorkloadStatus(ctx context.Context, workload *types.Workloa statusKey := filepath.Join(workloadStatusPrefix, appname, entrypoint, workload.Nodename, workload.ID) updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, val)} lease := &clientv3.LeaseGrantResponse{} - cliv3 := m.ClientV3() if ttl != 0 { - if lease, err = cliv3.Grant(ctx, ttl); err != nil { + if lease, err = m.Grant(ctx, ttl); err != nil { return err } updateStatus = []clientv3.Op{clientv3.OpPut(statusKey, val, clientv3.WithLease(lease.ID))} } - tr, err := cliv3.Txn(ctx). + tr, err := m.Txn(ctx). If(clientv3.Compare(clientv3.Version(fmt.Sprintf(workloadInfoKey, workload.ID)), "!=", 0)). Then( // 保证有容器 clientv3.OpTxn( @@ -112,7 +111,7 @@ func (m *Mercury) SetWorkloadStatus(ctx context.Context, workload *types.Workloa tr3 := tr2.Responses[0].GetResponseTxn() if tr3.Succeeded && ttl != 0 { // 有 status 并且内容还跟之前一样 oldLeaseID := clientv3.LeaseID(tr3.Responses[0].GetResponseRange().Kvs[0].Lease) // 拿到 status 绑定的 leaseID - _, err := cliv3.KeepAliveOnce(ctx, oldLeaseID) // 刷新 lease + _, err := m.KeepAliveOnce(ctx, oldLeaseID) // 刷新 lease return err } return nil