Skip to content

Commit

Permalink
Implement option emit_events_as_otel_logs (via env var `EMIT_EVENTS…
Browse files Browse the repository at this point in the history
…_AS_OTEL_LOGS`) to send loggregator events to OTL logs service
  • Loading branch information
jriguera committed Jul 15, 2024
1 parent 9867125 commit 5bbdef7
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 7 deletions.
1 change: 1 addition & 0 deletions jobs/loggr-forwarder-agent-windows/monit
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","),
"DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"),
"EMIT_OTEL_TRACES" => "#{p("temporary_emit_otel_traces")}",
"EMIT_EVENTS_AS_OTEL_LOGS" => "#{p("emit_events_as_otel_logs")}",
"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
"METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt",
Expand Down
4 changes: 4 additions & 0 deletions jobs/loggr-forwarder-agent-windows/spec
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ properties:
description: "Whether to emit traces to OpenTelemetry Collector downstream consumers"
default: false

emit_events_as_otel_logs:
description: "Whether to emit events as logs to OpenTelemetry Collector downstream consumers"
default: false

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
4 changes: 4 additions & 0 deletions jobs/loggr-forwarder-agent/spec
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ properties:
description: "Whether to emit traces to OpenTelemetry Collector downstream consumers"
default: false

emit_events_as_otel_logs:
description: "Whether to emit events as logs to OpenTelemetry Collector downstream consumers"
default: false

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
1 change: 1 addition & 0 deletions jobs/loggr-forwarder-agent/templates/bpm.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

"DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"),
"EMIT_OTEL_TRACES" => p("temporary_emit_otel_traces"),
"EMIT_EVENTS_AS_OTEL_LOGS" => p("emit_events_as_otel_logs"),

"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
Expand Down
1 change: 1 addition & 0 deletions src/cmd/forwarder-agent/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
Tags map[string]string `env:"AGENT_TAGS"`
DebugMetrics bool `env:"DEBUG_METRICS, report"`
EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"`
EmitEventsAsOTelLogs bool `env:"EMIT_EVENTS_AS_OTEL_LOGS, report"`
}

// LoadConfig will load the configuration for the forwarder agent from the
Expand Down
12 changes: 7 additions & 5 deletions src/cmd/forwarder-agent/app/forwarder_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ForwarderAgent struct {
tags map[string]string
debugMetrics bool
emitOTelTraces bool
emitEventsAsOTelLogs bool
}

type Metrics interface {
Expand Down Expand Up @@ -73,6 +74,7 @@ func NewForwarderAgent(
tags: cfg.Tags,
debugMetrics: cfg.MetricsServer.DebugMetrics,
emitOTelTraces: cfg.EmitOTelTraces,
emitEventsAsOTelLogs: cfg.EmitEventsAsOTelLogs,
}
}

Expand All @@ -96,7 +98,7 @@ func (s *ForwarderAgent) Run() {
}))

dests := downstreamDestinations(s.downstreamFilePattern, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitEventsAsOTelLogs, s.log)
tagger := egress_v2.NewTagger(s.tags)
ew := egress_v2.NewEnvelopeWriter(
multiWriter{writers: writers},
Expand Down Expand Up @@ -209,13 +211,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination {
return dests
}

func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces bool, l *log.Logger) []Writer {
func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitEventsAsOTelLogs bool, l *log.Logger) []Writer {
var writers []Writer
for _, d := range dests {
var w Writer
switch d.Protocol {
case "otelcol":
w = otelCollectorClient(d, grpc, m, emitOTelTraces, l)
w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitEventsAsOTelLogs, l)
default:
w = loggregatorClient(d, grpc, m, l)
}
Expand All @@ -224,7 +226,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces
return writers
}

func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool, l *log.Logger) Writer {
func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitEvents bool, l *log.Logger) Writer {
clientCreds, err := tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile),
Expand Down Expand Up @@ -252,7 +254,7 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool
}),
)

dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces), gendiodes.AlertFunc(func(missed int) {
dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces, emitEvents), gendiodes.AlertFunc(func(missed int) {
expired.Add(float64(missed))
}), timeoutwaitgroup.New(time.Minute))

Expand Down
75 changes: 73 additions & 2 deletions src/cmd/forwarder-agent/app/forwarder_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ var _ = Describe("App", func() {
otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector")
otelLogsServer = startSpyOtelColLogServer(ingressCfgPath, agentCerts, "otel-collector")
agentCfg.EmitOTelTraces = true
agentCfg.EmitEventsAsOTelLogs = true
})

JustBeforeEach(func() {
// Because the event being sent JustBeforeEach in the main test, channels need to be emptied
for len(otelMetricsServer.requests) > 0 {
<-otelMetricsServer.requests
}
for len(otelTraceServer.requests) > 0 {
<-otelTraceServer.requests
}
// test-title events
for len(otelLogsServer.requests) > 0 {
<-otelLogsServer.requests
}
})

AfterEach(func() {
Expand All @@ -379,15 +394,14 @@ var _ = Describe("App", func() {
Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}),
)

DescribeTable("not forward counters, gagues, timers and event envelopes to otel logs",
DescribeTable("not forward counters, gagues and timers envelopes to otel logs",
func(e *loggregator_v2.Envelope) {
ingressClient.Emit(e)
Consistently(otelLogsServer.requests, 3).ShouldNot(Receive())
},
Entry("drops counters", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Counter{}}),
Entry("drops gauges", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Gauge{}}),
Entry("drops timers", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}),
Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}),
)

It("forwards counters", func() {
Expand Down Expand Up @@ -434,6 +448,63 @@ var _ = Describe("App", func() {
Expect(log.GetBody().GetStringValue()).To(Equal(body))
})

It("forwards events", func() {
title := "event title"
body := "event body"
ingressClient.EmitEvent(context.TODO(), title, body)

var req *collogspb.ExportLogsServiceRequest
Eventually(otelLogsServer.requests).Should(Receive(&req))

log := req.ResourceLogs[0].ScopeLogs[0].LogRecords[0]
Expect(len(log.GetBody().GetKvlistValue().GetValues())).To(Equal(2))
for _, v := range log.GetBody().GetKvlistValue().GetValues() {
switch v.GetKey() {
case "title":
Expect(v.GetValue().GetStringValue()).To(Equal(title))
case "body":
Expect(v.GetValue().GetStringValue()).To(Equal(body))
default:
Expect(v.GetKey()).ToNot(HaveOccurred())
}
}
})

Context("when support for forwarding events as traces is not active", func() {
BeforeEach(func() {
agentCfg.EmitEventsAsOTelLogs = false
})

It("only emits events to other destinations", func() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer wg.Wait()
defer cancel()

wg.Add(1)
go func() {
defer wg.Done()

ticker := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
ingressClient.EmitEvent(context.TODO(), "title", "event body")
}
}
}()

var e *loggregator_v2.Envelope
Eventually(ingressServer1.envelopes, 5).Should(Receive(&e))
Expect(e.GetEvent().GetTitle()).To(Equal("title"))
Expect(e.GetEvent().GetBody()).To(Equal("event body"))
Consistently(otelLogsServer.requests, 5).ShouldNot(Receive())
})
})

Context("when support for forwarding timers as traces is not active", func() {
BeforeEach(func() {
agentCfg.EmitOTelTraces = false
Expand Down

0 comments on commit 5bbdef7

Please sign in to comment.