Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
90828: roachtest: add --debug-always flag r=stevendanna a=stevendanna

Occasionally, it is very useful to keep a cluster around even if the workload happened to complete without error. The --debug-always is like --debug but saves the cluster even if the test was successful.

Epic: None

Release note: None

91359: keys: accurately size pre-allocated buffers in MakeRangeKey r=arulajmani a=nvanbenschoten

This commit includes a trio of changes that accurately size the byte buffers in `MakeRangeKey` and `MakeRangeKeyPrefix` to avoid unnecessary slice resizing (allocation + memcpy) when constructing range-local keys (for example, `RangeDescriptorKey` and `TransactionKey`).

The first change is to include the size of the suffix and detail slices when pre-allocating the byte buffer. We know that `MakeRangeKey` will be appending these to the key, so they will force a resize if not accounted for upfront.

The second change is to correctly account for the overhead of `EncodeBytesAscending`. The code was getting this wrong in two ways. First, it was failing to account for the 3 bytes of unconditional overhead added by the encoding scheme for the encoding type marker and terminator. Second, it was failing to account for the conditional overhead when bytes need to be escaped. We now accurately and efficiently compute the overhead ahead of time to avoid resizing.

The third change is to redefine the transaction tombstone and push marker keys used with the timestamp cache. Previously, the tombstone and push specifiers were additional suffixes that we added after the `LocalTransactionSuffix`. Now, these specifiers are the only suffix added to the key, which avoids an additional key resize.

```
name                         old time/op    new time/op    delta
KV/Update/Native/rows=1-10     70.6µs ± 2%    69.6µs ± 2%  -1.38%  (p=0.000 n=100+93)
KV/Insert/Native/rows=1-10     46.0µs ± 2%    45.7µs ± 2%  -0.62%  (p=0.000 n=95+97)
KV/Delete/Native/rows=1-10     47.3µs ± 2%    47.1µs ± 2%  -0.55%  (p=0.000 n=97+94)
KV/Insert/Native/rows=10-10    74.4µs ± 2%    74.1µs ± 3%  -0.45%  (p=0.000 n=96+98)
KV/Update/Native/rows=10-10     171µs ± 3%     170µs ± 3%  -0.36%  (p=0.045 n=96+98)
KV/Delete/Native/rows=10-10    85.1µs ± 2%    84.8µs ± 2%  -0.29%  (p=0.020 n=98+93)
KV/Update/SQL/rows=1-10         174µs ± 3%     174µs ± 3%  -0.28%  (p=0.041 n=97+89)
KV/Insert/SQL/rows=1-10         131µs ± 2%     131µs ± 4%    ~     (p=0.961 n=89+91)
KV/Insert/SQL/rows=10-10        186µs ± 3%     186µs ± 3%    ~     (p=0.970 n=94+92)
KV/Update/SQL/rows=10-10        336µs ± 2%     336µs ± 3%    ~     (p=0.947 n=92+92)
KV/Delete/SQL/rows=1-10         149µs ± 2%     149µs ± 3%    ~     (p=0.917 n=96+96)
KV/Delete/SQL/rows=10-10        226µs ± 8%     225µs ±10%    ~     (p=0.057 n=97+98)

name                         old alloc/op   new alloc/op   delta
KV/Insert/Native/rows=1-10     17.9kB ± 1%    17.6kB ± 1%  -1.95%  (p=0.000 n=100+100)
KV/Delete/Native/rows=1-10     18.2kB ± 1%    17.9kB ± 1%  -1.92%  (p=0.000 n=94+94)
KV/Update/Native/rows=1-10     25.1kB ± 0%    24.7kB ± 1%  -1.44%  (p=0.000 n=97+97)
KV/Insert/Native/rows=10-10    44.9kB ± 1%    44.5kB ± 1%  -0.88%  (p=0.000 n=100+99)
KV/Delete/Native/rows=10-10    42.4kB ± 1%    42.0kB ± 0%  -0.87%  (p=0.000 n=96+94)
KV/Delete/SQL/rows=1-10        52.9kB ± 1%    52.6kB ± 1%  -0.48%  (p=0.000 n=96+100)
KV/Update/Native/rows=10-10    75.2kB ± 1%    74.8kB ± 1%  -0.43%  (p=0.000 n=98+99)
KV/Update/SQL/rows=1-10        52.6kB ± 1%    52.4kB ± 0%  -0.35%  (p=0.000 n=95+95)
KV/Insert/SQL/rows=1-10        45.4kB ± 1%    45.3kB ± 0%  -0.24%  (p=0.000 n=97+93)
KV/Update/SQL/rows=10-10        119kB ± 1%     119kB ± 1%  -0.16%  (p=0.002 n=97+99)
KV/Delete/SQL/rows=10-10       88.0kB ± 1%    87.8kB ± 1%  -0.14%  (p=0.017 n=91+95)
KV/Insert/SQL/rows=10-10       93.9kB ± 1%    93.7kB ± 1%  -0.14%  (p=0.000 n=98+100)

name                         old allocs/op  new allocs/op  delta
KV/Insert/Native/rows=1-10        142 ± 0%       130 ± 0%  -8.45%  (p=0.000 n=99+100)
KV/Delete/Native/rows=1-10        143 ± 0%       131 ± 0%  -8.39%  (p=0.000 n=96+94)
KV/Update/Native/rows=1-10        198 ± 0%       186 ± 0%  -6.06%  (p=0.000 n=99+98)
KV/Delete/Native/rows=10-10       275 ± 0%       263 ± 0%  -4.36%  (p=0.000 n=90+90)
KV/Insert/Native/rows=10-10       295 ± 0%       283 ± 0%  -4.07%  (p=0.000 n=93+91)
KV/Update/Native/rows=10-10       472 ± 1%       460 ± 1%  -2.55%  (p=0.000 n=98+98)
KV/Insert/SQL/rows=1-10           365 ± 0%       356 ± 0%  -2.53%  (p=0.000 n=97+75)
KV/Delete/SQL/rows=1-10           402 ± 0%       393 ± 0%  -2.24%  (p=0.000 n=97+98)
KV/Update/SQL/rows=1-10           509 ± 0%       500 ± 0%  -1.81%  (p=0.000 n=96+95)
KV/Insert/SQL/rows=10-10          589 ± 0%       580 ± 0%  -1.53%  (p=0.000 n=94+95)
KV/Delete/SQL/rows=10-10          623 ± 1%       614 ± 1%  -1.47%  (p=0.000 n=98+97)
KV/Update/SQL/rows=10-10          858 ± 1%       849 ± 0%  -1.03%  (p=0.000 n=95+93)
```

