Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor wal stage 1 #561

Merged
merged 3 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mock: deps
mockery --dir store --output store/mocks --name Store
mockery --dir engine --output engine/mocks --name API
mockery --dir cluster --output cluster/mocks --name Cluster
mockery --dir wal --output wal/mocks --name WAL
mockery --dir lock --output lock/mocks --name DistributedLock
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.wal, err = newWAL(config, cal)
cal.identifier = config.Identifier()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
Expand Down
7 changes: 1 addition & 6 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
}
}

commit, err := c.walCreateLambda(message)
commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID)
if err != nil {
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Expand Down Expand Up @@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC

return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return c.wal.logCreateLambda(opts)
}
200 changes: 97 additions & 103 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ const (
// WAL for calcium.
type WAL struct {
wal.WAL
config types.Config
calcium *Calcium
}

func newCalciumWAL(cal *Calcium) (*WAL, error) {
w := &WAL{
WAL: wal.NewHydro(),
config: cal.config,
calcium: cal,
func newWAL(config types.Config, calcium *Calcium) (*WAL, error) {
hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout)
if err != nil {
return nil, err
}

if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
return nil, err
w := &WAL{
WAL: hydro,
calcium: calcium,
}

w.registerHandlers()

return w, nil
}

Expand All @@ -49,26 +47,92 @@ func (w *WAL) registerHandlers() {
w.Register(newProcessingCreatedHandler(w.calcium))
}

func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return w.Log(eventCreateLambda, opts.WorkloadID)
// CreateLambdaHandler indicates event handler for creating lambda.
type CreateLambdaHandler struct {
typ string
calcium *Calcium
}

func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler {
return &CreateLambdaHandler{
typ: eventCreateLambda,
calcium: calcium,
}
}

// Event .
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

打错了...

func (h *CreateLambdaHandler) Typ() string {
return h.typ
}

// Check .
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
return true, nil
}

// Encode .
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
workloadID, ok := raw.(string)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return []byte(workloadID), nil
}

// Decode .
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
return string(bs), nil
}

// Handle .
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
workloadID, ok := raw.(string)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
go func() {
workload, err := h.calcium.GetWorkload(ctx, workloadID)
if err != nil {
logger.Errorf(ctx, "Get workload failed: %v", err)
return
}

r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
if err != nil {
logger.Errorf(ctx, "Wait failed: %+v", err)
return
}
if r.Code != 0 {
logger.Errorf(ctx, "Run failed: %s", r.Message)
}

if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
logger.Errorf(ctx, "Remove failed: %+v", err)
}
logger.Infof(ctx, "waited and removed")
}()

return nil
}

// CreateWorkloadHandler indicates event handler for creating workload.
type CreateWorkloadHandler struct {
event string
typ string
calcium *Calcium
}

func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
event: eventWorkloadCreated,
calcium: cal,
typ: eventWorkloadCreated,
calcium: calcium,
}
}

// Event .
func (h *CreateWorkloadHandler) Event() string {
return h.event
func (h *CreateWorkloadHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -132,96 +196,22 @@ func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (er
return nil
}

// CreateLambdaHandler indicates event handler for creating lambda.
type CreateLambdaHandler struct {
event string
calcium *Calcium
}

func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler {
return &CreateLambdaHandler{
event: eventCreateLambda,
calcium: cal,
}
}

// Event .
func (h *CreateLambdaHandler) Event() string {
return h.event
}

// Check .
func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) {
return true, nil
}

// Encode .
func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) {
workloadID, ok := raw.(string)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return []byte(workloadID), nil
}

// Decode .
func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) {
return string(bs), nil
}

// Handle .
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error {
workloadID, ok := raw.(string)
if !ok {
return types.NewDetailedErr(types.ErrInvalidType, raw)
}

logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID)
go func() {
workload, err := h.calcium.GetWorkload(ctx, workloadID)
if err != nil {
logger.Errorf(ctx, "Get workload failed: %v", err)
return
}

r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "")
if err != nil {
logger.Errorf(ctx, "Wait failed: %+v", err)
return
}
if r.Code != 0 {
logger.Errorf(ctx, "Run failed: %s", r.Message)
}

if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil {
logger.Errorf(ctx, "Remove failed: %+v", err)
}
logger.Infof(ctx, "waited and removed")
}()

return nil
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32)
}

// WorkloadResourceAllocatedHandler .
type WorkloadResourceAllocatedHandler struct {
event string
typ string
calcium *Calcium
}

func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler {
return &WorkloadResourceAllocatedHandler{
event: eventWorkloadResourceAllocated,
calcium: cal,
typ: eventWorkloadResourceAllocated,
calcium: calcium,
}
}

// Event .
func (h *WorkloadResourceAllocatedHandler) Event() string {
return h.event
func (h *WorkloadResourceAllocatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -276,20 +266,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter

// ProcessingCreatedHandler .
type ProcessingCreatedHandler struct {
event string
typ string
calcium *Calcium
}

func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler {
return &ProcessingCreatedHandler{
event: eventProcessingCreated,
calcium: cal,
typ: eventProcessingCreated,
calcium: calcium,
}
}

// Event .
func (h *ProcessingCreatedHandler) Event() string {
return h.event
func (h *ProcessingCreatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -329,3 +319,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{})
logger.Infof(ctx, "obsolete processing deleted")
return
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32) // TODO why 32?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx这块确实有点乱了,以后要花点精力重整一下...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种我都已经……弃疗了,你整理下吧

}
12 changes: 6 additions & 6 deletions cluster/calcium/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func TestHandleCreateWorkloadNoHandle(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

Expand All @@ -43,7 +43,7 @@ func TestHandleCreateWorkloadNoHandle(t *testing.T) {

func TestHandleCreateWorkloadError(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

Expand Down Expand Up @@ -90,7 +90,7 @@ func TestHandleCreateWorkloadError(t *testing.T) {

func TestHandleCreateWorkloadHandled(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

Expand Down Expand Up @@ -130,11 +130,11 @@ func TestHandleCreateWorkloadHandled(t *testing.T) {

func TestHandleCreateLambda(t *testing.T) {
c := NewTestCluster()
wal, err := newCalciumWAL(c)
wal, err := newWAL(c.config, c)
require.NoError(t, err)
c.wal = wal

_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
_, err = c.wal.Log(eventCreateLambda, "workloadid")
require.NoError(t, err)

node := &types.Node{
Expand All @@ -153,7 +153,7 @@ func TestHandleCreateLambda(t *testing.T) {
time.Sleep(500 * time.Millisecond)
store.AssertExpectations(t)

_, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"})
_, err = c.wal.Log(eventCreateLambda, "workloadid")
require.NoError(t, err)
store.On("GetWorkload", mock.Anything, mock.Anything).
Return(wrk, nil).
Expand Down
2 changes: 1 addition & 1 deletion engine/docker/mocks/APIClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lock/mocks/DistributedLock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rpc/mocks/CoreRPC_RunAndWaitServer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scheduler/mocks/Scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading