Skip to content

Commit

Permalink
refactor wal2
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 11, 2022
1 parent 1b765fa commit 38221ae
Show file tree
Hide file tree
Showing 30 changed files with 960 additions and 450 deletions.
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mock: deps
mockery --dir store --output store/mocks --name Store
mockery --dir engine --output engine/mocks --name API
mockery --dir cluster --output cluster/mocks --name Cluster
mockery --dir wal --output wal/mocks --name WAL
mockery --dir lock --output lock/mocks --name DistributedLock
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.wal, err = newWAL(config, cal)
cal.identifier = config.Identifier()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
Expand Down
7 changes: 1 addition & 6 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
}
}

commit, err := c.walCreateLambda(message)
commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID)
if err != nil {
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Expand Down Expand Up @@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC

return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return c.wal.logCreateLambda(opts)
}
200 changes: 97 additions & 103 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ const (
// WAL for calcium.
type WAL struct {
wal.WAL
config types.Config
calcium *Calcium
}

func newCalciumWAL(cal *Calcium) (*WAL, error) {
w := &WAL{
WAL: wal.NewHydro(),
config: cal.config,
calcium: cal,
func newWAL(config types.Config, calcium *Calcium) (*WAL, error) {
hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout)
if err != nil {
return nil, err
}

if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
return nil, err
w := &WAL{
WAL: hydro,
calcium: calcium,
}

w.registerHandlers()

return w, nil
}

Expand All @@ -49,26 +47,92 @@ func (w *WAL) registerHandlers() {
w.Register(newProcessingCreatedHandler(w.calcium))
}

func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return w.Log(eventCreateLambda, opts.WorkloadID)
// CreateLambdaHandler indicates event handler for creating lambda.
type CreateLambdaHandler struct {
typ string
calcium *Calcium
}

func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler {
return &CreateLambdaHandler{
typ: eventCreateLambda,
calcium: calcium,
}
}

// Event .
func (h *CreateLambdaHandler) Typ() string {
return h.typ
}

// Check .
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
return true, nil
}

// Encode .
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
workloadID, ok := raw.(string)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return []byte(workloadID), nil
}

// Decode .
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
return string(bs), nil
}

// Handle .
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
workloadID, ok := raw.(string)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
go func() {
workload, err := h.calcium.GetWorkload(ctx, workloadID)
if err != nil {
logger.Errorf(ctx, "Get workload failed: %v", err)
return
}

r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
if err != nil {
logger.Errorf(ctx, "Wait failed: %+v", err)
return
}
if r.Code != 0 {
logger.Errorf(ctx, "Run failed: %s", r.Message)
}

if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
logger.Errorf(ctx, "Remove failed: %+v", err)
}
logger.Infof(ctx, "waited and removed")
}()

return nil
}

// CreateWorkloadHandler indicates event handler for creating workload.
type CreateWorkloadHandler struct {
event string
typ string
calcium *Calcium
}

func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
event: eventWorkloadCreated,
calcium: cal,
typ: eventWorkloadCreated,
calcium: calcium,
}
}

// Event .
func (h *CreateWorkloadHandler) Event() string {
return h.event
func (h *CreateWorkloadHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -132,96 +196,22 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er
return nil
}

// CreateLambdaHandler indicates event handler for creating lambda.
type CreateLambdaHandler struct {
event string
calcium *Calcium
}

func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler {
return &CreateLambdaHandler{
event: eventCreateLambda,
calcium: cal,
}
}

// Event .
func (h *CreateLambdaHandler) Event() string {
return h.event
}

// Check .
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
return true, nil
}

// Encode .
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
workloadID, ok := raw.(string)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return []byte(workloadID), nil
}

// Decode .
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
return string(bs), nil
}

// Handle .
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
workloadID, ok := raw.(string)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
go func() {
workload, err := h.calcium.GetWorkload(ctx, workloadID)
if err != nil {
logger.Errorf(ctx, "Get workload failed: %v", err)
return
}

r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
if err != nil {
logger.Errorf(ctx, "Wait failed: %+v", err)
return
}
if r.Code != 0 {
logger.Errorf(ctx, "Run failed: %s", r.Message)
}

if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
logger.Errorf(ctx, "Remove failed: %+v", err)
}
logger.Infof(ctx, "waited and removed")
}()

return nil
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32)
}

// WorkloadResourceAllocatedHandler .
type WorkloadResourceAllocatedHandler struct {
event string
typ string
calcium *Calcium
}

func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler {
return &WorkloadResourceAllocatedHandler{
event: eventWorkloadResourceAllocated,
calcium: cal,
typ: eventWorkloadResourceAllocated,
calcium: calcium,
}
}

// Event .
func (h *WorkloadResourceAllocatedHandler) Event() string {
return h.event
func (h *WorkloadResourceAllocatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -276,20 +266,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter

// ProcessingCreatedHandler .
type ProcessingCreatedHandler struct {
event string
typ string
calcium *Calcium
}

func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler {
return &ProcessingCreatedHandler{
event: eventProcessingCreated,
calcium: cal,
typ: eventProcessingCreated,
calcium: calcium,
}
}

// Event .
func (h *ProcessingCreatedHandler) Event() string {
return h.event
func (h *ProcessingCreatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -329,3 +319,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{})
logger.Infof(ctx, "obsolete processing deleted")
return
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32) // TODO why 32?
}
12 changes: 6 additions & 6 deletions cluster/calcium/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func TestHandleCreateWorkloadNoHandle(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

Expand All @@ -43,7 +43,7 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {

func TestHandleCreateWorkloadError(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

Expand Down Expand Up @@ -90,7 +90,7 @@ func TestHandleCreateWorkloadError(t *testing.T) {

func TestHandleCreateWorkloadHandled(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

Expand Down Expand Up @@ -130,11 +130,11 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {

func TestHandleCreateLambda(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
_, err = c.wal.Log(eventCreateLambda, "workloadid")
require.NoError(t, err)

node := &types.Node{
Expand All @@ -153,7 +153,7 @@ func TestHandleCreateLambda(t *testing.T) {
time.Sleep(500 * time.Millisecond)
store.AssertExpectations(t)

_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
_, err = c.wal.Log(eventCreateLambda, "workloadid")
require.NoError(t, err)
store.On("GetWorkload", mock.Anything, mock.Anything).
Return(wrk, nil).
Expand Down
2 changes: 1 addition & 1 deletion engine/docker/mocks/APIClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lock/mocks/DistributedLock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rpc/mocks/CoreRPC_RunAndWaitServer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scheduler/mocks/Scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 38221ae

Please sign in to comment.