Skip to content

Commit

Permalink
obsservice: migrate grpc EventIngester to fit new architecture
Browse files Browse the repository at this point in the history
In the original design of the obsservice, exported events were
intended to be written directly to storage. The idea was that
exported events would experience minimal transformation once
ingested, meaning that work done to "package" events properly
was left up to the exporting client (CRDB). The obsservice
would then store the ingested invents into a target storage.
This concept of target storage has been removed for now as
part of this patch.

In the new architecture, exported events are more "raw", and
we expect the obsservice to heavily transform & aggregate the
data externally, where the aggregated results are flushed
to storage instead.

This patch takes the pre-existing gRPC events ingester, and
modifies it to meet the new architecture.

The events ingester will now be provided with a consumer with
which it can feed ingested events into the broader pipeline.
It is no longer the responsibility of the ingester to write
ingested events to storage.

For now, we use a simple STDOUT consumer that writes all
ingested events to STDOUT, but in the future, this will
be a more legitimate component - part of a chain that
eventually buffers ingested events for aggregation.

Release note: none
  • Loading branch information
abarganier committed Jun 27, 2023
1 parent 3fb55d0 commit 946d774
Show file tree
Hide file tree
Showing 21 changed files with 801 additions and 453 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,10 @@ GO_TARGETS = [
"//pkg/obsservice/obslib/ingest:ingest",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/obsservice/obslib/migrations:migrations",
"//pkg/obsservice/obslib/obsutil:obstestutil",
"//pkg/obsservice/obslib/obsutil:obsutil",
"//pkg/obsservice/obslib/obsutil:testutil",
"//pkg/obsservice/obslib/transform:transform",
"//pkg/obsservice/obslib:obslib",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
Expand Down
1 change: 1 addition & 0 deletions pkg/obs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/obsservice/obslib",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
Expand Down
5 changes: 5 additions & 0 deletions pkg/obs/event_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
otel_collector_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
Expand Down Expand Up @@ -94,6 +95,10 @@ type EventExporterTestingKnobs struct {
// FlushTriggerByteSize, if set, overrides the default trigger value for the
// EventExporter.
FlushTriggerByteSize uint64
// TestConsumer, if set, sets the consumer to be used by the embedded ingest
// component used. This allows us to capture consumed events when running
// in embedded mode, so we can make assertions against them in tests.
TestConsumer obslib.EventConsumer
}

var _ base.ModuleTestingKnobs = &EventExporterTestingKnobs{}
Expand Down
36 changes: 10 additions & 26 deletions pkg/obsservice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ The Obs Service is developed as a library (in the `obslib` package) and a binary
embed and extend the library (for example we imagine CockroachCloud doing so in
the future).

**Note**: Serving DB Console is no longer a core part of the planned utility of the
Obs Service. However, as the functionality to serve the DB Console is maintained for
now, in case it proves useful down the line.

## Building the Obs Service

Build with
Expand Down Expand Up @@ -46,7 +50,8 @@ exporters:
tls:
insecure: true
```
- `--http-addr` is the address on which the DB Console is served.
- `--http-addr` is the address on which the DB Console is served. NB: This feature may
be removed in the future. See note above in [header section](#CockroachDB-Observability-Service)
- `--crdb-http-url` is CRDB's HTTP address. For a multi-node CRDB cluster, this
can point to a load-balancer. It can be either a HTTP or an HTTPS address,
depending on whether the CRDB cluster is running as `--insecure`.
Expand All @@ -70,10 +75,6 @@ exporters:
one created with `cockroach cert create-ca`). If specified, HTTP requests are
only proxied to CRDB nodes that present certificates signed by this CA. If not
specified, the system's CA list is used.
- `--sink-pgurl` is the connection string for the sink cluster. If the pgurl
contains a database name, that database will be used; otherwise `obsservice`
will be used. If not specified, a connection to a local cluster will be
attempted.

## Functionality

Expand All @@ -86,13 +87,8 @@ In the current fledgling state, the Obs Service does a couple of things:

3. The Obs Service exposes the OTLP Logs gRPC service and is able to ingest
events received through calls to this RPC service. Only insecure gRPC
connections are supported at the moment.

4. The Obs Service connects to a sink cluster identified by `--sink-pgurl`. The
required schema is automatically created using SQL migrations run with
[goose](https://github.com/pressly/goose). The state of migrations in a sink
cluster can be inspected through the `observice.obs_admin.migrations` table.
The ingested events are saved in the sink cluster.
connections are supported at the moment. Events are ingested into the
Obs Service for aggregation and eventual storage.

## Event ingestion

Expand All @@ -106,22 +102,10 @@ and,within that, into
[`ScopeLogs`](https://github.com/open-telemetry/opentelemetry-proto/blob/200ccff768a29f8bd431e0a4a463da7ed58be557/opentelemetry/proto/logs/v1/logs.proto#L64).
A resource identifies the cluster/node/tenant that is emitting the respective
events. A scope identifies the type of event; events of different types get
persisted in different tables, based on this event type. Events of unrecognized
types are dropped. Currently, a single event type is supported: `"eventlog"`.
routed to different processing pipelines, based on this event type. Events of
unrecognized types are dropped. Currently, a single event type is supported: `"eventlog"`.
The log records carry attributes and a JSON payload representing the event.

The mapping between event types and tables is listed in the table below:

| Event type | Table | Attributes |
|------------|----------------|--------------|
| eventlog | cluster_events | {event_type} |

Each table storing events can have a different schema. It is expected that these
tables store some event fields as columns and otherwise store the raw event in a
JSON column. The values of the different columns can come from the attributes of
the log record (listed in the table above), or from the JSON itself. Virtual or
computed columns can be used to extract data from the JSON directly.

## Licensing

The Observability Service is licensed as Apache 2.0.
3 changes: 1 addition & 2 deletions pkg/obsservice/cmd/obsservice/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ go_library(
"//pkg/cli/exit",
"//pkg/obsservice/obslib/httpproxy",
"//pkg/obsservice/obslib/ingest",
"//pkg/obsservice/obslib/migrations",
"//pkg/obsservice/obslib/obsutil",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/ui/distoss",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/sysutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v4//pgxpool",
"@com_github_spf13_cobra//:cobra",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_x_sys//unix",
Expand Down
39 changes: 16 additions & 23 deletions pkg/obsservice/cmd/obsservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/httpproxy"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil"
logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
_ "github.com/cockroachdb/cockroach/pkg/ui/distoss" // web UI init hooks
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
Expand All @@ -47,9 +46,6 @@ var drainSignals = []os.Signal{unix.SIGINT, unix.SIGTERM}
// shutdown (i.e. second occurrence does not incur hard shutdown).
var termSignal os.Signal = unix.SIGTERM

// defaultSinkDBName is the name of the database to be used by default.
const defaultSinkDBName = "obsservice"

// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Use: "obsservice",
Expand All @@ -67,38 +63,35 @@ from one or more CockroachDB clusters.`,
UICertKeyPath: uiCertKeyPath,
}

