[Anbarasan/Rooba] Populates upstreams on start up before starting the event handler
roobalimsab committed Nov 12, 2019
1 parent e7957d8 commit e52573d
Showing 4 changed files with 101 additions and 38 deletions.
6 changes: 6 additions & 0 deletions cmd/server.go
@@ -14,6 +14,7 @@ import (
     "github.com/gojekfarm/envoy-lb-operator/server"
     log "github.com/sirupsen/logrus"
     "github.com/spf13/cobra"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
 )
@@ -69,6 +70,11 @@ func serve(cmd *cobra.Command, args []string) {
     for _, mapping := range config.GetDiscoveryMapping() {
         lb := envoy.NewLB(mapping.EnvoyId, envoyConfig, snapshotCache, config.AutoRefreshConn())
         go lb.HandleEvents()
+
+        // Populate all the existing upstreams during start up
+        serviceList, _ := kubeClient.CoreV1().Services(mapping.Namespace).List(metav1.ListOptions{LabelSelector: mapping.UpstreamLabel})
+        lb.InitializeUpstream(serviceList)
+
         svcCancelFn := server.StartSvcKubeHandler(kubeClient, lb.SvcTrigger, mapping.UpstreamLabel, mapping.Namespace)
         go cancelOnInterrupt(svcCancelFn)

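For context on the new startup block: it performs a one-shot List of the labelled Services and hands the result to the load balancer before the watch-based handler starts, so Services that existed before the operator booted still make it into the first snapshot. Below is a minimal standalone sketch of that call, assuming a client-go version contemporary with this commit (the context-free List signature); the kubeconfig path and label value are hypothetical, and unlike the committed snippet the List error is checked here.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a clientset from a kubeconfig (path is hypothetical).
    cfg, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // One-shot listing of the labelled upstream Services, as the new
    // startup block in serve() does.
    svcs, err := client.CoreV1().Services("default").List(metav1.ListOptions{
        LabelSelector: "envoy-lb-operator/upstream=true", // hypothetical label
    })
    if err != nil {
        panic(err)
    }
    for _, svc := range svcs.Items {
        fmt.Printf("would initialize upstream %s\n", svc.Name)
    }
}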
52 changes: 39 additions & 13 deletions envoy/loadbalancer.go
@@ -39,20 +39,29 @@ type LoadBalancer struct {
     events          chan LBEvent
     upstreams       map[string]kube.Service
     nodeID          string
-    Config          cache.SnapshotCache
-    ConfigVersion   int32
-    EnvoyConfig     config.EnvoyConfig
-    AutoRefreshConn bool
+    cache           cache.SnapshotCache
+    cacheVersion    int32
+    envoyConfig     config.EnvoyConfig
+    autoRefreshConn bool
 }
 
 func (lb *LoadBalancer) Trigger(evt LBEvent) {
     lb.events <- evt
 }
 
+func (lb *LoadBalancer) InitializeUpstream(serviceList *corev1.ServiceList) {
+    lb.incrementVersion()
+    for _, service := range serviceList.Items {
+        svc := lb.getService(&service)
+        lb.upstreams[svc.Address] = svc
+    }
+    log.Debug("Populated all existing upstreams.")
+}
+
 func (lb *LoadBalancer) SvcTrigger(eventType LBEventType, svc *corev1.Service) {
     log.Debugf("Received event: %s eventtype: %+v for node: %s", svc, eventType, lb.nodeID)
     if svc.Spec.ClusterIP == v1.ClusterIPNone {
-        lb.Trigger(LBEvent{EventType: eventType, Svc: kube.Service{Address: svc.Name, Port: uint32(svc.Spec.Ports[0].TargetPort.IntVal), Type: kube.ServiceType(svc), Path: kube.ServicePath(svc), Domain: kube.ServiceDomain(svc)}})
+        lb.Trigger(LBEvent{EventType: eventType, Svc: lb.getService(svc)})
     }
 }

@@ -80,7 +89,8 @@ func (lb *LoadBalancer) EndpointTrigger() {
 }
 
 func (lb *LoadBalancer) SnapshotRunner() {
-    if lb.AutoRefreshConn {
+    log.Debug("Executing Snapshot Runner...")
+    if lb.autoRefreshConn {
         lb.incrementVersion()
     }
     var clusters []cache.Resource
@@ -89,7 +99,7 @@ func (lb *LoadBalancer) SnapshotRunner() {
     if len(lb.upstreams) > 0 {
         for _, svc := range lb.upstreams {
 
-            clusters = append(clusters, svc.Cluster(lb.EnvoyConfig.ConnectTimeoutMs, lb.EnvoyConfig.CircuitBreaker, lb.EnvoyConfig.OutlierDetection))
+            clusters = append(clusters, svc.Cluster(lb.envoyConfig.ConnectTimeoutMs, lb.envoyConfig.CircuitBreaker, lb.envoyConfig.OutlierDetection))
             if targetsByDomain[svc.Domain] == nil {
                 targetsByDomain[svc.Domain] = []cp.Target{svc.DefaultTarget()}
             } else {
@@ -99,7 +109,7 @@ func (lb *LoadBalancer) SnapshotRunner() {
     }
     vhosts := []route.VirtualHost{}
     for domain, targets := range targetsByDomain {
-        retryConfig := lb.EnvoyConfig.RetryConfig
+        retryConfig := lb.envoyConfig.RetryConfig
         vhosts = append(vhosts, cp.VHost(fmt.Sprintf("local_service_%s", domain), []string{domain}, targets, cp.RetryPolicy(retryConfig.RetryOn, retryConfig.RetryPredicate, retryConfig.NumRetries, retryConfig.HostSelectionMaxRetryAttempts)))
     }

@@ -111,18 +121,34 @@ func (lb *LoadBalancer) SnapshotRunner() {
         log.Errorf("Error %v", err)
         panic(err)
     }
-    snapshot := cache.NewSnapshot(fmt.Sprint(lb.ConfigVersion), nil, clusters, nil, []cache.Resource{listener})
-    err = lb.Config.SetSnapshot(lb.nodeID, snapshot)
+    snapshot := cache.NewSnapshot(fmt.Sprint(lb.cacheVersion), nil, clusters, nil, []cache.Resource{listener})
+    err = lb.cache.SetSnapshot(lb.nodeID, snapshot)
     if err != nil {
         log.Errorf("snapshot error: %s", err.Error())
     }
 }
 
 func NewLB(nodeID string, envoyConfig config.EnvoyConfig, snapshotCache cache.SnapshotCache, autoRefreshConn bool) *LoadBalancer {
-    return &LoadBalancer{events: make(chan LBEvent, 10), upstreams: make(map[string]kube.Service), nodeID: nodeID, Config: snapshotCache, EnvoyConfig: envoyConfig, AutoRefreshConn: autoRefreshConn}
+    return &LoadBalancer{events: make(chan LBEvent, 10), upstreams: make(map[string]kube.Service), nodeID: nodeID, cache: snapshotCache, envoyConfig: envoyConfig, autoRefreshConn: autoRefreshConn}
 }
 
 func (lb *LoadBalancer) incrementVersion() {
-    atomic.AddInt32(&lb.ConfigVersion, 1)
-    log.Infof("Incrementing snapshot version to %v\n", lb.ConfigVersion)
+    atomic.AddInt32(&lb.cacheVersion, 1)
+    log.Infof("Incrementing snapshot version to %v\n", lb.cacheVersion)
 }
+
+func (lb *LoadBalancer) getService(svc *corev1.Service) kube.Service {
+    return kube.Service{Address: svc.Name, Port: uint32(svc.Spec.Ports[0].TargetPort.IntVal), Type: kube.ServiceType(svc), Path: kube.ServicePath(svc), Domain: kube.ServiceDomain(svc)}
+}
+
+func (lb *LoadBalancer) GetCacheVersion() int32 {
+    return lb.cacheVersion
+}
+
+func (lb *LoadBalancer) GetCache() cache.SnapshotCache {
+    return lb.cache
+}
+
+func (lb *LoadBalancer) GetUpstreams() map[string]kube.Service {
+    return lb.upstreams
+}
69 changes: 53 additions & 16 deletions envoy/loadbalancer_test.go
@@ -2,58 +2,60 @@ package envoy_test
 
 import (
     "encoding/json"
+    corev1 "k8s.io/api/core/v1"
     "testing"
 
     "github.com/envoyproxy/go-control-plane/pkg/cache"
     "github.com/gojekfarm/envoy-lb-operator/config"
 
     "github.com/gojekfarm/envoy-lb-operator/envoy"
-    kube "github.com/gojekfarm/envoy-lb-operator/kube"
+    "github.com/gojekfarm/envoy-lb-operator/kube"
     "github.com/stretchr/testify/assert"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 func TestSnapshotVersion(t *testing.T) {
     lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), false)
-    assert.Equal(t, int32(0), lb.ConfigVersion)
+    assert.Equal(t, int32(0), lb.GetCacheVersion())
 }
 
 func TestSnapshotVersionIncrementsOnHandleEvents(t *testing.T) {
     lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), false)
-    assert.Equal(t, int32(0), lb.ConfigVersion)
+    assert.Equal(t, int32(0), lb.GetCacheVersion())
     lb.Trigger(envoy.LBEvent{
         Svc:       kube.Service{Address: "foo", Port: uint32(8000), Type: kube.GRPC, Path: "/foo", Domain: "*"},
         EventType: envoy.ADDED,
     })
     lb.Close()
     lb.HandleEvents()
-    assert.Equal(t, int32(1), lb.ConfigVersion)
+    assert.Equal(t, int32(1), lb.GetCacheVersion())
 }
 
 func TestSnapshotVersionDoesNotIncrementOnSnapshotRunnerIfAutoRefreshIsDisabled(t *testing.T) {
     lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), false)
-    assert.Equal(t, int32(0), lb.ConfigVersion)
+    assert.Equal(t, int32(0), lb.GetCacheVersion())
     lb.SnapshotRunner()
-    assert.Equal(t, int32(0), lb.ConfigVersion)
+    assert.Equal(t, int32(0), lb.GetCacheVersion())
 }
 
 func TestSnapshotVersionIncrementsOnSnapshotRunnerIfAutoRefreshIsEnabled(t *testing.T) {
     lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), true)
-    assert.Equal(t, int32(0), lb.ConfigVersion)
+    assert.Equal(t, int32(0), lb.GetCacheVersion())
     lb.SnapshotRunner()
-    assert.Equal(t, int32(1), lb.ConfigVersion)
+    assert.Equal(t, int32(1), lb.GetCacheVersion())
 }
 
 func TestSnapshotVersionIncrementsOnEndpointTrigger(t *testing.T) {
     lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), false)
-    assert.Equal(t, int32(0), lb.ConfigVersion)
+    assert.Equal(t, int32(0), lb.GetCacheVersion())
     lb.EndpointTrigger()
-    assert.Equal(t, int32(1), lb.ConfigVersion)
+    assert.Equal(t, int32(1), lb.GetCacheVersion())
 }
 
 func TestInitialState(t *testing.T) {
     lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), false)
     lb.SnapshotRunner()
-    sn, _ := lb.Config.GetSnapshot("node1")
+    sn, _ := lb.GetCache().GetSnapshot("node1")
     assert.Equal(t, 1, len(sn.Listeners.Items))
     assert.Equal(t, 0, len(sn.Clusters.Items))
 }
@@ -68,7 +70,7 @@ func TestAddedUpstream(t *testing.T) {
     lb.Close()
     lb.HandleEvents()
     lb.SnapshotRunner()
-    sn, _ := lb.Config.GetSnapshot("node1")
+    sn, _ := lb.GetCache().GetSnapshot("node1")
     assert.Equal(t, 1, len(sn.Listeners.Items))
     assert.Equal(t, 1, len(sn.Clusters.Items))
 }
@@ -89,7 +91,7 @@ func TestAddUpdatedUpstream(t *testing.T) {
     lb.Close()
     lb.HandleEvents()
     lb.SnapshotRunner()
-    sn, _ := lb.Config.GetSnapshot("node1")
+    sn, _ := lb.GetCache().GetSnapshot("node1")
     assert.Equal(t, 1, len(sn.Listeners.Items))
     assert.Equal(t, 1, len(sn.Clusters.Items))
     cfg, _ := json.Marshal(sn.Clusters.Items["foo_cluster"])
@@ -110,7 +112,7 @@ func TestDeletedUpstream(t *testing.T) {
     lb.Close()
     lb.HandleEvents()
     lb.SnapshotRunner()
-    sn, _ := lb.Config.GetSnapshot("node1")
+    sn, _ := lb.GetCache().GetSnapshot("node1")
     assert.Equal(t, 1, len(sn.Listeners.Items))
     assert.Equal(t, 0, len(sn.Clusters.Items))
 }
@@ -129,7 +131,7 @@ func TestSingleVhostDifferentPaths(t *testing.T) {
     lb.Close()
     lb.HandleEvents()
     lb.SnapshotRunner()
-    sn, _ := lb.Config.GetSnapshot("node1")
+    sn, _ := lb.GetCache().GetSnapshot("node1")
     assert.Equal(t, 1, len(sn.Listeners.Items))
     //No Easy way to assert
     //cfg, _ := json.Marshal(sn.Listeners.Items["assert.Equal(t, "", string(cfg))
@@ -149,8 +151,43 @@ func TestMultipleVhostsDifferentPaths(t *testing.T) {
     lb.Close()
     lb.HandleEvents()
     lb.SnapshotRunner()
-    sn, _ := lb.Config.GetSnapshot("node1")
+    sn, _ := lb.GetCache().GetSnapshot("node1")
     assert.Equal(t, 1, len(sn.Listeners.Items))
     //No Easy way to assert
     //cfg, _ := json.Marshal(sn.Listeners.Items["assert.Equal(t, "", string(cfg))
 }
+
+func TestInitializeMultipleUpstreamsOnStart(t *testing.T) {
+    lb := envoy.NewLB("node1", config.EnvoyConfig{}, cache.NewSnapshotCache(true, envoy.Hasher{}, envoy.Logger{}), false)
+    svc1 := corev1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "foo",
+        },
+        Spec: corev1.ServiceSpec{
+            Ports: []corev1.ServicePort{{
+                Port: int32(1234),
+            }},
+        },
+    }
+
+    svc2 := corev1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "bar",
+        },
+        Spec: corev1.ServiceSpec{
+            Ports: []corev1.ServicePort{{
+                Port: int32(1234),
+            }},
+        },
+    }
+    svcList := &corev1.ServiceList{Items: []corev1.Service{svc1, svc2}}
+    expectedSvc1 := kube.Service(kube.Service{Address: "foo", Port: 0x0, Type: 0, Path: "/", Domain: "*"})
+    expectedSvc2 := kube.Service(kube.Service{Address: "bar", Port: 0x0, Type: 0, Path: "/", Domain: "*"})
+
+    lb.InitializeUpstream(svcList)
+
+    assert.Equal(t, int32(1), lb.GetCacheVersion())
+    assert.Equal(t, 2, len(lb.GetUpstreams()))
+    assert.Equal(t, expectedSvc1, lb.GetUpstreams()["foo"])
+    assert.Equal(t, expectedSvc2, lb.GetUpstreams()["bar"])
+}
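One detail in the expectations above that is easy to misread: Port comes out as 0x0 because getService reads Spec.Ports[0].TargetPort, and these fixtures only set Port, leaving TargetPort at its zero value (an intstr.IntOrString whose IntVal defaults to 0). A standalone check of that behaviour, hypothetical and not part of the commit:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

func main() {
    svc := corev1.Service{Spec: corev1.ServiceSpec{
        Ports: []corev1.ServicePort{{Port: 1234}}, // TargetPort never set
    }}
    // The zero-valued TargetPort is why the test expects Port: 0x0.
    fmt.Println(uint32(svc.Spec.Ports[0].TargetPort.IntVal)) // prints 0
}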
12 changes: 3 additions & 9 deletions server/kubehandler.go
@@ -19,9 +19,9 @@ func filterEvents(endpointLabel string) func(*metav1.ListOptions) {
     }
 }
 
-func StartSvcKubeHandler(client *kubernetes.Clientset, triggerfunc func(eventType envoy.LBEventType, svc *v1.Service), endpointLabel, namespace string) context.CancelFunc {
+func StartSvcKubeHandler(client *kubernetes.Clientset, triggerfunc func(eventType envoy.LBEventType, svc *v1.Service), upstreamLabel, namespace string) context.CancelFunc {
     ctx, cancel := context.WithCancel(context.Background())
-    kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, time.Second*1, kubeinformers.WithNamespace(namespace), kubeinformers.WithTweakListOptions(filterEvents(endpointLabel)))
+    kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, time.Second*1, kubeinformers.WithNamespace(namespace), kubeinformers.WithTweakListOptions(filterEvents(upstreamLabel)))
     informer := kubeInformerFactory.Core().V1().Services().Informer()
     discoveryHandler := &handler.SvcDiscovery{
         CoreClient: client.CoreV1(),
@@ -33,14 +33,8 @@ func StartSvcKubeHandler(client *kubernetes.Clientset, triggerfunc func(eventTyp
     }
     loop := kubehandler.NewEventLoop("service_queue")
     loop.Register(discoveryHandler)
+    go kubeInformerFactory.Start(ctx.Done())
     go loop.Run(20, ctx.Done())
 
-    // Initialise for the beginning
-    serviceList, _ := client.CoreV1().Services(namespace).List(metav1.ListOptions{LabelSelector: endpointLabel})
-    for _, svc := range serviceList.Items {
-        triggerfunc(envoy.ADDED, &svc)
-    }
-    go kubeInformerFactory.Start(ctx.Done())
-
     return cancel
 }
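Net effect of this change: StartSvcKubeHandler now only wires up the watch, since the one-shot initial listing has moved to cmd/server.go via InitializeUpstream, and the informer factory is started before the worker loop rather than after it. A stripped-down sketch of the same label-filtered informer wiring, assuming the client-go/kubeinformers API of this era; the function name, namespace, and label value are hypothetical:

package sketch

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
)

func startServiceInformer(client *kubernetes.Clientset, stopCh <-chan struct{}) {
    // Restrict the informer's List/Watch to the upstream label,
    // mirroring filterEvents above (label value is hypothetical).
    tweak := func(opts *metav1.ListOptions) {
        opts.LabelSelector = "envoy-lb-operator/upstream=true"
    }
    factory := kubeinformers.NewSharedInformerFactoryWithOptions(
        client,
        time.Second*1,
        kubeinformers.WithNamespace("default"),
        kubeinformers.WithTweakListOptions(tweak),
    )
    // Event handlers would be registered on this informer before Start;
    // the operator routes its events through a kubehandler work queue.
    _ = factory.Core().V1().Services().Informer()
    go factory.Start(stopCh)
}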
