diff --git a/core.go b/core.go index 1f6b21fa6..262b11ed3 100644 --- a/core.go +++ b/core.go @@ -72,6 +72,8 @@ func serve(c *cli.Context) error { return err } + factory.InitEngineCache(c.Context, config) + var t *testing.T if embeddedStorage { t = &testing.T{} @@ -133,9 +135,6 @@ func serve(c *cli.Context) error { ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() - // start engine cache checker - go factory.EngineCacheChecker(ctx, config.ConnectionTimeout) - <-ctx.Done() log.Info("[main] Interrupt by signal") diff --git a/engine/factory/factory.go b/engine/factory/factory.go index c3f028c9c..d4e92fe50 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -9,6 +9,7 @@ import ( "github.com/projecteru2/core/engine" "github.com/projecteru2/core/engine/docker" + "github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/engine/mocks/fakeengine" "github.com/projecteru2/core/engine/systemd" "github.com/projecteru2/core/engine/virt" @@ -28,18 +29,65 @@ var ( systemd.TCPPrefix: systemd.MakeClient, fakeengine.PrefixKey: fakeengine.MakeClient, } - engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute) - keysToCheck = sync.Map{} + engineCache *EngineCache ) func getEngineCacheKey(endpoint, ca, cert, key string) string { return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8] } -// EngineCacheChecker checks if the engine in cache is available -func EngineCacheChecker(ctx context.Context, timeout time.Duration) { - log.Info("[EngineCacheChecker] starts") - defer log.Info("[EngineCacheChecker] ends") +type engineParams struct { + endpoint string + ca string + cert string + key string +} + +func (ep engineParams) getCacheKey() string { + return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key) +} + +type EngineCache struct { + cache *utils.EngineCache + keysToCheck sync.Map + config types.Config +} + +// NewEngineCache . +func NewEngineCache(config types.Config) *EngineCache { + return &EngineCache{ + cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute), + keysToCheck: sync.Map{}, + config: config, + } +} + +// InitEngineCache init engine cache and start engine cache checker +func InitEngineCache(ctx context.Context, config types.Config) { + engineCache = NewEngineCache(config) + go engineCache.CheckAlive(ctx) +} + +// Get . +func (e *EngineCache) Get(key string) engine.API { + return e.cache.Get(key) +} + +// Set . +func (e *EngineCache) Set(params engineParams, client engine.API) { + e.cache.Set(params.getCacheKey(), client) + e.keysToCheck.Store(params, struct{}{}) +} + +// Delete . +func (e *EngineCache) Delete(key string) { + e.cache.Delete(key) +} + +// CheckAlive checks if the engine in cache is available +func (e *EngineCache) CheckAlive(ctx context.Context) { + log.Info("[EngineCache] starts") + defer log.Info("[EngineCache] ends") for { select { case <-ctx.Done(): @@ -47,25 +95,43 @@ func EngineCacheChecker(ctx context.Context, timeout time.Duration) { default: } - keysToRemove := []string{} - keysToCheck.Range(func(key, _ interface{}) bool { - cacheKey := key.(string) - client := engineCache.Get(cacheKey) - if client == nil { - keysToRemove = append(keysToRemove, cacheKey) + paramsChan := make(chan engineParams) + go func() { + e.keysToCheck.Range(func(key, _ interface{}) bool { + paramsChan <- key.(engineParams) return true - } - if err := validateEngine(ctx, client, timeout); err != nil { - log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err) - keysToRemove = append(keysToRemove, cacheKey) - } - return true - }) - for _, key := range keysToRemove { - engineCache.Delete(key) - keysToCheck.Delete(key) + }) + close(paramsChan) + }() + + pool := utils.NewGoroutinePool(int(e.config.MaxConcurrency)) + for params := range paramsChan { + params := params + pool.Go(ctx, func() { + cacheKey := params.getCacheKey() + client := e.cache.Get(cacheKey) + if client == nil { + e.cache.Delete(params.getCacheKey()) + e.keysToCheck.Delete(params) + return + } + if _, ok := client.(*fake.Engine); ok { + if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil { + log.Errorf(ctx, "[EngineCache] engine %v is still unavailable, err: %v", cacheKey, err) + } else { + e.cache.Set(cacheKey, newClient) + } + return + } + if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil { + log.Errorf(ctx, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err) + e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err}) + } + }) } - time.Sleep(timeout) + + pool.Wait(ctx) + time.Sleep(e.config.ConnectionTimeout) } } @@ -88,21 +154,8 @@ func RemoveEngineFromCache(endpoint, ca, cert, key string) { engineCache.Delete(cacheKey) } -// GetEngine get engine -func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) { - if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil { - return client, nil - } - - defer func() { - if err == nil && client != nil { - cacheKey := getEngineCacheKey(endpoint, ca, cert, key) - engineCache.Set(cacheKey, client) - keysToCheck.Store(cacheKey, struct{}{}) - log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey) - } - }() - +// newEngine get engine +func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) { prefix, err := getEnginePrefix(endpoint) if err != nil { return nil, err @@ -111,7 +164,10 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, if !ok { return nil, types.ErrNotSupport } - if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil { + utils.WithTimeout(ctx, config.ConnectionTimeout, func(ctx context.Context) { + client, err = e(ctx, config, nodename, endpoint, ca, cert, key) + }) + if err != nil { return nil, err } if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil { @@ -121,6 +177,32 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, return client, nil } +// GetEngine get engine with cache +func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) { + if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil { + return client, nil + } + + defer func() { + params := engineParams{ + endpoint: endpoint, + ca: ca, + cert: cert, + key: key, + } + cacheKey := params.getCacheKey() + if err == nil { + engineCache.Set(params, client) + log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey) + } else { + engineCache.Set(params, &fake.Engine{DefaultErr: err}) + log.Infof(ctx, "[GetEngine] store fake engine %v in cache", cacheKey) + } + }() + + return newEngine(ctx, config, nodename, endpoint, ca, cert, key) +} + func getEnginePrefix(endpoint string) (string, error) { for prefix := range engines { if strings.HasPrefix(endpoint, prefix) { diff --git a/engine/fake/fake.go b/engine/fake/fake.go index f9b7992b5..2ec7d3f7e 100644 --- a/engine/fake/fake.go +++ b/engine/fake/fake.go @@ -7,100 +7,101 @@ import ( enginetypes "github.com/projecteru2/core/engine/types" coresource "github.com/projecteru2/core/source" - "github.com/projecteru2/core/types" ) // Engine to replace nil engine -type Engine struct{} +type Engine struct { + DefaultErr error +} // Info . func (f *Engine) Info(ctx context.Context) (*enginetypes.Info, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // Ping . func (f *Engine) Ping(ctx context.Context) error { - return types.ErrNilEngine + return f.DefaultErr } // Execute . func (f *Engine) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (result string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) { - return "", nil, nil, nil, types.ErrNilEngine + return "", nil, nil, nil, f.DefaultErr } // ExecResize . func (f *Engine) ExecResize(ctx context.Context, ID, result string, height, width uint) (err error) { - return types.ErrNilEngine + return f.DefaultErr } // ExecExitCode . func (f *Engine) ExecExitCode(ctx context.Context, ID, result string) (int, error) { - return 0, types.ErrNilEngine + return 0, f.DefaultErr } // NetworkConnect . func (f *Engine) NetworkConnect(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // NetworkDisconnect . func (f *Engine) NetworkDisconnect(ctx context.Context, network, target string, force bool) error { - return types.ErrNilEngine + return f.DefaultErr } // NetworkList . func (f *Engine) NetworkList(ctx context.Context, drivers []string) ([]*enginetypes.Network, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageList . func (f *Engine) ImageList(ctx context.Context, image string) ([]*enginetypes.Image, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageRemove . func (f *Engine) ImageRemove(ctx context.Context, image string, force, prune bool) ([]string, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImagesPrune . func (f *Engine) ImagesPrune(ctx context.Context) error { - return types.ErrNilEngine + return f.DefaultErr } // ImagePull . func (f *Engine) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImagePush . func (f *Engine) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageBuild . func (f *Engine) ImageBuild(ctx context.Context, input io.Reader, refs []string) (io.ReadCloser, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageBuildCachePrune . func (f *Engine) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error) { - return 0, types.ErrNilEngine + return 0, f.DefaultErr } // ImageLocalDigests . func (f *Engine) ImageLocalDigests(ctx context.Context, image string) ([]string, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageRemoteDigest . func (f *Engine) ImageRemoteDigest(ctx context.Context, image string) (string, error) { - return "", types.ErrNilEngine + return "", f.DefaultErr } // ImageBuildFromExist . func (f *Engine) ImageBuildFromExist(ctx context.Context, ID string, refs []string, user string) (string, error) { - return "", types.ErrNilEngine + return "", f.DefaultErr } // BuildRefs . @@ -110,75 +111,75 @@ func (f *Engine) BuildRefs(ctx context.Context, opts *enginetypes.BuildRefOption // BuildContent . func (f *Engine) BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildContentOptions) (string, io.Reader, error) { - return "", nil, types.ErrNilEngine + return "", nil, f.DefaultErr } // VirtualizationCreate . func (f *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationResourceRemap . func (f *Engine) VirtualizationResourceRemap(ctx context.Context, options *enginetypes.VirtualizationRemapOptions) (<-chan enginetypes.VirtualizationRemapMessage, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationCopyTo . func (f *Engine) VirtualizationCopyTo(ctx context.Context, ID, target string, content []byte, uid, gid int, mode int64) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationStart . func (f *Engine) VirtualizationStart(ctx context.Context, ID string) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationStop . func (f *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationRemove . func (f *Engine) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationInspect . func (f *Engine) VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationLogs . func (f *Engine) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error) { - return nil, nil, types.ErrNilEngine + return nil, nil, f.DefaultErr } // VirtualizationAttach . func (f *Engine) VirtualizationAttach(ctx context.Context, ID string, stream, openStdin bool) (stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) { - return nil, nil, nil, types.ErrNilEngine + return nil, nil, nil, f.DefaultErr } // VirtualizationResize . func (f *Engine) VirtualizationResize(ctx context.Context, ID string, height, width uint) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationWait . func (f *Engine) VirtualizationWait(ctx context.Context, ID, state string) (*enginetypes.VirtualizationWaitResult, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationUpdateResource . func (f *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, opts *enginetypes.VirtualizationResource) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationCopyFrom . func (f *Engine) VirtualizationCopyFrom(ctx context.Context, ID, path string) (content []byte, uid, gid int, mode int64, _ error) { - return nil, 0, 0, 0, types.ErrNilEngine + return nil, 0, 0, 0, f.DefaultErr } // ResourceValidate . func (f *Engine) ResourceValidate(ctx context.Context, cpu float64, cpumap map[string]int64, memory, storage int64) error { - return types.ErrNilEngine + return f.DefaultErr } diff --git a/store/etcdv3/mercury_test.go b/store/etcdv3/mercury_test.go index 2bdc76c56..7d59d48f8 100644 --- a/store/etcdv3/mercury_test.go +++ b/store/etcdv3/mercury_test.go @@ -1,9 +1,11 @@ package etcdv3 import ( + "context" "testing" "time" + "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/types" "github.com/stretchr/testify/assert" @@ -21,6 +23,10 @@ func NewMercury(t *testing.T) *Mercury { config.MaxConcurrency = 20 // config.Docker.CertPath = "/tmp" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + factory.InitEngineCache(ctx, config) + m, err := New(config, t) assert.NoError(t, err) return m diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index 267f7939c..843fc013e 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -299,7 +299,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels return nil, err } node.Init() - node.Engine = &fake.Engine{} + node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine} if utils.FilterWorkload(node.Labels, labels) { allNodes = append(allNodes, node) } diff --git a/store/redis/node.go b/store/redis/node.go index 19ab2fffc..1f3e49091 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -289,7 +289,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels return nil, err } node.Init() - node.Engine = &fake.Engine{} + node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine} if utils.FilterWorkload(node.Labels, labels) { allNodes = append(allNodes, node) } diff --git a/store/redis/rediaron_test.go b/store/redis/rediaron_test.go index 3fd5f321a..293323b92 100644 --- a/store/redis/rediaron_test.go +++ b/store/redis/rediaron_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/types" "github.com/alicebob/miniredis/v2" @@ -84,6 +85,10 @@ func TestRediaron(t *testing.T) { config.GlobalTimeout = 30 * time.Second config.MaxConcurrency = 20 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + factory.InitEngineCache(ctx, config) + cli := redis.NewClient(&redis.Options{ Addr: s.Addr(), DB: 0,