diff --git a/store/etcdv3/meta/etcd.go b/store/etcdv3/meta/etcd.go index dc6b54de8..97b86a85f 100644 --- a/store/etcdv3/meta/etcd.go +++ b/store/etcdv3/meta/etcd.go @@ -435,6 +435,9 @@ func (e *ETCD) BatchCreateAndDecr(ctx context.Context, data map[string]string, d if err != nil { return } + if len(resp.Kvs) == 0 { + return types.NewDetailedErr(types.ErrKeyNotExists, decrKey) + } decrKv := resp.Kvs[0] putOps := []clientv3.Op{} diff --git a/store/etcdv3/meta/etcd_test.go b/store/etcdv3/meta/etcd_test.go index 673f29d71..8f32a2f81 100644 --- a/store/etcdv3/meta/etcd_test.go +++ b/store/etcdv3/meta/etcd_test.go @@ -3,6 +3,8 @@ package meta import ( "context" "fmt" + "strconv" + "sync" "testing" "github.com/stretchr/testify/mock" @@ -327,7 +329,7 @@ func TestETCD(t *testing.T) { require.True(t, r.Succeeded) // UpdateFail r, err = m.Update(ctx, "test/3", "b") - require.NoError(t, err) + require.EqualError(t, err, "Key not exists") require.False(t, r.Succeeded) // BatchUpdate data = map[string]string{ @@ -343,7 +345,7 @@ func TestETCD(t *testing.T) { "k3": "b2", } r, err = m.BatchUpdate(ctx, data) - require.NoError(t, err) + require.EqualError(t, err, "Key not exists") require.False(t, r.Succeeded) // Watch ctx2, cancel := context.WithCancel(ctx) @@ -358,4 +360,107 @@ func TestETCD(t *testing.T) { }() m.Create(ctx, "watchkey/1", "b") cancel() + + // BatchCreateAndDecr error + data = map[string]string{ + "bcad_k1": "v1", + "bcad_k2": "v1", + } + err = m.BatchCreateAndDecr(context.TODO(), data, "bcad_process") + require.EqualError(t, err, "Key not exists: bcad_process") + + // BatchCreateAndDecr error + _, err = m.Put(context.TODO(), "bcad_process", "a") + require.NoError(t, err) + err = m.BatchCreateAndDecr(context.TODO(), data, "bcad_process") + require.EqualError(t, err, "strconv.Atoi: parsing \"a\": invalid syntax") + + // BatchCreateAndDecr success + _, err = m.Put(context.TODO(), "bcad_process", "20") + require.NoError(t, err) + err = m.BatchCreateAndDecr(context.TODO(), data, "bcad_process") + require.NoError(t, err) + resp, err = m.Get(context.TODO(), "bcad_process") + require.NoError(t, err) + processCnt, err := strconv.Atoi(string(resp.Kvs[0].Value)) + require.NoError(t, err) + require.EqualValues(t, 19, processCnt) + + // BatchCreateAndDecr concurrency + _, err = m.Put(context.TODO(), "bcad_process", "200") + require.NoError(t, err) + wg := sync.WaitGroup{} + for i := 0; i < 200; i++ { + wg.Add(1) + go func() { + defer wg.Done() + m.BatchCreateAndDecr(context.TODO(), data, "bcad_process") + }() + } + wg.Wait() + resp, err = m.Get(context.TODO(), "bcad_process") + require.NoError(t, err) + processCnt, err = strconv.Atoi(string(resp.Kvs[0].Value)) + require.NoError(t, err) + require.EqualValues(t, 0, processCnt) + + // doBatchOp error + _, err = m.doBatchOp(context.TODO(), nil) + require.EqualError(t, err, "No txn ops") + + // doBatchOp: many groups + txnes := []ETCDTxn{} + for i := 0; i < 999; i++ { + txnes = append(txnes, ETCDTxn{Then: []clientv3.Op{clientv3.OpGet("a")}}) + } + txnResp, err := m.doBatchOp(context.TODO(), txnes) + require.NoError(t, err) + require.True(t, txnResp.Succeeded) + require.EqualValues(t, 999, len(txnResp.Responses)) + + // doBatchOp: many then + txnes = []ETCDTxn{{}, {}} + for i := 0; i < 999; i++ { + txnes[0].Then = append(txnes[0].Then, clientv3.OpGet("a")) + txnes[1].Then = append(txnes[1].Then, clientv3.OpGet("a"), clientv3.OpGet("b")) + } + txnResp, err = m.doBatchOp(context.TODO(), txnes) + require.NoError(t, err) + require.True(t, txnResp.Succeeded) + require.EqualValues(t, 999*3, len(txnResp.Responses)) + + // doBatchOp: empty + txnes = []ETCDTxn{{If: []clientv3.Cmp{ + clientv3.Compare(clientv3.Value("a"), "=", string("123")), + }}} + txnResp, err = m.doBatchOp(context.TODO(), txnes) + require.NoError(t, err) + require.False(t, txnResp.Succeeded) + require.EqualValues(t, 0, len(txnResp.Responses)) + + // GetMulti error + _, err = m.GetMulti(context.TODO(), []string{"a", "b"}) + require.EqualError(t, err, "bad `Count` value: key: a") + + // GetMulti success + m.Put(context.TODO(), "a", "b") + m.Put(context.TODO(), "b", "c") + kvs, err := m.GetMulti(context.TODO(), []string{"a", "b"}) + require.NoError(t, err) + require.EqualValues(t, 2, len(kvs)) + + // batchPut: cmpValue branch + data = map[string]string{ + "aa": "bb", + "cc": "dd", + } + limit := map[string]map[string]string{ + "aa": map[string]string{cmpValue: "!="}, + "cc": map[string]string{cmpValue: "!="}, + } + m.Put(context.TODO(), "aa", "aa") + m.Put(context.TODO(), "cc", "cc") + txnResp, err = m.batchPut(context.TODO(), data, limit) + require.NoError(t, err) + require.True(t, txnResp.Succeeded) }