Skip to content

Commit

Permalink
support setting tls configs in "SetNode" (#526)
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL authored Dec 29, 2021
1 parent 117368c commit d903237
Show file tree
Hide file tree
Showing 16 changed files with 1,127 additions and 984 deletions.
6 changes: 5 additions & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,17 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
if opts.Endpoint != "" {
n.Endpoint = opts.Endpoint
}
// update ca / cert / key
n.Ca = opts.Ca
n.Cert = opts.Cert
n.Key = opts.Key
// update key value
if len(opts.Labels) != 0 {
n.Labels = opts.Labels
}
// update numa
if len(opts.NUMA) != 0 {
n.NUMA = types.NUMA(opts.NUMA)
n.NUMA = opts.NUMA
}
// update numa memory
for numaNode, memoryDelta := range opts.DeltaNUMAMemory {
Expand Down
6 changes: 6 additions & 0 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ func TestSetNode(t *testing.T) {
n, err = c.SetNode(ctx, setOpts)
assert.NoError(t, err)
assert.Equal(t, n.Endpoint, "tcp://10.10.10.10:2379")
// set ca / cert / key
setOpts.Ca = "hh"
setOpts.Cert = "hh"
setOpts.Key = "hh"
n, err = c.SetNode(ctx, setOpts)
assert.NoError(t, err)
// set numa
setOpts.NUMA = types.NUMA{"100": "node1"}
n, err = c.SetNode(ctx, setOpts)
Expand Down
1 change: 1 addition & 0 deletions core.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bind: ":5001"
statsd: "127.0.0.1:8125"
profile: ":12346"
global_timeout: 300s
connection_timeout: 10s
lock_timeout: 30s
cert_path: "/etc/eru/tls"
sentry_dsn: "https://[email protected]/0"
Expand Down
44 changes: 41 additions & 3 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -29,15 +30,45 @@ var (
engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)
)

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return utils.SHA256(fmt.Sprintf("%v:%v:%v:%v", endpoint, ca, cert, key))
}

func validateEngine(ctx context.Context, engine engine.API, timeout time.Duration) (err error) {
utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
_, err = engine.Info(ctx)
})
return err
}

// GetEngineFromCache .
func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca, cert, key string) engine.API {
client := engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
if client == nil {
return nil
}
if err := validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[GetEngineFromCache] engine of %v is unavailable, will be removed from cache, err: %v", endpoint, err)
RemoveEngineFromCache(endpoint, ca, cert, key)
return nil
}
return client
}

// RemoveEngineFromCache .
func RemoveEngineFromCache(endpoint, ca, cert, key string) {
engineCache.Delete(getEngineCacheKey(endpoint, ca, cert, key))
}

// GetEngine get engine
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = engineCache.Get(endpoint); client != nil {
if client = GetEngineFromCache(ctx, config, endpoint, ca, cert, key); client != nil {
return
}

defer func() {
if err == nil && client != nil {
engineCache.Set(endpoint, client)
engineCache.Set(getEngineCacheKey(endpoint, ca, cert, key), client)
}
}()

Expand All @@ -49,7 +80,14 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
if !ok {
return nil, types.ErrNotSupport
}
return e(ctx, config, nodename, endpoint, ca, cert, key)
if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil {
return nil, err
}
if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[GetEngine] engine of %v is unavailable, err: %v", endpoint, err)
return nil, err
}
return client, nil
}