I confirmed in heap profiles that this change eliminates all resizes of these buffers.

Release note: None

Epic: None

91719: storage: add MVCCExportFingerprintOptions r=stevendanna a=stevendanna

This adds MVCCExportFingerprintOptions with two new options:

 - StripTenantPrefix
 - StripValueChecksum

The goal of these options is to produce a fingerprint that can be used for comparing data across two tenants.

Note that if arbitrary keys and values are encountered, both options have the possibility of erroneously removing data from the fingerprint that isn't actually a tenant prefix or checksum.

Fixes #91150

Release note: None

91750: kvserver: factor out ReplicatedCmd r=pavelkalinnikov a=tbg

**Background**

We would like to apply log entries at startup time, and so we are
working towards making all code related to entry application
stand-alone (and, as a nice side product, more unit testable).

We already have a decent set of abstractions in place:

- `apply.Decoder` can consume log entries and turn them into
  (iterators over) `apply.Command`.
- `apply.StateMachine` can make an `apply.Batch` which is
  a pebble batch along with the management of any below-raft
  side effects (replicated or in-memory)
- `apply.Batch` has a `Stage` method that handles an `apply.Command`,
  in the simplest case doing little more than adding it to its pebble
  batch (but in the most complex cases, locking adjacent replicas for
  complicated split-merge dances).
- `Stage` returns a `CheckedCmd`, which can be acked to the client.

**This PR**

This PR (i.e. this and the lead-up of preceding commits) provides an
implementation of `apply.Command` (and friends) outside of `kvserver` in
a way that does not lead to code duplication.

Before this PR, `apply.Command` was implemented only by
`*kvserver.replicatedCmd`[^rc], which is a struct wrapping the decoded
raft entry and adding some state (such as whether the entry applies with
a forced error, i.e. as an empty command, or not). Some of this state is
necessary in standalone application as well (like the forced error),
other state isn't (like a tracing span, or client's context). In
particular, `replicatedCmd` holds on to a `ProposalData` which itself
references types related to latching and the quota pool. These are hard
to extract, and besides this isn't necessary as standalone application
doesn't need them.

Instead of trying to transitively extract `replicatedCmd` from
`kvserver`, we split `replicatedCmd` into the standalone part, and let
`replicatedCmd` extend the standalone part with the fields only
applicable during application in the context of a `*Replica`. In effect,
`raftlog.ReplicatedCmd` determines deterministically the effect of a raft
command onto the state machine, whereas `replicatedCmd` additionally
deals with additional concerns outside of that scope, such as managing
concurrency between different state machines, acknowledging clients,
updating in-memory counters, etc.

