Skip to content

Commit

Permalink
Basic Broker ingress metrics (#937)
Browse files Browse the repository at this point in the history
* Add a prometheus server to broker ingress

* correct method comment

* Add a metrics port to the broker's ingress Service

Ultimately access to this port might need different permissions than the
default (ingress) port.

* Always shut down runnableServer

If ShutdownTimeout is not positive, call Shutdown anyway without a
timeout.

* Add port values to the ingress container spec

These are informational, and more useful now that two ports are
exposed.

* Instrument message count and dispatch time

Also register the exporter so it can start serving metrics.

* Simplify runnableServer shutdown

No need to select since we're just waiting for the channel to be closed.

* Use a WaitGroup to stop runnableServers

Manager doesn't actually wait for Runnables to stop, so we need to add a
WaitGroup to wait for the HTTP Server Shutdown to complete. This is
hopefully temporary until
kubernetes-sigs/controller-runtime#350 is
fixed.

* Hook up internal controller-runtime logger

* Include GCP auth library

Running outside GCP seems to not work without this.

* Translate commented code into a directive

* Tag measurements with the broker name

The BROKER environment variable may contain the broker name. If
non-empty, measurements will be tagged with the given string.

* Make BROKER env var required

There's no use case for running this without an existing broker context,
so just require the BROKER name to be present.

* Add a separate shutdownTimeout var

Currently the same as writeTimeout, but allows for having separate write
and shutdown timeouts later.

* Add a shutdown timer for the waitgroup

wg.Wait() can block indefinitely. Adding a timer here ensures the
process shutdown time is bounded.

* Don't redeclare brokerName

We want to use the package var here.

* Move wg.Add outside the goroutine

This eliminates a case in which wg.Done could be called before wg.Add
depending on how the goroutine is scheduled.

* Use Fatalf instead of Fatal

* Update Gopkg.lock

* Get broker name from env var BROKER

It's now a struct field instead of a package var.

* Use ok instead of success

For consistency with HTTP error codes.

* Test RunnableServer

Tests ensure the server starts, responds to requests, and stops or
shuts down as requested.

* Set brokerName in handler struct

This was accidentally removed in a merge.

* Format and expand comments on shutdown behavior

* Remove unnecessary comments

* Update copyright year

* Add test to verify correct usage of context

Verifies that the context is correctly timing out shutdown.

* Remove logging from RunnableServer

Return the error instead and let the caller decide whether to log it.

The ShutdownContext test now occasionally flakes; still tracking that
down.

* Attempt to document metrics

Lists metrics exposed by Broker ingress and the port on which they're exposed.
As we add metrics to other components, we can list them in this file.

* Improve stability of shutdown test

Request goroutine needs a bit more time to start sometimes.

Removed Logf calls from goroutines since they sometimes happen
after the test completes, causing a panic.

Removed the error check from the http get for the same reason.

* Rename logf to crlog

Documents source of package more clearly.

* Remove obsolete logger field from RunnableServer
  • Loading branch information
grantr authored and knative-prow-robot committed Mar 25, 2019
1 parent eb69716 commit 87d1bf9
Show file tree
Hide file tree
Showing 8 changed files with 734 additions and 4 deletions.
7 changes: 6 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 73 additions & 3 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"context"
"errors"
"flag"
"fmt"
"log"
"net/http"
"net/url"
"os"
"sync"
"time"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
Expand All @@ -32,23 +34,35 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/broker"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/utils"
"github.com/knative/pkg/signals"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
crlog "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)

var (
defaultPort = 8080
metricsPort = 9090

writeTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
shutdownTimeout = 1 * time.Minute

wg sync.WaitGroup
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()
flag.Parse()
crlog.SetLogger(crlog.ZapLogger(false))

logger.Info("Starting...")

Expand All @@ -61,6 +75,8 @@ func main() {
logger.Fatal("Unable to add eventingv1alpha1 scheme", zap.Error(err))
}

brokerName := getRequiredEnv("BROKER")

channelURI := &url.URL{
Scheme: "http",
Host: getRequiredEnv("CHANNEL"),
Expand All @@ -81,6 +97,7 @@ func main() {
ceClient: ceClient,
ceHTTP: ceHTTP,
channelURI: channelURI,
brokerName: brokerName,
}

// Run the event handler with the manager.
Expand All @@ -89,6 +106,30 @@ func main() {
logger.Fatal("Unable to add handler", zap.Error(err))
}

// Metrics
e, err := prometheus.NewExporter(prometheus.Options{Namespace: metricsNamespace})
if err != nil {
logger.Fatal("Unable to create Prometheus exporter", zap.Error(err))
}
view.RegisterExporter(e)
sm := http.NewServeMux()
sm.Handle("/metrics", e)
metricsSrv := &http.Server{
Addr: fmt.Sprintf(":%d", metricsPort),
Handler: e,
ErrorLog: zap.NewStdLog(logger),
WriteTimeout: writeTimeout,
}

err = mgr.Add(&utils.RunnableServer{
Server: metricsSrv,
ShutdownTimeout: shutdownTimeout,
WaitGroup: &wg,
})
if err != nil {
logger.Fatal("Unable to add metrics runnableServer", zap.Error(err))
}

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()
// Start blocks forever.
Expand All @@ -97,7 +138,17 @@ func main() {
}
logger.Info("Exiting...")

// TODO Gracefully shutdown the server. CloudEvents SDK doesn't seem to let us do that today.
// TODO Gracefully shutdown the ingress server. CloudEvents SDK doesn't seem
// to let us do that today.
go func() {
<-time.After(shutdownTimeout)
log.Fatalf("Shutdown took longer than %v", shutdownTimeout)
}()

// Wait for runnables to stop. This blocks indefinitely, but the above
// goroutine will exit the process if it takes longer than shutdownTimeout.
wg.Wait()
logger.Info("Done.")
}

func getRequiredEnv(envKey string) string {
Expand All @@ -113,6 +164,7 @@ type handler struct {
ceClient ceclient.Client
ceHTTP *cehttp.Transport
channelURI *url.URL
brokerName string
}

func (h *handler) Start(stopCh <-chan struct{}) error {
Expand All @@ -138,7 +190,7 @@ func (h *handler) Start(stopCh <-chan struct{}) error {
select {
case err := <-errCh:
return err
case <-time.After(writeTimeout):
case <-time.After(shutdownTimeout):
return errors.New("timeout shutting down ceClient")
}
}
Expand All @@ -156,13 +208,31 @@ func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
return nil
}

ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))
defer func() {
stats.Record(ctx, MeasureMessagesTotal.M(1))
}()

// TODO Filter.

ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))
return h.sendEvent(ctx, tctx, event)
}

