obsservice: ingest events #84683
Conversation
I plan on adding some tests.
Force-pushed from 8999d60 to be97459
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, and @dhartunian)
pkg/server/tenant.go
line 547 at r2 (raw file):
monitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool,
)
// TODO(andrei): figure out what cluster ID and node ID to use and then call
hey @knz do you have thoughts on how to get a "cluster ID" and a "node ID" for a tenant?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @andreimatei, and @dhartunian)
pkg/server/tenant.go
line 547 at r2 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
hey @knz do you have thoughts on how to get a "cluster ID" and a "node ID" for a tenant?
It is too early to get them at this point; they are bound later during server initialization.
The best you can do here is take a reference to their containers (respectively rpcContext.LogicalClusterID and baseCfg.IDContainer).
The values can be sampled after preStart() completes (which is under startTenantInternal()).
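A rough sketch of that pattern (SetExternalIDs is a made-up name for whatever hook ends up receiving the containers; LogicalClusterID, IDContainer, preStart() and startTenantInternal() are the real identifiers):
// Hand over the containers (not the values) at construction time.
eventsServer.SetExternalIDs( // hypothetical hook, for illustration only
    args.rpcContext.LogicalClusterID, // logical cluster ID container
    baseCfg.IDContainer,              // node/instance ID container
)
// Later, inside startTenantInternal(), sample the values only after
// preStart() has completed and the IDs are bound:
clusterID := args.rpcContext.LogicalClusterID.Get()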
Force-pushed from be97459 to 71706fc
This is awesome, nice work. A few comments below.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @andreimatei, and @dhartunian)
Code quote:
I hope we can remove system.eventlog.
-- commits
line 37 at r3:
to be fair, https://github.com/open-telemetry/opentelemetry-proto says:
The proto files can be consumed as GIT submodules or copied and built directly in the consumer project.
We're just following directions 😁
Code quote:
I've given up
pkg/obs/event_exporter.go
line 43 at r3 (raw file):
// requests from the Observability Service to subscribe to the events stream.
// Once a subscription is established, new events (published through
// SendEvent()) are sent to the subscriber.
Cool - I could see us building cockroachdb receiver & exporter components for the otel collector some day to integrate with this.
Suggestion:
// EventsServer implements the obspb.ObsServer gRPC service. It responds to
// requests from the Observability Service to subscribe to the events stream.
// Once a subscription is established, new events (published through
// SendEvent()) are sent to the subscriber.
pkg/obs/event_exporter.go
line 310 at r3 (raw file):
}
buf := s.buf.mu.events[typ]
nit: should we check/handle !ok with this map access?
Code quote:
buf := s.buf.mu.events[typ]
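For example, something like this (sketch only; the early return is just a placeholder for whatever handling makes sense):
buf, ok := s.buf.mu.events[typ]
if !ok {
    // The event type was never registered with the buffer; nothing to flush.
    return
}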
pkg/obs/event_exporter.go
line 411 at r3 (raw file):
// If we failed to send, we can't use this subscriber anymore.
//
// TODO(andrei): Figure out how to tolerate errors; we should put the events
in the meantime, maybe we should log something here.
Code quote:
// TODO(andrei): Figure out how to tolerate errors; we should put the events
// back in the buffer (or not take them out of the buffer in the first
// place) in hope that a new subscriber comes along.
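E.g. something along these lines (sketch; assumes ctx and the send error are in scope at that point):
// Don't drop the subscriber silently.
log.Warningf(ctx, "failed to send events to subscriber; dropping it: %v", err)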
pkg/obs/event_exporter.go
line 427 at r3 (raw file):
) *subscriber {
    sub := &subscriber{
        res: s.resource,
nit: should we do a sanity check here that the resource has been set?
Code quote:
res: s.resource,
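For instance (sketch only; the resourceSet flag is made up - the real check depends on how the resource is stored):
if !s.resourceSet {
    // SetResourceInfo must be called before any subscriber shows up.
    panic("EventsServer used before SetResourceInfo was called")
}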
pkg/obs/event_exporter.go
line 487 at r3 (raw file):
s.reset(ctx, sub)
// TODO(andrei): It might be a good idea to return errors in some cases
// (another subscriber coming, or quiescence).
Agreed, the subscriber should be told that information. It seems like the error passed to subscriber.close and sent along to subscriber.stopC isn't used currently? Maybe runFlusher should return an error that we can return here.
Code quote:
// (another subscriber coming, or quiescence).
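Sketch of what I mean (the error values and signature are hypothetical):
// runFlusher would return e.g. errReplaced / errQuiescing instead of
// swallowing the condition, and the subscription RPC could surface it:
if err := s.runFlusher(ctx, sub, flushInterval); err != nil {
    return err
}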
pkg/obs/event_exporter.go
line 501 at r3 (raw file):
ctx context.Context, bufs *eventsBuffers, flushInterval time.Duration,
) {
    defer close(sub.flusherDoneC)
Should we defer the closing of sub.flushC and sub.doneC too?
Code quote:
defer close(sub.flusherDoneC)
pkg/server/server.go
line 776 at r3 (raw file):
registry.AddMetricStruct(kvProber.Metrics())
obsServer := obs.NewEventServer(
nit: can you note the necessary call to SetResourceInfo (and maybe RegisterObsServer as well) in a comment here?
pkg/server/server.go
line 829 at r3 (raw file):
monitorAndMetrics: sqlMonitorAndMetrics,
settingsStorage:   settingsWriter,
eventsServer:      obsServer,
nit: thoughts on calling it obsServer as well in the SQL Server for consistency?
Code quote:
eventsServer: obsServer,
pkg/sql/event_log.go
line 120 at r3 (raw file):
// |
// +--> system.eventlog if not disabled by setting
// |       └ also the Obs Service, if connected
nit: switch to spaces for better alignment
Code quote:
// |
pkg/sql/event_log.go
line 487 at r3 (raw file):
ctx context.Context, ex *InternalExecutor, eventsExporter obs.EventsExporter,
nit: Unused - should we use this below instead of ex.s.cfg.EventsExporter?
Code quote:
eventsExporter obs.EventsExporter,
pkg/sql/event_log.go
line 603 at r3 (raw file):
})
// In the common case where we have just 1 event, we want to skeep
I'm curious about what originally led to this optimization.
It seems like the old code is intentionally avoiding loops, which we've introduced above in this PR in the primary code path.
cc @knz - I see this was added by you a while back in 9318af9. Do you by any chance remember if there were any adverse performance effects that led you to add this optimization? Or was it just done because it was an easy optimization to make here? I just want to make sure we're not inadvertently introducing any weird side-effects with these changes.
Code quote:
// In the common case where we have just 1 event, we want to skeep
// the extra heap allocation and buffer operations of the loop
// below. This is an optimization.
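For reference, the shape of the pattern being discussed (paraphrased with made-up names, not the actual code):
if len(entries) == 1 {
    // Common case: skip the buffer allocation and loop below.
    return writeEntry(ctx, entries[0])
}
buf := allocBuffer(len(entries)) // the heap allocation the fast path avoids
for i := range entries {
    buf.add(entries[i])
}
return flushBuffer(ctx, buf)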
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @andreimatei, and @dhartunian)
Previously, abarganier (Alex Barganier) wrote…
🔥
See #84983.
pkg/sql/event_log.go
line 603 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
I'm curious about what originally led to this optimization.
It seems like the old code is intentionally avoiding loops, which we've introduced above in this PR in the primary code path.
cc @knz - I see this was added by you a while back in 9318af9. Do you by any chance remember if there were any adverse performance effects that led you to add this optimization? Or was it just done because it was an easy optimization to make here? I just want to make sure we're not inadvertently introducing any weird side-effects with these changes.
Yes, this optimization is important if we actually write to the system.eventlog table -- it dramatically decreases the number of KV roundtrips when there are multiple events bundled together (such as what happens during a DROP CASCADE, certain priv changes, etc).
It is less important if we do not write to the system.eventlog table (see #84983).
Force-pushed from 71706fc to 034059a
Not quite done with the tests, but it's now close to what I'd merge.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @andreimatei, and @dhartunian)
pkg/obs/event_exporter.go
line 43 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
Cool - I could see us building cockroachdb receiver & exporter components for the otel collector some day to integrate with this.
yup
pkg/obs/event_exporter.go
line 310 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
nit: should we check/handle !ok with this map access?
done
pkg/obs/event_exporter.go
line 411 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
in the meantime, maybe we should log something here.
done
pkg/obs/event_exporter.go
line 427 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
nit: should we do a sanity check here that the resource has been set?
done
pkg/obs/event_exporter.go
line 487 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
Agreed, the subscriber should be told that information. It seems like the error passed to subscriber.close and sent along to subscriber.stopC isn't used currently? Maybe runFlusher should return an error that we can return here.
done
pkg/obs/event_exporter.go
line 501 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
Should we defer the closing of sub.flushC and sub.doneC too?
It would not be correct to close flushC because SendEvent might be sending on it concurrently. But note that SendEvent's write is not blocking.
For doneC I think you meant flushAndStopC. That channel is also not for runFlusher to close; runFlusher is read-only for the subscriber (that's also enforced at the type level, so we can't close it even if we wanted to). flushAndStopC is the stopper channel.
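To make the type-level point concrete, the shape is roughly this (field layout is illustrative, not the actual struct):
type subscriber struct {
    flushAndStopC <-chan struct{} // receive-only here, so close(sub.flushAndStopC) wouldn't even compile
    flushC        chan struct{}   // SendEvent may be doing a non-blocking send on this concurrently
}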
pkg/server/server.go
line 776 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
nit: can you note the necessary call to SetResourceInfo (and maybe RegisterObsServer as well) in a comment here?
done
pkg/server/server.go
line 829 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
nit: thoughts on calling it obsServer as well in the SQL Server for consistency?
renamed to eventServer everywhere
pkg/sql/event_log.go
line 120 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
nit: switch to spaces for better alignment
done
pkg/sql/event_log.go
line 487 at r3 (raw file):
Previously, abarganier (Alex Barganier) wrote…
nit: Unused - should we use this below instead of ex.s.cfg.EventsExporter?
yes, done
pkg/sql/event_log.go
line 603 at r3 (raw file):
Previously, knz (kena) wrote…
Yes, this optimization is important if we actually write to the system.eventlog table -- it dramatically decreases the number of KV roundtrips when there are multiple events bundled together (such as what happens during a DROP CASCADE, certain priv changes, etc).
It is less important if we do not write to the system.eventlog table (see #84983).
I've added some loops, but I don't see anything scary in them.
node_id INT NOT NULL,
event_type STRING NOT NULL,
event JSONB,
CONSTRAINT "primary" PRIMARY KEY (timestamp, id) USING HASH WITH BUCKET_COUNT = 16
These days you don't need the WITH BUCKET_COUNT = 16, it's the default. Also, this syntax is deprecated in favor of CONSTRAINT "primary" PRIMARY KEY (timestamp, id) USING HASH WITH (bucket_count = 16) if you did want to state it explicitly.
timestamp TIMESTAMP NOT NULL,
id BYTES NOT NULL DEFAULT uuid_v4(),
cluster_id BYTES NOT NULL,
node_id INT NOT NULL,
Are you interested in calling this instance_id? Should this have a tenant_id?
// persistEventlogEvents writes "eventlog" events to the database.
func persistEventlogEvents(
Any interest in making yourself an interface to write the data to persistence for later dependency injection? Feels a little crazy to be passing pgxpool.Pool here, but maybe that's just me. It seems like you ought to make yourself a data model and then have some interfaces to the database to interact with that model.
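Roughly what I have in mind (names made up, just to sketch the shape):
type EventWriter interface {
    // LogRecord stands in for whatever the ingested data model ends up being.
    WriteEvents(ctx context.Context, events []*LogRecord) error
}

// The pgxpool.Pool-backed implementation would be one EventWriter; tests
// could inject an in-memory one instead.
type poolWriter struct {
    pool *pgxpool.Pool
}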
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @ajwerner, @andreimatei, and @dhartunian)
pkg/obsservice/obslib/ingest/ingest.go
line 127 at r4 (raw file):
Previously, ajwerner wrote…
Any interest in making yourself an interface to write the data to persistence for later dependency injection? Feels a little crazy to be passing pgxpool.Pool here, but maybe that's just me. It seems like you ought to make yourself a data model and then have some interfaces to the database to interact with that model.
But what's wrong with passing in a pool (as a handle to a database)? Even for unit tests this works fine - i.e. I don't see tests using any sort of mock.
I'm not very interested in introducing more abstraction in this PR. Did you have an ORM in mind? I imagine we will probably end up with some ORM for some event types (probably more for the needs of API endpoints reading the events, rather than for ingesting them), but I'd wait for that need to come (partially because I'll have to educate myself on ORMs when that moment comes). And it might not come for all event types.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @andreimatei, and @dhartunian)
pkg/obsservice/obslib/ingest/ingest.go
line 127 at r4 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
But what's wrong with passing in a pool (as a handle to a database)? Even for unit tests this works fine - i.e. I don't see tests using any sort of mock.
I'm not very interested in introducing more abstraction in this PR. Did you have an ORM in mind? I imagine we will probably end up with some ORM for some event types (probably more for the needs of API endpoints reading the events, rather than for ingesting them), but I'd wait for that need to come (partially because I'll have to educate myself on ORMs when that moment comes). And it might not come for all event types.
Fair enough. Wait-and-see seems fine.
Force-pushed from 91d4905 to 1d931b0
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @ajwerner, @andreimatei, and @dhartunian)
pkg/obsservice/obslib/migrations/sqlmigrations/0001_init.sql
line 6 at r4 (raw file):
Previously, ajwerner wrote…
Are you interested in calling this instance_id? Should this have a tenant_id?
renamed to instance_id
For tenants, the plan is for cluster_id to represent the tenant ID, and to have a separate table mapping tenant clusters to host clusters. This other table will be populated by a yet-to-be-written component that detects tenants coming (and also going?) into a host cluster that's being monitored, as well as SQL pods coming and going. FWIW, the component that watches for nodes coming and going in a host cluster is also yet-to-be-written.
FWIW, I see that in CRDB we don't talk about "tenant_id"; we just talk about cluster IDs. E.g. in tenant.go, we do args.rpcContext.LogicalClusterID.Get().
Does this all sound right?
pkg/obsservice/obslib/migrations/sqlmigrations/0001_init.sql
line 9 at r4 (raw file):
Previously, ajwerner wrote…
These days you don't need the WITH BUCKET_COUNT = 16, it's the default. Also, this syntax is deprecated in favor of CONSTRAINT "primary" PRIMARY KEY (timestamp, id) USING HASH WITH (bucket_count = 16) if you did want to state it explicitly.
I do prefer it explicit. I've switched to the parens syntax.
Force-pushed from 1d931b0 to 216bad6
Force-pushed from 216bad6 to d0a74e9
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @ajwerner, @andreimatei, @dhartunian, and @knz)
pkg/obs/event_exporter.go
line 501 at r3 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
It would not be correct to close flushC because SendEvent might be sending on it concurrently. But note that SendEvent's write is not blocking.
For doneC I think you meant flushAndStopC. That channel is also not for runFlusher to close; runFlusher is read-only for the subscriber (that's also enforced at the type level, so we can't close it even if we wanted to). flushAndStopC is the stopper channel.
I see, my mistake - thanks for explaining!
pkg/sql/event_log.go
line 603 at r3 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
I've added some loops, but I don't see anything scary in them.
Thanks for clarifying @knz - lgtm
Force-pushed from 883f687 to 2b26f8d
Force-pushed from 2b26f8d to bf3e9e4
TFTRs!
bors r+
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aadityasondhi, @abarganier, @ajwerner, @andreimatei, @dhartunian, and @knz)
Build succeeded.
This patch adds a couple of things:
1. an RPC endpoint to CRDB for streaming out events. This RPC service is
   called by the Obs Service.
2. a library in CRDB for exporting events.
3. code in the Obs Service for ingesting the events and writing them
   into a table in the sink cluster.

The first use of the event exporting is for the system.eventlog events.
All events written to that table are now also exported. Once the Obs
Service takes hold in the future, I hope we can remove system.eventlog.

The events are represented using OpenTelemetry protos. Unfortunately,
I've had to copy over the otel protos to our tree because I couldn't
figure out a vendoring solution. Problems encountered for vendoring are:
1. The repo where these protos live
   (https://github.com/open-telemetry/opentelemetry-proto) is not
   go-get-able. This means hacks are needed for our vendoring tools.
2. Some of the protos in that repo do not build with gogoproto (they
   only build with protoc), because they use the new-ish "optional"
   option on some fields. The logs protos that we use in this patch do not
   have this problem, but others do (so we'll need to figure something out
   in the future when dealing with the metrics proto). FWIW, the
   OpenTelemetry Collector ironically has the same problem (it uses
   gogoproto), and it solved it through a sed that changes all the optional
   fields to one-of's.
3. Even if you solve the first two problems, the next one is that we
   already have a dependency on these compiled protos in our tree
   (go.opentelemetry.io/proto/otlp). This repo contains generated code,
   using protoc. We need it because of our use of the otlp trace exporter.
   Bringing in the protos again, and building them with gogo, results in go
   files that want to live in the same package/have the same import path.
   So changing the import paths is needed.

Between all of these, I've given up - at least for the moment - and
decided to copy over to our tree the few protos that we actually need.
I'm also changing their import paths. You'll notice that there is a
script that codifies the process of transforming the needed protos from
their otel upstream.

Release note: None
Release justification: Non-production.