func getEnginePrefix(endpoint string) (string, error) {
Expand Down
1,919 changes: 973 additions & 946 deletions rpc/gen/core.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions rpc/gen/core.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ message SetNodeOptions {
bool workloads_down = 10;
string endpoint = 11;
TriOpt bypass_opt = 12;
string ca = 13;
string cert = 14;
string key = 15;
}

message SetNodeStatusOptions {
Expand Down
3 changes: 3 additions & 0 deletions rpc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func toCoreSetNodeOptions(b *pb.SetNodeOptions) (*types.SetNodeOptions, error) {
NUMA: b.Numa,
Labels: b.Labels,
BypassOpt: types.TriOptions(b.BypassOpt),
Ca: b.Ca,
Cert: b.Cert,
Key: b.Key,
}
for cpuID, cpuShare := range b.DeltaCpu {
r.DeltaCPU[cpuID] = int64(cpuShare)
Expand Down
42 changes: 33 additions & 9 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"

"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/projecteru2/core/engine"
enginefactory "github.com/projecteru2/core/engine/factory"
Expand All @@ -15,10 +20,6 @@ import (
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

// AddNode save it to etcd
Expand Down Expand Up @@ -145,6 +146,11 @@ func (m *Mercury) GetNodesByPod(ctx context.Context, podname string, labels map[
// UpdateNodes .
func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
data := map[string]string{}
addIfNotEmpty := func(key, value string) {
if value != "" {
data[key] = value
}
}
for _, node := range nodes {
bytes, err := json.Marshal(node)
if err != nil {
Expand All @@ -153,6 +159,10 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
d := string(bytes)
data[fmt.Sprintf(nodeInfoKey, node.Name)] = d
data[fmt.Sprintf(nodePodKey, node.Podname, node.Name)] = d
addIfNotEmpty(fmt.Sprintf(nodeCaKey, node.Name), node.Ca)
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
enginefactory.RemoveEngineFromCache(node.Endpoint, node.Ca, node.Cert, node.Key)
}

resp, err := m.BatchUpdate(ctx, data)
Expand Down Expand Up @@ -180,13 +190,19 @@ func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, reso
}

func (m *Mercury) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, m.config, node.Endpoint, "", "", ""); client != nil {
return client, nil
}

keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
data := []string{"", "", ""}
for i := 0; i < 3; i++ {
ev, err := m.GetOne(ctx, fmt.Sprintf(keyFormats[i], node.Name))
if err != nil {
if !errors.Is(err, types.ErrBadCount) {
log.Warnf(ctx, "[makeClient] Get key failed %v", err)
return nil, err
}
continue
}
Expand Down Expand Up @@ -282,13 +298,21 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
return nil, err
}
node.Init()
if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
if node.Engine, err = m.makeClient(ctx, node); err != nil {
return
nodes = append(nodes, node)
}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
go func(node *types.Node) {
defer wg.Done()
if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
if node.Engine, err = m.makeClient(ctx, node); err != nil {
return
}
}
nodes = append(nodes, node)
}
}(node)
}
wg.Wait()
return nodes, nil
}

Expand Down
14 changes: 8 additions & 6 deletions store/etcdv3/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,12 @@ RdCPRPt513WozkJZZAjUSP2U
m.config.CertPath = "/tmp"
node3, err := m.doAddNode(ctx, nodename3, endpoint3, podname, ca, cert, certkey, cpu, share, memory, storage, labels, nil, nil, nil)
assert.NoError(t, err)
engine3, err := m.makeClient(ctx, node3)
assert.NoError(t, err)
_, err = engine3.Info(ctx)
_, err = m.makeClient(ctx, node3)
assert.Error(t, err)
// failed by get key
node3.Name = "nokey"
_, err = m.makeClient(ctx, node3)
assert.NoError(t, err)
assert.Error(t, err)
}

