From 0f54c24fa04c60d3e582e570d650e1a206daf7e4 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Tue, 18 Jan 2022 17:39:33 +0800 Subject: [PATCH 1/6] try to make client when node is not available (#538) --- engine/factory/factory.go | 2 +- store/etcdv3/node.go | 10 ++++------ store/redis/node.go | 10 ++++------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/engine/factory/factory.go b/engine/factory/factory.go index af6c253a5..c3f028c9c 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -33,7 +33,7 @@ var ( ) func getEngineCacheKey(endpoint, ca, cert, key string) string { - return endpoint + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8] + return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8] } // EngineCacheChecker checks if the engine in cache is available diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index 315276e14..267f7939c 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -322,12 +322,10 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels } nodeChan <- node - if node.Available { - if client, err := m.makeClient(ctx, node); err != nil { - log.Errorf(ctx, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err) - } else { - node.Engine = client - } + if client, err := m.makeClient(ctx, node); err != nil { + log.Errorf(ctx, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err) + } else { + node.Engine = client } }) } diff --git a/store/redis/node.go b/store/redis/node.go index ed629f305..19ab2fffc 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -312,12 +312,10 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels } nodeChan <- node - if node.Available { - if client, err := r.makeClient(ctx, node); err != nil { - log.Errorf(ctx, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err) - } else { - node.Engine = client - } + if client, err := r.makeClient(ctx, node); err != nil { + log.Errorf(ctx, "[doGetNodes] failed to make client for %v, err: %v", node.Name, err) + } else { + node.Engine = client } }) } From d145b4c9640aae039180228e9f7e9693207ab426 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Wed, 19 Jan 2022 19:29:19 +0800 Subject: [PATCH 2/6] optimize engine cache checker (#539) --- core.go | 5 +- engine/factory/factory.go | 160 ++++++++++++++++++++++++++--------- engine/fake/fake.go | 71 ++++++++-------- store/etcdv3/mercury_test.go | 6 ++ store/etcdv3/node.go | 2 +- store/redis/node.go | 2 +- store/redis/rediaron_test.go | 5 ++ 7 files changed, 172 insertions(+), 79 deletions(-) diff --git a/core.go b/core.go index 1f6b21fa6..262b11ed3 100644 --- a/core.go +++ b/core.go @@ -72,6 +72,8 @@ func serve(c *cli.Context) error { return err } + factory.InitEngineCache(c.Context, config) + var t *testing.T if embeddedStorage { t = &testing.T{} @@ -133,9 +135,6 @@ func serve(c *cli.Context) error { ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() - // start engine cache checker - go factory.EngineCacheChecker(ctx, config.ConnectionTimeout) - <-ctx.Done() log.Info("[main] Interrupt by signal") diff --git a/engine/factory/factory.go b/engine/factory/factory.go index c3f028c9c..d4e92fe50 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -9,6 +9,7 @@ import ( "github.com/projecteru2/core/engine" "github.com/projecteru2/core/engine/docker" + 
"github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/engine/mocks/fakeengine" "github.com/projecteru2/core/engine/systemd" "github.com/projecteru2/core/engine/virt" @@ -28,18 +29,65 @@ var ( systemd.TCPPrefix: systemd.MakeClient, fakeengine.PrefixKey: fakeengine.MakeClient, } - engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute) - keysToCheck = sync.Map{} + engineCache *EngineCache ) func getEngineCacheKey(endpoint, ca, cert, key string) string { return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8] } -// EngineCacheChecker checks if the engine in cache is available -func EngineCacheChecker(ctx context.Context, timeout time.Duration) { - log.Info("[EngineCacheChecker] starts") - defer log.Info("[EngineCacheChecker] ends") +type engineParams struct { + endpoint string + ca string + cert string + key string +} + +func (ep engineParams) getCacheKey() string { + return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key) +} + +type EngineCache struct { + cache *utils.EngineCache + keysToCheck sync.Map + config types.Config +} + +// NewEngineCache . +func NewEngineCache(config types.Config) *EngineCache { + return &EngineCache{ + cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute), + keysToCheck: sync.Map{}, + config: config, + } +} + +// InitEngineCache init engine cache and start engine cache checker +func InitEngineCache(ctx context.Context, config types.Config) { + engineCache = NewEngineCache(config) + go engineCache.CheckAlive(ctx) +} + +// Get . +func (e *EngineCache) Get(key string) engine.API { + return e.cache.Get(key) +} + +// Set . +func (e *EngineCache) Set(params engineParams, client engine.API) { + e.cache.Set(params.getCacheKey(), client) + e.keysToCheck.Store(params, struct{}{}) +} + +// Delete . 
+func (e *EngineCache) Delete(key string) { + e.cache.Delete(key) +} + +// CheckAlive checks if the engine in cache is available +func (e *EngineCache) CheckAlive(ctx context.Context) { + log.Info("[EngineCache] starts") + defer log.Info("[EngineCache] ends") for { select { case <-ctx.Done(): @@ -47,25 +95,43 @@ func EngineCacheChecker(ctx context.Context, timeout time.Duration) { default: } - keysToRemove := []string{} - keysToCheck.Range(func(key, _ interface{}) bool { - cacheKey := key.(string) - client := engineCache.Get(cacheKey) - if client == nil { - keysToRemove = append(keysToRemove, cacheKey) + paramsChan := make(chan engineParams) + go func() { + e.keysToCheck.Range(func(key, _ interface{}) bool { + paramsChan <- key.(engineParams) return true - } - if err := validateEngine(ctx, client, timeout); err != nil { - log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err) - keysToRemove = append(keysToRemove, cacheKey) - } - return true - }) - for _, key := range keysToRemove { - engineCache.Delete(key) - keysToCheck.Delete(key) + }) + close(paramsChan) + }() + + pool := utils.NewGoroutinePool(int(e.config.MaxConcurrency)) + for params := range paramsChan { + params := params + pool.Go(ctx, func() { + cacheKey := params.getCacheKey() + client := e.cache.Get(cacheKey) + if client == nil { + e.cache.Delete(params.getCacheKey()) + e.keysToCheck.Delete(params) + return + } + if _, ok := client.(*fake.Engine); ok { + if newClient, err := newEngine(ctx, e.config, utils.RandomString(8), params.endpoint, params.ca, params.key, params.cert); err != nil { + log.Errorf(ctx, "[EngineCache] engine %v is still unavailable, err: %v", cacheKey, err) + } else { + e.cache.Set(cacheKey, newClient) + } + return + } + if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil { + log.Errorf(ctx, "[EngineCache] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err) + e.cache.Set(cacheKey, &fake.Engine{DefaultErr: err}) + } + }) } - time.Sleep(timeout) + + pool.Wait(ctx) + time.Sleep(e.config.ConnectionTimeout) } } @@ -88,21 +154,8 @@ func RemoveEngineFromCache(endpoint, ca, cert, key string) { engineCache.Delete(cacheKey) } -// GetEngine get engine -func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) { - if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil { - return client, nil - } - - defer func() { - if err == nil && client != nil { - cacheKey := getEngineCacheKey(endpoint, ca, cert, key) - engineCache.Set(cacheKey, client) - keysToCheck.Store(cacheKey, struct{}{}) - log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey) - } - }() - +// newEngine get engine +func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) { prefix, err := getEnginePrefix(endpoint) if err != nil { return nil, err @@ -111,7 +164,10 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, if !ok { return nil, types.ErrNotSupport } - if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil { + utils.WithTimeout(ctx, config.ConnectionTimeout, func(ctx context.Context) { + client, err = e(ctx, config, nodename, endpoint, ca, cert, key) + }) + if err != nil { return nil, err } if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil { @@ -121,6 +177,32 @@ func GetEngine(ctx 
context.Context, config types.Config, nodename, endpoint, ca, return client, nil } +// GetEngine get engine with cache +func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) { + if client = GetEngineFromCache(endpoint, ca, cert, key); client != nil { + return client, nil + } + + defer func() { + params := engineParams{ + endpoint: endpoint, + ca: ca, + cert: cert, + key: key, + } + cacheKey := params.getCacheKey() + if err == nil { + engineCache.Set(params, client) + log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey) + } else { + engineCache.Set(params, &fake.Engine{DefaultErr: err}) + log.Infof(ctx, "[GetEngine] store fake engine %v in cache", cacheKey) + } + }() + + return newEngine(ctx, config, nodename, endpoint, ca, cert, key) +} + func getEnginePrefix(endpoint string) (string, error) { for prefix := range engines { if strings.HasPrefix(endpoint, prefix) { diff --git a/engine/fake/fake.go b/engine/fake/fake.go index f9b7992b5..2ec7d3f7e 100644 --- a/engine/fake/fake.go +++ b/engine/fake/fake.go @@ -7,100 +7,101 @@ import ( enginetypes "github.com/projecteru2/core/engine/types" coresource "github.com/projecteru2/core/source" - "github.com/projecteru2/core/types" ) // Engine to replace nil engine -type Engine struct{} +type Engine struct { + DefaultErr error +} // Info . func (f *Engine) Info(ctx context.Context) (*enginetypes.Info, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // Ping . func (f *Engine) Ping(ctx context.Context) error { - return types.ErrNilEngine + return f.DefaultErr } // Execute . func (f *Engine) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (result string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) { - return "", nil, nil, nil, types.ErrNilEngine + return "", nil, nil, nil, f.DefaultErr } // ExecResize . func (f *Engine) ExecResize(ctx context.Context, ID, result string, height, width uint) (err error) { - return types.ErrNilEngine + return f.DefaultErr } // ExecExitCode . func (f *Engine) ExecExitCode(ctx context.Context, ID, result string) (int, error) { - return 0, types.ErrNilEngine + return 0, f.DefaultErr } // NetworkConnect . func (f *Engine) NetworkConnect(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // NetworkDisconnect . func (f *Engine) NetworkDisconnect(ctx context.Context, network, target string, force bool) error { - return types.ErrNilEngine + return f.DefaultErr } // NetworkList . func (f *Engine) NetworkList(ctx context.Context, drivers []string) ([]*enginetypes.Network, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageList . func (f *Engine) ImageList(ctx context.Context, image string) ([]*enginetypes.Image, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageRemove . func (f *Engine) ImageRemove(ctx context.Context, image string, force, prune bool) ([]string, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImagesPrune . func (f *Engine) ImagesPrune(ctx context.Context) error { - return types.ErrNilEngine + return f.DefaultErr } // ImagePull . func (f *Engine) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImagePush . 
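// As with every other stub on the fake engine, it simply replays the
// caller-injected DefaultErr.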
func (f *Engine) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageBuild . func (f *Engine) ImageBuild(ctx context.Context, input io.Reader, refs []string) (io.ReadCloser, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageBuildCachePrune . func (f *Engine) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error) { - return 0, types.ErrNilEngine + return 0, f.DefaultErr } // ImageLocalDigests . func (f *Engine) ImageLocalDigests(ctx context.Context, image string) ([]string, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // ImageRemoteDigest . func (f *Engine) ImageRemoteDigest(ctx context.Context, image string) (string, error) { - return "", types.ErrNilEngine + return "", f.DefaultErr } // ImageBuildFromExist . func (f *Engine) ImageBuildFromExist(ctx context.Context, ID string, refs []string, user string) (string, error) { - return "", types.ErrNilEngine + return "", f.DefaultErr } // BuildRefs . @@ -110,75 +111,75 @@ func (f *Engine) BuildRefs(ctx context.Context, opts *enginetypes.BuildRefOption // BuildContent . func (f *Engine) BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildContentOptions) (string, io.Reader, error) { - return "", nil, types.ErrNilEngine + return "", nil, f.DefaultErr } // VirtualizationCreate . func (f *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationResourceRemap . func (f *Engine) VirtualizationResourceRemap(ctx context.Context, options *enginetypes.VirtualizationRemapOptions) (<-chan enginetypes.VirtualizationRemapMessage, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationCopyTo . func (f *Engine) VirtualizationCopyTo(ctx context.Context, ID, target string, content []byte, uid, gid int, mode int64) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationStart . func (f *Engine) VirtualizationStart(ctx context.Context, ID string) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationStop . func (f *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationRemove . func (f *Engine) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationInspect . func (f *Engine) VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationLogs . func (f *Engine) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error) { - return nil, nil, types.ErrNilEngine + return nil, nil, f.DefaultErr } // VirtualizationAttach . func (f *Engine) VirtualizationAttach(ctx context.Context, ID string, stream, openStdin bool) (stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) { - return nil, nil, nil, types.ErrNilEngine + return nil, nil, nil, f.DefaultErr } // VirtualizationResize . func (f *Engine) VirtualizationResize(ctx context.Context, ID string, height, width uint) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationWait . 
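// The fake never blocks: a caller waiting on a dead engine fails
// immediately with DefaultErr instead of hanging.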
func (f *Engine) VirtualizationWait(ctx context.Context, ID, state string) (*enginetypes.VirtualizationWaitResult, error) { - return nil, types.ErrNilEngine + return nil, f.DefaultErr } // VirtualizationUpdateResource . func (f *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, opts *enginetypes.VirtualizationResource) error { - return types.ErrNilEngine + return f.DefaultErr } // VirtualizationCopyFrom . func (f *Engine) VirtualizationCopyFrom(ctx context.Context, ID, path string) (content []byte, uid, gid int, mode int64, _ error) { - return nil, 0, 0, 0, types.ErrNilEngine + return nil, 0, 0, 0, f.DefaultErr } // ResourceValidate . func (f *Engine) ResourceValidate(ctx context.Context, cpu float64, cpumap map[string]int64, memory, storage int64) error { - return types.ErrNilEngine + return f.DefaultErr } diff --git a/store/etcdv3/mercury_test.go b/store/etcdv3/mercury_test.go index 2bdc76c56..7d59d48f8 100644 --- a/store/etcdv3/mercury_test.go +++ b/store/etcdv3/mercury_test.go @@ -1,9 +1,11 @@ package etcdv3 import ( + "context" "testing" "time" + "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/types" "github.com/stretchr/testify/assert" @@ -21,6 +23,10 @@ func NewMercury(t *testing.T) *Mercury { config.MaxConcurrency = 20 // config.Docker.CertPath = "/tmp" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + factory.InitEngineCache(ctx, config) + m, err := New(config, t) assert.NoError(t, err) return m diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index 267f7939c..843fc013e 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -299,7 +299,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels return nil, err } node.Init() - node.Engine = &fake.Engine{} + node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine} if utils.FilterWorkload(node.Labels, labels) { allNodes = append(allNodes, node) } diff --git a/store/redis/node.go b/store/redis/node.go index 19ab2fffc..1f3e49091 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -289,7 +289,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels return nil, err } node.Init() - node.Engine = &fake.Engine{} + node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine} if utils.FilterWorkload(node.Labels, labels) { allNodes = append(allNodes, node) } diff --git a/store/redis/rediaron_test.go b/store/redis/rediaron_test.go index 3fd5f321a..293323b92 100644 --- a/store/redis/rediaron_test.go +++ b/store/redis/rediaron_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/types" "github.com/alicebob/miniredis/v2" @@ -84,6 +85,10 @@ func TestRediaron(t *testing.T) { config.GlobalTimeout = 30 * time.Second config.MaxConcurrency = 20 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + factory.InitEngineCache(ctx, config) + cli := redis.NewClient(&redis.Options{ Addr: s.Addr(), DB: 0, From 69e4908e4d002d96e4a91435d18cd92cfb5c8e05 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Wed, 12 Jan 2022 12:03:17 +0800 Subject: [PATCH 3/6] fix the issue of building docker image (#535) --- .github/workflows/dockerimage.yml | 4 ++-- Dockerfile | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/dockerimage.yml b/.github/workflows/dockerimage.yml index b21bffa2c..46dc28302 100644 --- a/.github/workflows/dockerimage.yml +++ 
b/.github/workflows/dockerimage.yml @@ -2,8 +2,6 @@ name: docker-image on: push: - branches: - - master tags: - v* @@ -13,6 +11,8 @@ jobs: steps: - name: checkout uses: actions/checkout@v2 + with: + fetch-depth: 0 - name: build and push to github packages uses: docker/build-push-action@v1 diff --git a/Dockerfile b/Dockerfile index 6e881f0f5..390c8b3df 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ FROM golang:alpine AS BUILD # make binary RUN apk add --no-cache build-base musl-dev git curl make cmake -RUN git clone https://github.com/projecteru2/core.git /go/src/github.com/projecteru2/core +COPY . /go/src/github.com/projecteru2/core WORKDIR /go/src/github.com/projecteru2/core ARG KEEP_SYMBOL RUN make build && ./eru-core --version From b38b88fed78e4aab21a80857a0ac43101e9c4c50 Mon Sep 17 00:00:00 2001 From: Joseph Gu Date: Wed, 12 Jan 2022 16:11:47 +0800 Subject: [PATCH 4/6] Add stdin flag (#536) --- engine/virt/virt.go | 1 + go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/engine/virt/virt.go b/engine/virt/virt.go index 645360749..8212a0d08 100644 --- a/engine/virt/virt.go +++ b/engine/virt/virt.go @@ -190,6 +190,7 @@ func (v *Virt) VirtualizationCreate(ctx context.Context, opts *enginetypes.Virtu DmiUUID: opts.Labels[DmiUUIDKey], Cmd: opts.Cmd, Lambda: opts.Lambda, + Stdin: opts.Stdin, } var resp virttypes.Guest diff --git a/go.mod b/go.mod index 10d850684..8c7d420ab 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/opencontainers/runc v1.0.0-rc95 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 - github.com/projecteru2/libyavirt v0.0.0-20211217082140-493b61aa9b0d + github.com/projecteru2/libyavirt v0.0.0-20220112061300-ac7002c411ff github.com/prometheus/client_golang v1.11.0 github.com/sanity-io/litter v1.5.1 github.com/sirupsen/logrus v1.7.0 diff --git a/go.sum b/go.sum index 76e40a808..c76df3c36 100644 --- a/go.sum +++ b/go.sum @@ -430,8 +430,8 @@ github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= -github.com/projecteru2/libyavirt v0.0.0-20211217082140-493b61aa9b0d h1:BMFqsvIB3nmK5l53nz8r2ndK8//T0njciu5nr/kj9A4= -github.com/projecteru2/libyavirt v0.0.0-20211217082140-493b61aa9b0d/go.mod h1:FOc+hWBMLsMrmx5p3/moizKeSomedZPNwB6LhS+kEnE= +github.com/projecteru2/libyavirt v0.0.0-20220112061300-ac7002c411ff h1:0pRYgowqjxIxotfc2+xrOHstg/ii80GK0g0v2eAdArg= +github.com/projecteru2/libyavirt v0.0.0-20220112061300-ac7002c411ff/go.mod h1:FOc+hWBMLsMrmx5p3/moizKeSomedZPNwB6LhS+kEnE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= From a00c69b4c03a13ceddbd71491609981d9cfd9321 Mon Sep 17 00:00:00 2001 From: zc Date: Mon, 17 Jan 2022 15:48:38 +0800 Subject: [PATCH 5/6] WAL can wait and recycle lambdas (#531) * wal can wait and recycle lambdas * fix test and lint --- 
cluster/calcium/lambda.go | 61 +++++++++++++---------------- cluster/calcium/lambda_test.go | 12 ------ cluster/calcium/node.go | 5 +-- cluster/calcium/wal.go | 70 +++++++++++++--------------------- cluster/calcium/wal_test.go | 39 +++++++++---------- log/log.go | 6 +++ 6 files changed, 80 insertions(+), 113 deletions(-) diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go index b5c7a7c99..6af2ed9b3 100644 --- a/cluster/calcium/lambda.go +++ b/cluster/calcium/lambda.go @@ -15,13 +15,11 @@ import ( "github.com/projecteru2/core/utils" "github.com/projecteru2/core/wal" - "github.com/google/uuid" "github.com/pkg/errors" ) const ( exitDataPrefix = "[exitcode] " - labelLambdaID = "LambdaID" ) // RunAndWait implement lambda @@ -39,10 +37,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC return workloadIDs, nil, errors.WithStack(types.ErrRunAndWaitCountOneWithStdin) } - commit, err := c.walCreateLambda(opts) - if err != nil { - return workloadIDs, nil, logger.Err(ctx, err) - } createChan, err := c.CreateWorkload(ctx, opts) if err != nil { logger.Errorf(ctx, "[RunAndWait] Create workload error %+v", err) @@ -54,23 +48,40 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC wg = &sync.WaitGroup{} ) - lambda := func(message *types.CreateWorkloadMessage) { + lambda := func(message *types.CreateWorkloadMessage) (attachMessage *types.AttachWorkloadMessage) { // should Done this waitgroup anyway defer wg.Done() + defer func() { + runMsgCh <- attachMessage + }() + // if workload is empty, which means error occurred when created workload // we don't need to remove this non-existing workload // so just send the error message and return if message.Error != nil || message.WorkloadID == "" { logger.Errorf(ctx, "[RunAndWait] Create workload failed %+v", message.Error) - runMsgCh <- &types.AttachWorkloadMessage{ + return &types.AttachWorkloadMessage{ WorkloadID: "", Data: []byte(fmt.Sprintf("Create workload failed %+v", errors.Unwrap(message.Error))), StdStreamType: types.EruError, } - return } + commit, err := c.walCreateLambda(message) + if err != nil { + return &types.AttachWorkloadMessage{ + WorkloadID: message.WorkloadID, + Data: []byte(fmt.Sprintf("Create wal failed: %s, %+v", message.WorkloadID, logger.Err(ctx, err))), + StdStreamType: types.EruError, + } + } + defer func() { + if err := commit(); err != nil { + logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %s, %v", eventCreateLambda, message.WorkloadID, err) + } + }() + // the workload should be removed if it exists // no matter the workload exits successfully or not defer func() { @@ -86,12 +97,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC workload, err := c.GetWorkload(ctx, message.WorkloadID) if err != nil { logger.Errorf(ctx, "[RunAndWait] Get workload failed %+v", err) - runMsgCh <- &types.AttachWorkloadMessage{ + return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))), StdStreamType: types.EruError, } - return } // for other cases, we have the workload and it works fine @@ -105,12 +115,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC Stderr: true, }); err != nil { logger.Errorf(ctx, "[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err) - runMsgCh <- &types.AttachWorkloadMessage{ + return &types.AttachWorkloadMessage{ 
WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))), StdStreamType: types.EruError, } - return } splitFunc, split := bufio.ScanLines, byte('\n') @@ -121,12 +130,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true) if err != nil { logger.Errorf(ctx, "[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err) - runMsgCh <- &types.AttachWorkloadMessage{ + return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))), StdStreamType: types.EruError, } - return } processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { @@ -148,12 +156,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "") if err != nil { logger.Errorf(ctx, "[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err) - runMsgCh <- &types.AttachWorkloadMessage{ + return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, errors.Unwrap(err))), StdStreamType: types.EruError, } - return } if r.Code != 0 { @@ -161,7 +168,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC } exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code))) - runMsgCh <- &types.AttachWorkloadMessage{ + return &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: exitData, StdStreamType: types.Stdout, @@ -182,9 +189,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC utils.SentryGo(func() { defer close(runMsgCh) wg.Wait() - if err := commit(); err != nil { - logger.Errorf(ctx, "[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err) - } log.Info("[RunAndWait] Finish run and wait for workloads") }) @@ -192,19 +196,6 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC return workloadIDs, runMsgCh, nil } -func (c *Calcium) walCreateLambda(opts *types.DeployOptions) (wal.Commit, error) { - uid, err := uuid.NewRandom() - if err != nil { - return nil, errors.WithStack(err) - } - - lambdaID := uid.String() - - if opts.Labels != nil { - opts.Labels[labelLambdaID] = lambdaID - } else { - opts.Labels = map[string]string{labelLambdaID: lambdaID} - } - +func (c *Calcium) walCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) { return c.wal.logCreateLambda(opts) } diff --git a/cluster/calcium/lambda_test.go b/cluster/calcium/lambda_test.go index de36d4267..322240899 100644 --- a/cluster/calcium/lambda_test.go +++ b/cluster/calcium/lambda_test.go @@ -17,7 +17,6 @@ import ( storemocks "github.com/projecteru2/core/store/mocks" "github.com/projecteru2/core/strategy" "github.com/projecteru2/core/types" - "github.com/projecteru2/core/wal" walmocks "github.com/projecteru2/core/wal/mocks" "github.com/stretchr/testify/assert" @@ -31,12 +30,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) { mwal := c.wal.WAL.(*walmocks.WAL) defer mwal.AssertExpectations(t) - var walCommitted bool - commit := wal.Commit(func() error { - walCommitted = true - return nil - }) - mwal.On("Log", eventCreateLambda, 
mock.Anything).Return(commit, nil).Once() opts := &types.DeployOptions{ Name: "zc:name", @@ -56,7 +49,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) { _, ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte)) assert.NoError(err) assert.NotNil(ch) - assert.False(walCommitted) ms := []*types.AttachWorkloadMessage{} for m := range ch { ms = append(ms, m) @@ -65,10 +57,6 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) { assert.Equal(m.WorkloadID, "") assert.True(strings.HasPrefix(string(m.Data), "Create workload failed")) - lambdaID, exists := opts.Labels[labelLambdaID] - assert.True(exists) - assert.True(len(lambdaID) > 1) - assert.True(walCommitted) assert.Equal(m.StdStreamType, types.EruError) } diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index e7c5b4631..736b14f25 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -9,7 +9,6 @@ import ( "github.com/projecteru2/core/utils" "github.com/pkg/errors" - "github.com/sanity-io/litter" ) // AddNode adds a node @@ -127,13 +126,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ } var n *types.Node return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error { - litter.Dump(opts) + logger.Infof(ctx, "set node") opts.Normalize(node) n = node n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass) if n.IsDown() { - log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename) + logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename) } if opts.WorkloadsDown { c.setAllWorkloadsOnNodeDown(ctx, opts.Nodename) diff --git a/cluster/calcium/wal.go b/cluster/calcium/wal.go index aa7a9ce2d..f038eeedc 100644 --- a/cluster/calcium/wal.go +++ b/cluster/calcium/wal.go @@ -52,12 +52,8 @@ func (w *WAL) logCreateWorkload(workloadID, nodename string) (wal.Commit, error) }) } -func (w *WAL) logCreateLambda(opts *types.DeployOptions) (wal.Commit, error) { - return w.Log(eventCreateLambda, &types.ListWorkloadsOptions{ - Appname: opts.Name, - Entrypoint: opts.Entrypoint.Name, - Labels: map[string]string{labelLambdaID: opts.Labels[labelLambdaID]}, - }) +func (w *WAL) logCreateLambda(opts *types.CreateWorkloadMessage) (wal.Commit, error) { + return w.Log(eventCreateLambda, opts.WorkloadID) } // CreateWorkloadHandler indicates event handler for creating workload. @@ -179,64 +175,52 @@ func (h *CreateLambdaHandler) Check(context.Context, interface{}) (bool, error) // Encode . func (h *CreateLambdaHandler) Encode(raw interface{}) ([]byte, error) { - opts, ok := raw.(*types.ListWorkloadsOptions) + workloadID, ok := raw.(string) if !ok { return nil, types.NewDetailedErr(types.ErrInvalidType, raw) } - return json.Marshal(opts) + return []byte(workloadID), nil } // Decode . func (h *CreateLambdaHandler) Decode(bs []byte) (interface{}, error) { - opts := &types.ListWorkloadsOptions{} - err := json.Unmarshal(bs, opts) - return opts, err + return string(bs), nil } // Handle . 
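// It now replays a lambda record by waiting, in a background goroutine,
// for the workload to exit and then removing it, instead of listing and
// deleting matching workloads synchronously.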
func (h *CreateLambdaHandler) Handle(ctx context.Context, raw interface{}) error { - opts, ok := raw.(*types.ListWorkloadsOptions) + workloadID, ok := raw.(string) if !ok { return types.NewDetailedErr(types.ErrInvalidType, raw) } - workloadIDs, err := h.getWorkloadIDs(ctx, opts) - if err != nil { - log.Errorf(nil, "[CreateLambdaHandler.Handle] Get workloads %s/%s/%v failed: %v", //nolint - opts.Appname, opts.Entrypoint, opts.Labels, err) - return err - } - - ctx, cancel := getReplayContext(ctx) - defer cancel() + logger := log.WithField("WAL.Handle", "RunAndWait").WithField("ID", workloadID) + go func() { + logger.Infof(ctx, "recovery start") + workload, err := h.calcium.GetWorkload(ctx, workloadID) + if err != nil { + logger.Errorf(ctx, "Get workload failed: %v", err) + return + } - if err := h.calcium.doRemoveWorkloadSync(ctx, workloadIDs); err != nil { - log.Errorf(ctx, "[CreateLambdaHandler.Handle] Remove lambda %v failed: %v", opts, err) - return err - } + r, err := workload.Engine.VirtualizationWait(ctx, workloadID, "") + if err != nil { + logger.Errorf(ctx, "Wait failed: %+v", err) + return + } + if r.Code != 0 { + logger.Errorf(ctx, "Run failed: %s", r.Message) + } - log.Infof(ctx, "[CreateLambdaHandler.Handle] Lambda %v removed", opts) + if err := h.calcium.doRemoveWorkloadSync(ctx, []string{workloadID}); err != nil { + logger.Errorf(ctx, "Remove failed: %+v", err) + } + logger.Infof(ctx, "waited and removed") + }() return nil } -func (h *CreateLambdaHandler) getWorkloadIDs(ctx context.Context, opts *types.ListWorkloadsOptions) ([]string, error) { - ctx, cancel := getReplayContext(ctx) - defer cancel() - - workloads, err := h.calcium.ListWorkloads(ctx, opts) - if err != nil { - return nil, err - } - - workloadIDs := make([]string, len(workloads)) - for i, wrk := range workloads { - workloadIDs[i] = wrk.ID - } - - return workloadIDs, nil -} - func getReplayContext(ctx context.Context) (context.Context, context.CancelFunc) { return context.WithTimeout(ctx, time.Second*32) } diff --git a/cluster/calcium/wal_test.go b/cluster/calcium/wal_test.go index dca2a14f2..bda4244e0 100644 --- a/cluster/calcium/wal_test.go +++ b/cluster/calcium/wal_test.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "testing" + "time" enginemocks "github.com/projecteru2/core/engine/mocks" + enginetypes "github.com/projecteru2/core/engine/types" lockmocks "github.com/projecteru2/core/lock/mocks" storemocks "github.com/projecteru2/core/store/mocks" "github.com/projecteru2/core/types" @@ -131,12 +133,7 @@ func TestHandleCreateLambda(t *testing.T) { require.NoError(t, err) c.wal = wal - deployOpts := &types.DeployOptions{ - Name: "appname", - Entrypoint: &types.Entrypoint{Name: "entry"}, - Labels: map[string]string{labelLambdaID: "lambda"}, - } - _, err = c.wal.logCreateLambda(deployOpts) + _, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"}) require.NoError(t, err) node := &types.Node{ @@ -150,35 +147,34 @@ func TestHandleCreateLambda(t *testing.T) { } store := c.store.(*storemocks.Store) - defer store.AssertExpectations(t) - store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels). - Return(nil, fmt.Errorf("err")). 
- Once() - store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD) + store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() c.wal.Recover(context.TODO()) + time.Sleep(500 * time.Millisecond) + store.AssertExpectations(t) - store.On("ListWorkloads", mock.Anything, deployOpts.Name, deployOpts.Entrypoint.Name, "", int64(0), deployOpts.Labels). - Return([]*types.Workload{wrk}, nil). + _, err = c.wal.logCreateLambda(&types.CreateWorkloadMessage{WorkloadID: "workloadid"}) + require.NoError(t, err) + store.On("GetWorkload", mock.Anything, mock.Anything). + Return(wrk, nil). Once() - store.On("GetWorkloads", mock.Anything, []string{wrk.ID}). - Return([]*types.Workload{wrk}, nil). - Twice() store.On("GetNode", mock.Anything, wrk.Nodename). Return(node, nil) - eng := wrk.Engine.(*enginemocks.API) - defer eng.AssertExpectations(t) + eng.On("VirtualizationWait", mock.Anything, wrk.ID, "").Return(&enginetypes.VirtualizationWaitResult{Code: 0}, nil).Once() eng.On("VirtualizationRemove", mock.Anything, wrk.ID, true, true). Return(nil). Once() - + eng.On("VirtualizationResourceRemap", mock.Anything, mock.Anything).Return(nil, nil).Once() + store.On("GetWorkloads", mock.Anything, []string{wrk.ID}). + Return([]*types.Workload{wrk}, nil). + Twice() store.On("RemoveWorkload", mock.Anything, wrk). Return(nil). Once() store.On("UpdateNodeResource", mock.Anything, node, mock.Anything, mock.Anything). Return(nil). Once() - + store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once() lock := &lockmocks.DistributedLock{} lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) @@ -187,4 +183,7 @@ func TestHandleCreateLambda(t *testing.T) { c.wal.Recover(context.TODO()) // Recovered nothing. c.wal.Recover(context.TODO()) + time.Sleep(500 * time.Millisecond) + store.AssertExpectations(t) + eng.AssertExpectations(t) } diff --git a/log/log.go b/log/log.go index 7ef4550f5..0c2d55643 100644 --- a/log/log.go +++ b/log/log.go @@ -60,6 +60,12 @@ func (f Fields) Err(ctx context.Context, err error) error { return err } +// Infof . +func (f Fields) Infof(ctx context.Context, format string, args ...interface{}) { + format = getTracingInfo(ctx) + format + f.e.Infof(format, args...) 
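+	// Prepending keeps the tracing info at the head of the log line.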
+} + // WithField add kv into log entry func WithField(key string, value interface{}) Fields { return Fields{ From b6bc5afaae2bda0e2dee85f13d010eec6d0f41b5 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Thu, 20 Jan 2022 18:28:54 +0800 Subject: [PATCH 6/6] move selfmon (#540) --- cluster/calcium/calcium.go | 23 +------- cluster/calcium/dissociate.go | 5 +- cluster/calcium/remove.go | 5 +- core.go | 7 +++ engine/factory/factory.go | 2 +- {cluster/calcium => selfmon}/selfmon.go | 76 ++++++++++++------------- store/etcdv3/node.go | 5 +- store/etcdv3/node_test.go | 5 +- store/redis/node.go | 5 +- store/redis/node_test.go | 5 +- store/store.go | 21 ++++--- types/action.go | 8 +++ 12 files changed, 83 insertions(+), 84 deletions(-) rename {cluster/calcium => selfmon}/selfmon.go (74%) create mode 100644 types/action.go diff --git a/cluster/calcium/calcium.go b/cluster/calcium/calcium.go index 69f1f4e3b..51f295e3c 100644 --- a/cluster/calcium/calcium.go +++ b/cluster/calcium/calcium.go @@ -15,8 +15,6 @@ import ( "github.com/projecteru2/core/source/github" "github.com/projecteru2/core/source/gitlab" "github.com/projecteru2/core/store" - "github.com/projecteru2/core/store/etcdv3" - "github.com/projecteru2/core/store/redis" "github.com/projecteru2/core/types" "github.com/pkg/errors" @@ -31,7 +29,6 @@ type Calcium struct { watcher discovery.Service wal *WAL identifier string - selfmon *NodeStatusWatcher } // New returns a new cluster config @@ -39,19 +36,9 @@ func New(config types.Config, t *testing.T) (*Calcium, error) { logger := log.WithField("Calcium", "New").WithField("config", config) // set store - var store store.Store - var err error - switch config.Store { - case types.Redis: - store, err = redis.New(config, t) - if err != nil { - return nil, logger.Err(context.TODO(), errors.WithStack(err)) - } - default: - store, err = etcdv3.New(config, t) - if err != nil { - return nil, logger.Err(context.TODO(), errors.WithStack(err)) - } + store, err := store.NewStore(config, t) + if err != nil { + return nil, logger.Err(context.TODO(), errors.WithStack(err)) } // set scheduler @@ -83,10 +70,6 @@ func New(config types.Config, t *testing.T) (*Calcium, error) { cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher} cal.wal, err = newCalciumWAL(cal) cal.identifier = config.Identifier() - cal.selfmon = NewNodeStatusWatcher(cal) - - // start node status watcher - go cal.selfmon.run() return cal, logger.Err(nil, errors.WithStack(err)) //nolint } diff --git a/cluster/calcium/dissociate.go b/cluster/calcium/dissociate.go index 02379e73a..9f954f269 100644 --- a/cluster/calcium/dissociate.go +++ b/cluster/calcium/dissociate.go @@ -4,7 +4,6 @@ import ( "context" "github.com/projecteru2/core/log" - "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" @@ -34,7 +33,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t ctx, // if func(ctx context.Context) (err error) { - if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr); err == nil { + if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr); err == nil { log.Infof(ctx, "[DissociateWorkload] Workload %s dissociated", workload.ID) } return errors.WithStack(err) @@ -48,7 +47,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t if failedByCond { return 
nil } - return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr)) + return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr)) }, c.config.GlobalTimeout, ) diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index 166d9af54..5bb4fba76 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/projecteru2/core/log" - "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" @@ -42,7 +41,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, ctx, // if func(ctx context.Context) error { - return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr)) + return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionIncr)) }, // then func(ctx context.Context) (err error) { @@ -56,7 +55,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, if failedByCond { return nil } - return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr)) + return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, types.ActionDecr)) }, c.config.GlobalTimeout, ) diff --git a/core.go b/core.go index 262b11ed3..575595f95 100644 --- a/core.go +++ b/core.go @@ -18,6 +18,7 @@ import ( "github.com/projecteru2/core/metrics" "github.com/projecteru2/core/rpc" pb "github.com/projecteru2/core/rpc/gen" + "github.com/projecteru2/core/selfmon" "github.com/projecteru2/core/utils" "github.com/projecteru2/core/version" @@ -135,6 +136,12 @@ func serve(c *cli.Context) error { ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() + // start node status checker + go selfmon.RunNodeStatusWatcher(ctx, config, cluster, t) + + // start engine cache checker + go factory.EngineCacheChecker(ctx, config.ConnectionTimeout) + <-ctx.Done() log.Info("[main] Interrupt by signal") diff --git a/engine/factory/factory.go b/engine/factory/factory.go index d4e92fe50..c52c262c2 100644 --- a/engine/factory/factory.go +++ b/engine/factory/factory.go @@ -33,7 +33,7 @@ var ( ) func getEngineCacheKey(endpoint, ca, cert, key string) string { - return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8] + return fmt.Sprintf("%v-%v", endpoint, utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]) } type engineParams struct { diff --git a/cluster/calcium/selfmon.go b/selfmon/selfmon.go similarity index 74% rename from cluster/calcium/selfmon.go rename to selfmon/selfmon.go index 4138c4ed0..c7bd65bb4 100644 --- a/cluster/calcium/selfmon.go +++ b/selfmon/selfmon.go @@ -1,16 +1,17 @@ -package calcium +package selfmon import ( "context" "math/rand" - "os/signal" - "syscall" + "testing" "time" "github.com/pkg/errors" + "github.com/projecteru2/core/cluster" "github.com/projecteru2/core/log" - coretypes "github.com/projecteru2/core/types" + "github.com/projecteru2/core/store" + "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" ) @@ -19,22 +20,32 @@ const ActiveKey = "/selfmon/active" // NodeStatusWatcher monitors the changes of node status type NodeStatusWatcher struct { - id int64 - cal *Calcium + id int64 + config 
types.Config + cluster cluster.Cluster + store store.Store } -// NewNodeStatusWatcher . -func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher { +// RunNodeStatusWatcher . +func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) { rand.Seed(time.Now().UnixNano()) id := rand.Int63n(10000) // nolint - return &NodeStatusWatcher{ - id: id, - cal: cal, + store, err := store.NewStore(config, t) + if err != nil { + log.Errorf(context.TODO(), "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err) + return + } + + watcher := &NodeStatusWatcher{ + id: id, + config: config, + store: store, + cluster: cluster, } + watcher.run(ctx) } -func (n *NodeStatusWatcher) run() { - ctx := n.getSignalContext(context.TODO()) +func (n *NodeStatusWatcher) run(ctx context.Context) { for { select { case <-ctx.Done(): @@ -45,22 +56,11 @@ func (n *NodeStatusWatcher) run() { log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err) } }) - time.Sleep(n.cal.config.ConnectionTimeout) + time.Sleep(n.config.ConnectionTimeout) } } } -func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context { - exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - go func() { - defer cancel() - <-exitCtx.Done() - log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id) - }() - - return exitCtx -} - // withActiveLock acquires the active lock synchronously func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) { ctx, cancel := context.WithCancel(parentCtx) @@ -90,7 +90,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx if errors.Is(err, context.Canceled) { log.Info("[Register] context canceled") return - } else if !errors.Is(err, coretypes.ErrKeyExists) { + } else if !errors.Is(err, types.ErrKeyExists) { log.Errorf(ctx, "[Register] failed to re-register: %v", err) time.Sleep(time.Second) continue @@ -126,20 +126,20 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx } func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) { - return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval) + return n.store.StartEphemeral(ctx, ActiveKey, n.config.HAKeepaliveInterval) } func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) { log.Debug(ctx, "[NodeStatusWatcher] init node status started") - nodes := make(chan *coretypes.Node) + nodes := make(chan *types.Node) go func() { defer close(nodes) // Get all nodes which are active status, and regardless of pod. 
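// Stream them through a channel so status checks can begin before the
// full listing returns.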
var err error - var ch <-chan *coretypes.Node - utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) { - ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{ + var ch <-chan *types.Node + utils.WithTimeout(ctx, n.config.GlobalTimeout, func(ctx context.Context) { + ch, err = n.cluster.ListPodNodes(ctx, &types.ListNodesOptions{ Podname: "", Labels: nil, All: true, @@ -161,9 +161,9 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) { }() for node := range nodes { - status, err := n.cal.GetNodeStatus(ctx, node.Name) + status, err := n.cluster.GetNodeStatus(ctx, node.Name) if err != nil { - status = &coretypes.NodeStatus{ + status = &types.NodeStatus{ Nodename: node.Name, Podname: node.Podname, Alive: false, @@ -178,7 +178,7 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error { go n.initNodeStatus(ctx) // monitor node status - messageChan := n.cal.NodeStatusStream(ctx) + messageChan := n.cluster.NodeStatusStream(ctx) log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id) defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id) @@ -186,7 +186,7 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error { select { case message, ok := <-messageChan: if !ok { - return coretypes.ErrMessageChanClosed + return types.ErrMessageChanClosed } go n.dealNodeStatusMessage(ctx, message) case <-ctx.Done(): @@ -195,18 +195,18 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error { } } -func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) { +func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *types.NodeStatus) { if message.Error != nil { log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message) return } // TODO maybe we need a distributed lock to control concurrency - opts := &coretypes.SetNodeOptions{ + opts := &types.SetNodeOptions{ Nodename: message.Nodename, WorkloadsDown: !message.Alive, } - if _, err := n.cal.SetNode(ctx, opts); err != nil { + if _, err := n.cluster.SetNode(ctx, opts); err != nil { log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err) return } diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index 843fc013e..98ea9c9a4 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -17,7 +17,6 @@ import ( "github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/log" "github.com/projecteru2/core/metrics" - "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" ) @@ -178,9 +177,9 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error { // UpdateNodeResource update cpu and memory on a node, either add or subtract func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error { switch action { - case store.ActionIncr: + case types.ActionIncr: node.RecycleResources(resource) - case store.ActionDecr: + case types.ActionDecr: node.PreserveResources(resource) default: return types.ErrUnknownControlType diff --git a/store/etcdv3/node_test.go b/store/etcdv3/node_test.go index c81ecafd5..33a925d04 100644 --- a/store/etcdv3/node_test.go +++ b/store/etcdv3/node_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" 
"github.com/stretchr/testify/assert" @@ -223,8 +222,8 @@ func TestUpdateNodeResource(t *testing.T) { assert.NoError(t, err) assert.Equal(t, node.Name, "test") assert.Error(t, m.UpdateNodeResource(ctx, node, nil, "wtf")) - assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionIncr)) - assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionDecr)) + assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionIncr)) + assert.NoError(t, m.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionDecr)) } func TestExtractNodename(t *testing.T) { diff --git a/store/redis/node.go b/store/redis/node.go index 1f3e49091..e58093c03 100644 --- a/store/redis/node.go +++ b/store/redis/node.go @@ -14,7 +14,6 @@ import ( "github.com/projecteru2/core/engine/fake" "github.com/projecteru2/core/log" "github.com/projecteru2/core/metrics" - "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" @@ -169,9 +168,9 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error // UpdateNodeResource update cpu and memory on a node, either add or subtract func (r *Rediaron) UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error { switch action { - case store.ActionIncr: + case types.ActionIncr: node.RecycleResources(resource) - case store.ActionDecr: + case types.ActionDecr: node.PreserveResources(resource) default: return types.ErrUnknownControlType diff --git a/store/redis/node_test.go b/store/redis/node_test.go index 7b68085f2..dfdb8e5f9 100644 --- a/store/redis/node_test.go +++ b/store/redis/node_test.go @@ -6,7 +6,6 @@ import ( "path/filepath" "time" - "github.com/projecteru2/core/store" "github.com/projecteru2/core/types" ) @@ -213,8 +212,8 @@ func (s *RediaronTestSuite) TestUpdateNodeResource() { s.NoError(err) s.Equal(node.Name, "test") s.Error(s.rediaron.UpdateNodeResource(ctx, node, nil, "wtf")) - s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionIncr)) - s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, store.ActionDecr)) + s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionIncr)) + s.NoError(s.rediaron.UpdateNodeResource(ctx, node, &types.ResourceMeta{CPU: map[string]int64{"0": 100}}, types.ActionDecr)) } func (s *RediaronTestSuite) TestExtractNodename() { diff --git a/store/store.go b/store/store.go index 5a6a931c7..fe76c6836 100644 --- a/store/store.go +++ b/store/store.go @@ -2,20 +2,16 @@ package store import ( "context" + "testing" "time" "github.com/projecteru2/core/lock" + "github.com/projecteru2/core/store/etcdv3" + "github.com/projecteru2/core/store/redis" "github.com/projecteru2/core/strategy" "github.com/projecteru2/core/types" ) -const ( - // ActionIncr for incr resource - ActionIncr = "+" - // ActionDecr for decr resource - ActionDecr = "-" -) - // Store store eru data type Store interface { // service @@ -65,3 +61,14 @@ type Store interface { // distributed lock CreateLock(key string, ttl time.Duration) (lock.DistributedLock, error) } + +// NewStore creates a 
store +func NewStore(config types.Config, t *testing.T) (store Store, err error) { + switch config.Store { + case types.Redis: + store, err = redis.New(config, t) + default: + store, err = etcdv3.New(config, t) + } + return store, err +} diff --git a/types/action.go b/types/action.go new file mode 100644 index 000000000..6ecf54689 --- /dev/null +++ b/types/action.go @@ -0,0 +1,8 @@ +package types + +const ( + // ActionIncr for incr resource + ActionIncr = "+" + // ActionDecr for decr resource + ActionDecr = "-" +)
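
The pattern patches 1/6 and 2/6 converge on, condensed into a runnable
sketch. All names below (engineAPI, realEngine, fakeEngine, cache,
checkAlive, dial) are illustrative stand-ins, not the eru types: the
real cache key also hashes the TLS material (getEngineCacheKey), and the
real loop bounds concurrency with a goroutine pool. The two moves are
that a failed dial is cached as a fake engine carrying the dial error,
so lookups never return nil and never re-dial a dead endpoint on the hot
path, and that a background checker retries fakes while demoting live
clients whose health check fails.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// engineAPI stands in for engine.API; Ping is the only method the
// checker needs.
type engineAPI interface {
	Ping(ctx context.Context) error
}

type realEngine struct{ endpoint string }

func (r realEngine) Ping(ctx context.Context) error { return nil }

// fakeEngine mirrors engine/fake: every call replays the error that
// made the real client unusable, so callers see the root cause.
type fakeEngine struct{ err error }

func (f fakeEngine) Ping(ctx context.Context) error { return f.err }

type cache struct {
	mu      sync.Mutex
	clients map[string]engineAPI
}

func (c *cache) set(key string, e engineAPI) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.clients[key] = e
}

func (c *cache) get(key string) engineAPI {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.clients[key]
}

func (c *cache) keys() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	ks := make([]string, 0, len(c.clients))
	for k := range c.clients {
		ks = append(ks, k)
	}
	return ks
}

// checkAlive is CheckAlive in miniature: fake entries get a fresh dial,
// live entries get a health check, and a failing client is demoted to a
// fake (keeping its error) rather than evicted.
func (c *cache) checkAlive(ctx context.Context, dial func(endpoint string) (engineAPI, error), interval time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
		}
		for _, key := range c.keys() {
			switch client := c.get(key).(type) {
			case fakeEngine:
				if fresh, err := dial(key); err == nil {
					c.set(key, fresh) // endpoint came back
				}
			default:
				if err := client.Ping(ctx); err != nil {
					c.set(key, fakeEngine{err: err}) // demote, keep the error
				}
			}
		}
	}
}

func main() {
	c := &cache{clients: map[string]engineAPI{}}
	// A failed dial is cached as a fake engine instead of nil.
	c.set("tcp://10.0.0.1:2376", fakeEngine{err: errors.New("connection refused")})
	fmt.Println(c.get("tcp://10.0.0.1:2376").Ping(context.Background())) // connection refused

	ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
	defer cancel()
	dial := func(endpoint string) (engineAPI, error) { return realEngine{endpoint: endpoint}, nil }
	c.checkAlive(ctx, dial, 50*time.Millisecond)
	fmt.Println(c.get("tcp://10.0.0.1:2376").Ping(context.Background())) // <nil> once recovered
}

In eru the equivalent loop is started exactly once, by InitEngineCache
at serve() time; here main drives it with a short timeout only so the
example terminates.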