Skip to content

Commit

Permalink
rectify logging system (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored Mar 1, 2021
1 parent e67535c commit cbb0e3c
Show file tree
Hide file tree
Showing 29 changed files with 309 additions and 206 deletions.
56 changes: 32 additions & 24 deletions cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,65 @@ package calcium
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"time"

"github.com/pkg/errors"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
)

// BuildImage will build image
func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (chan *types.BuildImageMessage, error) {
func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch chan *types.BuildImageMessage, err error) {
logger := log.WithField("Calcium", "BuildImage").WithField("opts", opts)
// Disable build API if scm not set
if c.source == nil {
return nil, types.ErrSCMNotSet
return nil, logger.Err(errors.WithStack(types.ErrSCMNotSet))
}
// select nodes
node, err := c.selectBuildNode(ctx)
if err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}
log.Infof("[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)
// get refs
refs := node.Engine.BuildRefs(ctx, opts.Name, opts.Tags)

switch opts.BuildMethod {
case types.BuildFromSCM:
return c.buildFromSCM(ctx, node, refs, opts)
ch, err = c.buildFromSCM(ctx, node, refs, opts)
case types.BuildFromRaw:
return c.buildFromContent(ctx, node, refs, opts.Tar)
ch, err = c.buildFromContent(ctx, node, refs, opts.Tar)
case types.BuildFromExist:
return c.buildFromExist(ctx, refs[0], opts.ExistID)
ch, err = c.buildFromExist(ctx, refs[0], opts.ExistID)
default:
return nil, errors.New("unknown build type")
return nil, logger.Err(errors.WithStack(errors.New("unknown build type")))
}
return ch, logger.Err(errors.WithStack(err))
}

func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
// get pod from config
// TODO can choose multiple pod here for other engine support
if c.config.Docker.BuildPod == "" {
return nil, types.ErrNoBuildPod
return nil, errors.WithStack(types.ErrNoBuildPod)
}

// get node by scheduler
nodes, err := c.ListPodNodes(ctx, c.config.Docker.BuildPod, nil, false)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
if len(nodes) == 0 {
return nil, types.ErrInsufficientNodes
return nil, errors.WithStack(types.ErrInsufficientNodes)
}
// get idle max node
return c.scheduler.MaxIdleNode(nodes)
node, err := c.scheduler.MaxIdleNode(nodes)
return node, errors.WithStack(err)
}

func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, refs []string, opts *types.BuildOptions) (chan *types.BuildImageMessage, error) {
Expand All @@ -70,38 +73,42 @@ func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, refs []str
path, content, err := node.Engine.BuildContent(ctx, c.source, buildContentOpts)
defer os.RemoveAll(path)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return c.buildFromContent(ctx, node, refs, content)
ch, err := c.buildFromContent(ctx, node, refs, content)
return ch, errors.WithStack(err)
}

func (c *Calcium) buildFromContent(ctx context.Context, node *types.Node, refs []string, content io.Reader) (chan *types.BuildImageMessage, error) {
resp, err := node.Engine.ImageBuild(ctx, content, refs)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
return c.pushImage(ctx, resp, node, refs)
ch, err := c.pushImage(ctx, resp, node, refs)
return ch, errors.WithStack(err)
}

func (c *Calcium) buildFromExist(ctx context.Context, ref, existID string) (chan *types.BuildImageMessage, error) {
func (c *Calcium) buildFromExist(ctx context.Context, ref, existID string) (chan *types.BuildImageMessage, error) { // nolint:unparam
logger := log.WithField("Calcium", "buildFromExist").WithField("ref", ref).WithField("existID", existID)
return withImageBuiltChannel(func(ch chan *types.BuildImageMessage) {
node, err := c.getWorkloadNode(ctx, existID)
if err != nil {
ch <- buildErrMsg(err)
ch <- buildErrMsg(logger.Err(err))
return
}

imageID, err := node.Engine.ImageBuildFromExist(ctx, existID, ref)
if err != nil {
ch <- buildErrMsg(err)
ch <- buildErrMsg(logger.Err(err))
return
}
go cleanupNodeImages(node, []string{imageID}, c.config.GlobalTimeout)
ch <- &types.BuildImageMessage{ID: imageID}
}), nil
}

func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types.Node, tags []string) (chan *types.BuildImageMessage, error) {
func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types.Node, tags []string) (chan *types.BuildImageMessage, error) { // nolint:unparam
logger := log.WithField("Calcium", "pushImage").WithField("node", node).WithField("tags", tags)
return withImageBuiltChannel(func(ch chan *types.BuildImageMessage) {
defer resp.Close()
decoder := json.NewDecoder(resp)
Expand All @@ -119,7 +126,7 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
break
}
malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check
log.Errorf("[BuildImage] Decode build image message failed %v, buffered: %v", err, malformed)
logger.Errorf("[BuildImage] Decode build image message failed %+v, buffered: %v", err, malformed)
return
}
ch <- message
Expand All @@ -137,7 +144,7 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
log.Infof("[BuildImage] Push image %s", tag)
rc, err := node.Engine.ImagePush(ctx, tag)
if err != nil {
ch <- &types.BuildImageMessage{Error: err.Error()}
ch <- &types.BuildImageMessage{Error: logger.Err(err).Error()}
continue
}

Expand Down Expand Up @@ -165,15 +172,16 @@ func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.Bu
}

