105316: obsservice: migrate gRPC ingest to follow new architecture  r=knz a=abarganier

**Reviewer note: review this PR commit-wise.**

----

In the original design of the obsservice, exported events were
intended to be written directly to storage. The idea was that
exported events would experience minimal transformation once
ingested, meaning that work done to "package" events properly
was left up to the exporting client (CRDB). The obsservice
would then store the ingested events into a target storage.
This concept of target storage has been removed for now as
part of this patch.

In the new architecture, exported events are more "raw", and
we expect the obsservice to heavily transform & aggregate the
data externally, where the aggregated results are flushed
to storage instead.

This patch takes the pre-existing gRPC events ingester, and
modifies it to meet the new architecture.

The events ingester will now be provided with a consumer with
which it can feed ingested events into the broader pipeline.
It is no longer the responsibility of the ingester to write
ingested events to storage.

For now, we use a simple STDOUT consumer that writes all
ingested events to STDOUT, but in the future, this will
be a more legitimate component - part of a chain that
eventually buffers ingested events for aggregation.

Release note: none

Epic: CRDB-28526

105589: ccl/sqlproxyccl: fix possible flake in TestProxyProtocol r=pjtatlow a=jaylim-crl

Fixes #105585.

This commit updates the TestProxyProtocol test to only test the case where RequireProxyProtocol=true. There's no point testing the case where the RequireProxyProtocol field is false, since none of the other tests use the proxy protocol (so that case is implicitly covered by them).

It's unclear what is causing this test flake (and it is extremely rare, i.e. 1 legit failure out of 1000 runs [1]). It may be due to some sort of race within the tests, but given that the case is covered by all other tests, this commit opts to remove that case entirely.

[1] https://teamcity.cockroachdb.com/test/-1121006080109385641?currentProjectId=Cockroach_Ci_TestsGcpLinuxX8664BigVm&expandTestHistoryChartSection=true

Release note: None

Release justification: Fixes a test flake.

Epic: none

105630: roachtest: handle panics in `mixedversion` r=smg260 a=renatolabs

Previously, a panic in a user function in a roachtest using the `mixedversion` package would crash the entire roachtest process. This is because all steps run in a separate goroutine, so if panics are not captured, the entire process crashes.

This commit updates the test runner so that all steps (including those that are part of the test infrastructure) run with panics captured. The panic message is returned as a regular error which should lead to usual GitHub error reports. The stack trace for the panic is also logged so that we can pinpoint the exact offending line in the test.
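
A self-contained sketch of the pattern described above: a deferred `recover` converts a panic into an ordinary error, so a panic inside a goroutine no longer takes down the process. The `runStep` helper is a hypothetical name used only for this illustration, and the stack trace is written to stderr here rather than to a roachtest logger.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"runtime/debug"
)

// runStep (hypothetical helper) runs step and turns any panic into an error.
// The named return value lets the deferred function overwrite the result.
func runStep(step func() error) (retErr error) {
	defer func() {
		if r := recover(); r != nil {
			// Log the stack so the offending line can be located.
			fmt.Fprintf(os.Stderr, "panic stack trace:\n%s", debug.Stack())
			retErr = fmt.Errorf("panic (stack trace above): %v", r)
		}
	}()
	return step()
}

func main() {
	errCh := make(chan error, 1)

	// Without the recover in runStep, this panic would crash the whole
	// process, because it happens in a separate goroutine.
	go func() {
		errCh <- runStep(func() error {
			var ids []int
			_ = ids[0] // out-of-range access on an empty slice panics
			return nil
		})
	}()
	fmt.Println("panicking step:", <-errCh)

	// Regular errors pass through unchanged.
	fmt.Println("failing step:", runStep(func() error { return errors.New("oops") }))
}
```

Note that `recover` only has an effect when called directly from a deferred function in the panicking goroutine, which is why the wrapper must run inside the goroutine that executes the step.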

Epic: CRDB-19321

Release note: None

Co-authored-by: Alex Barganier <[email protected]>
Co-authored-by: Jay <[email protected]>
Co-authored-by: Renato Costa <[email protected]>
4 people committed Jun 27, 2023
4 parents 1f2f004 + 946d774 + bcfe501 + 8ca2fc4 commit bdf2a64
Showing 34 changed files with 1,076 additions and 519 deletions.
5 changes: 5 additions & 0 deletions pkg/BUILD.bazel
@@ -1434,6 +1434,11 @@ GO_TARGETS = [
"//pkg/obsservice/obslib/ingest:ingest",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/obsservice/obslib/migrations:migrations",
"//pkg/obsservice/obslib/obsutil:obstestutil",
"//pkg/obsservice/obslib/obsutil:obsutil",
"//pkg/obsservice/obslib/obsutil:testutil",
"//pkg/obsservice/obslib/transform:transform",
"//pkg/obsservice/obslib:obslib",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
73 changes: 23 additions & 50 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -212,61 +212,34 @@ func TestProxyProtocol(t *testing.T) {
}
}

t.Run("allow=true", func(t *testing.T) {
s, sqlAddr, httpAddr := withProxyProtocol(true)
s, sqlAddr, httpAddr := withProxyProtocol(true)

defer testutils.TestingHook(&validateFn, func(h *proxyproto.Header) error {
if h.SourceAddr.String() != "10.20.30.40:4242" {
return errors.Newf("got source addr %s, expected 10.20.30.40:4242", h.SourceAddr)
}
return nil
})()

// Test SQL. Only request with PROXY should go through.
url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr)
te.TestConnectWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
func(conn *pgx.Conn) {
require.Equal(t, int64(1), s.metrics.CurConnCount.Value())
require.NoError(t, runTestQuery(ctx, conn))
},
)
_ = te.TestConnectErr(ctx, t, url, codeClientReadFailed, "tls error")

// Test HTTP. Should support with or without PROXY.
client := http.Client{Timeout: timeout}
makeHttpReq(t, &client, httpAddr, true)
proxyClient := http.Client{Transport: &http.Transport{DialContext: proxyDialer}}
makeHttpReq(t, &proxyClient, httpAddr, true)
})

t.Run("allow=false", func(t *testing.T) {
s, sqlAddr, httpAddr := withProxyProtocol(false)
defer testutils.TestingHook(&validateFn, func(h *proxyproto.Header) error {
if h.SourceAddr.String() != "10.20.30.40:4242" {
return errors.Newf("got source addr %s, expected 10.20.30.40:4242", h.SourceAddr)
}
return nil
})()

// Test SQL. Only request without PROXY should go through.
url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr)
te.TestConnect(ctx, t, url, func(conn *pgx.Conn) {
// Test SQL. Only request with PROXY should go through.
url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr)
te.TestConnectWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
func(conn *pgx.Conn) {
require.Equal(t, int64(1), s.metrics.CurConnCount.Value())
require.NoError(t, runTestQuery(ctx, conn))
})
_ = te.TestConnectErrWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
codeClientReadFailed,
"tls error",
)
},
)
_ = te.TestConnectErr(ctx, t, url, codeClientReadFailed, "tls error")

// Test HTTP.
client := http.Client{Timeout: timeout}
makeHttpReq(t, &client, httpAddr, true)
proxyClient := http.Client{Transport: &http.Transport{DialContext: proxyDialer}}
makeHttpReq(t, &proxyClient, httpAddr, false)
})
// Test HTTP. Should support with or without PROXY.
client := http.Client{Timeout: timeout}
makeHttpReq(t, &client, httpAddr, true)
proxyClient := http.Client{Transport: &http.Transport{DialContext: proxyDialer}}
makeHttpReq(t, &proxyClient, httpAddr, true)
}

func TestPrivateEndpointsACL(t *testing.T) {
6 changes: 4 additions & 2 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
@@ -22,13 +22,15 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "mixedversion_test",
srcs = ["planner_test.go"],
srcs = [
"planner_test.go",
"runner_test.go",
],
args = ["-test.timeout=295s"],
embed = [":mixedversion"],
deps = [
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ var (
Stdout: io.Discard,
Stderr: io.Discard,
}
l, err := cfg.NewLogger("" /* path */)
l, err := cfg.NewLogger("/dev/null" /* path */)
if err != nil {
panic(err)
}
15 changes: 12 additions & 3 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
@@ -19,6 +19,7 @@ import (
"path"
"path/filepath"
"regexp"
"runtime/debug"
"strconv"
"strings"
"sync/atomic"
@@ -32,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

type (
@@ -249,7 +249,16 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logge
prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start))
tr.logStep(prefix, ss, l)
}()
if err := ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l)); err != nil {

if err := func() (retErr error) {
defer func() {
if r := recover(); r != nil {
l.Printf("panic stack trace:\n%s", string(debug.Stack()))
retErr = fmt.Errorf("panic (stack trace above): %v", r)
}
}()
return ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l))
}(); err != nil {
if isContextCanceled(err) {
l.Printf("step terminated (context canceled)")
// Avoid creating a `stepError` (which involves querying binary
@@ -331,7 +340,7 @@ func (tr *testRunner) testFailure(desc string, l *logger.Logger) error {
tr.logger.Printf("could not rename failed step logger: %v", err)
}

return errors.WithStack(tf)
return tf
}

func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) {
85 changes: 85 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/runner_test.go
@@ -0,0 +1,85 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package mixedversion

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/stretchr/testify/require"
)

func Test_runSingleStep(t *testing.T) {
tr := testTestRunner()

// steps that run without errors do not return errors
successStep := newTestStep(func() error {
return nil
})
err := tr.runSingleStep(ctx, successStep, nilLogger)
require.NoError(t, err)

// steps that return an error have that error surfaced
errorStep := newTestStep(func() error {
return fmt.Errorf("oops")
})
err = tr.runSingleStep(ctx, errorStep, nilLogger)
require.Error(t, err)
require.Contains(t, err.Error(), "oops")

// steps that panic cause an error to be returned
panicStep := newTestStep(func() error {
var ids []int
if ids[0] > 42 {
return nil
}
return fmt.Errorf("unreachable")
})
err = nil
require.NotPanics(t, func() {
err = tr.runSingleStep(ctx, panicStep, nilLogger)
})
require.Error(t, err)
require.Contains(t, err.Error(), "panic (stack trace above): runtime error: index out of range [0] with length 0")
}

func testTestRunner() *testRunner {
runnerCtx, cancel := context.WithCancel(ctx)
return &testRunner{
ctx: runnerCtx,
cancel: cancel,
logger: nilLogger,
crdbNodes: nodes,
background: newBackgroundRunner(runnerCtx),
seed: seed,
}
}

type testSingleStep struct {
runFunc func() error
}

func (testSingleStep) ID() int { return 42 }
func (testSingleStep) Description() string { return "testSingleStep" }
func (testSingleStep) Background() shouldStop { return nil }

func (tss testSingleStep) Run(
_ context.Context, _ *logger.Logger, _ cluster.Cluster, _ *Helper,
) error {
return tss.runFunc()
}

func newTestStep(f func() error) singleStep {
return testSingleStep{runFunc: f}
}
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -50,6 +50,7 @@ PROTOBUF_SRCS = [
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_go_proto",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_go_proto",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_go_proto",
"//pkg/obsservice/obspb:obspb_go_proto",
"//pkg/repstream/streampb:streampb_go_proto",
"//pkg/roachpb:roachpb_go_proto",
"//pkg/rpc:rpc_go_proto",
1 change: 1 addition & 0 deletions pkg/obs/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/obsservice/obslib",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
5 changes: 5 additions & 0 deletions pkg/obs/event_exporter.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
otel_collector_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
@@ -94,6 +95,10 @@ type EventExporterTestingKnobs struct {
// FlushTriggerByteSize, if set, overrides the default trigger value for the
// EventExporter.
FlushTriggerByteSize uint64
// TestConsumer, if set, sets the consumer to be used by the embedded ingest
// component used. This allows us to capture consumed events when running
// in embedded mode, so we can make assertions against them in tests.
TestConsumer obslib.EventConsumer
}

var _ base.ModuleTestingKnobs = &EventExporterTestingKnobs{}
36 changes: 10 additions & 26 deletions pkg/obsservice/README.md
@@ -9,6 +9,10 @@ The Obs Service is developed as a library (in the `obslib` package) and a binary
embed and extend the library (for example we imagine CockroachCloud doing so in
the future).

**Note**: Serving DB Console is no longer a core part of the planned utility of the
Obs Service. However, the functionality to serve the DB Console is maintained for
now, in case it proves useful down the line.

## Building the Obs Service

Build with
@@ -46,7 +50,8 @@ exporters:
tls:
insecure: true
```
- `--http-addr` is the address on which the DB Console is served.
- `--http-addr` is the address on which the DB Console is served. NB: This feature may
be removed in the future. See note above in [header section](#CockroachDB-Observability-Service)
- `--crdb-http-url` is CRDB's HTTP address. For a multi-node CRDB cluster, this
can point to a load-balancer. It can be either a HTTP or an HTTPS address,
depending on whether the CRDB cluster is running as `--insecure`.
@@ -70,10 +75,6 @@ exporters:
one created with `cockroach cert create-ca`). If specified, HTTP requests are
only proxied to CRDB nodes that present certificates signed by this CA. If not
specified, the system's CA list is used.
- `--sink-pgurl` is the connection string for the sink cluster. If the pgurl
contains a database name, that database will be used; otherwise `obsservice`
will be used. If not specified, a connection to a local cluster will be
attempted.

## Functionality

@@ -86,13 +87,8 @@ In the current fledgling state, the Obs Service does a couple of things:

3. The Obs Service exposes the OTLP Logs gRPC service and is able to ingest
events received through calls to this RPC service. Only insecure gRPC
connections are supported at the moment.

4. The Obs Service connects to a sink cluster identified by `--sink-pgurl`. The
required schema is automatically created using SQL migrations run with
[goose](https://github.com/pressly/goose). The state of migrations in a sink
cluster can be inspected through the `observice.obs_admin.migrations` table.
The ingested events are saved in the sink cluster.
connections are supported at the moment. Events are ingested into the
Obs Service for aggregation and eventual storage.

## Event ingestion

@@ -106,22 +102,10 @@ and,within that, into
[`ScopeLogs`](https://github.com/open-telemetry/opentelemetry-proto/blob/200ccff768a29f8bd431e0a4a463da7ed58be557/opentelemetry/proto/logs/v1/logs.proto#L64).
A resource identifies the cluster/node/tenant that is emitting the respective
events. A scope identifies the type of event; events of different types get
persisted in different tables, based on this event type. Events of unrecognized
types are dropped. Currently, a single event type is supported: `"eventlog"`.
routed to different processing pipelines, based on this event type. Events of
unrecognized types are dropped. Currently, a single event type is supported: `"eventlog"`.
The log records carry attributes and a JSON payload representing the event.

The mapping between event types and tables is listed in the table below:

| Event type | Table | Attributes |
|------------|----------------|--------------|
| eventlog | cluster_events | {event_type} |

Each table storing events can have a different schema. It is expected that these
tables store some event fields as columns and otherwise store the raw event in a
JSON column. The values of the different columns can come from the attributes of
the log record (listed in the table above), or from the JSON itself. Virtual or
computed columns can be used to extract data from the JSON directly.

## Licensing

The Observability Service is licensed as Apache 2.0.
3 changes: 1 addition & 2 deletions pkg/obsservice/cmd/obsservice/BUILD.bazel
@@ -9,14 +9,13 @@ go_library(
"//pkg/cli/exit",
"//pkg/obsservice/obslib/httpproxy",
"//pkg/obsservice/obslib/ingest",
"//pkg/obsservice/obslib/migrations",
"//pkg/obsservice/obslib/obsutil",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/ui/distoss",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/sysutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v4//pgxpool",
"@com_github_spf13_cobra//:cobra",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_x_sys//unix",