Skip to content

Commit

Permalink
engine cache checker
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Jan 4, 2022
1 parent d69a8d8 commit 5d45f22
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions
return func() {
err := node.Info(ctx)
if err != nil {
logger.Errorf(ctx, "failed to get node info: %+v", err)
logger.Errorf(ctx, "failed to get node %v info: %+v", node.Name, err)
}
ch <- node
}
Expand Down
5 changes: 5 additions & 0 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/projecteru2/core/auth"
"github.com/projecteru2/core/cluster/calcium"
"github.com/projecteru2/core/engine/factory"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/rpc"
Expand Down Expand Up @@ -131,6 +132,10 @@ func serve(c *cli.Context) error {
// wait for unix signals and try to GracefulStop
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

// start engine cache checker
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)

<-ctx.Done()

log.Info("[main] Interrupt by signal")
Expand Down
6 changes: 6 additions & 0 deletions engine/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ func (e *Engine) ResourceValidate(ctx context.Context, cpu float64, cpumap map[s
// TODO list all workloads, calcuate resource
return nil
}

// Ping test connection
func (e *Engine) Ping(ctx context.Context) error {
_, err := e.client.Ping(ctx)
return err
}
1 change: 1 addition & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
// API define a remote engine
type API interface {
Info(ctx context.Context) (*enginetypes.Info, error)
Ping(ctx context.Context) error

Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (result string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error)
ExecResize(ctx context.Context, ID, result string, height, width uint) (err error)
Expand Down
61 changes: 44 additions & 17 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/projecteru2/core/engine"
Expand All @@ -28,51 +29,77 @@ var (
fakeengine.PrefixKey: fakeengine.MakeClient,
}
engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)
keysToCheck = sync.Map{}
)

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return utils.SHA256(fmt.Sprintf("%v:%v:%v:%v", endpoint, ca, cert, key))
return endpoint + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]
}

// EngineCacheChecker checks if the engine in cache is available
func EngineCacheChecker(ctx context.Context, timeout time.Duration) {
log.Info("[EngineCacheChecker] starts")
defer log.Info("[EngineCacheChecker] ends")
for {
select {
case <-ctx.Done():
return
default:
}

keysToRemove := []string{}
keysToCheck.Range(func(key, _ interface{}) bool {
cacheKey := key.(string)
client := engineCache.Get(cacheKey)
if client == nil {
keysToRemove = append(keysToRemove, cacheKey)
return true
}
if err := validateEngine(ctx, client, timeout); err != nil {
log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err)
keysToRemove = append(keysToRemove, cacheKey)
}
return true
})
for _, key := range keysToRemove {
engineCache.Delete(key)
keysToCheck.Delete(key)
}
time.Sleep(timeout)
}
}

func validateEngine(ctx context.Context, engine engine.API, timeout time.Duration) (err error) {
utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
_, err = engine.Info(ctx)
err = engine.Ping(ctx)
})
return err
}

// GetEngineFromCache .
func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca, cert, key string) engine.API {
client := engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
if client == nil {
return nil
}
if err := validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[GetEngineFromCache] engine of %v is unavailable, will be removed from cache, err: %v", endpoint, err)
RemoveEngineFromCache(endpoint, ca, cert, key)
return nil
}
return client
func GetEngineFromCache(endpoint, ca, cert, key string) engine.API {
return engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
}

// RemoveEngineFromCache .
func RemoveEngineFromCache(endpoint, ca, cert, key string) {
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
log.Debugf(context.TODO(), "[RemoveEngineFromCache] remove %v, key %v", endpoint, cacheKey)
log.Infof(context.TODO(), "[RemoveEngineFromCache] remove engine %v from cache", cacheKey)
engineCache.Delete(cacheKey)
}

