Skip to content

Commit

Permalink
optimize engine cache checker (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL authored Jan 19, 2022
1 parent 0f54c24 commit d145b4c
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 79 deletions.
5 changes: 2 additions & 3 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func serve(c *cli.Context) error {
return err
}

factory.InitEngineCache(c.Context, config)

var t *testing.T
if embeddedStorage {
t = &testing.T{}
Expand Down Expand Up @@ -133,9 +135,6 @@ func serve(c *cli.Context) error {
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

// start engine cache checker
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)

<-ctx.Done()

log.Info("[main] Interrupt by signal")
Expand Down
160 changes: 121 additions & 39 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/projecteru2/core/engine"
"github.com/projecteru2/core/engine/docker"
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
Expand All @@ -28,44 +29,109 @@ var (
systemd.TCPPrefix: systemd.MakeClient,
fakeengine.PrefixKey: fakeengine.MakeClient,
}
engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)
keysToCheck = sync.Map{}
engineCache *EngineCache
)

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]
}

// EngineCacheChecker checks if the engine in cache is available
func EngineCacheChecker(ctx context.Context, timeout time.Duration) {
log.Info("[EngineCacheChecker] starts")
defer log.Info("[EngineCacheChecker] ends")
type engineParams struct {
endpoint string
ca string
cert string
key string
}

func (ep engineParams) getCacheKey() string {
return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key)
}

type EngineCache struct {
cache *utils.EngineCache
keysToCheck sync.Map
config types.Config
}

// NewEngineCache .
func NewEngineCache(config types.Config) *EngineCache {
return &EngineCache{
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
keysToCheck: sync.Map{},
config: config,
}
}

// InitEngineCache init engine cache and start engine cache checker
func InitEngineCache(ctx context.Context, config types.Config) {
engineCache = NewEngineCache(config)
go engineCache.CheckAlive(ctx)
}

// Get .
func (e *EngineCache) Get(key string) engine.API {
return e.cache.Get(key)
}

// Set .
func (e *EngineCache) Set(params engineParams, client engine.API) {
e.cache.Set(params.getCacheKey(), client)
e.keysToCheck.Store(params, struct{}{})
}

// Delete .
func (e *EngineCache) Delete(key string) {
e.cache.Delete(key)
}

// CheckAlive checks if the engine in cache is available
func (e *EngineCache) CheckAlive(ctx context.Context) {
log.Info("[EngineCache] starts")
defer log.Info("[EngineCache] ends")
for {
select {
case <-ctx.Done():
return
default:
}

keysToRemove := []string{}
keysToCheck.Range(func(key, _ interface{}) bool {
cacheKey := key.(string)
client := engineCache.Get(cacheKey)
if client == nil {
keysToRemove = append(keysToRemove, cacheKey)
paramsChan := make(chan engineParams)
go func() {
e.keysToCheck.Range(func(key, _ interface{}) bool {
paramsChan <- key.(engineParams)
return true
}
if err := validateEngine(ctx, client, timeout); err != nil {
log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err)
keysToRemove = append(keysToRemove, cacheKey)
}
return true
})
for _, key := range keysToRemove {
engineCache.Delete(key)
keysToCheck.Delete(key)
})
close(paramsChan)
}()

pool := utils.NewGoroutinePool(int(e.config.MaxConcurrency))
for params := range paramsChan {
params := params
pool.Go(ctx, func() {
cacheKey := params.getCacheKey()
client := e.cache.Get(cacheKey)
if client == nil {
e.cache.Delete(params.getCacheKey())
e.keysToCheck.Delete(params)
return
}
if _, ok := client.(*fake.Engine); ok {
if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil {
log.Errorf(ctx, "[EngineCache] engine %v is still unavailable, err: %v", cacheKey, err)
} else {
e.cache.Set(cacheKey, newClient)
}
return
}
if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err)
e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err})
}
})
}
time.Sleep(timeout)

pool.Wait(ctx)
time.Sleep(e.config.ConnectionTimeout)
}
}

Expand All @@ -88,21 +154,8 @@ func RemoveEngineFromCache(endpoint, ca, cert, key string) {
engineCache.Delete(cacheKey)
}

// GetEngine get engine
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
return client, nil
}

defer func() {
if err == nil && client != nil {
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
engineCache.Set(cacheKey, client)
keysToCheck.Store(cacheKey, struct{}{})
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
}
}()

// newEngine get engine
func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
prefix, err := getEnginePrefix(endpoint)
if err != nil {
return nil, err
Expand All @@ -111,7 +164,10 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
if !ok {
return nil, types.ErrNotSupport
}
if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil {
utils.WithTimeout(ctx, config.ConnectionTimeout, func(ctx context.Context) {
client, err = e(ctx, config, nodename, endpoint, ca, cert, key)
})
if err != nil {
return nil, err
}
if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
Expand All @@ -121,6 +177,32 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
return client, nil
}

// GetEngine get engine with cache
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
return client, nil
}

defer func() {
params := engineParams{
endpoint: endpoint,
ca: ca,
cert: cert,
key: key,
}
cacheKey := params.getCacheKey()
if err == nil {
engineCache.Set(params, client)
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
} else {
engineCache.Set(params, &fake.Engine{DefaultErr: err})
log.Infof(ctx, "[GetEngine] store fake engine %v in cache", cacheKey)
}
}()

return newEngine(ctx, config, nodename, endpoint, ca, cert, key)
}

func getEnginePrefix(endpoint string) (string, error) {
for prefix := range engines {
if strings.HasPrefix(endpoint, prefix) {
Expand Down
Loading

0 comments on commit d145b4c

Please sign in to comment.