Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat. X-Ray - Real time stream debugging for Flow components #6045

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
356458d
Commit code so I can revert everything else
tpaschalis Dec 1, 2023
3128fea
Use non-compressed handler; remove dead code
tpaschalis Dec 1, 2023
b066d43
debug stream to ui
wildum Dec 4, 2023
babebc7
merge main
wildum Dec 4, 2023
3eed9d4
cleanup
wildum Dec 4, 2023
2e049d7
add debug hook to scrape, remote_write, discoveryfile
wildum Dec 4, 2023
b357de7
Add button to UI
jcreixell Dec 5, 2023
9dd4217
wip otel
wildum Dec 5, 2023
66a4735
Merge branch 'hackathon-xray-web-stream' of github.com:grafana/agent …
wildum Dec 5, 2023
4e1fa64
receivers and exporters
wildum Dec 5, 2023
be40348
Tweak button style
jcreixell Dec 5, 2023
ebab84c
Tweak UI and limit buffer size
jcreixell Dec 5, 2023
4c325d0
change name in ui
wildum Dec 6, 2023
8e6630c
support all exporters
wildum Dec 6, 2023
497331c
support discover processor otel
wildum Dec 6, 2023
3a56b01
Add controls to stream UI
jcreixell Dec 6, 2023
311c628
fix stop resume buttons
wildum Dec 6, 2023
5615c07
add some style to the buttons
wildum Dec 6, 2023
4c27f31
add download button
wildum Dec 6, 2023
7481d44
format the debug stream line
wildum Dec 6, 2023
e86910f
Use auto-scroller component and improve style
jcreixell Dec 6, 2023
9adab81
add sampling feature
wildum Dec 7, 2023
241e763
separate blocks log lines
wildum Dec 7, 2023
46a30b5
add filter feature
wildum Dec 7, 2023
ba69c81
Tweak css
jcreixell Dec 7, 2023
b6e9ddf
Merge branch 'main' into hackathon-xray-web-stream
jcreixell Jan 4, 2024
42a1c13
Delete tmp.yaml
jcreixell Jan 4, 2024
389f0e2
Delete yarn.lock
jcreixell Jan 4, 2024
803cd24
Add initial implementation of x-ray service
jcreixell Jan 5, 2024
f61a967
Simplify callback
jcreixell Jan 5, 2024
9bd6224
Migrate supported otel components to xray service
jcreixell Jan 5, 2024
7d25a5d
Merge branch 'main' into hackathon-xray-web-stream
jcreixell Jan 15, 2024
489dff5
Merge branch 'main' into hackathon-xray-web-stream
jcreixell Feb 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/agent/service/labelstore"
otel_service "github.com/grafana/agent/service/otel"
uiservice "github.com/grafana/agent/service/ui"
"github.com/grafana/agent/service/xray"
"github.com/grafana/ckit/advertise"
"github.com/grafana/ckit/peer"
"github.com/grafana/river/diag"
Expand Down Expand Up @@ -243,9 +244,12 @@ func (fr *flowRun) Run(configPath string) error {
EnablePProf: fr.enablePprof,
})

xrayService := xray.New(l)

uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
Cluster: clusterService.Data().(cluster.Cluster),
Xray: xrayService,
})

otelService := otel_service.New(l)
Expand All @@ -263,6 +267,7 @@ func (fr *flowRun) Run(configPath string) error {
Reg: reg,
Services: []service.Service{
httpService,
xrayService,
uiService,
clusterService,
otelService,
Expand Down
14 changes: 14 additions & 0 deletions component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package discovery

import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/agent/service/xray"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
Expand Down Expand Up @@ -108,17 +110,25 @@ type Component struct {
discMut sync.Mutex
latestDisc discovery.Discoverer
newDiscoverer chan struct{}
xray *xray.Service

creator Creator
}

// New creates a discovery component given arguments and a concrete Discovery implementation function.
//
// The X-Ray service is looked up through o.GetServiceData and stored on the
// component so discovered targets can be published to a debug stream. New
// returns an error if the service data cannot be retrieved or is not a
// *xray.Service. The component is returned together with the result of the
// initial Update(args) call.
func New(o component.Options, args component.Arguments, creator Creator) (*Component, error) {
	data, err := o.GetServiceData(xray.ServiceName)
	if err != nil {
		return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
	}
	// Use a checked assertion so an unexpected payload surfaces as an error
	// instead of a panic, and avoid naming the local "xray", which would
	// shadow the imported package for the rest of the function.
	xrayService, ok := data.(*xray.Service)
	if !ok {
		return nil, fmt.Errorf("failed to get information about X-Ray service: unexpected service data type %T", data)
	}

	c := &Component{
		opts:    o,
		creator: creator,
		// buffered to avoid deadlock from the first immediate update
		newDiscoverer: make(chan struct{}, 1),
		xray:          xrayService,
	}
	return c, c.Update(args)
}
Expand Down Expand Up @@ -198,6 +208,10 @@ func (c *Component) runDiscovery(ctx context.Context, d Discoverer) {
allTargets = append(allTargets, labels)
}
}

if ds := c.xray.GetDebugStream(c.opts.ID); ds != nil {
ds(fmt.Sprintf("%s", allTargets))
}
c.opts.OnStateChange(Exports{Targets: allTargets})
}

