diff --git a/jobs/loggr-forwarder-agent-windows/monit b/jobs/loggr-forwarder-agent-windows/monit index 7410241a4..d56bb879a 100644 --- a/jobs/loggr-forwarder-agent-windows/monit +++ b/jobs/loggr-forwarder-agent-windows/monit @@ -25,6 +25,7 @@ "AGENT_TAGS" => tags.map { |k, v| "#{k}:#{v}" }.join(","), "DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"), "EMIT_OTEL_TRACES" => "#{p("temporary_emit_otel_traces")}", + "EMIT_EVENTS_AS_OTEL_LOGS" => "#{p("emit_events_as_otel_logs")}", "METRICS_PORT" => "#{p("metrics.port")}", "METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt", "METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt", diff --git a/jobs/loggr-forwarder-agent-windows/spec b/jobs/loggr-forwarder-agent-windows/spec index 41816d805..66dfc6708 100644 --- a/jobs/loggr-forwarder-agent-windows/spec +++ b/jobs/loggr-forwarder-agent-windows/spec @@ -38,6 +38,10 @@ properties: description: "Whether to emit traces to OpenTelemetry Collector downstream consumers" default: false + emit_events_as_otel_logs: + description: "Whether to emit events as logs to OpenTelemetry Collector downstream consumers" + default: false + tls.ca_cert: description: | TLS loggregator root CA certificate. It is required for key/cert diff --git a/jobs/loggr-forwarder-agent/spec b/jobs/loggr-forwarder-agent/spec index e72576559..5138e37cb 100644 --- a/jobs/loggr-forwarder-agent/spec +++ b/jobs/loggr-forwarder-agent/spec @@ -38,6 +38,10 @@ properties: description: "Whether to emit traces to OpenTelemetry Collector downstream consumers" default: false + emit_events_as_otel_logs: + description: "Whether to emit events as logs to OpenTelemetry Collector downstream consumers" + default: false + tls.ca_cert: description: | TLS loggregator root CA certificate. It is required for key/cert diff --git a/jobs/loggr-forwarder-agent/templates/bpm.yml.erb b/jobs/loggr-forwarder-agent/templates/bpm.yml.erb index ddb5309b8..3aa4a13e0 100644 --- a/jobs/loggr-forwarder-agent/templates/bpm.yml.erb +++ b/jobs/loggr-forwarder-agent/templates/bpm.yml.erb @@ -30,6 +30,7 @@ "DOWNSTREAM_INGRESS_PORT_GLOB" => p("downstream_ingress_port_glob"), "EMIT_OTEL_TRACES" => p("temporary_emit_otel_traces"), + "EMIT_EVENTS_AS_OTEL_LOGS" => p("emit_events_as_otel_logs"), "METRICS_PORT" => "#{p("metrics.port")}", "METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt", diff --git a/src/cmd/forwarder-agent/app/config.go b/src/cmd/forwarder-agent/app/config.go index 54daa6120..e11e90d02 100644 --- a/src/cmd/forwarder-agent/app/config.go +++ b/src/cmd/forwarder-agent/app/config.go @@ -30,6 +30,7 @@ type Config struct { Tags map[string]string `env:"AGENT_TAGS"` DebugMetrics bool `env:"DEBUG_METRICS, report"` EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"` + EmitEventsAsOTelLogs bool `env:"EMIT_EVENTS_AS_OTEL_LOGS, report"` } // LoadConfig will load the configuration for the forwarder agent from the diff --git a/src/cmd/forwarder-agent/app/forwarder_agent.go b/src/cmd/forwarder-agent/app/forwarder_agent.go index f2e8da0e9..14c3c58dd 100644 --- a/src/cmd/forwarder-agent/app/forwarder_agent.go +++ b/src/cmd/forwarder-agent/app/forwarder_agent.go @@ -42,6 +42,7 @@ type ForwarderAgent struct { tags map[string]string debugMetrics bool emitOTelTraces bool + emitEventsAsOTelLogs bool } type Metrics interface { @@ -73,6 +74,7 @@ func NewForwarderAgent( tags: cfg.Tags, debugMetrics: cfg.MetricsServer.DebugMetrics, emitOTelTraces: cfg.EmitOTelTraces, + emitEventsAsOTelLogs: cfg.EmitEventsAsOTelLogs, } } @@ -96,7 +98,7 @@ func (s *ForwarderAgent) Run() { })) dests := downstreamDestinations(s.downstreamFilePattern, s.log) - writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.log) + writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitEventsAsOTelLogs, s.log) tagger := egress_v2.NewTagger(s.tags) ew := egress_v2.NewEnvelopeWriter( multiWriter{writers: writers}, @@ -209,13 +211,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination { return dests } -func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces bool, l *log.Logger) []Writer { +func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitEventsAsOTelLogs bool, l *log.Logger) []Writer { var writers []Writer for _, d := range dests { var w Writer switch d.Protocol { case "otelcol": - w = otelCollectorClient(d, grpc, m, emitOTelTraces, l) + w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitEventsAsOTelLogs, l) default: w = loggregatorClient(d, grpc, m, l) } @@ -224,7 +226,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces return writers } -func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool, l *log.Logger) Writer { +func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitEvents bool, l *log.Logger) Writer { clientCreds, err := tlsconfig.Build( tlsconfig.WithInternalServiceDefaults(), tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile), @@ -252,7 +254,7 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces bool }), ) - dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces), gendiodes.AlertFunc(func(missed int) { + dw := egress.NewDiodeWriter(context.Background(), otelcolclient.New(w, emitTraces, emitEvents), gendiodes.AlertFunc(func(missed int) { expired.Add(float64(missed)) }), timeoutwaitgroup.New(time.Minute)) diff --git a/src/cmd/forwarder-agent/app/forwarder_agent_test.go b/src/cmd/forwarder-agent/app/forwarder_agent_test.go index c493fb8c3..7ad159453 100644 --- a/src/cmd/forwarder-agent/app/forwarder_agent_test.go +++ b/src/cmd/forwarder-agent/app/forwarder_agent_test.go @@ -362,6 +362,21 @@ var _ = Describe("App", func() { otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector") otelLogsServer = startSpyOtelColLogServer(ingressCfgPath, agentCerts, "otel-collector") agentCfg.EmitOTelTraces = true + agentCfg.EmitEventsAsOTelLogs = true + }) + + JustBeforeEach(func() { + // Because the event being sent JustBeforeEach in the main test, channels need to be emptied + for len(otelMetricsServer.requests) > 0 { + <-otelMetricsServer.requests + } + for len(otelTraceServer.requests) > 0 { + <-otelTraceServer.requests + } + // test-title events + for len(otelLogsServer.requests) > 0 { + <-otelLogsServer.requests + } }) AfterEach(func() { @@ -379,7 +394,7 @@ var _ = Describe("App", func() { Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}), ) - DescribeTable("not forward counters, gagues, timers and event envelopes to otel logs", + DescribeTable("not forward counters, gagues and timers envelopes to otel logs", func(e *loggregator_v2.Envelope) { ingressClient.Emit(e) Consistently(otelLogsServer.requests, 3).ShouldNot(Receive()) @@ -387,7 +402,6 @@ var _ = Describe("App", func() { Entry("drops counters", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Counter{}}), Entry("drops gauges", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Gauge{}}), Entry("drops timers", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}), - Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}), ) It("forwards counters", func() { @@ -434,6 +448,63 @@ var _ = Describe("App", func() { Expect(log.GetBody().GetStringValue()).To(Equal(body)) }) + It("forwards events", func() { + title := "event title" + body := "event body" + ingressClient.EmitEvent(context.TODO(), title, body) + + var req *collogspb.ExportLogsServiceRequest + Eventually(otelLogsServer.requests).Should(Receive(&req)) + + log := req.ResourceLogs[0].ScopeLogs[0].LogRecords[0] + Expect(len(log.GetBody().GetKvlistValue().GetValues())).To(Equal(2)) + for _, v := range log.GetBody().GetKvlistValue().GetValues() { + switch v.GetKey() { + case "title": + Expect(v.GetValue().GetStringValue()).To(Equal(title)) + case "body": + Expect(v.GetValue().GetStringValue()).To(Equal(body)) + default: + Expect(v.GetKey()).ToNot(HaveOccurred()) + } + } + }) + + Context("when support for forwarding events as traces is not active", func() { + BeforeEach(func() { + agentCfg.EmitEventsAsOTelLogs = false + }) + + It("only emits events to other destinations", func() { + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer wg.Wait() + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + + ticker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + ingressClient.EmitEvent(context.TODO(), "title", "event body") + } + } + }() + + var e *loggregator_v2.Envelope + Eventually(ingressServer1.envelopes, 5).Should(Receive(&e)) + Expect(e.GetEvent().GetTitle()).To(Equal("title")) + Expect(e.GetEvent().GetBody()).To(Equal("event body")) + Consistently(otelLogsServer.requests, 5).ShouldNot(Receive()) + }) + }) + Context("when support for forwarding timers as traces is not active", func() { BeforeEach(func() { agentCfg.EmitOTelTraces = false