Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor wal stage 1 #561

Merged
merged 3 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rdmocks/ServerStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mock: deps
mockery --dir store --output store/mocks --name Store
mockery --dir engine --output engine/mocks --name API
mockery --dir cluster --output cluster/mocks --name Cluster
mockery --dir wal --output wal/mocks --name WAL
mockery --dir lock --output lock/mocks --name DistributedLock
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
cal.wal, err = newWAL(config, cal)
cal.identifier = config.Identifier()

return cal, logger.Err(nil, errors.WithStack(err)) //nolint
Expand Down
7 changes: 1 addition & 6 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/projecteru2/core/strategy"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
}
}

commit, err := c.walCreateLambda(message)
commit, err := c.wal.Log(eventCreateLambda, message.WorkloadID)
if err != nil {
return &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Expand Down Expand Up @@ -197,7 +196,3 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC

return workloadIDs, runMsgCh, nil
}

func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return c.wal.logCreateLambda(opts)
}
9 changes: 5 additions & 4 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

// RemoveWorkload remove workloads
// returns a channel that contains removing responses
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, step int) (chan *types.RemoveWorkloadMessage, error) {
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force).WithField("step", step)
func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) (chan *types.RemoveWorkloadMessage, error) {
logger := log.WithField("Calcium", "RemoveWorkload").WithField("ids", ids).WithField("force", force)

nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids)
if err != nil {
Expand Down Expand Up @@ -45,7 +45,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
},
// then
func(ctx context.Context) (err error) {
if err = c.doRemoveWorkload(ctx, workload, force); err != nil {
if err = c.doRemoveWorkload(ctx, workload, force); err == nil {
log.Infof(ctx, "[RemoveWorkload] Workload %s removed", workload.ID)
}
return err
Expand Down Expand Up @@ -104,12 +104,13 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload

// 同步地删除容器, 在某些需要等待的场合异常有用!
func (c *Calcium) doRemoveWorkloadSync(ctx context.Context, ids []string) error {
ch, err := c.RemoveWorkload(ctx, ids, true, 1)
ch, err := c.RemoveWorkload(ctx, ids, true)
if err != nil {
return err
}

for m := range ch {
// TODO deal with failed
log.Debugf(ctx, "[doRemoveWorkloadSync] Removed %s", m.WorkloadID)
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions cluster/calcium/remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestRemoveWorkload(t *testing.T) {

// failed by GetWorkload
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err := c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.True(t, errors.Is(err, types.ErrNoETCD))
store.AssertExpectations(t)

Expand All @@ -36,7 +36,7 @@ func TestRemoveWorkload(t *testing.T) {
}
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
Expand All @@ -54,7 +54,7 @@ func TestRemoveWorkload(t *testing.T) {
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Twice()
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
Expand All @@ -69,7 +69,7 @@ func TestRemoveWorkload(t *testing.T) {
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false, 0)
ch, err = c.RemoveWorkload(ctx, []string{"xx"}, false)
assert.NoError(t, err)
for r := range ch {
assert.True(t, r.Success)
Expand Down
207 changes: 95 additions & 112 deletions cluster/calcium/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ const (
// WAL for calcium.
type WAL struct {
wal.WAL
config types.Config
calcium *Calcium
}

func newCalciumWAL(cal *Calcium) (*WAL, error) {
w := &WAL{
WAL: wal.NewHydro(),
config: cal.config,
calcium: cal,
func newWAL(config types.Config, calcium *Calcium) (*WAL, error) {
hydro, err := wal.NewHydro(config.WALFile, config.WALOpenTimeout)
if err != nil {
return nil, err
}

if err := w.WAL.Open(w.config.WALFile, w.config.WALOpenTimeout); err != nil {
return nil, err
w := &WAL{
WAL: hydro,
calcium: calcium,
}

w.registerHandlers()

return w, nil
}

Expand All @@ -49,105 +47,22 @@ func (w *WAL) registerHandlers() {
w.Register(newProcessingCreatedHandler(w.calcium))
}

func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) {
return w.Log(eventCreateLambda, opts.WorkloadID)
}

// CreateWorkloadHandler indicates event handler for creating workload.
type CreateWorkloadHandler struct {
event string
calcium *Calcium
}

func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
event: eventWorkloadCreated,
calcium: cal,
}
}

// Event .
func (h *CreateWorkloadHandler) Event() string {
return h.event
}

// Check .
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
_, ok := raw.(*types.Workload)
if !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return true, nil
}

// Encode .
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
wrk, ok := raw.(*types.Workload)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return json.Marshal(wrk)
}

// Decode .
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
wrk := &types.Workload{}
err := json.Unmarshal(bs, wrk)
return wrk, err
}

// Handle will remove instance, remove meta, restore resource
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
wrk, _ := raw.(*types.Workload)
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)

ctx, cancel := getReplayContext(ctx)
defer cancel()

if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
// workload meta exists
ch, err := h.calcium.RemoveWorkload(ctx, []string{wrk.ID}, true, 0)
if err != nil {
return logger.Err(ctx, err)
}
for msg := range ch {
if !msg.Success {
logger.Errorf(ctx, "failed to remove workload")
}
}
logger.Infof(ctx, "workload with meta removed")
return nil
}

// workload meta doesn't exist
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
if err != nil {
return logger.Err(ctx, err)
}
if err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil && !errors.Is(err, types.ErrWorkloadNotExists) {
return logger.Err(ctx, err)
}

logger.Infof(ctx, "workload removed")
return nil
}

// CreateLambdaHandler indicates event handler for creating lambda.
type CreateLambdaHandler struct {
event string
typ string
calcium *Calcium
}

func newCreateLambdaHandler(cal *Calcium) *CreateLambdaHandler {
func newCreateLambdaHandler(calcium *Calcium) *CreateLambdaHandler {
return &CreateLambdaHandler{
event: eventCreateLambda,
calcium: cal,
typ: eventCreateLambda,
calcium: calcium,
}
}

// Event .
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

打错了...

func (h *CreateLambdaHandler) Event() string {
return h.event
func (h *CreateLambdaHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -202,26 +117,90 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error
return nil
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32)
// CreateWorkloadHandler indicates event handler for creating workload.
type CreateWorkloadHandler struct {
typ string
calcium *Calcium
}

func newCreateWorkloadHandler(calcium *Calcium) *CreateWorkloadHandler {
return &CreateWorkloadHandler{
typ: eventWorkloadCreated,
calcium: calcium,
}
}

// Event .
func (h *CreateWorkloadHandler) Typ() string {
return h.typ
}

// Check .
func (h *CreateWorkloadHandler) Check(ctx context.Context, raw interface{}) (handle bool, err error) {
_, ok := raw.(*types.Workload)
if !ok {
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return true, nil
}

// Encode .
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
wrk, ok := raw.(*types.Workload)
if !ok {
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
}
return json.Marshal(wrk)
}

// Decode .
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
wrk := &types.Workload{}
err := json.Unmarshal(bs, wrk)
return wrk, err
}

// Handle will remove instance, remove meta, restore resource
func (h *CreateWorkloadHandler) Handle(ctx context.Context, raw interface{}) (err error) {
wrk, _ := raw.(*types.Workload)
logger := log.WithField("WAL.Handle", "CreateWorkload").WithField("ID", wrk.ID).WithField("nodename", wrk.Nodename)

ctx, cancel := getReplayContext(ctx)
defer cancel()

if _, err = h.calcium.GetWorkload(ctx, wrk.ID); err == nil {
return h.calcium.doRemoveWorkloadSync(ctx, []string{wrk.ID})
}

// workload meta doesn't exist
node, err := h.calcium.GetNode(ctx, wrk.Nodename)
if err != nil {
return logger.Err(ctx, err)
}
if err = node.Engine.VirtualizationRemove(ctx, wrk.ID, true, true); err != nil && !errors.Is(err, types.ErrWorkloadNotExists) {
return logger.Err(ctx, err)
}

logger.Infof(ctx, "workload removed")
return nil
}

// WorkloadResourceAllocatedHandler .
type WorkloadResourceAllocatedHandler struct {
event string
typ string
calcium *Calcium
}

func newWorkloadResourceAllocatedHandler(cal *Calcium) *WorkloadResourceAllocatedHandler {
func newWorkloadResourceAllocatedHandler(calcium *Calcium) *WorkloadResourceAllocatedHandler {
return &WorkloadResourceAllocatedHandler{
event: eventWorkloadResourceAllocated,
calcium: cal,
typ: eventWorkloadResourceAllocated,
calcium: calcium,
}
}

// Event .
func (h *WorkloadResourceAllocatedHandler) Event() string {
return h.event
func (h *WorkloadResourceAllocatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -276,20 +255,20 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter

// ProcessingCreatedHandler .
type ProcessingCreatedHandler struct {
event string
typ string
calcium *Calcium
}

func newProcessingCreatedHandler(cal *Calcium) *ProcessingCreatedHandler {
func newProcessingCreatedHandler(calcium *Calcium) *ProcessingCreatedHandler {
return &ProcessingCreatedHandler{
event: eventProcessingCreated,
calcium: cal,
typ: eventProcessingCreated,
calcium: calcium,
}
}

// Event .
func (h *ProcessingCreatedHandler) Event() string {
return h.event
func (h *ProcessingCreatedHandler) Typ() string {
return h.typ
}

// Check .
Expand Down Expand Up @@ -329,3 +308,7 @@ func (h *ProcessingCreatedHandler) Handle(ctx context.Context, raw interface{})
logger.Infof(ctx, "obsolete processing deleted")
return
}

func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, time.Second*32) // TODO why 32?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx这块确实有点乱了,以后要花点精力重整一下...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种我都已经……弃疗了,你整理下吧

}
Loading