Skip to content

Commit

Permalink
streampb: move out of CCL
Browse files Browse the repository at this point in the history
This moves streampb into pkg/streaming so that non-CCL code doesn't
need to import CCL code.

It also renames pkg/streaming to pkg/repstream.

Fixes #91005

Release note: None
  • Loading branch information
stevendanna committed Nov 14, 2022
1 parent df02d67 commit 1ead23a
Show file tree
Hide file tree
Showing 40 changed files with 125 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
/pkg/settings/ @cockroachdb/unowned
/pkg/spanconfig/ @cockroachdb/kv-prs
/pkg/startupmigrations/ @cockroachdb/unowned @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/disaster-recovery
/pkg/repstream/ @cockroachdb/disaster-recovery
/pkg/testutils/ @cockroachdb/test-eng-noreview
/pkg/testutils/reduce/ @cockroachdb/sql-queries
/pkg/testutils/sqlutils/ @cockroachdb/sql-queries
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/ccl/backupccl/backuppb/backup.go | `sz`
pkg/ccl/backupccl/backuppb/backup.go | `timing`
pkg/ccl/streamingccl/streampb/streamid.go | `StreamID`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/bulk_metrics.go | `sz`
Expand All @@ -18,6 +17,7 @@ pkg/kv/kvserver/concurrency/lock/locking.go | `Durability`
pkg/kv/kvserver/concurrency/lock/locking.go | `Strength`
pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy`
pkg/kv/kvserver/kvserverpb/raft.go | `SnapshotRequest_Type`
pkg/repstream/streampb/streamid.go | `StreamID`
pkg/roachpb/data.go | `LeaseSequence`
pkg/roachpb/data.go | `ReplicaChangeType`
pkg/roachpb/data.go | `TransactionStatus`
Expand Down
8 changes: 4 additions & 4 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,6 @@ GO_TARGETS = [
"//pkg/ccl/streamingccl/streamingest:streamingest",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamingtest:streamingtest",
"//pkg/ccl/streamingccl/streampb:streampb",
"//pkg/ccl/streamingccl/streamproducer:streamproducer",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/streamingccl:streamingccl",
Expand Down Expand Up @@ -1248,6 +1247,8 @@ GO_TARGETS = [
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/obsservice/obspb:obspb",
"//pkg/release:release",
"//pkg/repstream/streampb:streampb",
"//pkg/repstream:repstream",
"//pkg/roachpb/gen:gen",
"//pkg/roachpb/gen:gen_lib",
"//pkg/roachpb/roachpbmock:roachpbmock",
Expand Down Expand Up @@ -1834,7 +1835,6 @@ GO_TARGETS = [
"//pkg/storage/metamorphic:metamorphic_test",
"//pkg/storage:storage",
"//pkg/storage:storage_test",
"//pkg/streaming:streaming",
"//pkg/testutils/buildutil:buildutil",
"//pkg/testutils/colcontainerutils:colcontainerutils",
"//pkg/testutils/diagutils:diagutils",
Expand Down Expand Up @@ -2279,7 +2279,6 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/streamingccl/streamclient:get_x_data",
"//pkg/ccl/streamingccl/streamingest:get_x_data",
"//pkg/ccl/streamingccl/streamingtest:get_x_data",
"//pkg/ccl/streamingccl/streampb:get_x_data",
"//pkg/ccl/streamingccl/streamproducer:get_x_data",
"//pkg/ccl/telemetryccl:get_x_data",
"//pkg/ccl/testccl/authccl:get_x_data",
Expand Down Expand Up @@ -2542,6 +2541,8 @@ GET_X_DATA_TARGETS = [
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:get_x_data",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:get_x_data",
"//pkg/release:get_x_data",
"//pkg/repstream:get_x_data",
"//pkg/repstream/streampb:get_x_data",
"//pkg/roachpb:get_x_data",
"//pkg/roachpb/gen:get_x_data",
"//pkg/roachpb/roachpbmock:get_x_data",
Expand Down Expand Up @@ -2903,7 +2904,6 @@ GET_X_DATA_TARGETS = [
"//pkg/storage/enginepb:get_x_data",
"//pkg/storage/fs:get_x_data",
"//pkg/storage/metamorphic:get_x_data",
"//pkg/streaming:get_x_data",
"//pkg/testutils:get_x_data",
"//pkg/testutils/buildutil:get_x_data",
"//pkg/testutils/colcontainerutils:get_x_data",
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streampb",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
"//pkg/streaming",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/streamingccl/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ package streamingccl
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
)

// StreamStatusErr is an error that encapsulate a replication stream's inactive status.
type StreamStatusErr struct {
StreamID streaming.StreamID
StreamID streampb.StreamID
StreamStatus streampb.StreamReplicationStatus_StreamStatus
}

// NewStreamStatusErr creates a new StreamStatusErr.
func NewStreamStatusErr(
streamID streaming.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus,
streamID streampb.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus,
) StreamStatusErr {
return StreamStatusErr{
StreamID: streamID,
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streampb",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql",
Expand All @@ -26,7 +26,6 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down Expand Up @@ -55,18 +54,17 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/streamingccl/streampb",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/streaming",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
13 changes: 6 additions & 7 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -48,7 +47,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenantID roachpb.TenantID) (streaming.StreamID, error)
Create(ctx context.Context, tenantID roachpb.TenantID) (streampb.StreamID, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
Expand All @@ -63,21 +62,21 @@ type Client interface {
// TODO(dt): ts -> checkpointToken.
Heartbeat(
ctx context.Context,
streamID streaming.StreamID,
streamID streampb.StreamID,
consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error)

// Plan returns a Topology for this stream.
// TODO(dt): separate target argument from address argument.
Plan(ctx context.Context, streamID streaming.StreamID) (Topology, error)
Plan(ctx context.Context, streamID streampb.StreamID) (Topology, error)

// Subscribe opens and returns a subscription for the specified partition from
// the specified remote address. This is used by each consumer processor to
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(
ctx context.Context,
streamID streaming.StreamID,
streamID streampb.StreamID,
spec SubscriptionToken,
checkpoint hlc.Timestamp,
) (Subscription, error)
Expand All @@ -86,7 +85,7 @@ type Client interface {
Close(ctx context.Context) error

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error
Complete(ctx context.Context, streamID streampb.StreamID, successfulIngestion bool) error
}

// Topology is a configuration of stream partitions. These are particular to a
Expand Down
15 changes: 7 additions & 8 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -40,12 +39,12 @@ func (sc testStreamClient) Dial(ctx context.Context) error {
// Create implements the Client interface.
func (sc testStreamClient) Create(
ctx context.Context, target roachpb.TenantID,
) (streaming.StreamID, error) {
return streaming.StreamID(1), nil
) (streampb.StreamID, error) {
return streampb.StreamID(1), nil
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Topology, error) {
func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) {
return Topology{
{SrcAddr: "test://host1"},
{SrcAddr: "test://host2"},
Expand All @@ -54,7 +53,7 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Top

// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}
Expand All @@ -66,7 +65,7 @@ func (sc testStreamClient) Close(ctx context.Context) error {

// Subscribe implements the Client interface.
func (sc testStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -93,7 +92,7 @@ func (sc testStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ import (
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -65,13 +64,13 @@ var _ Client = &partitionedStreamClient{}
// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantID roachpb.TenantID,
) (streaming.StreamID, error) {
) (streampb.StreamID, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var streamID streaming.StreamID
var streamID streampb.StreamID
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
Expand All @@ -92,7 +91,7 @@ func (p *partitionedStreamClient) Dial(ctx context.Context) error {

// Heartbeat implements Client interface.
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
ctx context.Context, streamID streampb.StreamID, consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Heartbeat")
defer sp.Finish()
Expand Down Expand Up @@ -126,7 +125,7 @@ func (p *partitionedStreamClient) postgresURL(servingAddr string) (url.URL, erro

// Plan implements Client interface.
func (p *partitionedStreamClient) Plan(
ctx context.Context, streamID streaming.StreamID,
ctx context.Context, streamID streampb.StreamID,
) (Topology, error) {
var spec streampb.ReplicationStreamSpec
{
Expand Down Expand Up @@ -184,7 +183,7 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error {

// Subscribe implements Client interface.
func (p *partitionedStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
Expand Down Expand Up @@ -215,7 +214,7 @@ func (p *partitionedStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()
Expand All @@ -239,7 +238,7 @@ type partitionedStreamSubscription struct {

streamEvent *streampb.StreamEvent
specBytes []byte
streamID streaming.StreamID
streamID streampb.StreamID
}

var _ Subscription = (*partitionedStreamSubscription)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant.
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" // Ensure we can start replication stream.
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -98,7 +97,7 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, client.Close(ctx))
}()
require.NoError(t, err)
expectStreamState := func(streamID streaming.StreamID, status jobs.Status) {
expectStreamState := func(streamID streampb.StreamID, status jobs.Status) {
h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{string(status)}})
}
Expand Down Expand Up @@ -212,7 +211,7 @@ INSERT INTO d.t2 VALUES (2);
})

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999), true)
err = client.Complete(ctx, streampb.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)

// Makes producer job exit quickly.
Expand Down
Loading

0 comments on commit 1ead23a

Please sign in to comment.