Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add events for major lifecycle events #1137

Merged
merged 1 commit into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/backends/ig_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/legacy-cloud-providers/gce"
)
Expand All @@ -47,7 +48,7 @@ func newTestIGLinker(fakeGCE *gce.Cloud, fakeInstancePool instances.NodePool) *i

func TestLink(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer)
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{})
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
linker := newTestIGLinker(fakeGCE, fakeNodePool)

Expand Down Expand Up @@ -77,7 +78,7 @@ func TestLink(t *testing.T) {

func TestLinkWithCreationModeError(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer)
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{})
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
linker := newTestIGLinker(fakeGCE, fakeNodePool)

Expand Down
3 changes: 2 additions & 1 deletion pkg/backends/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/legacy-cloud-providers/gce"
)
Expand All @@ -45,7 +46,7 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig {
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer)
fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{})
fakeInstancePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})

// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
Expand Down
30 changes: 16 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/ingress-gce/pkg/common/operator"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecks"
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewLoadBalancerController(
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer, ctx)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down Expand Up @@ -139,8 +140,8 @@ func NewLoadBalancerController(
return
}

klog.V(3).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", common.NamespacedName(addIng))
klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
Expand Down Expand Up @@ -176,11 +177,11 @@ func NewLoadBalancerController(
return
}
if reflect.DeepEqual(old, cur) {
klog.V(3).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
klog.V(2).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
} else {
klog.V(3).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
klog.V(2).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
}

lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(cur)
},
})
Expand Down Expand Up @@ -558,7 +559,8 @@ func (lbc *LoadBalancerController) sync(key string) error {
err := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm)
// Skip emitting an event if ingress does not exist as we cannot retrieve ingress namespace.
if err != nil && ingExists {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", err))
klog.Errorf("Error in GC for %s/%s: %v", ing.Namespace, ing.Name, err)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error: %v", err)
}
// Delete the ingress state for metrics after GC is successful.
if err == nil && ingExists {
Expand All @@ -578,16 +580,16 @@ func (lbc *LoadBalancerController) sync(key string) error {
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPort.ID, lbc.ctx.ClusterNamer)

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
msg := fmt.Errorf("invalid ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.TranslateIngress, "Translation failed: %v", msg)
return msg
}

// Sync GCP resources.
syncState := &syncState{urlMap, ing, nil}
syncErr := lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error syncing to GCP: %v", syncErr.Error())
} else {
// Insert/update the ingress state for metrics after successful sync.
lbc.metrics.SetIngress(key, metrics.NewIngressState(ing, urlMap.AllServicePorts()))
Expand All @@ -598,7 +600,7 @@ func (lbc *LoadBalancerController) sync(key string) error {
// free up enough quota for the next sync to pass.
frontendGCAlgorithm := frontendGCAlgorithm(ingExists, ing)
if gcErr := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm); gcErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", gcErr))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error during garbage collection: %v", gcErr)
return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
}

Expand Down Expand Up @@ -633,7 +635,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
klog.Errorf("PatchIngressStatus(%s/%s) failed: %v", currIng.Namespace, currIng.Name, err)
return err
}
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, events.IPChanged, "IP is now %v", ip)
}
}
annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.backendSyncer)
Expand All @@ -655,7 +657,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if apierrors.IsNotFound(err) {
// TODO: this path should be removed when external certificate managers migrate to a better solution.
const msg = "Could not find TLS certificates. Continuing setup for the load balancer to serve HTTP. Note: this behavior is deprecated and will be removed in a future version of ingress-gce"
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", msg)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, msg)
} else {
klog.Errorf("Could not get certificates for ingress %s/%s: %v", ing.Namespace, ing.Name, err)
return nil, err
Expand All @@ -666,7 +668,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if lbc.ctx.FrontendConfigEnabled {
feConfig, err = frontendconfig.FrontendConfigForIngress(lbc.ctx.FrontendConfigs().List(), ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("%v", err))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error: %v", err)
}
// Object in cache could be changed in-flight. Deepcopy to
// reduce race conditions.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newLoadBalancerController() *LoadBalancerController {
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer)
lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer, &test.FakeRecorderSource{})
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(fakeGCE, namer, events.RecorderProducerMock{}, namer_util.NewFrontendNamerFactory(namer, ""))
lbc.instancePool.Init(&instances.FakeZoneLister{Zones: []string{fakeZone}})

