Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry Job #4896

Merged
merged 29 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
838c94a
Add base telemetry job
shaun-nx Jan 5, 2024
ca9b9f7
Ensure only leader pod reports data
shaun-nx Jan 9, 2024
5e3d349
Allow deployments to opt-out of telemetry collection
shaun-nx Jan 10, 2024
a9224db
Add log line for when telemetry is collected
shaun-nx Jan 10, 2024
54b896b
gofumpt files
shaun-nx Jan 10, 2024
8011a08
Revert deployment yaml and fake manager
shaun-nx Jan 10, 2024
079a906
Fix nginx version assignment
shaun-nx Jan 10, 2024
3a69f51
Merge branch 'main' into feat/telemetry
shaun-nx Jan 10, 2024
82a91a6
Resolve lint issues
shaun-nx Jan 10, 2024
ff7515e
Placeholder for telemetry collector
jjngx Jan 22, 2024
9448ae9
Simplify telemetry reporting flags
jjngx Jan 23, 2024
dc634ac
Limit reporting period to min 1m
jjngx Jan 23, 2024
3fa1756
Set min reporting period to 1h
jjngx Jan 24, 2024
c78e3fd
Merge branch 'main' into feat/telemetry
jjngx Jan 24, 2024
922bbe6
Use temp exporter for sending data
jjngx Jan 24, 2024
da52eed
Merge branch 'main' into feat/telemetry
jjngx Jan 24, 2024
2b8e4ad
Merge branch 'main' into feat/telemetry
jjngx Jan 25, 2024
301ed1e
Merge branch 'main' into feat/telemetry
jjngx Jan 26, 2024
cf31f97
Return fake nginx version
jjngx Jan 29, 2024
e249b56
Revert nginx version check changes
shaun-nx Jan 31, 2024
22ca1e8
Merge branch 'main' into feat/telemetry
shaun-nx Jan 31, 2024
37bf9fe
Merge branch 'main' into feat/telemetry
jjngx Feb 6, 2024
e248aa9
Set min reporting time to 1m
jjngx Feb 6, 2024
ec8678d
Merge branch 'main' into feat/telemetry
jjngx Feb 6, 2024
a56bc49
Merge branch 'main' into feat/telemetry
jjngx Feb 6, 2024
2cae174
Set default reporting period and add unit test for new telemetry coll…
shaun-nx Feb 6, 2024
d401d49
Add telemetry reporting flag to helm values
shaun-nx Feb 6, 2024
1d04236
Fix telemetry unit test
shaun-nx Feb 6, 2024
cebd70c
Merge branch 'main' into feat/telemetry
shaun-nx Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cmd/nginx-ingress/flags.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package main

import (
"errors"
"flag"
"fmt"
"net"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/golang/glog"
api_v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -201,6 +203,10 @@ var (

enableDynamicSSLReload = flag.Bool(dynamicSSLReloadParam, true, "Enable reloading of SSL Certificates without restarting the NGINX process.")

enableTelemetryReporting = flag.Bool("enable-telemetry-reporting", true, "Enable gathering and reporting of product related telemetry.")
oseoin marked this conversation as resolved.
Show resolved Hide resolved
// telemetryReportingPeriod exists only until NIC v3.5 is released. It is used only for demo and testing.
telemetryReportingPeriod = flag.String("telemetry-reporting-period", "24h", "Period at which product telemetry is reported.")

startupCheckFn func() error
)

Expand Down Expand Up @@ -384,6 +390,12 @@ func validationChecks() {
glog.Fatalf("Invalid value for app-protect-log-level: %v", *appProtectLogLevel)
}
}

if telemetryReportingPeriod != nil {
if err := validateReportingPeriod(*telemetryReportingPeriod); err != nil {
glog.Fatalf("Invalid value for telemetry-reporting-period: %v", err)
}
}
}

// validateNamespaceNames validates the namespaces are in the correct format
Expand Down Expand Up @@ -489,3 +501,17 @@ func validateLocation(location string) error {
}
return nil
}

// validateReportingPeriod checks if the reporting period parameter can be parsed.
//
// This function will be deprecated in NIC v3.5. It is used only for demo and testing purpose.
func validateReportingPeriod(period string) error {
duration, err := time.ParseDuration(period)
if err != nil {
return err
}
if duration.Minutes() < 1 {
return errors.New("invalid reporting period, expected minimum 1m")
}
return nil
}
24 changes: 24 additions & 0 deletions cmd/nginx-ingress/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,27 @@ func TestValidateNamespaces(t *testing.T) {
}
}
}