connCfg, err := pgxpool.ParseConfig(sinkPGURL)
if err != nil {
return errors.Wrapf(err, "invalid --sink-pgurl (%s)", sinkPGURL)
}
if connCfg.ConnConfig.Database == "" {
fmt.Printf("No database explicitly provided in --sink-pgurl. Using %q.\n", defaultSinkDBName)
connCfg.ConnConfig.Database = defaultSinkDBName
}

if err := migrations.RunDBMigrations(ctx, connCfg.ConnConfig); err != nil {
return errors.Wrap(err, "failed to run DB migrations")
}
// TODO(abarganier): migrate DB migrations over to target storage for aggregated outputs
//connCfg, err := pgxpool.ParseConfig(sinkPGURL)
//if err != nil {
// return errors.Wrapf(err, "invalid --sink-pgurl (%s)", sinkPGURL)
//}
//if connCfg.ConnConfig.Database == "" {
// fmt.Printf("No database explicitly provided in --sink-pgurl. Using %q.\n", defaultSinkDBName)
// connCfg.ConnConfig.Database = defaultSinkDBName
//}
//if err := migrations.RunDBMigrations(ctx, connCfg.ConnConfig); err != nil {
// return errors.Wrap(err, "failed to run DB migrations")
//}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, drainSignals...)

stop := stop.NewStopper()