Expand Down
49 changes: 49 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,22 @@ limitations under the License.
package events

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)

const (
AddNodes = "IngressGCE_AddNodes"
RemoveNodes = "IngressGCE_RemoveNodes"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the IngressGCE prefix? Is it to distinguish from other Ingress controllers for the same type of action?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event is not associated with a resource, so there is no other way to tell where this event came from.


SyncIngress = "Sync"
TranslateIngress = "Translate"
IPChanged = "IPChanged"
GarbageCollection = "GarbageCollection"

SyncService = "Sync"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference between this and SyncIngress?

Copy link
Member Author

@bowei bowei Jun 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the value is the same, but this identifies it so we can easily change it if needed

)

type RecorderProducer interface {
Recorder(ns string) record.EventRecorder
}
Expand All @@ -30,3 +43,39 @@ type RecorderProducerMock struct {
func (r RecorderProducerMock) Recorder(ns string) record.EventRecorder {
return &record.FakeRecorder{}
}

// GloablEventf records a Cluster level event not attached to a given object.
func GlobalEventf(r record.EventRecorder, eventtype, reason, messageFmt string, args ...interface{}) {
// Using an empty ObjectReference to indicate no associated
// resource. This apparently works, see the package
// k8s.io/client-go/tools/record.
r.Eventf(&v1.ObjectReference{}, eventtype, reason, messageFmt, args...)
}

// truncatedStringListMax is a variable to make testing easier. This
// value should not be modified.
var truncatedStringListMax = 2000

// TruncateStringList will render the list of items as a string,
// eliding elements with elipsis at the end if there are more than a
// reasonable number of characters in the resulting string. This is
// used to prevent accidentally dumping enormous strings into the
// Event description.
func TruncatedStringList(items []string) string {
var (
ret = "["
first = true
)
for _, s := range items {
if len(ret)+len(s)+1 > truncatedStringListMax {
ret += ", ..."
break
}
if !first {
ret += ", "
}
first = false
ret += s
}
return ret + "]"
}
34 changes: 34 additions & 0 deletions pkg/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package events

import (
"fmt"
"testing"
)

func TestTruncatedStringList(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

address TODO

var saved int
truncatedStringListMax, saved = 30, truncatedStringListMax
defer func() { truncatedStringListMax = saved }()

for _, tc := range []struct {
desc string
count int
want string
}{
{"zero", 0, "[]"},
{"one", 1, "[elt-0]"},
{"not truncated", 4, "[elt-0, elt-1, elt-2, elt-3]"},
{"truncated", 20, "[elt-0, elt-1, elt-2, elt-3, ...]"},
} {
t.Run(tc.desc, func(t *testing.T) {
var l []string
for i := 0; i < tc.count; i++ {
l = append(l, fmt.Sprintf("elt-%d", i))
}
got := TruncatedStringList(l)
if got != tc.want {
t.Errorf("TruncatedString(%v) = %q; want %q", l, got, tc.want)
}
})
}
}
35 changes: 26 additions & 9 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"net/http"
"time"

"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

"google.golang.org/api/compute/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -39,16 +42,22 @@ const (
type Instances struct {
cloud InstanceGroups
ZoneLister
namer namer.BackendNamer
namer namer.BackendNamer
recorder record.EventRecorder
}

type recorderSource interface {
Recorder(ns string) record.EventRecorder
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer) NodePool {
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource) NodePool {
return &Instances{
cloud: cloud,
namer: namer,
cloud: cloud,
namer: namer,
recorder: recorders.Recorder(""), // No namespace
}
}

Expand Down Expand Up @@ -250,7 +259,8 @@ func (i *Instances) splitNodesByZone(names []string) map[string][]string {

// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -260,12 +270,16 @@ func (i *Instances) Add(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("AddInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -275,7 +289,10 @@ func (i *Instances) Remove(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("RemoveInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Sync nodes with the instances in the instance group.
Expand Down
3 changes: 2 additions & 1 deletion pkg/instances/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/namer"
)

Expand All @@ -28,7 +29,7 @@ const defaultZone = "default-zone"
var defaultNamer = namer.NewNamer("uid1", "fw1")

func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
pool := NewNodePool(f, defaultNamer)
pool := NewNodePool(f, defaultNamer, &test.FakeRecorderSource{})
pool.Init(&FakeZoneLister{[]string{zone}})
return pool
}
Expand Down
Loading