add election controlled service endpoints
Manually setting the endpoints for a service ensures that only the
elected pod receives traffic. This works around issues where controlling
traffic via pod readiness works suboptimally (e.g. during rolling deployments).

In a rolling deployment, pod readiness gates the further roll-out of new
versions. Using readiness to control the service proxy would therefore lead to
a deadlock: the new pods never become "ready" because an old pod still holds
the lease, but the old pod is never terminated, because the rolling update
only continues once the new pods are ready.
WanzenBug authored and JoelColledge committed Aug 20, 2020
1 parent 07580ae commit ddeca69
Showing 5 changed files with 225 additions and 26 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,20 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
* Ability to update service endpoints based on the current leader

### Changed
* Health endpoint now reports an error on an expired lease

## [v0.1.0] - 2020-07-29

### Added
* Running in elected mode in k8s cluster
* Running without election outside of k8s cluster
* Basic error handling
60 changes: 52 additions & 8 deletions README.md
@@ -8,21 +8,37 @@ Ensure that only a single instance of a process is running in your Kubernetes cl
capabilities to coordinate commands running in different pods. It acts as a gatekeeper, only
starting the command when the pod becomes a leader.

### Why leader election?
Some applications aren't natively able to run in a replicated way.
For example, they maintain some internal state which is only synchronized at the start.

Running such an application as a single replica is not ideal. Should the node
the pod is running on go offline unexpectedly, Kubernetes will take its time to
reschedule it (5+ minutes by default).

In such an event, leader election can promote a new leader in about 10-15 seconds.

## Usage
Just set `k8s-await-election` as the entry point of your image. Configuration of the leader election
happens via environment variables. If no environment variables are set, `k8s-await-election`
will just start the command without waiting to become elected.

The relevant environment variables are:

| Variable                                | Description                                                       |
|-----------------------------------------|-------------------------------------------------------------------|
| `K8S_AWAIT_ELECTION_ENABLED`            | Set to any non-empty value to enable leader election              |
| `K8S_AWAIT_ELECTION_NAME`               | Name of the election process. Useful for debugging               |
| `K8S_AWAIT_ELECTION_LOCK_NAME`          | Name of the `leases.coordination.k8s.io` resource                 |
| `K8S_AWAIT_ELECTION_LOCK_NAMESPACE`     | Namespace of the `leases.coordination.k8s.io` resource            |
| `K8S_AWAIT_ELECTION_IDENTITY`           | Unique identity of each member of the election process            |
| `K8S_AWAIT_ELECTION_STATUS_ENDPOINT`    | Optional: endpoint on which to report the election status         |
| `K8S_AWAIT_ELECTION_SERVICE_NAME`       | Optional: name of the service to update. See [On Service Updates] |
| `K8S_AWAIT_ELECTION_SERVICE_NAMESPACE`  | Optional: namespace of the service to update                      |
| `K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON` | Optional: JSON array of endpoint ports for the service            |
| `K8S_AWAIT_ELECTION_POD_IP`             | Optional: IP of this pod, used to update the service endpoints    |

[On Service Updates]: #service-updates
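
The ports array mirrors the fields of the Kubernetes `EndpointPort` type (`name`, `port`, and
optionally `protocol`). A minimal sketch of the env entry, reusing the hypothetical `my-port`
from the full example below:

```yaml
- name: K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON
  value: '[{"name":"my-port","port":1024,"protocol":"TCP"}]'
```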

Most of the time you will want to use this process in a Deployment spec or similar context. Here is
an example:
@@ -63,3 +79,31 @@ spec:
            - name: K8S_AWAIT_ELECTION_STATUS_ENDPOINT
              value: :9999
```
### Service Updates
`k8s-await-election` can also be used to select which pod should receive traffic for a service.
This is done by updating the Endpoints resource associated with the service whenever a new leader is elected.
The leader is then the only pod receiving traffic via the service.
To enable this feature, set the `K8S_AWAIT_ELECTION_SERVICE_*` variables.
See [the full example](./examples/singleton-service.yml).
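
To make this concrete, here is a sketch of the Endpoints object the elected pod creates (or
overwrites), based on the `setServiceEndpoint` function in this commit; the names match the
example, and the pod IP `10.244.1.7` is hypothetical:

```yaml
apiVersion: v1
kind: Endpoints
metadata:
  name: my-service        # must match the Service name
  namespace: default
subsets:
  - addresses:
      - ip: 10.244.1.7    # pod IP of the current leader
    ports:
      - name: my-port
        port: 1024
```

Whenever a new leader is elected, it replaces this object with its own address, so the service
always routes to exactly one pod.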

#### Why update service endpoints?

For deployments that provide some kind of external API (for example a REST API), we would
also like to automatically re-route traffic to the current leader.

This is normally done via the readiness state of the pod: only ready pods associated with
a service receive traffic. Because we only start the application once we are elected, only
one pod is ever "ready" in the sense that it should receive traffic.

This means that if we want to use the automatic service configuration via selectors, we
run into an issue: the "ready" state of a pod has a dual use. Consider a rolling upgrade of a deployment:

1. A new pod starts.
2. It cannot become leader, as an old pod still holds the lease.
3. Since the application never starts, the new pod never becomes "ready" to receive traffic.
4. The deployment controller sees that the new pod is not ready, and does not continue the rolling update.

`k8s-await-election` has all the information it needs to tell Kubernetes which pod should receive
traffic. This works around the issue above, at the cost of an unusable readiness probe.
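
Since the readiness probe can no longer gate traffic, the status endpoint is the natural health
signal instead. Below is a minimal sketch of a liveness probe against the `:9999` status endpoint
from the example; the probe itself is an assumption about deployment style, not part of this
commit, but with this change the endpoint returns HTTP 500 once the lease has expired, so the
kubelet would restart a wedged leader:

```yaml
livenessProbe:
  httpGet:
    path: /      # the status endpoint serves on the root path
    port: 9999
  initialDelaySeconds: 10
  periodSeconds: 10
```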
61 changes: 61 additions & 0 deletions examples/singleton-service.yml
@@ -0,0 +1,61 @@
---
apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  clusterIP: ""
  ports:
    - name: my-port
      port: 1024
      protocol: TCP
  # NOTE: No selector here! A selector would automatically add all matching and ready pods to the endpoints.
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-server-with-replicas
spec:
  replicas: 5
  selector:
    matchLabels:
      app: my-server
  template:
    metadata:
      labels:
        app: my-server
    spec:
      containers:
        - name: my-server
          image: my-server
          args:
            - my-singleton-server
          ports:
            - containerPort: 1024
              name: my-port
          env:
            - name: K8S_AWAIT_ELECTION_ENABLED
              value: "1"
            - name: K8S_AWAIT_ELECTION_NAME
              value: my-server
            - name: K8S_AWAIT_ELECTION_LOCK_NAME
              value: my-server
            - name: K8S_AWAIT_ELECTION_LOCK_NAMESPACE
              value: default
            - name: K8S_AWAIT_ELECTION_IDENTITY
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: K8S_AWAIT_ELECTION_STATUS_ENDPOINT
              value: :9999
            - name: K8S_AWAIT_ELECTION_SERVICE_NAME
              value: my-service
            - name: K8S_AWAIT_ELECTION_SERVICE_NAMESPACE
              value: default
            - name: K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON
              value: '[{"name":"my-port","port":1024}]'
            - name: K8S_AWAIT_ELECTION_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
---
106 changes: 88 additions & 18 deletions main.go
@@ -2,13 +2,17 @@ package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/exec"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"

	"github.com/linbit/k8s-await-election/pkg/consts"

	"github.com/sirupsen/logrus"
@@ -24,13 +28,17 @@ var Version = "development"
var log = logrus.New()

type AwaitElection struct {
	WithElection     bool
	Name             string
	LockName         string
	LockNamespace    string
	LeaderIdentity   string
	StatusEndpoint   string
	ServiceName      string
	ServiceNamespace string
	PodIP            string
	ServicePorts     []corev1.EndpointPort
	LeaderExec       func(ctx context.Context) error
}

type ConfigError struct {
@@ -71,15 +79,29 @@ func NewAwaitElectionConfig(exec func(ctx context.Context) error) (*AwaitElectio

	// Optional
	statusEndpoint := os.Getenv(consts.AwaitElectionStatusEndpointKey)
	podIP := os.Getenv(consts.AwaitElectionPodIP)
	serviceName := os.Getenv(consts.AwaitElectionServiceName)
	serviceNamespace := os.Getenv(consts.AwaitElectionServiceNamespace)

	servicePortsJson := os.Getenv(consts.AwaitElectionServicePortsJson)
	var servicePorts []corev1.EndpointPort
	err := json.Unmarshal([]byte(servicePortsJson), &servicePorts)
	if serviceName != "" && err != nil {
		return nil, fmt.Errorf("failed to parse ports from env: %w", err)
	}

	return &AwaitElection{
		WithElection:     true,
		Name:             name,
		LockName:         lockName,
		LockNamespace:    lockNamespace,
		LeaderIdentity:   leaderIdentity,
		StatusEndpoint:   statusEndpoint,
		PodIP:            podIP,
		ServiceName:      serviceName,
		ServiceNamespace: serviceNamespace,
		ServicePorts:     servicePorts,
		LeaderExec:       exec,
	}, nil
}

@@ -129,6 +151,12 @@ func (el *AwaitElection) Run() error {
		RetryPeriod: 2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// First we need to register our pod as the service endpoint
				err := el.setServiceEndpoint(ctx, kubeClient)
				if err != nil {
					execResult <- err
					return
				}
				// Actually start the command here.
				// Note: this callback is started in a goroutine, so we can block this
				// execution path for as long as we want.
@@ -147,7 +175,7 @@
		return fmt.Errorf("failed to create leader elector: %w", err)
	}

	statusServerResult := el.startStatusEndpoint(ctx, elector)

	go elector.Run(ctx)

@@ -165,7 +193,37 @@
	}
}

func (el *AwaitElection) setServiceEndpoint(ctx context.Context, client *kubernetes.Clientset) error {
	if el.ServiceName == "" {
		return nil
	}

	endpoints := &corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      el.ServiceName,
			Namespace: el.ServiceNamespace,
		},
		Subsets: []corev1.EndpointSubset{
			{
				Addresses: []corev1.EndpointAddress{{IP: el.PodIP}},
				Ports:     el.ServicePorts,
			},
		},
	}
	// Create the Endpoints object; if it already exists, overwrite it so that
	// the current leader is the only registered address.
	_, err := client.CoreV1().Endpoints(el.ServiceNamespace).Create(ctx, endpoints, metav1.CreateOptions{})
	if err != nil {
		if !errors.IsAlreadyExists(err) {
			return err
		}

		_, err := client.CoreV1().Endpoints(el.ServiceNamespace).Update(ctx, endpoints, metav1.UpdateOptions{})
		return err
	}

	return nil
}

func (el *AwaitElection) startStatusEndpoint(ctx context.Context, elector *leaderelection.LeaderElector) <-chan error {
	statusServerResult := make(chan error)

	if el.StatusEndpoint == "" {
@@ -175,11 +233,23 @@ func (el *AwaitElection) startStatusEndpoint(ctx context.Context) <-chan error {

	serveMux := http.NewServeMux()
	serveMux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		// Report an unhealthy status if the elector failed to renew its lease in time.
		err := elector.Check(2 * time.Second)
		if err != nil {
			log.WithField("err", err).Error("failed to step down gracefully, reporting unhealthy status")
			writer.WriteHeader(500)
			_, err := writer.Write([]byte("{\"status\": \"expired\"}"))
			if err != nil {
				log.WithField("err", err).Error("failed to serve request")
			}
			return
		}

		_, err = writer.Write([]byte("{\"status\": \"ok\"}"))
		if err != nil {
			log.WithField("err", err).Error("failed to serve request")
		}
	})

	statusServer := http.Server{
		Addr:    el.StatusEndpoint,
		Handler: serveMux,
4 changes: 4 additions & 0 deletions pkg/consts/consts.go
@@ -7,4 +7,8 @@
	AwaitElectionLockNamespaceKey  = "K8S_AWAIT_ELECTION_LOCK_NAMESPACE"
	AwaitElectionIdentityKey       = "K8S_AWAIT_ELECTION_IDENTITY"
	AwaitElectionStatusEndpointKey = "K8S_AWAIT_ELECTION_STATUS_ENDPOINT"
	AwaitElectionPodIP             = "K8S_AWAIT_ELECTION_POD_IP"
	AwaitElectionServiceName       = "K8S_AWAIT_ELECTION_SERVICE_NAME"
	AwaitElectionServiceNamespace  = "K8S_AWAIT_ELECTION_SERVICE_NAMESPACE"
	AwaitElectionServicePortsJson  = "K8S_AWAIT_ELECTION_SERVICE_PORTS_JSON"
)