func cleanupNodeImages(node *types.Node, ids []string, ttl time.Duration) {
logger := log.WithField("Calcium", "cleanupNodeImages").WithField("node", node).WithField("ids", ids).WithField("ttl", ttl)
ctx, cancel := context.WithTimeout(context.Background(), ttl)
defer cancel()
for _, id := range ids {
if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
log.Errorf("[BuildImage] Remove image error: %s", err)
logger.Errorf("[BuildImage] Remove image error: %+v", errors.WithStack(err))
}
}
if spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true); err != nil {
log.Errorf("[BuildImage] Remove build image cache error: %s", err)
logger.Errorf("[BuildImage] Remove build image cache error: %+v", errors.WithStack(err))
} else {
log.Infof("[BuildImage] Clean cached image and release space %d", spaceReclaimed)
}
Expand Down
12 changes: 7 additions & 5 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"

"github.com/pkg/errors"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/discovery"
"github.com/projecteru2/core/discovery/helium"
Expand All @@ -30,16 +31,17 @@ type Calcium struct {

// New returns a new cluster config
func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
logger := log.WithField("Calcium", "New").WithField("config", config)
// set store
store, err := etcdv3.New(config, embeddedStorage)
if err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}

// set scheduler
potassium, err := complexscheduler.New(config)
if err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}
scheduler.InitSchedulerV1(potassium)

Expand All @@ -55,16 +57,16 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
log.Warn("[Calcium] SCM not set, build API disabled")
}
if err != nil {
log.Errorf("[Calcium] SCAM failed: %v", err)
return nil, err
logger.Errorf("[Calcium] SCAM failed: %+v", err)
return nil, errors.WithStack(err)
}

// set watcher
watcher := helium.New(config.GRPCConfig, store)

cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
cal.wal, err = newCalciumWAL(cal)
return cal, err
return cal, logger.Err(errors.WithStack(err))
}

// DisasterRecover .
Expand Down
3 changes: 3 additions & 0 deletions cluster/calcium/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

// CalculateCapacity calculates capacity
func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptions) (*types.CapacityMessage, error) {
logger := log.WithField("Calcium", "CalculateCapacity").WithField("opts", opts)
var err error
msg := &types.CapacityMessage{
Total: 0,
Expand All @@ -21,6 +22,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
return msg, c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, nil, false, func(ctx context.Context, nodeMap map[string]*types.Node) error {
if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf("[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
return errors.WithStack(err)
}

Expand All @@ -31,6 +33,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
var infos []strategy.Info
msg.Total, _, infos, err = c.doCalculateCapacity(nodeMap, opts)
if err != nil {
logger.Errorf("[Calcium.CalculateCapacity] doCalculateCapacity failed: %+v", err)
return errors.WithStack(err)
}
for _, info := range infos {
Expand Down
22 changes: 12 additions & 10 deletions cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
Expand All @@ -13,6 +14,7 @@ import (

// ControlWorkload control workloads status
func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, force bool) (chan *types.ControlWorkloadMessage, error) {
logger := log.WithField("Calcium", "ControlWorkload").WithField("ids", ids).WithField("t", t).WithField("force", force)
ch := make(chan *types.ControlWorkloadMessage)

go func() {
Expand All @@ -28,20 +30,20 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
switch t {
case cluster.WorkloadStop:
message, err = c.doStopWorkload(ctx, workload, force)
return err
return errors.WithStack(err)
case cluster.WorkloadStart:
message, err = c.doStartWorkload(ctx, workload, force)
return err
return errors.WithStack(err)
case cluster.WorkloadRestart:
message, err = c.doStopWorkload(ctx, workload, force)
if err != nil {
return err
return errors.WithStack(err)
}
startHook, err := c.doStartWorkload(ctx, workload, force)
message = append(message, startHook...)
return err
return errors.WithStack(err)
}
return types.ErrUnknownControlType
return errors.WithStack(types.ErrUnknownControlType)
})
if err == nil {
log.Infof("[ControlWorkload] Workload %s %s", id, t)
Expand All @@ -50,7 +52,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
}
ch <- &types.ControlWorkloadMessage{
WorkloadID: id,
Error: err,
Error: logger.Err(err),
Hook: message,
}
}(id)
Expand All @@ -63,7 +65,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f

func (c *Calcium) doStartWorkload(ctx context.Context, workload *types.Workload, force bool) (message []*bytes.Buffer, err error) {
if err = workload.Start(ctx); err != nil {
return message, err
return message, errors.WithStack(err)
}
// TODO healthcheck first
if workload.Hook != nil && len(workload.Hook.AfterStart) > 0 {
Expand All @@ -75,7 +77,7 @@ func (c *Calcium) doStartWorkload(ctx context.Context, workload *types.Workload,
force, workload.Engine,
)
}
return message, err
return message, errors.WithStack(err)
}

func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload, force bool) (message []*bytes.Buffer, err error) {
Expand All @@ -88,7 +90,7 @@ func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload,
force, workload.Engine,
)
if err != nil {
return message, err
return message, errors.WithStack(err)
}
}

Expand All @@ -98,5 +100,5 @@ func (c *Calcium) doStopWorkload(ctx context.Context, workload *types.Workload,
if err = workload.Stop(ctx); err != nil {
message = append(message, bytes.NewBufferString(err.Error()))
}
return message, err
return message, errors.WithStack(err)
}
6 changes: 4 additions & 2 deletions cluster/calcium/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
)

// Copy uses VirtualizationCopyFrom cp to copy specified things and send to remote
func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *types.CopyMessage, error) {
logger := log.WithField("Calcium", "Copy").WithField("opts", opts)
if err := opts.Validate(); err != nil {
return nil, err
return nil, logger.Err(errors.WithStack(err))
}
ch := make(chan *types.CopyMessage)
go func() {
Expand All @@ -30,7 +32,7 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
}
return nil
}); err != nil {
ch <- makeCopyMessage(id, "", "", err, nil)
ch <- makeCopyMessage(id, "", "", logger.Err(err), nil)
}
}(id, paths)
}
Expand Down
Loading

0 comments on commit cbb0e3c

Please sign in to comment.