Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Pass along context to methods making API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed Jun 20, 2019
1 parent b08ae0d commit f2402e2
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 54 deletions.
7 changes: 4 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"context"
"errors"

"github.com/weaveworks/flux"
Expand All @@ -25,11 +26,11 @@ const (
// are distinct interfaces.
type Cluster interface {
// Get all of the services (optionally, from a specific namespace), excluding those
AllWorkloads(maybeNamespace string) ([]Workload, error)
SomeWorkloads([]flux.ResourceID) ([]Workload, error)
AllWorkloads(ctx context.Context, maybeNamespace string) ([]Workload, error)
SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]Workload, error)
IsAllowedResource(flux.ResourceID) bool
Ping() error
Export() ([]byte, error)
Export(ctx context.Context) ([]byte, error)
Sync(SyncSet) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}
Expand Down
6 changes: 4 additions & 2 deletions cluster/kubernetes/images.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"fmt"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -122,8 +123,9 @@ func mergeCredentials(log func(...interface{}) error,
// ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials
func (c *Cluster) ImagesToFetch() registry.ImageCreds {
allImageCreds := make(registry.ImageCreds)
ctx := context.Background()

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
c.logger.Log("err", errors.Wrap(err, "getting namespaces"))
return allImageCreds
Expand All @@ -132,7 +134,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {
for _, ns := range namespaces {
seenCreds := make(map[string]registry.Credentials)
for kind, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) {
// Skip unsupported or forbidden resource kinds
Expand Down
25 changes: 16 additions & 9 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -127,7 +128,7 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,
// SomeWorkloads returns the workloads named, missing out any that don't
// exist in the cluster or aren't in an allowed namespace.
// They do not necessarily have to be returned in the order requested.
func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
func (c *Cluster) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) (res []cluster.Workload, err error) {
var workloads []cluster.Workload
for _, id := range ids {
if !c.IsAllowedResource(id) {
Expand All @@ -141,7 +142,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
continue
}

workload, err := resourceKind.getWorkload(c, ns, name)
workload, err := resourceKind.getWorkload(ctx, c, ns, name)
if err != nil {
if apierrors.IsForbidden(err) || apierrors.IsNotFound(err) {
continue
Expand All @@ -161,8 +162,8 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,

// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
// the namespace (or any namespace if that argument is empty)
func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedAndExistingNamespaces()
func (c *Cluster) AllWorkloads(ctx context.Context, namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand All @@ -174,7 +175,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er
}

for kind, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
switch {
case apierrors.IsNotFound(err):
Expand Down Expand Up @@ -219,10 +220,10 @@ func (c *Cluster) Ping() error {
}

// Export exports cluster resources
func (c *Cluster) Export() ([]byte, error) {
func (c *Cluster) Export(ctx context.Context) ([]byte, error) {
var config bytes.Buffer

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand All @@ -240,7 +241,7 @@ func (c *Cluster) Export() ([]byte, error) {
}

for _, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
switch {
case apierrors.IsNotFound(err):
Expand Down Expand Up @@ -281,10 +282,13 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
// the Flux instance is expected to have access to and can look for resources inside of.
// It returns a list of all namespaces unless an explicit list of allowed namespaces
// has been set on the Cluster instance.
func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
func (c *Cluster) getAllowedAndExistingNamespaces(ctx context.Context) ([]apiv1.Namespace, error) {
if len(c.allowedNamespaces) > 0 {
nsList := []apiv1.Namespace{}
for _, name := range c.allowedNamespaces {
if err := ctx.Err(); err != nil {
return nil, err
}
ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{})
switch {
case err == nil:
Expand All @@ -303,6 +307,9 @@ func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
return nsList, nil
}

if err := ctx.Err(); err != nil {
return nil, err
}
namespaces, err := c.client.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -28,7 +29,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
client := ExtendedClient{coreClient: clientset}
c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{})

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
t.Errorf("The error should be nil, not: %s", err)
}
Expand Down
65 changes: 51 additions & 14 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"strings"

apiapps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -30,8 +31,8 @@ const AntecedentAnnotation = "flux.weave.works/antecedent"
// Kind registry

type resourceKind interface {
getWorkload(c *Cluster, namespace, name string) (workload, error)
getWorkloads(c *Cluster, namespace string) ([]workload, error)
getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error)
getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error)
}

var (
Expand Down Expand Up @@ -114,7 +115,10 @@ func (w workload) toClusterWorkload(resourceID flux.ResourceID) cluster.Workload

type deploymentKind struct{}

func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *deploymentKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
deployment, err := c.client.AppsV1().Deployments(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -123,7 +127,10 @@ func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workl
return makeDeploymentWorkload(deployment), nil
}

func (dk *deploymentKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *deploymentKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
deployments, err := c.client.AppsV1().Deployments(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -191,7 +198,10 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload {

type daemonSetKind struct{}

func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *daemonSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -200,7 +210,10 @@ func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (worklo
return makeDaemonSetWorkload(daemonSet), nil
}

func (dk *daemonSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *daemonSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -252,7 +265,10 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload {

type statefulSetKind struct{}

func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *statefulSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -261,7 +277,10 @@ func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (work
return makeStatefulSetWorkload(statefulSet), nil
}

func (dk *statefulSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *statefulSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -345,7 +364,10 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload {

type cronJobKind struct{}

func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *cronJobKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -354,7 +376,10 @@ func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload
return makeCronJobWorkload(cronJob), nil
}

func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *cronJobKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -382,15 +407,21 @@ func makeCronJobWorkload(cronJob *apibatch.CronJob) workload {

type fluxHelmReleaseKind struct{}

func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (fhr *fluxHelmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
fluxHelmRelease, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
}
return makeFluxHelmReleaseWorkload(fluxHelmRelease), nil
}

func (fhr *fluxHelmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (fhr *fluxHelmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
fluxHelmReleases, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -444,15 +475,21 @@ func createK8sFHRContainers(values map[string]interface{}) []apiv1.Container {

type helmReleaseKind struct{}

func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (hr *helmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
helmRelease, err := c.client.FluxV1beta1().HelmReleases(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
}
return makeHelmReleaseWorkload(helmRelease), nil
}

func (hr *helmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (hr *helmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
helmReleases, err := c.client.FluxV1beta1().HelmReleases(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"bytes"
"context"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
Expand Down Expand Up @@ -292,7 +293,7 @@ func (c *Cluster) listAllowedResources(
}

// List resources only from the allowed namespaces
namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
return nil, err
}
Expand Down
19 changes: 10 additions & 9 deletions cluster/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"bytes"
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
Expand All @@ -14,11 +15,11 @@ import (

// Doubles as a cluster.Cluster and cluster.Manifests implementation
type Mock struct {
AllWorkloadsFunc func(maybeNamespace string) ([]cluster.Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]cluster.Workload, error)
AllWorkloadsFunc func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error)
SomeWorkloadsFunc func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error)
IsAllowedResourceFunc func(flux.ResourceID) bool
PingFunc func() error
ExportFunc func() ([]byte, error)
ExportFunc func(ctx context.Context) ([]byte, error)
SyncFunc func(cluster.SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
Expand All @@ -33,12 +34,12 @@ type Mock struct {
var _ cluster.Cluster = &Mock{}
var _ manifests.Manifests = &Mock{}

func (m *Mock) AllWorkloads(maybeNamespace string) ([]cluster.Workload, error) {
return m.AllWorkloadsFunc(maybeNamespace)
func (m *Mock) AllWorkloads(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) {
return m.AllWorkloadsFunc(ctx, maybeNamespace)
}

func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]cluster.Workload, error) {
return m.SomeWorkloadsFunc(s)
func (m *Mock) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
return m.SomeWorkloadsFunc(ctx, ids)
}

func (m *Mock) IsAllowedResource(id flux.ResourceID) bool {
Expand All @@ -49,8 +50,8 @@ func (m *Mock) Ping() error {
return m.PingFunc()
}

func (m *Mock) Export() ([]byte, error) {
return m.ExportFunc()
func (m *Mock) Export(ctx context.Context) ([]byte, error) {
return m.ExportFunc(ctx)
}

func (m *Mock) Sync(c cluster.SyncSet) error {
Expand Down
Loading

0 comments on commit f2402e2

Please sign in to comment.