func TestRemoveNode(t *testing.T) {
Expand Down Expand Up @@ -201,8 +199,12 @@ func TestUpdateNode(t *testing.T) {
assert.Equal(t, node.Name, "test")
fakeNode := &types.Node{
NodeMeta: types.NodeMeta{
Name: "nil",
Podname: "wtf",
Name: "nil",
Podname: "wtf",
Endpoint: "mock://hh",
Ca: "hh",
Cert: "hh",
Key: "hh",
},
}
assert.Error(t, m.UpdateNodes(ctx, fakeNode))
Expand Down
18 changes: 14 additions & 4 deletions store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func (r *Rediaron) GetNodesByPod(ctx context.Context, podname string, labels map
// UpdateNodes .
func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
data := map[string]string{}
addIfNotEmpty := func(key, value string) {
if value != "" {
data[key] = value
}
}
for _, node := range nodes {
bytes, err := json.Marshal(node)
if err != nil {
Expand All @@ -152,6 +157,9 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error
d := string(bytes)
data[fmt.Sprintf(nodeInfoKey, node.Name)] = d
data[fmt.Sprintf(nodePodKey, node.Podname, node.Name)] = d
addIfNotEmpty(fmt.Sprintf(nodeCaKey, node.Name), node.Ca)
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
}
return errors.WithStack(r.BatchUpdate(ctx, data))
}
Expand All @@ -170,17 +178,19 @@ func (r *Rediaron) UpdateNodeResource(ctx context.Context, node *types.Node, res
return r.UpdateNodes(ctx, node)
}

func (r *Rediaron) makeClient(ctx context.Context, node *types.Node) (engine.API, error) {
// try get client, if nil, create a new one
var client engine.API
var err error
func (r *Rediaron) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, r.config, node.Endpoint, "", "", ""); client != nil {
return client, nil
}
keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
data := []string{"", "", ""}
for i := 0; i < 3; i++ {
v, err := r.GetOne(ctx, fmt.Sprintf(keyFormats[i], node.Name))
if err != nil {
if !isRedisNoKeyError(err) {
log.Warnf(ctx, "[makeClient] Get key failed %v", err)
return nil, err
}
continue
}
Expand Down
14 changes: 8 additions & 6 deletions store/redis/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,12 @@ RdCPRPt513WozkJZZAjUSP2U
s.rediaron.config.CertPath = "/tmp"
node3, err := s.rediaron.doAddNode(ctx, nodename3, endpoint3, podname, ca, cert, certkey, cpu, share, memory, storage, labels, nil, nil, nil)
s.NoError(err)
engine3, err := s.rediaron.makeClient(ctx, node3)
s.NoError(err)
_, err = engine3.Info(ctx)
_, err = s.rediaron.makeClient(ctx, node3)
s.Error(err)
// failed by get key
node3.Name = "nokey"
_, err = s.rediaron.makeClient(ctx, node3)
s.NoError(err)
s.Error(err)
}

func (s *RediaronTestSuite) TestRemoveNode() {
Expand Down Expand Up @@ -194,8 +192,12 @@ func (s *RediaronTestSuite) TestUpdateNode() {
s.Equal(node.Name, "test")
fakeNode := &types.Node{
NodeMeta: types.NodeMeta{
Name: "nil",
Podname: "wtf",
Name: "nil",
Podname: "wtf",
Endpoint: "mock://hh",
Ca: "hh",
Cert: "hh",
Key: "hh",
},
}
s.Error(s.rediaron.UpdateNodes(ctx, fakeNode))
Expand Down
19 changes: 10 additions & 9 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ const (

// Config holds eru-core config
type Config struct {
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
Store string `yaml:"store" default:"etcd"` // store type
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
ConnectionTimeout time.Duration `yaml:"connection_timeout" default:"10s"` // timeout for connections
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
Store string `yaml:"store" default:"etcd"` // store type

Auth AuthConfig `yaml:"auth"` // grpc auth
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
Expand Down
4 changes: 4 additions & 0 deletions types/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type NodeMeta struct {
InitStorageCap int64 `json:"init_storage_cap"`
InitNUMAMemory NUMAMemory `json:"init_numa_memory"`
InitVolume VolumeMap `json:"init_volume"`

Ca string `json:"-"`
Cert string `json:"-"`
Key string `json:"-"`
}

// DeepCopy returns a deepcopy of nodemeta
Expand Down
3 changes: 3 additions & 0 deletions types/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ type SetNodeOptions struct {
NUMA map[string]string
Labels map[string]string
BypassOpt TriOptions
Ca string
Cert string
Key string
}

// Validate checks options
Expand Down
Loading

0 comments on commit d903237

Please sign in to comment.