From db0802d6d1c2c7821022e5940483ff7dd034d40f Mon Sep 17 00:00:00 2001
From: Dan Everton <115049356+deverton-godaddy@users.noreply.github.com>
Date: Thu, 3 Aug 2023 02:12:21 +1000
Subject: [PATCH] Refactor endpoint handling and reconciliation (#21)

* Disable cgo

As part of the Go 1.20 release it seems like the default for `CGO_ENABLED` is no longer carried over from the tools. This leads to linking issues on systems that use different versions of glibc from what the base image uses.

See https://github.com/golang/go/issues/58550 for more details.

This change should fix #16

* Slim down the final image.

Use `scratch` as a base image since we're generating a static binary anyway. Also be more explicit about the platform and target OS during the build.

* Refactor endpoint handling and reconciliation. DRAFT

With the release of Nomad 1.6 it's possible to get the network address of the allocation from Nomad. The change to enable this is only in the client library and does not require updating the Nomad server. The IP was sent back by older Nomad versions; it just wasn't available in the client.

This enables refactoring the endpoint reconciliation to make use of the IP address to identify the endpoint within Cilium. There is no longer a dependency on Consul for policies. Additionally, endpoints are now labelled with the task group and task information, as services can be created at those levels.

* Tidy

* Update the readme and add some basic tests.

Remove unused flags from the readme and command line and refactor the code to allow for testing.

* Slightly better logging of labels

* Always update labels on reconcile.

* Fixes and align with upstream Dockerfile

* Missed one

* Reduce Dockerfile diff further

* Restore periodic reconciliation of endpoints

* Remove stray return
---
 Dockerfile | 6 +-
 Makefile | 3 +
 README.md | 13 +-
 go.mod | 8 +-
 go.sum | 17 +-
 main.go | 39 +---
 reapers/api.go | 34 +++
 reapers/endpoints.go | 471 +++++++++++++-------------------
 reapers/endpoints_test.go | 158 +++++++++++++
 9 files changed, 384 insertions(+), 365 deletions(-)
 create mode 100644 reapers/api.go
 create mode 100644 reapers/endpoints_test.go

diff --git a/Dockerfile b/Dockerfile index 9ff0d54..4972baa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,11 @@ FROM golang:1.20-bullseye as builder WORKDIR /netreap -COPY . /netreap +COPY go.mod go.sum /netreap/ +RUN go mod download +COPY . /netreap/ ARG VERSION RUN go build -ldflags "-s -w -X 'main.Version=$VERSION'" - FROM gcr.io/distroless/base-debian11 +WORKDIR / COPY --from=builder /netreap/netreap /usr/bin/netreap ENTRYPOINT ["/usr/bin/netreap"] diff --git a/Makefile b/Makefile index 5ee38a8..39c725b 100644 --- a/Makefile +++ b/Makefile @@ -8,3 +8,6 @@ docker: ci: docker buildx build --platform $(platforms) --tag $(repo):$(VERSION) --push . + +test: + go test -v ./... \ No newline at end of file diff --git a/README.md b/README.md index 951ff8a..3c0deb6 100644 --- a/README.md +++ b/README.md @@ -186,10 +186,6 @@ job "netreap" { task "netreap" { driver = "docker" - env { - NETREAP_CILIUM_CIDR = "172.16.0.0/16" - } - config { image = "ghcr.io/cosmonic/netreap:0.1.2" network_mode = "host" @@ -235,15 +231,14 @@ clients are available to Netreap. 
| Flag | Env Var | Default | Description | | ---------------------- | --------------------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------- | -| `--cilium-cidr`, `-c`, | `NETREAP_CILIUM_CIDR` | None, this is a required flag | The CIDR block of the address space used by Cilium. This allows netreap to identify if a job is a Cilium one. | | `--debug` | `NETREAP_DEBUG` | `false` | Turns on debug logging | | `--policy-key` | `NETREAP_POLICY_KEY` | `netreap.io/policy` | Consul key that Netreap watches for changes to the Cilium policy JSON value | -| `--exclude-tags` | `NETREAP_EXCLUDE_TAG` | None | List of Consul service tags to use as a filter to exclude from Netreap | -Please note that to configure the Nomad and Consul clients that Netreap uses, +Please note that to configure the Nomad, Consul and Cilium clients that Netreap uses, we leverage the well defined environment variables for -[Nomad](https://www.nomadproject.io/docs/commands#environment-variables) and -[Consul](https://www.consul.io/commands#environment-variables). +[Nomad](https://www.nomadproject.io/docs/commands#environment-variables), +[Consul](https://www.consul.io/commands#environment-variables) and +[Cilium](https://pkg.go.dev/github.com/cilium/cilium/pkg/client#DefaultSockPath). Right now we only allow connecting to the local Unix socket endpoint for the Cilium agent. As we determine how we are going to set things up with Cilium, we diff --git a/go.mod b/go.mod index 0882e20..750c9bd 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,10 @@ go 1.20 require ( github.com/cilium/cilium v1.13.2 - github.com/google/uuid v1.3.0 github.com/hashicorp/consul/api v1.18.0 - github.com/hashicorp/nomad/api v0.0.0-20230420161604-206236039ccd + github.com/hashicorp/nomad/api v0.0.0-20230719205936-8d2894699319 github.com/urfave/cli/v2 v2.11.2 go.uber.org/zap v1.23.0 - golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a ) require ( @@ -42,8 +40,9 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect - github.com/hashicorp/cronexpr v1.1.1 // indirect + github.com/hashicorp/cronexpr v1.1.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.4.0 // indirect @@ -110,6 +109,7 @@ require ( go.opentelemetry.io/otel/trace v1.11.2 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/oauth2 v0.4.0 // indirect golang.org/x/sync v0.1.0 // indirect diff --git a/go.sum b/go.sum index a5d074e..467359e 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= 
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= @@ -278,8 +279,8 @@ github.com/hashicorp/consul/api v1.18.0 h1:R7PPNzTCeN6VuQNDwwhZWJvzCtGSrNpJqfb22 github.com/hashicorp/consul/api v1.18.0/go.mod h1:owRRGJ9M5xReDC5nfT8FTJrNAPbT4NM6p/k+d03q2v4= github.com/hashicorp/consul/sdk v0.13.0 h1:lce3nFlpv8humJL8rNrrGHYSKc3q+Kxfeg3Ii1m6ZWU= github.com/hashicorp/consul/sdk v0.13.0/go.mod h1:0hs/l5fOVhJy/VdcoaNqUSi2AUs95eF5WKtv+EYIQqE= -github.com/hashicorp/cronexpr v1.1.1 h1:NJZDd87hGXjoZBdvyCF9mX4DCq5Wy7+A/w+A7q0wn6c= -github.com/hashicorp/cronexpr v1.1.1/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= +github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= +github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -325,8 +326,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM= github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0= -github.com/hashicorp/nomad/api v0.0.0-20230420161604-206236039ccd h1:jnx5Zofpo0xmn7agq1E1buUTFEGs25vEoOU8t7kENBc= -github.com/hashicorp/nomad/api v0.0.0-20230420161604-206236039ccd/go.mod h1:2TCrNvonL09r7EiQ6M2rNt+Cmjbn1QbzchFoTWJFpj4= +github.com/hashicorp/nomad/api v0.0.0-20230719205936-8d2894699319 h1:o7qzfGTTqk9QlCDQnvcrOe6sNltH4NbcgyWD7aXK2Uw= +github.com/hashicorp/nomad/api v0.0.0-20230719205936-8d2894699319/go.mod h1:O23qLAZuCx4htdY9zBaO4cJPXgleSFEdq6D/sezGgYE= github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY= github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -492,7 +493,7 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil/v3 v3.23.1 h1:a9KKO+kGLKEvcPIs4W62v0nu3sciVDOOOPUD0Hz7z/4= github.com/shirou/gopsutil/v3 v3.23.1/go.mod h1:NN6mnm5/0k8jw4cBfCnJtr5L7ErOTg18tMNpgFkn0hA= -github.com/shoenig/test v0.6.3 h1:GVXWJFk9PiOjN0KoJ7VrJGH6uLPnqxR7/fe3HUPfE0c= +github.com/shoenig/test v0.6.6 h1:Oe8TPH9wAbv++YPNDKJWUnI8Q4PPWCx3UbOfH+FxiMU= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= @@ -615,8 +616,8 @@ golang.org/x/exp 
v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a h1:tlXy25amD5A7gOfbXdqCGN5k8ESEed/Ee1E5RcrYnqU= -golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -642,7 +643,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/main.go b/main.go index 3dba21e..549237e 100644 --- a/main.go +++ b/main.go @@ -4,10 +4,10 @@ import ( "context" "fmt" "log" - "net" "os" "os/signal" + cilium_client "github.com/cilium/cilium/pkg/client" consul_api "github.com/hashicorp/consul/api" nomad_api "github.com/hashicorp/nomad/api" "github.com/urfave/cli/v2" @@ -20,9 +20,7 @@ import ( var Version = "unreleased" type config struct { - net string - policyKey string - excludeTags []string + policyKey string } func main() { @@ -35,8 +33,6 @@ func main() { } defer logger.Sync() - excludeTags := cli.StringSlice{} - app := &cli.App{ Name: "netreap", Usage: "A custom monitor and reaper for cleaning up Cilium endpoints and nodes", @@ -48,14 +44,6 @@ func main() { EnvVars: []string{"NETREAP_DEBUG"}, Destination: &debug, }, - &cli.StringFlag{ - Name: "cilium-cidr", - Aliases: []string{"c"}, - Usage: "The CIDR block that Cilium addresses belong to. 
This is used for checking if a service is a Cilium service or not", - EnvVars: []string{"NETREAP_CILIUM_CIDR"}, - Destination: &conf.net, - Required: true, - }, &cli.StringFlag{ Name: "policy-key", Aliases: []string{"k"}, @@ -64,13 +52,6 @@ func main() { EnvVars: []string{"NETREAP_POLICY_KEY"}, Destination: &conf.policyKey, }, - &cli.StringSliceFlag{ - Name: "exclude-tag", - Aliases: []string{"e"}, - Usage: "Consul service tags to skip when checking for Cilium-enabled jobs", - EnvVars: []string{"NETREAP_EXCLUDE_TAG"}, - Destination: &excludeTags, - }, }, Before: func(ctx *cli.Context) error { if debug { @@ -84,7 +65,6 @@ func main() { return nil }, Action: func(c *cli.Context) error { - conf.excludeTags = excludeTags.Value() return run(conf) }, Version: Version, @@ -96,17 +76,18 @@ func main() { } func run(conf config) error { - _, net, err := net.ParseCIDR(conf.net) - if err != nil { - return fmt.Errorf("unable to parse Cilium CIDR block: %s", err) - } - // Step 0: Construct clients consul_client, err := consul_api.NewClient(consul_api.DefaultConfig()) if err != nil { return fmt.Errorf("unable to connect to Consul: %s", err) } + // Looks for the default Cilium socket path or uses the value from CILIUM_SOCK + cilium_client, err := cilium_client.NewDefaultClient() + if err != nil { + return fmt.Errorf("error when connecting to cilium agent: %s", err) + } + // DefaultConfig fetches configuration data from well-known nomad variables (e.g. NOMAD_ADDR, // NOMAD_CACERT), so we'll just leverage that for now. nomad_client, err := nomad_api.NewClient(nomad_api.DefaultConfig()) @@ -133,12 +114,12 @@ func run(conf config) error { } zap.S().Debug("Starting endpoint reaper") - endpoint_reaper, err := reapers.NewEndpointReaper(ctx, nomad_client, consul_client, net, conf.excludeTags) + endpoint_reaper, err := reapers.NewEndpointReaper(cilium_client, nomad_client.Allocations(), nomad_client.EventStream()) if err != nil { return err } - endpointFailChan, err := endpoint_reaper.Run() + endpointFailChan, err := endpoint_reaper.Run(ctx) if err != nil { return fmt.Errorf("unable to start endpoint reaper: %s", err) } diff --git a/reapers/api.go b/reapers/api.go new file mode 100644 index 0000000..d0c82fe --- /dev/null +++ b/reapers/api.go @@ -0,0 +1,34 @@ +package reapers + +import ( + "context" + + "github.com/cilium/cilium/api/v1/models" + nomad_api "github.com/hashicorp/nomad/api" +) + +type AllocationInfo interface { + Info(allocID string, q *nomad_api.QueryOptions) (*nomad_api.Allocation, *nomad_api.QueryMeta, error) +} + +type EventStreamer interface { + Stream(ctx context.Context, topics map[nomad_api.Topic][]string, index uint64, q *nomad_api.QueryOptions) (<-chan *nomad_api.Events, error) +} + +type EndpointLister interface { + EndpointList() ([]*models.Endpoint, error) +} + +type EndpointGetter interface { + EndpointGet(id string) (*models.Endpoint, error) +} + +type EndpointPatcher interface { + EndpointPatch(id string, ep *models.EndpointChangeRequest) error +} + +type EndpointUpdater interface { + EndpointLister + EndpointGetter + EndpointPatcher +} diff --git a/reapers/endpoints.go b/reapers/endpoints.go index 8d7972a..f009e56 100644 --- a/reapers/endpoints.go +++ b/reapers/endpoints.go @@ -5,69 +5,67 @@ import ( "fmt" "math" "net" - "strconv" "strings" "time" "github.com/cilium/cilium/api/v1/models" - "github.com/cilium/cilium/pkg/client" - "github.com/google/uuid" - consul_api "github.com/hashicorp/consul/api" + endpoint_id 
"github.com/cilium/cilium/pkg/endpoint/id" nomad_api "github.com/hashicorp/nomad/api" "go.uber.org/zap" - "golang.org/x/exp/slices" ) const ( netreapLabelPrefix = "netreap" + nomadLabelPrefix = "nomad" jobIDLabel = "nomad.job_id" + taskGroupLabel = "nomad.task_group_id" namespaceLabel = "nomad.namespace" - nomadServicePrefix = "_nomad-task-" ) -// Max number of retries for fetching a newly created service -const maxRetries = 10 -const retryWait = time.Second * 2 - -// Magic number: I took a guess. DO NOT consider this number authoritative -const reapLimit uint = 20 - type EndpointReaper struct { - nomad *nomad_api.Client - consul *consul_api.Client - cilium *client.Client - ctx context.Context - cidr *net.IPNet - numToReap uint - excludeTags []string + cilium EndpointUpdater + nomadAllocations AllocationInfo + nomadEventStream EventStreamer } // NewEndpointReaper creates a new EndpointReaper. This will run an initial reconciliation before // returning the reaper -func NewEndpointReaper(ctx context.Context, nomad_client *nomad_api.Client, consul_client *consul_api.Client, cidr *net.IPNet, excludeTags []string) (*EndpointReaper, error) { - // TODO: Improve this once we figure out how we are setting up cilium everywhere - cilium, err := client.NewDefaultClient() - if err != nil { - return nil, fmt.Errorf("error when connecting to cilium agent: %s", err) +func NewEndpointReaper(ciliumClient EndpointUpdater, nomadAllocations AllocationInfo, nomadEventStream EventStreamer) (*EndpointReaper, error) { + reaper := EndpointReaper{ + cilium: ciliumClient, + nomadAllocations: nomadAllocations, + nomadEventStream: nomadEventStream, } - reaper := EndpointReaper{nomad: nomad_client, consul: consul_client, cidr: cidr, cilium: cilium, ctx: ctx, excludeTags: excludeTags} + // Do the initial reconciliation loop if err := reaper.reconcile(); err != nil { return nil, fmt.Errorf("unable to perform initial reconciliation: %s", err) } + return &reaper, nil } // Run the reaper until the context given in the contructor is cancelled. This function is non // blocking and will only return errors if something occurs during startup // return a channel to notify of consul client failures -func (e *EndpointReaper) Run() (<-chan bool, error) { +func (e *EndpointReaper) Run(ctx context.Context) (<-chan bool, error) { + // NOTE: Specifying uint max so that it starts from the next available index. 
If there is a // better way to start from latest index, we can change this - eventChan, err := e.nomad.EventStream().Stream(e.ctx, map[nomad_api.Topic][]string{nomad_api.TopicJob: {}}, math.MaxInt64, &nomad_api.QueryOptions{Namespace: "*"}) + eventChan, err := e.nomadEventStream.Stream( + ctx, + map[nomad_api.Topic][]string{ + nomad_api.TopicJob: {}, + }, + math.MaxInt64, + &nomad_api.QueryOptions{ + Namespace: "*", + }, + ) if err != nil { return nil, fmt.Errorf("error when starting node event stream: %s", err) } + failChan := make(chan bool, 1) go func() { @@ -76,51 +74,34 @@ func (e *EndpointReaper) Run() (<-chan bool, error) { for { select { - case <-e.ctx.Done(): - zap.S().Info("Context cancelled, shutting down endpoint reaper") + case <-ctx.Done(): + zap.L().Info("Context cancelled, shutting down endpoint reaper") return + case <-tick.C: - zap.S().Info("Reconciliation loop started") + zap.L().Info("Periodic reconciliation loop started") if err := e.reconcile(); err != nil { - zap.S().Errorw("Error occurred during reconcilation, will retry next loop", "error", err) + zap.L().Error("Error occurred during reconcilation, will retry next loop", zap.Error(err)) } + case events := <-eventChan: if events.Err != nil { - zap.S().Debugw("Got error message from node event channel", "error", events.Err) + zap.L().Debug("Got error message from node event channel", zap.Error(events.Err)) failChan <- true return } - zap.S().Debugf("Got %v job events. Handling...", len(events.Events)) + + zap.L().Debug("Got events from Allocation topic. Handling...", zap.Int("event-count", len(events.Events))) + for _, event := range events.Events { switch event.Type { - case "JobDeregistered": - if err := e.handleJobDelete(event); err != nil { - zap.S().Errorw("Error while handling job delete", "error", err) - continue - } - case "JobRegistered": - go e.handleJobCreate(event) + case "AllocationUpdated": + go e.handleAllocationUpdated(event) default: - zap.S().Debugf("Ignoring Job event with type of %s", event.Type) + zap.L().Debug("Ignoring unhandled event from Allocation topic", zap.String("event-type", event.Type)) continue } } - - // NOTE: Because the process for iterating over all jobs is so intensive (having to - // pull each job with its own http request), we just keep a counter of how many jobs - // have been shutdown. Once that counter is hit, we trigger a reconciliation - if e.numToReap >= reapLimit { - zap.S().Debug("Reached reap limit, triggering reconciliation") - // TODO(thomastaylor312): There is a race condition here where that last job - // that was deleted will not have cleaned up its service from consul yet, so it - // will appear to still be around. For expediency, I'm just leaving it for now - // as it will be caught on the next reconciliation. 
But we might want to find a - // way to fix it later if we really care - if err := e.reconcile(); err != nil { - zap.S().Errorw("Error when performing reconciliation, will retry", "error", err) - } - } - } } }() @@ -128,307 +109,171 @@ func (e *EndpointReaper) Run() (<-chan bool, error) { return failChan, nil } -type serviceData struct { - Name string -} - -type jobData struct { - ID string - Namespace string -} - -func (e *EndpointReaper) includeService(tags []string) bool { - includeService := true - for _, tag := range tags { - for _, t := range e.excludeTags { - if tag == t { - includeService = false - break - } - } - } - return includeService -} - func (e *EndpointReaper) reconcile() error { - // Get current endpoints list - zap.S().Debug("Starting reconciliation") - catalogsClient := e.consul.Catalog() - servicesList, _, err := catalogsClient.Services(nil) - if err != nil { - return fmt.Errorf("unable to list current consul services") - } - - servicesToQuery := []serviceData{} - for k, v := range servicesList { - if e.includeService(v) { - servicesToQuery = append(servicesToQuery, serviceData{Name: k}) - } - } - - zap.S().Debug("Finished fetching service list, constructing set of IP addresses from services", "service_list", servicesToQuery) - // Create a list of all known IP addresses to job names - ipMap := map[string]jobData{} - for _, serviceInfo := range servicesToQuery { - // Fetch the full job object and skip any one that isn't a cilium one. Yeah, this is gross, - // but the list endpoint doesn't actually list the full object - // NOTE: the empty string is an empty "tag" that selects everything - services, _, err := catalogsClient.Service(serviceInfo.Name, "", nil) - if err != nil { - return fmt.Errorf("unable to fetch additional job data for %s: %s", serviceInfo.Name, err) - } - - for _, service := range services { - addr := net.ParseIP(service.ServiceAddress) - if e.cidr.Contains(addr) { - id := service.ServiceID - if strings.Contains(id, nomadServicePrefix) { - svc := strings.Split(id, nomadServicePrefix) - split := strings.SplitN(svc[1], "-", 6) - fmt.Println(split) - allocID := strings.Join(split[0:5], "-") - - if _, err := uuid.Parse(allocID); err != nil { - return fmt.Errorf("unable to parse alloc id %s: %s", allocID, err) - } - ipMap[service.ServiceAddress] = jobData{ID: allocID} - } - } else { - // Skip if this isn't a cilium service - continue - } - } - } + zap.L().Debug("Starting reconciliation") - zap.S().Debugw("Finished generating current IP list. Fetching endpoints from cilium", "ip_list", ipMap) + // Get current endpoints list endpoints, err := e.cilium.EndpointList() if err != nil { return fmt.Errorf("unable to list current cilium endpoints: %s", err) } - var deleteErrors uint = 0 + zap.L().Debug("checking each endpoint", zap.Int("endpoints-total", len(endpoints))) - zap.S().Debug("Checking all endpoints") - // Loop through all endpoints and check if they are now orphaned. Clean them up if they are. - // Then, make sure existing endpoints are labeled for _, endpoint := range endpoints { - // NOTE(thomastaylor312): As far as I can tell, our services are all classified as an "init" - // label. So we should only look at those ones. 
Other ones are not registered in consul as - // services necessarily - if !slices.Contains(endpoint.Status.Labels.SecurityRelevant, "reserved:init") { - zap.S().Debugw("Endpoint is not an init service, skipping", "labels", endpoint.Status.Labels.SecurityRelevant) + endpointID := endpoint_id.NewCiliumID(endpoint.ID) + containerID := endpoint.Status.ExternalIdentifiers.ContainerID + + // Only managing endpoints with container IDs + if containerID == "" { + zap.L().Debug("Skipping endpoint that is not associated with a container", + zap.String("endpoint-id", endpointID), + ) continue } - zap.S().Debugw("Checking if endpoint still exists", "endpoint_id", endpoint.ID) - - // NOTE(thomastaylor312): In the future if we get more complicated with multiple addresses, - // this logic will need to be reworked to check if any of them match - for _, ip := range endpoint.Status.Networking.Addressing { - zap.S().Debugw("Got ip", "ip", ip) - data := jobData{} - datav4, existsv4 := ipMap[ip.IPV4] - datav6, existsv6 := ipMap[ip.IPV6] - if !existsv4 && !existsv6 { - zap.S().Debugw("Endpoint no longer exists, deleting", "endpoint_id", endpoint.ID) - if err := e.cilium.EndpointDelete(strconv.FormatInt(endpoint.ID, 10)); err != nil { - deleteErrors++ - zap.S().Errorw("Error when cleaning up IP address. Will retry on next reconciliation", "error", err) - } - // If we are deleting, no further logic is needed, so break out of this loop - break - } - // I believe these should be the same job id as they come from the same endpoint. - if len(datav4.ID) > 0 { - data = datav4 - } else if len(datav6.ID) > 0 { - data = datav6 - } - - // Check if the endpoints have labels if it wasn't a delete. Then label if needed - if !hasLabels(endpoint) && len(data.ID) > 0 { - zap.S().Debugw("Found an endpoint missing labels. Updating with current job labels", "endpoint_id", endpoint.ID) - // First get the alloc - alloc, _, err := e.nomad.Allocations().Info(data.ID, &nomad_api.QueryOptions{Namespace: "*"}) - if err != nil { - zap.S().Warnw("couldn't fetch alloc from nomad", "alloc_id", data.ID, "error", err) - return err - } + // Nomad calls the CNI plugin with the allocation ID as the container ID + allocation, _, err := e.nomadAllocations.Info(containerID, &nomad_api.QueryOptions{Namespace: "*"}) + if err != nil { + zap.L().Warn("Couldn't fetch allocation from Nomad", + zap.String("container-id", containerID), + zap.String("endpoint-id", endpointID), + zap.Error(err), + ) + continue + } - if err := e.labelEndpoint(endpoint.ID, alloc.JobID, alloc.Namespace); err != nil { - return fmt.Errorf("unable to label job with appropriate metadata: %s", err) - } - } + if allocation != nil { + zap.L().Debug("Patching labels on endpoint", + zap.String("container-id", containerID), + zap.String("endpoint-id", endpointID), + zap.Error(err), + ) + + labels := e.createLabelsForAllocation(allocation) + + e.labelEndpoint(endpointID, allocation.ID, allocation.Name, labels) + } else { + zap.L().Debug("Skipping endpoint as allocation not in Nomad", + zap.String("container-id", containerID), + zap.String("endpoint-id", endpointID), + zap.Error(err), + ) } } - zap.S().Debugw("Finished reconciliation", "num_errors", deleteErrors) - e.numToReap = deleteErrors + zap.L().Debug("Finished reconciliation") return nil } -func (e *EndpointReaper) handleJobDelete(event nomad_api.Event) error { - // Filter out only when we get jobs we care about. 
Each deleted job triggers an event with an - // evaluation and one with the job spec and `getJobData` handles that for us - jobID, _, ok := getJobData(event) - if !ok { - return nil +func (e *EndpointReaper) handleAllocationUpdated(event nomad_api.Event) { + allocation, err := event.Allocation() + if err != nil { + zap.L().Debug("Unable to deserialize Allocation", + zap.String("event-type", event.Type), + zap.Uint64("event-index", event.Index), + ) + return } - // NOTE: We are just incrementing whenever we see a job delete. There is no way for us to go - // fetch the associated service from consul once the job is deleted, so we can't check if it was - // a cilium job - e.numToReap++ - zap.S().Debugw("Got deleted job. Increasing reaper counter", "count", e.numToReap, "job_id", jobID) - return nil -} -func (e *EndpointReaper) handleJobCreate(event nomad_api.Event) { - jobID, namespace, ok := getJobData(event) - if !ok { - // We already logged, so just noop out + if allocation == nil { + zap.L().Debug("Allocation was empty", + zap.String("event-type", event.Type), + zap.Uint64("event-index", event.Index), + ) return } - job, _, err := e.nomad.Jobs().Info(jobID, &nomad_api.QueryOptions{Namespace: namespace}) - if err != nil { - zap.S().Warnw("Unable to fetch job info", "error", err, "job_id", jobID) + if allocation.NetworkStatus == nil || allocation.NetworkStatus.Address == "" { + zap.L().Debug("Allocation has no IP address, ignoring", + zap.String("event-type", event.Type), + zap.Uint64("event-index", event.Index), + zap.String("container-id", allocation.ID), + ) + return } - var serviceNames []string - for _, taskGroup := range job.TaskGroups { - for _, service := range taskGroup.Services { - // TODO(protochron): Add support for Nomad services - if service.Provider == "consul" { - if !e.includeService(service.Tags) { - zap.S().Debugw("Skipping service because it has an excluded tag", "service_name", service.Name) - continue - } - serviceNames = append(serviceNames, service.Name) - } + allocationIP := net.ParseIP(allocation.NetworkStatus.Address) + endpointID := endpoint_id.NewIPPrefixID(allocationIP) + + endpoint, err := e.cilium.EndpointGet(endpointID) + if err != nil { + fields := []zap.Field{zap.String("event-type", event.Type), + zap.Uint64("event-index", event.Index), + zap.String("container-id", allocation.ID), + zap.String("endpoint-id", endpointID), + zap.Error(err), } - for _, task := range taskGroup.Tasks { - for _, service := range task.Services { - // TODO(protochron): Add support for Nomad services - if service.Provider == "consul" { - if !e.includeService(service.Tags) { - zap.S().Debugw("Skipping service because it has an excluded tag", "service_name", service.Name) - continue - } - serviceNames = append(serviceNames, service.Name) - } - } + if strings.Contains(err.Error(), "getEndpointIdNotFound") { + // This is fine, the endpoint probably just isn't on this host + zap.L().Debug("Endpoint not found", fields...) + } else { + zap.L().Warn("Unable to get endpoint", fields...) 
} + return } - for _, serviceName := range serviceNames { - var services []*consul_api.CatalogService - for i := 0; i < maxRetries; i++ { - zap.S().Debugw("Fetching services from consul for job", "job_id", jobID, "retry_num", i+1) - services, _, err = e.consul.Catalog().Service(serviceName, "", nil) - if err != nil { - zap.S().Errorw("Unable to fetch current list of services", "error", err) - return - } - if len(services) == 0 || services == nil { - zap.S().Debugw("Did not find a ready service in consul", "job_id", jobID, "retry_num", i+1) - time.Sleep(retryWait) - } else { - break - } - } - if len(services) == 0 || services == nil { - zap.S().Errorw("couldn't find any services associated with the job after multiple retries. Aborting labeling attempt", "job_id", jobID) + if allocation.Job == nil { + // Fetch the full allocation since the event didn't have the Job with the metadata + allocation, _, err = e.nomadAllocations.Info(allocation.ID, &nomad_api.QueryOptions{Namespace: allocation.Namespace}) + if err != nil { + zap.L().Warn("Couldn't fetch allocation from Nomad", + zap.String("event-type", event.Type), + zap.Uint64("event-index", event.Index), + zap.String("container-id", allocation.ID), + zap.String("endpoint-id", endpointID), + zap.Error(err), + ) return } - zap.S().Debug("Found services for new job", "job_id", jobID) - for _, service := range services { - stringAddr := service.ServiceAddress - addr := net.ParseIP(stringAddr) - if !e.cidr.Contains(addr) { - // Skip if this isn't a cilium service - zap.S().Debugw("New job is not a cilium service. Skipping further steps", "job_id", jobID) - return - } + } - // TODO(thomastaylor312): We might want to see if there is a more efficient way than querying - // all endpoints and looping - zap.S().Debugw("Finding related cilium endpoint for job", "job_id", jobID) - endpoints, err := e.cilium.EndpointList() - if err != nil { - zap.S().Errorw("Unable to fetch current list of cilium endpoints", "error", err) - return - } + labels := e.createLabelsForAllocation(allocation) - foundEndpoint := false - for _, endpoint := range endpoints { - for _, ip := range endpoint.Status.Networking.Addressing { - if stringAddr == ip.IPV4 || stringAddr == ip.IPV6 { - foundEndpoint = true - // If it hasn't already been labeled, label it - if !hasLabels(endpoint) { - if err := e.labelEndpoint(endpoint.ID, jobID, namespace); err != nil { - zap.S().Errorw("Error when labeling endpoint. Will retry on next reconcile", "job_id", jobID, "error", err) - } - return - } - } - } - } - if !foundEndpoint { - zap.S().Debugw("Got a cilium job, but was unable to find an endpoint on this node", "job_id", jobID) - } - } - } + e.labelEndpoint(endpoint_id.NewCiliumID(endpoint.ID), allocation.ID, allocation.Name, labels) } -func (e *EndpointReaper) labelEndpoint(endpointID int64, jobID string, namespace string) error { - // Fetch the current job - job, _, err := e.nomad.Jobs().Info(jobID, &nomad_api.QueryOptions{Namespace: namespace}) - if err != nil { - // If for some reason the job doesn't exist then we don't need to label or fail. 
But we - // should warn in the logs - zap.S().Warnw("couldn't fetch job from nomad", "job_id", jobID, "error", err) - return err - } +func (e *EndpointReaper) createLabelsForAllocation(allocation *nomad_api.Allocation) models.Labels { + labels := models.Labels{fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, jobIDLabel, allocation.JobID)} + labels = append(labels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, namespaceLabel, allocation.Namespace)) + labels = append(labels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, taskGroupLabel, allocation.TaskGroup)) - // Always add the job id for reference - labels := models.Labels{fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, jobIDLabel, jobID)} - labels = append(labels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, namespaceLabel, namespace)) - if job.Meta != nil { - for k, v := range job.Meta { - labels = append(labels, fmt.Sprintf("nomad:%s=%s", k, v)) - } + // Combine the metadata from the job and the task group with the task group taking precedence + metadata := make(map[string]string) + for k, v := range allocation.Job.Meta { + metadata[k] = v } - return e.cilium.EndpointLabelsPatch(strconv.FormatInt(endpointID, 10), labels, nil) -} -// Returns the job ID and namespace from an event as well as bool indicating whether or not we were -// able to parse the data -func getJobData(event nomad_api.Event) (string, string, bool) { - job, err := event.Job() - if err != nil { - zap.S().Debugw("Unable to deserialize Job", "event_type", event.Type) - return "", "", false + for _, taskGroup := range allocation.Job.TaskGroups { + if *taskGroup.Name == allocation.TaskGroup { + for k, v := range taskGroup.Meta { + metadata[k] = v + } + } } - if job == nil { - zap.S().Debugw("Job was empty", "event_type", event.Type) - return "", "", false + for k, v := range metadata { + labels = append(labels, fmt.Sprintf("%s:%s=%s", nomadLabelPrefix, k, v)) } - return *job.ID, *job.Namespace, true + return labels } -func hasLabels(endpoint *models.Endpoint) bool { - if endpoint == nil { - return false +func (e *EndpointReaper) labelEndpoint(endpointID string, containerID string, containerName string, labels models.Labels) { + ecr := &models.EndpointChangeRequest{ + ContainerID: containerID, + ContainerName: containerName, + Labels: labels, + State: models.EndpointStateWaitingDashForDashIdentity.Pointer(), } - // If there are any netreaper labels, assume we are ok - for _, label := range endpoint.Status.Labels.Realized.User { - if strings.HasPrefix(label, netreapLabelPrefix) { - return true - } + err := e.cilium.EndpointPatch(endpointID, ecr) + if err != nil { + zap.L().Error("Error while patching the endpoint labels of container", + zap.String("container-id", containerID), + zap.String("endpoint-id", endpointID), + zap.Strings("labels", labels), + zap.Error(err), + ) } - return false } diff --git a/reapers/endpoints_test.go b/reapers/endpoints_test.go new file mode 100644 index 0000000..d7b3e85 --- /dev/null +++ b/reapers/endpoints_test.go @@ -0,0 +1,158 @@ +package reapers + +import ( + "reflect" + "testing" + + "github.com/cilium/cilium/api/v1/models" + endpoint_id "github.com/cilium/cilium/pkg/endpoint/id" + nomad_api "github.com/hashicorp/nomad/api" +) + +type allocationInfoMock struct { + infoFn func(allocID string, q *nomad_api.QueryOptions) (*nomad_api.Allocation, *nomad_api.QueryMeta, error) +} + +func (p *allocationInfoMock) Info(allocID string, q *nomad_api.QueryOptions) (*nomad_api.Allocation, *nomad_api.QueryMeta, error) { + if p != nil && p.infoFn != 
nil { + return p.infoFn(allocID, q) + } + return nil, nil, nil +} + +type endpointUpdaterMock struct { + endpointListFn func() ([]*models.Endpoint, error) + endpointGetFn func(id string) (*models.Endpoint, error) + endpointPatchFn func(id string, ep *models.EndpointChangeRequest) error +} + +func (p *endpointUpdaterMock) EndpointList() ([]*models.Endpoint, error) { + if p != nil && p.endpointListFn != nil { + return p.endpointListFn() + } + return nil, nil +} + +func (p *endpointUpdaterMock) EndpointGet(id string) (*models.Endpoint, error) { + if p != nil && p.endpointGetFn != nil { + return p.endpointGetFn(id) + } + return nil, nil +} + +func (p *endpointUpdaterMock) EndpointPatch(id string, ep *models.EndpointChangeRequest) error { + if p != nil && p.endpointPatchFn != nil { + return p.endpointPatchFn(id, ep) + } + return nil +} + +func TestEndpointReconcile(t *testing.T) { + endpointOne := &models.Endpoint{ + ID: 1, + Status: &models.EndpointStatus{ + ExternalIdentifiers: &models.EndpointIdentifiers{ + ContainerID: "containerID", + }, + Labels: &models.LabelConfigurationStatus{ + SecurityRelevant: models.Labels{"reserved:init"}, + }, + }, + } + allocationOne := &nomad_api.Allocation{ + ID: "containerID", + JobID: "jobID", + Namespace: "namespace", + TaskGroup: "taskGroup", + Job: &nomad_api.Job{ + Meta: map[string]string{}, + }, + } + endpointOneLabels := models.Labels{ + "netreap:nomad.job_id=jobID", + "netreap:nomad.namespace=namespace", + "netreap:nomad.task_group_id=taskGroup", + } + + tests := []struct { + name string + cilium *endpointUpdaterMock + nomadAllocations *allocationInfoMock + shouldErr bool + }{ + { + "No Endpoints", + &endpointUpdaterMock{ + endpointListFn: func() ([]*models.Endpoint, error) { + return []*models.Endpoint{}, nil + }, + endpointPatchFn: func(id string, ep *models.EndpointChangeRequest) error { + t.Fatalf("unexpected call to patch endpoint") + return nil + }, + }, + &allocationInfoMock{ + infoFn: func(allocID string, q *nomad_api.QueryOptions) (*nomad_api.Allocation, *nomad_api.QueryMeta, error) { + t.Fatalf("unexpected call to allocation info") + return nil, nil, nil + }, + }, + false, + }, + { + "One endpoint", + &endpointUpdaterMock{ + endpointListFn: func() ([]*models.Endpoint, error) { + return []*models.Endpoint{endpointOne}, nil + }, + endpointPatchFn: func(id string, ep *models.EndpointChangeRequest) error { + expectedID := endpoint_id.NewCiliumID(endpointOne.ID) + expectedContainerID := endpointOne.Status.ExternalIdentifiers.ContainerID + + if id != expectedID { + t.Errorf("wrong endpoint ID passed, expected %v, got %v", expectedID, id) + } + + if ep.ContainerID != expectedContainerID { + t.Errorf("wrong container ID passed, expected %v, got %v", expectedContainerID, ep.ContainerID) + } + + if !reflect.DeepEqual(ep.Labels, endpointOneLabels) { + t.Errorf("wrong labels, expected %v, got %v", endpointOneLabels, ep.Labels) + } + + return nil + }, + }, + &allocationInfoMock{ + infoFn: func(allocID string, q *nomad_api.QueryOptions) (*nomad_api.Allocation, *nomad_api.QueryMeta, error) { + expectedContainerID := endpointOne.Status.ExternalIdentifiers.ContainerID + if allocID != expectedContainerID { + t.Errorf("wrong container ID passed, expected %v, got %v", expectedContainerID, allocID) + } + return allocationOne, nil, nil + }, + }, + false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + reaper, err := NewEndpointReaper(tt.cilium, tt.nomadAllocations, nil) + if err != nil { + t.Fatalf("unexpected error 
creating reaper %v", err) + } + + err = reaper.reconcile() + + if tt.shouldErr && err == nil { + t.Error("expected error but got nil") + } + if !tt.shouldErr && err != nil { + t.Errorf("unexpected error %v", err) + } + }) + } +}
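For readers wiring this up outside of the main.go shown above, here is a minimal sketch of how the refactored constructor is used. It mirrors the main.go changes in this patch; the netreap module import path and the shutdown behaviour are assumptions made for illustration.

```go
package main

import (
	"context"
	"log"

	cilium_client "github.com/cilium/cilium/pkg/client"
	nomad_api "github.com/hashicorp/nomad/api"

	// The netreap module path is assumed here for illustration only.
	"github.com/cosmonic/netreap/reapers"
)

func main() {
	ctx := context.Background()

	// Connects to the local Cilium agent socket (CILIUM_SOCK or the default path).
	cilium, err := cilium_client.NewDefaultClient()
	if err != nil {
		log.Fatalf("error when connecting to cilium agent: %s", err)
	}

	// DefaultConfig reads NOMAD_ADDR, NOMAD_CACERT, and friends.
	nomad, err := nomad_api.NewClient(nomad_api.DefaultConfig())
	if err != nil {
		log.Fatalf("unable to connect to Nomad: %s", err)
	}

	// The reaper only depends on the narrow AllocationInfo, EventStreamer and
	// EndpointUpdater interfaces from reapers/api.go, which is what the
	// table-driven tests above mock out.
	reaper, err := reapers.NewEndpointReaper(cilium, nomad.Allocations(), nomad.EventStream())
	if err != nil {
		log.Fatalf("unable to create endpoint reaper: %s", err)
	}

	// Run is non-blocking; the returned channel signals that the Nomad event stream failed.
	failChan, err := reaper.Run(ctx)
	if err != nil {
		log.Fatalf("unable to start endpoint reaper: %s", err)
	}

	<-failChan
	log.Fatal("nomad event stream failed, shutting down")
}
```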