diff --git a/Dockerfile b/Dockerfile index 4d20bc8..afa9033 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM alpine +FROM alpine:3.4 MAINTAINER Gunjan Patel diff --git a/Makefile b/Makefile index 963a124..53ea705 100644 --- a/Makefile +++ b/Makefile @@ -1,46 +1,45 @@ CALICO_BUILD?=hitomitak/go-build-ppc64le SRC_FILES=$(shell find . -type f -name '*.go') GOBGPD_VERSION?=$(shell git describe --tags --dirty) +PACKAGE_NAME?=github.com/projectcalico/calico-bgp-daemon +LOCAL_USER_ID?=$(shell id -u $$USER) #CONTAINER_NAME?=calico/gobgpd # - build-containerized: clean vendor dist/gobgp mkdir -p dist docker run --rm \ - -v ${PWD}:/go/src/github.com/projectcalico/calico-bgp-daemon \ - -v ${PWD}/dist:/go/src/github.com/projectcalico/calico-bgp-daemon/dist \ - -e LOCAL_USER_ID=`id -u $$USER` \ + -v $(CURDIR):/go/src/$(PACKAGE_NAME) \ + -v $(CURDIR)/dist:/go/src/$(PACKAGE_NAME)/dist \ + -e LOCAL_USER_ID=$(LOCAL_USER_ID) \ $(CALICO_BUILD) sh -c '\ - cd /go/src/github.com/projectcalico/calico-bgp-daemon && \ + cd /go/src/$(PACKAGE_NAME) && \ make binary' -vendor: glide - docker run --rm -v ${PWD}:/go/src/github.com/projectcalico/calico-bgp-daemon:rw --entrypoint=sh \ - glide-ppc64le -c \ - 'cd /go/src/github.com/projectcalico/calico-bgp-daemon; \ - glide install -strip-vcs -strip-vendor --cache; \ - chown $(shell id -u):$(shell id -u) -R vendor' +vendor: glide.yaml + mkdir -p $(HOME)/.glide + docker run --rm \ + -v $(CURDIR):/go/src/$(PACKAGE_NAME):rw \ + -v $(HOME)/.glide:/home/user/.glide:rw --entrypoint=sh \ + glide-ppc64le -c ' \ + cd /go/src/$(PACKAGE_NAME) && \ + glide install -strip-vendor' -glide: + +glide.yaml: docker build -t glide-ppc64le - < Dockerfile.glide binary: dist/gobgpd dist/gobgp: mkdir -p $(@D) - docker run --rm -v `pwd`/dist:/go/code \ - -e LOCAL_USER_ID=`id -u $$USER` \ - $(CALICO_BUILD) sh -c \ - 'mkdir -p /go/code && go get github.com/osrg/gobgp/gobgp && cp /go/bin/gobgp /go/code && \ - chown $(shell id -u):$(shell id -g) /go/code/gobgp' + 
docker run --rm -v $(CURDIR)/dist:/go/bin \ + -e LOCAL_USER_ID=$(LOCAL_USER_ID) \ + $(CALICO_BUILD) go get -v github.com/osrg/gobgp/gobgp dist/gobgpd: $(SRC_FILES) mkdir -p $(@D) go build -v -o dist/calico-bgp-daemon \ - -ldflags "-X main.VERSION=$(GOBGPD_VERSION) -s -w" main.go - -#$(CONTAINER_NAME): build-containerized -# docker build -t $(CONTAINER_NAME) . + -ldflags "-X main.VERSION=$(GOBGPD_VERSION) -s -w" main.go ipam.go release: clean ifndef VERSION diff --git a/glide.lock b/glide.lock index 10e44ad..80fb93e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: c2ab068779a7a61daa17c1a76510fe373071953a0b50057e009569d23313abb1 -updated: 2016-12-13T18:38:07.369478917-08:00 +hash: 047e0a504e61547fa0c161d990e9de0d49255f4071e76e9f17cb2ebc8b690262 +updated: 2017-03-21T14:48:02.98783753-07:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -11,7 +11,7 @@ imports: - name: github.com/blang/semver version: 31b736133b98f26d5e078ec9eb591666edfd091f - name: github.com/coreos/etcd - version: 3d5ba43211beec7fbb1472634a9c3b464581658a + version: ac1c7eba21545c4ee025f2c340e143cacb53c754 subpackages: - client - pkg/fileutil @@ -28,13 +28,13 @@ imports: - oauth2 - oidc - name: github.com/coreos/go-systemd - version: bfdc81d0d7e0fb19447b08571f63b774495251ce + version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 subpackages: - daemon - journal - util - name: github.com/coreos/pkg - version: fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8 + version: 3ac0863d7acf3bc44daf49afef8919af12f704ef subpackages: - capnslog - health @@ -59,7 +59,7 @@ imports: - log - swagger - name: github.com/fsnotify/fsnotify - version: fd9ec7deca8bf46ecd2a795baaacf2b3a9be1197 + version: a904159b9206978bb6d53fcc7a769e5cd726c737 - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/go-openapi/jsonpointer @@ -71,7 +71,7 @@ imports: - name: github.com/go-openapi/swag version: 1d0bd113de87027671077d3c71eb3ac5d7dbba72 - name: 
github.com/gogo/protobuf - version: 8d70fb3182befc465c4a1eac8ad4d38ff49778e2 + version: 909568be09de550ed094403c2bf8a261b5bb730a subpackages: - proto - sortkeys @@ -85,7 +85,7 @@ imports: - name: github.com/google/gofuzz version: bbcb9da2d746f8bdbd6a936686a0a6067ada0ec5 - name: github.com/hashicorp/hcl - version: 37ab263305aaeb501a60eb16863e808d426e37f2 + version: 372e8ddaa16fd67e371e9323807d056b799360af subpackages: - hcl/ast - hcl/parser @@ -100,17 +100,19 @@ imports: - name: github.com/imdario/mergo version: 6633656539c1639d9d78127b7d47c622b5d7b6dc - name: github.com/influxdata/influxdb - version: 855c567c67d7fff1e2f91617c292ea20cf26e53c + version: 8c25f0104eba8c0060e44a489f9d549e7d6a7a4d subpackages: - client/v2 - models - pkg/escape - name: github.com/jonboulle/clockwork version: 2eee05ed794112d45db504eb05aa693efd2b8b09 +- name: github.com/juju/ratelimit + version: 77ed1c8a01217656d2080ad51981f6e99adaa177 - name: github.com/kelseyhightower/envconfig version: 5c008110b20b657eb7e005b83d0a5f6aa6bb5f4b - name: github.com/magiconair/properties - version: 9c47895dc1ce54302908ab8a43385d1f5df2c11c + version: b3b15ef068fd0b17ddf408a23669f20811d194d2 - name: github.com/mailru/easyjson version: d5b7844b561a7bc640052f1b935f7b800330d7e0 subpackages: @@ -118,9 +120,9 @@ imports: - jlexer - jwriter - name: github.com/mitchellh/mapstructure - version: bfdb1a85537d60bc7e954e600c250219ea497417 + version: db1efb556f84b25a0a13a04aad883943538ad2e0 - name: github.com/osrg/gobgp - version: ee8ce99e2df1f732ef515e20fb3054c0cff0249b + version: 356c01a9d061b2b6bb4c3068a9888b56dc435600 subpackages: - api - config @@ -136,7 +138,7 @@ imports: - name: github.com/pelletier/go-buffruneio version: df1e16fde7fc330a0ca68167c23bf7ed6ac31d6d - name: github.com/pelletier/go-toml - version: 017119f7a78a0b5fc0ea39ef6be09f03acf3345d + version: d1fa2118c12c44e4f5004da216d1efad10cb4924 - name: github.com/projectcalico/go-json version: 6219dc7339ba20ee4c57df0a8baac62317d19cb1 subpackages: @@ -146,7 
+148,7 @@ imports: - name: github.com/projectcalico/go-yaml-wrapper version: 598e54215bee41a19677faa4f0c32acd2a87eb56 - name: github.com/projectcalico/libcalico-go - version: f87d39740b5a49891d6ee2646a17326c5e337b20 + version: 42a36f79c2f92d07467c6ee15603a340e9cf6035 subpackages: - lib/api - lib/api/unversioned @@ -155,14 +157,21 @@ imports: - lib/backend/compat - lib/backend/etcd - lib/backend/k8s + - lib/backend/k8s/resources - lib/backend/k8s/thirdparty - lib/backend/model - lib/client - lib/errors + - lib/hash - lib/hwm + - lib/ipip - lib/net - lib/numorstring - lib/scope + - lib/selector + - lib/selector/parser + - lib/selector/tokenizer + - lib/validator - name: github.com/PuerkitoBio/purell version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc @@ -170,25 +179,25 @@ imports: - name: github.com/satori/go.uuid version: b061729afc07e77a8aa4fad0a2fd840958f1942a - name: github.com/Sirupsen/logrus - version: 881bee4e20a5d11a6a88a5667c6f292072ac1963 + version: c078b1e43f58d563c74cebe63c85789e76ddb627 - name: github.com/spf13/afero - version: 2f30b2a92c0e5700bcfe4715891adb1f2a7a406d + version: 72b31426848c6ef12a7a8e216708cb0d1530f074 subpackages: - mem - name: github.com/spf13/cast - version: 24b6558033ffe202bf42f0f3b870dcc798dd2ba8 + version: d1139bab1c07d5ad390a65e7305876b3c1a8370b - name: github.com/spf13/jwalterweatherman - version: 33c24e77fb80341fe7130ee7c594256ff08ccc46 + version: fa7ca7e836cf3a8bb4ebf799f472c12d7e903d66 - name: github.com/spf13/pflag version: 08b1a584251b5b62f458943640fc8ebd4d50aaa5 - name: github.com/spf13/viper version: 5ed0fc31f7f453625df314d8e66b9791e8d13003 - name: github.com/ugorji/go - version: f1f1a805ed361a0e078bb537e4ea78cd37dcf065 + version: ded73eae5db7e7a0ef6f55aace87a2873c5d2b74 subpackages: - codec - name: github.com/vishvananda/netlink - version: c750a61f1836d48aacb1c74deafb05cfb549eb92 + version: fe3b5664d23a11b52ba59bece4ff29c52772a56b subpackages: - nl - name: github.com/vishvananda/netns 
@@ -200,7 +209,7 @@ imports: - blowfish - ssh/terminal - name: golang.org/x/net - version: 6acef71eb69611914f7a30939ea9f6e194c78172 + version: f2499483f923065a842d38eb4c7f1927e6fc6e6d subpackages: - context - context/ctxhttp @@ -208,6 +217,7 @@ imports: - http2/hpack - idna - internal/timeseries + - lex/httplex - trace - name: golang.org/x/oauth2 version: 3c3a985cb79f52a3190fbc056984415ca6763d01 @@ -246,7 +256,7 @@ imports: - internal/urlfetch - urlfetch - name: google.golang.org/grpc - version: 0d9891286aca15aeb2b0a73be9f5946c3cfefa85 + version: 777daa17ff9b5daef1cfdf915088a2ada3332bf0 subpackages: - codes - credentials @@ -255,9 +265,9 @@ imports: - metadata - naming - peer - - stats - - tap - transport +- name: gopkg.in/go-playground/validator.v8 + version: 5f57d2222ad794d0dffb07e664ea05e2ee07d60c - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 - name: gopkg.in/tchap/go-patricia.v2 @@ -269,7 +279,7 @@ imports: - name: gopkg.in/yaml.v2 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 - name: k8s.io/client-go - version: 41a99d711af778a177f07402217b85d456b50da1 + version: 243d8a9cb66a51ad8676157f79e71033b4014a2a subpackages: - discovery - kubernetes @@ -289,7 +299,6 @@ imports: - pkg/api/errors - pkg/api/install - pkg/api/meta - - pkg/api/meta/metatypes - pkg/api/resource - pkg/api/unversioned - pkg/api/v1 @@ -319,6 +328,8 @@ imports: - pkg/apis/extensions - pkg/apis/extensions/install - pkg/apis/extensions/v1beta1 + - pkg/apis/meta/v1 + - pkg/apis/meta/v1/unstructured - pkg/apis/policy - pkg/apis/policy/install - pkg/apis/policy/v1beta1 @@ -362,7 +373,6 @@ imports: - pkg/util/net - pkg/util/parsers - pkg/util/rand - - pkg/util/ratelimit - pkg/util/runtime - pkg/util/sets - pkg/util/uuid diff --git a/glide.yaml b/glide.yaml index fa3f977..361df38 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,13 +1,14 @@ -package: . 
+package: github.com/projectcalico/calico-bgp-daemon import: - package: github.com/Sirupsen/logrus + version: v0.11.2 - package: github.com/coreos/etcd - version: ~3.1.0-alpha.1 + version: v3.1.1 subpackages: - client - pkg/transport - package: github.com/osrg/gobgp - version: ee8ce99e2df1f732ef515e20fb3054c0cff0249b + version: v1.16.0 subpackages: - api - config @@ -15,25 +16,14 @@ import: - server - table - package: github.com/projectcalico/libcalico-go - version: v1.0.0-rc6 + version: v1.1.3 subpackages: - lib/api - - lib/api/unversioned - - lib/backend - - lib/backend/api - - lib/backend/compat - - lib/backend/etcd - - lib/backend/k8s - - lib/backend/model - lib/client - - lib/errors - - lib/hwm - - lib/net - lib/numorstring - lib/scope - package: github.com/vishvananda/netlink - package: golang.org/x/net subpackages: - context -- package: google.golang.org/grpc - version: 0d9891286aca15aeb2b0a73be9f5946c3cfefa85 +- package: gopkg.in/tomb.v2 diff --git a/ipam.go b/ipam.go new file mode 100644 index 0000000..ca3d07a --- /dev/null +++ b/ipam.go @@ -0,0 +1,154 @@ +// Copyright (C) 2017 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + + etcd "github.com/coreos/etcd/client" + "github.com/osrg/gobgp/table" + "golang.org/x/net/context" +) + +type ipPool struct { + CIDR string `json:"cidr"` + IPIP string `json:"ipip"` + Mode string `json:"ipip_mode"` +} + +func (lhs *ipPool) equal(rhs *ipPool) bool { + if lhs == rhs { + return true + } + if lhs == nil || rhs == nil { + return false + } + return lhs.CIDR == rhs.CIDR && lhs.IPIP == rhs.IPIP && lhs.Mode == rhs.Mode +} + +// Contain returns true if this ipPool contains 'prefix' +func (p *ipPool) contain(prefix string) bool { + k := table.CidrToRadixkey(prefix) + l := table.CidrToRadixkey(p.CIDR) + return strings.HasPrefix(k, l) +} + +type ipamCache struct { + mu sync.RWMutex + m map[string]*ipPool + etcdAPI etcd.KeysAPI + updateHandler func(*ipPool) error +} + +// match checks whether we have an IP pool which contains the given prefix. +// If we have, it returns the pool. +func (c *ipamCache) match(prefix string) *ipPool { + c.mu.RLock() + defer c.mu.RUnlock() + for _, p := range c.m { + if p.contain(prefix) { + return p + } + } + return nil +} + +// update updates the internal map with IPAM updates when the update +// is new addtion to the map or changes the existing item, it calls +// updateHandler +func (c *ipamCache) update(node *etcd.Node, del bool) error { + c.mu.Lock() + defer c.mu.Unlock() + log.Printf("update ipam cache: %s, %v, %t", node.Key, node.Value, del) + if node.Dir { + return nil + } + p := &ipPool{} + if err := json.Unmarshal([]byte(node.Value), p); err != nil { + return err + } + if p.CIDR == "" { + return fmt.Errorf("empty cidr: %s", node.Value) + } + q := c.m[p.CIDR] + if del { + delete(c.m, p.CIDR) + return nil + } else if p.equal(q) { + return nil + } + + c.m[p.CIDR] = p + + if c.updateHandler != nil { + return c.updateHandler(p) + } + return nil +} + +// sync synchronizes the contents under /calico/v1/ipam +func (c *ipamCache) sync() error { + 
res, err := c.etcdAPI.Get(context.Background(), CALICO_IPAM, &etcd.GetOptions{Recursive: true}) + if err != nil { + return err + } + + var index uint64 + for _, node := range res.Node.Nodes { + if node.ModifiedIndex > index { + index = node.ModifiedIndex + } + if err = c.update(node, false); err != nil { + return err + } + } + + watcher := c.etcdAPI.Watcher(CALICO_IPAM, &etcd.WatcherOptions{Recursive: true, AfterIndex: index}) + for { + res, err := watcher.Next(context.Background()) + if err != nil { + return err + } + del := false + node := res.Node + switch res.Action { + case "set", "create", "update", "compareAndSwap": + case "delete": + del = true + node = res.PrevNode + default: + log.Printf("unhandled action: %s", res.Action) + continue + } + if err = c.update(node, del); err != nil { + return err + } + } + return nil +} + +// create new IPAM cache +func newIPAMCache(api etcd.KeysAPI, updateHandler func(*ipPool) error) *ipamCache { + return &ipamCache{ + m: make(map[string]*ipPool), + updateHandler: updateHandler, + etcdAPI: api, + } +} diff --git a/main.go b/main.go index 1dd92d2..1bf49a0 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,4 @@ -// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// Copyright (C) 2016-2017 Nippon Telegraph and Telephone Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -22,8 +22,8 @@ import ( "log" "net" "os" - "strconv" "strings" + "syscall" "time" "github.com/Sirupsen/logrus" @@ -34,20 +34,29 @@ import ( bgp "github.com/osrg/gobgp/packet/bgp" bgpserver "github.com/osrg/gobgp/server" bgptable "github.com/osrg/gobgp/table" + calicoapi "github.com/projectcalico/libcalico-go/lib/api" calicocli "github.com/projectcalico/libcalico-go/lib/client" + "github.com/projectcalico/libcalico-go/lib/numorstring" + calicoscope "github.com/projectcalico/libcalico-go/lib/scope" "github.com/vishvananda/netlink" "golang.org/x/net/context" + "gopkg.in/tomb.v2" ) const ( - HOSTNAME = "HOSTNAME" - IP = "IP" - IP6 = "IP6" + NODENAME = "NODENAME" + AS = "AS" CALICO_PREFIX = "/calico" CALICO_BGP = CALICO_PREFIX + "/bgp/v1" CALICO_AGGR = CALICO_PREFIX + "/ipam/v2/host" + CALICO_IPAM = CALICO_PREFIX + "/v1/ipam" defaultDialTimeout = 30 * time.Second + + aggregatedPrefixSetName = "aggregated" + hostPrefixSetName = "host" + + RTPROT_GOBGP = 0x11 ) // VERSION is filled out during the build process (using git describe output) @@ -70,12 +79,8 @@ func errorButKeyNotFound(err error) error { return err } -func getEtcdConfig() (etcd.Config, error) { +func getEtcdConfig(cfg *calicoapi.CalicoAPIConfig) (etcd.Config, error) { var config etcd.Config - cfg, err := calicocli.LoadClientConfigFromEnvironment() - if err != nil { - return config, err - } etcdcfg := cfg.Spec.EtcdConfig etcdEndpoints := etcdcfg.EtcdEndpoints if etcdEndpoints == "" { @@ -95,114 +100,229 @@ func getEtcdConfig() (etcd.Config, error) { return config, nil } -func getGlobalASN(api etcd.KeysAPI) (uint32, error) { - res, err := api.Get(context.Background(), fmt.Sprintf("%s/global/as_num", CALICO_BGP), nil) +type Server struct { + t tomb.Tomb + bgpServer *bgpserver.BgpServer + client *calicocli.Client + etcd etcd.KeysAPI + ipv4 net.IP + ipv6 net.IP + ipam *ipamCache +} + +func NewServer() (*Server, error) { + config, err := calicocli.LoadClientConfigFromEnvironment() if err != nil { - return 0, err 
+ return nil, err } - asn, err := strconv.ParseUint(res.Node.Value, 10, 32) + + etcdConfig, err := getEtcdConfig(config) if err != nil { - return 0, err + return nil, err + } + + cli, err := etcd.New(etcdConfig) + if err != nil { + return nil, err } - return uint32(asn), nil + etcdCli := etcd.NewKeysAPI(cli) + + calicoCli, err := calicocli.New(*config) + if err != nil { + return nil, err + } + + node, err := calicoCli.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)}) + if err != nil { + return nil, err + } + + if node.Spec.BGP == nil { + return nil, fmt.Errorf("Calico is running in policy-only mode") + } + var ipv4, ipv6 net.IP + if ipnet := node.Spec.BGP.IPv4Address; ipnet != nil { + ipv4 = ipnet.IP + } + if ipnet := node.Spec.BGP.IPv6Address; ipnet != nil { + ipv6 = ipnet.IP + } + + bgpServer := bgpserver.NewBgpServer() + + return &Server{ + bgpServer: bgpServer, + client: calicoCli, + etcd: etcdCli, + ipv4: ipv4, + ipv6: ipv6, + }, nil } -func getPeerASN(api etcd.KeysAPI, host string) (uint32, error) { - res, err := api.Get(context.Background(), fmt.Sprintf("%s/host/%s/as_num", CALICO_BGP, host), nil) - if errorButKeyNotFound(err) != nil { - return 0, err +func (s *Server) Serve() { + s.t.Go(func() error { + s.bgpServer.Serve() + return nil + }) + + bgpAPIServer := bgpapi.NewGrpcServer(s.bgpServer, ":50051") + s.t.Go(bgpAPIServer.Serve) + + globalConfig, err := s.getGlobalConfig() + if err != nil { + log.Fatal(err) } - if res != nil && res.Node != nil { - v, err := strconv.ParseUint(res.Node.Value, 10, 32) - if err != nil { - return 0, err + + if err := s.bgpServer.Start(globalConfig); err != nil { + log.Fatal("failed to start BGP server:", err) + } + + if err := s.initialPolicySetting(); err != nil { + log.Fatal(err) + } + + s.ipam = newIPAMCache(s.etcd, s.ipamUpdateHandler) + // sync IPAM and call ipamUpdateHandler + s.t.Go(func() error { return fmt.Errorf("syncIPAM: %s", s.ipam.sync()) }) + // watch routes from other BGP peers and update FIB + 
s.t.Go(func() error { return fmt.Errorf("watchBGPPath: %s", s.watchBGPPath()) }) + // watch prefix assigned and announce to other BGP peers + s.t.Go(func() error { return fmt.Errorf("watchPrefix: %s", s.watchPrefix()) }) + // watch BGP configuration + s.t.Go(func() error { return fmt.Errorf("watchBGPConfig: %s", s.watchBGPConfig()) }) + // watch routes added by kernel and announce to other BGP peers + s.t.Go(func() error { return fmt.Errorf("watchKernelRoute: %s", s.watchKernelRoute()) }) + + <-s.t.Dying() + log.Fatal(s.t.Err()) + +} + +func isCrossSubnet(gw net.IP, subnet net.IPNet) bool { + p := &ipPool{CIDR: subnet.String()} + result := !p.contain(gw.String() + "/32") + return result +} + +func (s *Server) ipamUpdateHandler(pool *ipPool) error { + filter := &netlink.Route{ + Protocol: RTPROT_GOBGP, + } + list, err := netlink.RouteListFiltered(netlink.FAMILY_V4, filter, netlink.RT_FILTER_PROTOCOL) + if err != nil { + return err + } + node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)}) + if err != nil { + return err + } + + for _, route := range list { + if route.Dst == nil { + continue + } + if pool.contain(route.Dst.String()) { + ipip := pool.IPIP != "" + if pool.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) { + ipip = false + } + if ipip { + i, err := net.InterfaceByName(pool.IPIP) + if err != nil { + return err + } + route.LinkIndex = i.Index + route.SetFlag(netlink.FLAG_ONLINK) + } else { + route.LinkIndex = 0 + route.Flags = 0 + } + return netlink.RouteReplace(&route) } - return uint32(v), nil } - return getGlobalASN(api) + return nil +} +func (s *Server) getNodeASN() (numorstring.ASNumber, error) { + return s.getPeerASN(os.Getenv(NODENAME)) } -func getGlobalConfig(api etcd.KeysAPI) (*bgpconfig.Global, error) { - asn, err := getGlobalASN(api) +func (s *Server) getPeerASN(host string) (numorstring.ASNumber, error) { + node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: 
host}) + if err != nil { + return 0, err + } + if node.Spec.BGP == nil { + return 0, fmt.Errorf("host %s is running in policy-only mode") + } + asn := node.Spec.BGP.ASNumber + if asn == nil { + return s.client.Config().GetGlobalASNumber() + } + return *asn, nil + +} + +func (s *Server) getGlobalConfig() (*bgpconfig.Global, error) { + asn, err := s.getNodeASN() if err != nil { return nil, err } return &bgpconfig.Global{ Config: bgpconfig.GlobalConfig{ - As: asn, - RouterId: os.Getenv(IP), + As: uint32(asn), + RouterId: s.ipv4.String(), }, }, nil } -func isMeshMode(api etcd.KeysAPI) (bool, error) { - res, err := api.Get(context.Background(), fmt.Sprintf("%s/global/node_mesh", CALICO_BGP), nil) - if err != nil { - return false, err - } - m := &struct { - Enabled bool `json:"enabled"` - }{} - if err := json.Unmarshal([]byte(res.Node.Value), m); err != nil { - return false, err - } - return m.Enabled, nil +func (s *Server) isMeshMode() (bool, error) { + return s.client.Config().GetNodeToNodeMesh() } -func getMeshNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { - globalASN, err := getGlobalASN(api) +// getMeshNeighborConfigs returns the list of mesh BGP neighbor configuration struct +func (s *Server) getMeshNeighborConfigs() ([]*bgpconfig.Neighbor, error) { + globalASN, err := s.getNodeASN() if err != nil { return nil, err } - res, err := api.Get(context.Background(), fmt.Sprintf("%s/host", CALICO_BGP), &etcd.GetOptions{Recursive: true}) + nodes, err := s.client.Nodes().List(calicoapi.NodeMetadata{}) if err != nil { return nil, err } - ns := make([]*bgpconfig.Neighbor, 0, len(res.Node.Nodes)) - for _, node := range res.Node.Nodes { - var v4, v6 string + ns := make([]*bgpconfig.Neighbor, 0, len(nodes.Items)) + for _, node := range nodes.Items { + if node.Metadata.Name == os.Getenv(NODENAME) { + continue + } peerASN := globalASN - for _, v := range node.Nodes { - path := strings.Split(v.Key, "/") - key := path[len(path)-1] - switch key { - case 
"ip_addr_v4": - v4 = v.Value - if v4 == os.Getenv(IP) { - v4 = "" - } - case "ip_addr_v6": - v6 = v.Value - if v6 == os.Getenv(IP6) { - v6 = "" - } - case "as_num": - asn, err := strconv.ParseUint(v.Value, 10, 32) - if err != nil { - return nil, err - } - peerASN = uint32(asn) - default: - log.Printf("unhandled key: %s", v.Key) - } + spec := node.Spec.BGP + if spec == nil { + continue + } + + asn := spec.ASNumber + if asn != nil { + peerASN = *asn } - if v4 != "" { - id := strings.Replace(v4, ".", "_", -1) + if v4 := spec.IPv4Address; v4 != nil { + ip := v4.IP.String() + id := strings.Replace(ip, ".", "_", -1) ns = append(ns, &bgpconfig.Neighbor{ Config: bgpconfig.NeighborConfig{ - NeighborAddress: v4, - PeerAs: peerASN, + NeighborAddress: ip, + PeerAs: uint32(peerASN), Description: fmt.Sprintf("Mesh_%s", id), }, }) } - if v6 != "" { - id := strings.Replace(v4, ":", "_", -1) + if v6 := spec.IPv6Address; v6 != nil { + ip := v6.IP.String() + id := strings.Replace(ip, ":", "_", -1) ns = append(ns, &bgpconfig.Neighbor{ Config: bgpconfig.NeighborConfig{ - NeighborAddress: v6, - PeerAs: peerASN, + NeighborAddress: ip, + PeerAs: uint32(peerASN), Description: fmt.Sprintf("Mesh_%s", id), }, }) @@ -212,6 +332,7 @@ func getMeshNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { } +// getNeighborConfigFromPeer returns a BGP neighbor configuration struct from *etcd.Node func getNeighborConfigFromPeer(node *etcd.Node, neighborType string) (*bgpconfig.Neighbor, error) { m := &struct { IP string `json:"ip"` @@ -220,7 +341,7 @@ func getNeighborConfigFromPeer(node *etcd.Node, neighborType string) (*bgpconfig if err := json.Unmarshal([]byte(node.Value), m); err != nil { return nil, err } - asn, err := strconv.ParseUint(m.ASN, 10, 32) + asn, err := numorstring.ASNumberFromString(m.ASN) if err != nil { return nil, err } @@ -233,63 +354,54 @@ func getNeighborConfigFromPeer(node *etcd.Node, neighborType string) (*bgpconfig }, nil } -func getNonMeshNeighborConfigs(api 
etcd.KeysAPI, neighborType, version string) ([]*bgpconfig.Neighbor, error) { - var key string +// getNonMeshNeighborConfigs returns the list of non-mesh BGP neighbor configuration struct +// valid neighborType is either "global" or "node" +func (s *Server) getNonMeshNeighborConfigs(neighborType string) ([]*bgpconfig.Neighbor, error) { + var metadata calicoapi.BGPPeerMetadata switch neighborType { case "global": - key = fmt.Sprintf("%s/global/peer_%s", CALICO_BGP, version) + metadata.Scope = calicoscope.Global case "node": - key = fmt.Sprintf("%s/host/%s/peer_%s", CALICO_BGP, os.Getenv(HOSTNAME), version) + metadata.Scope = calicoscope.Node + metadata.Node = os.Getenv(NODENAME) default: return nil, fmt.Errorf("invalid neighbor type: %s", neighborType) } - res, err := api.Get(context.Background(), key, &etcd.GetOptions{Recursive: true}) - if errorButKeyNotFound(err) != nil { + list, err := s.client.BGPPeers().List(metadata) + if err != nil { return nil, err } - if res == nil { - return nil, nil - } - ns := make([]*bgpconfig.Neighbor, 0, len(res.Node.Nodes)) - for _, node := range res.Node.Nodes { - var n *bgpconfig.Neighbor - if n, err = getNeighborConfigFromPeer(node, neighborType); err != nil { - return nil, err - } - ns = append(ns, n) + ns := make([]*bgpconfig.Neighbor, 0, len(list.Items)) + for _, node := range list.Items { + addr := node.Metadata.PeerIP.String() + ns = append(ns, &bgpconfig.Neighbor{ + Config: bgpconfig.NeighborConfig{ + NeighborAddress: addr, + PeerAs: uint32(node.Spec.ASNumber), + Description: fmt.Sprintf("%s_%s", strings.Title(neighborType), underscore(addr)), + }, + }) } return ns, nil } -func getGlobalNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { - v4s, err := getNonMeshNeighborConfigs(api, "global", "v4") - if err != nil { - return nil, err - } - v6s, err := getNonMeshNeighborConfigs(api, "global", "v6") - if err != nil { - return nil, err - } - return append(v4s, v6s...), nil +// getGlobalNeighborConfigs returns the 
list of global BGP neighbor configuration struct +func (s *Server) getGlobalNeighborConfigs() ([]*bgpconfig.Neighbor, error) { + return s.getNonMeshNeighborConfigs("global") } -func getNodeSpecificNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { - v4s, err := getNonMeshNeighborConfigs(api, "node", "v4") - if err != nil { - return nil, err - } - v6s, err := getNonMeshNeighborConfigs(api, "node", "v6") - if err != nil { - return nil, err - } - return append(v4s, v6s...), nil +// getNodeNeighborConfigs returns the list of node specific BGP neighbor configuration struct +func (s *Server) getNodeSpecificNeighborConfigs() ([]*bgpconfig.Neighbor, error) { + return s.getNonMeshNeighborConfigs("node") } -func getNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { +// getNeighborConfigs returns the complete list of BGP neighbor configuration +// which the node should peer. +func (s *Server) getNeighborConfigs() ([]*bgpconfig.Neighbor, error) { var neighbors []*bgpconfig.Neighbor // --- Node-to-node mesh --- - if mesh, err := isMeshMode(api); err == nil && mesh { - ns, err := getMeshNeighborConfigs(api) + if mesh, err := s.isMeshMode(); err == nil && mesh { + ns, err := s.getMeshNeighborConfigs() if err != nil { return nil, err } @@ -298,13 +410,13 @@ func getNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { return nil, err } // --- Global peers --- - if ns, err := getGlobalNeighborConfigs(api); err != nil { + if ns, err := s.getGlobalNeighborConfigs(); err != nil { return nil, err } else { neighbors = append(neighbors, ns...) } // --- Node-specific peers --- - if ns, err := getNodeSpecificNeighborConfigs(api); err != nil { + if ns, err := s.getNodeSpecificNeighborConfigs(); err != nil { return nil, err } else { neighbors = append(neighbors, ns...) 
@@ -312,55 +424,53 @@ func getNeighborConfigs(api etcd.KeysAPI) ([]*bgpconfig.Neighbor, error) { return neighbors, nil } -func makePath(key string, isWithdrawal bool) (*bgptable.Path, error) { +func etcdKeyToPrefix(key string) string { path := strings.Split(key, "/") - elems := strings.Split(path[len(path)-1], "-") - if len(elems) != 2 { - return nil, fmt.Errorf("invalid prefix format: %s", path[len(path)-1]) - } - prefix := elems[0] - masklen, err := strconv.ParseUint(elems[1], 10, 8) + return strings.Replace(path[len(path)-1], "-", "/", 1) +} + +func (s *Server) makePath(prefix string, isWithdrawal bool) (*bgptable.Path, error) { + _, ipNet, err := net.ParseCIDR(prefix) if err != nil { return nil, err } - p := net.ParseIP(prefix) + p := ipNet.IP + masklen, _ := ipNet.Mask.Size() v4 := true - if p == nil { - return nil, fmt.Errorf("invalid prefix format: %s", key) - } else if p.To4() == nil { + if p.To4() == nil { v4 = false } var nlri bgp.AddrPrefixInterface - if v4 { - nlri = bgp.NewIPAddrPrefix(uint8(masklen), prefix) - } else { - nlri = bgp.NewIPv6AddrPrefix(uint8(masklen), prefix) - } - attrs := []bgp.PathAttributeInterface{ bgp.NewPathAttributeOrigin(0), } if v4 { - attrs = append(attrs, bgp.NewPathAttributeNextHop(os.Getenv(IP))) + nlri = bgp.NewIPAddrPrefix(uint8(masklen), p.String()) + attrs = append(attrs, bgp.NewPathAttributeNextHop(s.ipv4.String())) } else { - attrs = append(attrs, bgp.NewPathAttributeMpReachNLRI(os.Getenv(IP6), []bgp.AddrPrefixInterface{nlri})) + nlri = bgp.NewIPv6AddrPrefix(uint8(masklen), p.String()) + attrs = append(attrs, bgp.NewPathAttributeMpReachNLRI(s.ipv6.String(), []bgp.AddrPrefixInterface{nlri})) } return bgptable.NewPath(nil, nlri, isWithdrawal, attrs, time.Now(), false), nil } -func getAssignedPrefixes(api etcd.KeysAPI) ([]*bgptable.Path, error) { +// getAssignedPrefixes retrieves prefixes assigned to the node and returns them as a +// list of BGP paths.
+// using etcd directly since libcalico-go doesn't seem to have a method to return +// assigned prefixes yet. +func (s *Server) getAssignedPrefixes(api etcd.KeysAPI) ([]*bgptable.Path, error) { var ps []*bgptable.Path f := func(version string) error { - res, err := api.Get(context.Background(), fmt.Sprintf("%s/%s/%s/block", CALICO_AGGR, os.Getenv(HOSTNAME), version), &etcd.GetOptions{Recursive: true}) + res, err := api.Get(context.Background(), fmt.Sprintf("%s/%s/%s/block", CALICO_AGGR, os.Getenv(NODENAME), version), &etcd.GetOptions{Recursive: true}) if err != nil { return err } for _, v := range res.Node.Nodes { - path, err := makePath(v.Key, false) + path, err := s.makePath(etcdKeyToPrefix(v.Key), false) if err != nil { return err } @@ -368,12 +478,12 @@ func getAssignedPrefixes(api etcd.KeysAPI) ([]*bgptable.Path, error) { } return nil } - if os.Getenv(IP) != "" { + if s.ipv4 != nil { if err := f("ipv4"); err != nil { return nil, err } } - if os.Getenv(IP6) != "" { + if s.ipv6 != nil { if err := f("ipv6"); err != nil { return nil, err } @@ -381,8 +491,25 @@ func getAssignedPrefixes(api etcd.KeysAPI) ([]*bgptable.Path, error) { return ps, nil } -func watchPrefix(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { - watcher := api.Watcher(fmt.Sprintf("%s/%s", CALICO_AGGR, os.Getenv(HOSTNAME)), &etcd.WatcherOptions{Recursive: true}) +// watchPrefix watches etcd /calico/ipam/v2/host/$NODENAME and add/delete +// aggregated routes which are assigned to the node. +// This function also updates policy appropriately. 
+func (s *Server) watchPrefix() error { + + paths, err := s.getAssignedPrefixes(s.etcd) + if err != nil { + return err + } + + if err = s.updatePrefixSet(paths); err != nil { + return err + } + + if _, err := s.bgpServer.AddPath("", paths); err != nil { + return err + } + + watcher := s.etcd.Watcher(fmt.Sprintf("%s/%s", CALICO_AGGR, os.Getenv(NODENAME)), &etcd.WatcherOptions{Recursive: true}) for { var err error res, err := watcher.Next(context.Background()) @@ -390,23 +517,44 @@ func watchPrefix(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { return err } var path *bgptable.Path + key := etcdKeyToPrefix(res.Node.Key) if res.Action == "delete" { - path, err = makePath(res.Node.Key, true) + path, err = s.makePath(key, true) } else { - path, err = makePath(res.Node.Key, false) + path, err = s.makePath(key, false) } if err != nil { return err } - if _, err := bgpServer.AddPath("", []*bgptable.Path{path}); err != nil { + paths := []*bgptable.Path{path} + if err = s.updatePrefixSet(paths); err != nil { + return err + } + if _, err := s.bgpServer.AddPath("", paths); err != nil { return err } log.Printf("add path: %s", path) } } -func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { - watcher := api.Watcher(fmt.Sprintf("%s", CALICO_BGP), &etcd.WatcherOptions{ +// watchBGPConfig watches etcd path /calico/bgp/v1 and handle various changes +// in etcd. 
Though this method tries to minimize effects to the existing BGP peers, +// when /calico/bgp/v1/host/$NODENAME or /calico/global/as_num is changed, +// give up handling the change and return error (this leads calico-bgp-daemon to be restarted) +func (s *Server) watchBGPConfig() error { + + neighborConfigs, err := s.getNeighborConfigs() + if err != nil { + return err + } + + for _, n := range neighborConfigs { + if err = s.bgpServer.AddNeighbor(n); err != nil { + return err + } + } + + watcher := s.etcd.Watcher(fmt.Sprintf("%s", CALICO_BGP), &etcd.WatcherOptions{ Recursive: true, }) for { @@ -414,7 +562,15 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { if err != nil { return err } - log.Printf("watch: %v", res) + prev := "" + if res.PrevNode != nil { + prev = res.PrevNode.Value + } + log.Printf("watch: action: %s, key: %s node: %s, prev-node: %s", res.Action, res.Node.Key, res.Node.Value, prev) + if res.Action == "set" && res.Node.Value == prev { + log.Printf("same value. 
ignore") + continue + } handleNonMeshNeighbor := func(neighborType string) error { switch res.Action { @@ -423,13 +579,13 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { if err != nil { return err } - return bgpServer.DeleteNeighbor(n) - case "set": + return s.bgpServer.DeleteNeighbor(n) + case "set", "create", "update", "compareAndSwap": n, err := getNeighborConfigFromPeer(res.Node, neighborType) if err != nil { return err } - return bgpServer.AddNeighbor(n) + return s.bgpServer.AddNeighbor(n) } log.Printf("unhandled action: %s", res.Action) return nil @@ -439,9 +595,9 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { switch { case strings.HasPrefix(key, fmt.Sprintf("%s/global/peer_", CALICO_BGP)): err = handleNonMeshNeighbor("global") - case strings.HasPrefix(key, fmt.Sprintf("%s/host/%s/peer_", CALICO_BGP, os.Getenv(HOSTNAME))): + case strings.HasPrefix(key, fmt.Sprintf("%s/host/%s/peer_", CALICO_BGP, os.Getenv(NODENAME))): err = handleNonMeshNeighbor("node") - case strings.HasPrefix(key, fmt.Sprintf("%s/host/%s", CALICO_BGP, os.Getenv(HOSTNAME))): + case strings.HasPrefix(key, fmt.Sprintf("%s/host/%s", CALICO_BGP, os.Getenv(NODENAME))): log.Println("Local host config update. 
Restart") os.Exit(1) case strings.HasPrefix(key, fmt.Sprintf("%s/host", CALICO_BGP)): @@ -451,12 +607,15 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { continue } deleteNeighbor := func(node *etcd.Node) error { + if node.Value == "" { + return nil + } n := &bgpconfig.Neighbor{ Config: bgpconfig.NeighborConfig{ NeighborAddress: node.Value, }, } - return bgpServer.DeleteNeighbor(n) + return s.bgpServer.DeleteNeighbor(n) } host := elems[len(elems)-2] switch elems[len(elems)-1] { @@ -472,37 +631,39 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { return err } } - asn, err := getPeerASN(api, host) + if res.Node.Value == "" { + continue + } + asn, err := s.getPeerASN(host) if err != nil { return err } n := &bgpconfig.Neighbor{ Config: bgpconfig.NeighborConfig{ NeighborAddress: res.Node.Value, - PeerAs: asn, + PeerAs: uint32(asn), Description: fmt.Sprintf("Mesh_%s", underscore(res.Node.Value)), }, } - if err = bgpServer.AddNeighbor(n); err != nil { + if err = s.bgpServer.AddNeighbor(n); err != nil { return err } } case "as_num": - var asn uint32 + var asn numorstring.ASNumber if res.Action == "set" { - v, err := strconv.ParseUint(res.Node.Value, 10, 32) + asn, err = numorstring.ASNumberFromString(res.Node.Value) if err != nil { return err } - asn = uint32(v) } else { - asn, err = getGlobalASN(api) + asn, err = s.getNodeASN() if err != nil { return err } } for _, version := range []string{"v4", "v6"} { - res, err := api.Get(context.Background(), fmt.Sprintf("%s/host/%s/ip_addr_%s", CALICO_BGP, host, version), nil) + res, err := s.etcd.Get(context.Background(), fmt.Sprintf("%s/host/%s/ip_addr_%s", CALICO_BGP, host, version), nil) if errorButKeyNotFound(err) != nil { return err } @@ -516,11 +677,11 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { n := &bgpconfig.Neighbor{ Config: bgpconfig.NeighborConfig{ NeighborAddress: ip, - PeerAs: asn, + PeerAs: uint32(asn), Description: 
fmt.Sprintf("Mesh_%s", underscore(ip)), }, } - if err = bgpServer.AddNeighbor(n); err != nil { + if err = s.bgpServer.AddNeighbor(n); err != nil { return err } } @@ -531,19 +692,19 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { log.Println("Global AS number update. Restart") os.Exit(1) case strings.HasPrefix(key, fmt.Sprintf("%s/global/node_mesh", CALICO_BGP)): - mesh, err := isMeshMode(api) + mesh, err := s.isMeshMode() if err != nil { return err } - ns, err := getMeshNeighborConfigs(api) + ns, err := s.getMeshNeighborConfigs() if err != nil { return err } for _, n := range ns { if mesh { - err = bgpServer.AddNeighbor(n) + err = s.bgpServer.AddNeighbor(n) } else { - err = bgpServer.DeleteNeighbor(n) + err = s.bgpServer.DeleteNeighbor(n) } if err != nil { return err @@ -556,50 +717,87 @@ func watchBGPConfig(api etcd.KeysAPI, bgpServer *bgpserver.BgpServer) error { } } -func injectRoute(path *bgptable.Path) error { - nexthop := path.GetNexthop() - nlri := path.GetNlri() - var family int - var d string - - switch f := path.GetRouteFamily(); f { - case bgp.RF_IPv4_UC: - family = netlink.FAMILY_V4 - d = "0.0.0.0/0" - case bgp.RF_IPv6_UC: - family = netlink.FAMILY_V6 - d = "::/0" - default: - log.Printf("only supports injecting ipv4/ipv6 unicast route: %s", f) - return nil +// watchKernelRoute receives netlink route update notification and announces +// kernel/boot routes using BGP. 
+func (s *Server) watchKernelRoute() error { + ch := make(chan netlink.RouteUpdate) + err := netlink.RouteSubscribe(ch, nil) + if err != nil { + return err + } + for update := range ch { + log.Printf("kernel update: %s", update) + if update.Table == syscall.RT_TABLE_MAIN && (update.Protocol == syscall.RTPROT_KERNEL || update.Protocol == syscall.RTPROT_BOOT) { + isWithdrawal := false + switch update.Type { + case syscall.RTM_DELROUTE: + isWithdrawal = true + case syscall.RTM_NEWROUTE: + default: + log.Printf("unhandled rtm type: %d", update.Type) + continue + } + path, err := s.makePath(update.Dst.String(), isWithdrawal) + if err != nil { + return err + } + log.Printf("made path from kernel update: %s", path) + if _, err = s.bgpServer.AddPath("", []*bgptable.Path{path}); err != nil { + return err + } + } } + return fmt.Errorf("netlink route subscription ended") +} +// injectRoute is a helper function to inject BGP routes to linux kernel +// TODO: multipath support +func (s *Server) injectRoute(path *bgptable.Path) error { + nexthop := path.GetNexthop() + nlri := path.GetNlri() dst, _ := netlink.ParseIPNet(nlri.String()) route := &netlink.Route{ - Dst: dst, - Gw: nexthop, + Dst: dst, + Gw: nexthop, + Protocol: RTPROT_GOBGP, } - routes, _ := netlink.RouteList(nil, family) - for _, route := range routes { - if route.Dst != nil { - d = route.Dst.String() - } - if d == dst.String() { - err := netlink.RouteDel(&route) + + if dst.IP.To4() != nil { + if p := s.ipam.match(nlri.String()); p != nil { + ipip := p.IPIP != "" + + node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)}) if err != nil { return err } + + if p.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) { + ipip = false + } + if ipip { + i, err := net.InterfaceByName(p.IPIP) + if err != nil { + return err + } + route.LinkIndex = i.Index + route.SetFlag(netlink.FLAG_ONLINK) + } } } + if path.IsWithdraw { log.Printf("removed route %s from 
kernel", nlri) - return nil + return netlink.RouteDel(route) } - log.Printf("added route %s to kernel", nlri) - return netlink.RouteAdd(route) + log.Printf("added route %s to kernel %s", nlri, route) + return netlink.RouteReplace(route) } -func monitorPath(watcher *bgpserver.Watcher) error { +// watchBGPPath watches BGP routes from other peers and injects them into +// the Linux kernel +// TODO: multipath support +func (s *Server) watchBGPPath() error { + watcher := s.bgpServer.Watch(bgpserver.WatchBestPath()) for { ev := <-watcher.Event() msg, ok := ev.(*bgpserver.WatchEventBestPath) @@ -610,91 +808,157 @@ func monitorPath(watcher *bgpserver.Watcher) error { if path.IsLocal() { continue } - if err := injectRoute(path); err != nil { + if err := s.injectRoute(path); err != nil { return err } } } } -func main() { - - // Display the version on "-v", otherwise just delegate to the skel code. - // Use a new flag set so as not to conflict with existing libraries which use "flag" - flagSet := flag.NewFlagSet("Calico", flag.ExitOnError) - - version := flagSet.Bool("v", false, "Display version") - err := flagSet.Parse(os.Args[1:]) +// initialPolicySetting initializes the BGP export policy. +// This creates two prefix-sets named 'aggregated' and 'host'. +// A route is allowed to be exported when it matches with 'aggregated' set, +// and not allowed when it matches with 'host' set.
+func (s *Server) initialPolicySetting() error { + createEmptyPrefixSet := func(name string) error { + ps, err := bgptable.NewPrefixSet(bgpconfig.PrefixSet{PrefixSetName: name}) + if err != nil { + return err + } + return s.bgpServer.AddDefinedSet(ps) + } + for _, name := range []string{aggregatedPrefixSetName, hostPrefixSetName} { + if err := createEmptyPrefixSet(name); err != nil { + return err + } + } + // intended to work as same as 'calico_pools' export filter of BIRD configuration + definition := bgpconfig.PolicyDefinition{ + Name: "calico_aggr", + Statements: []bgpconfig.Statement{ + bgpconfig.Statement{ + Conditions: bgpconfig.Conditions{ + MatchPrefixSet: bgpconfig.MatchPrefixSet{ + PrefixSet: aggregatedPrefixSetName, + }, + }, + Actions: bgpconfig.Actions{ + RouteDisposition: bgpconfig.ROUTE_DISPOSITION_ACCEPT_ROUTE, + }, + }, + bgpconfig.Statement{ + Conditions: bgpconfig.Conditions{ + MatchPrefixSet: bgpconfig.MatchPrefixSet{ + PrefixSet: hostPrefixSetName, + }, + }, + Actions: bgpconfig.Actions{ + RouteDisposition: bgpconfig.ROUTE_DISPOSITION_REJECT_ROUTE, + }, + }, + }, + } + policy, err := bgptable.NewPolicy(definition) if err != nil { - fmt.Println(err) - os.Exit(1) + return err } - if *version { - fmt.Println(VERSION) - os.Exit(0) + if err = s.bgpServer.AddPolicy(policy, false); err != nil { + return err } + return s.bgpServer.AddPolicyAssignment("", bgptable.POLICY_DIRECTION_EXPORT, + []*bgpconfig.PolicyDefinition{&definition}, + bgptable.ROUTE_TYPE_ACCEPT) +} - logrus.SetLevel(logrus.DebugLevel) - - config, err := getEtcdConfig() - if err != nil { - log.Fatal(err) +func (s *Server) updatePrefixSet(paths []*bgptable.Path) error { + for _, path := range paths { + err := s._updatePrefixSet(path.GetNlri().String(), path.IsWithdraw) + if err != nil { + return err + } } + return nil +} - cli, err := etcd.New(config) +// _updatePrefixSet updates 'aggregated' and 'host' prefix-sets +// we add the exact prefix to 'aggregated' set, and add corresponding 
longer +// prefixes to 'host' set. +// +// e.g. prefix: "192.168.1.0/26" del: false +// add "192.168.1.0/26" to 'aggregated' set +// add "192.168.1.0/26..32" to 'host' set +// +func (s *Server) _updatePrefixSet(prefix string, del bool) error { + _, ipNet, err := net.ParseCIDR(prefix) if err != nil { - log.Fatal(err) + return err + } + ps, err := bgptable.NewPrefixSet(bgpconfig.PrefixSet{ + PrefixSetName: aggregatedPrefixSetName, + PrefixList: []bgpconfig.Prefix{ + bgpconfig.Prefix{ + IpPrefix: prefix, + }, + }, + }) + if err != nil { + return err + } + if del { + err = s.bgpServer.DeleteDefinedSet(ps, false) + } else { + err = s.bgpServer.AddDefinedSet(ps) } - - bgpServer := bgpserver.NewBgpServer() - go bgpServer.Serve() - - bgpAPIServer := bgpapi.NewGrpcServer(bgpServer, ":50051") - go bgpAPIServer.Serve() - - api := etcd.NewKeysAPI(cli) - globalConfig, err := getGlobalConfig(api) if err != nil { - log.Fatal(err) + return err + } + min, _ := ipNet.Mask.Size() + max := 32 + if ipNet.IP.To4() == nil { + max = 128 + } + ps, err = bgptable.NewPrefixSet(bgpconfig.PrefixSet{ + PrefixSetName: hostPrefixSetName, + PrefixList: []bgpconfig.Prefix{ + bgpconfig.Prefix{ + IpPrefix: prefix, + MasklengthRange: fmt.Sprintf("%d..%d", min, max), + }, + }, + }) + if err != nil { + return err } - - if err := bgpServer.Start(globalConfig); err != nil { - log.Fatal(err) + if del { + return s.bgpServer.DeleteDefinedSet(ps, false) } + return s.bgpServer.AddDefinedSet(ps) +} + +func main() { - watcher := bgpServer.Watch(bgpserver.WatchBestPath()) - go func() { - log.Fatal(monitorPath(watcher)) - }() + // Display the version on "-v", otherwise just delegate to the skel code. 
+ // Use a new flag set so as not to conflict with existing libraries which use "flag" + flagSet := flag.NewFlagSet("Calico", flag.ExitOnError) - paths, err := getAssignedPrefixes(api) + version := flagSet.Bool("v", false, "Display version") + err := flagSet.Parse(os.Args[1:]) if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } - - if _, err := bgpServer.AddPath("", paths); err != nil { - log.Fatal(err) + if *version { + fmt.Println(VERSION) + os.Exit(0) } - go func() { - log.Fatal(watchPrefix(api, bgpServer)) - }() + logrus.SetLevel(logrus.InfoLevel) - neighborConfigs, err := getNeighborConfigs(api) + server, err := NewServer() if err != nil { + log.Printf("failed to create new server") log.Fatal(err) } - for _, n := range neighborConfigs { - if err = bgpServer.AddNeighbor(n); err != nil { - log.Fatal(err) - } - } - - go func() { - log.Fatal(watchBGPConfig(api, bgpServer)) - }() - - ch := make(chan struct{}) - <-ch + server.Serve() }