Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Pass along context to methods making API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed Jun 20, 2019
1 parent b08ae0d commit 6d4f77c
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 54 deletions.
7 changes: 4 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"context"
"errors"

"github.com/weaveworks/flux"
Expand All @@ -25,11 +26,11 @@ const (
// are distinct interfaces.
type Cluster interface {
// Get all of the services (optionally, from a specific namespace), excluding those
AllWorkloads(maybeNamespace string) ([]Workload, error)
SomeWorkloads([]flux.ResourceID) ([]Workload, error)
AllWorkloads(ctx context.Context, maybeNamespace string) ([]Workload, error)
SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]Workload, error)
IsAllowedResource(flux.ResourceID) bool
Ping() error
Export() ([]byte, error)
Export(ctx context.Context) ([]byte, error)
Sync(SyncSet) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}
Expand Down
6 changes: 4 additions & 2 deletions cluster/kubernetes/images.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"fmt"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -122,8 +123,9 @@ func mergeCredentials(log func(...interface{}) error,
// ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials
func (c *Cluster) ImagesToFetch() registry.ImageCreds {
allImageCreds := make(registry.ImageCreds)
ctx := context.Background()

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
c.logger.Log("err", errors.Wrap(err, "getting namespaces"))
return allImageCreds
Expand All @@ -132,7 +134,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {
for _, ns := range namespaces {
seenCreds := make(map[string]registry.Credentials)
for kind, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) {
// Skip unsupported or forbidden resource kinds
Expand Down
19 changes: 10 additions & 9 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -127,7 +128,7 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,
// SomeWorkloads returns the workloads named, missing out any that don't
// exist in the cluster or aren't in an allowed namespace.
// They do not necessarily have to be returned in the order requested.
func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
func (c *Cluster) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) (res []cluster.Workload, err error) {
var workloads []cluster.Workload
for _, id := range ids {
if !c.IsAllowedResource(id) {
Expand All @@ -141,7 +142,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
continue
}

workload, err := resourceKind.getWorkload(c, ns, name)
workload, err := resourceKind.getWorkload(ctx, c, ns, name)
if err != nil {
if apierrors.IsForbidden(err) || apierrors.IsNotFound(err) {
continue
Expand All @@ -161,8 +162,8 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,

// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
// the namespace (or any namespace if that argument is empty)
func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedAndExistingNamespaces()
func (c *Cluster) AllWorkloads(ctx context.Context, namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand All @@ -174,7 +175,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er
}

for kind, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
switch {
case apierrors.IsNotFound(err):
Expand Down Expand Up @@ -219,10 +220,10 @@ func (c *Cluster) Ping() error {
}

// Export exports cluster resources
func (c *Cluster) Export() ([]byte, error) {
func (c *Cluster) Export(ctx context.Context) ([]byte, error) {
var config bytes.Buffer

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand All @@ -240,7 +241,7 @@ func (c *Cluster) Export() ([]byte, error) {
}

for _, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
switch {
case apierrors.IsNotFound(err):
Expand Down Expand Up @@ -281,7 +282,7 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
// the Flux instance is expected to have access to and can look for resources inside of.
// It returns a list of all namespaces unless an explicit list of allowed namespaces
// has been set on the Cluster instance.
func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
func (c *Cluster) getAllowedAndExistingNamespaces(ctx context.Context) ([]apiv1.Namespace, error) {
if len(c.allowedNamespaces) > 0 {
nsList := []apiv1.Namespace{}
for _, name := range c.allowedNamespaces {
Expand Down
3 changes: 2 additions & 1 deletion cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -28,7 +29,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
client := ExtendedClient{coreClient: clientset}
c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{})

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
t.Errorf("The error should be nil, not: %s", err)
}
Expand Down
29 changes: 15 additions & 14 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"strings"

apiapps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -30,8 +31,8 @@ const AntecedentAnnotation = "flux.weave.works/antecedent"
// Kind registry

type resourceKind interface {
getWorkload(c *Cluster, namespace, name string) (workload, error)
getWorkloads(c *Cluster, namespace string) ([]workload, error)
getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error)
getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error)
}

var (
Expand Down Expand Up @@ -114,7 +115,7 @@ func (w workload) toClusterWorkload(resourceID flux.ResourceID) cluster.Workload

type deploymentKind struct{}

func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *deploymentKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
deployment, err := c.client.AppsV1().Deployments(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -123,7 +124,7 @@ func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workl
return makeDeploymentWorkload(deployment), nil
}

func (dk *deploymentKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *deploymentKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
deployments, err := c.client.AppsV1().Deployments(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -191,7 +192,7 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload {

type daemonSetKind struct{}

func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *daemonSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -200,7 +201,7 @@ func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (worklo
return makeDaemonSetWorkload(daemonSet), nil
}

func (dk *daemonSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *daemonSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -252,7 +253,7 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload {

type statefulSetKind struct{}

func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *statefulSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -261,7 +262,7 @@ func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (work
return makeStatefulSetWorkload(statefulSet), nil
}

func (dk *statefulSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *statefulSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -345,7 +346,7 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload {

type cronJobKind struct{}

func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *cronJobKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -354,7 +355,7 @@ func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload
return makeCronJobWorkload(cronJob), nil
}

func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *cronJobKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -382,15 +383,15 @@ func makeCronJobWorkload(cronJob *apibatch.CronJob) workload {

type fluxHelmReleaseKind struct{}

func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (fhr *fluxHelmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
fluxHelmRelease, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
}
return makeFluxHelmReleaseWorkload(fluxHelmRelease), nil
}

func (fhr *fluxHelmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (fhr *fluxHelmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
fluxHelmReleases, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -444,15 +445,15 @@ func createK8sFHRContainers(values map[string]interface{}) []apiv1.Container {

type helmReleaseKind struct{}

func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (hr *helmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
helmRelease, err := c.client.FluxV1beta1().HelmReleases(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
}
return makeHelmReleaseWorkload(helmRelease), nil
}

func (hr *helmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (hr *helmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
helmReleases, err := c.client.FluxV1beta1().HelmReleases(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"bytes"
"context"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
Expand Down Expand Up @@ -292,7 +293,7 @@ func (c *Cluster) listAllowedResources(
}

// List resources only from the allowed namespaces
namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
return nil, err
}
Expand Down
19 changes: 10 additions & 9 deletions cluster/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"bytes"
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
Expand All @@ -14,11 +15,11 @@ import (

// Doubles as a cluster.Cluster and cluster.Manifests implementation
type Mock struct {
AllWorkloadsFunc func(maybeNamespace string) ([]cluster.Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]cluster.Workload, error)
AllWorkloadsFunc func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error)
SomeWorkloadsFunc func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error)
IsAllowedResourceFunc func(flux.ResourceID) bool
PingFunc func() error
ExportFunc func() ([]byte, error)
ExportFunc func(ctx context.Context) ([]byte, error)
SyncFunc func(cluster.SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
Expand All @@ -33,12 +34,12 @@ type Mock struct {
var _ cluster.Cluster = &Mock{}
var _ manifests.Manifests = &Mock{}

func (m *Mock) AllWorkloads(maybeNamespace string) ([]cluster.Workload, error) {
return m.AllWorkloadsFunc(maybeNamespace)
func (m *Mock) AllWorkloads(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) {
return m.AllWorkloadsFunc(ctx, maybeNamespace)
}

func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]cluster.Workload, error) {
return m.SomeWorkloadsFunc(s)
func (m *Mock) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
return m.SomeWorkloadsFunc(ctx, ids)
}

func (m *Mock) IsAllowedResource(id flux.ResourceID) bool {
Expand All @@ -49,8 +50,8 @@ func (m *Mock) Ping() error {
return m.PingFunc()
}

func (m *Mock) Export() ([]byte, error) {
return m.ExportFunc()
func (m *Mock) Export(ctx context.Context) ([]byte, error) {
return m.ExportFunc(ctx)
}

func (m *Mock) Sync(c cluster.SyncSet) error {
Expand Down
10 changes: 5 additions & 5 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *Daemon) Ping(ctx context.Context) error {
}

func (d *Daemon) Export(ctx context.Context) ([]byte, error) {
return d.Cluster.Export()
return d.Cluster.Export(ctx)
}

func (d *Daemon) getManifestStore(checkout *git.Checkout) (manifests.Store, error) {
Expand Down Expand Up @@ -122,9 +122,9 @@ func (d *Daemon) ListServicesWithOptions(ctx context.Context, opts v11.ListServi
var clusterWorkloads []cluster.Workload
var err error
if len(opts.Services) > 0 {
clusterWorkloads, err = d.Cluster.SomeWorkloads(opts.Services)
clusterWorkloads, err = d.Cluster.SomeWorkloads(ctx, opts.Services)
} else {
clusterWorkloads, err = d.Cluster.AllWorkloads(opts.Namespace)
clusterWorkloads, err = d.Cluster.AllWorkloads(ctx, opts.Namespace)
}
if err != nil {
return nil, errors.Wrap(err, "getting workloads from cluster")
Expand Down Expand Up @@ -199,12 +199,12 @@ func (d *Daemon) ListImagesWithOptions(ctx context.Context, opts v10.ListImagesO
if err != nil {
return nil, errors.Wrap(err, "treating workload spec as ID")
}
workloads, err = d.Cluster.SomeWorkloads([]flux.ResourceID{id})
workloads, err = d.Cluster.SomeWorkloads(ctx, []flux.ResourceID{id})
if err != nil {
return nil, errors.Wrap(err, "getting some workloads")
}
} else {
workloads, err = d.Cluster.AllWorkloads(opts.Namespace)
workloads, err = d.Cluster.AllWorkloads(ctx, opts.Namespace)
if err != nil {
return nil, errors.Wrap(err, "getting all workloads")
}
Expand Down
Loading

0 comments on commit 6d4f77c

Please sign in to comment.