Skip to content

Commit

Permalink
fix: only remove node status when the node is unavailable (#516)
Browse files Browse the repository at this point in the history
* fix: keep the node status when a running node is bypassed

* fix the issue that BindStatus cannot handle ttl changes
  • Loading branch information
DuodenumL authored Dec 20, 2021
1 parent b0374f5 commit e160ade
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 54 deletions.
13 changes: 8 additions & 5 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if !n.Available {
// remove node status
err := c.store.SetNodeStatus(ctx, node, -1)
if err != nil {
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
// don't return here
log.Errorf(ctx, "[SetNode] failed to set node status, err: %+v", errors.WithStack(err))
logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
}
}
if opts.WorkloadsDown {
Expand Down Expand Up @@ -105,7 +106,9 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ

// mark workload which belongs to this node as unhealthy
if err = c.store.SetWorkloadStatus(ctx, workload.StatusMeta, 0); err != nil {
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s inactive failed %v", workload.ID, opts.Nodename, err)
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, opts.Nodename, errors.WithStack(err))
} else {
log.Infof(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive", workload.ID, opts.Nodename)
}
}
}
Expand Down
116 changes: 100 additions & 16 deletions store/etcdv3/meta/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package meta
import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -239,6 +240,34 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ...
return e.batchUpdate(ctx, data, opts...)
}

// isTTLChanged returns true if there is a lease with a different ttl bound to the key
func (e *ETCD) isTTLChanged(ctx context.Context, key string, ttl int64) (bool, error) {
resp, err := e.GetOne(ctx, key)
if err != nil {
if errors.Is(err, types.ErrBadCount) {
return ttl != 0, nil
}
return false, err
}

leaseID := clientv3.LeaseID(resp.Lease)
if leaseID == 0 {
return ttl != 0, nil
}

getTTLResp, err := e.cliv3.TimeToLive(ctx, leaseID)
if err != nil {
return false, err
}

changed := getTTLResp.GrantedTTL != ttl
if changed {
log.Infof(ctx, "[isTTLChanged] key %v ttl changed from %v to %v", key, getTTLResp.GrantedTTL, ttl)
}

return changed, nil
}

// BindStatus keeps on a lease alive.
func (e *ETCD) BindStatus(ctx context.Context, entityKey, statusKey, statusValue string, ttl int64) error {
if ttl == 0 {
Expand All @@ -256,19 +285,38 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
leaseID := lease.ID
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue, clientv3.WithLease(lease.ID))}

entityTxn, err := e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
Then( // making sure there's an exists entity kv-pair.
clientv3.OpTxn(
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)},
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
updateStatus, // The status had been changed.
)},
updateStatus, // there isn't a status
),
).Commit()
ttlChanged, err := e.isTTLChanged(ctx, statusKey, ttl)
if err != nil {
return err
}

var entityTxn *clientv3.TxnResponse

if ttlChanged {
entityTxn, err = e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
Then(updateStatus...). // making sure there's an exists entity kv-pair.
Commit()
} else {
entityTxn, err = e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
Then( // making sure there's an exists entity kv-pair.
clientv3.OpTxn(
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
[]clientv3.Cmp{clientv3.Compare(clientv3.LeaseValue(statusKey), "!=", 0)}, //
[]clientv3.Op{clientv3.OpTxn( // there has been a lease bound to the status
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)}, // Is the status changed?
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
updateStatus, // The status had been changed.
)},
updateStatus, // there is no lease bound to the status
)},
updateStatus, // there isn't a status
),
).Commit()
}

if err != nil {
e.revokeLease(ctx, leaseID)
return err
Expand All @@ -280,15 +328,30 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
return types.ErrEntityNotExists
}

