From a1a6359cd985c7146e6bdbdf19fb5def0f04f0cf Mon Sep 17 00:00:00 2001 From: tonic Date: Mon, 13 Jun 2016 20:00:34 +0800 Subject: [PATCH] cluster and store --- cluster/cluster.go | 20 ++++ cluster/interface.go | 22 +++++ cluster/meta.go | 34 +++++++ store/store.go | 224 +++++++++++++++++++++++++++++++++++++++++++ types/config.go | 25 +++++ types/container.go | 7 ++ types/node.go | 19 ++++ types/pod.go | 11 +++ types/specs.go | 99 +++++++++++++++++++ utils/utils.go | 78 +++++++++++++++ 10 files changed, 539 insertions(+) create mode 100644 cluster/cluster.go create mode 100644 cluster/interface.go create mode 100644 cluster/meta.go create mode 100644 store/store.go create mode 100644 types/config.go create mode 100644 types/container.go create mode 100644 types/node.go create mode 100644 types/pod.go create mode 100644 types/specs.go create mode 100644 utils/utils.go diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 000000000..550e5fa61 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,20 @@ +package cluster + +import ( + "gitlab.ricebook.net/platform/core/store" + "gitlab.ricebook.net/platform/core/types" +) + +type Calcium struct { + store *store.Store + config *types.Config +} + +func NewCalcum(config *types.Config) (*Calcium, error) { + store, err := store.NewStore(config) + if err != nil { + return nil, err + } + + return &Calcium{store: store, config: config}, nil +} diff --git a/cluster/interface.go b/cluster/interface.go new file mode 100644 index 000000000..5b5983379 --- /dev/null +++ b/cluster/interface.go @@ -0,0 +1,22 @@ +package cluster + +import ( + "gitlab.ricebook.net/platform/core/types" +) + +type Cluster interface { + // meta data methods + ListPods() ([]types.Pod, error) + AddPod(podname, desc string) (types.Pod, error) + GetPod(podname string) (types.Pod, error) + AddNode(nodename, endpoint, podname string, public bool) (types.Node, error) + GetNode(podname, nodename string) (types.Node, error) + ListPodNodes(podname string) ([]types.Node, error) + + // cluster methods + BuildImage() error + CreateContainer() error + UpdateContainer() error + RemoveContainer() error + MigrateContainer() error +} diff --git a/cluster/meta.go b/cluster/meta.go new file mode 100644 index 000000000..aff0ff44a --- /dev/null +++ b/cluster/meta.go @@ -0,0 +1,34 @@ +package cluster + +import "gitlab.ricebook.net/platform/core/types" + +// All functions are just proxy to store, +// since I don't want store to be exported. +// All these functions are meta data related. +func (c *Calcium) ListPods() ([]types.Pod, error) { + return c.store.GetAllPods() +} + +func (c *Calcium) ListPods() ([]types.Pod, error) { + return c.store.GetAllPods() +} + +func (c *Calcium) AddPod(podname, desc string) (types.Pod, error) { + return c.store.AddPod(podname, desc) +} + +func (c *Calcium) GetPod(podname string) (types.Pod, error) { + return c.store.GetPod(podname) +} + +func (c *Calcium) AddNode(nodename, endpoint, podname string, public bool) (types.Node, error) { + return c.store.AddNode(nodename, endpoint, podname, public) +} + +func (c *Calcium) GetNode(podname, nodename string) (types.Node, error) { + return c.store.GetNode(podname, nodename) +} + +func (c *Calcium) ListPodNodes(podname string) ([]types.Node, error) { + return c.store.GetNodesByPod(podname) +} diff --git a/store/store.go b/store/store.go new file mode 100644 index 000000000..fdcd9113e --- /dev/null +++ b/store/store.go @@ -0,0 +1,224 @@ +package store + +import ( + "encoding/json" + "fmt" + "strconv" + + "github.com/coreos/etcd/client" + "gitlab.ricebook.net/core/types" + "gitlab.ricebook.net/core/utils" + "golang.org/x/net/context" +) + +var ( + allPodsKey = "/eru-core/pod" + podInfoKey = "/eru-core/pod/%s/info" + podNodesKey = "/eru-core/pod/%s/node" + nodeInfoKey = "/eru-core/pod/%s/node/%s/info" + nodeContainerKey = "/eru-core/pod/%s/node/%s/containers" +) + +type Store struct { + etcd *client.KeysAPI + config *types.Config +} + +// get a pod from etcd +func (s *Store) GetPod(name string) (types.Pod, error) { + key := fmt.Sprintf(podInfoKey, name) + resp, err := s.etcd.Get(context.Background(), key, nil) + if err != nil { + return nil, err + } + if resp.Node.Dir { + return nil, fmt.Errorf("Pod storage path %q in etcd is a directory", key) + } + + pod := Pod{} + if err := json.Unmarshal([]byte(resp.Node.Value), &pod); err != nil { + return nil, err + } + + return pod, nil +} + +// add a pod +// save it to etcd +func (s *Store) AddPod(name, desc string) (types.Pod, error) { + key := fmt.Sprintf(podInfoKey, name) + pod := types.Pod{Name: name, Desc: desc} + + bytes, err := json.Marshal(pod) + if err != nil { + return nil, err + } + + _, err = s.etcd.Set(context.Background(), key, string(bytes).nil) + if err != nil { + return nil, err + } + + return pod, nil +} + +// get all pods in etcd +// any error will break and return error immediately +func (s *Store) GetAllPods() ([]types.Pod, error) { + var ( + pods []types.Pod + err error + ) + + resp, err := s.etcd.Get(context.Background(), allPodsKey, nil) + if err != nil { + return pods, err + } + if !resp.Node.Dir { + return nil, fmt.Errorf("Pod storage path %q in etcd is not a directory", allPodsKey) + } + + for _, node := range resp.Node.Nodes { + name := utils.Tail(node.Key) + p, err := s.GetPod(name) + if err != nil { + return pods, err + } + pods = append(pods, p) + } + return pods, err +} + +// get a node from etcd +// and construct it's docker client +// a node must belong to a pod +// and since node is not the smallest unit to user, to get a node we must specify the corresponding pod +func (s *Store) GetNode(podname, nodename string) (types.Node, error) { + key := fmt.Sprintf(nodeInfoKey, podname, nodename) + resp, err := s.etcd.Get(context.Background(), key, nil) + if err != nil { + return nil, err + } + if resp.Node.Dir { + return nil, fmt.Errorf("Node storage path %q in etcd is a directory", key) + } + + node := Node{} + if err := json.Unmarshal([]byte(resp.Node.Value), &node); err != nil { + return nil, err + } + + engine, err := utils.MakeDockerClient(node.Endpoint, s.config) + if err != nil { + return nil, err + } + + node.Engine = engine + return node, nil +} + +// add a node +// save it to etcd +func (s *Store) AddNode(name, endpoint, podname string, public bool) (types.Node, error) { + engine, err := utils.MakeDockerClient(endpoint, s.config) + if err != nil { + return nil, err + } + + info, err := engine.Info(context.Background()) + if err != nil { + return nil, err + } + + cores := make(map[string]int) + for i := 0; i < info.NCPU; i++ { + cores[strconv.itoa(i)] = 1 + } + + key := fmt.Sprintf(podNodesKey, podname, name) + node := types.Node{ + Name: name, + Endpoint: endpoint, + Podname: podname, + Public: public, + Cores: cores, + Engine: engine, + } + + bytes, err := json.Marshal(node) + if err != nil { + return nil, err + } + + _, err = s.etcd.Set(context.Background(), key, string(bytes).nil) + if err != nil { + return nil, err + } + + return node, nil +} + +// get all nodes from etcd +// any error will break and return immediately +func (s *Store) GetAllNodes() ([]types.Node, error) { + var ( + nodes []types.Node + err error + ) + + pods, err := s.GetAllPods() + if err != nil { + return nodes, err + } + + for _, pod := range pods { + ns, err := s.GetNodesByPod(pod.Name) + if err != nil { + return nodes, err + } + nodes = append(nodes, ns...) + } + return nodes, err +} + +// get all nodes bound to pod +// here we use podname instead of pod instance +func (s *Store) GetNodesByPod(podname string) ([]types.Node, error) { + var ( + nodes []types.Node + err error + ) + + key := fmt.Sprintf(podNodesKey, podname) + resp, err := s.etcd.Get(context.Background(), key, nil) + if err != nil { + return nodes, err + } + if !resp.Node.Dir { + return nil, fmt.Errorf("Node storage path %q in etcd is not a directory", key) + } + + for _, node := range resp.Node.Nodes { + nodename := utils.Tail(node.Key) + n, err := s.GetNode(podname, nodename) + if err != nil { + return nodes, err + } + nodes = append(nodes, n) + } + return nodes, err +} + +func NewStore(config *types.Config) (*Store, error) { + if len(config.EtcdMachines) == 0 { + return nil, fmt.Errorf("Must set ETCD") + } + + cli, err := client.New(client.Config{Endpoints: config.EtcdMachines}) + if err != nil { + return nil, err + } + + etcd := &client.NewKeysAPI(cli) + return &Store{etcd: etcd, config: config}, nil +} diff --git a/types/config.go b/types/config.go new file mode 100644 index 000000000..6c3580e83 --- /dev/null +++ b/types/config.go @@ -0,0 +1,25 @@ +package types + +type Config struct { + ListenAddress string `yaml:"bind"` + AgentPort string `yaml:"agent_port"` + PermDir string `yaml:"permdir"` + EtcdMachines []string `yaml:"etcd"` + + Git GitConfig `yaml:"git"` + Docker DockerConfig `yaml:"docker"` +} + +type GitConfig struct { + GitPublicKey string `yaml:"public_key"` + GitPrivateKey string `yaml:"private_key"` +} + +type DockerConfig struct { + DockerAPIVersion string `yaml:"version"` + DockerLogDriver string `yaml:"log_driver"` + DockerNetworkMode string `yaml:"network_mode"` + DockerNetworkDisabled bool `yaml:"network_disabled"` + DockerCertPath string `yaml:"cert_path"` + DockerHub string `yaml:"hub"` +} diff --git a/types/container.go b/types/container.go new file mode 100644 index 000000000..f22ade6df --- /dev/null +++ b/types/container.go @@ -0,0 +1,7 @@ +package types + +type Container struct { + ID string `json:"id"` + Podname string `json:"podname"` + Nodename string `json:"nodename"` +} diff --git a/types/node.go b/types/node.go new file mode 100644 index 000000000..4e0b95606 --- /dev/null +++ b/types/node.go @@ -0,0 +1,19 @@ +package types + +import ( + "net/http" + "sync" + + "github.com/docker/engine-api/client" +) + +type Node struct { + sync.Mutex + + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Podname string `json:"podname"` + Public bool `json:"public"` + Cores map[string]int `json:"cores"` + Engine *client.Client `json:"-"` +} diff --git a/types/pod.go b/types/pod.go new file mode 100644 index 000000000..bbfca4601 --- /dev/null +++ b/types/pod.go @@ -0,0 +1,11 @@ +package types + +import ( + "sync" +) + +type Pod struct { + sync.Mutex + Name string `json:"name"` + Desc string `json:"desc"` +} diff --git a/types/specs.go b/types/specs.go new file mode 100644 index 000000000..c12d3c18c --- /dev/null +++ b/types/specs.go @@ -0,0 +1,99 @@ +package common + +import ( + "fmt" + "strings" + + "github.com/fsouza/go-dockerclient" + "gopkg.in/yaml.v2" +) + +// app.yaml读进来之后就变这个了 +// Entrypoints的key是entrypoint的名字 +// Binds的key是HostPath, 宿主机对应的路径 +type AppSpecs struct { + Appname string `yaml:"appname,omitempty"` + Entrypoints map[string]Entrypoint `yaml:"entrypoints,omitempty,flow"` + Build []string `yaml:"build,omitempty,flow"` + Volumes []string `yaml:"volumes,omitempty,flow"` + Binds map[string]Bind `yaml:"binds,omitempty,flow"` + Meta map[string]string `yaml:"meta,omitempty,flow"` + Base string `yaml:"base"` +} + +// 单个entrypoint +// Ports是端口列表, 形如5000/tcp, 5001/udp +// Exposes是对外暴露端口列表, 形如22:5000, 前面是容器内端口, 后面是宿主机端口 +// Hosts是容器内会加入的/etc/hosts列表 +type Entrypoint struct { + Command string `yaml:"cmd,omitempty"` + Ports []docker.Port `yaml:"ports,omitempty,flow"` + Exposes []Expose `yaml:"exposes,omitempty,flow"` + NetworkMode string `yaml:"network_mode,omitempty"` + MemoryLimit int `yaml:"mem_limit,omitempty"` + RestartPolicy string `yaml:"restart,omitempty"` + HealthCheck string `yaml:"health_check,omitempty"` + ExtraHosts []string `yaml:"hosts,omitempty,flow"` + PermDir bool `yaml:"permdir,omitempty"` + Privileged string `yaml:"privileged,omitempty"` +} + +// 单个bind +// Binds对应的内容, 容器里的路径, 以及是否只读 +type Bind struct { + InContainerPath string `yaml:"bind,omitempty"` + ReadOnly bool `yaml:"ro,omitempty"` +} + +type Expose string + +// suppose expose is like 80/tcp:46656/tcp +func (e Expose) ContainerPort() docker.Port { + ports := strings.Split(string(e), ":") + return docker.Port(ports[0]) +} + +func (e Expose) HostPort() docker.Port { + ports := strings.Split(string(e), ":") + return docker.Port(ports[1]) +} + +// 从content加载一个AppSpecs对象出来 +// content, app.yaml的内容 +// check, 是否要检查配置, 如果直接从etcd读不需要检查 +func LoadAppSpecs(content string, check bool) (AppSpecs, error) { + specs := AppSpecs{} + err := yaml.Unmarshal([]byte(content), &specs) + if err != nil { + return specs, err + } + + if check { + err = verify(specs) + if err != nil { + return specs, err + } + } + return specs, nil +} + +// 基本的检查, 检查失败就挂了 +// 一些对ports, exposes啥的检查还没做 +func verify(a AppSpecs) error { + if a.Appname == "" { + return fmt.Errorf("No appname specified") + } + if len(a.Entrypoints) == 0 { + return fmt.Errorf("No entrypoints specified") + } + if len(a.Build) == 0 { + return fmt.Errorf("No build commands specified") + } + + for name, _ := range a.Entrypoints { + if strings.Contains(name, "_") { + return fmt.Errorf("Sorry but we do not support `_` in entrypoint 눈_눈") + } + } + return nil +} diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 000000000..fcb159738 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,78 @@ +package utils + +import ( + "fmt" + "math/rand" + "net" + "net/http" + "net/url" + "path/filepath" + "strings" + "time" + + "github.com/docker/engine-api/client" + "github.com/docker/go-connections/tlsconfig" + "gitlab.ricebook.net/platform/core/types" +) + +const ( + letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + shortenLength = 7 +) + +func RandomString(n int) string { + rand.Seed(time.Now().UnixNano()) + r := make([]byte, n) + for i := 0; i < n; i++ { + r[i] = letters[rand.Intn(len(letters))] + } + return string(r) +} + +func TruncateID(id string) string { + if len(id) > shortenLength { + return id[:shortenLength] + } + return id +} + +func Tail(path string) string { + parts := strings.Split(path, "/") + return parts[len(parts)-1] +} + +func MakeDockerClient(endpoint, config *types.Config) (*client.Client, error) { + if !strings.HasPrefix(endpoint, "tcp://") { + endpoint = "tcp://" + endpoint + } + + u, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + return nil, err + } + + dockerCertPath := filepath.Join(config.DockerConfig.DockerCertPath, host) + options := tlsconfig.Options{ + CAFile: filepath.Join(dockerCertPath, "ca.pem"), + CertFile: filepath.Join(dockerCertPath, "cert.pem"), + KeyFile: filepath.Join(dockerCertPath, "key.pem"), + InsecureSkipVerify: false, + } + tlsc, err := tlsconfig.Client(options) + if err != nil { + return nil, err + } + + cli := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsc, + }, + } + + return client.NewClient(endpoint, config.DockerConfig.DockerAPIVersion, cli, nil) +}