Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support setting tls configs in "SetNode" #526

Merged
merged 1 commit into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,17 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
if opts.Endpoint != "" {
n.Endpoint = opts.Endpoint
}
// update ca / cert / key
n.Ca = opts.Ca
n.Cert = opts.Cert
n.Key = opts.Key
// update key value
if len(opts.Labels) != 0 {
n.Labels = opts.Labels
}
// update numa
if len(opts.NUMA) != 0 {
n.NUMA = types.NUMA(opts.NUMA)
n.NUMA = opts.NUMA
}
// update numa memory
for numaNode, memoryDelta := range opts.DeltaNUMAMemory {
Expand Down
6 changes: 6 additions & 0 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ func TestSetNode(t *testing.T) {
n, err = c.SetNode(ctx, setOpts)
assert.NoError(t, err)
assert.Equal(t, n.Endpoint, "tcp://10.10.10.10:2379")
// set ca / cert / key
setOpts.Ca = "hh"
setOpts.Cert = "hh"
setOpts.Key = "hh"
n, err = c.SetNode(ctx, setOpts)
assert.NoError(t, err)
// set numa
setOpts.NUMA = types.NUMA{"100": "node1"}
n, err = c.SetNode(ctx, setOpts)
Expand Down
1 change: 1 addition & 0 deletions core.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bind: ":5001"
statsd: "127.0.0.1:8125"
profile: ":12346"
global_timeout: 300s
connection_timeout: 10s
lock_timeout: 30s
cert_path: "/etc/eru/tls"
sentry_dsn: "https://examplePublicKey@o0.ingest.sentry.io/0"
Expand Down
44 changes: 41 additions & 3 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -29,15 +30,45 @@ var (
engineCache = utils.NewEngineCache(12*time.Hour, 10*time.Minute)
)

// getEngineCacheKey builds the engine cache key for one connection profile.
// The endpoint and its TLS material (ca, cert, key) are joined with ':' and
// the result is passed through utils.SHA256, so engines for the same endpoint
// with different TLS settings get distinct cache entries.
func getEngineCacheKey(endpoint, ca, cert, key string) string {
	profile := fmt.Sprintf("%v:%v:%v:%v", endpoint, ca, cert, key)
	return utils.SHA256(profile)
}

// validateEngine probes the engine by calling its Info method and returns
// whatever error Info reports (nil when the call succeeds).
// The probe is run through utils.WithTimeout with the given timeout.
// NOTE(review): presumably WithTimeout derives a deadline-bound context from
// ctx for the callback — confirm against utils.
func validateEngine(ctx context.Context, engine engine.API, timeout time.Duration) (err error) {
	// The closure writes to the named result so the probe's outcome is
	// visible after WithTimeout returns.
	probe := func(probeCtx context.Context) {
		_, err = engine.Info(probeCtx)
	}
	utils.WithTimeout(ctx, timeout, probe)
	return err
}

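// GetEngineFromCache returns the cached engine for the given endpoint and TLS material (ca, cert, key), or nil when nothing is cached. A cached engine that fails validation within config.ConnectionTimeout is removed from the cache and nil is returned.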
func GetEngineFromCache(ctx context.Context, config types.Config, endpoint, ca, cert, key string) engine.API {
client := engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
if client == nil {
return nil
}
if err := validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里和L86都是为了真正检查这个engine的可用性,可以一定程度上防止之前http请求连接到https的问题。

log.Errorf(ctx, "[GetEngineFromCache] engine of %v is unavailable, will be removed from cache, err: %v", endpoint, err)
RemoveEngineFromCache(endpoint, ca, cert, key)
return nil
}
return client
}

// RemoveEngineFromCache .
func RemoveEngineFromCache(endpoint, ca, cert, key string) {
engineCache.Delete(getEngineCacheKey(endpoint, ca, cert, key))
}

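// GetEngine returns an engine for the given endpoint and TLS material: a valid cached engine is reused; otherwise a new engine is created, validated within config.ConnectionTimeout, and cached on success.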
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
if client = engineCache.Get(endpoint); client != nil {
if client = GetEngineFromCache(ctx, config, endpoint, ca, cert, key); client != nil {
return
}

defer func() {
if err == nil && client != nil {
engineCache.Set(endpoint, client)
engineCache.Set(getEngineCacheKey(endpoint, ca, cert, key), client)
}
}()

Expand All @@ -49,7 +80,14 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
if !ok {
return nil, types.ErrNotSupport
}
return e(ctx, config, nodename, endpoint, ca, cert, key)
if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil {
return nil, err
}
if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[GetEngine] engine of %v is unavailable, err: %v", endpoint, err)
return nil, err
}
return client, nil
}

func getEnginePrefix(endpoint string) (string, error) {
Expand Down
1,919 changes: 973 additions & 946 deletions rpc/gen/core.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions rpc/gen/core.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ message SetNodeOptions {
bool workloads_down = 10;
string endpoint = 11;
TriOpt bypass_opt = 12;
string ca = 13;
string cert = 14;
string key = 15;
}

message SetNodeStatusOptions {
Expand Down
3 changes: 3 additions & 0 deletions rpc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func toCoreSetNodeOptions(b *pb.SetNodeOptions) (*types.SetNodeOptions, error) {
NUMA: b.Numa,
Labels: b.Labels,
BypassOpt: types.TriOptions(b.BypassOpt),
Ca: b.Ca,
Cert: b.Cert,
Key: b.Key,
}
for cpuID, cpuShare := range b.DeltaCpu {
r.DeltaCPU[cpuID] = int64(cpuShare)
Expand Down
42 changes: 33 additions & 9 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"

"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/projecteru2/core/engine"
enginefactory "github.com/projecteru2/core/engine/factory"
Expand All @@ -15,10 +20,6 @@ import (
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

// AddNode save it to etcd
Expand Down Expand Up @@ -145,6 +146,11 @@ func (m *Mercury) GetNodesByPod(ctx context.Context, podname string, labels map[
// UpdateNodes .
func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
data := map[string]string{}
addIfNotEmpty := func(key, value string) {
if value != "" {
data[key] = value
}
}
for _, node := range nodes {
bytes, err := json.Marshal(node)
if err != nil {
Expand All @@ -153,6 +159,10 @@ func (m *Mercury) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
d := string(bytes)
data[fmt.Sprintf(nodeInfoKey, node.Name)] = d
data[fmt.Sprintf(nodePodKey, node.Podname, node.Name)] = d
addIfNotEmpty(fmt.Sprintf(nodeCaKey, node.Name), node.Ca)
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所以怎么删掉 ca?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没法通过api删掉,但是可以去etcd里手动把那三个key给删了

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

虽然我觉得不太好 但是也没想到好办法...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

硬要想办法的话,可以在proto里设置一个"message String",区分nil和""的含义...就是很麻烦和繁琐,看着也不好看

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要不把 delta 用起来? node set options 里有个 delta 参数, 默认是 false, 规定只有 true 的时候才改变 ca. 这正好符合使用场景, 因为改变 ca 的时候就是需要 cli node set --delta 才能保持资源不变.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要不把 delta 用起来? node set options 里有个 delta 参数, 默认是 false, 规定只有 true 的时候才改变 ca. 这正好符合使用场景, 因为改变 ca 的时候就是需要 cli node set --delta 才能保持资源不变.

感觉还是有点奇怪,比如原本这个node是有tls相关的这些东西的,然后cli node set --delta --memory 1G xxx-node,那这个时候要不要把ca置为空?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

还是保持现状吧..

enginefactory.RemoveEngineFromCache(node.Endpoint, node.Ca, node.Cert, node.Key)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里提供一个可以清除engine缓存的入口,调用eru-cli node set可以清除对应node的engine缓存。

但是要保证ca / cert / key都跟之前的一致,才能清除掉缓存。例如对于开启了TLS的node,如果只是调用eru-cli node set xxx-node而不传任何参数的话,这里Ca / Cert / Key都是空字符串,自然清除不掉正确的缓存。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些很细节的地方需要好好文档记录 过一个月可能自己都忘了怎么清除缓存..

}

resp, err := m.BatchUpdate(ctx, data)
Expand Down Expand Up @@ -180,13 +190,19 @@ func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, reso
}

func (m *Mercury) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, m.config, node.Endpoint, "", "", ""); client != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在不知道有没有配置TLS的情况下,先拿空的去试一试,这样可以减轻ETCD的压力。即使配置了TLS,缓存里查不到也不会有网络IO,性能应该还行。

return client, nil
}

keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
data := []string{"", "", ""}
for i := 0; i < 3; i++ {
ev, err := m.GetOne(ctx, fmt.Sprintf(keyFormats[i], node.Name))
if err != nil {
if !errors.Is(err, types.ErrBadCount) {
log.Warnf(ctx, "[makeClient] Get key failed %v", err)
return nil, err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

拿不到就不应该继续了,否则用不正确的tls config去连接,可能会出奇怪问题。

This comment was marked as resolved.

}
continue
}
Expand Down Expand Up @@ -282,13 +298,21 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
return nil, err
}
node.Init()
if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
if node.Engine, err = m.makeClient(ctx, node); err != nil {
return
nodes = append(nodes, node)
}
wg := &sync.WaitGroup{}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为后面GetEngine每次都要调用Info来validate一下,这里还是并发请求比较好。

wg.Add(len(nodes))
for _, node := range nodes {
go func(node *types.Node) {
defer wg.Done()
if (!node.IsDown() || all) && utils.FilterWorkload(node.Labels, labels) {
if node.Engine, err = m.makeClient(ctx, node); err != nil {
return
}
}
nodes = append(nodes, node)
}
}(node)
}
wg.Wait()
return nodes, nil
}

Expand Down
14 changes: 8 additions & 6 deletions store/etcdv3/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,12 @@ RdCPRPt513WozkJZZAjUSP2U
m.config.CertPath = "/tmp"
node3, err := m.doAddNode(ctx, nodename3, endpoint3, podname, ca, cert, certkey, cpu, share, memory, storage, labels, nil, nil, nil)
assert.NoError(t, err)
engine3, err := m.makeClient(ctx, node3)
assert.NoError(t, err)
_, err = engine3.Info(ctx)
_, err = m.makeClient(ctx, node3)
assert.Error(t, err)
// failed by get key
node3.Name = "nokey"
_, err = m.makeClient(ctx, node3)
assert.NoError(t, err)
assert.Error(t, err)
}

func TestRemoveNode(t *testing.T) {
Expand Down Expand Up @@ -201,8 +199,12 @@ func TestUpdateNode(t *testing.T) {
assert.Equal(t, node.Name, "test")
fakeNode := &types.Node{
NodeMeta: types.NodeMeta{
Name: "nil",
Podname: "wtf",
Name: "nil",
Podname: "wtf",
Endpoint: "mock://hh",
Ca: "hh",
Cert: "hh",
Key: "hh",
},
}
assert.Error(t, m.UpdateNodes(ctx, fakeNode))
Expand Down
18 changes: 14 additions & 4 deletions store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func (r *Rediaron) GetNodesByPod(ctx context.Context, podname string, labels map
// UpdateNodes .
func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error {
data := map[string]string{}
addIfNotEmpty := func(key, value string) {
if value != "" {
data[key] = value
}
}
for _, node := range nodes {
bytes, err := json.Marshal(node)
if err != nil {
Expand All @@ -152,6 +157,9 @@ func (r *Rediaron) UpdateNodes(ctx context.Context, nodes ...*types.Node) error
d := string(bytes)
data[fmt.Sprintf(nodeInfoKey, node.Name)] = d
data[fmt.Sprintf(nodePodKey, node.Podname, node.Name)] = d
addIfNotEmpty(fmt.Sprintf(nodeCaKey, node.Name), node.Ca)
addIfNotEmpty(fmt.Sprintf(nodeCertKey, node.Name), node.Cert)
addIfNotEmpty(fmt.Sprintf(nodeKeyKey, node.Name), node.Key)
}
return errors.WithStack(r.BatchUpdate(ctx, data))
}
Expand All @@ -170,17 +178,19 @@ func (r *Rediaron) UpdateNodeResource(ctx context.Context, node *types.Node, res
return r.UpdateNodes(ctx, node)
}

func (r *Rediaron) makeClient(ctx context.Context, node *types.Node) (engine.API, error) {
// try get client, if nil, create a new one
var client engine.API
var err error
func (r *Rediaron) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, r.config, node.Endpoint, "", "", ""); client != nil {
return client, nil
}
keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
data := []string{"", "", ""}
for i := 0; i < 3; i++ {
v, err := r.GetOne(ctx, fmt.Sprintf(keyFormats[i], node.Name))
if err != nil {
if !isRedisNoKeyError(err) {
log.Warnf(ctx, "[makeClient] Get key failed %v", err)
return nil, err
}
continue
}
Expand Down
14 changes: 8 additions & 6 deletions store/redis/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,12 @@ RdCPRPt513WozkJZZAjUSP2U
s.rediaron.config.CertPath = "/tmp"
node3, err := s.rediaron.doAddNode(ctx, nodename3, endpoint3, podname, ca, cert, certkey, cpu, share, memory, storage, labels, nil, nil, nil)
s.NoError(err)
engine3, err := s.rediaron.makeClient(ctx, node3)
s.NoError(err)
_, err = engine3.Info(ctx)
_, err = s.rediaron.makeClient(ctx, node3)
s.Error(err)
// failed by get key
node3.Name = "nokey"
_, err = s.rediaron.makeClient(ctx, node3)
s.NoError(err)
s.Error(err)
}

func (s *RediaronTestSuite) TestRemoveNode() {
Expand Down Expand Up @@ -194,8 +192,12 @@ func (s *RediaronTestSuite) TestUpdateNode() {
s.Equal(node.Name, "test")
fakeNode := &types.Node{
NodeMeta: types.NodeMeta{
Name: "nil",
Podname: "wtf",
Name: "nil",
Podname: "wtf",
Endpoint: "mock://hh",
Ca: "hh",
Cert: "hh",
Key: "hh",
},
}
s.Error(s.rediaron.UpdateNodes(ctx, fakeNode))
Expand Down
19 changes: 10 additions & 9 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ const (

// Config holds eru-core config
type Config struct {
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
Store string `yaml:"store" default:"etcd"` // store type
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
ConnectionTimeout time.Duration `yaml:"connection_timeout" default:"10s"` // timeout for connections
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加了一个配置项,“小timeout”

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

做咩的

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

做咩的
GlobalTimeout太大了,用于一些api call之类的地方不太合适,所以放了个“小timeout”。

Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
MaxConcurrency int64 `yaml:"max_concurrency" default:"20"` // concurrently call single runtime in the same time
Store string `yaml:"store" default:"etcd"` // store type

Auth AuthConfig `yaml:"auth"` // grpc auth
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
Expand Down
4 changes: 4 additions & 0 deletions types/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type NodeMeta struct {
InitStorageCap int64 `json:"init_storage_cap"`
InitNUMAMemory NUMAMemory `json:"init_numa_memory"`
InitVolume VolumeMap `json:"init_volume"`

Ca string `json:"-"`
Cert string `json:"-"`
Key string `json:"-"`
}

// DeepCopy returns a deepcopy of nodemeta
Expand Down
3 changes: 3 additions & 0 deletions types/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ type SetNodeOptions struct {
NUMA map[string]string
Labels map[string]string
BypassOpt TriOptions
Ca string
Cert string
Key string
}

// Validate checks options
Expand Down
Loading