diff --git a/client/clientpool.go b/client/clientpool.go index ebe8b045d..967edd9b0 100644 --- a/client/clientpool.go +++ b/client/clientpool.go @@ -42,7 +42,7 @@ func NewCoreRPCClientPool(ctx context.Context, config *PoolConfig) (*Pool, error rpc, err = NewClient(ctx, addr, config.Auth) }) if err != nil { - log.Errorf(ctx, err, "[NewCoreRPCClientPool] connect to %s failed, err: %s", addr, err) + log.Errorf(ctx, err, "[NewCoreRPCClientPool] connect to %s failed", addr) continue } rpcClient := rpc.GetRPCClient() @@ -96,7 +96,7 @@ func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duratio _, err = rpc.client.Info(ctx, &pb.Empty{}) }) if err != nil { - log.Errorf(ctx, err, "[ClientPool] connect to %s failed, err: %s", rpc.addr, err) + log.Errorf(ctx, err, "[ClientPool] connect to %s failed", rpc.addr) return false } log.Debugf(ctx, "[ClientPool] connect to %s success", rpc.addr) diff --git a/client/resolver/eru/resolver.go b/client/resolver/eru/resolver.go index 4dc41ee6c..993c2c1e0 100644 --- a/client/resolver/eru/resolver.go +++ b/client/resolver/eru/resolver.go @@ -51,13 +51,13 @@ func (r *Resolver) sync() { ch, err := r.discovery.Watch(ctx) if err != nil { - log.Errorf(ctx, err, "[EruResolver] failed to watch service status: %v", err) + log.Error(ctx, err, "[EruResolver] failed to watch service status") return } for { select { case <-ctx.Done(): - log.Errorf(ctx, ctx.Err(), "[EruResolver] watch interrupted: %v", ctx.Err()) + log.Error(ctx, ctx.Err(), "[EruResolver] watch interrupted") return case endpoints, ok := <-ch: if !ok { diff --git a/client/servicediscovery/eru_service_discovery.go b/client/servicediscovery/eru_service_discovery.go index 555e184f2..ea65b6bb0 100644 --- a/client/servicediscovery/eru_service_discovery.go +++ b/client/servicediscovery/eru_service_discovery.go @@ -34,7 +34,7 @@ func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery { func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) { cc, err := w.dial(ctx, w.endpoint, w.authConfig) if err != nil { - log.Errorf(ctx, err, "[EruServiceWatch] dial failed: %v", err) + log.Error(ctx, err, "[EruServiceWatch] dial failed") return } client := pb.NewCoreRPCClient(cc) @@ -48,7 +48,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err watchCtx, cancelWatch := context.WithCancel(ctx) stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{}) if err != nil { - log.Errorf(ctx, err, "[EruServiceWatch] watch failed, try later: %v", err) + log.Error(ctx, err, "[EruServiceWatch] watch failed, try later") time.Sleep(10 * time.Second) continue } @@ -69,7 +69,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err status, err := stream.Recv() close(cancelTimer) if err != nil { - log.Errorf(ctx, err, "[EruServiceWatch] recv failed: %v", err) + log.Error(ctx, err, "[EruServiceWatch] recv failed") break } expectedInterval = time.Duration(status.GetIntervalInSecond()) diff --git a/client/utils/servicepusher.go b/client/utils/servicepusher.go index 880109d99..aec3d9747 100644 --- a/client/utils/servicepusher.go +++ b/client/utils/servicepusher.go @@ -111,7 +111,7 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string) func (p *EndpointPusher) checkReachability(host string) (err error) { pinger, err := ping.NewPinger(host) if err != nil { - log.Errorf(nil, err, "[EruResolver] failed to create pinger: %+v", err) //nolint + log.Error(nil, err, "[EruResolver] failed to create 
pinger") //nolint return } pinger.SetPrivileged(os.Getuid() == 0) diff --git a/cluster/calcium/build.go b/cluster/calcium/build.go index 7571db09e..bb8128873 100644 --- a/cluster/calcium/build.go +++ b/cluster/calcium/build.go @@ -9,6 +9,7 @@ import ( "os" "time" + "github.com/pkg/errors" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" "github.com/projecteru2/core/types" @@ -140,20 +141,19 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod lastMessage := &types.BuildImageMessage{} for { message := &types.BuildImageMessage{} - err := decoder.Decode(message) - if err != nil { + if err := decoder.Decode(message); err != nil { if err == io.EOF { break } if err == context.Canceled || err == context.DeadlineExceeded { - log.Errorf(ctx, err, "[BuildImage] context timeout") + log.Error(ctx, err, "[BuildImage] context timeout") lastMessage.ErrorDetail.Code = -1 lastMessage.ErrorDetail.Message = err.Error() lastMessage.Error = err.Error() break } malformed, _ := io.ReadAll(decoder.Buffered()) // TODO err check - logger.Errorf(ctx, nil, "[BuildImage] Decode build image message failed %+v, buffered: %v", err, malformed) + logger.Errorf(ctx, err, "[BuildImage] Decode build image message failed, buffered: %v", malformed) return } ch <- message @@ -161,7 +161,7 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod } if lastMessage.Error != "" { - log.Errorf(ctx, nil, "[BuildImage] Build image failed %v", lastMessage.ErrorDetail.Message) + logger.Errorf(ctx, errors.New(lastMessage.Error), "[BuildImage] Build image failed %v", lastMessage.ErrorDetail.Message) return } @@ -216,11 +216,11 @@ func cleanupNodeImages(ctx context.Context, node *types.Node, ids []string, ttl defer cancel() for _, id := range ids { if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil { - logger.Errorf(ctx, err, "[BuildImage] Remove image error: %+v", err) + logger.Error(ctx, err, "[BuildImage] Remove image error") } } if spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true); err != nil { - logger.Errorf(ctx, err, "[BuildImage] Remove build image cache error: %+v", err) + logger.Error(ctx, err, "[BuildImage] Remove build image cache error") } else { logger.Infof(ctx, "[BuildImage] Clean cached image and release space %d", spaceReclaimed) } diff --git a/cluster/calcium/calcium.go b/cluster/calcium/calcium.go index a9e32ea45..37870d4d0 100644 --- a/cluster/calcium/calcium.go +++ b/cluster/calcium/calcium.go @@ -55,7 +55,7 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro log.Warn(ctx, "[Calcium] SCM not set, build API disabled") } if err != nil { - log.Errorf(ctx, err, "[Calcium] SCM failed: %+v", err) + log.Error(ctx, err, "[Calcium] SCM failed") return nil, err } @@ -72,12 +72,12 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro // load internal plugins cpumem, err := cpumem.NewPlugin(config) if err != nil { - log.Errorf(ctx, err, "[NewPluginManager] new cpumem plugin error: %v", err) + log.Error(ctx, err, "[NewPluginManager] new cpumem plugin error") return nil, err } volume, err := volume.NewPlugin(config) if err != nil { - log.Errorf(ctx, err, "[NewPluginManager] new volume plugin error: %v", err) + log.Error(ctx, err, "[NewPluginManager] new volume plugin error") return nil, err } rmgr.AddPlugins(cpumem, volume) diff --git a/cluster/calcium/capacity.go b/cluster/calcium/capacity.go index 
3b77264a7..fe1125bfd 100644 --- a/cluster/calcium/capacity.go +++ b/cluster/calcium/capacity.go @@ -28,7 +28,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio if opts.DeployStrategy != strategy.Dummy { if msg.NodeCapacities, err = c.doGetDeployStrategy(ctx, nodenames, opts); err != nil { - logger.Errorf(ctx, err, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err) + logger.Error(ctx, err, "[Calcium.CalculateCapacity] doGetDeployMap failed") return err } @@ -41,7 +41,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio var infos map[string]*resources.NodeCapacityInfo infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodenames, opts.ResourceOpts) if err != nil { - logger.Errorf(ctx, err, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err) + logger.Error(ctx, err, "[Calcium.CalculateCapacity] failed to get nodes capacity") return err } if msg.Total <= 0 { diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index aeff16552..978a210c3 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -62,7 +62,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio for nodename := range deployMap { processing := opts.GetProcessing(nodename) if err := c.store.DeleteProcessing(cctx, processing); err != nil { - logger.Errorf(ctx, err, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, err) + logger.Errorf(ctx, err, "[Calcium.doCreateWorkloads] delete processing failed for %s", nodename) } } close(ch) @@ -73,7 +73,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio defer func() { if resourceCommit != nil { if err := resourceCommit(); err != nil { - logger.Errorf(ctx, err, "commit wal failed: %s, %+v", eventWorkloadResourceAllocated, err) + logger.Errorf(ctx, err, "commit wal failed: %s", eventWorkloadResourceAllocated) } } }() @@ -85,7 +85,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio continue } if err := processingCommits[nodename](); err != nil { - logger.Errorf(ctx, err, "commit wal failed: %s, %s, %+v", eventProcessingCreated, nodename, err) + logger.Errorf(ctx, err, "commit wal failed: %s, %s", eventProcessingCreated, nodename) } } }() @@ -186,7 +186,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, for nodename, deploy := range deployMap { _ = c.pool.Invoke(func(deploy int) func() { return func() { - metrics.Client.SendDeployCount(deploy) + metrics.Client.SendDeployCount(ctx, deploy) } }(deploy)) _ = c.pool.Invoke(func(nodename string, deploy, seq int) func() { @@ -322,7 +322,7 @@ func (c *Calcium) doDeployOneWorkload( defer func() { if commit != nil { if err := commit(); err != nil { - logger.Errorf(ctx, err, "Commit WAL %s failed: %+v", eventWorkloadCreated, err) + logger.Errorf(ctx, err, "Commit WAL %s failed", eventWorkloadCreated) } } }() @@ -427,7 +427,7 @@ func (c *Calcium) doDeployOneWorkload( // remove workload func(ctx context.Context, _ bool) error { - logger.Errorf(ctx, nil, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID) + logger.Infof(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID) if workload.ID == "" { return nil } diff --git a/cluster/calcium/dissociate.go b/cluster/calcium/dissociate.go index 5a5cb7996..4966c020f 100644 --- a/cluster/calcium/dissociate.go +++ b/cluster/calcium/dissociate.go @@ -15,7 +15,7 @@ func (c *Calcium) 
DissociateWorkload(ctx context.Context, ids []string) (chan *t nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids) if err != nil { - logger.Errorf(ctx, err, "failed to group workloads by node: %+v", err) + logger.Error(ctx, err, "failed to group workloads by node") return nil, err } @@ -58,7 +58,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t c.config.GlobalTimeout, ) }); err != nil { - logger.WithField("id", workloadID).Errorf(ctx, err, "failed to lock workload: %+v", err) + logger.WithField("id", workloadID).Error(ctx, err, "failed to lock workload") msg.Error = err } ch <- msg @@ -66,7 +66,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t _ = c.pool.Invoke(func() { c.doRemapResourceAndLog(ctx, logger, node) }) return nil }); err != nil { - logger.WithField("nodename", nodename).Errorf(ctx, err, "failed to lock node: %+v", err) + logger.WithField("nodename", nodename).Error(ctx, err, "failed to lock node") } } }) diff --git a/cluster/calcium/execute.go b/cluster/calcium/execute.go index 857c9ba8d..5b6f57e4a 100644 --- a/cluster/calcium/execute.go +++ b/cluster/calcium/execute.go @@ -27,7 +27,7 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo workload, err := c.GetWorkload(ctx, opts.WorkloadID) if err != nil { - logger.Errorf(ctx, err, "[ExecuteWorkload] Failed to get workload: %+v", err) + logger.Error(ctx, err, "[ExecuteWorkload] Failed to get workload") return } @@ -44,7 +44,7 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo execID, stdout, stderr, inStream, err := workload.Engine.Execute(ctx, opts.WorkloadID, execConfig) if err != nil { - logger.Errorf(ctx, err, "[ExecuteWorkload] Failed to attach execID: %+v", err) + logger.Errorf(ctx, err, "[ExecuteWorkload] Failed to attach execID %s", execID) return } @@ -62,7 +62,7 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo execCode, err := workload.Engine.ExecExitCode(ctx, opts.WorkloadID, execID) if err != nil { - logger.Errorf(ctx, err, "[ExecuteWorkload] Failed to get exitcode: %+v", err) + logger.Error(ctx, err, "[ExecuteWorkload] Failed to get exitcode") return } diff --git a/cluster/calcium/helper.go b/cluster/calcium/helper.go index 2c25c543e..a494b750a 100644 --- a/cluster/calcium/helper.go +++ b/cluster/calcium/helper.go @@ -11,7 +11,7 @@ import ( func distributionInspect(ctx context.Context, node *types.Node, image string, digests []string) bool { remoteDigest, err := node.Engine.ImageRemoteDigest(ctx, image) if err != nil { - log.Errorf(ctx, err, "[distributionInspect] get manifest failed %v", err) + log.Error(ctx, err, "[distributionInspect] get manifest failed") return false } @@ -51,7 +51,7 @@ func pullImage(ctx context.Context, node *types.Node, image string) error { rc, err := node.Engine.ImagePull(ctx, image, false) defer utils.EnsureReaderClosed(ctx, rc) if err != nil { - log.Errorf(ctx, err, "[pullImage] Error during pulling image %s: %v", image, err) + log.Errorf(ctx, err, "[pullImage] Error during pulling image %s", image) return err } log.Infof(ctx, "[pullImage] Done pulling image %s", image) diff --git a/cluster/calcium/image.go b/cluster/calcium/image.go index 88ff5ec66..aaefbaccb 100644 --- a/cluster/calcium/image.go +++ b/cluster/calcium/image.go @@ -111,7 +111,7 @@ func (c *Calcium) RemoveImage(ctx context.Context, opts *types.ImageOptions) (ch } if opts.Prune { if err := node.Engine.ImagesPrune(ctx); err != nil { - 
logger.Errorf(ctx, err, "[RemoveImage] Prune %s pod %s node failed: %+v", opts.Podname, node.Name, err) + logger.Errorf(ctx, err, "[RemoveImage] Prune %s pod %s node failed", opts.Podname, node.Name) } else { logger.Infof(ctx, "[RemoveImage] Prune %s pod %s node", opts.Podname, node.Name) } diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go index 3a5218e69..8dc7d8cd2 100644 --- a/cluster/calcium/lambda.go +++ b/cluster/calcium/lambda.go @@ -37,7 +37,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC createChan, err := c.CreateWorkload(ctx, opts) if err != nil { - logger.Errorf(ctx, err, "[RunAndWait] Create workload error %+v", err) + logger.Error(ctx, err, "[RunAndWait] Create workload error") return workloadIDs, nil, err } @@ -58,7 +58,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC // we don't need to remove this non-existing workload // so just send the error message and return if message.Error != nil || message.WorkloadID == "" { - logger.Errorf(ctx, message.Error, "[RunAndWait] Create workload failed %+v", message.Error) + logger.Error(ctx, message.Error, "[RunAndWait] Create workload failed") return &types.AttachWorkloadMessage{ WorkloadID: "", Data: []byte(fmt.Sprintf("Create workload failed %+v", message.Error)), @@ -77,7 +77,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC } defer func() { if err := commit(); err != nil { - logger.Errorf(ctx, err, "[RunAndWait] Commit WAL %s failed: %s, %v", eventCreateLambda, message.WorkloadID, err) + logger.Errorf(ctx, err, "[RunAndWait] Commit WAL %s failed: %s", eventCreateLambda, message.WorkloadID) } }() @@ -87,7 +87,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC ctx, cancel := context.WithCancel(utils.InheritTracingInfo(ctx, context.TODO())) defer cancel() if err := c.doRemoveWorkloadSync(ctx, []string{message.WorkloadID}); err != nil { - logger.Errorf(ctx, err, "[RunAndWait] Remove lambda workload failed %+v", err) + logger.Error(ctx, err, "[RunAndWait] Remove lambda workload failed") } else { logger.Infof(ctx, "[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID)) } @@ -97,7 +97,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC // this is weird, we return the error directly and try to delete data workload, err := c.GetWorkload(ctx, message.WorkloadID) if err != nil { - logger.Errorf(ctx, err, "[RunAndWait] Get workload failed %+v", err) + logger.Error(ctx, err, "[RunAndWait] Get workload failed") return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)), @@ -115,7 +115,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC Stdout: true, Stderr: true, }); err != nil { - logger.Errorf(ctx, err, "[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err) + logger.Errorf(ctx, err, "[RunAndWait] Can't fetch log of workload %s", message.WorkloadID) return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)), @@ -130,7 +130,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC var inStream io.WriteCloser stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true) if err != nil { - logger.Errorf(ctx, 
err, "[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err) + logger.Errorf(ctx, err, "[RunAndWait] Can't attach workload %s", message.WorkloadID) return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)), @@ -156,7 +156,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC // wait and forward exitcode r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "") if err != nil { - logger.Errorf(ctx, err, "[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err) + logger.Errorf(ctx, err, "[RunAndWait] %s wait failed", utils.ShortID(message.WorkloadID)) return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)), diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go index 639e7b079..5b73307d6 100644 --- a/cluster/calcium/lock.go +++ b/cluster/calcium/lock.go @@ -23,7 +23,7 @@ func (c *Calcium) doLock(ctx context.Context, name string, timeout time.Duration defer cancel() rollbackCtx = utils.InheritTracingInfo(rollbackCtx, ctx) if e := lock.Unlock(rollbackCtx); e != nil { - log.Errorf(rollbackCtx, err, "failed to unlock %s: %+v", name, err) + log.Errorf(rollbackCtx, err, "failed to unlock %s", name) } } }() @@ -47,7 +47,7 @@ func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.Distrib } for _, key := range order { if err := c.doUnlock(ctx, locks[key], key); err != nil { - log.Errorf(ctx, err, "[doUnlockAll] Unlock %s failed %v", key, err) + log.Errorf(ctx, err, "[doUnlockAll] Unlock %s failed", key) continue } } diff --git a/cluster/calcium/metrics.go b/cluster/calcium/metrics.go index 19041c9a2..c13dd55b7 100644 --- a/cluster/calcium/metrics.go +++ b/cluster/calcium/metrics.go @@ -14,11 +14,11 @@ import ( func (c *Calcium) InitMetrics(ctx context.Context) { metricsDescriptions, err := c.rmgr.GetMetricsDescription(ctx) if err != nil { - log.Errorf(ctx, err, "[InitMetrics] failed to get metrics description, err: %v", err) + log.Error(ctx, err, "[InitMetrics] failed to get metrics description") return } if err = metrics.InitMetrics(c.config, metricsDescriptions); err != nil { - log.Errorf(ctx, err, "[InitMetrics] failed to init metrics, err: %v", err) + log.Error(ctx, err, "[InitMetrics] failed to init metrics") return } log.Infof(ctx, "[InitMetrics] init metrics %v success", litter.Sdump(metricsDescriptions)) @@ -27,8 +27,8 @@ func (c *Calcium) InitMetrics(ctx context.Context) { func (c *Calcium) doSendNodeMetrics(ctx context.Context, node *types.Node) { nodeMetrics, err := c.rmgr.GetNodeMetrics(ctx, node) if err != nil { - log.Errorf(ctx, err, "[SendNodeMetrics] convert node %s resource info to metrics failed, %v", node.Name, err) + log.Errorf(ctx, err, "[SendNodeMetrics] convert node %s resource info to metrics failed", node.Name) return } - metrics.Client.SendMetrics(nodeMetrics...) + metrics.Client.SendMetrics(ctx, nodeMetrics...) 
} diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index 2219d9c15..6bf5c2ffb 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -122,11 +122,11 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions defer wg.Done() var err error if node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false); err != nil { - logger.Errorf(ctx, err, "failed to get node %v resource info: %+v", node.Name, err) + logger.Errorf(ctx, err, "failed to get node %v resource info", node.Name) } if opts.CallInfo { if err := node.Info(ctx); err != nil { - logger.Errorf(ctx, err, "failed to get node %v info: %+v", node.Name, err) + logger.Errorf(ctx, err, "failed to get node %v info", node.Name) } } ch <- node @@ -307,14 +307,14 @@ func (c *Calcium) filterNodes(ctx context.Context, nodeFilter *types.NodeFilter) func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string) { workloads, err := c.store.ListNodeWorkloads(ctx, nodename, nil) if err != nil { - log.Errorf(ctx, err, "[setAllWorkloadsOnNodeDown] failed to list node workloads, node %v, err: %v", nodename, err) + log.Errorf(ctx, err, "[setAllWorkloadsOnNodeDown] failed to list node workloads, node %v", nodename) return } for _, workload := range workloads { appname, entrypoint, _, err := utils.ParseWorkloadName(workload.Name) if err != nil { - log.Errorf(ctx, err, "[setAllWorkloadsOnNodeDown] Set workload %s on node %s as inactive failed %v", workload.ID, nodename, err) + log.Errorf(ctx, err, "[setAllWorkloadsOnNodeDown] Set workload %s on node %s as inactive failed", workload.ID, nodename) continue } @@ -331,7 +331,7 @@ func (c *Calcium) setAllWorkloadsOnNodeDown(ctx context.Context, nodename string // mark workload which belongs to this node as unhealthy if err = c.store.SetWorkloadStatus(ctx, workload.StatusMeta, 0); err != nil { - log.Errorf(ctx, err, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, nodename, err) + log.Errorf(ctx, err, "[SetNodeAvailable] Set workload %s on node %s as inactive failed", workload.ID, nodename) } else { log.Infof(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive", workload.ID, nodename) } diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index 7a941f0c6..d3d5ddc77 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -61,7 +61,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa return nil } if err := c.rmgr.RollbackRealloc(ctx, workload.Nodename, deltaResourceArgs); err != nil { - log.Errorf(ctx, err, "[doReallocOnNode] failed to rollback workload %v, resource args %v, engine args %v, err %v", workload.ID, litter.Sdump(resourceArgs), litter.Sdump(engineArgs), err) + log.Errorf(ctx, err, "[doReallocOnNode] failed to rollback workload %v, resource args %v, engine args %v", workload.ID, litter.Sdump(resourceArgs), litter.Sdump(engineArgs)) // don't return here, so the node resource can still be fixed } return c.store.UpdateWorkload(ctx, &originWorkload) diff --git a/cluster/calcium/remap.go b/cluster/calcium/remap.go index ad9908649..9379a1fa5 100644 --- a/cluster/calcium/remap.go +++ b/cluster/calcium/remap.go @@ -31,7 +31,7 @@ func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger *log.Fields, }) if err != nil { - logger.Errorf(ctx, err, "[doRemapResourceAndLog] remap node %s failed, err: %v", node.Name, err) + logger.Errorf(ctx, err, 
"[doRemapResourceAndLog] remap node %s failed", node.Name) } } diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index 0c24443ef..38f08cbaf 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -18,7 +18,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) nodeWorkloadGroup, err := c.groupWorkloadsByNode(ctx, ids) if err != nil { - logger.Errorf(ctx, err, "failed to group workloads by node: %+v", err) + logger.Error(ctx, err, "failed to group workloads by node") return nil, err } @@ -69,7 +69,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) c.config.GlobalTimeout, ) }); err != nil { - logger.WithField("id", workloadID).Errorf(ctx, err, "failed to lock workload: %+v", err) + logger.WithField("id", workloadID).Error(ctx, err, "failed to lock workload") ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error())) ret.Success = false } @@ -78,7 +78,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) _ = c.pool.Invoke(func() { c.doRemapResourceAndLog(ctx, logger, node) }) return nil }); err != nil { - logger.WithField("nodename", nodename).Errorf(ctx, err, "failed to lock node: %+v", err) + logger.WithField("nodename", nodename).Error(ctx, err, "failed to lock node") ch <- &types.RemoveWorkloadMessage{Success: false} } } diff --git a/cluster/calcium/replace.go b/cluster/calcium/replace.go index 880fc4de6..9bd3a3e53 100644 --- a/cluster/calcium/replace.go +++ b/cluster/calcium/replace.go @@ -87,7 +87,7 @@ func (c *Calcium) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOption log.Warnf(ctx, "[ReplaceWorkload] ignore workload: %v", err) return } - logger.Errorf(ctx, err, "[ReplaceWorkload] Replace and remove failed %+v, old workload restarted", err) + logger.Error(ctx, err, "[ReplaceWorkload] Replace and remove failed, old workload restarted") } else { log.Infof(ctx, "[ReplaceWorkload] Replace and remove success %s", id) log.Infof(ctx, "[ReplaceWorkload] New workload %s", createMessage.WorkloadID) @@ -164,7 +164,7 @@ func (c *Calcium) doReplaceWorkload( // then func(ctx context.Context) (err error) { if err = c.doRemoveWorkload(ctx, workload, true); err != nil { - log.Errorf(ctx, err, "[doReplaceWorkload] the new started but the old failed to stop") + log.Error(ctx, err, "[doReplaceWorkload] the new started but the old failed to stop") return err } removeMessage.Success = true @@ -178,7 +178,7 @@ func (c *Calcium) doReplaceWorkload( func(ctx context.Context, _ bool) (err error) { messages, err := c.doStartWorkload(ctx, workload, opts.IgnoreHook) if err != nil { - log.Errorf(ctx, err, "[replaceAndRemove] Old workload %s restart failed %v", workload.ID, err) + log.Error(ctx, err, "[replaceAndRemove] Old workload %s restart failed", workload.ID) removeMessage.Hook = append(removeMessage.Hook, bytes.NewBufferString(err.Error())) } else { removeMessage.Hook = append(removeMessage.Hook, messages...) 
diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index 8e74667de..15c309a6c 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -55,21 +55,21 @@ func (c *Calcium) NodeResource(ctx context.Context, nodename string, fix bool) ( func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, inspect, fix bool) (*types.NodeResource, error) { logger := log.WithField("Calcium", "doGetNodeResource").WithField("nodename", nodename).WithField("inspect", inspect).WithField("fix", fix) if nodename == "" { - logger.Errorf(ctx, types.ErrEmptyNodeName, "") + logger.Error(ctx, types.ErrEmptyNodeName) return nil, types.ErrEmptyNodeName } var nr *types.NodeResource return nr, c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { workloads, err := c.store.ListNodeWorkloads(ctx, node.Name, nil) if err != nil { - log.Errorf(ctx, err, "[doGetNodeResource] failed to list node workloads, node %v, err: %v", node.Name, err) + log.Errorf(ctx, err, "[doGetNodeResource] failed to list node workloads, node %v", node.Name) return err } // get node resources resourceCapacity, resourceUsage, resourceDiffs, err := c.rmgr.GetNodeResourceInfo(ctx, node.Name, workloads, fix) if err != nil { - log.Errorf(ctx, err, "[doGetNodeResource] failed to get node resources, node %v, err: %v", node.Name, err) + log.Errorf(ctx, err, "[doGetNodeResource] failed to get node resources, node %v", node.Name) return err } nr = &types.NodeResource{ @@ -97,14 +97,14 @@ func (c *Calcium) doGetDeployStrategy(ctx context.Context, nodenames []string, o // get nodes with capacity > 0 nodeResourceInfoMap, total, err := c.rmgr.GetNodesDeployCapacity(ctx, nodenames, opts.ResourceOpts) if err != nil { - log.Errorf(ctx, err, "[doGetDeployMap] failed to select available nodes, nodes %v, err %v", nodenames, err) + log.Errorf(ctx, err, "[doGetDeployMap] failed to select available nodes, nodes %v", nodenames) return nil, err } // get deployed & processing workload count on each node deployStatusMap, err := c.store.GetDeployStatus(ctx, opts.Name, opts.Entrypoint.Name) if err != nil { - log.Errorf(ctx, err, "failed to get deploy status for %v_%v, err %v", opts.Name, opts.Entrypoint.Name, err) + log.Errorf(ctx, err, "failed to get deploy status for %v_%v", opts.Name, opts.Entrypoint.Name) return nil, err } diff --git a/cluster/calcium/service.go b/cluster/calcium/service.go index 9ff643e67..a6b94c0d1 100644 --- a/cluster/calcium/service.go +++ b/cluster/calcium/service.go @@ -26,7 +26,7 @@ func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceS func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err error) { serviceAddress, err := utils.GetOutboundAddress(c.config.Bind, c.config.ProbeTarget) if err != nil { - log.Errorf(ctx, err, "[RegisterService] failed to get outbound address: %v", err) + log.Error(ctx, err, "[RegisterService] failed to get outbound address") return } @@ -43,7 +43,7 @@ func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err e time.Sleep(time.Second) continue } - log.Errorf(ctx, err, "[RegisterService] failed to first register service: %+v", err) + log.Error(ctx, err, "[RegisterService] failed to first register service") return nil, err } @@ -61,7 +61,7 @@ func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err e case <-expiry: // The original one had been expired, we're going to register again. 
if ne, us, err := c.registerService(ctx, serviceAddress); err != nil { - log.Errorf(ctx, err, "[RegisterService] failed to re-register service: %v", err) + log.Error(ctx, err, "[RegisterService] failed to re-register service") time.Sleep(c.config.GRPCConfig.ServiceHeartbeatInterval) } else { expiry = ne diff --git a/cluster/calcium/stream.go b/cluster/calcium/stream.go index 96be1566b..be3bdf49f 100644 --- a/cluster/calcium/stream.go +++ b/cluster/calcium/stream.go @@ -4,10 +4,10 @@ import ( "bufio" "context" "encoding/json" - "fmt" "io" "sync" + "github.com/pkg/errors" "github.com/projecteru2/core/engine" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" @@ -48,7 +48,7 @@ func (c *Calcium) execuateInside(ctx context.Context, client engine.API, ID, cmd return b, err } if exitCode != 0 { - return b, fmt.Errorf("%s", b) + return b, errors.New(string(b)) } return b, nil } @@ -67,7 +67,7 @@ func (c *Calcium) processVirtualizationInStream( return } if err := resizeFunc(w.Height, w.Width); err != nil { - log.Errorf(ctx, err, "[processVirtualizationInStream] resize window error: %v", err) + log.Error(ctx, err, "[processVirtualizationInStream] resize window error") return } }, @@ -99,7 +99,7 @@ func (c *Calcium) rawProcessVirtualizationInStream( continue } if _, err := inStream.Write(cmd); err != nil { - log.Errorf(ctx, err, "[rawProcessVirtualizationInStream] failed to write virtual input stream: %v", err) + log.Error(ctx, err, "[rawProcessVirtualizationInStream] failed to write virtual input stream") continue } } @@ -150,7 +150,7 @@ func (c *Calcium) processBuildImageStream(ctx context.Context, reader io.ReadClo if err != nil { if err != io.EOF { malformed, _ := io.ReadAll(decoder.Buffered()) // TODO err check - log.Errorf(ctx, err, "[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed)) + log.Errorf(ctx, err, "[processBuildImageStream] Decode image message failed, buffered: %s", string(malformed)) message.Error = err.Error() ch <- message } diff --git a/cluster/calcium/wal.go b/cluster/calcium/wal.go index e924ed9b2..888130c36 100644 --- a/cluster/calcium/wal.go +++ b/cluster/calcium/wal.go @@ -88,13 +88,13 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error go func() { workload, err := h.calcium.GetWorkload(ctx, workloadID) if err != nil { - logger.Errorf(ctx, err, "Get workload failed: %v", err) + logger.Error(ctx, err, "Get workload failed") return } r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "") if err != nil { - logger.Errorf(ctx, err, "Wait failed: %+v", err) + logger.Error(ctx, err, "Wait failed") return } if r.Code != 0 { @@ -102,7 +102,7 @@ func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error } if err := h.calcium.RemoveWorkloadSync(ctx, []string{workloadID}); err != nil { - logger.Errorf(ctx, err, "Remove failed: %+v", err) + logger.Error(ctx, err, "Remove failed") } logger.Infof(ctx, "waited and removed") }() @@ -248,7 +248,7 @@ func (h *WorkloadResourceAllocatedHandler) Handle(ctx context.Context, raw inter _ = h.pool.Invoke(func() { defer wg.Done() if _, err = h.calcium.NodeResource(ctx, node.Name, true); err != nil { - logger.Errorf(ctx, err, "failed to fix node resource: %s, %+v", node.Name, err) + logger.Errorf(ctx, err, "failed to fix node resource: %s", node.Name) return } logger.Infof(ctx, "fixed node resource: %s", node.Name) diff --git a/cluster/calcium/workload.go 
b/cluster/calcium/workload.go index 5a6912ea0..6a9e9c922 100644 --- a/cluster/calcium/workload.go +++ b/cluster/calcium/workload.go @@ -14,7 +14,7 @@ import ( func (c *Calcium) GetWorkload(ctx context.Context, id string) (workload *types.Workload, err error) { logger := log.WithField("Calcium", "GetWorkload").WithField("id", id) if id == "" { - logger.Errorf(ctx, types.ErrEmptyWorkloadID, "") + logger.Error(ctx, types.ErrEmptyWorkloadID) return workload, types.ErrEmptyWorkloadID } workload, err = c.store.GetWorkload(ctx, id) @@ -41,7 +41,7 @@ func (c *Calcium) ListWorkloads(ctx context.Context, opts *types.ListWorkloadsOp func (c *Calcium) ListNodeWorkloads(ctx context.Context, nodename string, labels map[string]string) (workloads []*types.Workload, err error) { logger := log.WithField("Calcium", "ListNodeWorkloads").WithField("nodename", nodename).WithField("labels", labels) if nodename == "" { - logger.Errorf(ctx, types.ErrEmptyNodeName, "") + logger.Error(ctx, types.ErrEmptyNodeName) return workloads, types.ErrEmptyNodeName } workloads, err = c.store.ListNodeWorkloads(ctx, nodename, labels) diff --git a/core.go b/core.go index 61121da83..2d988af5c 100644 --- a/core.go +++ b/core.go @@ -92,7 +92,7 @@ func serve(c *cli.Context) error { pb.RegisterCoreRPCServer(grpcServer, vibranium) utils.SentryGo(func() { if err := grpcServer.Serve(s); err != nil { - log.Errorf(c.Context, err, "%v", "start grpc failed") + log.Error(c.Context, err, "start grpc failed") } }) @@ -104,14 +104,14 @@ func serve(c *cli.Context) error { ReadHeaderTimeout: 3 * time.Second, } if err := server.ListenAndServe(); err != nil { - log.Errorf(c.Context, err, "%v", "start http failed") + log.Error(c.Context, err, "start http failed") } }) } unregisterService, err := cluster.RegisterService(c.Context) if err != nil { - log.Errorf(c.Context, err, "%v", "failed to register service") + log.Error(c.Context, err, "failed to register service") return err } log.Info(c.Context, "[main] Cluster started successfully.") diff --git a/discovery/helium/helium.go b/discovery/helium/helium.go index a134933c9..a0059005c 100644 --- a/discovery/helium/helium.go +++ b/discovery/helium/helium.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/pkg/errors" "github.com/projecteru2/core/log" "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" @@ -69,13 +70,13 @@ func (h *Helium) Unsubscribe(id uuid.UUID) { func (h *Helium) start(ctx context.Context) { ch, err := h.store.ServiceStatusStream(ctx) if err != nil { - log.Errorf(ctx, err, "[WatchServiceStatus] failed to start watch: %v", err) //nolint + log.Error(ctx, err, "[WatchServiceStatus] failed to start watch") return } go func() { log.Info(ctx, "[WatchServiceStatus] service discovery start") - defer log.Error(ctx, nil, "[WatchServiceStatus] service discovery exited") //nolint + defer log.Warn(ctx, "[WatchServiceStatus] service discovery exited") var latestStatus types.ServiceStatus ticker := time.NewTicker(h.interval) defer ticker.Stop() @@ -83,7 +84,7 @@ func (h *Helium) start(ctx context.Context) { select { case addresses, ok := <-ch: if !ok { - log.Error(ctx, nil, "[WatchServiceStatus] watch channel closed") //nolint + log.Warn(ctx, "[WatchServiceStatus] watch channel closed") return } @@ -111,7 +112,7 @@ func (h *Helium) dispatch(status types.ServiceStatus) { f := func(key uint32, val entry) { defer func() { if err := recover(); err != nil { - log.Errorf(context.TODO(), nil, "[dispatch] dispatch %v failed, err: %v", key, err) + 
log.Errorf(context.TODO(), errors.Errorf("%v", err), "[dispatch] dispatch %v failed", key) } }() select { diff --git a/engine/docker/container.go b/engine/docker/container.go index e60f625dd..7e361396a 100644 --- a/engine/docker/container.go +++ b/engine/docker/container.go @@ -83,7 +83,7 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir // parse engine args to resource options opts.VirtualizationResource, err = engine.MakeVirtualizationResource(opts.EngineArgs) if err != nil { - log.Errorf(ctx, err, "[VirtualizationCreate] failed to parse engine args %+v, err %v", opts.EngineArgs, err) + log.Errorf(ctx, err, "[VirtualizationCreate] failed to parse engine args %+v", opts.EngineArgs) return r, coretypes.ErrInvalidEngineArgs } @@ -403,7 +403,7 @@ func (e *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, op // parse engine args to resource options resourceOpts, err := engine.MakeVirtualizationResource(opts.EngineArgs) if err != nil { - log.Errorf(ctx, err, "[VirtualizationUpdateResource] failed to parse engine args %+v, workload id %v, err %v", opts.EngineArgs, ID, err) + log.Errorf(ctx, err, "[VirtualizationUpdateResource] failed to parse engine args %+v, workload id %v", opts.EngineArgs, ID) return coretypes.ErrInvalidEngineArgs } diff --git a/engine/docker/docker.go b/engine/docker/docker.go index 66d38b04c..2f21a37ea 100644 --- a/engine/docker/docker.go +++ b/engine/docker/docker.go @@ -38,7 +38,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint } else { client, err = utils.GetHTTPSClient(ctx, config.CertPath, nodename, ca, cert, key) if err != nil { - log.Errorf(ctx, err, "[MakeClient] GetHTTPSClient for %s %s error: %v", nodename, endpoint, err) + log.Errorf(ctx, err, "[MakeClient] GetHTTPSClient for %s %s", nodename, endpoint) return nil, err } } diff --git a/engine/docker/exec.go b/engine/docker/exec.go index 984324c8a..2d58bace4 100644 --- a/engine/docker/exec.go +++ b/engine/docker/exec.go @@ -71,7 +71,7 @@ func (e *Engine) demultiplexStdStream(ctx context.Context, stdStream io.Reader) defer stdoutW.Close() defer stderrW.Close() if _, err := stdcopy.StdCopy(stdoutW, stderrW, stdStream); err != nil { - log.Errorf(ctx, err, "[docker.demultiplex] StdCopy failed: %v", err) + log.Error(ctx, err, "[docker.demultiplex] StdCopy failed") } }() return stdout, stderr diff --git a/engine/docker/helper.go b/engine/docker/helper.go index e010f4f7e..09b14f918 100644 --- a/engine/docker/helper.go +++ b/engine/docker/helper.go @@ -293,7 +293,7 @@ func CreateTarStream(path string) (io.ReadCloser, error) { func GetIP(ctx context.Context, daemonHost string) string { u, err := url.Parse(daemonHost) if err != nil { - log.Errorf(ctx, err, "[GetIP] GetIP %s failed %v", daemonHost, err) + log.Errorf(ctx, err, "[GetIP] GetIP %s failed", daemonHost) return "" } return u.Hostname() diff --git a/engine/factory/factory.go b/engine/factory/factory.go index e5b07bbc3..9c6595c62 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -113,14 +113,14 @@ func (e *EngineCache) CheckAlive(ctx context.Context) { } if _, ok := client.(*fake.Engine); ok { if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil { - log.Errorf(ctx, err, "[EngineCache] engine %v is still unavailable, err: %v", cacheKey, err) + log.Errorf(ctx, err, "[EngineCache] engine %v is still unavailable", cacheKey) } else { e.cache.Set(cacheKey, newClient) } return } if 
err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil { - log.Errorf(ctx, err, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err) + log.Errorf(ctx, err, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine", cacheKey) e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err}) } }) @@ -216,7 +216,7 @@ func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, return nil, err } if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil { - log.Errorf(ctx, err, "[GetEngine] engine of %v is unavailable, err: %v", endpoint, err) + log.Errorf(ctx, err, "[GetEngine] engine of %v is unavailable", endpoint) return nil, err } return client, nil diff --git a/engine/virt/helper.go b/engine/virt/helper.go index 5c87a88c0..22f52a4d7 100644 --- a/engine/virt/helper.go +++ b/engine/virt/helper.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" + "github.com/pkg/errors" coretypes "github.com/projecteru2/core/types" ) @@ -47,7 +48,7 @@ func (v *Virt) parseVolumes(volumes []string) ([]string, error) { } func splitUserImage(combined string) (user, imageName string, err error) { - inputErr := fmt.Errorf("input: \"%s\" not valid", combined) + inputErr := errors.Errorf("input: \"%s\" not valid", combined) if len(combined) < 1 { return "", "", inputErr } diff --git a/engine/virt/virt.go b/engine/virt/virt.go index 9b5903951..21e8b5eaa 100644 --- a/engine/virt/virt.go +++ b/engine/virt/virt.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/pkg/errors" "github.com/projecteru2/core/cluster" "github.com/projecteru2/core/engine" enginetypes "github.com/projecteru2/core/engine/types" @@ -49,7 +50,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint case strings.HasPrefix(endpoint, GRPCPrefixKey): uri = "grpc://" + strings.TrimPrefix(endpoint, GRPCPrefixKey) default: - return nil, fmt.Errorf("invalid endpoint: %s", endpoint) + return nil, errors.Errorf("invalid endpoint: %s", endpoint) } cli, err := virtapi.New(uri) @@ -175,7 +176,7 @@ func (v *Virt) BuildRefs(ctx context.Context, opts *enginetypes.BuildRefOptions) // BuildContent builds content, the use of it is similar to BuildRefs. func (v *Virt) BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildContentOptions) (string, io.Reader, error) { - return "", nil, fmt.Errorf("BuildContent does not implement") + return "", nil, coretypes.ErrEngineNotImplemented } // VirtualizationCreate creates a guest. 
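Several hunks here and in stream.go above also swap fmt.Errorf for errors.Errorf. Assuming the package being imported is the widely used github.com/pkg/errors, the practical difference is that its errors capture a stack trace at the call site, which the %+v verb then reveals; a small sketch under that assumption:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// fmt.Errorf carries only the message text.
	plain := fmt.Errorf("invalid endpoint: %s", "bogus://")

	// errors.Errorf additionally records the call stack at the
	// moment the error is created.
	traced := errors.Errorf("invalid endpoint: %s", "bogus://")

	fmt.Printf("%+v\n", plain)  // message only
	fmt.Printf("%+v\n", traced) // message followed by a stack trace
}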
@@ -183,7 +184,7 @@ func (v *Virt) VirtualizationCreate(ctx context.Context, opts *enginetypes.Virtu // parse engine args to resource options opts.VirtualizationResource, err = engine.MakeVirtualizationResource(opts.EngineArgs) if err != nil { - log.Errorf(ctx, err, "[VirtualizationCreate] failed to parse engine args %+v, err %v", opts.EngineArgs, err) + log.Errorf(ctx, err, "[VirtualizationCreate] failed to parse engine args %+v", opts.EngineArgs) return nil, coretypes.ErrInvalidEngineArgs } @@ -324,7 +325,7 @@ func (v *Virt) VirtualizationUpdateResource(ctx context.Context, ID string, reso // parse engine args to resource options opts, err := engine.MakeVirtualizationResource(resourceOpts.EngineArgs) if err != nil { - log.Errorf(ctx, err, "[VirtualizationCreate] failed to parse engine args %+v, err %v", opts.EngineArgs, err) + log.Errorf(ctx, err, "[VirtualizationCreate] failed to parse engine args %+v", opts.EngineArgs) return err } diff --git a/go.mod b/go.mod index a4485f436..7749a415c 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab github.com/alicebob/miniredis/v2 v2.14.3 github.com/cenkalti/backoff/v4 v4.1.1 - github.com/cornelk/hashmap v1.0.8 + github.com/cornelk/hashmap v1.0.9-0.20221031160728-c3a0e7bb43df github.com/docker/distribution v2.8.0+incompatible github.com/docker/docker v20.10.0+incompatible github.com/docker/go-connections v0.4.0 diff --git a/go.sum b/go.sum index 722e28bf8..6c3d4c01d 100644 --- a/go.sum +++ b/go.sum @@ -116,8 +116,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534 h1:rtAn27wIbmOGUs7RIbVgPEjb31ehTVniDwPGXyMxm5U= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= -github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= +github.com/cornelk/hashmap v1.0.9-0.20221031160728-c3a0e7bb43df h1:nZzJODCK5L5pocFghtum9MZJWz9avjtYfowjO3Wt/uI= +github.com/cornelk/hashmap v1.0.9-0.20221031160728-c3a0e7bb43df/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= diff --git a/log/log.go b/log/log.go index e524b86d7..21cb53605 100644 --- a/log/log.go +++ b/log/log.go @@ -120,7 +120,7 @@ func (f Fields) Errorf(ctx context.Context, err error, format string, args ...in // Error forwards to sentry func (f Fields) Error(ctx context.Context, err error, args ...interface{}) { - f.Errorf(ctx, err, "%v", args...) + f.Errorf(ctx, err, "%+v", args...) 
} // for sentry diff --git a/metrics/handler.go b/metrics/handler.go index ae3749b8f..d0c9ef61a 100644 --- a/metrics/handler.go +++ b/metrics/handler.go @@ -17,15 +17,15 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler) defer cancel() nodes, err := cluster.ListPodNodes(ctx, &types.ListNodesOptions{All: true}) if err != nil { - log.Errorf(ctx, err, "[ResourceMiddleware] Get all nodes err %v", err) + log.Error(ctx, err, "[ResourceMiddleware] Get all nodes err") } for node := range nodes { metrics, err := m.rmgr.GetNodeMetrics(ctx, node) if err != nil { - log.Errorf(ctx, err, "[ResourceMiddleware] Get metrics failed %v", err) + log.Error(ctx, err, "[ResourceMiddleware] Get metrics failed") continue } - m.SendMetrics(metrics...) + m.SendMetrics(ctx, metrics...) } h.ServeHTTP(w, r) }) diff --git a/metrics/metrics.go b/metrics/metrics.go index ccb67d9f5..930c72cf6 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1,6 +1,7 @@ package metrics import ( + "context" "os" "strconv" "sync" @@ -36,7 +37,7 @@ type Metrics struct { } // SendDeployCount update deploy counter -func (m *Metrics) SendDeployCount(n int) { +func (m *Metrics) SendDeployCount(ctx context.Context, n int) { log.Info(nil, "[Metrics] Update deploy counter") //nolint metrics := &resources.Metrics{ Name: deployCountName, @@ -45,44 +46,44 @@ func (m *Metrics) SendDeployCount(n int) { Value: strconv.Itoa(n), } - m.SendMetrics(metrics) + m.SendMetrics(ctx, metrics) } // SendMetrics update metrics -func (m *Metrics) SendMetrics(metrics ...*resources.Metrics) { +func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*resources.Metrics) { for _, metric := range metrics { collector, ok := m.Collectors[metric.Name] if !ok { - log.Errorf(nil, nil, "[SendMetrics] Collector not found: %s", metric.Name) //nolint + log.Warnf(ctx, "[SendMetrics] Collector not found: %s", metric.Name) continue } switch collector.(type) { // nolint case *prometheus.GaugeVec: value, err := strconv.ParseFloat(metric.Value, 64) if err != nil { - log.Errorf(nil, err, "[SendMetrics] Error occurred while parsing %v value %v: %v", metric.Name, metric.Value, err) //nolint + log.Errorf(ctx, err, "[SendMetrics] Error occurred while parsing %v value %v", metric.Name, metric.Value) } collector.(*prometheus.GaugeVec).WithLabelValues(metric.Labels...).Set(value) // nolint - if err := m.gauge(metric.Key, value); err != nil { - log.Errorf(nil, err, "[SendMetrics] Error occurred while sending %v data to statsd: %v", metric.Name, err) //nolint + if err := m.gauge(ctx, metric.Key, value); err != nil { + log.Errorf(ctx, err, "[SendMetrics] Error occurred while sending %v data to statsd", metric.Name) } case *prometheus.CounterVec: value, err := strconv.ParseInt(metric.Value, 10, 32) // nolint if err != nil { - log.Errorf(nil, err, "[SendMetrics] Error occurred while parsing %v value %v: %v", metric.Name, metric.Value, err) //nolint + log.Errorf(ctx, err, "[SendMetrics] Error occurred while parsing %v value %v", metric.Name, metric.Value) } - collector.(*prometheus.CounterVec).WithLabelValues(metric.Labels...).Add(float64(value)) // nolint - if err := m.count(metric.Key, int(value), 1.0); err != nil { - log.Errorf(nil, err, "[SendMetrics] Error occurred while sending %v data to statsd: %v", metric.Name, err) //nolint + collector.(*prometheus.CounterVec).WithLabelValues(metric.Labels...).Add(float64(value)) //nolint + if err := m.count(ctx, metric.Key, int(value), 1.0); err != nil { + log.Errorf(ctx, err, "[SendMetrics] Error occurred 
while sending %v data to statsd", metric.Name) } default: - log.Errorf(nil, nil, "[SendMetrics] Unknown collector type: %T", collector) //nolint + log.Errorf(ctx, types.ErrInvalidType, "[SendMetrics] Unknown collector type: %T", collector) } } } // Lazy connect -func (m *Metrics) checkConn() error { +func (m *Metrics) checkConn(ctx context.Context) error { if m.statsdClient != nil { return nil } @@ -90,30 +91,30 @@ func (m *Metrics) checkConn() error { // We needn't try to renew/reconnect because of only supporting UDP protocol now // We should add an `errorCount` to reconnect when implementing TCP protocol if m.statsdClient, err = statsdlib.New(m.StatsdAddr, statsdlib.WithErrorHandler(func(err error) { - log.Errorf(nil, err, "[statsd] Sending statsd failed: %v", err) //nolint + log.Error(ctx, err, "[statsd] Sending statsd failed") })); err != nil { - log.Errorf(nil, err, "[statsd] Connect statsd failed: %v", err) //nolint + log.Error(ctx, err, "[statsd] Connect statsd failed") return err } return nil } -func (m *Metrics) gauge(key string, value float64) error { +func (m *Metrics) gauge(ctx context.Context, key string, value float64) error { if m.StatsdAddr == "" { return nil } - if err := m.checkConn(); err != nil { + if err := m.checkConn(ctx); err != nil { return err } m.statsdClient.Gauge(key, value) return nil } -func (m *Metrics) count(key string, n int, rate float32) error { +func (m *Metrics) count(ctx context.Context, key string, n int, rate float32) error { if m.StatsdAddr == "" { return nil } - if err := m.checkConn(); err != nil { + if err := m.checkConn(ctx); err != nil { return err } m.statsdClient.Count(key, n, rate) diff --git a/resources/binary.go b/resources/binary.go index 6d1b97e69..082f2b261 100644 --- a/resources/binary.go +++ b/resources/binary.go @@ -10,6 +10,7 @@ import ( "reflect" "strings" + "github.com/pkg/errors" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" coretypes "github.com/projecteru2/core/types" @@ -239,7 +240,7 @@ func (bp *BinaryPlugin) execCommand(cmd *exec.Cmd) (output, log string, err erro cmd.Stderr = &stderr if err = cmd.Run(); err != nil { - err = fmt.Errorf("err: %v, output: %v, log: %v", err, output, log) + err = errors.Errorf("err: %v, output: %v, log: %v", err, output, log) } return stdout.String(), stderr.String(), err } @@ -260,7 +261,7 @@ func (bp *BinaryPlugin) call(ctx context.Context, cmd string, req interface{}, r defer log.Infof(ctx, "[callBinaryPlugin] output from plugin %s: %s", bp.path, pluginOutput) if err != nil { - log.Errorf(ctx, err, "[callBinaryPlugin] failed to run plugin %s, command %v, err %s", bp.path, args, err) + log.Errorf(ctx, err, "[callBinaryPlugin] failed to run plugin %s, command %v", bp.path, args) return err } @@ -268,7 +269,7 @@ func (bp *BinaryPlugin) call(ctx context.Context, cmd string, req interface{}, r pluginOutput = "{}" } if err := json.Unmarshal([]byte(pluginOutput), resp); err != nil { - log.Errorf(ctx, err, "[callBinaryPlugin] failed to unmarshal output of plugin %s, command %v, output %s, err %s", bp.path, args, pluginOutput, err) + log.Errorf(ctx, err, "[callBinaryPlugin] failed to unmarshal output of plugin %s, command %v, output %s", bp.path, args, pluginOutput) return err } return nil diff --git a/resources/cpumem/models/alloc.go b/resources/cpumem/models/alloc.go index 2054b585e..fab94a4d3 100644 --- a/resources/cpumem/models/alloc.go +++ b/resources/cpumem/models/alloc.go @@ -12,13 +12,13 @@ import ( // GetDeployArgs . 
func (c *CPUMem) GetDeployArgs(ctx context.Context, node string, deployCount int, opts *types.WorkloadResourceOpts) ([]*types.EngineArgs, []*types.WorkloadResourceArgs, error) { if err := opts.Validate(); err != nil { - log.Errorf(ctx, err, "[GetDeployArgs] invalid resource opts %+v, err: %v", opts, err) + log.Errorf(ctx, err, "[GetDeployArgs] invalid resource opts %+v", opts) return nil, nil, err } resourceInfo, err := c.doGetNodeResourceInfo(ctx, node) if err != nil { - log.Errorf(ctx, err, "[GetDeployArgs] failed to get resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[GetDeployArgs] failed to get resource info of node %v", node) return nil, nil, err } diff --git a/resources/cpumem/models/capacity.go b/resources/cpumem/models/capacity.go index db71156b1..47e2a780e 100644 --- a/resources/cpumem/models/capacity.go +++ b/resources/cpumem/models/capacity.go @@ -13,7 +13,7 @@ import ( // GetNodesDeployCapacity . func (c *CPUMem) GetNodesDeployCapacity(ctx context.Context, nodes []string, opts *types.WorkloadResourceOpts) (map[string]*types.NodeCapacityInfo, int, error) { if err := opts.Validate(); err != nil { - log.Errorf(ctx, err, "[GetNodesDeployCapacity] invalid resource opts %+v, err: %v", opts, err) + log.Errorf(ctx, err, "[GetNodesDeployCapacity] invalid resource opts %+v", opts) return nil, 0, err } @@ -22,7 +22,7 @@ func (c *CPUMem) GetNodesDeployCapacity(ctx context.Context, nodes []string, opt for _, node := range nodes { resourceInfo, err := c.doGetNodeResourceInfo(ctx, node) if err != nil { - log.Errorf(ctx, err, "[GetNodesDeployCapacity] failed to get resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[GetNodesDeployCapacity] failed to get resource info of node %v", node) return nil, 0, err } capacityInfo := c.doGetNodeCapacityInfo(node, resourceInfo, opts) diff --git a/resources/cpumem/models/cpumem.go b/resources/cpumem/models/cpumem.go index 2fba765b6..d68af41b5 100644 --- a/resources/cpumem/models/cpumem.go +++ b/resources/cpumem/models/cpumem.go @@ -18,7 +18,7 @@ func NewCPUMem(config coretypes.Config) (*CPUMem, error) { if len(config.Etcd.Machines) > 0 { c.store, err = meta.NewETCD(config.Etcd, nil) if err != nil { - log.Errorf(nil, err, "[NewCPUMem] failed to create etcd client, err: %v", err) //nolint + log.Error(nil, err, "[NewCPUMem] failed to create etcd client") //nolint return nil, err } } diff --git a/resources/cpumem/models/idle.go b/resources/cpumem/models/idle.go index 3075ed6dc..99ef4a8e0 100644 --- a/resources/cpumem/models/idle.go +++ b/resources/cpumem/models/idle.go @@ -17,7 +17,7 @@ func (c *CPUMem) GetMostIdleNode(ctx context.Context, nodes []string) (string, i for _, node := range nodes { resourceInfo, err := c.doGetNodeResourceInfo(ctx, node) if err != nil { - log.Errorf(ctx, err, "[GetMostIdleNode] failed to get node resource info") + log.Error(ctx, err, "[GetMostIdleNode] failed to get node resource info") return "", 0, err } idle := float64(resourceInfo.Usage.CPUMap.TotalPieces()) / float64(resourceInfo.Capacity.CPUMap.TotalPieces()) diff --git a/resources/cpumem/models/info.go b/resources/cpumem/models/info.go index c7752c92c..e69bf6db2 100644 --- a/resources/cpumem/models/info.go +++ b/resources/cpumem/models/info.go @@ -64,7 +64,7 @@ func (c *CPUMem) GetNodeResourceInfo(ctx context.Context, node string, workloadR NUMAMemory: totalResourceArgs.NUMAMemory, } if err = c.doSetNodeResourceInfo(ctx, node, resourceInfo); err != nil { - log.Errorf(ctx, err, "[GetNodeResourceInfo] failed to fix node resource, err: 
%v", err) + log.Error(ctx, err, "[GetNodeResourceInfo] failed to fix node resource") diffs = append(diffs, "fix failed") } } @@ -126,7 +126,7 @@ func (c *CPUMem) calculateNodeResourceArgs(origin *types.NodeResourceArgs, nodeR func (c *CPUMem) SetNodeResourceUsage(ctx context.Context, node string, nodeResourceOpts *types.NodeResourceOpts, nodeResourceArgs *types.NodeResourceArgs, workloadResourceArgs []*types.WorkloadResourceArgs, delta bool, incr bool) (before *types.NodeResourceArgs, after *types.NodeResourceArgs, err error) { resourceInfo, err := c.doGetNodeResourceInfo(ctx, node) if err != nil { - log.Errorf(ctx, err, "[SetNodeResourceInfo] failed to get resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[SetNodeResourceInfo] failed to get resource info of node %v", node) return nil, nil, err } @@ -143,7 +143,7 @@ func (c *CPUMem) SetNodeResourceUsage(ctx context.Context, node string, nodeReso func (c *CPUMem) SetNodeResourceCapacity(ctx context.Context, node string, nodeResourceOpts *types.NodeResourceOpts, nodeResourceArgs *types.NodeResourceArgs, delta bool, incr bool) (before *types.NodeResourceArgs, after *types.NodeResourceArgs, err error) { resourceInfo, err := c.doGetNodeResourceInfo(ctx, node) if err != nil { - log.Errorf(ctx, err, "[SetNodeResourceInfo] failed to get resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[SetNodeResourceInfo] failed to get resource info of node %v", node) return nil, nil, err } @@ -185,11 +185,11 @@ func (c *CPUMem) doGetNodeResourceInfo(ctx context.Context, node string) (*types resourceInfo := &types.NodeResourceInfo{} resp, err := c.store.GetOne(ctx, fmt.Sprintf(NodeResourceInfoKey, node)) if err != nil { - log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to get node resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to get node resource info of node %v", node) return nil, err } if err = json.Unmarshal(resp.Value, resourceInfo); err != nil { - log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to unmarshal node resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to unmarshal node resource info of node %v", node) return nil, err } return resourceInfo, nil @@ -197,18 +197,18 @@ func (c *CPUMem) doGetNodeResourceInfo(ctx context.Context, node string) (*types func (c *CPUMem) doSetNodeResourceInfo(ctx context.Context, node string, resourceInfo *types.NodeResourceInfo) error { if err := resourceInfo.Validate(); err != nil { - log.Errorf(ctx, err, "[doSetNodeResourceInfo] invalid resource info %v, err: %v", litter.Sdump(resourceInfo), err) + log.Errorf(ctx, err, "[doSetNodeResourceInfo] invalid resource info %v", litter.Sdump(resourceInfo)) return err } data, err := json.Marshal(resourceInfo) if err != nil { - log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to marshal resource info %+v, err: %v", resourceInfo, err) + log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to marshal resource info %+v", resourceInfo) return err } if _, err = c.store.Put(ctx, fmt.Sprintf(NodeResourceInfoKey, node), string(data)); err != nil { - log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to put resource info %+v, err: %v", resourceInfo, err) + log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to put resource info %+v,", resourceInfo) return err } return nil diff --git a/resources/cpumem/models/node.go b/resources/cpumem/models/node.go index 456d5ff07..d13782fd1 100644 --- a/resources/cpumem/models/node.go +++ 
@@ -14,7 +14,7 @@ import (
 func (c *CPUMem) AddNode(ctx context.Context, node string, resourceOpts *types.NodeResourceOpts) (*types.NodeResourceInfo, error) {
 	if _, err := c.doGetNodeResourceInfo(ctx, node); err != nil {
 		if !errors.Is(err, coretypes.ErrBadCount) {
-			log.Errorf(ctx, err, "[AddNode] failed to get resource info of node %v, err: %v", node, err)
+			log.Errorf(ctx, err, "[AddNode] failed to get resource info of node %v", node)
 			return nil, err
 		}
 	} else {
@@ -61,7 +61,7 @@ func (c *CPUMem) AddNode(ctx context.Context, node string, resourceOpts *types.N
 // RemoveNode .
 func (c *CPUMem) RemoveNode(ctx context.Context, node string) error {
 	if _, err := c.store.Delete(ctx, fmt.Sprintf(NodeResourceInfoKey, node)); err != nil {
-		log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to delete node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[RemoveNode] failed to delete node %v", node)
 		return err
 	}
 	return nil
diff --git a/resources/cpumem/models/realloc.go b/resources/cpumem/models/realloc.go
index 88ab90ac3..167525681 100644
--- a/resources/cpumem/models/realloc.go
+++ b/resources/cpumem/models/realloc.go
@@ -16,7 +16,7 @@ func (c *CPUMem) GetReallocArgs(ctx context.Context, node string, originResource
 	resourceInfo, err := c.doGetNodeResourceInfo(ctx, node)
 	if err != nil {
-		log.Errorf(ctx, err, "[GetReallocArgs] failed to get resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[GetReallocArgs] failed to get resource info of node %v", node)
 		return nil, nil, nil, err
 	}
diff --git a/resources/cpumem/models/remap.go b/resources/cpumem/models/remap.go
index 0ab708ce6..6e2b28011 100644
--- a/resources/cpumem/models/remap.go
+++ b/resources/cpumem/models/remap.go
@@ -11,7 +11,7 @@ import (
 func (c *CPUMem) GetRemapArgs(ctx context.Context, node string, workloadResourceMap *types.WorkloadResourceArgsMap) (map[string]*types.EngineArgs, error) {
 	resourceInfo, err := c.doGetNodeResourceInfo(ctx, node)
 	if err != nil {
-		log.Errorf(ctx, err, "[GetRemapArgs] failed to get resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[GetRemapArgs] failed to get resource info of node %v", node)
 		return nil, err
 	}
 	availableNodeResource := resourceInfo.GetAvailableResource()
diff --git a/resources/manager.go b/resources/manager.go
index 07553d5ba..c83b44294 100644
--- a/resources/manager.go
+++ b/resources/manager.go
@@ -23,7 +23,7 @@ func (pm *PluginsManager) GetMostIdleNode(ctx context.Context, nodenames []strin
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetMostIdleNodeResponse, error) {
 		resp, err := plugin.GetMostIdleNode(ctx, nodenames)
 		if err != nil {
-			log.Errorf(ctx, err, "[GetMostIdleNode] plugin %v failed to get the most idle node of %v, err: %v", plugin.Name(), nodenames, err)
+			log.Errorf(ctx, err, "[GetMostIdleNode] plugin %v failed to get the most idle node of %v", plugin.Name(), nodenames)
 		}
 		return resp, err
 	})
@@ -54,7 +54,7 @@ func (pm *PluginsManager) GetNodesDeployCapacity(ctx context.Context, nodenames
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetNodesDeployCapacityResponse, error) {
 		resp, err := plugin.GetNodesDeployCapacity(ctx, nodenames, resourceOpts)
 		if err != nil {
-			log.Errorf(ctx, err, "[GetNodesDeployCapacity] plugin %v failed to get available nodenames, request %v, err %v", plugin.Name(), resourceOpts, err)
+			log.Errorf(ctx, err, "[GetNodesDeployCapacity] plugin %v failed to get available nodenames, request %v", plugin.Name(), resourceOpts)
 		}
 		return resp, err
 	})
@@ -103,7 +103,7 @@ func (pm *PluginsManager) SetNodeResourceCapacity(ctx context.Context, nodename
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*SetNodeResourceCapacityResponse, error) {
 		resp, err := plugin.SetNodeResourceCapacity(ctx, nodename, nodeResourceOpts, nodeResourceArgs[plugin.Name()], delta, incr)
 		if err != nil {
-			log.Errorf(ctx, err, "[SetNodeResourceCapacity] node %v plugin %v failed to set node resource capacity, err: %v", nodename, plugin.Name(), err)
+			log.Errorf(ctx, err, "[SetNodeResourceCapacity] node %v plugin %v failed to set node resource capacity", nodename, plugin.Name())
 		}
 		return resp, err
 	})
@@ -124,7 +124,7 @@ func (pm *PluginsManager) SetNodeResourceCapacity(ctx context.Context, nodename
 		_, err := callPlugins(ctx, rollbackPlugins, func(plugin Plugin) (*SetNodeResourceCapacityResponse, error) {
 			resp, err := plugin.SetNodeResourceCapacity(ctx, nodename, nil, beforeMap[plugin.Name()], false, false)
 			if err != nil {
-				log.Errorf(ctx, err, "[SetNodeResourceCapacity] node %v plugin %v failed to rollback node resource capacity, err: %v", nodename, plugin.Name(), err)
+				log.Errorf(ctx, err, "[SetNodeResourceCapacity] node %v plugin %v failed to rollback node resource capacity", nodename, plugin.Name())
 			}
 			return resp, err
 		})
@@ -156,7 +156,7 @@ func (pm *PluginsManager) GetNodeResourceInfo(ctx context.Context, nodename stri
 			resp, err = plugin.GetNodeResourceInfo(ctx, nodename, workloads)
 		}
 		if err != nil {
-			log.Errorf(ctx, err, "[GetNodeResourceInfo] plugin %v failed to get node resource of node %v, err: %v", plugin.Name(), nodename, err)
+			log.Errorf(ctx, err, "[GetNodeResourceInfo] plugin %v failed to get node resource of node %v", plugin.Name(), nodename)
 		}
 		return resp, err
 	})
@@ -203,7 +203,7 @@ func (pm *PluginsManager) SetNodeResourceUsage(ctx context.Context, nodename str
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*SetNodeResourceUsageResponse, error) {
 		resp, err := plugin.SetNodeResourceUsage(ctx, nodename, nodeResourceOpts, nodeResourceArgs[plugin.Name()], workloadResourceArgsMap[plugin.Name()], delta, incr)
 		if err != nil {
-			log.Errorf(ctx, err, "[SetNodeResourceUsage] node %v plugin %v failed to update node resource, err: %v", nodename, plugin.Name(), err)
+			log.Errorf(ctx, err, "[SetNodeResourceUsage] node %v plugin %v failed to update node resource", nodename, plugin.Name())
 		}
 		return resp, err
 	})
@@ -224,7 +224,7 @@ func (pm *PluginsManager) SetNodeResourceUsage(ctx context.Context, nodename str
 		_, err := callPlugins(ctx, rollbackPlugins, func(plugin Plugin) (*SetNodeResourceUsageResponse, error) {
 			resp, err := plugin.SetNodeResourceUsage(ctx, nodename, nil, beforeMap[plugin.Name()], nil, false, false)
 			if err != nil {
-				log.Errorf(ctx, err, "[UpdateNodeResourceUsage] node %v plugin %v failed to rollback node resource, err: %v", nodename, plugin.Name(), err)
+				log.Errorf(ctx, err, "[SetNodeResourceUsage] node %v plugin %v failed to rollback node resource", nodename, plugin.Name())
 			}
 			return resp, err
 		})
@@ -251,7 +251,7 @@ func (pm *PluginsManager) Alloc(ctx context.Context, nodename string, deployCoun
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetDeployArgsResponse, error) {
 		resp, err := plugin.GetDeployArgs(ctx, nodename, deployCount, resourceOpts)
 		if err != nil {
-			log.Errorf(ctx, err, "[Alloc] plugin %v failed to compute alloc args, request %v, node %v, deploy count %v, err %v", plugin.Name(), resourceOpts, nodename, deployCount, err)
+			log.Errorf(ctx, err, "[Alloc] plugin %v failed to compute alloc args, request %v, node %v, deploy count %v", plugin.Name(), resourceOpts, nodename, deployCount)
 		}
 		return resp, err
 	})
@@ -267,7 +267,7 @@ func (pm *PluginsManager) Alloc(ctx context.Context, nodename string, deployCoun
 			for index, args := range resp.EngineArgs {
 				resEngineArgs[index], err = pm.mergeEngineArgs(ctx, resEngineArgs[index], args)
 				if err != nil {
-					log.Errorf(ctx, err, "[Alloc] invalid engine args")
+					log.Error(ctx, err, "[Alloc] invalid engine args")
 					return err
 				}
 			}
@@ -277,7 +277,7 @@ func (pm *PluginsManager) Alloc(ctx context.Context, nodename string, deployCoun
 		// commit: update node resources
 		func(ctx context.Context) error {
 			if _, _, err := pm.SetNodeResourceUsage(ctx, nodename, nil, nil, resResourceArgs, true, Incr); err != nil {
-				log.Errorf(ctx, err, "[Alloc] failed to update node resource, err: %v", err)
+				log.Error(ctx, err, "[Alloc] failed to update node resource")
 				return err
 			}
 			return nil
@@ -308,7 +308,7 @@ func (pm *PluginsManager) Realloc(ctx context.Context, nodename string, originRe
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetReallocArgsResponse, error) {
 		resp, err := plugin.GetReallocArgs(ctx, nodename, originResourceArgs[plugin.Name()], resourceOpts)
 		if err != nil {
-			log.Errorf(ctx, err, "[Realloc] plugin %v failed to calculate realloc args, err: %v", plugin.Name(), err)
+			log.Errorf(ctx, err, "[Realloc] plugin %v failed to calculate realloc args", plugin.Name())
 		}
 		return resp, err
 	})
@@ -320,7 +320,7 @@ func (pm *PluginsManager) Realloc(ctx context.Context, nodename string, originRe
 	for plugin, resp := range respMap {
 		if resEngineArgs, err = pm.mergeEngineArgs(ctx, resEngineArgs, resp.EngineArgs); err != nil {
-			log.Errorf(ctx, err, "[Realloc] invalid engine args, err: %v", err)
+			log.Error(ctx, err, "[Realloc] invalid engine args")
 			return err
 		}
 		resDeltaResourceArgs[plugin.Name()] = resp.Delta
@@ -331,7 +331,7 @@ func (pm *PluginsManager) Realloc(ctx context.Context, nodename string, originRe
 		// commit: update node resource
 		func(ctx context.Context) error {
 			if _, _, err := pm.SetNodeResourceUsage(ctx, nodename, nil, nil, []map[string]types.WorkloadResourceArgs{resDeltaResourceArgs}, true, Incr); err != nil {
-				log.Errorf(ctx, err, "[Realloc] failed to update nodename resource, err: %v", err)
+				log.Error(ctx, err, "[Realloc] failed to update node resource")
 				return err
 			}
 			return nil
@@ -356,13 +356,13 @@ func (pm *PluginsManager) GetMetricsDescription(ctx context.Context) ([]*Metrics
 	respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetMetricsDescriptionResponse, error) {
 		resp, err := plugin.GetMetricsDescription(ctx)
 		if err != nil {
-			log.Errorf(ctx, err, "[GetMetricsDescription] plugin %v failed to get metrics description, err: %v", plugin.Name(), err)
+			log.Errorf(ctx, err, "[GetMetricsDescription] plugin %v failed to get metrics description", plugin.Name())
 		}
 		return resp, err
 	})
 
 	if err != nil {
-		log.Errorf(ctx, err, "[GetMetricsDescription] failed to get metrics description")
+		log.Error(ctx, err, "[GetMetricsDescription] failed to get metrics description")
 		return nil, err
 	}
@@ -380,13 +380,13 @@ func (pm *PluginsManager) GetNodeMetrics(ctx context.Context, node *types.Node)
 		capacity, usage := node.Resource.Capacity[plugin.Name()], node.Resource.Usage[plugin.Name()]
 		resp, err := plugin.GetNodeMetrics(ctx, node.Podname, node.Name, &NodeResourceInfo{Capacity: capacity, Usage: usage})
 		if err != nil {
-			log.Errorf(ctx, err, "[GetNodeMetrics] plugin %v failed to convert node resource info to metrics, err: %v", plugin.Name(), err)
to metrics, err: %v", plugin.Name(), err) + log.Errorf(ctx, err, "[GetNodeMetrics] plugin %v failed to convert node resource info to metrics", plugin.Name()) } return resp, err }) if err != nil { - log.Errorf(ctx, err, "[GetNodeMetrics] failed to convert node resource info to metrics") + log.Error(ctx, err, "[GetNodeMetrics] failed to convert node resource info to metrics") return nil, err } @@ -413,7 +413,7 @@ func (pm *PluginsManager) AddNode(ctx context.Context, nodename string, resource respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*AddNodeResponse, error) { resp, err := plugin.AddNode(ctx, nodename, resourceOpts, nodeInfo) if err != nil { - log.Errorf(ctx, err, "[AddNode] node %v plugin %v failed to add node, req: %v, err: %v", nodename, plugin.Name(), resourceOpts, err) + log.Errorf(ctx, err, "[AddNode] node %v plugin %v failed to add node, req: %v", nodename, plugin.Name(), resourceOpts) } return resp, err }) @@ -439,13 +439,13 @@ func (pm *PluginsManager) AddNode(ctx context.Context, nodename string, resource _, err := callPlugins(ctx, rollbackPlugins, func(plugin Plugin) (*RemoveNodeResponse, error) { resp, err := plugin.RemoveNode(ctx, nodename) if err != nil { - log.Errorf(ctx, err, "[AddNode] node %v plugin %v failed to rollback, err: %v", nodename, plugin.Name(), err) + log.Errorf(ctx, err, "[AddNode] node %v plugin %v failed to rollback", nodename, plugin.Name()) } return resp, err }) if err != nil { - log.Errorf(ctx, err, "[AddNode] failed to rollback") + log.Error(ctx, err, "[AddNode] failed to rollback") return err } @@ -467,7 +467,7 @@ func (pm *PluginsManager) RemoveNode(ctx context.Context, nodename string) error var err error resourceCapacityMap, resourceUsageMap, _, err = pm.GetNodeResourceInfo(ctx, nodename, nil, false) if err != nil { - log.Errorf(ctx, err, "[RemoveNode] failed to get node %v resource, err: %v", nodename, err) + log.Errorf(ctx, err, "[RemoveNode] failed to get node %v resource", nodename) return err } return nil @@ -477,7 +477,7 @@ func (pm *PluginsManager) RemoveNode(ctx context.Context, nodename string) error respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*RemoveNodeResponse, error) { resp, err := plugin.RemoveNode(ctx, nodename) if err != nil { - log.Errorf(ctx, err, "[AddNode] plugin %v failed to remove node, err: %v", plugin.Name(), err) + log.Errorf(ctx, err, "[AddNode] plugin %v failed to remove node", plugin.Name()) } return resp, err }) @@ -497,13 +497,13 @@ func (pm *PluginsManager) RemoveNode(ctx context.Context, nodename string) error _, err := callPlugins(ctx, rollbackPlugins, func(plugin Plugin) (*SetNodeResourceInfoResponse, error) { resp, err := plugin.SetNodeResourceInfo(ctx, nodename, resourceCapacityMap[plugin.Name()], resourceUsageMap[plugin.Name()]) if err != nil { - log.Errorf(ctx, err, "[RemoveNode] plugin %v node %v failed to rollback, err: %v", plugin.Name(), nodename, err) + log.Errorf(ctx, err, "[RemoveNode] plugin %v node %v failed to rollback", plugin.Name(), nodename) } return resp, err }) if err != nil { - log.Errorf(ctx, err, "[RemoveNode] failed to rollback") + log.Error(ctx, err, "[RemoveNode] failed to rollback") return err } return nil @@ -521,7 +521,7 @@ func (pm *PluginsManager) GetRemapArgs(ctx context.Context, nodename string, wor respMap, err := callPlugins(ctx, pm.plugins, func(plugin Plugin) (*GetRemapArgsResponse, error) { resp, err := plugin.GetRemapArgs(ctx, nodename, workloadMap) if err != nil { - log.Errorf(ctx, err, "[GetRemapArgs] plugin %v node %v failed to 
remap, err: %v", plugin.Name(), nodename, err) + log.Errorf(ctx, err, "[GetRemapArgs] plugin %v node %v failed to remap", plugin.Name(), nodename) } return resp, err }) @@ -538,7 +538,7 @@ func (pm *PluginsManager) GetRemapArgs(ctx context.Context, nodename string, wor } resEngineArgsMap[workloadID], err = pm.mergeEngineArgs(ctx, resEngineArgsMap[workloadID], engineArgs) if err != nil { - log.Errorf(ctx, err, "[GetRemapArgs] invalid engine args") + log.Error(ctx, err, "[GetRemapArgs] invalid engine args") return nil, err } } @@ -581,7 +581,7 @@ func (pm *PluginsManager) mergeEngineArgs(ctx context.Context, m1 types.EngineAr _, ok1 := res[key].([]string) _, ok2 := value.([]string) if !ok1 || !ok2 { - log.Errorf(ctx, nil, "[mergeEngineArgs] only two string slices can be merged! error key %v, m1[key] = %v, m2[key] = %v", key, m1[key], m2[key]) + log.Errorf(ctx, types.ErrInvalidEngineArgs, "[mergeEngineArgs] only two string slices can be merged! error key %v, m1[key] = %v, m2[key] = %v", key, m1[key], m2[key]) return nil, types.ErrInvalidEngineArgs } res[key] = append(res[key].([]string), value.([]string)...) diff --git a/resources/plugins.go b/resources/plugins.go index 0794b95ec..2735703f2 100644 --- a/resources/plugins.go +++ b/resources/plugins.go @@ -34,7 +34,7 @@ func (pm *PluginsManager) LoadPlugins(ctx context.Context) error { pluginFiles, err := utils.ListAllExecutableFiles(pm.config.ResourcePlugin.Dir) if err != nil { - log.Errorf(ctx, err, "[LoadPlugins] failed to list all executable files dir: %v, err: %v", pm.config.ResourcePlugin.Dir, err) + log.Errorf(ctx, err, "[LoadPlugins] failed to list all executable files dir: %v", pm.config.ResourcePlugin.Dir) return err } @@ -72,7 +72,7 @@ func callPlugins[T any](ctx context.Context, plugins []Plugin, f func(Plugin) (T for _, plugin := range plugins { result, err := f(plugin) if err != nil { - log.Errorf(ctx, err, "[callPlugins] failed to call plugin %v, err: %v", plugin.Name(), err) + log.Errorf(ctx, err, "[callPlugins] failed to call plugin %v", plugin.Name()) combinedErr = multierror.Append(combinedErr, types.NewDetailedErr(err, plugin.Name())) continue } diff --git a/resources/volume/models/alloc.go b/resources/volume/models/alloc.go index 0a28e2c96..e7e28d6b9 100644 --- a/resources/volume/models/alloc.go +++ b/resources/volume/models/alloc.go @@ -15,13 +15,13 @@ import ( // GetDeployArgs . func (v *Volume) GetDeployArgs(ctx context.Context, node string, deployCount int, opts *types.WorkloadResourceOpts) ([]*types.EngineArgs, []*types.WorkloadResourceArgs, error) { if err := opts.Validate(); err != nil { - log.Errorf(ctx, err, "[Alloc] invalid resource opts %+v, err: %v", opts, err) + log.Errorf(ctx, err, "[Alloc] invalid resource opts %+v", opts) return nil, nil, err } resourceInfo, err := v.doGetNodeResourceInfo(ctx, node) if err != nil { - log.Errorf(ctx, err, "[Alloc] failed to get resource info of node %v, err: %v", node, err) + log.Errorf(ctx, err, "[Alloc] failed to get resource info of node %v", node) return nil, nil, err } diff --git a/resources/volume/models/capacity.go b/resources/volume/models/capacity.go index dbee819dc..d923ffd33 100644 --- a/resources/volume/models/capacity.go +++ b/resources/volume/models/capacity.go @@ -13,7 +13,7 @@ import ( // GetNodesDeployCapacity . 
 func (v *Volume) GetNodesDeployCapacity(ctx context.Context, nodes []string, opts *types.WorkloadResourceOpts) (map[string]*types.NodeCapacityInfo, int, error) {
 	if err := opts.Validate(); err != nil {
-		log.Errorf(ctx, err, "[GetNodesDeployCapacity] invalid resource opts %+v, err: %v", opts, err)
+		log.Errorf(ctx, err, "[GetNodesDeployCapacity] invalid resource opts %+v", opts)
 		return nil, 0, err
 	}
 
@@ -22,7 +22,7 @@ func (v *Volume) GetNodesDeployCapacity(ctx context.Context, nodes []string, opt
 	for _, node := range nodes {
 		resourceInfo, err := v.doGetNodeResourceInfo(ctx, node)
 		if err != nil {
-			log.Errorf(ctx, err, "[GetNodesDeployCapacity] failed to get resource info of node %v, err: %v", node, err)
+			log.Errorf(ctx, err, "[GetNodesDeployCapacity] failed to get resource info of node %v", node)
 			return nil, 0, err
 		}
 		capacityInfo := v.doGetNodeCapacityInfo(ctx, node, resourceInfo, opts)
diff --git a/resources/volume/models/info.go b/resources/volume/models/info.go
index 7396f5ac4..9a535c6ac 100644
--- a/resources/volume/models/info.go
+++ b/resources/volume/models/info.go
@@ -9,6 +9,7 @@ import (
 	"github.com/projecteru2/core/log"
 	"github.com/projecteru2/core/resources/volume/types"
+	coretypes "github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/utils"
 )
@@ -140,7 +141,7 @@ func (v *Volume) calculateNodeResourceArgs(origin *types.NodeResourceArgs, nodeR
 func (v *Volume) SetNodeResourceUsage(ctx context.Context, node string, nodeResourceOpts *types.NodeResourceOpts, nodeResourceArgs *types.NodeResourceArgs, workloadResourceArgs []*types.WorkloadResourceArgs, delta bool, incr bool) (before *types.NodeResourceArgs, after *types.NodeResourceArgs, err error) {
 	resourceInfo, err := v.doGetNodeResourceInfo(ctx, node)
 	if err != nil {
-		log.Errorf(ctx, err, "[SetNodeResourceInfo] failed to get resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[SetNodeResourceUsage] failed to get resource info of node %v", node)
 		return nil, nil, err
 	}
 
@@ -157,7 +158,7 @@
 func (v *Volume) SetNodeResourceCapacity(ctx context.Context, node string, nodeResourceOpts *types.NodeResourceOpts, nodeResourceArgs *types.NodeResourceArgs, delta bool, incr bool) (before *types.NodeResourceArgs, after *types.NodeResourceArgs, err error) {
 	resourceInfo, err := v.doGetNodeResourceInfo(ctx, node)
 	if err != nil {
-		log.Errorf(ctx, err, "[SetNodeResourceInfo] failed to get resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[SetNodeResourceCapacity] failed to get resource info of node %v", node)
 		return nil, nil, err
 	}
 
@@ -165,7 +166,7 @@
 	if nodeResourceOpts != nil {
 		if len(nodeResourceOpts.RMDisks) > 0 {
 			if delta {
-				return nil, nil, fmt.Errorf("rm disk is not supported when delta is true")
+				return nil, nil, coretypes.ErrInvalidEngineArgs
 			}
 			rmDisksMap := map[string]struct{}{}
 			for _, rmDisk := range nodeResourceOpts.RMDisks {
@@ -205,11 +206,11 @@ func (v *Volume) doGetNodeResourceInfo(ctx context.Context, node string) (*types
 	resourceInfo := &types.NodeResourceInfo{}
 	resp, err := v.store.GetOne(ctx, fmt.Sprintf(NodeResourceInfoKey, node))
 	if err != nil {
-		log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to get node resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to get node resource info of node %v", node)
 		return nil, err
 	}
 	if err = json.Unmarshal(resp.Value, resourceInfo); err != nil {
-		log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to unmarshal node resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[doGetNodeResourceInfo] failed to unmarshal node resource info of node %v", node)
 		return nil, err
 	}
 	return resourceInfo, nil
@@ -217,18 +218,18 @@ func (v *Volume) doGetNodeResourceInfo(ctx context.Context, node string) (*types
 func (v *Volume) doSetNodeResourceInfo(ctx context.Context, node string, resourceInfo *types.NodeResourceInfo) error {
 	if err := resourceInfo.Validate(); err != nil {
-		log.Errorf(ctx, err, "[doSetNodeResourceInfo] invalid resource info %v, err: %v", litter.Sdump(resourceInfo), err)
+		log.Errorf(ctx, err, "[doSetNodeResourceInfo] invalid resource info %v", litter.Sdump(resourceInfo))
 		return err
 	}
 
 	data, err := json.Marshal(resourceInfo)
 	if err != nil {
-		log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to marshal resource info %+v, err: %v", resourceInfo, err)
+		log.Errorf(ctx, err, "[doSetNodeResourceInfo] failed to marshal resource info %+v", resourceInfo)
 		return err
 	}
 
 	if _, err = v.store.Put(ctx, fmt.Sprintf(NodeResourceInfoKey, node), string(data)); err != nil {
-		log.Errorf(ctx, err, "[doSetNodeResourceInfo] faield to put resource info %+v, err: %v", resourceInfo, err)
+		log.Errorf(ctx, err, "[doSetNodeResourceInfo] failed to put resource info %+v", resourceInfo)
 		return err
 	}
 	return nil
diff --git a/resources/volume/models/node.go b/resources/volume/models/node.go
index 7242bdbba..a72fb8cab 100644
--- a/resources/volume/models/node.go
+++ b/resources/volume/models/node.go
@@ -15,7 +15,7 @@ import (
 func (v *Volume) AddNode(ctx context.Context, node string, resourceOpts *types.NodeResourceOpts) (*types.NodeResourceInfo, error) {
 	if _, err := v.doGetNodeResourceInfo(ctx, node); err != nil {
 		if !errors.Is(err, coretypes.ErrBadCount) {
-			log.Errorf(ctx, err, "[AddNode] failed to get resource info of node %v, err: %v", node, err)
+			log.Errorf(ctx, err, "[AddNode] failed to get resource info of node %v", node)
 			return nil, err
 		}
 	} else {
@@ -37,7 +37,7 @@ func (v *Volume) AddNode(ctx context.Context, node string, resourceOpts *types.N
 // RemoveNode .
 func (v *Volume) RemoveNode(ctx context.Context, node string) error {
 	if _, err := v.store.Delete(ctx, fmt.Sprintf(NodeResourceInfoKey, node)); err != nil {
-		log.Errorf(ctx, err, "[RemoveNode] faield to delete node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[RemoveNode] failed to delete node %v", node)
 		return err
 	}
 	return nil
diff --git a/resources/volume/models/realloc.go b/resources/volume/models/realloc.go
index eeef73faa..92132b72a 100644
--- a/resources/volume/models/realloc.go
+++ b/resources/volume/models/realloc.go
@@ -15,7 +15,7 @@ import (
 func (v *Volume) GetReallocArgs(ctx context.Context, node string, originResourceArgs *types.WorkloadResourceArgs, resourceOpts *types.WorkloadResourceOpts) (*types.EngineArgs, *types.WorkloadResourceArgs, *types.WorkloadResourceArgs, error) {
 	resourceInfo, err := v.doGetNodeResourceInfo(ctx, node)
 	if err != nil {
-		log.Errorf(ctx, err, "[Realloc] failed to get resource info of node %v, err: %v", node, err)
+		log.Errorf(ctx, err, "[Realloc] failed to get resource info of node %v", node)
 		return nil, nil, nil, err
 	}
 
@@ -31,7 +31,7 @@ func (v *Volume) GetReallocArgs(ctx context.Context, node string, originResource
 	resourceOpts.SkipAddStorage()
 
 	if err := resourceOpts.Validate(); err != nil {
-		log.Errorf(ctx, err, "[Realloc] invalid resource opts %v, err: %v", litter.Sdump(resourceOpts), err)
+		log.Errorf(ctx, err, "[Realloc] invalid resource opts %v", litter.Sdump(resourceOpts))
 		return nil, nil, nil, err
 	}
diff --git a/resources/volume/models/volume.go b/resources/volume/models/volume.go
index 7541f9383..fa8416e23 100644
--- a/resources/volume/models/volume.go
+++ b/resources/volume/models/volume.go
@@ -19,7 +19,7 @@ func NewVolume(config coretypes.Config) (*Volume, error) {
 	if len(config.Etcd.Machines) > 0 {
 		v.store, err = meta.NewETCD(config.Etcd, nil)
 		if err != nil {
-			log.Errorf(nil, err, "[NewVolume] failed to create etcd client, err: %v", err) //nolint
+			log.Error(nil, err, "[NewVolume] failed to create etcd client") //nolint
 			return nil, err
 		}
 	}
diff --git a/resources/volume/schedule/schedule.go b/resources/volume/schedule/schedule.go
index f05089b4f..7e6074f59 100644
--- a/resources/volume/schedule/schedule.go
+++ b/resources/volume/schedule/schedule.go
@@ -449,7 +449,7 @@ func (h *host) getVolumePlans(requests types.VolumeBindings) ([]types.VolumePlan
 	normalDiskPlans, monoDiskPlans := bestDiskPlans[0], bestDiskPlans[1]
 	unlimitedVolumePlans, err := h.getUnlimitedPlans(normalVolumePlans, monoVolumePlans, unlimitedRequests, bestCapacity)
 	if err != nil {
-		log.Errorf(nil, err, "[getVolumePlans] failed to get unlimited volume plans") //nolint
+		log.Error(nil, err, "[getVolumePlans] failed to get unlimited volume plans") //nolint
 		return nil, nil
 	}
 
@@ -582,14 +582,14 @@ func (h *host) getAffinityPlan(requests types.VolumeBindings, originVolumePlan t
 	// process mount requests
 	mountDiskPlan, err := h.getMountDiskPlan(mountRequests)
 	if err != nil {
-		log.Errorf(nil, err, "[getAffinityPlan] alloc mount requests failed") //nolint
+		log.Error(nil, err, "[getAffinityPlan] alloc mount requests failed") //nolint
 		return nil, nil, err
 	}
 	diskPlan.Add(mountDiskPlan)
 
 	// process normal requests
 	if err = commonProcess(normalRequests); err != nil {
-		log.Errorf(nil, err, "[getAffinityPlan] alloc normal requests failed") //nolint
+		log.Error(nil, err, "[getAffinityPlan] alloc normal requests failed") //nolint
 		return nil, nil, err
 	}
 
@@ -628,7 +628,7 @@ func (h *host) getAffinityPlan(requests types.VolumeBindings, originVolumePlan t
 		monoVolumePlan, monoDiskPlan, err := h.getMonoPlan(monoRequests, volume)
 		if err != nil {
-			log.Errorf(nil, err, "[getAffinityPlan] failed to get new mono plan, err: %v", err) //nolint
+			log.Error(nil, err, "[getAffinityPlan] failed to get new mono plan") //nolint
 			return nil, nil, err
 		}
 
@@ -638,7 +638,7 @@ func (h *host) getAffinityPlan(requests types.VolumeBindings, originVolumePlan t
 	// process unlimited requests
 	if err := commonProcess(unlimitedRequests); err != nil {
-		log.Errorf(nil, err, "[getAffinityPlan] alloc mount requests failed") //nolint
+		log.Error(nil, err, "[getAffinityPlan] alloc unlimited requests failed") //nolint
 		return nil, nil, err
 	}
diff --git a/resources/volume/types/disk.go b/resources/volume/types/disk.go
index 634645cc2..abd9b27ff 100644
--- a/resources/volume/types/disk.go
+++ b/resources/volume/types/disk.go
@@ -28,7 +28,7 @@ func (d *Disk) String() string {
 func (d *Disk) ParseFromString(s string) (err error) {
 	parts := strings.Split(s, ":")
 	if len(parts) != 6 {
-		return fmt.Errorf("invalid disk string: %v", s)
+		return ErrInvalidStorage
 	}
 	d.Device = parts[0]
 	d.Mounts = strings.Split(parts[1], ",")
diff --git a/resources/volume/types/errors.go b/resources/volume/types/errors.go
index 05da43fc5..ed848a11b 100644
--- a/resources/volume/types/errors.go
+++ b/resources/volume/types/errors.go
@@ -1,6 +1,6 @@
 package types
 
-import "errors"
+import "github.com/pkg/errors"
 
 var (
 	ErrInvalidCapacity = errors.New("invalid capacity")
diff --git a/rpc/rpc.go b/rpc/rpc.go
index 75af52b4b..325b31141 100644
--- a/rpc/rpc.go
+++ b/rpc/rpc.go
@@ -529,11 +529,11 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err
 				Size: int64(len(m.Content)),
 			}
 			if err = tw.WriteHeader(header); err != nil {
-				log.Errorf(task.context, err, "[Copy] Error during writing tarball header: %v", err)
+				log.Error(task.context, err, "[Copy] Error during writing tarball header")
 				return
 			}
 			if _, err = tw.Write(m.Content); err != nil {
-				log.Errorf(task.context, err, "[Copy] Error during writing tarball content: %v", err)
+				log.Error(task.context, err, "[Copy] Error during writing tarball content")
 				return
 			}
 		}
@@ -543,7 +543,7 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err
 		n, err := r.Read(p)
 		if err != nil {
 			if err != io.EOF {
-				log.Errorf(task.context, err, "[Copy] Error during buffer resp: %v", err)
+				log.Error(task.context, err, "[Copy] Error during buffer resp")
 				msg.Error = err.Error()
 				if err = stream.Send(msg); err != nil {
 					v.logUnsentMessages(task.context, "Copy", err, m)
@@ -814,7 +814,7 @@ func (v *Vibranium) ExecuteWorkload(stream pb.CoreRPC_ExecuteWorkloadServer) err
 		for {
 			execWorkloadOpt, err := stream.Recv()
 			if execWorkloadOpt == nil || err != nil {
-				log.Errorf(task.context, err, "[ExecuteWorkload] Recv command error: %v", err)
+				log.Error(task.context, err, "[ExecuteWorkload] Recv command error")
 				return
 			}
 			inCh <- execWorkloadOpt.ReplCmd
@@ -934,7 +934,7 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
 		for {
 			RunAndWaitOptions, err := stream.Recv()
 			if RunAndWaitOptions == nil || err != nil {
-				log.Errorf(ctx, err, "[RunAndWait] Recv command error: %v", err)
+				log.Error(ctx, err, "[RunAndWait] Recv command error")
 				break
 			}
 			inCh <- RunAndWaitOptions.Cmd
@@ -984,7 +984,7 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
 			defer w.Close()
 			for m := range ch {
 				if _, err := w.Write(m.Data); err != nil {
-					log.Errorf(ctx, err, "[Async RunAndWait] iterate and forward AttachWorkloadMessage error: %v", err)
log.Error(ctx, err, "[Async RunAndWait] iterate and forward AttachWorkloadMessage") } } }) @@ -998,7 +998,7 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error { for { if part, isPrefix, err = bufReader.ReadLine(); err != nil { if err != io.EOF { - log.Errorf(ctx, err, "[Aysnc RunAndWait] read error: %+v", err) + log.Error(ctx, err, "[Aysnc RunAndWait] read error") } return } diff --git a/rpc/transform.go b/rpc/transform.go index c38b49a17..4641ec065 100644 --- a/rpc/transform.go +++ b/rpc/transform.go @@ -467,7 +467,7 @@ func toRPCWorkloads(ctx context.Context, workloads []*types.Workload, labels map for _, c := range workloads { pWorkload, err := toRPCWorkload(ctx, c) if err != nil { - log.Errorf(ctx, err, "[toRPCWorkloads] trans to pb workload failed %v", err) + log.Error(ctx, err, "[toRPCWorkloads] trans to pb workload failed") continue } if !utils.LabelsFilter(pWorkload.Labels, labels) { diff --git a/selfmon/selfmon.go b/selfmon/selfmon.go index dd4a97da1..1ea9b9613 100644 --- a/selfmon/selfmon.go +++ b/selfmon/selfmon.go @@ -32,7 +32,7 @@ func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster clus id := rand.Int63n(10000) // nolint store, err := store.NewStore(config, t) if err != nil { - log.Errorf(ctx, err, "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err) + log.Errorf(ctx, err, "[RunNodeStatusWatcher] %v failed to create store", id) return } @@ -53,7 +53,7 @@ func (n *NodeStatusWatcher) run(ctx context.Context) { default: n.withActiveLock(ctx, func(ctx context.Context) { if err := n.monitor(ctx); err != nil { - log.Errorf(ctx, err, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err) + log.Errorf(ctx, err, "[NodeStatusWatcher] %v stops watching", n.id) } }) time.Sleep(n.config.ConnectionTimeout) @@ -91,7 +91,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx log.Info(ctx, "[Register] context canceled") return } else if !errors.Is(err, types.ErrKeyExists) { - log.Errorf(ctx, err, "[Register] failed to re-register: %v", err) + log.Error(ctx, err, "[Register] failed to re-register") time.Sleep(time.Second) continue } @@ -146,7 +146,7 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) { CallInfo: false, }) if err != nil { - log.Errorf(ctx, err, "[NodeStatusWatcher] get pod nodes failed %v", err) + log.Error(ctx, err, "[NodeStatusWatcher] get pod nodes failed") return } for node := range ch { @@ -155,7 +155,7 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) { } }) if err != nil { - log.Errorf(ctx, err, "[NodeStatusWatcher] get pod nodes failed %v", err) + log.Error(ctx, err, "[NodeStatusWatcher] get pod nodes failed") return } }() @@ -214,7 +214,7 @@ func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message * WorkloadsDown: true, } if _, err := n.cluster.SetNode(ctx, opts); err != nil { - log.Errorf(ctx, err, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err) + log.Errorf(ctx, err, "[NodeStatusWatcher] set node %s failed", message.Nodename) return } log.Infof(ctx, "[NodeStatusWatcher] set node %s as alive: %v", message.Nodename, message.Alive) diff --git a/source/common/common.go b/source/common/common.go index 4cbb64afc..945ca8b04 100644 --- a/source/common/common.go +++ b/source/common/common.go @@ -2,7 +2,6 @@ package common import ( "context" - "fmt" "io" "net/http" "net/url" @@ -10,6 +9,7 @@ import ( "path/filepath" "strings" + "github.com/pkg/errors" "github.com/projecteru2/core/log" 
"github.com/projecteru2/core/types" @@ -123,7 +123,7 @@ func (g *GitScm) Artifact(ctx context.Context, artifact, path string) error { } defer resp.Body.Close() if resp.StatusCode != 200 { - return fmt.Errorf("Download artifact error %q, code %d", artifact, resp.StatusCode) + return errors.Errorf("Download artifact error %q, code %d", artifact, resp.StatusCode) } // extract files from zipfile diff --git a/store/etcdv3/meta/ephemeral.go b/store/etcdv3/meta/ephemeral.go index 8d6d5bcd6..317b96be5 100644 --- a/store/etcdv3/meta/ephemeral.go +++ b/store/etcdv3/meta/ephemeral.go @@ -47,7 +47,7 @@ func (e *ETCD) StartEphemeral(ctx context.Context, path string, heartbeat time.D ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) defer cancel() if _, err := e.cliv3.Revoke(ctx, lease.ID); err != nil { - log.Errorf(ctx, err, "[StartEphemeral] revoke %d with %s failed: %v", lease.ID, path, err) + log.Errorf(ctx, err, "[StartEphemeral] revoke %d with %s failed", lease.ID, path) } }() @@ -55,7 +55,7 @@ func (e *ETCD) StartEphemeral(ctx context.Context, path string, heartbeat time.D select { case <-tick.C: if _, err := e.cliv3.KeepAliveOnce(ctx, lease.ID); err != nil { - log.Errorf(ctx, err, "[StartEphemeral] keepalive %d with %s failed: %v", lease.ID, path, err) + log.Errorf(ctx, err, "[StartEphemeral] keepalive %d with %s failed", lease.ID, path) return } case <-ctx.Done(): diff --git a/store/etcdv3/meta/etcd.go b/store/etcdv3/meta/etcd.go index c99ebb7eb..4377c94fd 100644 --- a/store/etcdv3/meta/etcd.go +++ b/store/etcdv3/meta/etcd.go @@ -415,7 +415,7 @@ func (e *ETCD) revokeLease(ctx context.Context, leaseID clientv3.LeaseID) { return } if _, err := e.cliv3.Revoke(ctx, leaseID); err != nil { - log.Errorf(ctx, err, "[etcd revoke lease error] %v", err) + log.Error(ctx, err, "[etcd] revoke lease failed") } } diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index a55bb6657..04eb8fcd2 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -188,7 +188,7 @@ func (m *Mercury) NodeStatusStream(ctx context.Context) chan *types.NodeStatus { for resp := range m.Watch(ctx, nodeStatusPrefix, clientv3.WithPrefix()) { if resp.Err() != nil { if !resp.Canceled { - log.Errorf(ctx, resp.Err(), "[NodeStatusStream] watch failed %v", resp.Err()) + log.Error(ctx, resp.Err(), "[NodeStatusStream] watch failed") } return } @@ -318,7 +318,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels _ = m.pool.Invoke(func() { defer wg.Done() if _, err := m.GetNodeStatus(ctx, node.Name); err != nil && !errors.Is(err, types.ErrBadCount) { - log.Errorf(ctx, err, "[doGetNodes] failed to get node status of %v, err: %v", node.Name, err) + log.Errorf(ctx, err, "[doGetNodes] failed to get node status of %v", node.Name) } else { node.Available = err == nil } @@ -329,7 +329,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels // update engine if client, err := m.makeClient(ctx, node); err != nil { - log.Errorf(ctx, err, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err) + log.Errorf(ctx, err, "[doGetNodes] failed to make client for %v", node.Name) } else { node.Engine = client } diff --git a/store/etcdv3/processing.go b/store/etcdv3/processing.go index a9bdf52e0..4a2a4fcf5 100644 --- a/store/etcdv3/processing.go +++ b/store/etcdv3/processing.go @@ -47,7 +47,7 @@ func (m *Mercury) doLoadProcessing(ctx context.Context, appname, entryname strin nodename := parts[len(parts)-2] count, err := strconv.Atoi(string(ev.Value)) if err != 
nil { - log.Errorf(ctx, err, "[doLoadProcessing] Load processing status failed %v", err) + log.Error(ctx, err, "[doLoadProcessing] Load processing status failed") continue } if _, ok := nodesCount[nodename]; !ok { diff --git a/store/etcdv3/service.go b/store/etcdv3/service.go index e006a1ae5..16e6c18a9 100644 --- a/store/etcdv3/service.go +++ b/store/etcdv3/service.go @@ -48,7 +48,7 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error resp, err := m.Get(ctx, fmt.Sprintf(serviceStatusKey, ""), clientv3.WithPrefix()) if err != nil { - log.Errorf(ctx, err, "[ServiceStatusStream] failed to get current services: %v", err) + log.Error(ctx, err, "[ServiceStatusStream] failed to get current services") return } eps := endpoints{} @@ -60,7 +60,7 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error for resp := range watchChan { if resp.Err() != nil { if !resp.Canceled { - log.Errorf(ctx, err, "[ServiceStatusStream] watch failed %v", resp.Err()) + log.Error(ctx, err, "[ServiceStatusStream] watch failed") } return } diff --git a/store/etcdv3/workload.go b/store/etcdv3/workload.go index 3205a9f41..0df39f694 100644 --- a/store/etcdv3/workload.go +++ b/store/etcdv3/workload.go @@ -152,7 +152,7 @@ func (m *Mercury) WorkloadStatusStream(ctx context.Context, appname, entrypoint, for resp := range m.Watch(ctx, statusKey, clientv3.WithPrefix()) { if resp.Err() != nil { if !resp.Canceled { - log.Errorf(ctx, resp.Err(), "[WorkloadStatusStream] watch failed %v", resp.Err()) + log.Error(ctx, resp.Err(), "[WorkloadStatusStream] watch failed") } return } @@ -201,7 +201,7 @@ func (m *Mercury) doGetWorkloads(ctx context.Context, keys []string) (workloads for _, kv := range kvs { workload := &types.Workload{} if err = json.Unmarshal(kv.Value, workload); err != nil { - log.Errorf(ctx, err, "[doGetWorkloads] failed to unmarshal %v, err: %v", string(kv.Key), err) + log.Errorf(ctx, err, "[doGetWorkloads] failed to unmarshal %v", string(kv.Key)) return } workloads = append(workloads, workload) diff --git a/store/redis/ephemeral.go b/store/redis/ephemeral.go index 1c88de1b8..db78f3e5e 100644 --- a/store/redis/ephemeral.go +++ b/store/redis/ephemeral.go @@ -59,7 +59,7 @@ func (r *Rediaron) revokeEphemeral(path string) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() if _, err := r.cli.Del(ctx, path).Result(); err != nil { - log.Errorf(nil, err, "[refreshEphemeral] revoke with %s failed: %v", path, err) //nolint + log.Errorf(nil, err, "[refreshEphemeral] revoke with %s failed", path) //nolint } } diff --git a/store/redis/node.go b/store/redis/node.go index 8b088f3bb..204f08407 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -301,7 +301,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels _ = r.pool.Invoke(func() { defer wg.Done() if _, err := r.GetNodeStatus(ctx, node.Name); err != nil && !errors.Is(err, types.ErrBadCount) { - log.Errorf(ctx, err, "[doGetNodes] failed to get node status of %v, err: %v", node.Name, err) + log.Errorf(ctx, err, "[doGetNodes] failed to get node status of %v", node.Name) } else { node.Available = err == nil } @@ -312,7 +312,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels nodeChan <- node if client, err := r.makeClient(ctx, node); err != nil { - log.Errorf(ctx, err, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err) + log.Errorf(ctx, err, "[doGetNodes] failed to make client for %v", node.Name) } else { 
 			node.Engine = client
 		}
diff --git a/store/redis/processing.go b/store/redis/processing.go
index daa2226f1..f06f1e8da 100644
--- a/store/redis/processing.go
+++ b/store/redis/processing.go
@@ -43,7 +43,7 @@ func (r *Rediaron) doLoadProcessing(ctx context.Context, appname, entryname stri
 		nodename := parts[len(parts)-2]
 		count, err := strconv.Atoi(v)
 		if err != nil {
-			log.Errorf(ctx, err, "[doLoadProcessing] Load processing status failed %v", err)
+			log.Error(ctx, err, "[doLoadProcessing] Load processing status failed")
 			continue
 		}
 		if _, ok := nodesCount[nodename]; !ok {
diff --git a/store/redis/service.go b/store/redis/service.go
index 5e39d6182..d05667338 100644
--- a/store/redis/service.go
+++ b/store/redis/service.go
@@ -45,7 +45,7 @@ func (r *Rediaron) ServiceStatusStream(ctx context.Context) (chan []string, erro
 		data, err := r.getByKeyPattern(ctx, key, 0)
 		if err != nil {
-			log.Errorf(ctx, err, "[ServiceStatusStream] failed to get current services: %v", err)
+			log.Error(ctx, err, "[ServiceStatusStream] failed to get current services")
 			return
 		}
 		eps := endpoints{}
diff --git a/store/redis/workload.go b/store/redis/workload.go
index e80f477c3..0eecd8889 100644
--- a/store/redis/workload.go
+++ b/store/redis/workload.go
@@ -193,7 +193,7 @@ func (r *Rediaron) doGetWorkloads(ctx context.Context, keys []string) ([]*types.
 	for k, v := range data {
 		workload := &types.Workload{}
 		if err = json.Unmarshal([]byte(v), workload); err != nil {
-			log.Errorf(ctx, err, "[doGetWorkloads] failed to unmarshal %v, err: %v", k, err)
+			log.Errorf(ctx, err, "[doGetWorkloads] failed to unmarshal %v", k)
 			return nil, err
 		}
 		workloads = append(workloads, workload)
@@ -240,8 +240,8 @@ func (r *Rediaron) bindWorkloadsAdditions(ctx context.Context, workloads []*type
 		}
 		status := &types.StatusMeta{}
 		if err := json.Unmarshal([]byte(v), &status); err != nil {
-			log.Warnf(ctx, "[bindWorkloadsAdditions] unmarshal %s status data failed %v", workload.ID, err)
-			log.Errorf(ctx, err, "[bindWorkloadsAdditions] status raw: %s", v)
+			log.Warnf(ctx, "[bindWorkloadsAdditions] unmarshal %s status data failed, raw: %s", workload.ID, v)
+			log.Error(ctx, err, "[bindWorkloadsAdditions] unmarshal status failed")
 			continue
 		}
 		workloads[index].StatusMeta = status
diff --git a/utils/service.go b/utils/service.go
index 921703285..b23d7e05a 100644
--- a/utils/service.go
+++ b/utils/service.go
@@ -4,13 +4,15 @@ import (
 	"fmt"
 	"net"
 	"strings"
+
+	"github.com/pkg/errors"
 )
 
 // GetOutboundAddress finds out self-service address
 func GetOutboundAddress(bind string, probeTarget string) (string, error) {
 	parts := strings.Split(bind, ":")
 	if len(parts) != 2 {
-		return "", fmt.Errorf("invalid bind address %s", bind)
+		return "", errors.Errorf("invalid bind address %s", bind)
 	}
 	ip := parts[0]
 	port := parts[1]
diff --git a/utils/utils.go b/utils/utils.go
index fc2a940e1..9e93f2b70 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -140,7 +140,7 @@ func DecodePublishInfo(info map[string]string) map[string][]string {
 func EncodeMetaInLabel(ctx context.Context, meta *types.LabelMeta) string {
 	data, err := json.Marshal(meta)
 	if err != nil {
-		log.Errorf(ctx, err, "[EncodeMetaInLabel] Encode meta failed %v", err)
+		log.Error(ctx, err, "[EncodeMetaInLabel] Encode meta failed")
 		return ""
 	}
 	return string(data)
@@ -152,7 +152,7 @@ func DecodeMetaInLabel(ctx context.Context, labels map[string]string) *types.Lab
 	metastr, ok := labels[cluster.LabelMeta]
 	if ok {
 		if err := json.Unmarshal([]byte(metastr), meta); err != nil {
-			log.Errorf(ctx, err, "[DecodeMetaInLabel] Decode failed %v", err)
failed %v", err) + log.Error(ctx, err, "[DecodeMetaInLabel] Decode failed") } } return meta @@ -212,7 +212,7 @@ func EnsureReaderClosed(ctx context.Context, stream io.ReadCloser) { return } if _, err := io.Copy(io.Discard, stream); err != nil { - log.Errorf(ctx, err, "[EnsureReaderClosed] empty stream failed %v", err) + log.Error(ctx, err, "[EnsureReaderClosed] Empty stream failed") } _ = stream.Close() } diff --git a/wal/hydro.go b/wal/hydro.go index 009b431e1..9719ae6f5 100644 --- a/wal/hydro.go +++ b/wal/hydro.go @@ -51,13 +51,13 @@ func (h *Hydro) Recover(ctx context.Context) { for { scanEntry, ok := <-ch if !ok { - log.Warn(nil, "[Recover] closed ch") // nolint + log.Warn(ctx, "[Recover] closed ch") break } event, err := h.decodeEvent(scanEntry) if err != nil { - log.Errorf(nil, err, "[Recover] decode event error: %v", err) // nolint + log.Error(ctx, err, "[Recover] decode event error") continue } events = append(events, event) @@ -66,12 +66,12 @@ func (h *Hydro) Recover(ctx context.Context) { for _, event := range events { handler, ok := h.getEventHandler(event.Type) if !ok { - log.Warn(nil, "[Recover] no such event handler for %s", event.Type) // nolint + log.Warn(ctx, "[Recover] no such event handler for %s", event.Type) continue } if err := h.recover(ctx, handler, event); err != nil { - log.Errorf(nil, err, "[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) // nolint + log.Errorf(ctx, err, "[Recover] handle event %d (%s) failed", event.ID, event.Type) continue } } diff --git a/wal/kv/mocked.go b/wal/kv/mocked.go index af9fcdb22..52185d498 100644 --- a/wal/kv/mocked.go +++ b/wal/kv/mocked.go @@ -1,13 +1,13 @@ package kv import ( - "fmt" "os" "strings" "sync" "time" "github.com/cornelk/hashmap" + "github.com/pkg/errors" ) // MockedKV . @@ -58,8 +58,7 @@ func (m *MockedKV) Put(key, value []byte) (err error) { func (m *MockedKV) Get(key []byte) (value []byte, err error) { value, ok := m.pool.Get(string(key)) if !ok { - err = fmt.Errorf("no such key: %s", key) - return + return value, errors.Errorf("no such key: %s", key) } return }