Expand Down
17 changes: 14 additions & 3 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ package exporter
import (
"context"
"errors"
"fmt"
"os"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/lazycollector"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/otelcol/internal/lazyexporterconsumer"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/component/otelcol/internal/views"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util/zapadapter"
"github.com/grafana/agent/service/xray"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
otelexporter "go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -79,7 +81,7 @@ type Exporter struct {

opts component.Options
factory otelexporter.Factory
consumer *lazyconsumer.Consumer
consumer *lazyexporterconsumer.Consumer

sched *scheduler.Scheduler
collector *lazycollector.Collector
Expand All @@ -101,9 +103,18 @@ var (
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) {
data, err := opts.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)
debugStreamCallback := func() func(string) {
return xray.GetDebugStream(opts.ID)
}

ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)
consumer := lazyexporterconsumer.New(ctx, debugStreamCallback)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
Expand Down
43 changes: 38 additions & 5 deletions component/otelcol/exporter/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package loki

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter/loki/internal/convert"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/service/xray"
)

func init() {
Expand All @@ -34,20 +37,34 @@ type Component struct {
log log.Logger
opts component.Options

converter *convert.Converter
converter *convert.Converter
logsReceiverStreamDebug *logsReceiverStreamDebug
xray *xray.Service
}

var _ component.Component = (*Component)(nil)
var (
_ component.Component = (*Component)(nil)
)

// New creates a new otelcol.exporter.loki component.
func New(o component.Options, c Arguments) (*Component, error) {
converter := convert.New(o.Logger, o.Registerer, c.ForwardTo)

data, err := o.GetServiceData(xray.ServiceName)
if err != nil {
return nil, fmt.Errorf("failed to get information about X-Ray service: %w", err)
}
xray := data.(*xray.Service)

res := &Component{
log: o.Logger,
opts: o,

converter: converter,
logsReceiverStreamDebug: &logsReceiverStreamDebug{
entries: make(chan loki.Entry),
},
xray: xray,
}
if err := res.Update(c); err != nil {
return nil, err
Expand All @@ -65,13 +82,29 @@ func New(o component.Options, c Arguments) (*Component, error) {

// Run implements Component.
//
// Run blocks until ctx is cancelled and then returns nil. While running it
// receives every entry sent to the debug receiver's channel; when
// GetDebugStream returns a non-nil callback for this component's ID, the entry
// is formatted (timestamp, labels, line) and handed to that callback,
// otherwise the entry is discarded.
func (c *Component) Run(ctx context.Context) error {
	// The loop must be the first statement: waiting on ctx.Done() and returning
	// before it would leave the loop unreachable and the debug channel
	// undrained.
	for {
		select {
		case <-ctx.Done():
			return nil
		case entry := <-c.logsReceiverStreamDebug.Chan():
			if ds := c.xray.GetDebugStream(c.opts.ID); ds != nil {
				ds(fmt.Sprintf("ts=%s, labels=%s, entry=%s", entry.Timestamp.Format(time.RFC3339Nano), entry.Labels.String(), entry.Line))
			}
		}
	}
}

// Update implements Component.
//
// Update replaces the converter's fanout with the receivers listed in the new
// arguments' ForwardTo plus the component's debug receiver. It always returns
// nil.
func (c *Component) Update(newConfig component.Arguments) error {
	cfg := newConfig.(Arguments)

	// Cap the slice at its own length so append is forced to allocate a new
	// backing array; appending directly to cfg.ForwardTo could write the debug
	// receiver into spare capacity owned by the caller's slice.
	n := len(cfg.ForwardTo)
	receivers := append(cfg.ForwardTo[:n:n], c.logsReceiverStreamDebug)

	// Update the fanout once, with the complete receiver list.
	c.converter.UpdateFanout(receivers)
	return nil
}

// logsReceiverStreamDebug is the receiver that Update appends to the
// converter's fanout so that Run can read the converted entries and pass them
// to the X-Ray debug stream.
type logsReceiverStreamDebug struct {
// entries is the channel returned by Chan. New creates it unbuffered
// (make(chan loki.Entry)), and Run is the only reader visible in this file.
entries chan loki.Entry
}

// Chan exposes the channel on which this debug receiver accepts log entries.
func (recv *logsReceiverStreamDebug) Chan() chan loki.Entry {
	ch := recv.entries
	return ch
}
69 changes: 58 additions & 11 deletions component/otelcol/exporter/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/prometheus"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
)

Expand Down Expand Up @@ -72,8 +76,9 @@ type Component struct {
fanout *prometheus.Fanout
converter *convert.Converter

mut sync.RWMutex
cfg Arguments
mut sync.RWMutex
cfg Arguments
debugStreamCallback func(func() string)
}

var _ component.Component = (*Component)(nil)
Expand All @@ -85,17 +90,55 @@ func New(o component.Options, c Arguments) (*Component, error) {
return nil, err
}
ls := service.(labelstore.LabelStore)
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls)

converter := convert.New(o.Logger, fanout, convertArgumentsToConvertOptions(c))

res := &Component{
log: o.Logger,
opts: o,

fanout: fanout,
converter: converter,
log: o.Logger,
opts: o,
debugStreamCallback: func(func() string) {},
}

res.fanout = prometheus.NewFanout(nil, o.ID, o.Registerer, ls)

interceptor := prometheus.NewInterceptor(res.fanout, ls,
prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
_, nextErr := next.Append(storage.SeriesRef(localID), l, t, v)
res.debugStreamCallback(func() string { return fmt.Sprintf("ts=%d, labels=%s, value=%f", t, l, v) })
return globalRef, nextErr
}),
prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) {
localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
_, nextErr := next.AppendHistogram(storage.SeriesRef(localID), l, t, h, fh)
res.debugStreamCallback(func() string {
if h != nil {
return fmt.Sprintf("ts=%d, labels=%s, histogram=%s", t, l, h.String())
} else if fh != nil {
return fmt.Sprintf("ts=%d, labels=%s, float_histogram=%s", t, l, fh.String())
}
return fmt.Sprintf("ts=%d, labels=%s, no_value", t, l)
})
return globalRef, nextErr
}),
prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
_, nextErr := next.UpdateMetadata(storage.SeriesRef(localID), l, m)
res.debugStreamCallback(func() string {
return fmt.Sprintf("labels=%s, type=%s, unit=%s, help=%s", l, m.Type, m.Unit, m.Help)
})
return globalRef, nextErr
}),
prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef))
_, nextErr := next.AppendExemplar(storage.SeriesRef(localID), l, e)
res.debugStreamCallback(func() string {
return fmt.Sprintf("ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)
})
return globalRef, nextErr
}),
)