func (h *handler) sendEvent(ctx context.Context, tctx cehttp.TransportContext, event cloudevents.Event) error {
sendingCTX := broker.SendingContext(ctx, tctx, h.channelURI)

startTS := time.Now()
defer func() {
dispatchTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)
stats.Record(sendingCTX, MeasureDispatchTime.M(dispatchTimeMS))
}()

_, err := h.ceHTTP.Send(sendingCTX, event)
if err != nil {
sendingCTX, _ = tag.New(sendingCTX, tag.Insert(TagResult, "error"))
} else {
sendingCTX, _ = tag.New(sendingCTX, tag.Insert(TagResult, "ok"))
}
return err
}
90 changes: 90 additions & 0 deletions cmd/broker/ingress/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2019 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

const (
metricsNamespace = "broker_ingress"
)

var (
// MeasureMessagesTotal is a counter which records the number of messages received
// by the ingress. The value of the Result tag indicates whether the message
// was filtered or dispatched.
MeasureMessagesTotal = stats.Int64(
"knative.dev/eventing/broker/ingress/measures/messages_total",
"Total number of messages received",
stats.UnitNone,
)

// MeasureDispatchTime records the time spent dispatching a message, in milliseconds.
MeasureDispatchTime = stats.Int64(
"knative.dev/eventing/broker/ingress/measures/dispatch_time",
"Time spent dispatching a message",
stats.UnitMilliseconds,
)

// Tag keys must conform to the restrictions described in
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII

// TagResult is a tag key referring to the observed result of an operation.
TagResult = mustNewTagKey("result")

// TagBroker is a tag key referring to the Broker name serviced by this
// ingress process.
TagBroker = mustNewTagKey("broker")
)

func init() {
// Create views for exporting measurements. This returns an error if a
// previously registered view has the same name with a different value.
err := view.Register(
&view.View{
Name: "messages_total",
Measure: MeasureMessagesTotal,
Aggregation: view.Count(),
TagKeys: []tag.Key{TagResult, TagBroker},
},
&view.View{
Name: "dispatch_time",
Measure: MeasureDispatchTime,
Aggregation: view.Distribution(10, 100, 1000, 10000),
TagKeys: []tag.Key{TagResult, TagBroker},
},
)
if err != nil {
panic(err)
}
}

// mustNewTagKey creates a Tag or panics. This will only fail if the tag key
// doesn't conform to tag name validations.
// TODO OC library should provide this
func mustNewTagKey(k string) tag.Key {
tagKey, err := tag.NewKey(k)
if err != nil {
panic(err)
}
return tagKey
}
10 changes: 10 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Metrics

This is a list of metrics exported by Knative Eventing components.

## Broker ingress

| Name | Type | Description | Tags
| ---- | ---- | ---- | ---- |
| `messages_total` | count | Number of messages received. | `result`, `broker` |
| `dispatch_time` | histogram | Time to dispatch a message. | `result`, `broker` |
18 changes: 18 additions & 0 deletions pkg/reconciler/v1alpha1/broker/resources/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
Name: "CHANNEL",
Value: args.ChannelAddress,
},
{
Name: "BROKER",
Value: args.Broker.Name,
},
},
Ports: []corev1.ContainerPort{
{
ContainerPort: 8080,
Name: "http",
},
{
ContainerPort: 9090,
Name: "metrics",
},
},
},
},
Expand Down Expand Up @@ -106,6 +120,10 @@ func MakeIngressService(b *eventingv1alpha1.Broker) *corev1.Service {
Port: 80,
TargetPort: intstr.FromInt(8080),
},
{
Name: "metrics",
Port: 9090,
},
},
},
}
Expand Down
Loading

0 comments on commit 87d1bf9

Please sign in to comment.