// if ttl is changed, replace with the new lease
if ttlChanged {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

// There isn't a status bound to the entity.
statusTxn := entityTxn.Responses[0].GetResponseTxn()
if !statusTxn.Succeeded {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

// There is no lease bound to the status yet
leaseTxn := statusTxn.Responses[0].GetResponseTxn()
if !leaseTxn.Succeeded {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

// There is a status bound to the entity yet but its value isn't same as the expected one.
valueTxn := statusTxn.Responses[0].GetResponseTxn()
valueTxn := leaseTxn.Responses[0].GetResponseTxn()
if !valueTxn.Succeeded {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

Expand All @@ -309,7 +372,22 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
// agent may report status earlier when core has not recorded the entity.
func (e *ETCD) bindStatusWithoutTTL(ctx context.Context, statusKey, statusValue string) error {
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue)}
_, err := e.cliv3.Txn(ctx).

ttlChanged, err := e.isTTLChanged(ctx, statusKey, 0)
if err != nil {
return err
}
if ttlChanged {
_, err := e.Put(ctx, statusKey, statusValue)
if err != nil {
return err
}

log.Infof(ctx, "[bindStatusWithoutTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

resp, err := e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(statusKey), "!=", 0)). // if there's an existing status key
Then(clientv3.OpTxn( // deal with existing status key
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "!=", statusValue)}, // if the new value != the old value
Expand All @@ -318,7 +396,13 @@ func (e *ETCD) bindStatusWithoutTTL(ctx context.Context, statusKey, statusValue
)).
Else(updateStatus...). // otherwise deal with non-existing status key
Commit()
return err
if err != nil {
return err
}
if !resp.Succeeded || resp.Responses[0].GetResponseTxn().Succeeded {
log.Infof(ctx, "[bindStatusWithoutTTL] put: key %s value %s", statusKey, statusValue)
}
return nil
}

func (e *ETCD) revokeLease(ctx context.Context, leaseID clientv3.LeaseID) {
Expand Down
72 changes: 39 additions & 33 deletions store/etcdv3/meta/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func TestGetOneError(t *testing.T) {
e := NewMockedETCD(t)
expErr := fmt.Errorf("exp")
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(nil, expErr).Once()
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(nil, expErr)
kv, err := e.GetOne(context.Background(), "foo")
require.Equal(t, expErr, err)
require.Nil(t, kv)
Expand All @@ -29,7 +29,7 @@ func TestGetOneError(t *testing.T) {
func TestGetOneFailedAsRespondMore(t *testing.T) {
e := NewMockedETCD(t)
expResp := &clientv3.GetResponse{Count: 2}
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(expResp, nil).Once()
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(expResp, nil)
kv, err := e.GetOne(context.Background(), "foo")
require.Error(t, err)
require.Nil(t, kv)
Expand All @@ -46,10 +46,10 @@ func TestGetMultiFailedAsBatchGetError(t *testing.T) {
e := NewMockedETCD(t)
expErr := fmt.Errorf("exp")
expTxn := &mocks.Txn{}
expTxn.On("If", mock.Anything).Return(expTxn).Once()
expTxn.On("Then", mock.Anything).Return(expTxn).Once()
expTxn.On("Else", mock.Anything).Return(expTxn).Once()
expTxn.On("Commit").Return(nil, expErr).Once()
expTxn.On("If", mock.Anything).Return(expTxn)
expTxn.On("Then", mock.Anything).Return(expTxn)
expTxn.On("Else", mock.Anything).Return(expTxn)
expTxn.On("Commit").Return(nil, expErr)
e.cliv3.(*mocks.ETCDClientV3).On("Txn", mock.Anything).Return(expTxn)
kvs, err := e.GetMulti(context.Background(), []string{"foo"})
require.Equal(t, expErr, err)
Expand All @@ -69,7 +69,7 @@ func TestBindStatusFailedAsGrantError(t *testing.T) {
e, etcd, assert := testKeepAliveETCD(t)
defer assert()
expErr := fmt.Errorf("exp")
etcd.On("Grant", mock.Anything, mock.Anything).Return(nil, expErr).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(nil, expErr)
require.Equal(t, expErr, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -80,12 +80,13 @@ func TestBindStatusFailedAsCommitError(t *testing.T) {
expErr := fmt.Errorf("exp")
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("Commit").Return(nil, expErr).Once()
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(nil, expErr)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
require.Equal(t, expErr, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -96,12 +97,13 @@ func TestBindStatusButEntityTxnUnsuccessful(t *testing.T) {
entityTxn := &clientv3.TxnResponse{Succeeded: false}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
require.Equal(t, types.ErrEntityNotExists, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -122,12 +124,13 @@ func TestBindStatusButStatusTxnUnsuccessful(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -148,12 +151,14 @@ func TestBindStatusWithZeroTTL(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("Else", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Else", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Txn", mock.Anything).Return(txn)

etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 0))
}

Expand Down Expand Up @@ -185,12 +190,13 @@ func TestBindStatusButValueTxnUnsuccessful(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn)
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand Down Expand Up @@ -236,13 +242,13 @@ func TestBindStatus(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("KeepAliveOnce", mock.Anything, clientv3.LeaseID(leaseID)).Return(nil, nil).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand Down

0 comments on commit e160ade

Please sign in to comment.