func TestValidateReportingPeriodWithInvalidInput(t *testing.T) {
t.Parallel()

periods := []string{"", "-1", "1x", "abc", "-", "30s", "10ms", "0h"}
for _, p := range periods {
err := validateReportingPeriod(p)
if err == nil {
t.Errorf("want error on invalid period %s, got nil", p)
}
}
}

func TestValidateReportingPeriodWithValidInput(t *testing.T) {
t.Parallel()

periods := []string{"1m", "1h", "24h"}
for _, p := range periods {
err := validateReportingPeriod(p)
if err != nil {
t.Error(err)
}
}
}
6 changes: 5 additions & 1 deletion cmd/nginx-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
)

// Injected during build
var version string
var (
version string
)

const (
nginxVersionLabel = "app.nginx.org/version"
Expand Down Expand Up @@ -199,6 +201,8 @@ func main() {
ExternalDNSEnabled: *enableExternalDNS,
IsIPV6Disabled: *disableIPV6,
WatchNamespaceLabel: *watchNamespaceLabel,
TelemetryReportPeriod: *telemetryReportingPeriod,
EnableTelemetryReporting: *enableTelemetryReporting,
}

lbc := k8s.NewLoadBalancerController(lbcInput)
Expand Down
30 changes: 30 additions & 0 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/nginxinc/kubernetes-ingress/internal/telemetry"

"github.com/nginxinc/kubernetes-ingress/pkg/apis/dos/v1beta1"
"golang.org/x/exp/maps"

Expand Down Expand Up @@ -161,6 +163,8 @@ type LoadBalancerController struct {
enableBatchReload bool
isIPV6Disabled bool
namespaceWatcherController cache.Controller
telemetryCollector *telemetry.Collector
telemetryChan chan struct{}
}

var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
Expand Down Expand Up @@ -206,6 +210,8 @@ type NewLoadBalancerControllerInput struct {
ExternalDNSEnabled bool
IsIPV6Disabled bool
WatchNamespaceLabel string
TelemetryReportPeriod string
EnableTelemetryReporting bool
}

// NewLoadBalancerController creates a controller
Expand Down Expand Up @@ -271,6 +277,18 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc
lbc.externalDNSController = ed_controller.NewController(ed_controller.BuildOpts(context.TODO(), lbc.namespaceList, lbc.recorder, lbc.confClient, input.ResyncPeriod, isDynamicNs))
}

// NIC Telemetry Reporting
if input.EnableTelemetryReporting {
lbc.telemetryChan = make(chan struct{})
collector, err := telemetry.NewCollector(
telemetry.WithTimePeriod(input.TelemetryReportPeriod),
)
if err != nil {
glog.Fatalf("failed to initialize telemetry collector: %v", err)
}
lbc.telemetryCollector = collector
}

glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass)

lbc.namespacedInformers = make(map[string]*namespacedInformer)
Expand Down Expand Up @@ -683,10 +701,22 @@ func (lbc *LoadBalancerController) Run() {
if lbc.externalDNSController != nil {
go lbc.externalDNSController.Run(lbc.ctx.Done())
}

if lbc.leaderElector != nil {
go lbc.leaderElector.Run(lbc.ctx)
}

if lbc.telemetryCollector != nil {
go func(ctx context.Context) {
select {
case <-lbc.telemetryChan:
lbc.telemetryCollector.Start(lbc.ctx)
case <-ctx.Done():
return
}
}(lbc.ctx)
}

for _, nif := range lbc.namespacedInformers {
nif.start()
}
Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func createLeaderHandler(lbc *LoadBalancerController) leaderelection.LeaderCallb
return leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
glog.V(3).Info("started leading")
// Closing this channel allows the leader to start the telemetry reporting process
if lbc.telemetryChan != nil {
close(lbc.telemetryChan)
}
if lbc.reportIngressStatus {
ingresses := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true})

