Skip to content

Commit

Permalink
minor modify fake engine
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Dec 19, 2022
1 parent bb7c652 commit 4628b85
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}
var n *types.Node
return n, c.withNodePodLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
logger.Infof(ctx, "set node")
logger.Info(ctx, "set node")
// update resource map
var err error
node.Resource.Capacity, node.Resource.Usage, node.Resource.Diffs, err = c.rmgr.GetNodeResourceInfo(ctx, node.Name, nil, false)
Expand Down
12 changes: 7 additions & 5 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
e.keysToCheck.Del(uintptr(unsafe.Pointer(&params)))
return
}
if _, ok := client.(*fake.Engine); ok {
if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil {
if _, ok := client.(*fake.EngineWithErr); ok {
if newClient, err := newEngine(ctx, e.config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil {
logger.Errorf(ctx, err, "engine %+v is still unavailable", cacheKey)
} else {
e.cache.Set(cacheKey, newClient)
}
return
}
if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil {
logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced with a fake engine", cacheKey)
e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err})
logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced and removed", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
}
})
}
Expand Down Expand Up @@ -153,6 +153,7 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,

defer func() {
params := engineParams{
nodename: nodename,
endpoint: endpoint,
ca: ca,
cert: cert,
Expand All @@ -163,7 +164,7 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
engineCache.Set(params, client)
logger.Infof(ctx, "store engine %+v in cache", cacheKey)
} else {
engineCache.Set(params, &fake.Engine{DefaultErr: err})
engineCache.Set(params, &fake.EngineWithErr{DefaultErr: err})
logger.Infof(ctx, "store fake engine %+v in cache", cacheKey)
}
}()
Expand All @@ -172,6 +173,7 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
}

