Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize engine cache checker #542

Merged
merged 1 commit into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func serve(c *cli.Context) error {
return err
}

// init engine cache and start engine cache checker
factory.InitEngineCache(c.Context, config)

var t *testing.T
if embeddedStorage {
t = &testing.T{}
Expand Down Expand Up @@ -137,9 +140,6 @@ func serve(c *cli.Context) error {
// start node status checker
go selfmon.RunNodeStatusWatcher(ctx, config, cluster, t)

// start engine cache checker
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)

<-ctx.Done()

log.Info("[main] Interrupt by signal")
Expand Down
161 changes: 122 additions & 39 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/projecteru2/core/engine"
"github.com/projecteru2/core/engine/docker"
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
Expand All @@ -28,44 +29,110 @@ var (
systemd.TCPPrefix: systemd.MakeClient,
fakeengine.PrefixKey: fakeengine.MakeClient,
}
engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)
keysToCheck = sync.Map{}
engineCache *EngineCache
)

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return fmt.Sprintf("%v-%v", endpoint, utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8])
}

// EngineCacheChecker checks if the engine in cache is available
func EngineCacheChecker(ctx context.Context, timeout time.Duration) {
log.Info("[EngineCacheChecker] starts")
defer log.Info("[EngineCacheChecker] ends")
type engineParams struct {
endpoint string
ca string
cert string
key string
}

func (ep engineParams) getCacheKey() string {
return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key)
}

// EngineCache .
type EngineCache struct {
cache *utils.EngineCache
keysToCheck sync.Map
config types.Config
}

// NewEngineCache .
func NewEngineCache(config types.Config) *EngineCache {
return &EngineCache{
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
keysToCheck: sync.Map{},
config: config,
}
}

// InitEngineCache init engine cache and start engine cache checker
func InitEngineCache(ctx context.Context, config types.Config) {
engineCache = NewEngineCache(config)
go engineCache.CheckAlive(ctx)
}

// Get .
func (e *EngineCache) Get(key string) engine.API {
return e.cache.Get(key)
}

// Set .
func (e *EngineCache) Set(params engineParams, client engine.API) {
e.cache.Set(params.getCacheKey(), client)
e.keysToCheck.Store(params, struct{}{})
}

// Delete .
func (e *EngineCache) Delete(key string) {
e.cache.Delete(key)
}

// CheckAlive checks if the engine in cache is available
func (e *EngineCache) CheckAlive(ctx context.Context) {
log.Info("[EngineCache] starts")
defer log.Info("[EngineCache] ends")
for {
select {
case <-ctx.Done():
return
default:
}

keysToRemove := []string{}
keysToCheck.Range(func(key, _ interface{}) bool {
cacheKey := key.(string)
client := engineCache.Get(cacheKey)
if client == nil {
keysToRemove = append(keysToRemove, cacheKey)
paramsChan := make(chan engineParams)
go func() {
e.keysToCheck.Range(func(key, _ interface{}) bool {
paramsChan <- key.(engineParams)
return true
}
if err := validateEngine(ctx, client, timeout); err != nil {
log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err)
keysToRemove = append(keysToRemove, cacheKey)
}
return true
})
for _, key := range keysToRemove {
engineCache.Delete(key)
keysToCheck.Delete(key)
})
close(paramsChan)
}()

pool := utils.NewGoroutinePool(int(e.config.MaxConcurrency))
for params := range paramsChan {
params := params
pool.Go(ctx, func() {
cacheKey := params.getCacheKey()
client := e.cache.Get(cacheKey)
if client == nil {
e.cache.Delete(params.getCacheKey())
e.keysToCheck.Delete(params)
return
}
if _, ok := client.(*fake.Engine); ok {
if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil {
log.Errorf(ctx, "[EngineCache] engine %v is still unavailable, err: %v", cacheKey, err)
} else {
e.cache.Set(cacheKey, newClient)
}
return
}
if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err)
e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err})
}
})
}
time.Sleep(timeout)

pool.Wait(ctx)
time.Sleep(e.config.ConnectionTimeout)
}
}

Expand All @@ -88,21 +155,8 @@ func RemoveEngineFromCache(endpoint, ca, cert, key string) {
engineCache.Delete(cacheKey)
}

// GetEngine get engine
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
return client, nil
}

defer func() {
if err == nil && client != nil {
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
engineCache.Set(cacheKey, client)
keysToCheck.Store(cacheKey, struct{}{})
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
}
}()

// newEngine get engine
func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
prefix, err := getEnginePrefix(endpoint)
if err != nil {
return nil, err
Expand All @@ -111,7 +165,10 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
if !ok {
return nil, types.ErrNotSupport
}
if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil {
utils.WithTimeout(ctx, config.ConnectionTimeout, func(ctx context.Context) {
client, err = e(ctx, config, nodename, endpoint, ca, cert, key)
})
if err != nil {
return nil, err
}
if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
Expand All @@ -121,6 +178,32 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
return client, nil
}

// GetEngine get engine with cache
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
return client, nil
}

defer func() {
params := engineParams{
endpoint: endpoint,
ca: ca,
cert: cert,
key: key,
}
cacheKey := params.getCacheKey()
if err == nil {
engineCache.Set(params, client)
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
} else {
engineCache.Set(params, &fake.Engine{DefaultErr: err})
log.Infof(ctx, "[GetEngine] store fake engine %v in cache", cacheKey)
}
}()

return newEngine(ctx, config, nodename, endpoint, ca, cert, key)
}

func getEnginePrefix(endpoint string) (string, error) {
for prefix := range engines {
if strings.HasPrefix(endpoint, prefix) {
Expand Down
Loading