This decoupling results in the type introduced in this PR,
`ReplicatedCmd`, which is embedded into `replicatedCmd`.

Both `*replicatedCmd` and `*ReplicatedCmd` implement `apply.Command`
(as well as `apply.CheckedCommand` and `apply.AppliedCommand`).

To avoid duplication, the implementations on `*replicatedCmd` fall
through to those of `*ReplicatedCmd` whereever possible (via the
embedding of the latter into the former).

For example, the `IsTrivial` method is purely a property of the
`ReplicatedEvalResult`, i.e. of the raft entry, and can thus be answered
by `ReplicatedCmd.IsTrivial()`; `*replicatedCmd` does not override
this method.[^concern]

For another, a bit less trivial example, the `Rejected()` method checks
whether there is a forced error for this command. This is determined
below raft (i.e. not a property of the entry itself), but both
stand-alone and regular application need to come to the same results
here. This is why the forced error is a field on `ReplicatedCmd` and
the implementation sits on the base as well, simply returning whether
the field is set.

An example where the implementations differ is `IsLocal()`: in
standalone applications commands are never considered "local" (we never
have a client waiting to be acked), so the base implementation returns
`false`, but the implementation on `*replicatedCmd` overrides this to
`return c.proposal != nil`.

With this in place, we have an implementation of
`apply.{,Checked,Applied}Command` present in `raftlog`.

[^rc]: https://github.com/cockroachdb/cockroach/blob/fb4014a31b9b8d8235dc48de52196e64b185f490/pkg/kv/kvserver/replica_application_cmd.go#L27-L79
[^concern]: it might be too opaque to implement an interface wholly or in part via an embedded
implementation. Reviewers should consider whether they'd prefer explicit
"passthrough" methods to be added.

**Next Steps**

It's straightforward to implement the `apply.Decoder` seen in the
walkthrough at the top. The tricky part is `apply.StateMachine`.

Today's implementation of that is essentially the `*Replica`[^sm];
the meat of it is in `*replicaAppBatch`[^rb], which is the
`apply.Batch`, and which holds a `*Replica` that it calls out to on
many occasions, in particular during `Stage()`.

The next challenge will be a similar separation of concerns as carried
out above for `replicatedCmd`. Large parts of `Stage` are related to the
deterministic handling of log entries during application, but others
deal with updating in-memory state, keeping counters, notifying
rangefeeds of events, etc, all concerns not relevant to standalone
application and rather side-effects of state machine application vs
a part of it.

The goal is to introduce a handler interface which can separate these,
and for which the standalone application provides an implementation of
the essentials with no-ops for everything which is not a property of
state machine application in itself. With this interface contained in it,
`replicaAppBatch` can be moved to a leaf package, and can be used in
both standalone and regular log entry application.

[^sm]: https://github.com/cockroachdb/cockroach/blob/ccac3ddd85ca2fd4a8d02a89c82cd04761a1ce26/pkg/kv/kvserver/replica_application_state_machine.go#L106-L121
[^rb]: https://github.com/cockroachdb/cockroach/blob/ccac3ddd85ca2fd4a8d02a89c82cd04761a1ce26/pkg/kv/kvserver/replica_application_state_machine.go#L202-L234

Touches #75729.

Epic: CRDB-220
Release note: None


91755: upgrades: add weaker column schema exists funcs for use with migrations r=ajwerner,rafiss a=andyyang890

This patch adds two schema exists functions for use with migrations
that involve multiple schema changes on the same column(s) in order
to preserve the idempotence of the migration(s). They are weaker
in the sense that they do not check that the stored and final
expected descriptor match.

Informs #91449

Release note: None

91806: streampb: move out of CCL r=adityamaru a=stevendanna

This moves streampb into pkg/streaming so that non-CCL code doesn't need to import CCL code.

It also renames pkg/streaming to pkg/repstream.

Fixes #91005

Release note: None

91920: ptsstorage: allow synthetic timestamps in pts storage r=HonoreDB a=aliher1911

Previously synthetic timestamps were causing failures in changefeeds if checkpoint contained a synthetic timestamps. Timestamp representation was parsed as decimal for storage which is not the case for synthetic timestamps.
This commit changes pts storage to strip synthetic flag to mitigate the issue.
Stripping synthetic flag should be safe as protected timestamp is not used to update key or transaction timestamps but to compare against GC thresholds.

Release note: None

Fixes #91922

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
  • Loading branch information
