Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#2190 from abursavich/source-cleanup
Browse files Browse the repository at this point in the history
Source: Dedupe Common Logic
  • Loading branch information
k8s-ci-robot authored Aug 11, 2021
2 parents dfcfaf3 + 60c649b commit a690bb9
Show file tree
Hide file tree
Showing 26 changed files with 159 additions and 374 deletions.
9 changes: 2 additions & 7 deletions source/ambassador_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"sort"
"strings"
"time"

ambassador "github.com/datawire/ambassador/pkg/api/getambassador.io/v2"
"github.com/pkg/errors"
Expand Down Expand Up @@ -87,12 +86,8 @@ func NewAmbassadorHostSource(
// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)

// wait for the local cache to be populated.
err = poll(time.Second, 60*time.Second, func() (bool, error) {
return ambassadorHostInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, errors.Wrapf(err, "failed to sync cache")
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err
}

uc, err := newUnstructuredConverter()
Expand Down
52 changes: 9 additions & 43 deletions source/httpproxy.go → source/contour_httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ limitations under the License.
package source

import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"text/template"
"time"

"github.com/pkg/errors"
projectcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
Expand Down Expand Up @@ -63,17 +60,9 @@ func NewContourHTTPProxySource(
combineFqdnAnnotation bool,
ignoreHostnameAnnotation bool,
) (Source, error) {
var (
tmpl *template.Template
err error
)
if fqdnTemplate != "" {
tmpl, err = template.New("endpoint").Funcs(template.FuncMap{
"trimPrefix": strings.TrimPrefix,
}).Parse(fqdnTemplate)
if err != nil {
return nil, err
}
tmpl, err := parseTemplate(fqdnTemplate)
if err != nil {
return nil, err
}

// Use shared informer to listen for add/update/delete of HTTPProxys in the specified namespace.
Expand All @@ -93,11 +82,8 @@ func NewContourHTTPProxySource(
informerFactory.Start(wait.NeverStop)

// wait for the local cache to be populated.
err = poll(time.Second, 60*time.Second, func() (bool, error) {
return httpProxyInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, errors.Wrap(err, "failed to sync cache")
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err
}

uc, err := NewUnstructuredConverter()
Expand Down Expand Up @@ -197,22 +183,17 @@ func (sc *httpProxySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint,
}

func (sc *httpProxySource) endpointsFromTemplate(httpProxy *projectcontour.HTTPProxy) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
err := sc.fqdnTemplate.Execute(&buf, httpProxy)
hostnames, err := execTemplate(sc.fqdnTemplate, httpProxy)
if err != nil {
return nil, errors.Wrapf(err, "failed to apply template on HTTPProxy %s/%s", httpProxy.Namespace, httpProxy.Name)
return nil, err
}

hostnames := buf.String()

ttl, err := getTTLFromAnnotations(httpProxy.Annotations)
if err != nil {
log.Warn(err)
}

targets := getTargetsFromTargetAnnotation(httpProxy.Annotations)

if len(targets) == 0 {
for _, lb := range httpProxy.Status.LoadBalancer.Ingress {
if lb.IP != "" {
Expand All @@ -227,10 +208,7 @@ func (sc *httpProxySource) endpointsFromTemplate(httpProxy *projectcontour.HTTPP
providerSpecific, setIdentifier := getProviderSpecificAnnotations(httpProxy.Annotations)

var endpoints []*endpoint.Endpoint
// splits the FQDN template and removes the trailing periods
hostnameList := strings.Split(strings.Replace(hostnames, " ", "", -1), ",")
for _, hostname := range hostnameList {
hostname = strings.TrimSuffix(hostname, ".")
for _, hostname := range hostnames {
endpoints = append(endpoints, endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier)...)
}
return endpoints, nil
Expand Down Expand Up @@ -324,17 +302,5 @@ func (sc *httpProxySource) AddEventHandler(ctx context.Context, handler func())

// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
sc.httpProxyInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
sc.httpProxyInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
File renamed without changes.
37 changes: 8 additions & 29 deletions source/ingressroute.go → source/contour_ingressroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ limitations under the License.
package source

import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"text/template"
"time"

"github.com/pkg/errors"
contour "github.com/projectcontour/contour/apis/contour/v1beta1"
Expand Down Expand Up @@ -68,17 +66,9 @@ func NewContourIngressRouteSource(
combineFqdnAnnotation bool,
ignoreHostnameAnnotation bool,
) (Source, error) {
var (
tmpl *template.Template
err error
)
if fqdnTemplate != "" {
tmpl, err = template.New("endpoint").Funcs(template.FuncMap{
"trimPrefix": strings.TrimPrefix,
}).Parse(fqdnTemplate)
if err != nil {
return nil, err
}
tmpl, err := parseTemplate(fqdnTemplate)
if err != nil {
return nil, err
}

if _, _, err = parseContourLoadBalancerService(contourLoadBalancerService); err != nil {
Expand All @@ -102,11 +92,8 @@ func NewContourIngressRouteSource(
informerFactory.Start(wait.NeverStop)

// wait for the local cache to be populated.
err = poll(time.Second, 60*time.Second, func() (bool, error) {
return ingressRouteInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err
}

uc, err := NewUnstructuredConverter()
Expand Down Expand Up @@ -208,22 +195,17 @@ func (sc *ingressRouteSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoi
}

func (sc *ingressRouteSource) endpointsFromTemplate(ctx context.Context, ingressRoute *contour.IngressRoute) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
err := sc.fqdnTemplate.Execute(&buf, ingressRoute)
hostnames, err := execTemplate(sc.fqdnTemplate, ingressRoute)
if err != nil {
return nil, fmt.Errorf("failed to apply template on ingressroute %s/%s: %v", ingressRoute.Namespace, ingressRoute.Name, err)
return nil, err
}

hostnames := buf.String()

ttl, err := getTTLFromAnnotations(ingressRoute.Annotations)
if err != nil {
log.Warn(err)
}

targets := getTargetsFromTargetAnnotation(ingressRoute.Annotations)

if len(targets) == 0 {
targets, err = sc.targetsFromContourLoadBalancer(ctx)
if err != nil {
Expand All @@ -234,10 +216,7 @@ func (sc *ingressRouteSource) endpointsFromTemplate(ctx context.Context, ingress
providerSpecific, setIdentifier := getProviderSpecificAnnotations(ingressRoute.Annotations)

var endpoints []*endpoint.Endpoint
// splits the FQDN template and removes the trailing periods
hostnameList := strings.Split(strings.Replace(hostnames, " ", "", -1), ",")
for _, hostname := range hostnameList {
hostname = strings.TrimSuffix(hostname, ".")
for _, hostname := range hostnames {
endpoints = append(endpoints, endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier)...)
}
return endpoints, nil
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
51 changes: 9 additions & 42 deletions source/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ limitations under the License.
package source

import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"text/template"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/api/extensions/v1beta1"
Expand Down Expand Up @@ -66,17 +64,9 @@ type ingressSource struct {

// NewIngressSource creates a new ingressSource with the given config.
func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool) (Source, error) {
var (
tmpl *template.Template
err error
)
if fqdnTemplate != "" {
tmpl, err = template.New("endpoint").Funcs(template.FuncMap{
"trimPrefix": strings.TrimPrefix,
}).Parse(fqdnTemplate)
if err != nil {
return nil, err
}
tmpl, err := parseTemplate(fqdnTemplate)
if err != nil {
return nil, err
}

// Use shared informer to listen for add/update/delete of ingresses in the specified namespace.
Expand All @@ -96,11 +86,8 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
informerFactory.Start(wait.NeverStop)

// wait for the local cache to be populated.
err = poll(time.Second, 60*time.Second, func() (bool, error) {
return ingressInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
return nil, err
}

sc := &ingressSource{
Expand Down Expand Up @@ -175,33 +162,25 @@ func (sc *ingressSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e
}

func (sc *ingressSource) endpointsFromTemplate(ing *v1beta1.Ingress) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
err := sc.fqdnTemplate.Execute(&buf, ing)
hostnames, err := execTemplate(sc.fqdnTemplate, ing)
if err != nil {
return nil, fmt.Errorf("failed to apply template on ingress %s: %v", ing.String(), err)
return nil, err
}

hostnames := buf.String()

ttl, err := getTTLFromAnnotations(ing.Annotations)
if err != nil {
log.Warn(err)
}

targets := getTargetsFromTargetAnnotation(ing.Annotations)

if len(targets) == 0 {
targets = targetsFromIngressStatus(ing.Status)
}

providerSpecific, setIdentifier := getProviderSpecificAnnotations(ing.Annotations)

var endpoints []*endpoint.Endpoint
// splits the FQDN template and removes the trailing periods
hostnameList := strings.Split(strings.Replace(hostnames, " ", "", -1), ",")
for _, hostname := range hostnameList {
hostname = strings.TrimSuffix(hostname, ".")
for _, hostname := range hostnames {
endpoints = append(endpoints, endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier)...)
}
return endpoints, nil
Expand Down Expand Up @@ -331,17 +310,5 @@ func (sc *ingressSource) AddEventHandler(ctx context.Context, handler func()) {

// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
sc.ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
sc.ingressInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
5 changes: 2 additions & 3 deletions source/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func testEndpointsFromIngressHostnameSourceAnnotation(t *testing.T) {
{
title: "No ingress-hostname-source annotation, one rule.host",
ingress: fakeIngress{
dnsnames: []string{"foo.bar"},
hostnames: []string{"lb.com"},
dnsnames: []string{"foo.bar"},
hostnames: []string{"lb.com"},
},
expected: []*endpoint.Endpoint{
{
Expand Down Expand Up @@ -1190,7 +1190,6 @@ func testIngressEndpoints(t *testing.T) {
ti.ignoreIngressTLSSpec,
ti.ignoreIngressRulesSpec,
)

// Informer cache has all of the ingresses. Retrieve and validate their endpoints.
res, err := source.Endpoints(context.Background())
if ti.expectError {
Expand Down
Loading

0 comments on commit a690bb9

Please sign in to comment.