diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index ada07fbd3..c72a21503 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -155,8 +155,14 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error { data[fmt.Sprintf(nodePodKey, node.Podname, node.Name)] = d } - _, err := m.BatchUpdate(ctx, data) - return errors.WithStack(err) + resp, err := m.BatchUpdate(ctx, data) + if err != nil { + return err + } + if !resp.Succeeded { + return types.ErrTxnConditionFailed + } + return nil } // UpdateNodeResource update cpu and memory on a node, either add or subtract @@ -246,10 +252,13 @@ func (m *Mercury) doAddNode(ctx context.Context, name, endpoint, podname, ca, ce data[fmt.Sprintf(nodeInfoKey, name)] = d data[fmt.Sprintf(nodePodKey, podname, name)] = d - _, err = m.BatchCreate(ctx, data) + resp, err := m.BatchCreate(ctx, data) if err != nil { return nil, err } + if !resp.Succeeded { + return nil, types.ErrTxnConditionFailed + } go metrics.Client.SendNodeInfo(node.Metrics()) return node, nil diff --git a/store/etcdv3/node_test.go b/store/etcdv3/node_test.go index 58d00689c..6497fd99e 100644 --- a/store/etcdv3/node_test.go +++ b/store/etcdv3/node_test.go @@ -205,6 +205,8 @@ func TestUpdateNode(t *testing.T) { }, } assert.Error(t, m.UpdateNodes(ctx, fakeNode)) + assert.Error(t, m.UpdateNodes(ctx, node), "ETCD Txn condition failed") + node.Available = false assert.NoError(t, m.UpdateNodes(ctx, node)) } diff --git a/store/etcdv3/workload.go b/store/etcdv3/workload.go index 9b8cc2db2..0004510e7 100644 --- a/store/etcdv3/workload.go +++ b/store/etcdv3/workload.go @@ -278,15 +278,22 @@ func (m *Mercury) doOpsWorkload(ctx context.Context, workload *types.Workload, p filepath.Join(workloadDeployPrefix, appname, entrypoint, workload.Nodename, workload.ID): workloadData, } + var resp *clientv3.TxnResponse if create { if processing != nil { processingKey := m.getProcessingKey(processing) err = m.BatchCreateAndDecr(ctx, data, processingKey) } else { - _, err = m.BatchCreate(ctx, data) + resp, err = m.BatchCreate(ctx, data) } } else { - _, err = m.BatchUpdate(ctx, data) + resp, err = m.BatchUpdate(ctx, data) } - return err + if err != nil { + return err + } + if resp != nil && !resp.Succeeded { + return types.ErrTxnConditionFailed + } + return nil } diff --git a/store/etcdv3/workload_test.go b/store/etcdv3/workload_test.go index e1b148379..9a5c42c2f 100644 --- a/store/etcdv3/workload_test.go +++ b/store/etcdv3/workload_test.go @@ -36,6 +36,10 @@ func TestAddORUpdateWorkload(t *testing.T) { assert.NoError(t, err) // success updat err = m.UpdateWorkload(ctx, workload) + assert.Error(t, err, "ETCD Txn condition failed") + // success updat + workload.Name = "test_app_2" + err = m.UpdateWorkload(ctx, workload) assert.NoError(t, err) } diff --git a/types/errors.go b/types/errors.go index 4af398877..4e25a6980 100644 --- a/types/errors.go +++ b/types/errors.go @@ -66,10 +66,11 @@ var ( ErrRunAndWaitCountOneWithStdin = errors.New("Count must be 1 if OpenStdin is true") ErrUnknownControlType = errors.New("Unknown control type") - ErrNoETCD = errors.New("ETCD must be set") - ErrKeyNotExists = errors.New("Key not exists") - ErrKeyExists = errors.New("Key exists") - ErrNoOps = errors.New("No txn ops") + ErrNoETCD = errors.New("ETCD must be set") + ErrKeyNotExists = errors.New("Key not exists") + ErrKeyExists = errors.New("Key exists") + ErrNoOps = errors.New("No txn ops") + ErrTxnConditionFailed = errors.New("ETCD Txn condition failed") ErrNotSupport = errors.New("Not Support") ErrSCMNotSet = errors.New("SCM not set")