Skip to content

Commit

Permalink
to remove ctx which unused (#372)
Browse files Browse the repository at this point in the history
Co-authored-by: anrs <[email protected]>
  • Loading branch information
anrs and anrs authored Mar 30, 2021
1 parent 5914b8f commit 789ca9d
Show file tree
Hide file tree
Showing 20 changed files with 168 additions and 185 deletions.
5 changes: 2 additions & 3 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package calcium

import (
"context"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -82,8 +81,8 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
}

// DisasterRecover .
func (c *Calcium) DisasterRecover(ctx context.Context) {
c.wal.Recover(ctx)
func (c *Calcium) DisasterRecover() {
c.wal.Recover()
}

// Finalizer use for defer
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/calcium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func NewTestCluster() *Calcium {
c.wal = &WAL{WAL: &walmocks.WAL{}}

mwal := c.wal.WAL.(*walmocks.WAL)
commit := wal.Commit(func(context.Context) error { return nil })
mwal.On("Log", mock.Anything, mock.Anything, mock.Anything).Return(commit, nil)
commit := wal.Commit(func() error { return nil })
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)

return c
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (c *Calcium) doDeployOneWorkload(
var commit wal.Commit
defer func() {
if commit != nil {
if err := commit(context.Background()); err != nil {
if err := commit(); err != nil {
log.Errorf("[doDeployOneWorkload] Commit WAL %s failed: %v", eventCreateWorkload, err)
}
}
Expand All @@ -276,7 +276,7 @@ func (c *Calcium) doDeployOneWorkload(
// We couldn't WAL the workload ID above VirtualizationCreate temporarily,
// so there's a time gap window, once the core process crashes between
// VirtualizationCreate and logCreateWorkload then the worload is leaky.
if commit, err = c.wal.logCreateWorkload(ctx, workload.ID, node.Name); err != nil {
if commit, err = c.wal.logCreateWorkload(workload.ID, node.Name); err != nil {
return err
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ func TestCreateWorkloadTxn(t *testing.T) {
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
store.On("AddWorkload", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
commit := wal.Commit(func(context.Context) error {
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", mock.Anything, eventCreateWorkload, mock.Anything).Return(commit, nil)
mwal.On("Log", eventCreateWorkload, mock.Anything).Return(commit, nil)
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
Expand Down Expand Up @@ -349,8 +349,8 @@ func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
}, nil)

mwal := c.wal.WAL.(*walmocks.WAL)
commit := wal.Commit(func(context.Context) error { return nil })
mwal.On("Log", mock.Anything, mock.Anything, mock.Anything).Return(commit, nil)
commit := wal.Commit(func() error { return nil })
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)

sche := c.scheduler.(*schedulermocks.Scheduler)
sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
go func() {
defer close(runMsgCh)
wg.Wait()
if err := commit(context.Background()); err != nil {
if err := commit(); err != nil {
logger.Errorf("[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
}
log.Info("[RunAndWait] Finish run and wait for workloads")
Expand All @@ -148,5 +148,5 @@ func (c *Calcium) walCreateLambda(ctx context.Context, opts *types.DeployOptions
opts.Labels = map[string]string{labelLambdaID: lambdaID}
}

return c.wal.logCreateLambda(ctx, opts)
return c.wal.logCreateLambda(opts)
}
4 changes: 2 additions & 2 deletions cluster/calcium/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
mwal := c.wal.WAL.(*walmocks.WAL)
defer mwal.AssertExpectations(t)
var walCommitted bool
commit := wal.Commit(func(context.Context) error {
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", mock.Anything, eventCreateLambda, mock.Anything).Return(commit, nil).Once()
mwal.On("Log", eventCreateLambda, mock.Anything).Return(commit, nil).Once()

opts := &types.DeployOptions{
Name: "zc:name",
Expand Down
11 changes: 6 additions & 5 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func newCalciumWAL(cal *Calcium) (*WAL, error) {
config: cal.config,
calcium: cal,
}
if err := w.WAL.Open(context.Background(), w.config.WALFile, w.config.WALOpenTimeout); err != nil {

if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
return nil, err
}

Expand All @@ -43,15 +44,15 @@ func (w *WAL) registerHandlers() {
w.Register(newCreateWorkloadHandler(w.calcium))
}

func (w *WAL) logCreateWorkload(ctx context.Context, workloadID, nodename string) (wal.Commit, error) {
return w.Log(ctx, eventCreateWorkload, &types.Workload{
func (w *WAL) logCreateWorkload(workloadID, nodename string) (wal.Commit, error) {
return w.Log(eventCreateWorkload, &types.Workload{
ID: workloadID,
Nodename: nodename,
})
}

func (w *WAL) logCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) {
return w.Log(ctx, eventCreateLambda, &types.ListWorkloadsOptions{
func (w *WAL) logCreateLambda(opts *types.DeployOptions) (wal.Commit, error) {
return w.Log(eventCreateLambda, &types.ListWorkloadsOptions{
Appname: opts.Name,
Entrypoint: opts.Entrypoint.Name,
Labels: map[string]string{labelLambdaID: opts.Labels[labelLambdaID]},
Expand Down
32 changes: 16 additions & 16 deletions cluster/calcium/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {
c.wal = wal

wrkid := "workload-id"
_, err = c.wal.logCreateWorkload(context.Background(), wrkid, "nodename")
_, err = c.wal.logCreateWorkload(wrkid, "nodename")
require.NoError(t, err)

wrk := &types.Workload{
Expand All @@ -31,10 +31,10 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {
defer store.AssertExpectations(t)
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, nil).Once()

c.wal.Recover(context.Background())
c.wal.Recover()

// Recovers nothing.
c.wal.Recover(context.Background())
c.wal.Recover()
}

func TestHandleCreateWorkloadError(t *testing.T) {
Expand All @@ -48,7 +48,7 @@ func TestHandleCreateWorkloadError(t *testing.T) {
Engine: &enginemocks.API{},
}
wrkid := "workload-id"
_, err = c.wal.logCreateWorkload(context.Background(), wrkid, node.Name)
_, err = c.wal.logCreateWorkload(wrkid, node.Name)
require.NoError(t, err)

wrk := &types.Workload{
Expand All @@ -59,12 +59,12 @@ func TestHandleCreateWorkloadError(t *testing.T) {
store := c.store.(*storemocks.Store)
defer store.AssertExpectations(t)
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, fmt.Errorf("err")).Once()
c.wal.Recover(context.Background())
c.wal.Recover()

err = types.NewDetailedErr(types.ErrBadCount, fmt.Sprintf("keys: [%s]", wrkid))
store.On("GetWorkload", mock.Anything, wrkid).Return(wrk, err)
store.On("GetNode", mock.Anything, wrk.Nodename).Return(nil, fmt.Errorf("err")).Once()
c.wal.Recover(context.Background())
c.wal.Recover()

store.On("GetNode", mock.Anything, wrk.Nodename).Return(node, nil)
eng, ok := node.Engine.(*enginemocks.API)
Expand All @@ -73,15 +73,15 @@ func TestHandleCreateWorkloadError(t *testing.T) {
eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
Return(fmt.Errorf("err")).
Once()
c.wal.Recover(context.Background())
c.wal.Recover()

eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true).
Return(fmt.Errorf("Error: No such container: %s", wrk.ID)).
Once()
c.wal.Recover(context.Background())
c.wal.Recover()

// Nothing recovered.
c.wal.Recover(context.Background())
c.wal.Recover()
}

func TestHandleCreateWorkloadHandled(t *testing.T) {
Expand All @@ -96,7 +96,7 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {
}

wrkid := "workload-id"
_, err = c.wal.logCreateWorkload(context.Background(), wrkid, node.Name)
_, err = c.wal.logCreateWorkload(wrkid, node.Name)
require.NoError(t, err)

wrk := &types.Workload{
Expand All @@ -118,10 +118,10 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {
Return(nil).
Once()

c.wal.Recover(context.Background())
c.wal.Recover()

// Recovers nothing.
c.wal.Recover(context.Background())
c.wal.Recover()
}

func TestHandleCreateLambda(t *testing.T) {
Expand All @@ -135,7 +135,7 @@ func TestHandleCreateLambda(t *testing.T) {
Entrypoint: &types.Entrypoint{Name: "entry"},
Labels: map[string]string{labelLambdaID: "lambda"},
}
_, err = c.wal.logCreateLambda(context.Background(), deployOpts)
_, err = c.wal.logCreateLambda(deployOpts)
require.NoError(t, err)

node := &types.Node{
Expand All @@ -154,7 +154,7 @@ func TestHandleCreateLambda(t *testing.T) {
Return(nil, fmt.Errorf("err")).
Once()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
c.wal.Recover(context.Background())
c.wal.Recover()

store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels).
Return([]*types.Workload{wrk}, nil).
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestHandleCreateLambda(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)

c.wal.Recover(context.Background())
c.wal.Recover()
// Recovered nothing.
c.wal.Recover(context.Background())
c.wal.Recover()
}
2 changes: 1 addition & 1 deletion core.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func serve(c *cli.Context) error {
return err
}
defer cluster.Finalizer()
cluster.DisasterRecover(c.Context)
cluster.DisasterRecover()

rpcch := make(chan struct{}, 1)
vibranium := rpc.New(cluster, config, rpcch)
Expand Down
31 changes: 13 additions & 18 deletions wal/hydro.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package wal

import (
"context"
"encoding/json"
"sync"
"time"
Expand All @@ -26,14 +25,14 @@ func NewHydro() *Hydro {
}

// Open connects a kvdb.
func (h *Hydro) Open(ctx context.Context, path string, timeout time.Duration) (err error) {
err = h.kv.Open(ctx, path, 0600, timeout)
func (h *Hydro) Open(path string, timeout time.Duration) (err error) {
err = h.kv.Open(path, 0600, timeout)
return
}

// Close disconnects the kvdb.
func (h *Hydro) Close(ctx context.Context) error {
return h.kv.Close(ctx)
func (h *Hydro) Close() error {
return h.kv.Close()
}

// Register registers a new event handler.
Expand All @@ -42,8 +41,8 @@ func (h *Hydro) Register(handler EventHandler) {
}

// Recover starts a disaster recovery, which will replay all the events.
func (h *Hydro) Recover(ctx context.Context) {
ch, _ := h.kv.Scan(ctx, []byte(EventPrefix))
func (h *Hydro) Recover() {
ch, _ := h.kv.Scan([]byte(EventPrefix))

for ent := range ch {
event, err := h.decodeEvent(ent)
Expand All @@ -58,14 +57,14 @@ func (h *Hydro) Recover(ctx context.Context) {
continue
}

if err := h.recover(ctx, handler, event); err != nil {
if err := h.recover(handler, event); err != nil {
log.Errorf("[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err)
continue
}
}
}

func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error {
func (h *Hydro) recover(handler EventHandler, event HydroEvent) error {
item, err := handler.Decode(event.Item)
if err != nil {
return err
Expand All @@ -75,18 +74,18 @@ func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEv
case err != nil:
return err
case !handle:
return event.Delete(ctx)
return event.Delete()
}

if err := handler.Handle(item); err != nil {
return err
}

return event.Delete(ctx)
return event.Delete()
}

// Log records a log item.
func (h *Hydro) Log(ctx context.Context, eventype string, item interface{}) (Commit, error) {
func (h *Hydro) Log(eventype string, item interface{}) (Commit, error) {
handler, ok := h.getEventHandler(eventype)
if !ok {
return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventype)
Expand All @@ -101,15 +100,11 @@ func (h *Hydro) Log(ctx context.Context, eventype string, item interface{}) (Com
event.Type = eventype
event.Item = bs

if err = event.Create(ctx); err != nil {
if err = event.Create(); err != nil {
return nil, err
}

commit := func(context.Context) error {
return event.Delete(ctx)
}

return commit, nil
return event.Delete, nil
}

func (h *Hydro) getEventHandler(event string) (handler EventHandler, ok bool) {
Expand Down
11 changes: 5 additions & 6 deletions wal/hydro_event.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package wal

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -33,8 +32,8 @@ func NewHydroEvent(kv kv.KV) (e *HydroEvent) {
}

// Create persists this event.
func (e *HydroEvent) Create(ctx context.Context) (err error) {
if e.ID, err = e.kv.NextSequence(ctx); err != nil {
func (e *HydroEvent) Create() (err error) {
if e.ID, err = e.kv.NextSequence(); err != nil {
return
}

Expand All @@ -43,12 +42,12 @@ func (e *HydroEvent) Create(ctx context.Context) (err error) {
return err
}

return e.kv.Put(ctx, e.Key(), value)
return e.kv.Put(e.Key(), value)
}

// Delete removes this event from persistence.
func (e HydroEvent) Delete(ctx context.Context) error {
return e.kv.Delete(ctx, e.Key())
func (e HydroEvent) Delete() error {
return e.kv.Delete(e.Key())
}

// Key returns this event's key path.
Expand Down
Loading

0 comments on commit 789ca9d

Please sign in to comment.