Skip to content

Commit

Permalink
use the real context from exterior (#466)
Browse files Browse the repository at this point in the history
Co-authored-by: anrs <[email protected]>
  • Loading branch information
anrs and anrs authored Aug 27, 2021
1 parent fd7dc02 commit 46f170b
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 84 deletions.
4 changes: 2 additions & 2 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
}

// DisasterRecover .
func (c *Calcium) DisasterRecover() {
c.wal.Recover()
func (c *Calcium) DisasterRecover(ctx context.Context) {
c.wal.Recover(ctx)
}

// Finalizer use for defer
Expand Down
20 changes: 10 additions & 10 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (h *CreateWorkloadHandler) Event() string {
}

// Check .
func (h *CreateWorkloadHandler) Check(raw interface{}) (bool, error) {
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (bool, error) {
wrk, ok := raw.(*types.Workload)
if !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}

ctx, cancel := getReplayContext(context.Background())
ctx, cancel := getReplayContext(ctx)
defer cancel()

_, err := h.calcium.GetWorkload(ctx, wrk.ID)
Expand Down Expand Up @@ -122,13 +122,13 @@ func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
}

// Handle .
func (h *CreateWorkloadHandler) Handle(raw interface{}) error {
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) error {
wrk, ok := raw.(*types.Workload)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

ctx, cancel := getReplayContext(context.Background())
ctx, cancel := getReplayContext(ctx)
defer cancel()

// There hasn't been the exact workload metadata, so we must remove it.
Expand Down Expand Up @@ -173,7 +173,7 @@ func (h *CreateLambdaHandler) Event() string {
}

// Check .
func (h *CreateLambdaHandler) Check(interface{}) (bool, error) {
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
return true, nil
}

Expand All @@ -194,20 +194,20 @@ func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
}

// Handle .
func (h *CreateLambdaHandler) Handle(raw interface{}) error {
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
opts, ok := raw.(*types.ListWorkloadsOptions)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

workloadIDs, err := h.getWorkloadIDs(opts)
workloadIDs, err := h.getWorkloadIDs(ctx, opts)
if err != nil {
log.Errorf(context.TODO(), "[CreateLambdaHandler.Handle] Get workloads %s/%s/%v failed: %v",
opts.Appname, opts.Entrypoint, opts.Labels, err)
return err
}

ctx, cancel := getReplayContext(context.Background())
ctx, cancel := getReplayContext(ctx)
defer cancel()

if err := h.calcium.doRemoveWorkloadSync(ctx, workloadIDs); err != nil {
Expand All @@ -220,8 +220,8 @@ func (h *CreateLambdaHandler) Handle(raw interface{}) error {
return nil
}

func (h *CreateLambdaHandler) getWorkloadIDs(opts *types.ListWorkloadsOptions) ([]string, error) {
ctx, cancel := getReplayContext(context.Background())
func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.ListWorkloadsOptions) ([]string, error) {
ctx, cancel := getReplayContext(ctx)
defer cancel()

workloads, err := h.calcium.ListWorkloads(ctx, opts)
Expand Down
26 changes: 13 additions & 13 deletions cluster/calcium/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {
defer store.AssertExpectations(t)
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, nil).Once()

c.wal.Recover()
c.wal.Recover(context.TODO())

// Recovers nothing.
c.wal.Recover()
c.wal.Recover(context.TODO())
}

func TestHandleCreateWorkloadError(t *testing.T) {
Expand All @@ -59,12 +59,12 @@ func TestHandleCreateWorkloadError(t *testing.T) {
store := c.store.(*storemocks.Store)
defer store.AssertExpectations(t)
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, fmt.Errorf("err")).Once()
c.wal.Recover()
c.wal.Recover(context.TODO())

err = types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("keys: [%s]", wrkid))
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, err)
store.On("GetNode", mock.Anything, wrk.Nodename).Return(nil, fmt.Errorf("err")).Once()
c.wal.Recover()
c.wal.Recover(context.TODO())

store.On("GetNode", mock.Anything, wrk.Nodename).Return(node, nil)
eng, ok := node.Engine.(*enginemocks.API)
Expand All @@ -73,15 +73,15 @@ func TestHandleCreateWorkloadError(t *testing.T) {
eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
Return(fmt.Errorf("err")).
Once()
c.wal.Recover()
c.wal.Recover(context.TODO())

eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
Return(fmt.Errorf("Error: No such container: %s", wrk.ID)).
Once()
c.wal.Recover()
c.wal.Recover(context.TODO())

// Nothing recovered.
c.wal.Recover()
c.wal.Recover(context.TODO())
}

func TestHandleCreateWorkloadHandled(t *testing.T) {
Expand Down Expand Up @@ -118,10 +118,10 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {
Return(nil).
Once()

c.wal.Recover()
c.wal.Recover(context.TODO())

// Recovers nothing.
c.wal.Recover()
c.wal.Recover(context.TODO())
}

func TestHandleCreateLambda(t *testing.T) {
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestHandleCreateLambda(t *testing.T) {
Return(nil, fmt.Errorf("err")).
Once()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
c.wal.Recover()
c.wal.Recover(context.TODO())

store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels).
Return([]*types.Workload{wrk}, nil).
Expand All @@ -179,11 +179,11 @@ func TestHandleCreateLambda(t *testing.T) {
Once()

lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(context.Background(), nil)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)

c.wal.Recover()
c.wal.Recover(context.TODO())
// Recovered nothing.
c.wal.Recover()
c.wal.Recover(context.TODO())
}
2 changes: 1 addition & 1 deletion core.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func serve(c *cli.Context) error {
return err
}
defer cluster.Finalizer()
cluster.DisasterRecover()
cluster.DisasterRecover(c.Context)

stop := make(chan struct{}, 1)
vibranium := rpc.New(cluster, config, stop)
Expand Down
12 changes: 6 additions & 6 deletions store/redis/ephemeral.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (r *Rediaron) StartEphemeral(ctx context.Context, path string, heartbeat ti
return nil, nil, errors.Wrap(types.ErrKeyExists, path)
}

cctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
expiry := make(chan struct{})

var wg sync.WaitGroup
Expand All @@ -37,11 +37,11 @@ func (r *Rediaron) StartEphemeral(ctx context.Context, path string, heartbeat ti
for {
select {
case <-tick.C:
if err := r.refreshEphemeral(path, heartbeat); err != nil {
if err := r.refreshEphemeral(ctx, path, heartbeat); err != nil {
r.revokeEphemeral(path)
return
}
case <-cctx.Done():
case <-ctx.Done():
r.revokeEphemeral(path)
return
}
Expand All @@ -55,15 +55,15 @@ func (r *Rediaron) StartEphemeral(ctx context.Context, path string, heartbeat ti
}

func (r *Rediaron) revokeEphemeral(path string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()
if _, err := r.cli.Del(ctx, path).Result(); err != nil {
log.Errorf(context.TODO(), "[refreshEphemeral] revoke with %s failed: %v", path, err)
}
}

func (r *Rediaron) refreshEphemeral(path string, ttl time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
func (r *Rediaron) refreshEphemeral(ctx context.Context, path string, ttl time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, err := r.cli.Expire(ctx, path, ttl).Result()
return err
Expand Down
10 changes: 5 additions & 5 deletions wal/hydro.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (h *Hydro) Register(handler EventHandler) {
}

// Recover starts a disaster recovery, which will replay all the events.
func (h *Hydro) Recover() {
func (h *Hydro) Recover(ctx context.Context) {
ch, _ := h.kv.Scan([]byte(EventPrefix))

events := []HydroEvent{}
Expand All @@ -62,27 +62,27 @@ func (h *Hydro) Recover() {
continue
}

if err := h.recover(handler, ev); err != nil {
if err := h.recover(ctx, handler, ev); err != nil {
log.Errorf(context.TODO(), "[Recover] handle event %d (%s) failed: %v", ev.ID, ev.Type, err)
continue
}
}
}

func (h *Hydro) recover(handler EventHandler, event HydroEvent) error {
func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
item, err := handler.Decode(event.Item)
if err != nil {
return err
}

switch handle, err := handler.Check(item); {
switch handle, err := handler.Check(ctx, item); {
case err != nil:
return err
case !handle:
return event.Delete()
}

if err := handler.Handle(item); err != nil {
if err := handler.Handle(ctx, item); err != nil {
return err
}

Expand Down
13 changes: 7 additions & 6 deletions wal/hydro_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wal

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestRecoverFailedAsNoSuchHandler(t *testing.T) {

hydro.handlers.Delete(eventype)

hydro.Recover()
hydro.Recover(context.TODO())
require.True(t, encoded)
require.False(t, decoded)
require.False(t, checked)
Expand All @@ -98,7 +99,7 @@ func TestRecoverFailedAsCheckError(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, commit)

hydro.Recover()
hydro.Recover(context.TODO())
require.True(t, encoded)
require.True(t, decoded)
require.True(t, checked)
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestRecoverFailedAsDecodeLogError(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, commit)

hydro.Recover()
hydro.Recover(context.TODO())
require.True(t, encoded)
require.True(t, decoded)
require.False(t, checked)
Expand All @@ -171,7 +172,7 @@ func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, commit)

hydro.Recover()
hydro.Recover(context.TODO())
require.True(t, encoded)
require.True(t, decoded)
require.True(t, checked)
Expand All @@ -191,7 +192,7 @@ func TestHydroRecover(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, commit)

hydro.Recover()
hydro.Recover(context.TODO())
require.True(t, encoded)
require.True(t, decoded)
require.True(t, checked)
Expand Down Expand Up @@ -236,7 +237,7 @@ func TestHydroRecoverWithRealLithium(t *testing.T) {
hydro.Log(handler.event, struct{}{})
hydro.Log(handler.event, struct{}{})

hydro.Recover()
hydro.Recover(context.TODO())

ch, _ := hydro.kv.Scan([]byte(EventPrefix))
for range ch {
Expand Down
7 changes: 4 additions & 3 deletions wal/mocks/WAL.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 46f170b

Please sign in to comment.