Skip to content

Commit

Permalink
Fix race condition in Destination's endpoints watcher (#12022)
Browse files Browse the repository at this point in the history
Fixes #12010

## Problem

We're observing crashes in the destination controller in some scenarios, due to data race as described in #12010.

## Cause

The problem is the same instance of the `AddressSet.Addresses` map is getting mutated in the endpoints watcher Server [informer handler](https://github.com/linkerd/linkerd2/blob/edge-24.1.3/controller/api/destination/watcher/endpoints_watcher.go#L1309), and iterated over in the endpoint translator [queue loop](https://github.com/linkerd/linkerd2/blob/edge-24.1.3/controller/api/destination/endpoint_translator.go#L197-L211), which run in different goroutines and the map is not guarded. I believe this doesn't result in Destination returning stale data; it's more of a correctness issue.

## Solution

Make a shallow copy of `pp.addresses` in the endpoints watcher and only pass that to the listeners. It's a shallow copy because we avoid making copies of the pod reference in there, knowing it won't get mutated.

## Repro

Install linkerd core and injected emojivoto and patch the endpoint translator to include a sleep call that will help surfacing the race (don't install the patch in the cluster; we'll only use it locally below):

<details>
  <summary>endpoint_translator.go diff</summary>

```diff
diff --git a/controller/api/destination/endpoint_translator.go b/controller/api/destination/endpoint_translator.go
index d1018d5f9..7d5abd638 100644
--- a/controller/api/destination/endpoint_translator.go
+++ b/controller/api/destination/endpoint_translator.go
@@ -5,6 +5,7 @@ import (
        "reflect"
        "strconv"
        "strings"
+       "time"

        pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
        "github.com/linkerd/linkerd2-proxy-api/go/net"
@@ -195,7 +196,9 @@ func (et *endpointTranslator) processUpdate(update interface{}) {
 }

 func (et *endpointTranslator) add(set watcher.AddressSet) {
        for id, address := range set.Addresses {
+               time.Sleep(1 * time.Second)
                et.availableEndpoints.Addresses[id] = address
        }
```
</details>

Then create these two Server manifests:

<details>
  <summary>emoji-web-server.yml</summary>

```yaml
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
  namespace: emojivoto
  name: web-http
  labels:
    app.kubernetes.io/part-of: emojivoto
    app.kubernetes.io/name: web
    app.kubernetes.io/version: v11
spec:
  podSelector:
    matchLabels:
      app: web-svc
  port: http
  proxyProtocol: HTTP/1
```
</details>

<details>
  <summary>emoji-web-server-opaque.yml</summary>

```yaml
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
  namespace: emojivoto
  name: web-http
  labels:
    app.kubernetes.io/part-of: emojivoto
    app.kubernetes.io/name: web
    app.kubernetes.io/version: v11
spec:
  podSelector:
    matchLabels:
      app: web-svc
  port: http
  proxyProtocol: opaque
```
</details>

In separate consoles run the patched destination service and a destination client:

```bash
HOSTNAME=foobar go run -race ./controller/cmd/main.go destination -enable-h2-upgrade=true -enable-endpoint-slices=true -cluster-domain=cluster.local -identity-trust-domain=cluster.local -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
```

```bash
go run ./controller/script/destination-client -path web-svc.emojivoto.svc.cluster.local:80
```

And run this to continuously switch the `proxyProtocol` field:

```bash
while true; do kubectl apply -f ~/src/k8s/sample_yamls/emoji-web-server.yml; kubectl apply -f ~/src/k8s/sample_yamls/emoji-web-server-opaque.yml ; done
```

You'll see the following data race report in the Destination controller logs:

<details>
  <summary>destination logs</summary>

```console
==================
WARNING: DATA RACE
Write at 0x00c0006d30e0 by goroutine 178:
  github.com/linkerd/linkerd2/controller/api/destination/watcher.(*portPublisher).updateServer()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/watcher/endpoints_watcher.go:1310 +0x772
  github.com/linkerd/linkerd2/controller/api/destination/watcher.(*servicePublisher).updateServer()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/watcher/endpoints_watcher.go:711 +0x150
  github.com/linkerd/linkerd2/controller/api/destination/watcher.(*EndpointsWatcher).addServer()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/watcher/endpoints_watcher.go:514 +0x173
  github.com/linkerd/linkerd2/controller/api/destination/watcher.(*EndpointsWatcher).updateServer()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/watcher/endpoints_watcher.go:528 +0x26f
  github.com/linkerd/linkerd2/controller/api/destination/watcher.(*EndpointsWatcher).updateServer-fm()
      <autogenerated>:1 +0x64
  k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/tools/cache/controller.go:246 +0x81
  k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate()
      <autogenerated>:1 +0x1f
  k8s.io/client-go/tools/cache.(*processorListener).run.func1()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/tools/cache/shared_informer.go:970 +0x1f4
  k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:226 +0x41
  k8s.io/apimachinery/pkg/util/wait.BackoffUntil()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:227 +0xbe
  k8s.io/apimachinery/pkg/util/wait.JitterUntil()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:204 +0x10a
  k8s.io/apimachinery/pkg/util/wait.Until()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:161 +0x9b
  k8s.io/client-go/tools/cache.(*processorListener).run()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/tools/cache/shared_informer.go:966 +0x38
  k8s.io/client-go/tools/cache.(*processorListener).run-fm()
      <autogenerated>:1 +0x33
  k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
      /home/alpeb/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:72 +0x86

Previous read at 0x00c0006d30e0 by goroutine 360:
  github.com/linkerd/linkerd2/controller/api/destination.(*endpointTranslator).add()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/endpoint_translator.go:200 +0x1ab
  github.com/linkerd/linkerd2/controller/api/destination.(*endpointTranslator).processUpdate()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/endpoint_translator.go:190 +0x166
  github.com/linkerd/linkerd2/controller/api/destination.(*endpointTranslator).Start.func1()
      /home/alpeb/pr/destination-race/linkerd2/controller/api/destination/endpoint_translator.go:174 +0x45
```
</details>

## Extras

This also removes the unused method `func (as *AddressSet) WithPort(port Port) AddressSet` in endpoints_watcher.go
  • Loading branch information
alpeb authored Feb 7, 2024
1 parent a7cfbf0 commit 8f8bd8f
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,27 @@ var endpointsVecs = newEndpointsMetricsVecs()

var undefinedEndpointPort = Port(0)

// shallowCopy returns a shallow copy of addr, in the sense that the Pod and
// ExternalWorkload fields of the Addresses map values still point to the
// locations of the original variable
func (addr AddressSet) shallowCopy() AddressSet {
addresses := make(map[ID]Address)
for k, v := range addr.Addresses {
addresses[k] = v
}

labels := make(map[string]string)
for k, v := range addr.Labels {
labels[k] = v
}

return AddressSet{
Addresses: addresses,
Labels: labels,
LocalTrafficPolicy: addr.LocalTrafficPolicy,
}
}

// NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
// k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will
// watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
Expand Down Expand Up @@ -1152,7 +1173,7 @@ func (pp *portPublisher) updateLocalTrafficPolicy(localTrafficPolicy bool) {
pp.localTrafficPolicy = localTrafficPolicy
pp.addresses.LocalTrafficPolicy = localTrafficPolicy
for _, listener := range pp.listeners {
listener.Add(pp.addresses)
listener.Add(pp.addresses.shallowCopy())
}
}

Expand Down Expand Up @@ -1217,7 +1238,7 @@ func (pp *portPublisher) noEndpoints(exists bool) {
func (pp *portPublisher) subscribe(listener EndpointUpdateListener) {
if pp.exists {
if len(pp.addresses.Addresses) > 0 {
listener.Add(pp.addresses)
listener.Add(pp.addresses.shallowCopy())
} else {
listener.NoEndpoints(true)
}
Expand Down Expand Up @@ -1260,7 +1281,7 @@ func (pp *portPublisher) updateServer(oldServer, newServer *v1beta2.Server) {
}
if updated {
for _, listener := range pp.listeners {
listener.Add(pp.addresses)
listener.Add(pp.addresses.shallowCopy())
}
pp.metrics.incUpdates()
}
Expand Down Expand Up @@ -1328,19 +1349,6 @@ func (pp *portPublisher) isAddressSelected(address Address, server *v1beta2.Serv
/// util ///
////////////

// WithPort sets the port field in all addresses of an address set.
func (as *AddressSet) WithPort(port Port) AddressSet {
wp := AddressSet{
Addresses: map[PodID]Address{},
Labels: as.Labels,
}
for id, addr := range as.Addresses {
addr.Port = port
wp.Addresses[id] = addr
}
return wp
}

// getTargetPort returns the port specified as an argument if no service is
// present. If the service is present and it has a port spec matching the
// specified port, it returns the name of the service's port (not the name
Expand Down

0 comments on commit 8f8bd8f

Please sign in to comment.