diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..b6fe515
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,20 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+### Added
+* Ability to update service endpoints based on the current leader
+
+### Changed
+* Health endpoint now reports an error on an expired lease
+
+## [v0.1.0] - 2020-07-29
+
+### Added
+* Running in elected mode in k8s cluster
+* Running without election outside of k8s cluster
+* Basic error handling
diff --git a/README.md b/README.md
index 456efda..6949ca1 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,16 @@ Ensure that only a single instance of a process is running in your Kubernetes cl
 capabilities to coordinate commands running in different pods. It acts as a gatekeeper, only
 starting the command when the pod becomes a leader.
 
+### Why leader election?
+Some applications aren't natively able to run in a replicated way.
+For example, they maintain some internal state which is only synchronized at startup.
+
+Running such an application as a single replica is not ideal. Should the node the pod
+is running on go offline unexpectedly, Kubernetes will take a while to reschedule the pod
+(5+ minutes by default).
+
+Leader election, by contrast, promotes a new leader in about 10-15 seconds in such an event.
+
 ## Usage
 Just set `k8s-await-election` as entry point into your image. Configuration of the leader
 election happens via environment variables. If no environment variables were passed, `k8s-await-election`
@@ -15,14 +25,20 @@ will just start the command without waiting to become elected.
 
 The relevant environment variables are
 
-| Variable                             | Description                                                      |
-|--------------------------------------|------------------------------------------------------------------|
-| `K8S_AWAIT_ELECTION_ENABLED`         | Set to any non-empty value to enable leader election             |
-| `K8S_AWAIT_ELECTION_NAME`            | Name of the election processes. Useful for debugging             |
-| `K8S_AWAIT_ELECTION_LOCK_NAME`       | Name of the `leases.coordination.k8s.io` resource                |
-| `K8S_AWAIT_ELECTION_LOCK_NAMESPACE`  | Namespace of the `leases.coordination.k8s.io` resource           |
-| `K8S_AWAIT_ELECTION_IDENTITY`        | Unique identity for each member of the election process          |
-| `K8S_AWAIT_ELECTION_STATUS_ENDPOINT` | Optional: endpoint to report if the election process is running  |
+| Variable                                 | Description                                                        |
+|------------------------------------------|--------------------------------------------------------------------|
+| `K8S_AWAIT_ELECTION_ENABLED`             | Set to any non-empty value to enable leader election               |
+| `K8S_AWAIT_ELECTION_NAME`                | Name of the election process. Useful for debugging                 |
+| `K8S_AWAIT_ELECTION_LOCK_NAME`           | Name of the `leases.coordination.k8s.io` resource                  |
+| `K8S_AWAIT_ELECTION_LOCK_NAMESPACE`      | Namespace of the `leases.coordination.k8s.io` resource             |
+| `K8S_AWAIT_ELECTION_IDENTITY`            | Unique identity for each member of the election process            |
+| `K8S_AWAIT_ELECTION_STATUS_ENDPOINT`     | Optional: endpoint to report if the election process is running    |
+| `K8S_AWAIT_ELECTION_SERVICE_NAME`        | Optional: name of the service to update. [On Service Updates]      |
+| `K8S_AWAIT_ELECTION_SERVICE_NAMESPACE`   | Optional: namespace of the service to update                       |
+| `K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON`  | Optional: JSON array of endpoint ports to set on the service       |
+| `K8S_AWAIT_ELECTION_POD_IP`              | Optional: IP of the pod, which will be used to update the service  |
+
+[On Service Updates]: #service-updates
 
 Most of the time you will want to use this process in a Deployment spec or similar context.
 Here is an example:
@@ -63,3 +79,31 @@ spec:
         - name: K8S_AWAIT_ELECTION_STATUS_ENDPOINT
           value: :9999
 ```
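+
+For example, you can point a liveness probe at this status endpoint so that the kubelet
+restarts a leader whose lease has expired. A minimal sketch, assuming the `:9999` endpoint
+from the example above:
+
+```yaml
+livenessProbe:
+  httpGet:
+    path: /
+    port: 9999
+  periodSeconds: 10
+```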
+
+### Service Updates
+
+`k8s-await-election` can also be used to select which pod should receive traffic for a service.
+This is done by updating the `Endpoints` resource associated with the service whenever a new leader is elected.
+This leader will be the only pod receiving traffic via the service.
+To enable this feature, set the `K8S_AWAIT_ELECTION_SERVICE_*` variables and `K8S_AWAIT_ELECTION_POD_IP`.
+See [the full example](./examples/singleton-service.yml).
+
+#### Why update service endpoints?
+
+For deployments that provide some kind of external API (for example a REST API), we would
+also like to automatically re-route traffic to the current leader.
+
+This is normally done via the readiness state of the pod: only ready pods associated with
+a service receive traffic. Because we only start the application once we are elected, only
+one pod is ever "ready" in the sense that it should receive traffic.
+
+This means that if we want to use the automatic service configuration via selectors, we
+run into an issue: the "ready" state of a pod has a dual use. Consider a rolling upgrade of a deployment:
+
+1. A new pod starts.
+2. It won't become leader, as the old one is still running.
+3. Since the app never starts, the new pod never becomes "ready" to receive traffic.
+4. The Deployment controller sees that the pod is not ready and does not continue with the upgrade.
+
+`k8s-await-election` has all the information it needs to tell Kubernetes which pod should receive
+traffic. This works around the above issue, at the cost of an unusable readiness probe.
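+
+For illustration, with the service from [the full example](./examples/singleton-service.yml),
+the `Endpoints` object managed by `k8s-await-election` would look roughly like this (the
+address is the `status.podIP` of the current leader):
+
+```yaml
+apiVersion: v1
+kind: Endpoints
+metadata:
+  name: my-service
+  namespace: default
+subsets:
+  - addresses:
+      - ip: 10.0.0.12 # example value: the leader pod's status.podIP
+    ports:
+      - name: my-port
+        port: 1024
+```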
diff --git a/examples/singleton-service.yml b/examples/singleton-service.yml
new file mode 100644
index 0000000..29e449d
--- /dev/null
+++ b/examples/singleton-service.yml
@@ -0,0 +1,61 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: my-service
+spec:
+  clusterIP: ""
+  ports:
+    - name: my-port
+      port: 1024
+      protocol: TCP
+  # NOTE: No selector here! A selector would automatically add all matching and ready pods to the endpoint
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: my-server-with-replicas
+spec:
+  replicas: 5
+  selector:
+    matchLabels:
+      app: my-server
+  template:
+    metadata:
+      labels:
+        app: my-server
+    spec:
+      containers:
+        - name: my-server
+          image: my-server
+          args:
+            - my-singleton-server
+          ports:
+            - containerPort: 1024
+              name: my-port
+          env:
+            - name: K8S_AWAIT_ELECTION_ENABLED
+              value: "1"
+            - name: K8S_AWAIT_ELECTION_NAME
+              value: my-server
+            - name: K8S_AWAIT_ELECTION_LOCK_NAME
+              value: my-server
+            - name: K8S_AWAIT_ELECTION_LOCK_NAMESPACE
+              value: default
+            - name: K8S_AWAIT_ELECTION_IDENTITY
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+            - name: K8S_AWAIT_ELECTION_STATUS_ENDPOINT
+              value: :9999
+            - name: K8S_AWAIT_ELECTION_SERVICE_NAME
+              value: my-service
+            - name: K8S_AWAIT_ELECTION_SERVICE_NAMESPACE
+              value: default
+            - name: K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON
+              value: '[{"name":"my-port","port":1024}]'
+            - name: K8S_AWAIT_ELECTION_POD_IP
+              valueFrom:
+                fieldRef:
+                  fieldPath: status.podIP
+---
diff --git a/main.go b/main.go
index 2fd7588..1dce2dc 100644
--- a/main.go
+++ b/main.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net"
 	"net/http"
@@ -9,6 +10,9 @@ import (
 	"os/exec"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+
 	"github.com/linbit/k8s-await-election/pkg/consts"
 
 	"github.com/sirupsen/logrus"
@@ -24,13 +28,17 @@ var Version = "development"
 var log = logrus.New()
 
 type AwaitElection struct {
-	WithElection   bool
-	Name           string
-	LockName       string
-	LockNamespace  string
-	LeaderIdentity string
-	StatusEndpoint string
-	LeaderExec     func(ctx context.Context) error
+	WithElection     bool
+	Name             string
+	LockName         string
+	LockNamespace    string
+	LeaderIdentity   string
+	StatusEndpoint   string
+	ServiceName      string
+	ServiceNamespace string
+	PodIP            string
+	ServicePorts     []corev1.EndpointPort
+	LeaderExec       func(ctx context.Context) error
 }
 
 type ConfigError struct {
@@ -71,15 +79,29 @@ func NewAwaitElectionConfig(exec func(ctx context.Context) error) (*AwaitElectio
 
 	// Optional
 	statusEndpoint := os.Getenv(consts.AwaitElectionStatusEndpointKey)
+	podIP := os.Getenv(consts.AwaitElectionPodIP)
+	serviceName := os.Getenv(consts.AwaitElectionServiceName)
+	serviceNamespace := os.Getenv(consts.AwaitElectionServiceNamespace)
+
+	servicePortsJson := os.Getenv(consts.AwaitElectionServicePortsJson)
+	var servicePorts []corev1.EndpointPort
+	err := json.Unmarshal([]byte(servicePortsJson), &servicePorts)
+	if serviceName != "" && err != nil {
+		return nil, fmt.Errorf("failed to parse ports from env: %w", err)
+	}
 
 	return &AwaitElection{
-		WithElection:   true,
-		Name:           name,
-		LockName:       lockName,
-		LockNamespace:  lockNamespace,
-		LeaderIdentity: leaderIdentity,
-		StatusEndpoint: statusEndpoint,
-		LeaderExec:     exec,
+		WithElection:     true,
+		Name:             name,
+		LockName:         lockName,
+		LockNamespace:    lockNamespace,
+		LeaderIdentity:   leaderIdentity,
+		StatusEndpoint:   statusEndpoint,
+		PodIP:            podIP,
+		ServiceName:      serviceName,
+		ServiceNamespace: serviceNamespace,
+		ServicePorts:     servicePorts,
+		LeaderExec:       exec,
 	}, nil
 }
 
@@ -129,6 +151,12 @@ func (el *AwaitElection) Run() error {
 		RetryPeriod:   2 * time.Second,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: func(ctx context.Context) {
+				// First we need to register our pod as the service endpoint
+				err := el.setServiceEndpoint(ctx, kubeClient)
+				if err != nil {
+					execResult <- err
+					return
+				}
 				// actual start the command here.
 				// Note: this callback is started in a goroutine, so we can block this
 				// execution path for as long as we want.
@@ -147,7 +175,7 @@ func (el *AwaitElection) Run() error {
 		return fmt.Errorf("failed to create leader elector: %w", err)
 	}
 
-	statusServerResult := el.startStatusEndpoint(ctx)
+	statusServerResult := el.startStatusEndpoint(ctx, elector)
 
 	go elector.Run(ctx)
 
@@ -165,7 +193,37 @@
 	}
 }
 
-func (el *AwaitElection) startStatusEndpoint(ctx context.Context) <-chan error {
+func (el *AwaitElection) setServiceEndpoint(ctx context.Context, client *kubernetes.Clientset) error {
+	if el.ServiceName == "" {
+		return nil
+	}
+
+	endpoints := &corev1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      el.ServiceName,
+			Namespace: el.ServiceNamespace,
+		},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Addresses: []corev1.EndpointAddress{{IP: el.PodIP}},
+				Ports:     el.ServicePorts,
+			},
+		},
+	}
+	_, err := client.CoreV1().Endpoints(el.ServiceNamespace).Create(ctx, endpoints, metav1.CreateOptions{})
+	if err != nil {
+		if !errors.IsAlreadyExists(err) {
+			return err
+		}
+
+		_, err := client.CoreV1().Endpoints(el.ServiceNamespace).Update(ctx, endpoints, metav1.UpdateOptions{})
+		return err
+	}
+
+	return nil
+}
+
+func (el *AwaitElection) startStatusEndpoint(ctx context.Context, elector *leaderelection.LeaderElector) <-chan error {
 	statusServerResult := make(chan error)
 
 	if el.StatusEndpoint == "" {
@@ -175,11 +233,23 @@
 
 	serveMux := http.NewServeMux()
 	serveMux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
-		_, err := writer.Write([]byte("running"))
+		err := elector.Check(2 * time.Second)
 		if err != nil {
-			log.WithField("err", err).Error("failed to serve status endpoint")
+			log.WithField("err", err).Error("failed to step down gracefully, reporting unhealthy status")
+			writer.WriteHeader(500)
+			_, err := writer.Write([]byte("{\"status\": \"expired\"}"))
+			if err != nil {
+				log.WithField("err", err).Error("failed to serve request")
+			}
+			return
+		}
+
+		_, err = writer.Write([]byte("{\"status\": \"ok\"}"))
+		if err != nil {
+			log.WithField("err", err).Error("failed to serve request")
 		}
 	})
+
 	statusServer := http.Server{
 		Addr:    el.StatusEndpoint,
 		Handler: serveMux,
diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go
index 1a05cc0..45724e0 100644
--- a/pkg/consts/consts.go
+++ b/pkg/consts/consts.go
@@ -7,4 +7,8 @@ const (
 	AwaitElectionLockNamespaceKey  = "K8S_AWAIT_ELECTION_LOCK_NAMESPACE"
 	AwaitElectionIdentityKey       = "K8S_AWAIT_ELECTION_IDENTITY"
 	AwaitElectionStatusEndpointKey = "K8S_AWAIT_ELECTION_STATUS_ENDPOINT"
+	AwaitElectionPodIP             = "K8S_AWAIT_ELECTION_POD_IP"
+	AwaitElectionServiceName       = "K8S_AWAIT_ELECTION_SERVICE_NAME"
+	AwaitElectionServiceNamespace  = "K8S_AWAIT_ELECTION_SERVICE_NAMESPACE"
+	AwaitElectionServicePortsJson  = "K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON"
 )