From 71706fc83348c8cb027bd250c982d6f697fb7b00 Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Wed, 6 Jul 2022 19:08:59 -0400
Subject: [PATCH] obsservice: ingest events

This patch adds a few things:

1. an RPC endpoint to CRDB for streaming out events. This RPC service is
   called by the Obs Service.
2. a library in CRDB for exporting events.
3. code in the Obs Service for ingesting the events and writing them into a
   table in the sink cluster.

The first use of the event exporter is the system.eventlog events: all events
written to that table are now also exported. Once the Obs Service takes hold
in the future, I hope we can remove system.eventlog.

The events are represented using OpenTelemetry protos. Unfortunately, I've had
to copy the otel protos into our tree because I couldn't figure out a
vendoring solution. The problems encountered with vendoring are:

1. The repo where these protos live
   (https://github.com/open-telemetry/opentelemetry-proto) is not go-get-able.
   This means hacks would be needed for our vendoring tools.
2. Some of the protos in that repo do not build with gogoproto (they only
   build with protoc) because they use the new-ish "optional" option on some
   fields. The logs protos used in this patch do not have this problem, but
   others do (so we'll need to figure something out in the future when dealing
   with the metrics protos). FWIW, the OpenTelemetry Collector ironically has
   the same problem (it uses gogoproto), and it solved it through a sed that
   rewrites all the optional fields into one-ofs.
3. Even if you solve the first two problems, we already have a dependency on
   these protos compiled with protoc in our tree
   (go.opentelemetry.io/proto/otlp), which we need for the otlp trace
   exporter. Bringing in the protos again and building them with gogo results
   in Go files that want to live in the same package / have the same import
   path, so the import paths need to be changed.

Between all of these, I've given up - at least for the moment - and decided to
copy the few protos that we actually need into our tree, changing their import
paths in the process. A script codifies the process of transforming the needed
protos from their otel upstream.
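To make the intended usage of the exporting library (point 2) concrete, here is
a rough sketch - illustrative only, not part of this patch - of how a CRDB
component could hand an event to the exporter. The helper name and the JSON
payload are made up; in this patch the actual eventlog export call site lives
in pkg/sql/event_log.go.

// Illustrative sketch only - not part of this patch.
package obsexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/obs"
	"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
	otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
	otel_logs_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// exportEventlogEvent (hypothetical helper) buffers one eventlog-style event
// for export to the Obs Service. jsonPayload is the event serialized as JSON.
func exportEventlogEvent(
	ctx context.Context, exporter obs.EventsExporter, eventType, jsonPayload string,
) {
	// SendEvent does not block: if no subscriber is connected, or the buffer
	// is full, the event may be dropped.
	exporter.SendEvent(ctx, obspb.EventlogEvent, otel_logs_pb.LogRecord{
		TimeUnixNano: uint64(timeutil.Now().UnixNano()),
		Body: &otel_pb.AnyValue{
			Value: &otel_pb.AnyValue_StringValue{StringValue: jsonPayload},
		},
		Attributes: []*otel_pb.KeyValue{{
			Key: obspb.EventlogEventTypeAttribute,
			Value: &otel_pb.AnyValue{
				Value: &otel_pb.AnyValue_StringValue{StringValue: eventType},
			},
		}},
	})
}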
Release note: None --- .github/CODEOWNERS | 1 + pkg/BUILD.bazel | 12 + pkg/gen/protobuf.bzl | 4 + pkg/obs/BUILD.bazel | 28 + pkg/obs/doc.go | 15 + pkg/obs/event_exporter.go | 573 ++++++++++++++++++ pkg/obsservice/cmd/obsservice/BUILD.bazel | 6 + pkg/obsservice/cmd/obsservice/main.go | 126 +++- pkg/obsservice/obslib/httpproxy/BUILD.bazel | 1 + .../obslib/httpproxy/reverseproxy.go | 158 ++--- pkg/obsservice/obslib/ingest/BUILD.bazel | 25 + pkg/obsservice/obslib/ingest/ingest.go | 157 +++++ .../obslib/migrations/migrations.go | 19 +- .../migrations/sqlmigrations/0001_init.sql | 12 +- pkg/obsservice/obspb/BUILD.bazel | 34 ++ pkg/obsservice/obspb/README.md | 21 + pkg/obsservice/obspb/event_types.go | 22 + pkg/obsservice/obspb/obs.proto | 23 + .../obspb/opentelemetry-proto.patch | 33 + .../opentelemetry-proto/common/v1/BUILD.bazel | 28 + .../common/v1/common.proto | 77 +++ .../opentelemetry-proto/logs/v1/BUILD.bazel | 38 ++ .../opentelemetry-proto/logs/v1/logs.proto | 179 ++++++ .../resource/v1/BUILD.bazel | 30 + .../resource/v1/resource.proto | 37 ++ pkg/obsservice/obspb/resource.go | 18 + .../obspb/update-opentelemetry-proto.sh | 94 +++ pkg/server/BUILD.bazel | 2 + pkg/server/decommission.go | 2 +- pkg/server/node.go | 29 +- pkg/server/server.go | 30 +- pkg/server/server_sql.go | 5 + pkg/server/tenant.go | 39 +- pkg/sql/BUILD.bazel | 4 + pkg/sql/catalog/systemschema/system.go | 5 + pkg/sql/event_log.go | 81 ++- pkg/sql/exec_util.go | 4 + pkg/testutils/lint/lint_test.go | 3 + 38 files changed, 1828 insertions(+), 147 deletions(-) create mode 100644 pkg/obs/BUILD.bazel create mode 100644 pkg/obs/doc.go create mode 100644 pkg/obs/event_exporter.go create mode 100644 pkg/obsservice/obslib/ingest/BUILD.bazel create mode 100644 pkg/obsservice/obslib/ingest/ingest.go create mode 100644 pkg/obsservice/obspb/BUILD.bazel create mode 100644 pkg/obsservice/obspb/README.md create mode 100644 pkg/obsservice/obspb/event_types.go create mode 100644 pkg/obsservice/obspb/obs.proto create mode 100644 pkg/obsservice/obspb/opentelemetry-proto.patch create mode 100644 pkg/obsservice/obspb/opentelemetry-proto/common/v1/BUILD.bazel create mode 100644 pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto create mode 100644 pkg/obsservice/obspb/opentelemetry-proto/logs/v1/BUILD.bazel create mode 100644 pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto create mode 100644 pkg/obsservice/obspb/opentelemetry-proto/resource/v1/BUILD.bazel create mode 100644 pkg/obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto create mode 100644 pkg/obsservice/obspb/resource.go create mode 100755 pkg/obsservice/obspb/update-opentelemetry-proto.sh diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 9bec248b4c88..a0ae1eed6e81 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -351,6 +351,7 @@ /pkg/util/stop/ @cockroachdb/kv-prs /pkg/util/tracing @cockroachdb/obs-inf-prs /pkg/workload/ @cockroachdb/sql-experience-noreview +/pkg/obs/ @cockroachdb/obs-inf-prs /pkg/obsservice/ @cockroachdb/obs-inf-prs # Own all bazel files to dev-inf, but don't request reviews for them diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 451d39f27be7..7b13fe709927 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1081,10 +1081,16 @@ GO_TARGETS = [ "//pkg/kv:kv_test", "//pkg/multitenant/tenantcostmodel:tenantcostmodel", "//pkg/multitenant:multitenant", + "//pkg/obs:obs", "//pkg/obsservice/cmd/obsservice:obsservice", "//pkg/obsservice/cmd/obsservice:obsservice_lib", 
"//pkg/obsservice/obslib/httpproxy:httpproxy", + "//pkg/obsservice/obslib/ingest:ingest", "//pkg/obsservice/obslib/migrations:migrations", + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", + "//pkg/obsservice/obspb:obspb", "//pkg/release:release", "//pkg/roachpb/gen:gen", "//pkg/roachpb/gen:gen_lib", @@ -2239,9 +2245,15 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/uncertainty:get_x_data", "//pkg/multitenant:get_x_data", "//pkg/multitenant/tenantcostmodel:get_x_data", + "//pkg/obs:get_x_data", "//pkg/obsservice/cmd/obsservice:get_x_data", "//pkg/obsservice/obslib/httpproxy:get_x_data", + "//pkg/obsservice/obslib/ingest:get_x_data", "//pkg/obsservice/obslib/migrations:get_x_data", + "//pkg/obsservice/obspb:get_x_data", + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:get_x_data", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:get_x_data", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:get_x_data", "//pkg/release:get_x_data", "//pkg/roachpb:get_x_data", "//pkg/roachpb/gen:get_x_data", diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index b2ff1c497ee2..b496391a8a2a 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -28,6 +28,10 @@ PROTOBUF_SRCS = [ "//pkg/kv/kvserver/protectedts/ptstorage:ptstorage_go_proto", "//pkg/kv/kvserver/readsummary/rspb:rspb_go_proto", "//pkg/kv/kvserver:kvserver_go_proto", + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_go_proto", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_go_proto", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_go_proto", + "//pkg/obsservice/obspb:obs_go_proto", "//pkg/roachpb:roachpb_go_proto", "//pkg/rpc:rpc_go_proto", "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_go_proto", diff --git a/pkg/obs/BUILD.bazel b/pkg/obs/BUILD.bazel new file mode 100644 index 000000000000..70174276d106 --- /dev/null +++ b/pkg/obs/BUILD.bazel @@ -0,0 +1,28 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "obs", + srcs = [ + "doc.go", + "event_exporter.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/obs", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb", + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", + "//pkg/util/log", + "//pkg/util/mon", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "//pkg/util/uuid", + "@com_github_cockroachdb_redact//:redact", + "@org_golang_google_grpc//peer", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/obs/doc.go b/pkg/obs/doc.go new file mode 100644 index 000000000000..3b7df8bbd6e6 --- /dev/null +++ b/pkg/obs/doc.go @@ -0,0 +1,15 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package obs + +// Package obs represents the client library for the Observability Service. +// +// The Obs Service lives in pkg/obsservice. 
diff --git a/pkg/obs/event_exporter.go b/pkg/obs/event_exporter.go new file mode 100644 index 000000000000..4700c5558503 --- /dev/null +++ b/pkg/obs/event_exporter.go @@ -0,0 +1,573 @@ +// Copyright 2016 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package obs + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" + otel_logs_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" + otel_res_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/redact" + "google.golang.org/grpc/peer" +) + +// EventsExporter abstracts exporting events to the Observability Service. It is +// implemented by EventsServer. +type EventsExporter interface { + // SendEvent buffers an event to be sent to subscribers. + SendEvent(ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord) +} + +// EventsServer implements the obspb.ObsServer gRPC service. It responds to +// requests from the Observability Service to subscribe to the events stream. +// Once a subscription is established, new events (published through +// SendEvent()) are sent to the subscriber. +// +// The EventsServer supports a single subscriber at a time. If a new +// subscription request arrives while a subscriber is active (i.e. while a +// SubscribeToEvents gRPC call is running), the previous subscriber is +// disconnected (i.e. the RPC returns), and future events are sent to the new +// subscriber. +// +// When a subscriber is active, the EventsServer buffers events and flushes them +// out to the subscriber periodically (according to flushInterval) and when a +// buffer size threshold is met (triggerSizeBytes). +// The EventsServer does not buffer events when no subscriber is active, for +// better or worse. +type EventsServer struct { + ambientCtx log.AmbientContext + stop *stop.Stopper + clock timeutil.TimeSource + resource otel_res_pb.Resource + // resourceSet is set by SetResourceInfo(). The server is ready to serve RPCs + // once this is set. + resourceSet syncutil.AtomicBool + + // flushInterval is the duration after which a flush is triggered. + // 0 disables this trigger. + flushInterval time.Duration + // triggerSizeBytes is the size in bytes of accumulated messages which trigger a flush. + // 0 disables this trigger. + triggerSizeBytes uint64 + maxBufferSizeBytes uint64 + + // buf accumulates events to be sent to a subscriber. + buf eventsBuffers + + mu struct { + syncutil.Mutex + // sub is the current subscriber. nil if there is no subscriber. 
+ sub *subscriber + } +} + +var _ EventsExporter = &EventsServer{} + +var _ obspb.ObsServer = &EventsServer{} + +// NewEventServer creates an EventServer. +// +// SetResourceInfo needs to be called before the EventServer is registered with +// a gRPC server. +// +// flushInterval and triggerSize control the circumstances under which the sink +// automatically flushes its contents to the child sink. Zero values disable +// these flush triggers. If all triggers are disabled, the buffer is only ever +// flushed when a flush is explicitly requested through the extraFlush or +// forceSync options passed to output(). +// +// maxBufferSize, if not zero, limits the size of the buffer. When a new message +// is causing the buffer to overflow, old messages are dropped. The caller must +// ensure that maxBufferSize makes sense in relation to triggerSize: triggerSize +// should be lower (otherwise the buffer will never flush based on the size +// threshold), and there should be enough of a gap between the two to generally +// fit at least one message (otherwise the buffer might again never flush, since +// incoming messages would cause old messages to be dropped and the buffer's +// size might never fall in between triggerSize and maxSize). See the diagram +// below. +// +// |msg|msg|msg|msg|msg|msg|msg|msg|msg| +// └----------------------^--------------┘ +// triggerSize maxBufferSize +// └--------------┘ +// sized-based flush is triggered when size falls in this range +// +// maxBufferSize should also be set such that it makes sense in relationship +// with the flush latency: only one flush is ever in flight at a time, so the +// buffer should be sized to generally hold at least the amount of data that is +// expected to be produced during the time it takes one flush to complete. +func NewEventServer( + ambient log.AmbientContext, + clock timeutil.TimeSource, + stop *stop.Stopper, + maxStaleness time.Duration, + triggerSizeBytes uint64, + maxBufferSizeBytes uint64, + memMonitor *mon.BytesMonitor, +) *EventsServer { + s := &EventsServer{ + ambientCtx: ambient, + stop: stop, + clock: clock, + flushInterval: maxStaleness, + triggerSizeBytes: triggerSizeBytes, + maxBufferSizeBytes: maxBufferSizeBytes, + } + s.buf.mu.events = map[obspb.EventType]*eventsBuffer{ + obspb.EventlogEvent: { + instrumentationScope: otel_pb.InstrumentationScope{ + Name: string(obspb.EventlogEvent), + Version: "1.0", + }, + }, + } + s.buf.mu.memAccount = memMonitor.MakeBoundAccount() + return s +} + +// SetResourceInfo sets identifying information that will be attached to all the +// exported data. +// +// nodeID can be either a roachpb.NodeID (for KV nodes) or a base.SQLInstanceID +// (for SQL tenants). +func (s *EventsServer) SetResourceInfo(clusterID uuid.UUID, nodeID int32, version string) { + s.resource = otel_res_pb.Resource{ + Attributes: []*otel_pb.KeyValue{ + { + Key: obspb.ClusterID, + Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_StringValue{StringValue: clusterID.String()}}, + }, + { + Key: obspb.NodeID, + Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_IntValue{IntValue: int64(nodeID)}}, + }, + { + Key: obspb.NodeBinaryVersion, + Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_StringValue{StringValue: version}}, + }, + }, + } + s.resourceSet.Set(true) +} + +// eventsBuffers groups together a buffer for each EventType. +// +// Ordered delivery of events (with possible dropped events) to subscribers is +// ensured for individual EventTypes, not across them. 
+type eventsBuffers struct { + mu struct { + syncutil.Mutex + // events stores all the buffered data. + events map[obspb.EventType]*eventsBuffer + // sizeBytes is the sum of sizes for the eventsBuffer's. + sizeBytes uint64 + // memAccount tracks the memory usage of events. + memAccount mon.BoundAccount + } +} + +var errEventTooLarge = errors.New("event is too large") + +// maybeDropEventsForSizeLocked makes sure there's room in the buffer for +// a new event with size newEventBytes. +// +// If the new event would cause the buffer to overflow (according to maxSize), +// then events are dropped from the buffer until its size drops below maxSize/2. +func (bufs *eventsBuffers) maybeDropEventsForSizeLocked( + ctx context.Context, newEventSize uint64, maxSize uint64, +) error { + if newEventSize > maxSize/2 { + return errEventTooLarge + } + size := bufs.mu.sizeBytes + if (size + newEventSize) < maxSize { + // The new message fits. There's nothing to do. + return nil + } + + // Drop the oldest events from the event types that take up the most space. + targetSize := maxSize / 2 + needToClearBytes := size - targetSize + for { + if bufs.mu.sizeBytes <= targetSize { + break + } + + // Find the largest event type. + var maxEventType obspb.EventType + maxSize := uint64(0) + for typ, buf := range bufs.mu.events { + if buf.sizeBytes > maxSize { + maxSize = buf.sizeBytes + maxEventType = typ + } + } + if maxEventType == "" { + panic("failed to find non-empty EventType") + } + + // Drop events from the largest event type. + buf := bufs.mu.events[maxEventType] + droppedBytes := buf.dropEvents(needToClearBytes) + buf.sizeBytes -= droppedBytes + bufs.mu.sizeBytes -= droppedBytes + bufs.mu.memAccount.Shrink(ctx, int64(droppedBytes)) + } + return nil +} + +// clear clears all the buffers, dropping all messages. +func (bufs *eventsBuffers) clear(ctx context.Context) { + bufs.mu.Lock() + defer bufs.mu.Unlock() + bufs.mu.sizeBytes = 0 + for _, buf := range bufs.mu.events { + _, _ = buf.moveContents() + } + bufs.mu.memAccount.Empty(ctx) +} + +// eventsBuffer represents a queue of events of a particular type (identified by +// instrumentationScope). +type eventsBuffer struct { + instrumentationScope otel_pb.InstrumentationScope + events []otel_logs_pb.LogRecord + sizeBytes uint64 + // droppedEvents maintains the count of events that have been dropped from the + // buffer because of memory limits. + droppedEvents uint64 +} + +// moveContents empties the buffer, returning all the events in it, and their +// total byte size. +func (b *eventsBuffer) moveContents() ([]otel_logs_pb.LogRecord, uint64) { + events := b.events + sizeBytes := b.sizeBytes + b.events = nil + b.sizeBytes = 0 + return events, sizeBytes +} + +// dropEvents drops events from b until either b is empty, or needToClearBytes +// worth of events have been dropped. Returns the bytes dropped. +func (b *eventsBuffer) dropEvents(needToClearBytes uint64) uint64 { + cleared := uint64(0) + for len(b.events) != 0 && cleared < needToClearBytes { + evSize := sizeOfEvent(b.events[0]) + cleared += evSize + b.sizeBytes -= evSize + b.droppedEvents++ + b.events = b.events[1:] + } + return cleared +} + +// SendEvent buffers an event to be sent to subscribers. +func (s *EventsServer) SendEvent( + ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord, +) { + // If there's no subscriber, short-circuit. 
+ // + // TODO(andrei): We should buffer at least a little bit, so that we don't miss + // events close to the node start, before the Obs Service (if any) has had a + // chance to subscribe. + s.mu.Lock() + sub := s.mu.sub + s.mu.Unlock() + if sub == nil { + return + } + + // Make sure there's room for the new event. If there isn't, we'll drop + // events from the front of the buffer (the oldest), until there is room. + newEventSize := sizeOfEvent(event) + s.buf.mu.Lock() + defer s.buf.mu.Unlock() + if err := s.buf.maybeDropEventsForSizeLocked(ctx, newEventSize, s.maxBufferSizeBytes); err != nil { + log.Warningf(ctx, "%s", err.Error()) + return + } + + buf := s.buf.mu.events[typ] + if err := s.buf.mu.memAccount.Grow(ctx, int64(newEventSize)); err != nil { + // No memory available. + buf.droppedEvents++ + return + } + + buf.events = append(buf.events, event) + buf.sizeBytes += newEventSize + s.buf.mu.sizeBytes += newEventSize + + // If we've hit the flush threshold, trigger a flush. + if s.triggerSizeBytes > 0 && s.buf.mu.sizeBytes > s.triggerSizeBytes { + select { + case sub.flushC <- struct{}{}: + default: + } + } +} + +// sizeOfEvent computes the size, in bytes, of event. This size will be used for +// memory accounting. +func sizeOfEvent(event otel_logs_pb.LogRecord) uint64 { + switch { + case event.Body.GetBytesValue() != nil: + return uint64(len(event.Body.GetBytesValue())) + case event.Body.GetStringValue() != "": + return uint64(len(event.Body.GetStringValue())) + default: + panic(fmt.Sprintf("unsupported event: %s", event.Body)) + } +} + +// subscriber represents data about an events subscrriber - a caller to the +// SubscribeToEvents RPC. +type subscriber struct { + // res represents metadata attached to all events, identifying this CRDB node. + res otel_res_pb.Resource + // stopC is signaled on close(). + stopC chan error + // flushAndStopC is closed to signal to the flusher that it should attempt to + // flush everything and then terminate. + flushAndStopC <-chan struct{} + // flusherDoneC is signaled by the flusher goroutine, informing the RPC + // handler that it finished. + flusherDoneC chan struct{} + // flushC is used to signal the flusher goroutine to flush. + flushC chan struct{} + + mu struct { + syncutil.Mutex + conn obspb.Obs_SubscribeToEventsServer + } +} + +// close closes the subscriber. Further calls to send() will return an error. +// The call blocks until sub's flusher goroutine terminates. +// +// close can be called multiple times. +func (sub *subscriber) close(err error) { + sub.mu.Lock() + defer sub.mu.Unlock() + + if sub.mu.conn == nil { + return + } + + // Mark ourselves as closed. + sub.mu.conn = nil + // Tell the flusher goroutine to terminate. + sub.stopC <- err + <-sub.flusherDoneC +} + +var errSubscriberClosed = errors.New("subscriber closed") + +// send sends events to the remote subscriber. It might block if the network +// connection buffers are full. +// +// If an error is returned, sub is closed and sub.send() should not be called +// anymore. +func (sub *subscriber) send(events []otel_logs_pb.ScopeLogs) error { + sub.mu.Lock() + defer sub.mu.Unlock() + + if sub.mu.conn == nil { + return errSubscriberClosed + } + + msg := &obspb.Events{ + ResourceLogs: []*otel_logs_pb.ResourceLogs{ + { + Resource: &sub.res, + ScopeLogs: events, + }, + }, + } + err := sub.mu.conn.Send(msg) + if err != nil { + // If we failed to send, we can't use this subscriber anymore. 
+ // + // TODO(andrei): Figure out how to tolerate errors; we should put the events + // back in the buffer (or not take them out of the buffer in the first + // place) in hope that a new subscriber comes along. + sub.close(err) + return err + } + return nil +} + +// newSubscriber creates a subscriber. Events will be sent on conn. The +// subscriber's flusher goroutine listens to flushAndStopC for a signal to flush +// and close. +func (s *EventsServer) newSubscriber( + conn obspb.Obs_SubscribeToEventsServer, flushAndStopC <-chan struct{}, +) *subscriber { + sub := &subscriber{ + res: s.resource, + stopC: make(chan error, 1), + flushAndStopC: flushAndStopC, + flusherDoneC: make(chan struct{}, 1), + flushC: make(chan struct{}, 1), + } + sub.mu.conn = conn + return sub +} + +// errNewSubscriber is passed to an existing subscriber when a new subscriber +// comes along. +var errNewSubscriber = errors.New("new subscriber") + +// errServerNotReady is returned by SubscribeToEvents if the server is not ready +// to process requests. +var errServerNotReady = errors.New("server starting up; not ready to serve RPC") + +// SubscribeToEvents is the EventsServer's RPC interface. Events will be pushed +// to subscriber. +func (s *EventsServer) SubscribeToEvents( + req *obspb.SubscribeToEventsRequest, subscriber obspb.Obs_SubscribeToEventsServer, +) error { + ctx := s.ambientCtx.AnnotateCtx(subscriber.Context()) + + if !s.resourceSet.Get() { + return errServerNotReady + } + + var clientAddr redact.SafeString + client, ok := peer.FromContext(ctx) + if ok { + clientAddr = redact.SafeString(client.Addr.String()) + } + log.Infof(ctx, "received events subscription request from Observability Service; "+ + "subscriber identifying as: %s (%s)", redact.SafeString(req.Identity), clientAddr) + + // Register the new subscriber, replacing any existing one. + sub := s.newSubscriber(subscriber, s.stop.ShouldQuiesce()) + { + s.mu.Lock() + if s.mu.sub != nil { + s.mu.sub.close(errNewSubscriber) + } + s.mu.sub = sub + s.mu.Unlock() + } + + // Run the flusher. This call blocks until this subscriber is signaled to + // terminate in one of a couple of ways: + // 1. Through the remote RPC client terminating the call by canceling ctx. + // 2. Through a new subscriber coming and calling close() on the old one. + // 3. Through the stopper quiescing. + sub.runFlusher(ctx, &s.buf, s.flushInterval) + + // Close the subscription, if it hasn't been closed already by the remote + // subscriber. + sub.close(nil /* err */) + s.reset(ctx, sub) + // TODO(andrei): It might be a good idea to return errors in some cases + // (another subscriber coming, or quiescence). + return nil +} + +// runFlusher runs the flusher goroutine for the subscriber. The flusher will +// consume eventsBuffer. +// +// flushInterval, if not zero, controls the flush's timer. Flushes are also +// triggered by events size. +// +// runFlusher returns when stopC, flushAndStopC or ctx.Done() are signaled. +func (sub *subscriber) runFlusher( + ctx context.Context, bufs *eventsBuffers, flushInterval time.Duration, +) { + defer close(sub.flusherDoneC) + timer := timeutil.NewTimer() + defer timer.Stop() + if flushInterval != 0 { + timer.Reset(flushInterval) + } + for { + done := false + select { + case <-sub.stopC: + // The sink has gone away; we need to stop consuming the buffers + // and terminate the flusher goroutine. + return + case <-ctx.Done(): + // The RPC context was canceled. This also signifies that the subscriber + // has gone away. 
+ return + case <-sub.flushAndStopC: + // We'll return after flushing everything. + done = true + case <-timer.C: + timer.Read = true + timer.Reset(flushInterval) + case <-sub.flushC: + } + + // Flush the buffers for all event types. + var msg []otel_logs_pb.ScopeLogs + msgSize := uint64(0) + bufs.mu.Lock() + for _, buf := range bufs.mu.events { + events, sizeBytes := buf.moveContents() + if len(events) == 0 { + continue + } + bufs.mu.sizeBytes -= sizeBytes + msgSize += sizeBytes + msg = append(msg, otel_logs_pb.ScopeLogs{Scope: &buf.instrumentationScope, LogRecords: events}) + } + bufs.mu.Unlock() + + if len(msg) > 0 { + err := sub.send(msg) + bufs.mu.Lock() + bufs.mu.memAccount.Shrink(ctx, int64(msgSize)) + bufs.mu.Unlock() + // If we failed to send, the subscriber has been closed and cannot be used anymore. + if err != nil { + return + } + } + + if done { + return + } + } +} + +// reset resets the server to an empty state - no subscriber and an empty events +// buffer. +// +// The reset is conditional on the server's subscriber still being sub. +func (s *EventsServer) reset(ctx context.Context, sub *subscriber) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mu.sub != sub { + // We have already switched to another subscriber. + return + } + + s.mu.sub = nil + s.buf.clear(ctx) +} diff --git a/pkg/obsservice/cmd/obsservice/BUILD.bazel b/pkg/obsservice/cmd/obsservice/BUILD.bazel index c8b7f189a752..a0d777dec233 100644 --- a/pkg/obsservice/cmd/obsservice/BUILD.bazel +++ b/pkg/obsservice/cmd/obsservice/BUILD.bazel @@ -9,9 +9,15 @@ go_library( deps = [ "//pkg/cli/exit", "//pkg/obsservice/obslib/httpproxy", + "//pkg/obsservice/obslib/ingest", "//pkg/obsservice/obslib/migrations", "//pkg/ui/distoss", + "//pkg/util/log", + "//pkg/util/stop", + "//pkg/util/sysutil", + "@com_github_jackc_pgx_v4//pgxpool", "@com_github_spf13_cobra//:cobra", + "@org_golang_x_sys//unix", ], ) diff --git a/pkg/obsservice/cmd/obsservice/main.go b/pkg/obsservice/cmd/obsservice/main.go index da598f4d5b66..4ca3b8284dfe 100644 --- a/pkg/obsservice/cmd/obsservice/main.go +++ b/pkg/obsservice/cmd/obsservice/main.go @@ -12,14 +12,40 @@ import ( "context" "flag" "fmt" + "os" + "os/signal" + "time" "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/httpproxy" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations" _ "github.com/cockroachdb/cockroach/pkg/ui/distoss" // web UI init hooks + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/sysutil" + "github.com/jackc/pgx/v4/pgxpool" "github.com/spf13/cobra" + "golang.org/x/sys/unix" ) +// drainSignals are the signals that will cause the server to drain and exit. +// +// The signals will initiate a graceful shutdown. If received a second time, +// SIGINT will be reraised without a signal handler and the default action +// terminate the process abruptly. +// +// Receiving SIGTERM a second time does not do a brutal shutdown, as SIGTERM is +// named termSignal below. +var drainSignals = []os.Signal{unix.SIGINT, unix.SIGTERM} + +// termSignal is the signal that causes an idempotent graceful +// shutdown (i.e. second occurrence does not incur hard shutdown). +var termSignal os.Signal = unix.SIGTERM + +// defaultSinkDBName is the name of the database to be used by default. 
+const defaultSinkDBName = "obsservice" + // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "obsservice", @@ -36,12 +62,70 @@ from one or more CockroachDB clusters.`, UICertKeyPath: uiCertKeyPath, } - if err := migrations.RunDBMigrations(ctx, sinkPGURL); err != nil { - panic(err) + connCfg, err := pgxpool.ParseConfig(sinkPGURL) + if err != nil { + panic(fmt.Sprintf("invalid --sink-pgurl (%s): %s", sinkPGURL, err)) + } + if connCfg.ConnConfig.Database == "" { + fmt.Printf("No database explicitly provided in --sink-pgurl. Using %q.\n", defaultSinkDBName) + connCfg.ConnConfig.Database = defaultSinkDBName } - // Block forever running the proxy. - <-httpproxy.NewReverseHTTPProxy(ctx, cfg).RunAsync(ctx) + pool, err := pgxpool.ConnectConfig(ctx, connCfg) + if err != nil { + panic(fmt.Sprintf("failed to connect to sink database (%s): %s", sinkPGURL, err)) + } + + if err := migrations.RunDBMigrations(ctx, connCfg.ConnConfig); err != nil { + panic(fmt.Sprintf("failed to run DB migrations: %s", err)) + } + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, drainSignals...) + + stop := stop.NewStopper() + + // Run the event ingestion in the background. + if eventsAddr != "" { + ingest.StartIngestEvents(ctx, eventsAddr, pool, stop) + } + // Run the reverse HTTP proxy in the background. + httpproxy.NewReverseHTTPProxy(ctx, cfg).Start(ctx, stop) + + // Block until the process is signaled to terminate. + sig := <-signalCh + log.Infof(ctx, "received signal %s. Shutting down.", sig) + go func() { + stop.Stop(ctx) + }() + + // Print the shutdown progress every 5 seconds. + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + log.Infof(ctx, "%d running tasks", stop.NumTasks()) + case <-stop.IsStopped(): + return + } + } + }() + + // Wait until the shutdown is complete or we receive another signal. + select { + case <-stop.IsStopped(): + log.Infof(ctx, "shutdown complete") + case sig = <-signalCh: + switch sig { + case termSignal: + log.Infof(ctx, "received SIGTERM while shutting down. Continuing shutdown.") + default: + // Crash. + handleSignalDuringShutdown(sig) + } + } }, } @@ -52,6 +136,7 @@ var ( caCertPath string uiCertPath, uiCertKeyPath string sinkPGURL string + eventsAddr string ) func main() { @@ -93,11 +178,40 @@ func main() { RootCmd.PersistentFlags().StringVar( &sinkPGURL, "sink-pgurl", - "postgresql://root@andrei-desktop:26257/defaultdb?sslmode=disable", - "PGURL for the sink cluster.") + "postgresql://root@andrei-desktop:26257?sslmode=disable", + "PGURL for the sink cluster. If the url does not include a database name, "+ + "then \"obsservice\" will be used.") + + RootCmd.PersistentFlags().StringVar( + &eventsAddr, + "crdb-events-addr", + "localhost:26257", + "Address of a CRDB node that events will be ingested from.") if err := RootCmd.Execute(); err != nil { fmt.Println(err) exit.WithCode(exit.UnspecifiedError()) } } + +func handleSignalDuringShutdown(sig os.Signal) { + // On Unix, a signal that was not handled gracefully by the application + // should be reraised so it is visible in the exit code. + + // Reset signal to its original disposition. + signal.Reset(sig) + + // Reraise the signal. os.Signal is always sysutil.Signal. + if err := unix.Kill(unix.Getpid(), sig.(sysutil.Signal)); err != nil { + // Sending a valid signal to ourselves should never fail. 
+ // + // Unfortunately it appears (#34354) that some users + // run CockroachDB in containers that only support + // a subset of all syscalls. If this ever happens, we + // still need to quit immediately. + log.Fatalf(context.Background(), "unable to forward signal %v: %v", sig, err) + } + + // Block while we wait for the signal to be delivered. + select {} +} diff --git a/pkg/obsservice/obslib/httpproxy/BUILD.bazel b/pkg/obsservice/obslib/httpproxy/BUILD.bazel index 861f67ccca1a..d98797fa1503 100644 --- a/pkg/obsservice/obslib/httpproxy/BUILD.bazel +++ b/pkg/obsservice/obslib/httpproxy/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/cli/exit", "//pkg/ui", "//pkg/util/log", + "//pkg/util/stop", "//pkg/util/syncutil", "@com_github_cockroachdb_cmux//:cmux", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/obsservice/obslib/httpproxy/reverseproxy.go b/pkg/obsservice/obslib/httpproxy/reverseproxy.go index 3c5a844421b8..df0fe76d9e0a 100644 --- a/pkg/obsservice/obslib/httpproxy/reverseproxy.go +++ b/pkg/obsservice/obslib/httpproxy/reverseproxy.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/ui" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -117,14 +118,8 @@ func (c *noOIDCConfigured) GetOIDCConf() ui.OIDCUIConf { // underlying CRDB cluster. var CRDBProxyPaths = []string{"/_admin/", "/_status/", "/ts/", "/api/v2/"} -// RunAsync runs an HTTP proxy server in a goroutine. The returned channel is -// closed when the server terminates. -// -// TODO(andrei): Currently the server never terminates. Figure out a closing -// signal. -func (p *ReverseHTTPProxy) RunAsync(ctx context.Context) <-chan struct{} { - ch := make(chan struct{}) - +// Start runs an HTTP proxy server in stopper tasks. +func (p *ReverseHTTPProxy) Start(ctx context.Context, stop *stop.Stopper) { listener, err := net.Listen("tcp", p.listenAddr) if err != nil { fmt.Fprintf(os.Stderr, "failed to listen for incoming HTTP connections on address %s: %s", @@ -133,76 +128,82 @@ func (p *ReverseHTTPProxy) RunAsync(ctx context.Context) <-chan struct{} { } https := p.certs.UICert != nil - go func() { - defer close(ch) - var err error - - defer func() { - _ = listener.Close() - }() - - // Create the HTTP mux. Requests will generally be forwarded to p.proxy, - // except the /debug/pprof ones which will be served locally. - mux := http.NewServeMux() - // TODO(davidh): Ideally, the UI handler should probably be - // configured in `obsservice` and not hardcoded into `obslib`. This - // gives lib users a chance to do whatever they want with the UI. - mux.Handle("/", ui.Handler(ui.Config{ - ExperimentalUseLogin: false, - LoginEnabled: false, - GetUser: func(ctx context.Context) *string { - u := "Observability Service" - return &u - }, - OIDC: &noOIDCConfigured{}, - })) - for _, path := range CRDBProxyPaths { - mux.Handle(path, p.proxy) + // Create the HTTP mux. Some requests will be forwarded to p.proxy, others + // will be served locally. + mux := http.NewServeMux() + // TODO(davidh): Ideally, the UI handler should probably be + // configured in `obsservice` and not hardcoded into `obslib`. This + // gives lib users a chance to do whatever they want with the UI. 
+ mux.Handle("/", ui.Handler(ui.Config{ + ExperimentalUseLogin: false, + LoginEnabled: false, + GetUser: func(ctx context.Context) *string { + u := "Observability Service" + return &u + }, + OIDC: &noOIDCConfigured{}, + })) + for _, path := range CRDBProxyPaths { + mux.Handle(path, p.proxy) + } + // This seems to be the minimal set of handlers that we need to register in + // order to get all the pprof functionality. The pprof.Index handler handles + // some types of profiles itself. + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + var servers []*http.Server + + if !https { + // The Observability Service is not configured with certs, so it can only + // serve HTTP. + s := &http.Server{Handler: mux} + servers = append(servers, s) + if err := stop.RunAsyncTask(ctx, "reverse proxy HTTP server", func(ctx context.Context) { + _ = s.Serve(listener) + }); err != nil { + return + } + } else { + // We're configured to serve HTTPS. We'll also listen for HTTP requests, and redirect them + // to HTTPS. + + // Separate HTTP traffic from HTTPS traffic. + protocolMux := cmux.New(listener) + clearL := protocolMux.Match(cmux.HTTP1()) // Note that adding this matcher first gives it priority. + tlsL := protocolMux.Match(cmux.Any()) + // Redirect HTTP to HTTPS. + redirectHandler := http.NewServeMux() + redirectHandler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + // TODO(andrei): Consider dealing with HSTS headers. Probably drop HSTS + // headers coming from CRDB, and set our own headers. + http.Redirect(w, r, "https://"+r.Host+r.RequestURI, http.StatusTemporaryRedirect) + }) + redirectServer := &http.Server{Handler: redirectHandler} + servers = append(servers, redirectServer) + if err := stop.RunAsyncTask(ctx, "reverse proxy redirect server", func(ctx context.Context) { + _ = redirectServer.Serve(clearL) + }); err != nil { + return } - // This seems to be the minimal set of handlers that we need to register in - // order to get all the pprof functionality. The pprof.Index handler handles - // some types of profiles itself. - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - - if !https { - // The Observability Service is not configured with certs, so it can only - // serve HTTP. - err = (&http.Server{Handler: mux}).Serve(listener) - } else { - // We're configured to serve HTTPS. We'll also listen for HTTP requests, and redirect them - // to HTTPS. - - // Separate HTTP traffic from HTTPS traffic. - protocolMux := cmux.New(listener) - clearL := protocolMux.Match(cmux.HTTP1()) // Note that adding this matcher first gives it priority. - tlsL := protocolMux.Match(cmux.Any()) - // Redirect HTTP to HTTPS. - redirectHandler := http.NewServeMux() - redirectHandler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - // TODO(andrei): Consider dealing with HSTS headers. Probably drop HSTS - // headers coming from CRDB, and set our own headers. 
- http.Redirect(w, r, "https://"+r.Host+r.RequestURI, http.StatusTemporaryRedirect) - }) - redirectServer := http.Server{Handler: redirectHandler} - go func() { - _ = redirectServer.Serve(clearL) - }() - // Serve HTTPS traffic by delegating it to the proxy. - tlsServer := &http.Server{Handler: mux} - go func() { - _ = tlsServer.ServeTLS(tlsL, p.certs.UICertPath, p.certs.UICertKeyPath) - }() - err = protocolMux.Serve() + // Serve HTTPS traffic by delegating it to the proxy. + tlsServer := &http.Server{Handler: mux} + servers = append(servers, tlsServer) + if err := stop.RunAsyncTask(ctx, "reverse proxy TLS server", func(ctx context.Context) { + _ = tlsServer.ServeTLS(tlsL, p.certs.UICertPath, p.certs.UICertKeyPath) + }); err != nil { + return } - if !errors.Is(err, http.ErrServerClosed) { - fmt.Println(err.Error()) + if err := stop.RunAsyncTask(ctx, "reverse proxy protocol muxer", func(ctx context.Context) { + _ = protocolMux.Serve() + }); err != nil { + return } - }() + } scheme := "http" if https { @@ -210,7 +211,14 @@ func (p *ReverseHTTPProxy) RunAsync(ctx context.Context) <-chan struct{} { } fmt.Printf("Listening for HTTP requests on %s://%s.\n", scheme, p.listenAddr) - return ch + // Wait for the stopper to signal quiescing and shut down everything. + go func() { + <-stop.ShouldQuiesce() + for _, s := range servers { + _ = s.Shutdown(ctx) + } + _ = listener.Close() + }() } // certificates groups together all the certificates relevant to the proxy diff --git a/pkg/obsservice/obslib/ingest/BUILD.bazel b/pkg/obsservice/obslib/ingest/BUILD.bazel new file mode 100644 index 000000000000..cfb16c39ad7f --- /dev/null +++ b/pkg/obsservice/obslib/ingest/BUILD.bazel @@ -0,0 +1,25 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "ingest", + srcs = ["ingest.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", + "//pkg/roachpb", + "//pkg/util/log", + "//pkg/util/stop", + "//pkg/util/timeutil", + "//pkg/util/uuid", + "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx", + "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v4//pgxpool", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//credentials/insecure", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/obsservice/obslib/ingest/ingest.go b/pkg/obsservice/obslib/ingest/ingest.go new file mode 100644 index 000000000000..c6c3d023665b --- /dev/null +++ b/pkg/obsservice/obslib/ingest/ingest.go @@ -0,0 +1,157 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package ingest + +import ( + "context" + "fmt" + "io" + + "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + otlogs "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// StartIngestEvents runs event ingestion in a stopper task. +func StartIngestEvents(ctx context.Context, addr string, db *pgxpool.Pool, stop *stop.Stopper) { + _ = stop.RunAsyncTask(ctx, "event ingestor", func(ctx context.Context) { + ctx, cancel := stop.WithCancelOnQuiesce(ctx) + defer cancel() + ingestEvents(ctx, addr, db) + }) +} + +// ingestEvents subscribes to events published by the CRDB node at addr and +// persists them to the database. +// +// The call blocks until the RPC is terminated by the server or ctx is canceled. +func ingestEvents(ctx context.Context, addr string, db *pgxpool.Pool) { + // TODO(andrei): recover from connection errors. + // TODO(andrei): use certs for secure clusters. + conn, err := grpc.DialContext(ctx, addr, + grpc.WithBlock(), // block until the connection is established + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + panic(fmt.Sprintf("failed to dial %s: %s", addr, err)) + } + defer func() { + _ = conn.Close() // nolint:grpcconnclose + }() + log.Infof(ctx, "Ingesting events from %s.", addr) + + c := obspb.NewObsClient(conn) + stream, err := c.SubscribeToEvents(ctx, + &obspb.SubscribeToEventsRequest{ + Identity: "Obs Service", + }) + if err != nil { + panic(fmt.Sprintf("SubscribeToEvents call to %s failed: %s", addr, err)) + } + + for { + events, err := stream.Recv() + if err != nil { + if (err != io.EOF && ctx.Err() == nil) || log.V(2) { + log.Infof(ctx, "event stream error: %s", err) + } + // TODO(andrei): recover from the error by trying to reestablish the + // connection. + return + } + if log.V(3) { + log.Infof(ctx, "received events: %s", events.String()) + } + persistEvents(ctx, events.ResourceLogs, db) + } +} + +// persistEvents writes events to the database. 
+func persistEvents(ctx context.Context, events []*otlogs.ResourceLogs, db *pgxpool.Pool) { + for _, group := range events { + var clusterID uuid.UUID + var nodeID roachpb.NodeID + for _, att := range group.Resource.Attributes { + switch att.Key { + case obspb.ClusterID: + var err error + clusterID, err = uuid.FromString(att.Value.GetStringValue()) + if err != nil { + log.Warningf(ctx, "invalid cluster ID: %s", att.Value) + continue + } + case obspb.NodeID: + nodeID = roachpb.NodeID(att.Value.GetIntValue()) + if nodeID == 0 { + log.Warningf(ctx, "invalid node ID: %s", att.Value) + continue + } + } + } + if clusterID.Equal(uuid.UUID{}) || nodeID == 0 { + log.Warning(ctx, "clusterID or nodeID not set") + continue + } + for _, scope := range group.ScopeLogs { + switch scope.Scope.Name { + case string(obspb.EventlogEvent): + if err := persistEventlogEvents(ctx, scope, clusterID, nodeID, db); err != nil { + log.Warningf(ctx, "error persisting events: %s", err) + } + } + } + } +} + +// persistEventlogEvents writes "eventlog" events to the database. +func persistEventlogEvents( + ctx context.Context, + events otlogs.ScopeLogs, + clusterID uuid.UUID, + nodeID roachpb.NodeID, + db *pgxpool.Pool, +) error { + if events.Scope.Name != string(obspb.EventlogEvent) { + panic(fmt.Sprintf("wrong event type: %s", events.Scope.Name)) + } + return crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { + for _, ev := range events.LogRecords { + var eventType string + for _, kv := range ev.Attributes { + if kv.Key == obspb.EventlogEventTypeAttribute { + eventType = kv.Value.GetStringValue() + } + } + data := ev.Body.GetStringValue() + // TODO(andrei): write multiple events per statement + // TODO(andrei): use a prepared statement + _, err := tx.Exec(ctx, + "INSERT INTO cluster_events(timestamp, cluster_id, node_id, event_type, event) "+ + "VALUES ($1, $2, $3, $4, $5)", + timeutil.Unix(0, int64(ev.TimeUnixNano)), + clusterID, + nodeID, + eventType, + data, + ) + if err != nil { + return err + } + } + return nil + }) +} diff --git a/pkg/obsservice/obslib/migrations/migrations.go b/pkg/obsservice/obslib/migrations/migrations.go index cce86c7d3e5e..552907080452 100644 --- a/pkg/obsservice/obslib/migrations/migrations.go +++ b/pkg/obsservice/obslib/migrations/migrations.go @@ -11,7 +11,6 @@ package migrations import ( "context" "embed" - "fmt" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/jackc/pgx/v4" @@ -24,24 +23,10 @@ import ( //go:embed sqlmigrations/*.sql var sqlMigrations embed.FS -// defaultSinkDBName is the name of the database to be used by default -const defaultSinkDBName = "obsservice" - // RunDBMigrations brings the SQL schema in the sink cluster up to date. // -// sinkPGURL is the connection string for the sink cluster. If it includes a -// database, that database will be used. If it doesn't, a default one will be -// used. -func RunDBMigrations(ctx context.Context, sinkPGURL string) error { - connCfg, err := pgx.ParseConfig(sinkPGURL) - if err != nil { - return err - } - if connCfg.Database == "" { - fmt.Printf("No database explicitly provided in --sink-pgurl. Using %q.\n", defaultSinkDBName) - connCfg.Database = defaultSinkDBName - } - +// connCfg represent the connection info for sink cluster. 
+func RunDBMigrations(ctx context.Context, connCfg *pgx.ConnConfig) error { if log.V(2) { goose.SetVerbose(true) } diff --git a/pkg/obsservice/obslib/migrations/sqlmigrations/0001_init.sql b/pkg/obsservice/obslib/migrations/sqlmigrations/0001_init.sql index a9a94e2596ba..1bc5ec99891d 100644 --- a/pkg/obsservice/obslib/migrations/sqlmigrations/0001_init.sql +++ b/pkg/obsservice/obslib/migrations/sqlmigrations/0001_init.sql @@ -1,5 +1,13 @@ -- +goose Up -CREATE TABLE events(); +CREATE TABLE cluster_events( + timestamp TIMESTAMP NOT NULL, + id BYTES NOT NULL DEFAULT uuid_v4(), + cluster_id BYTES NOT NULL, + node_id INT NOT NULL, + event_type STRING NOT NULL, + event JSONB, + CONSTRAINT "primary" PRIMARY KEY (timestamp, id) USING HASH WITH BUCKET_COUNT = 16 +) WITH (ttl_expire_after = '3 months'); -- +goose Down -DROP TABLE events; +DROP TABLE cluster_events; diff --git a/pkg/obsservice/obspb/BUILD.bazel b/pkg/obsservice/obspb/BUILD.bazel new file mode 100644 index 000000000000..a693a47c2983 --- /dev/null +++ b/pkg/obsservice/obspb/BUILD.bazel @@ -0,0 +1,34 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "obs_proto", + srcs = ["obs.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = ["//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_proto"], +) + +go_proto_library( + name = "obs_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_grpc_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb", + proto = ":obs_proto", + visibility = ["//visibility:public"], + deps = ["//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs"], +) + +go_library( + name = "obspb", + srcs = [ + "event_types.go", + "resource.go", + ], + embed = [":obs_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/obsservice/obspb/README.md b/pkg/obsservice/obspb/README.md new file mode 100644 index 000000000000..5d546eb9cb02 --- /dev/null +++ b/pkg/obsservice/obspb/README.md @@ -0,0 +1,21 @@ +This dir contains the gRPC service definition used by CockroachDB to export data +to the Observability Service. + + +The `opentelemetry-proto` dir contains protos copied from +[opentelemetry-proto](https://github.com/open-telemetry/opentelemetry-proto). +They can be kept up to date with upstream by running +`./update-opentelemetry-proto.sh`. + + +We copy the protos that we need from `opentelemetry-proto` into our tree because +vendoring the upstream repo proved too difficult (it's not `go get`-able, some +of the protos in it don't build with gogoproto and also we already vendor +[opentelemetry-proto-go](https://github.com/open-telemetry/opentelemetry-proto-go), +which contains the protoc-compiled protos. This other repo clashes with the +import path the opentelemetry-proto wants. + +[opentelemetry-collector](https://github.com/open-telemetry/opentelemetry-collector) +also uses gogoproto, and has a complicated build pipeline for the protos. For +example, they transform all "optional" fields into "oneof" [using +sed](https://github.com/open-telemetry/opentelemetry-collector/blob/feab9491538a882737a5bceb8757b4458a86edd3/proto_patch.sed). 
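For illustration (not part of this patch), events ingested into the
cluster_events table created by the migration above could be read back from the
sink cluster along these lines; the function and query are assumptions, using
the same pgx client the Obs Service already depends on.

// Illustrative sketch only - not part of this patch.
package obsexample

import (
	"context"

	"github.com/jackc/pgx/v4/pgxpool"
)

// recentEvents (hypothetical) returns the JSON payloads of the most recent
// ingested events of the given type, newest first.
func recentEvents(
	ctx context.Context, db *pgxpool.Pool, eventType string, limit int,
) ([]string, error) {
	rows, err := db.Query(ctx,
		`SELECT event FROM cluster_events WHERE event_type = $1 ORDER BY timestamp DESC LIMIT $2`,
		eventType, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var payloads []string
	for rows.Next() {
		var event []byte
		if err := rows.Scan(&event); err != nil {
			return nil, err
		}
		payloads = append(payloads, string(event))
	}
	return payloads, rows.Err()
}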
diff --git a/pkg/obsservice/obspb/event_types.go b/pkg/obsservice/obspb/event_types.go new file mode 100644 index 000000000000..730e92f339e5 --- /dev/null +++ b/pkg/obsservice/obspb/event_types.go @@ -0,0 +1,22 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package obspb + +// EventType identifies a type of event that the Obs Service can ingest. +type EventType string + +const ( + // EventlogEvent represents general events about the cluster that historically + // have been persisted inside CRDB in the system.eventlog table. + EventlogEvent EventType = "eventlog" + + // EventlogEventTypeAttribute represents the key of the attribute containing + // the event type of an EventlogEvent. + EventlogEventTypeAttribute = "event_type" +) diff --git a/pkg/obsservice/obspb/obs.proto b/pkg/obsservice/obspb/obs.proto new file mode 100644 index 000000000000..47f4b9eac765 --- /dev/null +++ b/pkg/obsservice/obspb/obs.proto @@ -0,0 +1,23 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +syntax = "proto3"; +package obspb; +import "obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto"; + +service Obs { + rpc SubscribeToEvents(SubscribeToEventsRequest) returns (stream Events) {} +} + +message SubscribeToEventsRequest { + string identity = 1; +} + +message Events { + repeated opentelemetry.proto.logs.v1.ResourceLogs resource_logs = 1; +} diff --git a/pkg/obsservice/obspb/opentelemetry-proto.patch b/pkg/obsservice/obspb/opentelemetry-proto.patch new file mode 100644 index 000000000000..bdaee811f7a5 --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto.patch @@ -0,0 +1,33 @@ +This patch is applied by update-opentelemetry-proto.sh to the otel protos. + +diff --git a/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto +index 9629e39e9c..d5db90161f 100644 +--- a/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto ++++ b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto +@@ -19,6 +19,8 @@ package opentelemetry.proto.logs.v1; + import "obsservice/obspb/opentelemetry-proto/common/v1/common.proto"; + import "obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto"; + ++import "gogoproto/gogo.proto"; ++ + option csharp_namespace = "OpenTelemetry.Proto.Logs.V1"; + option java_multiple_files = true; + option java_package = "io.opentelemetry.proto.logs.v1"; +@@ -53,7 +55,7 @@ message ResourceLogs { + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeLogs that originate from a resource. +- repeated ScopeLogs scope_logs = 2; ++ repeated ScopeLogs scope_logs = 2 [(gogoproto.nullable) = false ]; + + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_logs" field which have their own schema_url field. +@@ -68,7 +70,7 @@ message ScopeLogs { + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of log records. 
+- repeated LogRecord log_records = 2; ++ repeated LogRecord log_records = 2 [(gogoproto.nullable) = false ]; + + // This schema_url applies to all logs in the "logs" field. + string schema_url = 3; diff --git a/pkg/obsservice/obspb/opentelemetry-proto/common/v1/BUILD.bazel b/pkg/obsservice/obspb/opentelemetry-proto/common/v1/BUILD.bazel new file mode 100644 index 000000000000..84003132abc8 --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto/common/v1/BUILD.bazel @@ -0,0 +1,28 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "v1_proto", + srcs = ["common.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], +) + +go_proto_library( + name = "v1_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1", + proto = ":v1_proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "common", + embed = [":v1_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto b/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto new file mode 100644 index 000000000000..db4649e33ef2 --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto @@ -0,0 +1,77 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.common.v1; + +option csharp_namespace = "OpenTelemetry.Proto.Common.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.common.v1"; +option java_outer_classname = "CommonProto"; +option go_package = "v1"; + +// AnyValue is used to represent any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "empty". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). 
+ repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs of key-value pairs. The list may be empty (may + // contain 0 elements). + // The keys MUST be unique (it is not allowed to have more than one + // value with the same key). + repeated KeyValue values = 1; +} + +// KeyValue is a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + string key = 1; + AnyValue value = 2; +} + +// InstrumentationScope is a message representing the instrumentation scope information +// such as the fully qualified name and version. +message InstrumentationScope { + // An empty instrumentation scope name means the name is unknown. + string name = 1; + string version = 2; + repeated KeyValue attributes = 3; + uint32 dropped_attributes_count = 4; +} diff --git a/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/BUILD.bazel b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/BUILD.bazel new file mode 100644 index 000000000000..1bf034770c88 --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/BUILD.bazel @@ -0,0 +1,38 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "v1_proto", + srcs = ["logs.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_proto", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_proto", + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + ], +) + +go_proto_library( + name = "v1_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1", + proto = ":v1_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "logs", + embed = [":v1_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto new file mode 100644 index 000000000000..d5db90161fdb --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto @@ -0,0 +1,179 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.logs.v1; + +import "obsservice/obspb/opentelemetry-proto/common/v1/common.proto"; +import "obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto"; + +import "gogoproto/gogo.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Logs.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.logs.v1"; +option java_outer_classname = "LogsProto"; +option go_package = "v1"; + +// LogsData represents the logs data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP logs data but do not +// implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message LogsData { + // An array of ResourceLogs. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceLogs resource_logs = 1; +} + +// A collection of ScopeLogs from a Resource. +message ResourceLogs { + reserved 1000; + + // The resource for the logs in this message. + // If this field is not set then resource info is unknown. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeLogs that originate from a resource. + repeated ScopeLogs scope_logs = 2 [(gogoproto.nullable) = false ]; + + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_logs" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Logs produced by a Scope. +message ScopeLogs { + // The instrumentation scope information for the logs in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of log records. + repeated LogRecord log_records = 2 [(gogoproto.nullable) = false ]; + + // This schema_url applies to all logs in the "logs" field. + string schema_url = 3; +} + +// Possible values for LogRecord.SeverityNumber. +enum SeverityNumber { + // UNSPECIFIED is the default SeverityNumber, it MUST NOT be used. 
+ SEVERITY_NUMBER_UNSPECIFIED = 0; + SEVERITY_NUMBER_TRACE = 1; + SEVERITY_NUMBER_TRACE2 = 2; + SEVERITY_NUMBER_TRACE3 = 3; + SEVERITY_NUMBER_TRACE4 = 4; + SEVERITY_NUMBER_DEBUG = 5; + SEVERITY_NUMBER_DEBUG2 = 6; + SEVERITY_NUMBER_DEBUG3 = 7; + SEVERITY_NUMBER_DEBUG4 = 8; + SEVERITY_NUMBER_INFO = 9; + SEVERITY_NUMBER_INFO2 = 10; + SEVERITY_NUMBER_INFO3 = 11; + SEVERITY_NUMBER_INFO4 = 12; + SEVERITY_NUMBER_WARN = 13; + SEVERITY_NUMBER_WARN2 = 14; + SEVERITY_NUMBER_WARN3 = 15; + SEVERITY_NUMBER_WARN4 = 16; + SEVERITY_NUMBER_ERROR = 17; + SEVERITY_NUMBER_ERROR2 = 18; + SEVERITY_NUMBER_ERROR3 = 19; + SEVERITY_NUMBER_ERROR4 = 20; + SEVERITY_NUMBER_FATAL = 21; + SEVERITY_NUMBER_FATAL2 = 22; + SEVERITY_NUMBER_FATAL3 = 23; + SEVERITY_NUMBER_FATAL4 = 24; +} + +// Masks for LogRecord.flags field. +enum LogRecordFlags { + LOG_RECORD_FLAG_UNSPECIFIED = 0; + LOG_RECORD_FLAG_TRACE_FLAGS_MASK = 0x000000FF; +} + +// A log record according to OpenTelemetry Log Data Model: +// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md +message LogRecord { + reserved 4; + + // time_unix_nano is the time when the event occurred. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // Value of 0 indicates unknown or missing timestamp. + fixed64 time_unix_nano = 1; + + // Time when the event was observed by the collection system. + // For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK) + // this timestamp is typically set at the generation time and is equal to Timestamp. + // For events originating externally and collected by OpenTelemetry (e.g. using + // Collector) this is the time when OpenTelemetry's code observed the event measured + // by the clock of the OpenTelemetry code. This field MUST be set once the event is + // observed by OpenTelemetry. + // + // For converting OpenTelemetry log data to formats that support only one timestamp or + // when receiving OpenTelemetry log data by recipients that support only one timestamp + // internally the following logic is recommended: + // - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // Value of 0 indicates unknown or missing timestamp. + fixed64 observed_time_unix_nano = 11; + + // Numerical value of the severity, normalized to values described in Log Data Model. + // [Optional]. + SeverityNumber severity_number = 2; + + // The severity text (also known as log level). The original string representation as + // it is known at the source. [Optional]. + string severity_text = 3; + + // A value containing the body of the log record. Can be for example a human-readable + // string message (including multi-line) describing the event in a free form or it can + // be a structured data composed of arrays and maps of other values. [Optional]. + opentelemetry.proto.common.v1.AnyValue body = 5; + + // Additional attributes that describe the specific event occurrence. [Optional]. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 6; + uint32 dropped_attributes_count = 7; + + // Flags, a bit field. 8 least significant bits are the trace flags as + // defined in W3C Trace Context specification. 24 most significant bits are reserved + // and must be set to 0. 
Readers must not assume that 24 most significant bits + // will be zero and must correctly mask the bits when reading 8-bit trace flag (use + // flags & TRACE_FLAGS_MASK). [Optional]. + fixed32 flags = 8; + + // A unique identifier for a trace. All logs from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes + // is considered invalid. Can be set for logs that are part of request processing + // and have an assigned trace id. [Optional]. + bytes trace_id = 9; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes is considered + // invalid. Can be set for logs that are part of a particular processing span. + // If span_id is present trace_id SHOULD be also present. [Optional]. + bytes span_id = 10; +} diff --git a/pkg/obsservice/obspb/opentelemetry-proto/resource/v1/BUILD.bazel b/pkg/obsservice/obspb/opentelemetry-proto/resource/v1/BUILD.bazel new file mode 100644 index 000000000000..0ab73d15327f --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto/resource/v1/BUILD.bazel @@ -0,0 +1,30 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "v1_proto", + srcs = ["resource.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = ["//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_proto"], +) + +go_proto_library( + name = "v1_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1", + proto = ":v1_proto", + visibility = ["//visibility:public"], + deps = ["//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common"], +) + +go_library( + name = "resource", + embed = [":v1_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto b/pkg/obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto new file mode 100644 index 000000000000..c3ce770ed2fc --- /dev/null +++ b/pkg/obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto @@ -0,0 +1,37 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.resource.v1; + +import "obsservice/obspb/opentelemetry-proto/common/v1/common.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Resource.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.resource.v1"; +option java_outer_classname = "ResourceProto"; +option go_package = "v1"; + +// Resource information. 
+message Resource { + // Set of attributes that describe the resource. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, then + // no attributes were dropped. + uint32 dropped_attributes_count = 2; +} diff --git a/pkg/obsservice/obspb/resource.go b/pkg/obsservice/obspb/resource.go new file mode 100644 index 000000000000..51fbf8426343 --- /dev/null +++ b/pkg/obsservice/obspb/resource.go @@ -0,0 +1,18 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package obspb + +// Names of fields used for Resource.Attributes. +const ( + ClusterID string = "ClusterID" + // NodeID corresponds to either a roachpb.NodeID (for KV nodes) or a + // base.SQLInstanceID (for SQL tenants). + NodeID string = "NodeID" + NodeBinaryVersion string = "NodeBinaryVersion" +) diff --git a/pkg/obsservice/obspb/update-opentelemetry-proto.sh b/pkg/obsservice/obspb/update-opentelemetry-proto.sh new file mode 100755 index 000000000000..3b9835e85534 --- /dev/null +++ b/pkg/obsservice/obspb/update-opentelemetry-proto.sh @@ -0,0 +1,95 @@ +#!/bin/bash + +# This script updates the .proto files under the opentelemetry-proto dir with +# upstream protos from https://github.com/open-telemetry/opentelemetry-proto. +# +# Use as +# cd pkg/obsservice/obspb +# ./update-opentelemetry-proto.sh [-k] [sha] +# +# We copy the protos that we need from opentelemetry-proto into our tree because +# vendoring the upstream repo proved too difficult (it's not `go get`-able, some +# of the protos in it don't build with gogoproto (*), and also we already vendor +# https://github.com/open-telemetry/opentelemetry-proto-go, which contains the +# protoc-compiled protos). This other repo clashes with the import path that +# opentelemetry-proto wants. +# +# (*) opentelemetry-collector also uses gogoproto, and has a complicated build +# pipeline for the protos. For example, they transform all "optional" fields +# into "oneof" using sed: +# https://github.com/open-telemetry/opentelemetry-collector/blob/feab9491538a882737a5bceb8757b4458a86edd3/proto_patch.sed + +set -euo pipefail + +if [[ ! $(pwd) == */obspb ]]; then + echo "$0 needs to be run from the obspb dir." + exit 1 +fi + +keep="" +while getopts k flag +do + case "$flag" in + k) + keep=true + ;; + esac +done +shift $(($OPTIND - 1)) + +SHA="" +if [ $# -eq 1 ]; then + SHA=$1 +else + echo "No SHA supplied; will use latest from opentelemetry-proto." +fi + +WORK_DIR=`mktemp -d` + +# deletes the temp directory +function cleanup { + if [ ! "$keep" ] ; then + rm -rf "$WORK_DIR" + echo "Deleted temp working directory $WORK_DIR" + else + echo "-k specified; keeping temp dir $WORK_DIR" + fi +} +# register the cleanup function to be called on the EXIT signal +trap cleanup EXIT + +echo "Cloning opentelemetry-proto in $WORK_DIR." +git clone git@github.com:open-telemetry/opentelemetry-proto.git $WORK_DIR --quiet +if [[ ! -z $SHA ]]; then + echo "Checking out SHA: $SHA." + git -C $WORK_DIR checkout --quiet $SHA +fi + +BKDIR=`mktemp -d` +echo "Making a backup copy of opentelemetry-proto in $BKDIR."
+cp -r opentelemetry-proto $BKDIR/ +DEST_DIR=opentelemetry-proto + +echo "Copying protos." +# Copy the protos from the repo. +rsync -avrq --include "*/" --include="common.proto" --include="resource.proto" --include="logs.proto" --exclude="*" --prune-empty-dirs $WORK_DIR/opentelemetry/proto/* $DEST_DIR + +# Massage the protos so that they work in our tree. +echo "Editing protos." + +# Change lines like: +# option go_package = "go.opentelemetry.io/proto/otlp/common/v1"; +# into: +# option go_package = "v1"; +find $DEST_DIR -type f -name "*.proto" -exec sed -i "s/option go_package = \"go.opentelemetry.io\/proto\/otlp\/.*\/v1\"/option go_package = \"v1\"/" {} + ; + +# Change lines like: +# import "opentelemetry/proto/common/v1/common.proto"; +# into: +# import "obsservice/obspb/opentelemetry-proto/common/v1/common.proto"; +find $DEST_DIR -type f -name "*.proto" -exec sed -i "s/import \"opentelemetry\/proto\/\(.*\)\/v1/import \"obsservice\/obspb\/opentelemetry-proto\/\1\/v1/" {} + ; + +# Apply a final patch customizing the code generation (sprinkle some gogo.nullable=false). +git apply opentelemetry-proto.patch + +echo 'Done. Do not forget to run `./dev generate bazel` and `./dev generate protobuf`' diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 23e951e9ab43..b463d534c213 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -114,6 +114,8 @@ go_library( "//pkg/kv/kvserver/reports", "//pkg/multitenant", "//pkg/multitenant/tenantcostmodel", + "//pkg/obs", + "//pkg/obsservice/obspb", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go index fdaf02ab3a75..3e09359faee9 100644 --- a/pkg/server/decommission.go +++ b/pkg/server/decommission.go @@ -191,7 +191,7 @@ func (s *Server) Decommission( if err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return sql.InsertEventRecord( ctx, - s.sqlServer.execCfg.InternalExecutor, + s.sqlServer.execCfg.InternalExecutor, s.sqlServer.execCfg.EventsExporter, txn, int32(s.NodeID()), /* reporting ID: the node where the event is logged */ sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* we already call log.StructuredEvent above */ diff --git a/pkg/server/node.go b/pkg/server/node.go index 72a518667375..4af5553e0d70 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/obs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/status" @@ -207,18 +208,19 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *roachpb.Error) { // IDs for bootstrapping the node itself or initializing new stores as // they're added on subsequent instantiations.
type Node struct { - stopper *stop.Stopper - clusterID *base.ClusterIDContainer // UUID for Cockroach cluster - Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology - storeCfg kvserver.StoreConfig // Config to use and pass to stores - sqlExec *sql.InternalExecutor // For event logging - stores *kvserver.Stores // Access to node-local stores - metrics nodeMetrics - recorder *status.MetricsRecorder - startedAt int64 - lastUp int64 - initialStart bool // true if this is the first time this node has started - txnMetrics kvcoord.TxnMetrics + stopper *stop.Stopper + clusterID *base.ClusterIDContainer // UUID for Cockroach cluster + Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology + storeCfg kvserver.StoreConfig // Config to use and pass to stores + sqlExec *sql.InternalExecutor // For event logging + eventsExporter obs.EventsExporter + stores *kvserver.Stores // Access to node-local stores + metrics nodeMetrics + recorder *status.MetricsRecorder + startedAt int64 + lastUp int64 + initialStart bool // true if this is the first time this node has started + txnMetrics kvcoord.TxnMetrics // Used to signal when additional stores, if any, have been initialized. additionalStoreInitCh chan struct{} @@ -391,6 +393,7 @@ func NewNode( // InitLogger needs to be called if a nil execCfg was passed to NewNode(). func (n *Node) InitLogger(execCfg *sql.ExecutorConfig) { n.sqlExec = execCfg.InternalExecutor + n.eventsExporter = execCfg.EventsExporter } // String implements fmt.Stringer. @@ -964,7 +967,7 @@ func (n *Node) recordJoinEvent(ctx context.Context) { retryOpts.Closer = n.stopper.ShouldQuiesce() for r := retry.Start(retryOpts); r.Next(); { if err := n.storeCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return sql.InsertEventRecord(ctx, n.sqlExec, + return sql.InsertEventRecord(ctx, n.sqlExec, n.eventsExporter, txn, int32(n.Descriptor.NodeID), /* reporting ID: the node where the event is logged */ sql.LogToSystemTable|sql.LogToDevChannelIfVerbose, /* LogEventDestination: we already call log.StructuredEvent above */ diff --git a/pkg/server/server.go b/pkg/server/server.go index 0cd01633fe08..a5e31dd2b600 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -42,6 +43,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile" serverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/reports" + "github.com/cockroachdb/cockroach/pkg/obs" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -129,8 +132,11 @@ type Server struct { migrationServer *migrationServer tsDB *ts.DB tsServer *ts.Server - raftTransport *kvserver.RaftTransport - stopper *stop.Stopper + // The Observability Server, used by the Observability Service to subscribe to + // CRDB data.
+ obsServer *obs.EventsServer + raftTransport *kvserver.RaftTransport + stopper *stop.Stopper debug *debug.Server kvProber *kvprober.Prober @@ -767,6 +773,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { }) registry.AddMetricStruct(kvProber.Metrics()) + obsServer := obs.NewEventServer( + cfg.AmbientCtx, + timeutil.DefaultTimeSource{}, + stopper, + 5*time.Second, // maxStaleness + 1<<20, // triggerSizeBytes - 1MB + 10*1<<20, // maxBufferSizeBytes - 10MB + sqlMonitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool, + ) + settingsWriter := newSettingsCacheWriter(engines[0], stopper) sqlServer, err := newSQLServer(ctx, sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ @@ -810,6 +826,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { tenantUsageServer: tenantUsage, monitorAndMetrics: sqlMonitorAndMetrics, settingsStorage: settingsWriter, + eventsServer: obsServer, }) if err != nil { return nil, err @@ -863,6 +880,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { authentication: sAuth, tsDB: tsDB, tsServer: &sTS, + obsServer: obsServer, raftTransport: raftTransport, stopper: stopper, debug: debugServer, @@ -1112,6 +1130,11 @@ func (s *Server) PreStart(ctx context.Context) error { serverpb.RegisterMigrationServer(s.grpc.Server, migrationServer) s.migrationServer = migrationServer // only for testing via TestServer + // Register the Observability Server, used by the Observability Service to + // subscribe to CRDB data. Note that the server will reject RPCs until + // SetResourceInfo is called later. + obspb.RegisterObsServer(s.grpc.Server, s.obsServer) + // Start the RPC server. This opens the RPC/SQL listen socket, // and dispatches the server worker for the RPC. // The SQL listener is returned, to start the SQL server later @@ -1247,6 +1270,9 @@ func (s *Server) PreStart(ctx context.Context) error { return errors.Wrap(err, "invalid init state") } + // Enable the Obs Server. + s.obsServer.SetResourceInfo(state.clusterID, int32(state.nodeID), build.BinaryVersion()) + // Apply any cached initial settings (and start the gossip listener) as early // as possible, to avoid spending time with stale settings. if err := initializeCachedSettings( diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 00b85c0cfb20..e6b9a70e149d 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/obs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -340,6 +341,9 @@ type sqlServerArgs struct { // grpc is the RPC service. grpc *grpcServer + + // eventsServer communicates with the Observability Service.
+ eventsServer *obs.EventsServer } type monitorAndMetrics struct { @@ -825,6 +829,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SystemTableIDResolver: descs.MakeSystemTableIDResolver(collectionFactory, cfg.circularInternalExecutor, cfg.db), ConsistencyChecker: consistencychecker.NewConsistencyChecker(cfg.db), RangeProber: rangeprober.NewRangeProber(cfg.db), + EventsExporter: cfg.eventsServer, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index e7011608995b..7ef08e216f4a 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel" + "github.com/cockroachdb/cockroach/pkg/obs" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -50,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -138,11 +141,6 @@ func startTenantInternal( if err != nil { return nil, nil, nil, "", "", err } - args.monitorAndMetrics = newRootSQLMemoryMonitor(monitorAndMetricsOptions{ - memoryPoolSize: args.MemoryPoolSize, - histogramWindowInterval: args.HistogramWindowInterval(), - settings: args.Settings, - }) closedSessionCache := sql.NewClosedSessionCache( baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now) args.closedSessionCache = closedSessionCache @@ -331,6 +329,15 @@ func startTenantInternal( ); err != nil { return nil, nil, nil, "", "", err } + clusterID := args.rpcContext.LogicalClusterID.Get() + instanceID := s.SQLInstanceID() + if clusterID.Equal(uuid.Nil) { + log.Fatalf(ctx, "expected LogicalClusterID to be initialized after preStart") + } + if instanceID == 0 { + log.Fatalf(ctx, "expected SQLInstanceID to be initialized after preStart") + } + args.eventsServer.SetResourceInfo(clusterID, int32(instanceID), "unknown" /* version */) externalUsageFn := func(ctx context.Context) multitenant.ExternalUsage { userTimeMillis, sysTimeMillis, err := status.GetCPUTime(ctx) @@ -531,6 +538,26 @@ func makeTenantSQLServerArgs( sessionRegistry := sql.NewSessionRegistry() flowScheduler := flowinfra.NewFlowScheduler(baseCfg.AmbientCtx, stopper, st) + + monitorAndMetrics := newRootSQLMemoryMonitor(monitorAndMetricsOptions{ + memoryPoolSize: sqlCfg.MemoryPoolSize, + histogramWindowInterval: baseCfg.HistogramWindowInterval(), + settings: baseCfg.Settings, + }) + + // Create the EventServer. It will be made operational later, after the + // cluster ID is known, with a SetResourceInfo() call. 
+ obsServer := obs.NewEventServer( + baseCfg.AmbientCtx, + timeutil.DefaultTimeSource{}, + stopper, + 5*time.Second, // maxStaleness + 1<<20, // triggerSizeBytes - 1MB + 10*1<<20, // maxBufferSizeBytes - 10MB + monitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool, + ) + obspb.RegisterObsServer(grpcServer.Server, obsServer) + return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -574,7 +601,9 @@ func makeTenantSQLServerArgs( regionsServer: tenantConnect, tenantStatusServer: tenantConnect, costController: costController, + monitorAndMetrics: monitorAndMetrics, grpc: grpcServer, + eventsServer: obsServer, }, nil } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index c16d3341d331..3ee3c0cbb4e5 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -283,6 +283,10 @@ go_library( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/protectedts", "//pkg/multitenant", + "//pkg/obs", + "//pkg/obsservice/obspb", + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 850bff19392c..ab932fa4924e 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -116,6 +116,11 @@ CREATE TABLE system.lease ( CONSTRAINT "primary" PRIMARY KEY ("descID", version, expiration, "nodeID") );` + // system.eventlog contains notable events from the cluster. + // + // This data is also exported to the Observability Service. This table might + // go away in the future. + // // TODO(knz): targetID and reportingID are deprecated and should // be removed after v21.1 is released. Their content is now // available inside the info payload, which is a JSON blob. 
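The server changes above create an obs.EventsServer in both the KV server and the SQL tenant server and hand it to the SQL layer as ExecutorConfig.EventsExporter, which the event_log.go changes below use to export every system.eventlog write. For orientation, here is a minimal sketch of the exporter surface as implied by the call sites in this patch (NewEventServer, SetResourceInfo, SendEvent); it is an inference, not a copy of pkg/obs/event_exporter.go, and the exact signatures there may differ.

package obs

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
	otel_logs_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// EventsExporter is the interface through which CRDB code hands events to the
// event-export subsystem. Sketch only: the shape is inferred from the call
// sites in this patch.
type EventsExporter interface {
	// SetResourceInfo identifies the cluster and node (or SQL instance) whose
	// events are being exported; the EventsServer rejects RPCs until this has
	// been called.
	SetResourceInfo(clusterID uuid.UUID, nodeID int32, version string)

	// SendEvent buffers an event to be streamed to a subscribed Observability
	// Service (the buffer is bounded by the triggerSizeBytes/maxBufferSizeBytes
	// parameters passed to NewEventServer).
	SendEvent(ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord)
}

The concrete implementation wired up here is the obs.EventsServer constructed in server.go and tenant.go, which also serves the Obs gRPC service registered via obspb.RegisterObsServer.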
diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 6672bd6d01b3..c46c672250fd 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -12,6 +12,7 @@ package sql import ( "context" + "encoding/binary" "fmt" "strings" @@ -19,6 +20,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/obs" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + v1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" + otel_logs_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -27,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -111,6 +117,7 @@ import ( // (route) // | // +--> system.eventlog if not disabled by setting +// | └ also the Obs Service, if connected // | // +--> DEV channel if requested by log.V // | @@ -250,7 +257,7 @@ func logEventInternalForSchemaChanges( // wraps the call in a db.Txn() callback, which confuses the vmodule // filtering. Easiest is to pretend the event is sourced here. return insertEventRecords( - ctx, execCfg.InternalExecutor, + ctx, execCfg.InternalExecutor, execCfg.EventsExporter, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1, /* depth: use this function as origin */ @@ -301,7 +308,7 @@ func logEventInternalForSQLStatements( return insertEventRecords( ctx, - execCfg.InternalExecutor, + execCfg.InternalExecutor, execCfg.EventsExporter, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1+depth, /* depth */ @@ -356,7 +363,7 @@ func (l schemaChangerEventLogger) LogEventForSchemaChange( } scCommon.CommonSchemaChangeDetails().InstanceID = int32(l.execCfg.NodeID.SQLInstanceID()) return insertEventRecords( - ctx, l.execCfg.InternalExecutor, + ctx, l.execCfg.InternalExecutor, l.execCfg.EventsExporter, l.txn, int32(l.execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1, /* depth: use this function as origin */ @@ -400,7 +407,7 @@ func LogEventForJobs( // wraps the call in a db.Txn() callback, which confuses the vmodule // filtering. Easiest is to pretend the event is sourced here. return insertEventRecords( - ctx, execCfg.InternalExecutor, + ctx, execCfg.InternalExecutor, execCfg.EventsExporter, txn, int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */ 1, /* depth: use this function for vmodule filtering */ @@ -448,6 +455,7 @@ const ( func InsertEventRecord( ctx context.Context, ex *InternalExecutor, + eventsExporter obs.EventsExporter, txn *kv.Txn, reportingID int32, dst LogEventDestination, @@ -457,7 +465,7 @@ func InsertEventRecord( // We use depth=1 because the caller of this function typically // wraps the call in a db.Txn() callback, which confuses the vmodule // filtering. Easiest is to pretend the event is sourced here. 
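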
- return insertEventRecords(ctx, ex, txn, reportingID, + return insertEventRecords(ctx, ex, eventsExporter, txn, reportingID, 1, /* depth: use this function */ eventLogOptions{dst: dst}, eventLogEntry{targetID: targetID, event: info}) @@ -476,6 +484,7 @@ func InsertEventRecord( func insertEventRecords( ctx context.Context, ex *InternalExecutor, + eventsExporter obs.EventsExporter, txn *kv.Txn, reportingID int32, depth int, @@ -526,18 +535,8 @@ func insertEventRecords( return nil } - // When logging to the system table, ensure that the external - // logging only sees the event when the transaction commits. - if opts.dst.hasFlag(LogExternally) { - txn.AddCommitTrigger(func(ctx context.Context) { - for i := range entries { - log.StructuredEvent(ctx, entries[i].event) - } - }) - } - - // The function below this point is specialized to write to the - // system table. + // Write to the system.eventlog table, to the event log, and to the Obs + // Service. const colsPerEvent = 5 const baseQuery = ` @@ -546,8 +545,18 @@ INSERT INTO system.eventlog ( ) VALUES($1, $2, $3, $4, $5)` args := make([]interface{}, 0, len(entries)*colsPerEvent) - constructArgs := func(reportingID int32, entry eventLogEntry) error { + + events := make([]otel_logs_pb.LogRecord, len(entries)) + sp := tracing.SpanFromContext(ctx) + var traceID, spanID [8]byte + if sp != nil { + binary.LittleEndian.PutUint64(traceID[:], uint64(sp.TraceID())) + binary.LittleEndian.PutUint64(spanID[:], uint64(sp.SpanID())) + } + for i := 0; i < len(entries); i++ { + entry := entries[i] event := entry.event + infoBytes := redact.RedactableBytes("{") _, infoBytes = event.AppendJSONFields(false /* printComma */, infoBytes) infoBytes = append(infoBytes, '}') @@ -563,27 +572,46 @@ VALUES($1, $2, $3, $4, $5)` reportingID, string(infoBytes), ) - return nil + + events[i] = otel_logs_pb.LogRecord{ + TimeUnixNano: uint64(timeutil.Now().UnixNano()), + Body: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: args[len(args)-1].(string)}}, + Attributes: []*v1.KeyValue{{ + Key: obspb.EventlogEventTypeAttribute, + Value: &v1.AnyValue{&v1.AnyValue_StringValue{eventType}}, + }}, + TraceId: traceID[:], + SpanId: spanID[:], + } } + // When logging to the system table, ensure that the external logging and the + // Obs Service only see the event when the transaction commits. + if opts.dst.hasFlag(LogExternally) { + txn.AddCommitTrigger(func(ctx context.Context) { + for i := range entries { + log.StructuredEvent(ctx, entries[i].event) + } + }) + } + txn.AddCommitTrigger(func(ctx context.Context) { + for i := range events { + ex.s.cfg.EventsExporter.SendEvent(ctx, obspb.EventlogEvent, events[i]) + } + }) + // In the common case where we have just 1 event, we want to skip // the extra heap allocation and buffer operations of the loop // below. This is an optimization. query := baseQuery - if err := constructArgs(reportingID, entries[0]); err != nil { - return err - } if len(entries) > 1 { // Extend the query with additional VALUES clauses for all the // events after the first one.
var completeQuery strings.Builder completeQuery.WriteString(baseQuery) - for _, extraEntry := range entries[1:] { + for range entries[1:] { placeholderNum := 1 + len(args) - if err := constructArgs(reportingID, extraEntry); err != nil { - return err - } fmt.Fprintf(&completeQuery, ", ($%d, $%d, $%d, $%d, $%d)", placeholderNum, placeholderNum+1, placeholderNum+2, placeholderNum+3, placeholderNum+4) } @@ -597,5 +625,6 @@ VALUES($1, $2, $3, $4, $5)` if rows != len(entries) { return errors.Errorf("%d rows affected by log insertion; expected %d rows affected.", rows, len(entries)) } + return nil } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 56032b33d0dd..7cdf3b0dd294 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/obs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -1307,6 +1308,9 @@ type ExecutorConfig struct { // RangeProber is used in calls to crdb_internal.probe_ranges. RangeProber eval.RangeProber + + // EventsExporter is the client for the Observability Service. + EventsExporter obs.EventsExporter } // UpdateVersionSystemSettingHook provides a callback that allows us diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index b40606c973bf..8b8888143eb6 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -227,6 +227,9 @@ func TestLint(t *testing.T) { stream.GrepNot(`/embedded.go`), stream.GrepNot(`geo/geographiclib/geodesic\.c$`), stream.GrepNot(`geo/geographiclib/geodesic\.h$`), + // The opentelemetry-proto files are copied from otel with their own + // license. + stream.GrepNot(`opentelemetry-proto/.*.proto$`), ), func(filename string) { file, err := os.Open(filepath.Join(pkgDir, filename)) if err != nil {
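A consumer on the other side of the Obs service defined in obs.proto would open the SubscribeToEvents stream and read Events messages off it. The following rough, self-contained sketch illustrates that; it assumes the standard gRPC-generated client names for the Obs service (obspb.NewObsClient, SubscribeToEvents), and the address, identity string, and insecure credentials are placeholders only, since a real deployment would dial the CRDB RPC port with its TLS configuration.

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()
	// Dial the CRDB node serving the Obs RPC service. Address and credentials
	// are illustrative only.
	conn, err := grpc.DialContext(ctx, "localhost:26257",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := obspb.NewObsClient(conn)
	stream, err := client.SubscribeToEvents(ctx,
		&obspb.SubscribeToEventsRequest{Identity: "obs-service"})
	if err != nil {
		log.Fatal(err)
	}
	for {
		events, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatal(err)
		}
		// Each Events message wraps OpenTelemetry ResourceLogs; individual
		// LogRecords carry the "event_type" attribute (e.g. "eventlog") and a
		// JSON payload in the body, as built in event_log.go above.
		for _, rl := range events.ResourceLogs {
			for _, sl := range rl.ScopeLogs {
				fmt.Printf("received %d log records\n", len(sl.LogRecords))
			}
		}
	}
}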