// Run the event ingestion in the background.
ingester, err := ingest.MakeEventIngester(ctx, connCfg)
if err != nil {
return err
}
consumer := &obsutil.StdOutConsumer{}
ingester := ingest.MakeEventIngester(ctx, consumer, nil)
listener, err := net.Listen("tcp", otlpAddr)
if err != nil {
return errors.Wrapf(err, "failed to listen for incoming HTTP connections on address %s", otlpAddr)
}
fmt.Printf("Listening for OTLP connections on %s.", otlpAddr)
grpcServer := grpc.NewServer()
logspb.RegisterLogsServiceServer(grpcServer, &ingester)
logspb.RegisterLogsServiceServer(grpcServer, ingester)
if err := stop.RunAsyncTask(ctx, "event ingester", func(ctx context.Context) {
defer ingester.Close()
_ = grpcServer.Serve(listener)
}); err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions pkg/obsservice/obslib/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,56 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ingest",
srcs = ["ingest.go"],
srcs = ["grpc_ingest.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest",
visibility = ["//visibility:public"],
deps = [
"//pkg/obsservice/obspb",
"//pkg/obsservice/obslib",
"//pkg/obsservice/obslib/transform",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/roachpb",
"//pkg/util/log",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v4//pgxpool",
],
)

go_test(
name = "ingest_test",
srcs = [
"ingest_test.go",
"grpc_ingest_test.go",
"ingest_integration_test.go",
"main_test.go",
],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":ingest"],
deps = [
"//pkg/base",
"//pkg/obs",
"//pkg/obsservice/obslib/migrations",
"//pkg/obsservice/obslib/obsutil",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_google_uuid//:uuid",
"@com_github_jackc_pgx_v4//pgxpool",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
)
81 changes: 81 additions & 0 deletions pkg/obsservice/obslib/ingest/grpc_ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package ingest

import (
"context"

"github.com/cockroachdb/cockroach/pkg/obsservice/obslib"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform"
logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// EventIngester implements the OTLP Logs gRPC service, accepting connections
// and ingesting events.
type EventIngester struct {
consumer obslib.EventConsumer
timeSource timeutil.TimeSource
}

var _ logspb.LogsServiceServer = &EventIngester{}

// MakeEventIngester creates a new EventIngester. Callers can optionally
// provide a timeutil.TimeSource which will be used when determining ingestion
// timestamps. If nil is provided, timeutil.DefaultTimeSource will be used.
func MakeEventIngester(
_ context.Context, consumer obslib.EventConsumer, timeSource timeutil.TimeSource,
) *EventIngester {
if timeSource == nil {
timeSource = timeutil.DefaultTimeSource{}
}
return &EventIngester{
consumer: consumer,
timeSource: timeSource,
}
}

// Export implements the LogsServiceServer gRPC service.
//
// NB: "Export" is a bit of a misnomer here. On the client side,
// it makes sense, but on this end of the wire, we are *Ingesting*
// events, not exporting them. This is simply the receiving end
// of that process. This is done to maintain compatibility between
// the OpenTelemetry Collector and our EventsExporter.
func (e *EventIngester) Export(
ctx context.Context, request *logspb.ExportLogsServiceRequest,
) (*logspb.ExportLogsServiceResponse, error) {
if err := e.unpackAndConsumeEvents(ctx, request); err != nil {
log.Errorf(ctx, "error consuming events: %v", err)
return nil, err
}
return &logspb.ExportLogsServiceResponse{}, nil
}

// TODO(abarganier): Add context cancellation here to cap transformation/unpack time.
// TODO(abarganier): Add metric to track context cancellations (counter tracking failed transformations)
func (e *EventIngester) unpackAndConsumeEvents(
ctx context.Context, request *logspb.ExportLogsServiceRequest,
) error {
ingestTime := e.timeSource.Now()
var retErr error = nil
for _, resource := range request.ResourceLogs {
for _, scopeLogs := range resource.ScopeLogs {
for _, logRecord := range scopeLogs.LogRecords {
transformed := transform.LogRecordToEvent(ingestTime, resource.Resource, scopeLogs.Scope, logRecord)
if err := e.consumer.Consume(ctx, transformed); err != nil {
retErr = errors.CombineErrors(retErr, err)
}
}
}
}
return retErr
}
Loading

0 comments on commit 946d774

Please sign in to comment.