diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index d229ffa1d4b3..6c6fffb966df 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1434,6 +1434,11 @@ GO_TARGETS = [ "//pkg/obsservice/obslib/ingest:ingest", "//pkg/obsservice/obslib/ingest:ingest_test", "//pkg/obsservice/obslib/migrations:migrations", + "//pkg/obsservice/obslib/obsutil:obstestutil", + "//pkg/obsservice/obslib/obsutil:obsutil", + "//pkg/obsservice/obslib/obsutil:testutil", + "//pkg/obsservice/obslib/transform:transform", + "//pkg/obsservice/obslib:obslib", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index 210c1230413f..23abc08eb643 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -212,61 +212,34 @@ func TestProxyProtocol(t *testing.T) { } } - t.Run("allow=true", func(t *testing.T) { - s, sqlAddr, httpAddr := withProxyProtocol(true) + s, sqlAddr, httpAddr := withProxyProtocol(true) - defer testutils.TestingHook(&validateFn, func(h *proxyproto.Header) error { - if h.SourceAddr.String() != "10.20.30.40:4242" { - return errors.Newf("got source addr %s, expected 10.20.30.40:4242", h.SourceAddr) - } - return nil - })() - - // Test SQL. Only request with PROXY should go through. - url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr) - te.TestConnectWithPGConfig( - ctx, t, url, - func(c *pgx.ConnConfig) { - c.DialFunc = proxyDialer - }, - func(conn *pgx.Conn) { - require.Equal(t, int64(1), s.metrics.CurConnCount.Value()) - require.NoError(t, runTestQuery(ctx, conn)) - }, - ) - _ = te.TestConnectErr(ctx, t, url, codeClientReadFailed, "tls error") - - // Test HTTP. Should support with or without PROXY. - client := http.Client{Timeout: timeout} - makeHttpReq(t, &client, httpAddr, true) - proxyClient := http.Client{Transport: &http.Transport{DialContext: proxyDialer}} - makeHttpReq(t, &proxyClient, httpAddr, true) - }) - - t.Run("allow=false", func(t *testing.T) { - s, sqlAddr, httpAddr := withProxyProtocol(false) + defer testutils.TestingHook(&validateFn, func(h *proxyproto.Header) error { + if h.SourceAddr.String() != "10.20.30.40:4242" { + return errors.Newf("got source addr %s, expected 10.20.30.40:4242", h.SourceAddr) + } + return nil + })() - // Test SQL. Only request without PROXY should go through. - url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr) - te.TestConnect(ctx, t, url, func(conn *pgx.Conn) { + // Test SQL. Only request with PROXY should go through. + url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr) + te.TestConnectWithPGConfig( + ctx, t, url, + func(c *pgx.ConnConfig) { + c.DialFunc = proxyDialer + }, + func(conn *pgx.Conn) { require.Equal(t, int64(1), s.metrics.CurConnCount.Value()) require.NoError(t, runTestQuery(ctx, conn)) - }) - _ = te.TestConnectErrWithPGConfig( - ctx, t, url, - func(c *pgx.ConnConfig) { - c.DialFunc = proxyDialer - }, - codeClientReadFailed, - "tls error", - ) + }, + ) + _ = te.TestConnectErr(ctx, t, url, codeClientReadFailed, "tls error") - // Test HTTP. 
- client := http.Client{Timeout: timeout} - makeHttpReq(t, &client, httpAddr, true) - proxyClient := http.Client{Transport: &http.Transport{DialContext: proxyDialer}} - makeHttpReq(t, &proxyClient, httpAddr, false) - }) + // Test HTTP. Should support with or without PROXY. + client := http.Client{Timeout: timeout} + makeHttpReq(t, &client, httpAddr, true) + proxyClient := http.Client{Transport: &http.Transport{DialContext: proxyDialer}} + makeHttpReq(t, &proxyClient, httpAddr, true) } func TestPrivateEndpointsACL(t *testing.T) { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 31346f06cf84..a91372831bea 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -22,13 +22,15 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/version", - "@com_github_cockroachdb_errors//:errors", ], ) go_test( name = "mixedversion_test", - srcs = ["planner_test.go"], + srcs = [ + "planner_test.go", + "runner_test.go", + ], args = ["-test.timeout=295s"], embed = [":mixedversion"], deps = [ diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go index 56833b4f84e2..5ed1b42346f3 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go @@ -32,7 +32,7 @@ var ( Stdout: io.Discard, Stderr: io.Discard, } - l, err := cfg.NewLogger("" /* path */) + l, err := cfg.NewLogger("/dev/null" /* path */) if err != nil { panic(err) } diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index a82b2e1e74f9..f108de6055a0 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -19,6 +19,7 @@ import ( "path" "path/filepath" "regexp" + "runtime/debug" "strconv" "strings" "sync/atomic" @@ -32,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/errors" ) type ( @@ -249,7 +249,16 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logge prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) tr.logStep(prefix, ss, l) }() - if err := ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l)); err != nil { + + if err := func() (retErr error) { + defer func() { + if r := recover(); r != nil { + l.Printf("panic stack trace:\n%s", string(debug.Stack())) + retErr = fmt.Errorf("panic (stack trace above): %v", r) + } + }() + return ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l)) + }(); err != nil { if isContextCanceled(err) { l.Printf("step terminated (context canceled)") // Avoid creating a `stepError` (which involves querying binary @@ -331,7 +340,7 @@ func (tr *testRunner) testFailure(desc string, l *logger.Logger) error { tr.logger.Printf("could not rename failed step logger: %v", err) } - return errors.WithStack(tf) + return tf } func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go new file mode 100644 index 000000000000..62662f30c359 --- /dev/null +++ 
b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go @@ -0,0 +1,85 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package mixedversion + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/stretchr/testify/require" +) + +func Test_runSingleStep(t *testing.T) { + tr := testTestRunner() + + // steps that run without errors do not return errors + successStep := newTestStep(func() error { + return nil + }) + err := tr.runSingleStep(ctx, successStep, nilLogger) + require.NoError(t, err) + + // steps that return an error have that error surfaced + errorStep := newTestStep(func() error { + return fmt.Errorf("oops") + }) + err = tr.runSingleStep(ctx, errorStep, nilLogger) + require.Error(t, err) + require.Contains(t, err.Error(), "oops") + + // steps that panic cause an error to be returned + panicStep := newTestStep(func() error { + var ids []int + if ids[0] > 42 { + return nil + } + return fmt.Errorf("unreachable") + }) + err = nil + require.NotPanics(t, func() { + err = tr.runSingleStep(ctx, panicStep, nilLogger) + }) + require.Error(t, err) + require.Contains(t, err.Error(), "panic (stack trace above): runtime error: index out of range [0] with length 0") +} + +func testTestRunner() *testRunner { + runnerCtx, cancel := context.WithCancel(ctx) + return &testRunner{ + ctx: runnerCtx, + cancel: cancel, + logger: nilLogger, + crdbNodes: nodes, + background: newBackgroundRunner(runnerCtx), + seed: seed, + } +} + +type testSingleStep struct { + runFunc func() error +} + +func (testSingleStep) ID() int { return 42 } +func (testSingleStep) Description() string { return "testSingleStep" } +func (testSingleStep) Background() shouldStop { return nil } + +func (tss testSingleStep) Run( + _ context.Context, _ *logger.Logger, _ cluster.Cluster, _ *Helper, +) error { + return tss.runFunc() +} + +func newTestStep(f func() error) singleStep { + return testSingleStep{runFunc: f} +} diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 83b96208c1b2..49a30261f36b 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -50,6 +50,7 @@ PROTOBUF_SRCS = [ "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_go_proto", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_go_proto", "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_go_proto", + "//pkg/obsservice/obspb:obspb_go_proto", "//pkg/repstream/streampb:streampb_go_proto", "//pkg/roachpb:roachpb_go_proto", "//pkg/rpc:rpc_go_proto", diff --git a/pkg/obs/BUILD.bazel b/pkg/obs/BUILD.bazel index 263d1375f47d..32abca429406 100644 --- a/pkg/obs/BUILD.bazel +++ b/pkg/obs/BUILD.bazel @@ -10,6 +10,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/obsservice/obslib", "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", diff --git a/pkg/obs/event_exporter.go b/pkg/obs/event_exporter.go index 714c0c06711c..0d9fece6b47b 100644 --- a/pkg/obs/event_exporter.go +++ b/pkg/obs/event_exporter.go @@ -16,6 +16,7 
@@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib" "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" otel_collector_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" @@ -94,6 +95,10 @@ type EventExporterTestingKnobs struct { // FlushTriggerByteSize, if set, overrides the default trigger value for the // EventExporter. FlushTriggerByteSize uint64 + // TestConsumer, if set, sets the consumer to be used by the embedded ingest + // component. This allows us to capture consumed events when running + // in embedded mode, so we can make assertions against them in tests. + TestConsumer obslib.EventConsumer } var _ base.ModuleTestingKnobs = &EventExporterTestingKnobs{} diff --git a/pkg/obsservice/README.md b/pkg/obsservice/README.md index bfa0622ccbd5..861e5332e799 100644 --- a/pkg/obsservice/README.md +++ b/pkg/obsservice/README.md @@ -9,6 +9,10 @@ The Obs Service is developed as a library (in the `obslib` package) and a binary embed and extend the library (for example we imagine CockroachCloud doing so in the future). +**Note**: Serving the DB Console is no longer a core part of the planned utility of the +Obs Service. However, the functionality to serve the DB Console is maintained for +now, in case it proves useful down the line. + ## Building the Obs Service Build with @@ -46,7 +50,8 @@ exporters: tls: insecure: true ``` - `--http-addr` is the address on which the DB Console is served. +- `--http-addr` is the address on which the DB Console is served. NB: This feature may + be removed in the future. See the note above in the [header section](#CockroachDB-Observability-Service). - `--crdb-http-url` is CRDB's HTTP address. For a multi-node CRDB cluster, this can point to a load-balancer. It can be either a HTTP or an HTTPS address, depending on whether the CRDB cluster is running as `--insecure`. @@ -70,10 +75,6 @@ exporters: one created with `cockroach cert create-ca`). If specified, HTTP requests are only proxied to CRDB nodes that present certificates signed by this CA. If not specified, the system's CA list is used. -- `--sink-pgurl` is the connection string for the sink cluster. If the pgurl - contains a database name, that database will be used; otherwise `obsservice` - will be used. If not specified, a connection to a local cluster will be - attempted. ## Functionality @@ -86,13 +87,8 @@ In the current fledgling state, the Obs Service does a couple of things: 3. The Obs Service exposes the OTLP Logs gRPC service and is able to ingest events received through calls to this RPC service. Only insecure gRPC - connections are supported at the moment. - -4. The Obs Service connects to a sink cluster identified by `--sink-pgurl`. The - required schema is automatically created using SQL migrations run with - [goose](https://github.com/pressly/goose). The state of migrations in a sink - cluster can be inspected through the `observice.obs_admin.migrations` table. - The ingested events are saved in the sink cluster. + connections are supported at the moment. Events are ingested into the + Obs Service for aggregation and eventual storage.
## Event ingestion @@ -106,22 +102,10 @@ and,within that, into [`ScopeLogs`](https://github.com/open-telemetry/opentelemetry-proto/blob/200ccff768a29f8bd431e0a4a463da7ed58be557/opentelemetry/proto/logs/v1/logs.proto#L64). A resource identifies the cluster/node/tenant that is emitting the respective events. A scope identifies the type of event; events of different types get -persisted in different tables, based on this event type. Events of unrecognized -types are dropped. Currently, a single event type is supported: `"eventlog"`. +routed to different processing pipelines, based on this event type. Events of +unrecognized types are dropped. Currently, a single event type is supported: `"eventlog"`. The log records carry attributes and a JSON payload representing the event. -The mapping between event types and tables is listed in the table below: - -| Event type | Table | Attributes | -|------------|----------------|--------------| -| eventlog | cluster_events | {event_type} | - -Each table storing events can have a different schema. It is expected that these -tables store some event fields as columns and otherwise store the raw event in a -JSON column. The values of the different columns can come from the attributes of -the log record (listed in the table above), or from the JSON itself. Virtual or -computed columns can be used to extract data from the JSON directly. - ## Licensing The Observability Service is licensed as Apache 2.0. diff --git a/pkg/obsservice/cmd/obsservice/BUILD.bazel b/pkg/obsservice/cmd/obsservice/BUILD.bazel index ef2ce1226b61..e76399568641 100644 --- a/pkg/obsservice/cmd/obsservice/BUILD.bazel +++ b/pkg/obsservice/cmd/obsservice/BUILD.bazel @@ -9,14 +9,13 @@ go_library( "//pkg/cli/exit", "//pkg/obsservice/obslib/httpproxy", "//pkg/obsservice/obslib/ingest", - "//pkg/obsservice/obslib/migrations", + "//pkg/obsservice/obslib/obsutil", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/ui/distoss", "//pkg/util/log", "//pkg/util/stop", "//pkg/util/sysutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgx_v4//pgxpool", "@com_github_spf13_cobra//:cobra", "@org_golang_google_grpc//:go_default_library", "@org_golang_x_sys//unix", diff --git a/pkg/obsservice/cmd/obsservice/main.go b/pkg/obsservice/cmd/obsservice/main.go index 4557c514a537..461225beb7e8 100644 --- a/pkg/obsservice/cmd/obsservice/main.go +++ b/pkg/obsservice/cmd/obsservice/main.go @@ -20,14 +20,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/httpproxy" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest" - "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil" logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" _ "github.com/cockroachdb/cockroach/pkg/ui/distoss" // web UI init hooks "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/sysutil" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4/pgxpool" "github.com/spf13/cobra" "golang.org/x/sys/unix" "google.golang.org/grpc" @@ -47,9 +46,6 @@ var drainSignals = []os.Signal{unix.SIGINT, unix.SIGTERM} // shutdown (i.e. second occurrence does not incur hard shutdown). 
var termSignal os.Signal = unix.SIGTERM -// defaultSinkDBName is the name of the database to be used by default. -const defaultSinkDBName = "obsservice" - // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "obsservice", @@ -67,18 +63,18 @@ from one or more CockroachDB clusters.`, UICertKeyPath: uiCertKeyPath, } - connCfg, err := pgxpool.ParseConfig(sinkPGURL) - if err != nil { - return errors.Wrapf(err, "invalid --sink-pgurl (%s)", sinkPGURL) - } - if connCfg.ConnConfig.Database == "" { - fmt.Printf("No database explicitly provided in --sink-pgurl. Using %q.\n", defaultSinkDBName) - connCfg.ConnConfig.Database = defaultSinkDBName - } - - if err := migrations.RunDBMigrations(ctx, connCfg.ConnConfig); err != nil { - return errors.Wrap(err, "failed to run DB migrations") - } + // TODO(abarganier): migrate DB migrations over to target storage for aggregated outputs + //connCfg, err := pgxpool.ParseConfig(sinkPGURL) + //if err != nil { + // return errors.Wrapf(err, "invalid --sink-pgurl (%s)", sinkPGURL) + //} + //if connCfg.ConnConfig.Database == "" { + // fmt.Printf("No database explicitly provided in --sink-pgurl. Using %q.\n", defaultSinkDBName) + // connCfg.ConnConfig.Database = defaultSinkDBName + //} + //if err := migrations.RunDBMigrations(ctx, connCfg.ConnConfig); err != nil { + // return errors.Wrap(err, "failed to run DB migrations") + //} signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, drainSignals...) @@ -86,19 +82,16 @@ from one or more CockroachDB clusters.`, stop := stop.NewStopper() // Run the event ingestion in the background. - ingester, err := ingest.MakeEventIngester(ctx, connCfg) - if err != nil { - return err - } + consumer := &obsutil.StdOutConsumer{} + ingester := ingest.MakeEventIngester(ctx, consumer, nil) listener, err := net.Listen("tcp", otlpAddr) if err != nil { return errors.Wrapf(err, "failed to listen for incoming HTTP connections on address %s", otlpAddr) } fmt.Printf("Listening for OTLP connections on %s.", otlpAddr) grpcServer := grpc.NewServer() - logspb.RegisterLogsServiceServer(grpcServer, &ingester) + logspb.RegisterLogsServiceServer(grpcServer, ingester) if err := stop.RunAsyncTask(ctx, "event ingester", func(ctx context.Context) { - defer ingester.Close() _ = grpcServer.Serve(listener) }); err != nil { return err diff --git a/pkg/obsservice/obslib/BUILD.bazel b/pkg/obsservice/obslib/BUILD.bazel new file mode 100644 index 000000000000..6925c226b127 --- /dev/null +++ b/pkg/obsservice/obslib/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "obslib", + srcs = ["consumer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib", + visibility = ["//visibility:public"], + deps = ["//pkg/obsservice/obspb"], +) diff --git a/pkg/obsservice/obslib/consumer.go b/pkg/obsservice/obslib/consumer.go new file mode 100644 index 000000000000..e36bd9b91d70 --- /dev/null +++ b/pkg/obsservice/obslib/consumer.go @@ -0,0 +1,27 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package obslib + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" +) + +// An EventConsumer represents a component in the obsservice +// that's capable of processing an event. 
+// +// A chain of EventConsumer's is generally used to process events, +// where one Consumer is given a reference to the next Consumer +// to use once it's finished processing the event. +type EventConsumer interface { + // Consume consumes the provided obspb.Event into the component implementing + // the EventConsumer interface. + Consume(ctx context.Context, event *obspb.Event) error +} diff --git a/pkg/obsservice/obslib/ingest/BUILD.bazel b/pkg/obsservice/obslib/ingest/BUILD.bazel index 83cc23cad8bb..1d8129b5bbf3 100644 --- a/pkg/obsservice/obslib/ingest/BUILD.bazel +++ b/pkg/obsservice/obslib/ingest/BUILD.bazel @@ -2,56 +2,56 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ingest", - srcs = ["ingest.go"], + srcs = ["grpc_ingest.go"], importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest", visibility = ["//visibility:public"], deps = [ - "//pkg/obsservice/obspb", + "//pkg/obsservice/obslib", + "//pkg/obsservice/obslib/transform", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", - "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", - "//pkg/roachpb", "//pkg/util/log", "//pkg/util/timeutil", - "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgx_v4//pgxpool", ], ) go_test( name = "ingest_test", srcs = [ - "ingest_test.go", + "grpc_ingest_test.go", + "ingest_integration_test.go", "main_test.go", ], args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), embed = [":ingest"], deps = [ "//pkg/base", "//pkg/obs", - "//pkg/obsservice/obslib/migrations", + "//pkg/obsservice/obslib/obsutil", "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", + "//pkg/roachpb", + "//pkg/rpc", "//pkg/security/securityassets", "//pkg/security/securitytest", - "//pkg/security/username", "//pkg/server", + "//pkg/settings/cluster", "//pkg/testutils", "//pkg/testutils/serverutils", - "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/timeutil", + "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_google_uuid//:uuid", - "@com_github_jackc_pgx_v4//pgxpool", + "@com_github_kr_pretty//:pretty", "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/obsservice/obslib/ingest/grpc_ingest.go b/pkg/obsservice/obslib/ingest/grpc_ingest.go new file mode 100644 index 000000000000..dbbe5c7eb464 --- /dev/null +++ b/pkg/obsservice/obslib/ingest/grpc_ingest.go @@ -0,0 +1,81 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package ingest + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform" + logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// EventIngester implements the OTLP Logs gRPC service, accepting connections +// and ingesting events. +type EventIngester struct { + consumer obslib.EventConsumer + timeSource timeutil.TimeSource +} + +var _ logspb.LogsServiceServer = &EventIngester{} + +// MakeEventIngester creates a new EventIngester. Callers can optionally +// provide a timeutil.TimeSource which will be used when determining ingestion +// timestamps. If nil is provided, timeutil.DefaultTimeSource will be used. +func MakeEventIngester( + _ context.Context, consumer obslib.EventConsumer, timeSource timeutil.TimeSource, +) *EventIngester { + if timeSource == nil { + timeSource = timeutil.DefaultTimeSource{} + } + return &EventIngester{ + consumer: consumer, + timeSource: timeSource, + } +} + +// Export implements the LogsServiceServer gRPC service. +// +// NB: "Export" is a bit of a misnomer here. On the client side, +// it makes sense, but on this end of the wire, we are *Ingesting* +// events, not exporting them. This is simply the receiving end +// of that process. This is done to maintain compatibility between +// the OpenTelemetry Collector and our EventsExporter. +func (e *EventIngester) Export( + ctx context.Context, request *logspb.ExportLogsServiceRequest, +) (*logspb.ExportLogsServiceResponse, error) { + if err := e.unpackAndConsumeEvents(ctx, request); err != nil { + log.Errorf(ctx, "error consuming events: %v", err) + return nil, err + } + return &logspb.ExportLogsServiceResponse{}, nil +} + +// TODO(abarganier): Add context cancellation here to cap transformation/unpack time. +// TODO(abarganier): Add metric to track context cancellations (counter tracking failed transformations) +func (e *EventIngester) unpackAndConsumeEvents( + ctx context.Context, request *logspb.ExportLogsServiceRequest, +) error { + ingestTime := e.timeSource.Now() + var retErr error = nil + for _, resource := range request.ResourceLogs { + for _, scopeLogs := range resource.ScopeLogs { + for _, logRecord := range scopeLogs.LogRecords { + transformed := transform.LogRecordToEvent(ingestTime, resource.Resource, scopeLogs.Scope, logRecord) + if err := e.consumer.Consume(ctx, transformed); err != nil { + retErr = errors.CombineErrors(retErr, err) + } + } + } + } + return retErr +} diff --git a/pkg/obsservice/obslib/ingest/grpc_ingest_test.go b/pkg/obsservice/obslib/ingest/grpc_ingest_test.go new file mode 100644 index 000000000000..9b6f172b4b6b --- /dev/null +++ b/pkg/obsservice/obslib/ingest/grpc_ingest_test.go @@ -0,0 +1,150 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package ingest + +import ( + "bytes" + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" + v12 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" + v1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" + otel_res_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/google/uuid" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" +) + +func TestGRPCIngest(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + clusterID := uuid.MustParse("44875af2-aea5-4965-8f9c-63fec244fd41") + testResource := &otel_res_pb.Resource{ + Attributes: []*v12.KeyValue{ + { + Key: obspb.ClusterID, + Value: &v12.AnyValue{Value: &v12.AnyValue_StringValue{StringValue: clusterID.String()}}, + }, + }, + } + testTimeSource := timeutil.NewManualTime(time.Date(2023, 6, 26, 12, 1, 0, 0, time.UTC)) + + datadriven.RunTest(t, "testdata/grpc_ingest", func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() + testConsumer := obsutil.NewTestCaptureConsumer() + e := MakeEventIngester(ctx, testConsumer, testTimeSource) + + req := newReqBuilder(testResource) + for _, line := range strings.Split(d.Input, "\n") { + fields := strings.Split(line, ",") + require.Len(t, fields, 2) + req.withLogEvent(fields[0], fields[1]) + } + + _, err := e.Export(ctx, req.build()) + require.NoError(t, err) + + var buf bytes.Buffer + for _, event := range testConsumer.Events() { + fmt.Fprintf(&buf, "%# v\n", pretty.Formatter(event)) + } + return buf.String() + }) +} + +// reqBuilder uses a builder pattern to incrementally build a +// logspb.ExportLogsServiceRequest with various LogRecord events +// of a given type. +// +// When you're finished building the request, use build() to +// finalize the request object. +type reqBuilder struct { + // The request we're building. + req *logspb.ExportLogsServiceRequest + // Accumulated LogRecords, segmented by event type. We use + // a struct slice here instead of a map to provide deterministic + // ordering when iterating. + scopeLogs []*scopeLogs + // The last event timestamp, starting at a static value. + // Each LogEvent added to the builder will have a timestamp + // that increments from this timestamp. The original value is + // static, meaning that timestamps will be deterministic for + // each run so long as the order in which they're added is the + // same. 
+ lastTimestamp int64 +} + +type scopeLogs struct { + eventType string + logs []v1.LogRecord +} + +func newReqBuilder(resource *otel_res_pb.Resource) *reqBuilder { + return &reqBuilder{ + req: &logspb.ExportLogsServiceRequest{ + ResourceLogs: []*v1.ResourceLogs{ + {Resource: resource}, + }, + }, + scopeLogs: make([]*scopeLogs, 0), + lastTimestamp: time.Date(2023, 6, 26, 12, 0, 0, 0, time.UTC).UnixNano(), + } +} + +func (r *reqBuilder) withLogEvent(eventType string, data string) *reqBuilder { + var sl *scopeLogs + for _, s := range r.scopeLogs { + if s.eventType == eventType { + sl = s + break + } + } + if sl == nil { + sl = &scopeLogs{ + eventType: eventType, + logs: make([]v1.LogRecord, 0), + } + r.scopeLogs = append(r.scopeLogs, sl) + } + timestamp := r.lastTimestamp + 10000000 + r.lastTimestamp = timestamp + sl.logs = append(sl.logs, v1.LogRecord{ + TimeUnixNano: uint64(timestamp), + Body: &v12.AnyValue{Value: &v12.AnyValue_StringValue{StringValue: data}}, + Attributes: []*v12.KeyValue{{ + Key: obspb.EventlogEventTypeAttribute, + Value: &v12.AnyValue{Value: &v12.AnyValue_StringValue{StringValue: eventType}}, + }}, + }) + return r +} + +func (r *reqBuilder) build() *logspb.ExportLogsServiceRequest { + for _, sl := range r.scopeLogs { + r.req.ResourceLogs[0].ScopeLogs = append(r.req.ResourceLogs[0].ScopeLogs, v1.ScopeLogs{ + Scope: &v12.InstrumentationScope{ + Name: sl.eventType, + Version: "1.0", + }, + LogRecords: sl.logs, + }) + } + return r.req +} diff --git a/pkg/obsservice/obslib/ingest/ingest.go b/pkg/obsservice/obslib/ingest/ingest.go deleted file mode 100644 index bbd0463be51c..000000000000 --- a/pkg/obsservice/obslib/ingest/ingest.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 - -package ingest - -import ( - "context" - "fmt" - "strings" - - "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" - logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" - otlogs "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4/pgxpool" -) - -// EventIngester implements the OTLP Logs gRPC service, accepting connections -// and ingesting events. -type EventIngester struct { - db *pgxpool.Pool -} - -var _ logspb.LogsServiceServer = &EventIngester{} - -func MakeEventIngester(ctx context.Context, cfg *pgxpool.Config) (EventIngester, error) { - pool, err := pgxpool.ConnectConfig(ctx, cfg) - if err != nil { - return EventIngester{}, errors.Wrap(err, "failed to connect to sink database") - } - return EventIngester{ - db: pool, - }, nil -} - -// Close closes the database connections opened by the ingester. -func (e EventIngester) Close() { - e.db.Close() -} - -// Export implements the LogsServiceServer gRPC service. 
-func (e *EventIngester) Export( - ctx context.Context, request *logspb.ExportLogsServiceRequest, -) (*logspb.ExportLogsServiceResponse, error) { - if e.db == nil { - return nil, errors.AssertionFailedf("SetDB not called before incoming Export() RPC") - } - if err := persistEvents(ctx, request.ResourceLogs, e.db); err != nil { - log.Warningf(ctx, "failed to persist events: %s", err) - } - return &logspb.ExportLogsServiceResponse{}, nil -} - -// persistEvents writes events to the database. -func persistEvents(ctx context.Context, events []*otlogs.ResourceLogs, db *pgxpool.Pool) error { - for _, group := range events { - var clusterID uuid.UUID - var nodeID roachpb.NodeID - for _, att := range group.Resource.Attributes { - switch att.Key { - case obspb.ClusterID: - var err error - clusterID, err = uuid.FromString(att.Value.GetStringValue()) - if err != nil { - log.Warningf(ctx, "invalid cluster ID: %s", att.Value) - continue - } - case obspb.NodeID: - nodeID = roachpb.NodeID(att.Value.GetIntValue()) - if nodeID == 0 { - log.Warningf(ctx, "invalid node ID: %s", att.Value) - continue - } - } - } - if clusterID.Equal(uuid.UUID{}) || nodeID == 0 { - log.Warning(ctx, "clusterID or nodeID not set") - continue - } - for _, scope := range group.ScopeLogs { - switch scope.Scope.Name { - case string(obspb.EventlogEvent): - if err := persistEventlogEvents(ctx, scope, clusterID, nodeID, db); err != nil { - log.Warningf(ctx, "error persisting events: %s", err) - return err - } - } - } - } - return nil -} - -const maxEventsPerStatement = 100 - -// persistEventlogEvents writes "eventlog" events to the database. -func persistEventlogEvents( - ctx context.Context, - events otlogs.ScopeLogs, - clusterID uuid.UUID, - nodeID roachpb.NodeID, - db *pgxpool.Pool, -) error { - if events.Scope.Name != string(obspb.EventlogEvent) { - panic(fmt.Sprintf("wrong event type: %s", events.Scope.Name)) - } - - // We're going to insert the events in chunks. - evs := events.LogRecords - for len(evs) > 0 { - chunk := evs - if len(evs) > maxEventsPerStatement { - chunk = evs[:maxEventsPerStatement] - } - evs = evs[len(chunk):] - - // Build and execute a statement for the chunk. - var sb strings.Builder - _, _ = sb.WriteString("INSERT INTO cluster_events(timestamp, cluster_id, instance_id, event_type, event) VALUES ") - const colsPerEvent = 5 - args := make([]interface{}, 0, len(chunk)*colsPerEvent) - argNum := 1 - for i, ev := range chunk { - var eventType string - for _, kv := range ev.Attributes { - if kv.Key == obspb.EventlogEventTypeAttribute { - eventType = kv.Value.GetStringValue() - } - } - data := ev.Body.GetStringValue() - if i != 0 { - sb.WriteString(", ") - } - sb.WriteString(fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", - argNum, argNum+1, argNum+2, argNum+3, argNum+4)) - argNum += 5 - args = append(args, - timeutil.Unix(0, int64(ev.TimeUnixNano)), - clusterID, - nodeID, - eventType, - data) - } - - log.VEventf(ctx, 2, "inserting %d events", len(chunk)) - _, err := db.Exec(ctx, sb.String(), args...) - if err != nil { - return err - } - } - return nil -} diff --git a/pkg/obsservice/obslib/ingest/ingest_integration_test.go b/pkg/obsservice/obslib/ingest/ingest_integration_test.go new file mode 100644 index 000000000000..06b0932bdf04 --- /dev/null +++ b/pkg/obsservice/obslib/ingest/ingest_integration_test.go @@ -0,0 +1,139 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package ingest + +import ( + "context" + gosql "database/sql" + "encoding/json" + "net" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/obs" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// Test an end-to-end integration between the ObsService and a CRDB cluster: +// verify that events get exported from CRDB and imported in the Obs Service. +func TestEventIngestionIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + testutils.RunTrueAndFalse(t, "embed", func(t *testing.T, embed bool) { + var obsAddr string + + var s serverutils.TestServerInterface + var sqlDB *gosql.DB + testConsumer := obsutil.NewTestCaptureConsumer() + if !embed { + // Allocate a port for the ingestion service to work around a circular + // dependency: CRDB needs to be told what the port is, but we can only create + // the event ingester after having started CRDB (because the ingester wants a + // reference to CRDB). + otlpListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer func() { + _ = otlpListener.Close() + }() + obsAddr = otlpListener.Addr().String() + s, sqlDB, _ = serverutils.StartServer(t, + base.TestServerArgs{ + ObsServiceAddr: obsAddr, + Knobs: base.TestingKnobs{ + EventExporter: &obs.EventExporterTestingKnobs{ + // Flush every message. + FlushTriggerByteSize: 1, + }, + }, + }, + ) + defer s.Stopper().Stop(ctx) + + // Start the ingestion in the background. + obsStop := stop.NewStopper() + defer obsStop.Stop(ctx) + e := MakeEventIngester(ctx, testConsumer, nil) + rpcContext := rpc.NewContext(ctx, + rpc.ContextOptions{ + TenantID: roachpb.SystemTenantID, + NodeID: &base.NodeIDContainer{}, + Config: &base.Config{Insecure: true}, + Clock: &timeutil.DefaultTimeSource{}, + ToleratedOffset: time.Nanosecond, + Stopper: obsStop, + Settings: cluster.MakeTestingClusterSettings(), + }) + grpcServer, err := rpc.NewServer(rpcContext) + require.NoError(t, err) + defer grpcServer.Stop() + logspb.RegisterLogsServiceServer(grpcServer, e) + go func() { + _ = grpcServer.Serve(otlpListener) + }() + } else { + s, sqlDB, _ = serverutils.StartServer(t, + base.TestServerArgs{ + ObsServiceAddr: base.ObsServiceEmbedFlagValue, + Knobs: base.TestingKnobs{ + EventExporter: &obs.EventExporterTestingKnobs{ + // Flush every message. + FlushTriggerByteSize: 1, + TestConsumer: testConsumer, + }, + }, + }, + ) + defer s.Stopper().Stop(ctx) + } + + // Perform a schema change and check that we get an event. 
+ _, err := sqlDB.Exec("create table t()") + require.NoError(t, err) + testutils.SucceedsSoon(t, func() error { + foundEvent := testConsumer.Contains(func(event *obspb.Event) bool { + type eventLogType struct { + EventType string `json:"EventType"` + Statement string `json:"Statement"` + } + + var ev eventLogType + if err := json.Unmarshal([]byte(event.LogRecord.Body.GetStringValue()), &ev); err != nil { + t.Fatalf("failed to deserialize event: %v", event) + } + // Look for our specific create table statement. + if ev.EventType == "create_table" && + strings.Contains(ev.Statement, "CREATE TABLE defaultdb.public.t") { + return true + } + return false + }) + if !foundEvent { + return errors.Newf("no event found yet") + } + return nil + }) + }) +} diff --git a/pkg/obsservice/obslib/ingest/ingest_test.go b/pkg/obsservice/obslib/ingest/ingest_test.go deleted file mode 100644 index b0888e4ce757..000000000000 --- a/pkg/obsservice/obslib/ingest/ingest_test.go +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 - -package ingest - -import ( - "context" - gosql "database/sql" - "net" - "net/url" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/obs" - "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations" - "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" - logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" - otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" - otlogs "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" - v1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/errors" - "github.com/google/uuid" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" -) - -func TestPersistEvents(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(ctx) - pgURL, cleanupFunc := sqlutils.PGUrl( - t, s.ServingSQLAddr(), - "TestPersistEvents", url.User(username.RootUser), - ) - defer cleanupFunc() - - config, err := pgxpool.ParseConfig(pgURL.String()) - require.NoError(t, err) - config.ConnConfig.Database = "defaultdb" - pool, err := pgxpool.ConnectConfig(ctx, config) - require.NoError(t, err) - defer pool.Close() - require.NoError(t, migrations.RunDBMigrations(ctx, config.ConnConfig)) - - var events otlogs.ResourceLogs - clusterID, err := uuid.NewRandom() - require.NoError(t, err) - const instanceID = 42 - const 
nodeBinaryVersion = "25.2.1" - events.Resource = &v1.Resource{ - Attributes: []*otel_pb.KeyValue{ - { - Key: obspb.ClusterID, - Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_StringValue{StringValue: clusterID.String()}}, - }, - { - Key: obspb.NodeID, - Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_IntValue{IntValue: int64(instanceID)}}, - }, - { - Key: obspb.NodeBinaryVersion, - Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_StringValue{StringValue: nodeBinaryVersion}}, - }, - }, - } - events.ScopeLogs = append(events.ScopeLogs, otlogs.ScopeLogs{ - Scope: &otel_pb.InstrumentationScope{ - Name: string(obspb.EventlogEvent), - }, - }) - - const eventType = "test event type" - numEvents := 2*maxEventsPerStatement + maxEventsPerStatement/2 - now := timeutil.Now() - // A dummy event value. It needs to be JSON. Note that the keys are ordered, - // so they match the string that comes out of the database when reading. - eventData := `{"ApplicationName": "$ internal-set-setting", "EventType": "set_cluster_setting", "PlaceholderValues": ["'22.1-26'"], "SettingName": "version", "Statement": "SET CLUSTER SETTING version = $1", "Tag": "SET CLUSTER SETTING", "Timestamp": 1659477303978528869, "User": "root", "Value": "22.1-26"}` - for i := 0; i < numEvents; i++ { - events.ScopeLogs[0].LogRecords = append(events.ScopeLogs[0].LogRecords, - otlogs.LogRecord{ - TimeUnixNano: uint64(now.UnixNano()), - Attributes: []*otel_pb.KeyValue{ - { - Key: obspb.EventlogEventTypeAttribute, - Value: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_StringValue{StringValue: eventType}}, - }, - }, - Body: &otel_pb.AnyValue{Value: &otel_pb.AnyValue_StringValue{StringValue: eventData}}, - }) - } - - require.NoError(t, persistEvents(ctx, []*otlogs.ResourceLogs{&events}, pool)) - r := pool.QueryRow(ctx, "select count(1) from cluster_events") - var count int - require.NoError(t, r.Scan(&count)) - require.Equal(t, numEvents, count) - r = pool.QueryRow(ctx, "select timestamp, cluster_id, instance_id, event_type, event from cluster_events limit 1") - var timestamp time.Time - var typ, ev string - var cID []byte - var iID int - require.NoError(t, r.Scan(×tamp, &cID, &iID, &typ, &ev)) - require.True(t, timestamp.Before(timeutil.Now())) - require.Less(t, timeutil.Since(timestamp), time.Hour) - require.Equal(t, typ, eventType) - cUUID, err := uuid.ParseBytes(cID) - require.NoError(t, err) - require.Equal(t, clusterID, cUUID) - require.Equal(t, instanceID, iID) - require.Equal(t, eventData, ev) -} - -// Test an end-to-end integration between the ObsService and a CRDB cluster: -// verify that events get exported from CRDB and imported in the Obs Service. -func TestEventIngestionIntegration(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - - testutils.RunTrueAndFalse(t, "embed", func(t *testing.T, embed bool) { - var obsAddr string - - var s serverutils.TestServerInterface - var sqlDB *gosql.DB - if !embed { - // Allocate a port for the ingestion service to work around a circular - // dependency: CRDB needs to be told what the port is, but we can only create - // the event ingester after having started CRDB (because the ingester wants a - // reference to CRDB). 
- otlpListener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - defer func() { - _ = otlpListener.Close() - }() - obsAddr = otlpListener.Addr().String() - s, sqlDB, _ = serverutils.StartServer(t, - base.TestServerArgs{ - ObsServiceAddr: obsAddr, - Knobs: base.TestingKnobs{ - EventExporter: &obs.EventExporterTestingKnobs{ - // Flush every message. - FlushTriggerByteSize: 1, - }, - }, - }, - ) - defer s.Stopper().Stop(ctx) - - pgURL, cleanupFunc := sqlutils.PGUrl( - t, s.ServingSQLAddr(), "TestPersistEvents", url.User(username.RootUser), - ) - defer cleanupFunc() - - config, err := pgxpool.ParseConfig(pgURL.String()) - require.NoError(t, err) - config.ConnConfig.Database = "crdb_observability" - pool, err := pgxpool.ConnectConfig(ctx, config) - require.NoError(t, err) - defer pool.Close() - require.NoError(t, migrations.RunDBMigrations(ctx, config.ConnConfig)) - - // Start the ingestion in the background. - obsStop := stop.NewStopper() - defer obsStop.Stop(ctx) - e, err := MakeEventIngester(ctx, config) - require.NoError(t, err) - defer e.Close() - grpcServer := grpc.NewServer() - defer grpcServer.Stop() - logspb.RegisterLogsServiceServer(grpcServer, &e) - go func() { - _ = grpcServer.Serve(otlpListener) - }() - } else { - s, sqlDB, _ = serverutils.StartServer(t, - base.TestServerArgs{ - ObsServiceAddr: base.ObsServiceEmbedFlagValue, - Knobs: base.TestingKnobs{ - EventExporter: &obs.EventExporterTestingKnobs{ - // Flush every message. - FlushTriggerByteSize: 1, - }, - }, - }, - ) - defer s.Stopper().Stop(ctx) - } - - // Perform a schema change and check that we get an event. - _, err := sqlDB.Exec("create table t()") - require.NoError(t, err) - - // Wait for an event to be ingested. - testutils.SucceedsSoon(t, func() error { - r := sqlDB.QueryRow("SELECT count(*) FROM crdb_observability.cluster_events WHERE event_type='create_table'") - var count int - require.NoError(t, r.Scan(&count)) - if count < 1 { - return errors.Newf("no events yet") - } - return nil - }) - }) - -} diff --git a/pkg/obsservice/obslib/ingest/testdata/grpc_ingest b/pkg/obsservice/obslib/ingest/testdata/grpc_ingest new file mode 100644 index 000000000000..6c30d2f4ad75 --- /dev/null +++ b/pkg/obsservice/obslib/ingest/testdata/grpc_ingest @@ -0,0 +1,166 @@ +test-ingest +type1,hello +type2,hola +type1,world +type2,mundo +---- +&obspb.Event{ + Resource: &v1.Resource{ + Attributes: { + &v1.KeyValue{ + Key: "ClusterID", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"44875af2-aea5-4965-8f9c-63fec244fd41"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + }, + Scope: &v1.InstrumentationScope{ + Name: "type1", + Version: "1.0", + Attributes: nil, + DroppedAttributesCount: 0x0, + }, + LogRecord: v1.LogRecord{ + TimeUnixNano: 0x176c33b203521680, + ObservedTimeUnixNano: 0x176c33bffb00d800, + SeverityNumber: 0, + SeverityText: "", + Body: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"hello"}, + }, + Attributes: { + &v1.KeyValue{ + Key: "event_type", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"type1"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + Flags: 0x0, + TraceId: nil, + SpanId: nil, + }, +} +&obspb.Event{ + Resource: &v1.Resource{ + Attributes: { + &v1.KeyValue{ + Key: "ClusterID", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"44875af2-aea5-4965-8f9c-63fec244fd41"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + }, + Scope: &v1.InstrumentationScope{ + Name: "type1", + Version: "1.0", + Attributes: nil, + 
DroppedAttributesCount: 0x0, + }, + LogRecord: v1.LogRecord{ + TimeUnixNano: 0x176c33b204834380, + ObservedTimeUnixNano: 0x176c33bffb00d800, + SeverityNumber: 0, + SeverityText: "", + Body: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"world"}, + }, + Attributes: { + &v1.KeyValue{ + Key: "event_type", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"type1"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + Flags: 0x0, + TraceId: nil, + SpanId: nil, + }, +} +&obspb.Event{ + Resource: &v1.Resource{ + Attributes: { + &v1.KeyValue{ + Key: "ClusterID", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"44875af2-aea5-4965-8f9c-63fec244fd41"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + }, + Scope: &v1.InstrumentationScope{ + Name: "type2", + Version: "1.0", + Attributes: nil, + DroppedAttributesCount: 0x0, + }, + LogRecord: v1.LogRecord{ + TimeUnixNano: 0x176c33b203eaad00, + ObservedTimeUnixNano: 0x176c33bffb00d800, + SeverityNumber: 0, + SeverityText: "", + Body: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"hola"}, + }, + Attributes: { + &v1.KeyValue{ + Key: "event_type", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"type2"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + Flags: 0x0, + TraceId: nil, + SpanId: nil, + }, +} +&obspb.Event{ + Resource: &v1.Resource{ + Attributes: { + &v1.KeyValue{ + Key: "ClusterID", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"44875af2-aea5-4965-8f9c-63fec244fd41"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + }, + Scope: &v1.InstrumentationScope{ + Name: "type2", + Version: "1.0", + Attributes: nil, + DroppedAttributesCount: 0x0, + }, + LogRecord: v1.LogRecord{ + TimeUnixNano: 0x176c33b2051bda00, + ObservedTimeUnixNano: 0x176c33bffb00d800, + SeverityNumber: 0, + SeverityText: "", + Body: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"mundo"}, + }, + Attributes: { + &v1.KeyValue{ + Key: "event_type", + Value: &v1.AnyValue{ + Value: &v1.AnyValue_StringValue{StringValue:"type2"}, + }, + }, + }, + DroppedAttributesCount: 0x0, + Flags: 0x0, + TraceId: nil, + SpanId: nil, + }, +} diff --git a/pkg/obsservice/obslib/obsutil/BUILD.bazel b/pkg/obsservice/obslib/obsutil/BUILD.bazel new file mode 100644 index 000000000000..f2feb0d6baa0 --- /dev/null +++ b/pkg/obsservice/obslib/obsutil/BUILD.bazel @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "testutil", + srcs = ["test_consumer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/testutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obslib", + "//pkg/obsservice/obspb", + ], +) + +go_library( + name = "obstestutil", + srcs = ["test_consumer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obstestutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obslib", + "//pkg/obsservice/obspb", + ], +) + +go_library( + name = "obsutil", + srcs = [ + "std_out_consumer.go", + "test_consumer.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obslib", + "//pkg/obsservice/obspb", + "//pkg/util/syncutil", + ], +) diff --git a/pkg/obsservice/obslib/obsutil/std_out_consumer.go b/pkg/obsservice/obslib/obsutil/std_out_consumer.go new file mode 100644 index 000000000000..8f01a6135a90 --- /dev/null +++ 
b/pkg/obsservice/obslib/obsutil/std_out_consumer.go @@ -0,0 +1,30 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package obsutil + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" +) + +// StdOutConsumer implements the EventConsumer interface and logs +// each event it receives to STDOUT, for testing purposes. +// +// StdOutConsumer is not intended for real-world use. +type StdOutConsumer struct{} + +func (s StdOutConsumer) Consume(ctx context.Context, event *obspb.Event) error { + fmt.Printf("StdOutConsumer - consumed event: %v\n", event) + return nil +} + +var _ obslib.EventConsumer = (*StdOutConsumer)(nil) diff --git a/pkg/obsservice/obslib/obsutil/test_consumer.go b/pkg/obsservice/obslib/obsutil/test_consumer.go new file mode 100644 index 000000000000..6e4573f5126c --- /dev/null +++ b/pkg/obsservice/obslib/obsutil/test_consumer.go @@ -0,0 +1,78 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package obsutil + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib" + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// TestCaptureConsumer is a test utility used for testing +// components in the observability service. It captures events +// into a buffer and provides functions that allow tests to +// analyze consumed contents to make assertions against. +type TestCaptureConsumer struct { + mu struct { + syncutil.Mutex + events []*obspb.Event + } +} + +var _ obslib.EventConsumer = (*TestCaptureConsumer)(nil) + +// NewTestCaptureConsumer returns a new instance of a TestCaptureConsumer. +func NewTestCaptureConsumer() *TestCaptureConsumer { + c := &TestCaptureConsumer{} + c.mu.events = make([]*obspb.Event, 0) + return c +} + +// Len returns the number of events captured by this +// TestCaptureConsumer. +func (c *TestCaptureConsumer) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.mu.events) +} + +func (c *TestCaptureConsumer) Events() []*obspb.Event { + c.mu.Lock() + defer c.mu.Unlock() + return c.mu.events +} + +// Consume implements the consumer.EventConsumer interface. +// Events consumed by the TestCaptureConsumer are stored in an +// internal buffer for later analysis. +// +// Calls to Consume() are synchronized. +func (c *TestCaptureConsumer) Consume(_ context.Context, event *obspb.Event) error { + c.mu.Lock() + defer c.mu.Unlock() + c.mu.events = append(c.mu.events, event) + return nil +} + +// Contains runs the given predicate against all the events in the +// TestCaptureConsumer's buffer. As soon as one of the events matches +// the predicate, Contains returns true. If no events pass the given +// predicate, Contains returns false. 
+func (c *TestCaptureConsumer) Contains(apply func(*obspb.Event) bool) bool { + c.mu.Lock() + defer c.mu.Unlock() + for _, event := range c.mu.events { + if apply(event) { + return true + } + } + return false +} diff --git a/pkg/obsservice/obslib/transform/BUILD.bazel b/pkg/obsservice/obslib/transform/BUILD.bazel new file mode 100644 index 000000000000..b25050db61a3 --- /dev/null +++ b/pkg/obsservice/obslib/transform/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "transform", + srcs = ["log_record_to_event.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb", + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", + ], +) diff --git a/pkg/obsservice/obslib/transform/log_record_to_event.go b/pkg/obsservice/obslib/transform/log_record_to_event.go new file mode 100644 index 000000000000..ae6de23c20b6 --- /dev/null +++ b/pkg/obsservice/obslib/transform/log_record_to_event.go @@ -0,0 +1,35 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +package transform + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" + commonv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" + logsv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1" + resourcev1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1" +) + +// LogRecordToEvent transforms a given LogRecord, with an accompanying +// Resource and Scope, into an internal Event message for further +// processing. 
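A minimal sketch of how TestCaptureConsumer is meant to be exercised in a test, using only the constructor and methods defined in test_consumer.go above; the test name, the event fixture, and the use of testify's require are illustrative, not taken from the patch.

package obsutil_test

import (
    "context"
    "testing"

    "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil"
    "github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
    commonv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
    logsv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1"
    "github.com/stretchr/testify/require"
)

// TestCaptureConsumerSketch shows the intended workflow: feed events through
// Consume as an ingester would, then assert against the captured buffer with
// Len and Contains.
func TestCaptureConsumerSketch(t *testing.T) {
    consumer := obsutil.NewTestCaptureConsumer()

    event := &obspb.Event{
        LogRecord: logsv1.LogRecord{
            Body: &commonv1.AnyValue{
                Value: &commonv1.AnyValue_StringValue{StringValue: "hello"},
            },
        },
    }
    require.NoError(t, consumer.Consume(context.Background(), event))

    require.Equal(t, 1, consumer.Len())
    require.True(t, consumer.Contains(func(e *obspb.Event) bool {
        sv, ok := e.LogRecord.Body.Value.(*commonv1.AnyValue_StringValue)
        return ok && sv.StringValue == "hello"
    }))
}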
+func LogRecordToEvent( + ingestTime time.Time, + resource *resourcev1.Resource, + scope *commonv1.InstrumentationScope, + logRecord logsv1.LogRecord, +) *obspb.Event { + logRecord.ObservedTimeUnixNano = uint64(ingestTime.UnixNano()) + return &obspb.Event{ + Resource: resource, + Scope: scope, + LogRecord: logRecord, + } +} diff --git a/pkg/obsservice/obspb/BUILD.bazel b/pkg/obsservice/obspb/BUILD.bazel index a2cbaf679d52..e291be6b8127 100644 --- a/pkg/obsservice/obspb/BUILD.bazel +++ b/pkg/obsservice/obspb/BUILD.bazel @@ -1,3 +1,5 @@ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( @@ -6,6 +8,35 @@ go_library( "event_types.go", "resource.go", ], + embed = [":obspb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb", visibility = ["//visibility:public"], ) + +proto_library( + name = "obspb_proto", + srcs = ["obsservice.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_proto", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_proto", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_proto", + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + "@com_google_protobuf//:timestamp_proto", + ], +) + +go_proto_library( + name = "obspb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb", + proto = ":obspb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", + "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", + "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", + "@com_github_gogo_protobuf//gogoproto", + ], +) diff --git a/pkg/obsservice/obspb/obsservice.proto b/pkg/obsservice/obspb/obsservice.proto new file mode 100644 index 000000000000..822ab916a6ec --- /dev/null +++ b/pkg/obsservice/obspb/obsservice.proto @@ -0,0 +1,39 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +syntax = "proto3"; + +package cockroach.obspb; + +import "gogoproto/gogo.proto"; + +import "obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto"; +import "obsservice/obspb/opentelemetry-proto/common/v1/common.proto"; +import "obsservice/obspb/opentelemetry-proto/resource/v1/resource.proto"; +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/cockroachdb/cockroach/pkg/obsservice/obspb"; + +// A generic event record used within the Observability Service. +// Generally, the data within log_record is eventually transformed +// into an event-specific protobuf message for further processing, +// but this message represents the event in its raw form. +message Event { + // The resource for the event. + // If this field is not set then resource info is unknown. + // Contains information referring to the source of the event. + // For example, cluster ID, node ID, etc. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // The instrumentation scope information for the event. Contains + // event-specific information. For example, event type and version. 
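A sketch of calling transform.LogRecordToEvent on a hand-built log record. The example function and fixture values are hypothetical; only the transform package and the proto types come from this patch.

package transform_test

import (
    "fmt"
    "time"

    "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform"
    commonv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
    logsv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1"
    resourcev1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/resource/v1"
)

func ExampleLogRecordToEvent() {
    // A raw OTLP log record as it might arrive at the ingester; only the
    // emit time and a string body are populated here.
    rec := logsv1.LogRecord{
        TimeUnixNano: uint64(time.Now().UnixNano()),
        Body: &commonv1.AnyValue{
            Value: &commonv1.AnyValue_StringValue{StringValue: "hello"},
        },
    }
    resource := &resourcev1.Resource{} // would carry ClusterID/NodeID attributes
    scope := &commonv1.InstrumentationScope{Name: "type1", Version: "1.0"}

    // The transform stamps ObservedTimeUnixNano with the ingest time and
    // wraps record, resource, and scope into the internal Event envelope.
    event := transform.LogRecordToEvent(time.Now(), resource, scope, rec)
    fmt.Println(event.Scope.Name, event.LogRecord.ObservedTimeUnixNano != 0)
    // Output: type1 true
}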
+ opentelemetry.proto.common.v1.InstrumentationScope scope = 2; + + // The LogRecord containing the specific event information. + opentelemetry.proto.logs.v1.LogRecord log_record = 3 [(gogoproto.nullable) = false ]; +} diff --git a/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto b/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto index db4649e33ef2..e7c20b5d257c 100644 --- a/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto +++ b/pkg/obsservice/obspb/opentelemetry-proto/common/v1/common.proto @@ -72,6 +72,10 @@ message InstrumentationScope { // An empty instrumentation scope name means the name is unknown. string name = 1; string version = 2; + + // Additional attributes that describe the scope. [Optional]. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). repeated KeyValue attributes = 3; uint32 dropped_attributes_count = 4; } diff --git a/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto index d5db90161fdb..7517050f5719 100644 --- a/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto +++ b/pkg/obsservice/obspb/opentelemetry-proto/logs/v1/logs.proto @@ -106,10 +106,21 @@ enum SeverityNumber { SEVERITY_NUMBER_FATAL4 = 24; } -// Masks for LogRecord.flags field. +// LogRecordFlags is defined as a protobuf 'uint32' type and is to be used as +// bit-fields. Each non-zero value defined in this enum is a bit-mask. +// To extract the bit-field, for example, use an expression like: +// +// (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK) +// enum LogRecordFlags { - LOG_RECORD_FLAG_UNSPECIFIED = 0; - LOG_RECORD_FLAG_TRACE_FLAGS_MASK = 0x000000FF; + // The zero value for the enum. Should not be used for comparisons. + // Instead use bitwise "and" with the appropriate mask as shown above. + LOG_RECORD_FLAGS_DO_NOT_USE = 0; + + // Bits 0-7 are used for trace flags. + LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF; + + // Bits 8-31 are reserved for future use. } // A log record according to OpenTelemetry Log Data Model: @@ -162,18 +173,33 @@ message LogRecord { // defined in W3C Trace Context specification. 24 most significant bits are reserved // and must be set to 0. Readers must not assume that 24 most significant bits // will be zero and must correctly mask the bits when reading 8-bit trace flag (use - // flags & TRACE_FLAGS_MASK). [Optional]. + // flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional]. fixed32 flags = 8; // A unique identifier for a trace. All logs from the same trace share - // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes - // is considered invalid. Can be set for logs that are part of request processing - // and have an assigned trace id. [Optional]. + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR + // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is optional. + // + // The receivers SHOULD assume that the log record is not associated with a + // trace if any of the following is true: + // - the field is not present, + // - the field contains an invalid value. bytes trace_id = 9; // A unique identifier for a span within a trace, assigned when the span - // is created. The ID is an 8-byte array. An ID with all zeroes is considered - // invalid. Can be set for logs that are part of a particular processing span. 
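The flag and trace_id rules above translate into small helpers on the Go side. The following is a sketch: the helper names and package are hypothetical, and the literal 0x000000FF mirrors LOG_RECORD_FLAGS_TRACE_FLAGS_MASK rather than referencing a generated constant.

package logsutil // hypothetical helpers for the sketch

import (
    logsv1 "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/logs/v1"
)

// TraceFlags extracts the 8-bit W3C trace flags from a LogRecord. The 24
// most significant bits are reserved, so readers mask them off rather than
// assume they are zero.
func TraceFlags(rec logsv1.LogRecord) uint8 {
    return uint8(rec.Flags & 0x000000FF)
}

// HasValidTraceID reports whether the record carries a usable trace
// association per the rules above: trace_id must be exactly 16 bytes and
// not all zeroes.
func HasValidTraceID(rec logsv1.LogRecord) bool {
    if len(rec.TraceId) != 16 {
        return false
    }
    for _, b := range rec.TraceId {
        if b != 0 {
            return true
        }
    }
    return false
}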
- // If span_id is present trace_id SHOULD be also present. [Optional]. + // is created. The ID is an 8-byte array. An ID with all zeroes OR of length + // other than 8 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is optional. If the sender specifies a valid span_id then it SHOULD also + // specify a valid trace_id. + // + // The receivers SHOULD assume that the log record is not associated with a + // span if any of the following is true: + // - the field is not present, + // - the field contains an invalid value. bytes span_id = 10; } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index d44fae2ab7d8..d58264665c1f 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -155,8 +155,10 @@ go_library( "//pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher", "//pkg/multitenant/tenantcostmodel", "//pkg/obs", + "//pkg/obsservice/obslib", "//pkg/obsservice/obslib/ingest", "//pkg/obsservice/obslib/migrations", + "//pkg/obsservice/obslib/obsutil", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/roachpb", "//pkg/rpc", diff --git a/pkg/server/initial_sql.go b/pkg/server/initial_sql.go index 86fdf05516bd..4e01a5ed0370 100644 --- a/pkg/server/initial_sql.go +++ b/pkg/server/initial_sql.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" + "github.com/cockroachdb/cockroach/pkg/obs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -32,7 +33,11 @@ func (s *Server) RunInitialSQL( ctx context.Context, startSingleNode bool, adminUser, adminPassword string, ) error { if s.cfg.ObsServiceAddr == base.ObsServiceEmbedFlagValue { - if err := s.startEmbeddedObsService(ctx); err != nil { + var knobs *obs.EventExporterTestingKnobs + if s.cfg.TestingKnobs.EventExporter != nil { + knobs = s.cfg.TestingKnobs.EventExporter.(*obs.EventExporterTestingKnobs) + } + if err := s.startEmbeddedObsService(ctx, knobs); err != nil { return err } } diff --git a/pkg/server/server_obs_service.go b/pkg/server/server_obs_service.go index 57de217c42ff..b0fd2f488877 100644 --- a/pkg/server/server_obs_service.go +++ b/pkg/server/server_obs_service.go @@ -14,8 +14,11 @@ import ( "context" "net" + "github.com/cockroachdb/cockroach/pkg/obs" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil" logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/netutil" @@ -26,7 +29,9 @@ import ( // startEmbeddedObsService creates the schema for the Observability Service (if // it doesn't exist already), starts the internal RPC service for event // ingestion and hooks up the event exporter to talk to the local service. -func (s *Server) startEmbeddedObsService(ctx context.Context) error { +func (s *Server) startEmbeddedObsService( + ctx context.Context, knobs *obs.EventExporterTestingKnobs, +) error { // Create the Obs Service schema. 
loopbackConfig, err := pgxpool.ParseConfig("") if err != nil { @@ -43,23 +48,24 @@ func (s *Server) startEmbeddedObsService(ctx context.Context) error { } // Create the internal ingester RPC server. - embeddedObsSvc, err := ingest.MakeEventIngester(ctx, loopbackConfig) - if err != nil { - return err + // TODO(abarganier): implement a more useful EventConsumer. + // TODO(abarganier): implement unified initialization for EventIngester. + var consumer obslib.EventConsumer = &obsutil.StdOutConsumer{} + if knobs != nil && knobs.TestConsumer != nil { + consumer = knobs.TestConsumer } + embeddedObsSvc := ingest.MakeEventIngester(ctx, consumer, nil) // We'll use an RPC server serving on a "loopback" interface implemented with // in-memory pipes. grpcServer := grpc.NewServer() - logspb.RegisterLogsServiceServer(grpcServer, &embeddedObsSvc) + logspb.RegisterLogsServiceServer(grpcServer, embeddedObsSvc) rpcLoopbackL := netutil.NewLoopbackListener(ctx, s.stopper) if err := s.stopper.RunAsyncTask( ctx, "obssvc-loopback-quiesce", func(ctx context.Context) { <-s.stopper.ShouldQuiesce() grpcServer.Stop() - embeddedObsSvc.Close() }, ); err != nil { - embeddedObsSvc.Close() return err } if err := s.stopper.RunAsyncTask( diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 9022c8ff7b48..0e9434dacde9 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -2306,6 +2306,8 @@ func TestLint(t *testing.T) { // pooling, etc, then test code needs to adhere as well. stream.GrepNot(nakedGoroutineExceptions + `:.*Use of go keyword not allowed`), stream.GrepNot(nakedGoroutineExceptions + `:.*Illegal call to Group\.Go\(\)`), + // We purposefully dereference nil in this file to test panic handling + stream.GrepNot(`pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test\.go:.*nil dereference`), } const vetTool = "roachvet"
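To tie the pieces together, here is a sketch of how a test might capture events flowing through the embedded obs service, assuming the EventExporter knob is reachable through the usual base.TestingKnobs plumbing as the type assertion in initial_sql.go suggests. The test body, server args, and event predicate are illustrative; the knob struct, the obsutil consumer, and its Contains helper come from this patch.

package server_test

import (
    "context"
    "testing"

    "github.com/cockroachdb/cockroach/pkg/base"
    "github.com/cockroachdb/cockroach/pkg/obs"
    "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil"
    "github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
    "github.com/cockroachdb/cockroach/pkg/testutils"
    "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
    "github.com/cockroachdb/errors"
)

func TestEmbeddedObsServiceCapturesEvents(t *testing.T) {
    ctx := context.Background()
    consumer := obsutil.NewTestCaptureConsumer()

    // The server must also be started with the embedded obs service enabled
    // (the ObsServiceAddr "embed" value checked in RunInitialSQL); how that
    // flag is plumbed through TestServerArgs is elided here.
    s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
        Knobs: base.TestingKnobs{
            EventExporter: &obs.EventExporterTestingKnobs{
                TestConsumer: consumer,
            },
        },
    })
    defer s.Stopper().Stop(ctx)

    // ... exercise the cluster so that structured events are exported ...

    testutils.SucceedsSoon(t, func() error {
        if !consumer.Contains(func(e *obspb.Event) bool {
            return e.Scope != nil && e.Scope.Name != ""
        }) {
            return errors.New("no events captured yet")
        }
        return nil
    })
}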