diff --git a/Gopkg.lock b/Gopkg.lock index 35dd78d29f3..2afc965c714 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -639,11 +639,12 @@ revision = "583c0c0531f06d5278b7d917446061adc344b5cd" [[projects]] - digest = "1:5622c6893f87955f945c430461d41ebf6e26498214559cbaf93d284e2c8e9047" + digest = "1:bce7c290509e40fd1c73d700305c1961004d08c9a1812e47533416a8742893a7" name = "go.opencensus.io" packages = [ ".", "exemplar", + "exporter/prometheus", "internal", "internal/tagencoding", "plugin/ocgrpc", @@ -1285,6 +1286,10 @@ "github.com/nats-io/go-nats-streaming", "github.com/nats-io/nats-streaming-server/server", "github.com/prometheus/client_golang/prometheus/promhttp", + "go.opencensus.io/exporter/prometheus", + "go.opencensus.io/stats", + "go.opencensus.io/stats/view", + "go.opencensus.io/tag", "go.opencensus.io/trace", "go.uber.org/atomic", "go.uber.org/zap", diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index bf851d337f3..68c5d5161d4 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -20,10 +20,12 @@ import ( "context" "errors" "flag" + "fmt" "log" "net/http" "net/url" "os" + "sync" "time" "github.com/cloudevents/sdk-go/pkg/cloudevents" @@ -32,16 +34,27 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/broker" "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/utils" "github.com/knative/pkg/signals" + "go.opencensus.io/exporter/prometheus" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "go.uber.org/zap" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" + crlog "sigs.k8s.io/controller-runtime/pkg/runtime/log" ) var ( defaultPort = 8080 + metricsPort = 9090 - writeTimeout = 1 * time.Minute + writeTimeout = 1 * time.Minute + shutdownTimeout = 1 * time.Minute + + wg sync.WaitGroup ) func main() { @@ -49,6 +62,7 @@ func main() { logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar() defer logger.Sync() flag.Parse() + crlog.SetLogger(crlog.ZapLogger(false)) logger.Info("Starting...") @@ -61,6 +75,8 @@ func main() { logger.Fatal("Unable to add eventingv1alpha1 scheme", zap.Error(err)) } + brokerName := getRequiredEnv("BROKER") + channelURI := &url.URL{ Scheme: "http", Host: getRequiredEnv("CHANNEL"), @@ -81,6 +97,7 @@ func main() { ceClient: ceClient, ceHTTP: ceHTTP, channelURI: channelURI, + brokerName: brokerName, } // Run the event handler with the manager. @@ -89,6 +106,30 @@ func main() { logger.Fatal("Unable to add handler", zap.Error(err)) } + // Metrics + e, err := prometheus.NewExporter(prometheus.Options{Namespace: metricsNamespace}) + if err != nil { + logger.Fatal("Unable to create Prometheus exporter", zap.Error(err)) + } + view.RegisterExporter(e) + sm := http.NewServeMux() + sm.Handle("/metrics", e) + metricsSrv := &http.Server{ + Addr: fmt.Sprintf(":%d", metricsPort), + Handler: e, + ErrorLog: zap.NewStdLog(logger), + WriteTimeout: writeTimeout, + } + + err = mgr.Add(&utils.RunnableServer{ + Server: metricsSrv, + ShutdownTimeout: shutdownTimeout, + WaitGroup: &wg, + }) + if err != nil { + logger.Fatal("Unable to add metrics runnableServer", zap.Error(err)) + } + // Set up signals so we handle the first shutdown signal gracefully. stopCh := signals.SetupSignalHandler() // Start blocks forever. @@ -97,7 +138,17 @@ func main() { } logger.Info("Exiting...") - // TODO Gracefully shutdown the server. CloudEvents SDK doesn't seem to let us do that today. + // TODO Gracefully shutdown the ingress server. CloudEvents SDK doesn't seem + // to let us do that today. + go func() { + <-time.After(shutdownTimeout) + log.Fatalf("Shutdown took longer than %v", shutdownTimeout) + }() + + // Wait for runnables to stop. This blocks indefinitely, but the above + // goroutine will exit the process if it takes longer than shutdownTimeout. + wg.Wait() + logger.Info("Done.") } func getRequiredEnv(envKey string) string { @@ -113,6 +164,7 @@ type handler struct { ceClient ceclient.Client ceHTTP *cehttp.Transport channelURI *url.URL + brokerName string } func (h *handler) Start(stopCh <-chan struct{}) error { @@ -138,7 +190,7 @@ func (h *handler) Start(stopCh <-chan struct{}) error { select { case err := <-errCh: return err - case <-time.After(writeTimeout): + case <-time.After(shutdownTimeout): return errors.New("timeout shutting down ceClient") } } @@ -156,13 +208,31 @@ func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * return nil } + ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName)) + defer func() { + stats.Record(ctx, MeasureMessagesTotal.M(1)) + }() + // TODO Filter. + ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched")) return h.sendEvent(ctx, tctx, event) } func (h *handler) sendEvent(ctx context.Context, tctx cehttp.TransportContext, event cloudevents.Event) error { sendingCTX := broker.SendingContext(ctx, tctx, h.channelURI) + + startTS := time.Now() + defer func() { + dispatchTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond) + stats.Record(sendingCTX, MeasureDispatchTime.M(dispatchTimeMS)) + }() + _, err := h.ceHTTP.Send(sendingCTX, event) + if err != nil { + sendingCTX, _ = tag.New(sendingCTX, tag.Insert(TagResult, "error")) + } else { + sendingCTX, _ = tag.New(sendingCTX, tag.Insert(TagResult, "ok")) + } return err } diff --git a/cmd/broker/ingress/metrics.go b/cmd/broker/ingress/metrics.go new file mode 100644 index 00000000000..09bc41c1e72 --- /dev/null +++ b/cmd/broker/ingress/metrics.go @@ -0,0 +1,90 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +const ( + metricsNamespace = "broker_ingress" +) + +var ( + // MeasureMessagesTotal is a counter which records the number of messages received + // by the ingress. The value of the Result tag indicates whether the message + // was filtered or dispatched. + MeasureMessagesTotal = stats.Int64( + "knative.dev/eventing/broker/ingress/measures/messages_total", + "Total number of messages received", + stats.UnitNone, + ) + + // MeasureDispatchTime records the time spent dispatching a message, in milliseconds. + MeasureDispatchTime = stats.Int64( + "knative.dev/eventing/broker/ingress/measures/dispatch_time", + "Time spent dispatching a message", + stats.UnitMilliseconds, + ) + + // Tag keys must conform to the restrictions described in + // go.opencensus.io/tag/validate.go. Currently those restrictions are: + // - length between 1 and 255 inclusive + // - characters are printable US-ASCII + + // TagResult is a tag key referring to the observed result of an operation. + TagResult = mustNewTagKey("result") + + // TagBroker is a tag key referring to the Broker name serviced by this + // ingress process. + TagBroker = mustNewTagKey("broker") +) + +func init() { + // Create views for exporting measurements. This returns an error if a + // previously registered view has the same name with a different value. + err := view.Register( + &view.View{ + Name: "messages_total", + Measure: MeasureMessagesTotal, + Aggregation: view.Count(), + TagKeys: []tag.Key{TagResult, TagBroker}, + }, + &view.View{ + Name: "dispatch_time", + Measure: MeasureDispatchTime, + Aggregation: view.Distribution(10, 100, 1000, 10000), + TagKeys: []tag.Key{TagResult, TagBroker}, + }, + ) + if err != nil { + panic(err) + } +} + +// mustNewTagKey creates a Tag or panics. This will only fail if the tag key +// doesn't conform to tag name validations. +// TODO OC library should provide this +func mustNewTagKey(k string) tag.Key { + tagKey, err := tag.NewKey(k) + if err != nil { + panic(err) + } + return tagKey +} diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 00000000000..0820fd02a8c --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,10 @@ +# Metrics + +This is a list of metrics exported by Knative Eventing components. + +## Broker ingress + +| Name | Type | Description | Tags +| ---- | ---- | ---- | ---- | +| `messages_total` | count | Number of messages received. | `result`, `broker` | +| `dispatch_time` | histogram | Time to dispatch a message. | `result`, `broker` | diff --git a/pkg/reconciler/v1alpha1/broker/resources/ingress.go b/pkg/reconciler/v1alpha1/broker/resources/ingress.go index 1ebf8957cec..6a41b52d931 100644 --- a/pkg/reconciler/v1alpha1/broker/resources/ingress.go +++ b/pkg/reconciler/v1alpha1/broker/resources/ingress.go @@ -75,6 +75,20 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment { Name: "CHANNEL", Value: args.ChannelAddress, }, + { + Name: "BROKER", + Value: args.Broker.Name, + }, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + Name: "http", + }, + { + ContainerPort: 9090, + Name: "metrics", + }, }, }, }, @@ -106,6 +120,10 @@ func MakeIngressService(b *eventingv1alpha1.Broker) *corev1.Service { Port: 80, TargetPort: intstr.FromInt(8080), }, + { + Name: "metrics", + Port: 9090, + }, }, }, } diff --git a/pkg/utils/runnable_server.go b/pkg/utils/runnable_server.go new file mode 100644 index 00000000000..9cc499d6e19 --- /dev/null +++ b/pkg/utils/runnable_server.go @@ -0,0 +1,81 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "net/http" + "sync" + "time" +) + +// RunnableServer is a small wrapper around http.Server so that it matches the +// manager.Runnable interface. +type RunnableServer struct { + // Server is the http.Server to wrap. + *http.Server + + // ServeFunc is the function used to start the http.Server. If nil, + // ListenAndServe() will be used. + ServeFunc func() error + + // ShutdownTimeout is the duration to wait for the http.Server to gracefully + // shut down when the stop channel is closed. If this is zero or negative, + // the http.Server will be immediately closed instead. + ShutdownTimeout time.Duration + + // WaitGroup is a temporary workaround for Manager returning immediately + // without waiting for Runnables to stop. See + // https://github.com/kubernetes-sigs/controller-runtime/issues/350. + WaitGroup *sync.WaitGroup +} + +// Start the server. The server will be shut down when StopCh is closed. +func (r *RunnableServer) Start(stopCh <-chan struct{}) error { + + errCh := make(chan error) + + if r.WaitGroup != nil { + r.WaitGroup.Add(1) + defer r.WaitGroup.Done() + } + + if r.ServeFunc == nil { + r.ServeFunc = r.Server.ListenAndServe + } + + go func() { + err := r.ServeFunc() + if err != http.ErrServerClosed { + errCh <- err + } + }() + + var err error + select { + case <-stopCh: + if r.ShutdownTimeout > 0 { + ctx, cancel := context.WithTimeout(context.Background(), r.ShutdownTimeout) + defer cancel() + err = r.Server.Shutdown(ctx) + } else { + err = r.Server.Close() + } + case err = <-errCh: + } + return err +} diff --git a/pkg/utils/runnable_server_test.go b/pkg/utils/runnable_server_test.go new file mode 100644 index 00000000000..95ff5828486 --- /dev/null +++ b/pkg/utils/runnable_server_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "net" + "net/http" + "testing" + "time" +) + +func NewRunnableServer() (*RunnableServer, error) { + l, err := net.Listen("tcp", ":0") + if err != nil { + return nil, err + } + + s := &http.Server{ + Addr: l.Addr().String(), + Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }), + } + + rs := &RunnableServer{ + Server: s, + ServeFunc: func() error { return s.Serve(l) }, + } + + return rs, nil +} + +func TestRunnableServerCallsShutdown(t *testing.T) { + rs, err := NewRunnableServer() + if err != nil { + t.Fatalf("error creating runnableServer: %v", err) + } + + rs.ShutdownTimeout = time.Second + shutdownCh := make(chan struct{}) + rs.Server.RegisterOnShutdown(func() { + close(shutdownCh) + }) + + stopCh := make(chan struct{}) + go func() { + if err := rs.Start(stopCh); err != nil { + t.Errorf("Error returned from Start: %v", err) + } + }() + + rsp, err := http.Get("http://" + rs.Addr) + if err != nil { + t.Errorf("error making request: %v", err) + } + if rsp.StatusCode != 200 { + t.Errorf("expected response code 200, got %d", rsp.StatusCode) + } + + close(stopCh) + + select { + case <-time.After(time.Second): + t.Errorf("Expected server to have shut down by now") + case <-shutdownCh: + } +} + +func TestRunnableServerShutdownContext(t *testing.T) { + rs, err := NewRunnableServer() + if err != nil { + t.Fatalf("error creating runnableServer: %v", err) + } + + rs.ShutdownTimeout = time.Millisecond * 10 + rs.Server.Handler = http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + time.Sleep(time.Second * 10) + w.WriteHeader(http.StatusForbidden) + }) + + stopCh := make(chan struct{}) + stoppedCh := make(chan struct{}) + go func() { + if err := rs.Start(stopCh); err == nil { + t.Errorf("Expected context deadline exceeded error from Start but got nil") + } + close(stoppedCh) + }() + + go func() { + http.Get("http://" + rs.Addr) + }() + + // Give the request time to start + time.Sleep(time.Millisecond * 50) + shutdownCh := time.After(time.Millisecond * 20) + + close(stopCh) + + select { + case <-shutdownCh: + t.Errorf("Expected shutdown to complete before the timeout") + case <-stoppedCh: + } +} + +func TestRunnableServerCallsClose(t *testing.T) { + rs, err := NewRunnableServer() + if err != nil { + t.Fatalf("error creating runnableServer: %v", err) + } + + stopCh := make(chan struct{}) + go func() { + if err := rs.Start(stopCh); err != nil { + t.Errorf("Error returned from Start: %v", err) + } + }() + + rsp, err := http.Get("http://" + rs.Addr) + if err != nil { + t.Errorf("error making request: %v", err) + } + if rsp.StatusCode != 200 { + t.Errorf("expected response code 200, got %d", rsp.StatusCode) + } + + close(stopCh) + time.Sleep(time.Millisecond * 10) + + if err := rs.Server.ListenAndServe(); err != http.ErrServerClosed { + t.Errorf("Expected server to have closed by now: %v", err) + } +} diff --git a/vendor/go.opencensus.io/exporter/prometheus/prometheus.go b/vendor/go.opencensus.io/exporter/prometheus/prometheus.go new file mode 100644 index 00000000000..50665dcb1ee --- /dev/null +++ b/vendor/go.opencensus.io/exporter/prometheus/prometheus.go @@ -0,0 +1,308 @@ +// Copyright 2017, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package prometheus contains a Prometheus exporter that supports exporting +// OpenCensus views as Prometheus metrics. +package prometheus // import "go.opencensus.io/exporter/prometheus" + +import ( + "bytes" + "fmt" + "log" + "net/http" + "sort" + "sync" + + "go.opencensus.io/internal" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Exporter exports stats to Prometheus, users need +// to register the exporter as an http.Handler to be +// able to export. +type Exporter struct { + opts Options + g prometheus.Gatherer + c *collector + handler http.Handler +} + +// Options contains options for configuring the exporter. +type Options struct { + Namespace string + Registry *prometheus.Registry + OnError func(err error) +} + +// NewExporter returns an exporter that exports stats to Prometheus. +func NewExporter(o Options) (*Exporter, error) { + if o.Registry == nil { + o.Registry = prometheus.NewRegistry() + } + collector := newCollector(o, o.Registry) + e := &Exporter{ + opts: o, + g: o.Registry, + c: collector, + handler: promhttp.HandlerFor(o.Registry, promhttp.HandlerOpts{}), + } + return e, nil +} + +var _ http.Handler = (*Exporter)(nil) +var _ view.Exporter = (*Exporter)(nil) + +func (c *collector) registerViews(views ...*view.View) { + count := 0 + for _, view := range views { + sig := viewSignature(c.opts.Namespace, view) + c.registeredViewsMu.Lock() + _, ok := c.registeredViews[sig] + c.registeredViewsMu.Unlock() + + if !ok { + desc := prometheus.NewDesc( + viewName(c.opts.Namespace, view), + view.Description, + tagKeysToLabels(view.TagKeys), + nil, + ) + c.registeredViewsMu.Lock() + c.registeredViews[sig] = desc + c.registeredViewsMu.Unlock() + count++ + } + } + if count == 0 { + return + } + + c.ensureRegisteredOnce() +} + +// ensureRegisteredOnce invokes reg.Register on the collector itself +// exactly once to ensure that we don't get errors such as +// cannot register the collector: descriptor Desc{fqName: *} +// already exists with the same fully-qualified name and const label values +// which is documented by Prometheus at +// https://github.com/prometheus/client_golang/blob/fcc130e101e76c5d303513d0e28f4b6d732845c7/prometheus/registry.go#L89-L101 +func (c *collector) ensureRegisteredOnce() { + c.registerOnce.Do(func() { + if err := c.reg.Register(c); err != nil { + c.opts.onError(fmt.Errorf("cannot register the collector: %v", err)) + } + }) + +} + +func (o *Options) onError(err error) { + if o.OnError != nil { + o.OnError(err) + } else { + log.Printf("Failed to export to Prometheus: %v", err) + } +} + +// ExportView exports to the Prometheus if view data has one or more rows. +// Each OpenCensus AggregationData will be converted to +// corresponding Prometheus Metric: SumData will be converted +// to Untyped Metric, CountData will be a Counter Metric, +// DistributionData will be a Histogram Metric. +func (e *Exporter) ExportView(vd *view.Data) { + if len(vd.Rows) == 0 { + return + } + e.c.addViewData(vd) +} + +// ServeHTTP serves the Prometheus endpoint. +func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { + e.handler.ServeHTTP(w, r) +} + +// collector implements prometheus.Collector +type collector struct { + opts Options + mu sync.Mutex // mu guards all the fields. + + registerOnce sync.Once + + // reg helps collector register views dynamically. + reg *prometheus.Registry + + // viewData are accumulated and atomically + // appended to on every Export invocation, from + // stats. These views are cleared out when + // Collect is invoked and the cycle is repeated. + viewData map[string]*view.Data + + registeredViewsMu sync.Mutex + // registeredViews maps a view to a prometheus desc. + registeredViews map[string]*prometheus.Desc +} + +func (c *collector) addViewData(vd *view.Data) { + c.registerViews(vd.View) + sig := viewSignature(c.opts.Namespace, vd.View) + + c.mu.Lock() + c.viewData[sig] = vd + c.mu.Unlock() +} + +func (c *collector) Describe(ch chan<- *prometheus.Desc) { + c.registeredViewsMu.Lock() + registered := make(map[string]*prometheus.Desc) + for k, desc := range c.registeredViews { + registered[k] = desc + } + c.registeredViewsMu.Unlock() + + for _, desc := range registered { + ch <- desc + } +} + +// Collect fetches the statistics from OpenCensus +// and delivers them as Prometheus Metrics. +// Collect is invoked everytime a prometheus.Gatherer is run +// for example when the HTTP endpoint is invoked by Prometheus. +func (c *collector) Collect(ch chan<- prometheus.Metric) { + // We need a copy of all the view data up until this point. + viewData := c.cloneViewData() + + for _, vd := range viewData { + sig := viewSignature(c.opts.Namespace, vd.View) + c.registeredViewsMu.Lock() + desc := c.registeredViews[sig] + c.registeredViewsMu.Unlock() + + for _, row := range vd.Rows { + metric, err := c.toMetric(desc, vd.View, row) + if err != nil { + c.opts.onError(err) + } else { + ch <- metric + } + } + } + +} + +func (c *collector) toMetric(desc *prometheus.Desc, v *view.View, row *view.Row) (prometheus.Metric, error) { + switch data := row.Data.(type) { + case *view.CountData: + return prometheus.NewConstMetric(desc, prometheus.CounterValue, float64(data.Value), tagValues(row.Tags)...) + + case *view.DistributionData: + points := make(map[float64]uint64) + // Histograms are cumulative in Prometheus. + // 1. Sort buckets in ascending order but, retain + // their indices for reverse lookup later on. + // TODO: If there is a guarantee that distribution elements + // are always sorted, then skip the sorting. + indicesMap := make(map[float64]int) + buckets := make([]float64, 0, len(v.Aggregation.Buckets)) + for i, b := range v.Aggregation.Buckets { + if _, ok := indicesMap[b]; !ok { + indicesMap[b] = i + buckets = append(buckets, b) + } + } + sort.Float64s(buckets) + + // 2. Now that the buckets are sorted by magnitude + // we can create cumulative indicesmap them back by reverse index + cumCount := uint64(0) + for _, b := range buckets { + i := indicesMap[b] + cumCount += uint64(data.CountPerBucket[i]) + points[b] = cumCount + } + return prometheus.NewConstHistogram(desc, uint64(data.Count), data.Sum(), points, tagValues(row.Tags)...) + + case *view.SumData: + return prometheus.NewConstMetric(desc, prometheus.UntypedValue, data.Value, tagValues(row.Tags)...) + + case *view.LastValueData: + return prometheus.NewConstMetric(desc, prometheus.GaugeValue, data.Value, tagValues(row.Tags)...) + + default: + return nil, fmt.Errorf("aggregation %T is not yet supported", v.Aggregation) + } +} + +func tagKeysToLabels(keys []tag.Key) (labels []string) { + for _, key := range keys { + labels = append(labels, internal.Sanitize(key.Name())) + } + return labels +} + +func tagsToLabels(tags []tag.Tag) []string { + var names []string + for _, tag := range tags { + names = append(names, internal.Sanitize(tag.Key.Name())) + } + return names +} + +func newCollector(opts Options, registrar *prometheus.Registry) *collector { + return &collector{ + reg: registrar, + opts: opts, + registeredViews: make(map[string]*prometheus.Desc), + viewData: make(map[string]*view.Data), + } +} + +func tagValues(t []tag.Tag) []string { + var values []string + for _, t := range t { + values = append(values, t.Value) + } + return values +} + +func viewName(namespace string, v *view.View) string { + var name string + if namespace != "" { + name = namespace + "_" + } + return name + internal.Sanitize(v.Name) +} + +func viewSignature(namespace string, v *view.View) string { + var buf bytes.Buffer + buf.WriteString(viewName(namespace, v)) + for _, k := range v.TagKeys { + buf.WriteString("-" + k.Name()) + } + return buf.String() +} + +func (c *collector) cloneViewData() map[string]*view.Data { + c.mu.Lock() + defer c.mu.Unlock() + + viewDataCopy := make(map[string]*view.Data) + for sig, viewData := range c.viewData { + viewDataCopy[sig] = viewData + } + return viewDataCopy +}