res.converter = convert.New(o.Logger, interceptor, convertArgumentsToConvertOptions(c))

if err := res.Update(c); err != nil {
return nil, err
}
Expand All @@ -104,7 +147,7 @@ func New(o component.Options, c Arguments) (*Component, error) {
// remain the same throughout the component's lifetime, so we do this during
// component construction.
export := lazyconsumer.New(context.Background())
export.SetConsumers(nil, converter, nil)
export.SetConsumers(nil, res.converter, nil)
o.OnStateChange(otelcol.ConsumerExports{Input: export})

return res, nil
Expand Down Expand Up @@ -159,3 +202,7 @@ func convertArgumentsToConvertOptions(args Arguments) convert.Options {
ResourceToTelemetryConversion: args.ResourceToTelemetryConversion,
}
}

// HookDebugStream installs the callback that the append, histogram, metadata
// and exemplar hooks invoke with a function producing the debug line for each
// sample.
//
// A nil debugStreamCallback is replaced with a no-op, because the hooks call
// the stored callback unconditionally and would otherwise panic.
//
// NOTE(review): active is accepted but not consulted here; presumably callers
// pass a no-op callback to disable streaming — confirm against the caller.
func (c *Component) HookDebugStream(active bool, debugStreamCallback func(computeDataFunc func() string)) {
	if debugStreamCallback == nil {
		debugStreamCallback = func(func() string) {}
	}
	c.debugStreamCallback = debugStreamCallback
}
Loading
Loading