6 people committed Nov 15, 2022
8 parents 56b139b + c6badb4 + f0f9c7e + d316af0 + 0788ad6 + 61cc346 + 1ead23a + 951add4 commit 03f6062
Show file tree
Hide file tree
Showing 72 changed files with 1,024 additions and 503 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
/pkg/settings/ @cockroachdb/unowned
/pkg/spanconfig/ @cockroachdb/kv-prs
/pkg/startupmigrations/ @cockroachdb/unowned @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/disaster-recovery
/pkg/repstream/ @cockroachdb/disaster-recovery
/pkg/testutils/ @cockroachdb/test-eng-noreview
/pkg/testutils/reduce/ @cockroachdb/sql-queries
/pkg/testutils/sqlutils/ @cockroachdb/sql-queries
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/ccl/backupccl/backuppb/backup.go | `sz`
pkg/ccl/backupccl/backuppb/backup.go | `timing`
pkg/ccl/streamingccl/streampb/streamid.go | `StreamID`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/bulk_metrics.go | `sz`
Expand All @@ -18,6 +17,7 @@ pkg/kv/kvserver/concurrency/lock/locking.go | `Durability`
pkg/kv/kvserver/concurrency/lock/locking.go | `Strength`
pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy`
pkg/kv/kvserver/kvserverpb/raft.go | `SnapshotRequest_Type`
pkg/repstream/streampb/streamid.go | `StreamID`
pkg/roachpb/data.go | `LeaseSequence`
pkg/roachpb/data.go | `ReplicaChangeType`
pkg/roachpb/data.go | `TransactionStatus`
Expand Down
8 changes: 4 additions & 4 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,6 @@ GO_TARGETS = [
"//pkg/ccl/streamingccl/streamingest:streamingest",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamingtest:streamingtest",
"//pkg/ccl/streamingccl/streampb:streampb",
"//pkg/ccl/streamingccl/streamproducer:streamproducer",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/streamingccl:streamingccl",
Expand Down Expand Up @@ -1248,6 +1247,8 @@ GO_TARGETS = [
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/obsservice/obspb:obspb",
"//pkg/release:release",
"//pkg/repstream/streampb:streampb",
"//pkg/repstream:repstream",
"//pkg/roachpb/gen:gen",
"//pkg/roachpb/gen:gen_lib",
"//pkg/roachpb/roachpbmock:roachpbmock",
Expand Down Expand Up @@ -1834,7 +1835,6 @@ GO_TARGETS = [
"//pkg/storage/metamorphic:metamorphic_test",
"//pkg/storage:storage",
"//pkg/storage:storage_test",
"//pkg/streaming:streaming",
"//pkg/testutils/buildutil:buildutil",
"//pkg/testutils/colcontainerutils:colcontainerutils",
"//pkg/testutils/diagutils:diagutils",
Expand Down Expand Up @@ -2279,7 +2279,6 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/streamingccl/streamclient:get_x_data",
"//pkg/ccl/streamingccl/streamingest:get_x_data",
"//pkg/ccl/streamingccl/streamingtest:get_x_data",
"//pkg/ccl/streamingccl/streampb:get_x_data",
"//pkg/ccl/streamingccl/streamproducer:get_x_data",
"//pkg/ccl/telemetryccl:get_x_data",
"//pkg/ccl/testccl/authccl:get_x_data",
Expand Down Expand Up @@ -2542,6 +2541,8 @@ GET_X_DATA_TARGETS = [
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:get_x_data",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:get_x_data",
"//pkg/release:get_x_data",
"//pkg/repstream:get_x_data",
"//pkg/repstream/streampb:get_x_data",
"//pkg/roachpb:get_x_data",
"//pkg/roachpb/gen:get_x_data",
"//pkg/roachpb/roachpbmock:get_x_data",
Expand Down Expand Up @@ -2903,7 +2904,6 @@ GET_X_DATA_TARGETS = [
"//pkg/storage/enginepb:get_x_data",
"//pkg/storage/fs:get_x_data",
"//pkg/storage/metamorphic:get_x_data",
"//pkg/streaming:get_x_data",
"//pkg/testutils:get_x_data",
"//pkg/testutils/buildutil:get_x_data",
"//pkg/testutils/colcontainerutils:get_x_data",
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streampb",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
"//pkg/streaming",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/streamingccl/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ package streamingccl
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
)

// StreamStatusErr is an error that encapsulate a replication stream's inactive status.
type StreamStatusErr struct {
StreamID streaming.StreamID
StreamID streampb.StreamID
StreamStatus streampb.StreamReplicationStatus_StreamStatus
}

// NewStreamStatusErr creates a new StreamStatusErr.
func NewStreamStatusErr(
streamID streaming.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus,
streamID streampb.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus,
) StreamStatusErr {
return StreamStatusErr{
StreamID: streamID,
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streampb",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql",
Expand All @@ -26,7 +26,6 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down Expand Up @@ -55,18 +54,17 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/streamingccl/streampb",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/streaming",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
13 changes: 6 additions & 7 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -48,7 +47,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenantID roachpb.TenantID) (streaming.StreamID, error)
Create(ctx context.Context, tenantID roachpb.TenantID) (streampb.StreamID, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
Expand All @@ -63,21 +62,21 @@ type Client interface {
// TODO(dt): ts -> checkpointToken.
Heartbeat(
ctx context.Context,
streamID streaming.StreamID,
streamID streampb.StreamID,
consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error)

// Plan returns a Topology for this stream.
// TODO(dt): separate target argument from address argument.
Plan(ctx context.Context, streamID streaming.StreamID) (Topology, error)
Plan(ctx context.Context, streamID streampb.StreamID) (Topology, error)

// Subscribe opens and returns a subscription for the specified partition from
// the specified remote address. This is used by each consumer processor to
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(
ctx context.Context,
streamID streaming.StreamID,
streamID streampb.StreamID,
spec SubscriptionToken,
checkpoint hlc.Timestamp,
) (Subscription, error)
Expand All @@ -86,7 +85,7 @@ type Client interface {
Close(ctx context.Context) error

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error
Complete(ctx context.Context, streamID streampb.StreamID, successfulIngestion bool) error
}

// Topology is a configuration of stream partitions. These are particular to a
Expand Down
15 changes: 7 additions & 8 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -40,12 +39,12 @@ func (sc testStreamClient) Dial(ctx context.Context) error {
// Create implements the Client interface.
func (sc testStreamClient) Create(
ctx context.Context, target roachpb.TenantID,
) (streaming.StreamID, error) {
return streaming.StreamID(1), nil
) (streampb.StreamID, error) {
return streampb.StreamID(1), nil
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Topology, error) {
func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) {
return Topology{
{SrcAddr: "test://host1"},
{SrcAddr: "test://host2"},
Expand All @@ -54,7 +53,7 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Top

// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}
Expand All @@ -66,7 +65,7 @@ func (sc testStreamClient) Close(ctx context.Context) error {

// Subscribe implements the Client interface.
func (sc testStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -93,7 +92,7 @@ func (sc testStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ import (
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -65,13 +64,13 @@ var _ Client = &partitionedStreamClient{}
// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantID roachpb.TenantID,
) (streaming.StreamID, error) {
) (streampb.StreamID, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var streamID streaming.StreamID
var streamID streampb.StreamID
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
Expand All @@ -92,7 +91,7 @@ func (p *partitionedStreamClient) Dial(ctx context.Context) error {

// Heartbeat implements Client interface.
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
ctx context.Context, streamID streampb.StreamID, consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Heartbeat")
defer sp.Finish()
Expand Down Expand Up @@ -126,7 +125,7 @@ func (p *partitionedStreamClient) postgresURL(servingAddr string) (url.URL, erro

// Plan implements Client interface.
func (p *partitionedStreamClient) Plan(
ctx context.Context, streamID streaming.StreamID,
ctx context.Context, streamID streampb.StreamID,
) (Topology, error) {
var spec streampb.ReplicationStreamSpec
{
Expand Down Expand Up @@ -184,7 +183,7 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error {

// Subscribe implements Client interface.
func (p *partitionedStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
Expand Down Expand Up @@ -215,7 +214,7 @@ func (p *partitionedStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()
Expand All @@ -239,7 +238,7 @@ type partitionedStreamSubscription struct {

streamEvent *streampb.StreamEvent
specBytes []byte
streamID streaming.StreamID
streamID streampb.StreamID
}

var _ Subscription = (*partitionedStreamSubscription)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant.
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" // Ensure we can start replication stream.
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -98,7 +97,7 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, client.Close(ctx))
}()
require.NoError(t, err)
expectStreamState := func(streamID streaming.StreamID, status jobs.Status) {
expectStreamState := func(streamID streampb.StreamID, status jobs.Status) {
h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{string(status)}})
}
Expand Down Expand Up @@ -212,7 +211,7 @@ INSERT INTO d.t2 VALUES (2);
})

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999), true)
err = client.Complete(ctx, streampb.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)

// Makes producer job exit quickly.
Expand Down
Loading

0 comments on commit 03f6062

Please sign in to comment.