Expand Down
2 changes: 1 addition & 1 deletion internal/nginx/fake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (fm *FakeManager) CreateDHParam(_ string) (string, error) {
// Version provides a fake implementation of Version.
func (*FakeManager) Version() Version {
glog.V(3).Info("Printing nginx version")
return Version{}
return NewVersion("nginx version: nginx/1.25.3 (nginx-plus-r31)")
jjngx marked this conversation as resolved.
Show resolved Hide resolved
}

// Start provides a fake implementation of Start.
Expand Down
135 changes: 135 additions & 0 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Package telemetry provides functionality for collecting and exporting NIC telemetry data.
package telemetry

import (
"context"
"fmt"
"io"
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
)

// DiscardExporter is a temporary exporter
// for discarding collected telemetry data.
var DiscardExporter = Exporter{Endpoint: io.Discard}

// Exporter represents a temporary telemetry data exporter.
type Exporter struct {
Endpoint io.Writer
}

// Export takes context and trace data and writes to the endpoint.
func (e *Exporter) Export(_ context.Context, td TraceData) error {
// Note: exporting functionality will be implemented in a separate module.
fmt.Fprintf(e.Endpoint, "%+v", td)
return nil
}

// TraceData holds collected NIC telemetry data.
type TraceData struct {
// Numer of VirtualServers
VSCount int
// Number of TransportServers
TSCount int

// TODO
// Add more fields for NIC data points
}

// Option is a functional option used for configuring TraceReporter.
type Option func(*Collector) error

// WithTimePeriod configures reporting time on TraceReporter.
func WithTimePeriod(period string) Option {
return func(c *Collector) error {
d, err := time.ParseDuration(period)
if err != nil {
return err
}
c.Period = d
return nil
}
}

// WithExporter configures telemetry collector to use given exporter.
//
// This may change in the future when we use exporter implemented
// in the external module.
func WithExporter(e Exporter) Option {
return func(c *Collector) error {
c.Exporter = e
return nil
}
}

// Collector is NIC telemetry data collector.
type Collector struct {
Period time.Duration

// Exporter is a temp exporter for exporting telemetry data.
// The concrete implementation will be implemented in a separate module.
Exporter Exporter
}

// NewCollector takes 0 or more options and creates a new TraceReporter.
// If no options are provided, NewReporter returns TraceReporter
// configured to gather data every 24h.
func NewCollector(opts ...Option) (*Collector, error) {
c := Collector{
Period: 24 * time.Hour,
Exporter: DiscardExporter, // Use DiscardExporter until the real exporter is available.
}
for _, o := range opts {
if err := o(&c); err != nil {
return nil, err
}
}
return &c, nil
}

// BuildReport takes context and builds report from gathered telemetry data.
func (c *Collector) BuildReport(context.Context) (TraceData, error) {
dt := TraceData{}

// TODO: Implement handling and logging errors for each collected data point

return dt, nil
}

// Collect collects and exports telemetry data.
// It exports data using provided exporter.
func (c *Collector) Collect(ctx context.Context) {
glog.V(3).Info("Collecting telemetry data")
traceData, err := c.BuildReport(ctx)
if err != nil {
glog.Errorf("Error collecting telemetry data: %v", err)
}
err = c.Exporter.Export(ctx, traceData)
if err != nil {
glog.Errorf("Error exporting telemetry data: %v", err)
}
glog.V(3).Info("Exported telemetry data")
}

// Start starts running NIC Telemetry Collector.
func (c *Collector) Start(ctx context.Context) {
wait.JitterUntilWithContext(ctx, c.Collect, c.Period, 0.1, true)
}

// GetVSCount returns number of VirtualServers in watched namespaces.
//
// Note: this is a placeholder function.
func (c *Collector) GetVSCount() int {
// Placeholder function
return 0
}

// GetTSCount returns number of TransportServers in watched namespaces.
//
// Note: this is a placeholder function.
func (c *Collector) GetTSCount() int {
// Placeholder function
return 0
}
61 changes: 61 additions & 0 deletions internal/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package telemetry_test

import (
"bytes"
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/nginxinc/kubernetes-ingress/internal/telemetry"
)

func TestCreateNewDefaultCollector(t *testing.T) {
t.Parallel()

c, err := telemetry.NewCollector()
if err != nil {
t.Fatal(err)
}

want := 24.0
got := c.Period.Hours()

if !cmp.Equal(want, got) {
t.Error(cmp.Diff(want, got))
}
}

func TestCreateNewCollectorWithCustomReportingPeriod(t *testing.T) {
t.Parallel()

c, err := telemetry.NewCollector(telemetry.WithTimePeriod("4h"))
if err != nil {
t.Fatal(err)
}

want := 4.0
got := c.Period.Hours()

if !cmp.Equal(want, got) {
t.Error(cmp.Diff(want, got))
}
}

func TestCreateNewCollectorWithCustomExporter(t *testing.T) {
t.Parallel()

buf := &bytes.Buffer{}
exp := telemetry.Exporter{Endpoint: buf}

c, err := telemetry.NewCollector(telemetry.WithExporter(exp))
if err != nil {
t.Fatal(err)
}
c.Collect(context.Background())

want := "{VSCount:0 TSCount:0}"
got := buf.String()
if !cmp.Equal(want, got) {
t.Error(cmp.Diff(want, got))
}
}
Loading