// GetEngine get engine
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = GetEngineFromCache(ctx, config, endpoint, ca, cert, key); client != nil {
return
if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil {
return client, nil
}

defer func() {
if err == nil && client != nil {
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
log.Debugf(ctx, "[GetEngine] store engine of %v in cache, key: %v", endpoint, cacheKey)
engineCache.Set(cacheKey, client)
keysToCheck.Store(cacheKey, struct{}{})
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
}
}()

Expand Down
5 changes: 5 additions & 0 deletions engine/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ func (f *Engine) Info(ctx context.Context) (*enginetypes.Info, error) {
return nil, types.ErrNilEngine
}

// Ping .
func (f *Engine) Ping(ctx context.Context) error {
return types.ErrNilEngine
}

// Execute .
func (f *Engine) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (result string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
return "", nil, nil, nil, types.ErrNilEngine
Expand Down
14 changes: 14 additions & 0 deletions engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion engine/mocks/fakeengine/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"io/ioutil"

"github.com/docker/go-units"
mock "github.com/stretchr/testify/mock"

"github.com/projecteru2/core/engine"
enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
coretypes "github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
mock "github.com/stretchr/testify/mock"
)

const (
Expand All @@ -36,6 +37,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint
e := &enginemocks.API{}
// info
e.On("Info", mock.Anything).Return(&enginetypes.Info{NCPU: 100, MemTotal: units.GiB * 100, StorageTotal: units.GiB * 100}, nil)
e.On("Ping", mock.Anything).Return(nil)
// exec
var execID string
e.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(
Expand Down
6 changes: 6 additions & 0 deletions engine/virt/virt.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (v *Virt) Info(ctx context.Context) (*enginetypes.Info, error) {
}, nil
}

// Ping tests connection.
func (v *Virt) Ping(ctx context.Context) error {
_, err := v.client.Info(ctx)
return err
}

// Execute executes a command in vm
func (v *Virt) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (pid string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
if config.Tty {
Expand Down
6 changes: 3 additions & 3 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, reso

func (m *Mercury) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, m.config, node.Endpoint, "", "", ""); client != nil {
if client = enginefactory.GetEngineFromCache(node.Endpoint, "", "", ""); client != nil {
return client, nil
}

Expand Down Expand Up @@ -299,6 +299,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
return nil, err
}
node.Init()
node.Engine = &fake.Engine{}
if utils.FilterWorkload(node.Labels, labels) {
allNodes = append(allNodes, node)
}
Expand All @@ -324,9 +325,8 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
if node.Available {
if client, err := m.makeClient(ctx, node); err != nil {
log.Errorf(ctx, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err)
n.Engine = &fake.Engine{}
} else {
n.Engine = client
node.Engine = client
}
}
})
Expand Down
7 changes: 4 additions & 3 deletions store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error
addIfNotEmpty(fmt.Sprintf(nodeCaKey, node.Name), node.Ca)
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
enginefactory.RemoveEngineFromCache(node.Endpoint, node.Ca, node.Cert, node.Key)
}
return errors.WithStack(r.BatchPut(ctx, data))
}
Expand All @@ -181,7 +182,7 @@ func (r *Rediaron) UpdateNodeResource(ctx context.Context, node *types.Node, res

func (r *Rediaron) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, r.config, node.Endpoint, "", "", ""); client != nil {
if client = enginefactory.GetEngineFromCache(node.Endpoint, "", "", ""); client != nil {
return client, nil
}
keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
Expand Down Expand Up @@ -288,6 +289,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
return nil, err
}
node.Init()
node.Engine = &fake.Engine{}
if utils.FilterWorkload(node.Labels, labels) {
allNodes = append(allNodes, node)
}
Expand All @@ -313,9 +315,8 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
if node.Available {
if client, err := r.makeClient(ctx, node); err != nil {
log.Errorf(ctx, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err)
n.Engine = &fake.Engine{}
} else {
n.Engine = client
node.Engine = client
}
}
})
Expand Down

0 comments on commit 5d45f22

Please sign in to comment.