type engineParams struct {
nodename string
endpoint string
ca string
cert string
Expand Down
72 changes: 36 additions & 36 deletions engine/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,177 +9,177 @@ import (
coresource "github.com/projecteru2/core/source"
)

// Engine to replace nil engine
type Engine struct {
// EngineWithErr use to mock the nil engine
type EngineWithErr struct {
DefaultErr error
}

// Info .
func (f *Engine) Info(ctx context.Context) (*enginetypes.Info, error) {
func (f *EngineWithErr) Info(ctx context.Context) (*enginetypes.Info, error) {
return nil, f.DefaultErr
}

// Ping .
func (f *Engine) Ping(ctx context.Context) error {
func (f *EngineWithErr) Ping(ctx context.Context) error {
return f.DefaultErr
}

// CloseConn .
func (f *Engine) CloseConn() error {
func (f *EngineWithErr) CloseConn() error {
return nil
}

// Execute .
func (f *Engine) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (execID string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
func (f *EngineWithErr) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (execID string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
return "", nil, nil, nil, f.DefaultErr
}

// ExecResize .
func (f *Engine) ExecResize(ctx context.Context, execID string, height, width uint) (err error) {
func (f *EngineWithErr) ExecResize(ctx context.Context, execID string, height, width uint) (err error) {
return f.DefaultErr
}

// ExecExitCode .
func (f *Engine) ExecExitCode(ctx context.Context, ID, result string) (int, error) {
func (f *EngineWithErr) ExecExitCode(ctx context.Context, ID, result string) (int, error) {
return 0, f.DefaultErr
}

// NetworkConnect .
func (f *Engine) NetworkConnect(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error) {
func (f *EngineWithErr) NetworkConnect(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error) {
return nil, f.DefaultErr
}

// NetworkDisconnect .
func (f *Engine) NetworkDisconnect(ctx context.Context, network, target string, force bool) error {
func (f *EngineWithErr) NetworkDisconnect(ctx context.Context, network, target string, force bool) error {
return f.DefaultErr
}

// NetworkList .
func (f *Engine) NetworkList(ctx context.Context, drivers []string) ([]*enginetypes.Network, error) {
func (f *EngineWithErr) NetworkList(ctx context.Context, drivers []string) ([]*enginetypes.Network, error) {
return nil, f.DefaultErr
}

// ImageList .
func (f *Engine) ImageList(ctx context.Context, image string) ([]*enginetypes.Image, error) {
func (f *EngineWithErr) ImageList(ctx context.Context, image string) ([]*enginetypes.Image, error) {
return nil, f.DefaultErr
}

// ImageRemove .
func (f *Engine) ImageRemove(ctx context.Context, image string, force, prune bool) ([]string, error) {
func (f *EngineWithErr) ImageRemove(ctx context.Context, image string, force, prune bool) ([]string, error) {
return nil, f.DefaultErr
}

// ImagesPrune .
func (f *Engine) ImagesPrune(ctx context.Context) error {
func (f *EngineWithErr) ImagesPrune(ctx context.Context) error {
return f.DefaultErr
}

// ImagePull .
func (f *Engine) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) {
func (f *EngineWithErr) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) {
return nil, f.DefaultErr
}

// ImagePush .
func (f *Engine) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) {
func (f *EngineWithErr) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) {
return nil, f.DefaultErr
}

// ImageBuild .
func (f *Engine) ImageBuild(ctx context.Context, input io.Reader, refs []string, _ string) (io.ReadCloser, error) {
func (f *EngineWithErr) ImageBuild(ctx context.Context, input io.Reader, refs []string, _ string) (io.ReadCloser, error) {
return nil, f.DefaultErr
}

// ImageBuildCachePrune .
func (f *Engine) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error) {
func (f *EngineWithErr) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error) {
return 0, f.DefaultErr
}

// ImageLocalDigests .
func (f *Engine) ImageLocalDigests(ctx context.Context, image string) ([]string, error) {
func (f *EngineWithErr) ImageLocalDigests(ctx context.Context, image string) ([]string, error) {
return nil, f.DefaultErr
}

// ImageRemoteDigest .
func (f *Engine) ImageRemoteDigest(ctx context.Context, image string) (string, error) {
func (f *EngineWithErr) ImageRemoteDigest(ctx context.Context, image string) (string, error) {
return "", f.DefaultErr
}

// ImageBuildFromExist .
func (f *Engine) ImageBuildFromExist(ctx context.Context, ID string, refs []string, user string) (string, error) {
func (f *EngineWithErr) ImageBuildFromExist(ctx context.Context, ID string, refs []string, user string) (string, error) {
return "", f.DefaultErr
}

// BuildRefs .
func (f *Engine) BuildRefs(ctx context.Context, opts *enginetypes.BuildRefOptions) []string {
func (f *EngineWithErr) BuildRefs(ctx context.Context, opts *enginetypes.BuildRefOptions) []string {
return nil
}

// BuildContent .
func (f *Engine) BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildContentOptions) (string, io.Reader, error) {
func (f *EngineWithErr) BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildContentOptions) (string, io.Reader, error) {
return "", nil, f.DefaultErr
}

// VirtualizationCreate .
func (f *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error) {
func (f *EngineWithErr) VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error) {
return nil, f.DefaultErr
}

// VirtualizationResourceRemap .
func (f *Engine) VirtualizationResourceRemap(ctx context.Context, options *enginetypes.VirtualizationRemapOptions) (<-chan enginetypes.VirtualizationRemapMessage, error) {
func (f *EngineWithErr) VirtualizationResourceRemap(ctx context.Context, options *enginetypes.VirtualizationRemapOptions) (<-chan enginetypes.VirtualizationRemapMessage, error) {
return nil, f.DefaultErr
}

// VirtualizationCopyTo .
func (f *Engine) VirtualizationCopyTo(ctx context.Context, ID, target string, content []byte, uid, gid int, mode int64) error {
func (f *EngineWithErr) VirtualizationCopyTo(ctx context.Context, ID, target string, content []byte, uid, gid int, mode int64) error {
return f.DefaultErr
}

// VirtualizationStart .
func (f *Engine) VirtualizationStart(ctx context.Context, ID string) error {
func (f *EngineWithErr) VirtualizationStart(ctx context.Context, ID string) error {
return f.DefaultErr
}

// VirtualizationStop .
func (f *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error {
func (f *EngineWithErr) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error {
return f.DefaultErr
}

// VirtualizationRemove .
func (f *Engine) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error {
func (f *EngineWithErr) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error {
return f.DefaultErr
}

// VirtualizationInspect .
func (f *Engine) VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error) {
func (f *EngineWithErr) VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error) {
return nil, f.DefaultErr
}

// VirtualizationLogs .
func (f *Engine) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error) {
func (f *EngineWithErr) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error) {
return nil, nil, f.DefaultErr
}

// VirtualizationAttach .
func (f *Engine) VirtualizationAttach(ctx context.Context, ID string, stream, openStdin bool) (stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
func (f *EngineWithErr) VirtualizationAttach(ctx context.Context, ID string, stream, openStdin bool) (stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
return nil, nil, nil, f.DefaultErr
}

// VirtualizationResize .
func (f *Engine) VirtualizationResize(ctx context.Context, ID string, height, width uint) error {
func (f *EngineWithErr) VirtualizationResize(ctx context.Context, ID string, height, width uint) error {
return f.DefaultErr
}

// VirtualizationWait .
func (f *Engine) VirtualizationWait(ctx context.Context, ID, state string) (*enginetypes.VirtualizationWaitResult, error) {
func (f *EngineWithErr) VirtualizationWait(ctx context.Context, ID, state string) (*enginetypes.VirtualizationWaitResult, error) {
return nil, f.DefaultErr
}

// VirtualizationUpdateResource .
func (f *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, opts *enginetypes.VirtualizationResource) error {
func (f *EngineWithErr) VirtualizationUpdateResource(ctx context.Context, ID string, opts *enginetypes.VirtualizationResource) error {
return f.DefaultErr
}

// VirtualizationCopyFrom .
func (f *Engine) VirtualizationCopyFrom(ctx context.Context, ID, path string) (content []byte, uid, gid int, mode int64, _ error) {
func (f *EngineWithErr) VirtualizationCopyFrom(ctx context.Context, ID, path string) (content []byte, uid, gid int, mode int64, _ error) {
return nil, 0, 0, 0, f.DefaultErr
}
8 changes: 4 additions & 4 deletions selfmon/selfmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (n *NodeStatusWatcher) run(ctx context.Context) {
default:
n.withActiveLock(ctx, func(ctx context.Context) {
if err := n.monitor(ctx); err != nil {
log.WithFunc("selfmon.run").Errorf(ctx, err, "%+v stops watching", n.ID)
log.WithFunc("selfmon.run").Errorf(ctx, err, "stops watching node id %+v", n.ID)
}
})
time.Sleep(n.config.ConnectionTimeout)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
retryCounter = (retryCounter + 1) % 60
time.Sleep(time.Second)
} else {
logger.Infof(ctx, "node status watcher has been active")
logger.Info(ctx, "node status watcher has been active")
expiry = ne
unregister = un
break
Expand Down Expand Up @@ -182,8 +182,8 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {

// monitor node status
messageChan := n.cluster.NodeStatusStream(ctx)
logger.Info(ctx, "watch node status started", n.ID)
defer logger.Info(ctx, "stop watching node status", n.ID)
logger.Info(ctx, "watch node status started")
defer logger.Info(ctx, "stop watching node status")

for {
select {
Expand Down
2 changes: 1 addition & 1 deletion store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
if err := json.Unmarshal(ev.Value, node); err != nil {
return nil, err
}
node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine}
node.Engine = &fake.EngineWithErr{DefaultErr: types.ErrNilEngine}
if utils.LabelsFilter(node.Labels, labels) {
allNodes = append(allNodes, node)
}
Expand Down
2 changes: 1 addition & 1 deletion store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
if err := json.Unmarshal([]byte(value), node); err != nil {
return nil, err
}
node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine}
node.Engine = &fake.EngineWithErr{DefaultErr: types.ErrNilEngine}
if utils.LabelsFilter(node.Labels, labels) {
allNodes = append(allNodes, node)
}
Expand Down

0 comments on commit 4628b85

Please sign in to comment.