diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 87f370d60287..c8feb1e00409 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -346,7 +346,7 @@ /pkg/settings/ @cockroachdb/unowned /pkg/spanconfig/ @cockroachdb/kv-prs /pkg/startupmigrations/ @cockroachdb/unowned @cockroachdb/sql-schema -/pkg/streaming/ @cockroachdb/disaster-recovery +/pkg/repstream/ @cockroachdb/disaster-recovery /pkg/testutils/ @cockroachdb/test-eng-noreview /pkg/testutils/reduce/ @cockroachdb/sql-queries /pkg/testutils/sqlutils/ @cockroachdb/sql-queries diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index d65990bd80ef..0a716ea68861 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -7,7 +7,6 @@ pkg/base/node_id.go | `*SQLIDContainer` pkg/base/node_id.go | `*StoreIDContainer` pkg/ccl/backupccl/backuppb/backup.go | `sz` pkg/ccl/backupccl/backuppb/backup.go | `timing` -pkg/ccl/streamingccl/streampb/streamid.go | `StreamID` pkg/cli/exit/exit.go | `Code` pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/bulk/bulk_metrics.go | `sz` @@ -18,6 +17,7 @@ pkg/kv/kvserver/concurrency/lock/locking.go | `Durability` pkg/kv/kvserver/concurrency/lock/locking.go | `Strength` pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy` pkg/kv/kvserver/kvserverpb/raft.go | `SnapshotRequest_Type` +pkg/repstream/streampb/streamid.go | `StreamID` pkg/roachpb/data.go | `LeaseSequence` pkg/roachpb/data.go | `ReplicaChangeType` pkg/roachpb/data.go | `TransactionStatus` diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index dec349965f52..aaed0636ccd9 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -794,7 +794,6 @@ GO_TARGETS = [ "//pkg/ccl/streamingccl/streamingest:streamingest", "//pkg/ccl/streamingccl/streamingest:streamingest_test", "//pkg/ccl/streamingccl/streamingtest:streamingtest", - "//pkg/ccl/streamingccl/streampb:streampb", "//pkg/ccl/streamingccl/streamproducer:streamproducer", "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", "//pkg/ccl/streamingccl:streamingccl", @@ -1248,6 +1247,8 @@ GO_TARGETS = [ "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource", "//pkg/obsservice/obspb:obspb", "//pkg/release:release", + "//pkg/repstream/streampb:streampb", + "//pkg/repstream:repstream", "//pkg/roachpb/gen:gen", "//pkg/roachpb/gen:gen_lib", "//pkg/roachpb/roachpbmock:roachpbmock", @@ -1834,7 +1835,6 @@ GO_TARGETS = [ "//pkg/storage/metamorphic:metamorphic_test", "//pkg/storage:storage", "//pkg/storage:storage_test", - "//pkg/streaming:streaming", "//pkg/testutils/buildutil:buildutil", "//pkg/testutils/colcontainerutils:colcontainerutils", "//pkg/testutils/diagutils:diagutils", @@ -2279,7 +2279,6 @@ GET_X_DATA_TARGETS = [ "//pkg/ccl/streamingccl/streamclient:get_x_data", "//pkg/ccl/streamingccl/streamingest:get_x_data", "//pkg/ccl/streamingccl/streamingtest:get_x_data", - "//pkg/ccl/streamingccl/streampb:get_x_data", "//pkg/ccl/streamingccl/streamproducer:get_x_data", "//pkg/ccl/telemetryccl:get_x_data", "//pkg/ccl/testccl/authccl:get_x_data", @@ -2542,6 +2541,8 @@ GET_X_DATA_TARGETS = [ "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:get_x_data", "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:get_x_data", "//pkg/release:get_x_data", + "//pkg/repstream:get_x_data", + "//pkg/repstream/streampb:get_x_data", "//pkg/roachpb:get_x_data", "//pkg/roachpb/gen:get_x_data", "//pkg/roachpb/roachpbmock:get_x_data", @@ -2903,7 +2904,6 @@ GET_X_DATA_TARGETS = [ "//pkg/storage/enginepb:get_x_data", "//pkg/storage/fs:get_x_data", "//pkg/storage/metamorphic:get_x_data", - "//pkg/streaming:get_x_data", "//pkg/testutils:get_x_data", "//pkg/testutils/buildutil:get_x_data", "//pkg/testutils/colcontainerutils:get_x_data", diff --git a/pkg/ccl/streamingccl/BUILD.bazel b/pkg/ccl/streamingccl/BUILD.bazel index 96bbd7c12ec2..42c9a19f5387 100644 --- a/pkg/ccl/streamingccl/BUILD.bazel +++ b/pkg/ccl/streamingccl/BUILD.bazel @@ -13,12 +13,11 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl", visibility = ["//visibility:public"], deps = [ - "//pkg/ccl/streamingccl/streampb", "//pkg/jobs/jobspb", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/settings", "//pkg/storage", - "//pkg/streaming", "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/ccl/streamingccl/errors.go b/pkg/ccl/streamingccl/errors.go index c33e27f76552..f6e98bcb94a2 100644 --- a/pkg/ccl/streamingccl/errors.go +++ b/pkg/ccl/streamingccl/errors.go @@ -11,19 +11,18 @@ package streamingccl import ( "fmt" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" - "github.com/cockroachdb/cockroach/pkg/streaming" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" ) // StreamStatusErr is an error that encapsulate a replication stream's inactive status. type StreamStatusErr struct { - StreamID streaming.StreamID + StreamID streampb.StreamID StreamStatus streampb.StreamReplicationStatus_StreamStatus } // NewStreamStatusErr creates a new StreamStatusErr. func NewStreamStatusErr( - streamID streaming.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus, + streamID streampb.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus, ) StreamStatusErr { return StreamStatusErr{ StreamID: streamID, diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 0c8d2e326939..6c9d1e1d1e56 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -12,9 +12,9 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/streampb", "//pkg/jobs/jobspb", "//pkg/keys", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", "//pkg/sql", @@ -26,7 +26,6 @@ go_library( "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", "//pkg/sql/sem/tree", - "//pkg/streaming", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", @@ -55,18 +54,17 @@ go_test( "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamingtest", - "//pkg/ccl/streamingccl/streampb", "//pkg/ccl/streamingccl/streamproducer", "//pkg/ccl/utilccl", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", "//pkg/sql/catalog/desctestutils", "//pkg/sql/pgwire/pgcode", - "//pkg/streaming", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 4953065d1843..cf88147cf72c 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -12,9 +12,8 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -48,7 +47,7 @@ type Client interface { // Create initializes a stream with the source, potentially reserving any // required resources, such as protected timestamps, and returns an ID which // can be used to interact with this stream in the future. - Create(ctx context.Context, tenantID roachpb.TenantID) (streaming.StreamID, error) + Create(ctx context.Context, tenantID roachpb.TenantID) (streampb.StreamID, error) // Dial checks if the source is able to be connected to for queries Dial(ctx context.Context) error @@ -63,13 +62,13 @@ type Client interface { // TODO(dt): ts -> checkpointToken. Heartbeat( ctx context.Context, - streamID streaming.StreamID, + streamID streampb.StreamID, consumed hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) // Plan returns a Topology for this stream. // TODO(dt): separate target argument from address argument. - Plan(ctx context.Context, streamID streaming.StreamID) (Topology, error) + Plan(ctx context.Context, streamID streampb.StreamID) (Topology, error) // Subscribe opens and returns a subscription for the specified partition from // the specified remote address. This is used by each consumer processor to @@ -77,7 +76,7 @@ type Client interface { // TODO(dt): ts -> checkpointToken. Subscribe( ctx context.Context, - streamID streaming.StreamID, + streamID streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, ) (Subscription, error) @@ -86,7 +85,7 @@ type Client interface { Close(ctx context.Context) error // Complete completes a replication stream consumption. - Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error + Complete(ctx context.Context, streamID streampb.StreamID, successfulIngestion bool) error } // Topology is a configuration of stream partitions. These are particular to a diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index c2c0e98517b1..f41d95261a54 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -16,10 +16,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -40,12 +39,12 @@ func (sc testStreamClient) Dial(ctx context.Context) error { // Create implements the Client interface. func (sc testStreamClient) Create( ctx context.Context, target roachpb.TenantID, -) (streaming.StreamID, error) { - return streaming.StreamID(1), nil +) (streampb.StreamID, error) { + return streampb.StreamID(1), nil } // Plan implements the Client interface. -func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Topology, error) { +func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) { return Topology{ {SrcAddr: "test://host1"}, {SrcAddr: "test://host2"}, @@ -54,7 +53,7 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Top // Heartbeat implements the Client interface. func (sc testStreamClient) Heartbeat( - ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp, + ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { return streampb.StreamReplicationStatus{}, nil } @@ -66,7 +65,7 @@ func (sc testStreamClient) Close(ctx context.Context) error { // Subscribe implements the Client interface. func (sc testStreamClient) Subscribe( - ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, + ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, ) (Subscription, error) { sampleKV := roachpb.KeyValue{ Key: []byte("key_1"), @@ -93,7 +92,7 @@ func (sc testStreamClient) Subscribe( // Complete implements the streamclient.Client interface. func (sc testStreamClient) Complete( - ctx context.Context, streamID streaming.StreamID, successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { return nil } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 44cddf7e9ddc..814313f860e8 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -14,9 +14,8 @@ import ( "net/url" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -65,13 +64,13 @@ var _ Client = &partitionedStreamClient{} // Create implements Client interface. func (p *partitionedStreamClient) Create( ctx context.Context, tenantID roachpb.TenantID, -) (streaming.StreamID, error) { +) (streampb.StreamID, error) { ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create") defer sp.Finish() p.mu.Lock() defer p.mu.Unlock() - var streamID streaming.StreamID + var streamID streampb.StreamID row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64()) err := row.Scan(&streamID) if err != nil { @@ -92,7 +91,7 @@ func (p *partitionedStreamClient) Dial(ctx context.Context) error { // Heartbeat implements Client interface. func (p *partitionedStreamClient) Heartbeat( - ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp, + ctx context.Context, streamID streampb.StreamID, consumed hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Heartbeat") defer sp.Finish() @@ -126,7 +125,7 @@ func (p *partitionedStreamClient) postgresURL(servingAddr string) (url.URL, erro // Plan implements Client interface. func (p *partitionedStreamClient) Plan( - ctx context.Context, streamID streaming.StreamID, + ctx context.Context, streamID streampb.StreamID, ) (Topology, error) { var spec streampb.ReplicationStreamSpec { @@ -184,7 +183,7 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error { // Subscribe implements Client interface. func (p *partitionedStreamClient) Subscribe( - ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, + ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, ) (Subscription, error) { _, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe") defer sp.Finish() @@ -215,7 +214,7 @@ func (p *partitionedStreamClient) Subscribe( // Complete implements the streamclient.Client interface. func (p *partitionedStreamClient) Complete( - ctx context.Context, streamID streaming.StreamID, successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete") defer sp.Finish() @@ -239,7 +238,7 @@ type partitionedStreamSubscription struct { streamEvent *streampb.StreamEvent specBytes []byte - streamID streaming.StreamID + streamID streampb.StreamID } var _ Subscription = (*partitionedStreamSubscription)(nil) diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go index 8ee2b9d90374..2e1191f32cdd 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go @@ -19,13 +19,12 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" // Ensure we can start replication stream. "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" @@ -98,7 +97,7 @@ INSERT INTO d.t2 VALUES (2); require.NoError(t, client.Close(ctx)) }() require.NoError(t, err) - expectStreamState := func(streamID streaming.StreamID, status jobs.Status) { + expectStreamState := func(streamID streampb.StreamID, status jobs.Status) { h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID), [][]string{{string(status)}}) } @@ -212,7 +211,7 @@ INSERT INTO d.t2 VALUES (2); }) // Testing client.Complete() - err = client.Complete(ctx, streaming.StreamID(999), true) + err = client.Complete(ctx, streampb.StreamID(999), true) require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err) // Makes producer job exit quickly. diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index a16b9daf32f3..0474364f1c4a 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -17,9 +17,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -351,7 +350,7 @@ func (m *RandomStreamClient) Dial(ctx context.Context) error { } // Plan implements the Client interface. -func (m *RandomStreamClient) Plan(ctx context.Context, id streaming.StreamID) (Topology, error) { +func (m *RandomStreamClient) Plan(ctx context.Context, id streampb.StreamID) (Topology, error) { topology := make(Topology, 0, m.config.numPartitions) log.Infof(ctx, "planning random stream for tenant %d", m.config.tenantID) @@ -381,15 +380,15 @@ func (m *RandomStreamClient) Plan(ctx context.Context, id streaming.StreamID) (T // Create implements the Client interface. func (m *RandomStreamClient) Create( ctx context.Context, target roachpb.TenantID, -) (streaming.StreamID, error) { +) (streampb.StreamID, error) { log.Infof(ctx, "creating random stream for tenant %d", target.ToUint64()) m.config.tenantID = target - return streaming.StreamID(target.ToUint64()), nil + return streampb.StreamID(target.ToUint64()), nil } // Heartbeat implements the Client interface. func (m *RandomStreamClient) Heartbeat( - ctx context.Context, _ streaming.StreamID, ts hlc.Timestamp, + ctx context.Context, _ streampb.StreamID, ts hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { m.mu.Lock() defer m.mu.Unlock() @@ -457,7 +456,7 @@ func (m *RandomStreamClient) Close(ctx context.Context) error { // Subscribe implements the Client interface. func (m *RandomStreamClient) Subscribe( - ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, + ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, ) (Subscription, error) { partitionURL, err := url.Parse(string(spec)) if err != nil { @@ -531,7 +530,7 @@ func (m *RandomStreamClient) Subscribe( // Complete implements the streamclient.Client interface. func (m *RandomStreamClient) Complete( - ctx context.Context, streamID streaming.StreamID, successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { return nil } diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 267220d942f0..8e81e40ab02b 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -21,13 +21,14 @@ go_library( "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", - "//pkg/ccl/streamingccl/streampb", "//pkg/ccl/utilccl", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/bulk", + "//pkg/repstream", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", @@ -47,7 +48,6 @@ go_library( "//pkg/sql/types", "//pkg/storage", "//pkg/storage/enginepb", - "//pkg/streaming", "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/log", @@ -87,7 +87,6 @@ go_test( "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", "//pkg/ccl/streamingccl/streamingtest", - "//pkg/ccl/streamingccl/streampb", "//pkg/ccl/streamingccl/streamproducer", "//pkg/ccl/utilccl", "//pkg/cloud/impl:cloudimpl", @@ -96,6 +95,7 @@ go_test( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -110,7 +110,6 @@ go_test( "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/storage", - "//pkg/streaming", "//pkg/testutils", "//pkg/testutils/distsqlutils", "//pkg/testutils/jobutils", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 0f2e8b24ecd8..0213a7f4d0ae 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -11,15 +11,15 @@ package streamingest import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -67,5 +67,5 @@ func newStreamIngestManagerWithPrivilegesCheck( } func init() { - streaming.GetStreamIngestManagerHook = newStreamIngestManagerWithPrivilegesCheck + repstream.GetStreamIngestManagerHook = newStreamIngestManagerWithPrivilegesCheck } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 5c50713e0f73..5807bc10f2b5 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -15,10 +15,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -158,7 +157,7 @@ func (sf *streamIngestionFrontier) MustBeStreaming() bool { type heartbeatSender struct { lastSent time.Time client streamclient.Client - streamID streaming.StreamID + streamID streampb.StreamID frontierUpdates chan hlc.Timestamp frontier hlc.Timestamp flowCtx *execinfra.FlowCtx @@ -179,7 +178,7 @@ func newHeartbeatSender( } return &heartbeatSender{ client: streamClient, - streamID: streaming.StreamID(spec.StreamID), + streamID: streampb.StreamID(spec.StreamID), flowCtx: flowCtx, frontierUpdates: make(chan hlc.Timestamp), cancel: func() {}, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index f1f11ae74609..83069afa6fce 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -15,15 +15,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -103,7 +102,7 @@ func getStreamIngestionStats( if err != nil { return nil, err } - streamStatus, err := client.Heartbeat(ctx, streaming.StreamID(details.StreamID), hlc.MaxTimestamp) + streamStatus, err := client.Heartbeat(ctx, streampb.StreamID(details.StreamID), hlc.MaxTimestamp) if err != nil { stats.ProducerError = err.Error() } else { @@ -148,7 +147,7 @@ func connectToActiveClient( func waitUntilProducerActive( ctx context.Context, client streamclient.Client, - streamID streaming.StreamID, + streamID streampb.StreamID, heartbeatTimestamp hlc.Timestamp, ingestionJobID jobspb.JobID, ) error { @@ -222,7 +221,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. return err } ingestWithClient := func() error { - streamID := streaming.StreamID(details.StreamID) + streamID := streampb.StreamID(details.StreamID) updateRunningStatus(ctx, ingestionJob, fmt.Sprintf("connecting to the producer job %d "+ "and creating a stream replication plan", streamID)) if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil { @@ -473,7 +472,7 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp func (s *streamIngestionResumer) cancelProducerJob( ctx context.Context, details jobspb.StreamIngestionDetails, ) { - streamID := streaming.StreamID(details.StreamID) + streamID := streampb.StreamID(details.StreamID) addr := streamingccl.StreamAddress(details.StreamAddress) client, err := streamclient.NewStreamClient(ctx, addr) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index f0355ba9516c..97e5fdf44c96 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -21,12 +21,12 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/testutils" diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 0925375f61ba..852914bf54e6 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/bulk" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -34,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -333,7 +333,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { } } - sub, err := streamClient.Subscribe(ctx, streaming.StreamID(sip.spec.StreamID), token, startTime) + sub, err := streamClient.Subscribe(ctx, streampb.StreamID(sip.spec.StreamID), token, startTime) if err != nil { sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr)) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index 787d772e2dd8..293696bba0d2 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -16,12 +16,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/logtags" ) @@ -33,7 +33,7 @@ func distStreamIngestionPlanSpecs( initialHighWater hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, jobID jobspb.JobID, - streamID streaming.StreamID, + streamID streampb.StreamID, oldTenantID roachpb.TenantID, newTenantID roachpb.TenantID, ) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index a04cb9f25eee..7cad7f99d5e3 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -21,11 +21,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -34,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" @@ -63,7 +62,7 @@ var _ streamclient.Client = &mockStreamClient{} // Create implements the Client interface. func (m *mockStreamClient) Create( ctx context.Context, target roachpb.TenantID, -) (streaming.StreamID, error) { +) (streampb.StreamID, error) { panic("unimplemented") } @@ -74,14 +73,14 @@ func (m *mockStreamClient) Dial(ctx context.Context) error { // Heartbeat implements the Client interface. func (m *mockStreamClient) Heartbeat( - ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp, + ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { panic("unimplemented") } // Plan implements the Client interface. func (m *mockStreamClient) Plan( - ctx context.Context, _ streaming.StreamID, + ctx context.Context, _ streampb.StreamID, ) (streamclient.Topology, error) { panic("unimplemented mock method") } @@ -108,7 +107,7 @@ func (m *mockSubscription) Err() error { // Subscribe implements the Client interface. func (m *mockStreamClient) Subscribe( ctx context.Context, - stream streaming.StreamID, + stream streampb.StreamID, token streamclient.SubscriptionToken, startTime hlc.Timestamp, ) (streamclient.Subscription, error) { @@ -144,7 +143,7 @@ func (m *mockStreamClient) Close(ctx context.Context) error { // Complete implements the streamclient.Client interface. func (m *mockStreamClient) Complete( - ctx context.Context, streamID streaming.StreamID, successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { return nil } @@ -157,7 +156,7 @@ var _ streamclient.Client = &errorStreamClient{} // ConsumePartition implements the streamclient.Client interface. func (m *errorStreamClient) Subscribe( ctx context.Context, - stream streaming.StreamID, + stream streampb.StreamID, spec streamclient.SubscriptionToken, checkpoint hlc.Timestamp, ) (streamclient.Subscription, error) { @@ -166,7 +165,7 @@ func (m *errorStreamClient) Subscribe( // Complete implements the streamclient.Client interface. func (m *errorStreamClient) Complete( - ctx context.Context, streamID streaming.StreamID, successfulIngestion bool, + ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, ) error { return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index 8caff70736c4..c8f49a503384 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -19,11 +19,11 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" diff --git a/pkg/ccl/streamingccl/streampb/empty.go b/pkg/ccl/streamingccl/streampb/empty.go deleted file mode 100644 index 1e596c9ef362..000000000000 --- a/pkg/ccl/streamingccl/streampb/empty.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package streampb - -// This file is intentionally left empty. diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel index 0aa22e7c4ad9..98f6db533c3d 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -13,7 +13,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/streampb", "//pkg/ccl/utilccl", "//pkg/jobs", "//pkg/jobs/jobspb", @@ -24,6 +23,8 @@ go_library( "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", + "//pkg/repstream", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", "//pkg/settings/cluster", @@ -34,7 +35,6 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/storage", - "//pkg/streaming", "//pkg/testutils", "//pkg/util/ctxgroup", "//pkg/util/hlc", @@ -66,7 +66,6 @@ go_test( "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamingtest", - "//pkg/ccl/streamingccl/streampb", "//pkg/ccl/utilccl", "//pkg/cloud/impl:cloudimpl", "//pkg/jobs", @@ -76,6 +75,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -87,7 +87,6 @@ go_test( "//pkg/sql/distsql", "//pkg/sql/sem/eval", "//pkg/sql/sessiondatapb", - "//pkg/streaming", "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 6d0086cfe73a..6c4690b76733 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -14,11 +14,11 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go index b1d1a0614087..89bd7cfc243d 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go @@ -15,17 +15,16 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/distsql" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -202,7 +201,7 @@ func TestStreamReplicationProducerJob(t *testing.T) { var status streampb.StreamReplicationStatus require.NoError(t, source.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { status, err = updateReplicationStreamProgress( - ctx, timeutil.Now(), ptp, registry, streaming.StreamID(jr.JobID), + ctx, timeutil.Now(), ptp, registry, streampb.StreamID(jr.JobID), hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, txn) return err })) @@ -238,7 +237,7 @@ func TestStreamReplicationProducerJob(t *testing.T) { expire := expirationTime(jr).Add(10 * time.Millisecond) require.NoError(t, source.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { streamStatus, err = updateReplicationStreamProgress( - ctx, expire, ptp, registry, streaming.StreamID(jr.JobID), updatedFrontier, txn) + ctx, expire, ptp, registry, streampb.StreamID(jr.JobID), updatedFrontier, txn) return err })) require.Equal(t, streampb.StreamReplicationStatus_STREAM_ACTIVE, streamStatus.StreamStatus) diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index 5ab6c379d5c8..0848ee817ff0 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -11,14 +11,14 @@ package streamproducer import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -88,5 +88,5 @@ func newReplicationStreamManagerWithPrivilegesCheck( } func init() { - streaming.GetReplicationStreamManagerHook = newReplicationStreamManagerWithPrivilegesCheck + repstream.GetReplicationStreamManagerHook = newReplicationStreamManagerWithPrivilegesCheck } diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 1f22f14f2556..18d6617c066a 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -23,13 +23,13 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/distsql" @@ -424,8 +424,8 @@ USE d; t.Run("stream-batches-events", func(t *testing.T) { srcTenant.SQL.Exec(t, ` CREATE TABLE t3( - i INT PRIMARY KEY, - a STRING, + i INT PRIMARY KEY, + a STRING, b STRING, INDEX (a,b), -- Just to have a bit more data in the table FAMILY fb (b) diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 652de91ec09f..4564e46bcbca 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -13,13 +13,13 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 375dbeff5b28..ddf47b046add 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -13,7 +13,6 @@ PROTOBUF_SRCS = [ "//pkg/ccl/baseccl:baseccl_go_proto", "//pkg/ccl/sqlproxyccl/tenant:tenant_go_proto", "//pkg/ccl/storageccl/engineccl/enginepbccl:enginepbccl_go_proto", - "//pkg/ccl/streamingccl/streampb:streampb_go_proto", "//pkg/ccl/utilccl/licenseccl:licenseccl_go_proto", "//pkg/cloud/cloudpb:cloudpb_go_proto", "//pkg/cloud/externalconn/connectionpb:connectionpb_go_proto", @@ -40,6 +39,7 @@ PROTOBUF_SRCS = [ "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_go_proto", "//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_go_proto", "//pkg/obsservice/obspb:obs_go_proto", + "//pkg/repstream/streampb:streampb_go_proto", "//pkg/roachpb:roachpb_go_proto", "//pkg/rpc:rpc_go_proto", "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_go_proto", diff --git a/pkg/streaming/BUILD.bazel b/pkg/repstream/BUILD.bazel similarity index 73% rename from pkg/streaming/BUILD.bazel rename to pkg/repstream/BUILD.bazel index e1ce04c319c8..7608230a5823 100644 --- a/pkg/streaming/BUILD.bazel +++ b/pkg/repstream/BUILD.bazel @@ -2,12 +2,11 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( - name = "streaming", + name = "repstream", srcs = ["api.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/streaming", + importpath = "github.com/cockroachdb/cockroach/pkg/repstream", visibility = ["//visibility:public"], deps = [ - "//pkg/ccl/streamingccl/streampb", "//pkg/kv", "//pkg/sql/sem/eval", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/streaming/api.go b/pkg/repstream/api.go similarity index 92% rename from pkg/streaming/api.go rename to pkg/repstream/api.go index 2d70e4324c12..40a4c17b9294 100644 --- a/pkg/streaming/api.go +++ b/pkg/repstream/api.go @@ -8,20 +8,16 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package streaming +package repstream import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/errors" ) -// StreamID is the ID of a replication stream. -type StreamID = streampb.StreamID - // GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs. // Used by builtin functions to trigger streaming replication. var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.ReplicationStreamManager, error) diff --git a/pkg/ccl/streamingccl/streampb/BUILD.bazel b/pkg/repstream/streampb/BUILD.bazel similarity index 88% rename from pkg/ccl/streamingccl/streampb/BUILD.bazel rename to pkg/repstream/streampb/BUILD.bazel index 7ed7afceeada..80184f78b2f1 100644 --- a/pkg/ccl/streamingccl/streampb/BUILD.bazel +++ b/pkg/repstream/streampb/BUILD.bazel @@ -21,7 +21,7 @@ proto_library( go_proto_library( name = "streampb_go_proto", compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb", + importpath = "github.com/cockroachdb/cockroach/pkg/repstream/streampb", proto = ":streampb_proto", visibility = ["//visibility:public"], deps = [ @@ -40,7 +40,7 @@ go_library( "streamid.go", ], embed = [":streampb_go_proto"], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb", + importpath = "github.com/cockroachdb/cockroach/pkg/repstream/streampb", visibility = ["//visibility:public"], ) diff --git a/pkg/repstream/streampb/empty.go b/pkg/repstream/streampb/empty.go new file mode 100644 index 000000000000..3fbd7425cb86 --- /dev/null +++ b/pkg/repstream/streampb/empty.go @@ -0,0 +1,13 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package streampb + +// This file is intentionally left empty. diff --git a/pkg/ccl/streamingccl/streampb/stream.proto b/pkg/repstream/streampb/stream.proto similarity index 93% rename from pkg/ccl/streamingccl/streampb/stream.proto rename to pkg/repstream/streampb/stream.proto index dfd6e4faf0a4..2f709304d80e 100644 --- a/pkg/ccl/streamingccl/streampb/stream.proto +++ b/pkg/repstream/streampb/stream.proto @@ -1,14 +1,16 @@ // Copyright 2021 The Cockroach Authors. // -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. // -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. syntax = "proto3"; -package cockroach.ccl.streamingccl; +package cockroach.repstream.streampb; option go_package = "streampb"; diff --git a/pkg/ccl/streamingccl/streampb/streamid.go b/pkg/repstream/streampb/streamid.go similarity index 50% rename from pkg/ccl/streamingccl/streampb/streamid.go rename to pkg/repstream/streampb/streamid.go index 4521d2b7400d..fac345f02b35 100644 --- a/pkg/ccl/streamingccl/streampb/streamid.go +++ b/pkg/repstream/streampb/streamid.go @@ -1,10 +1,12 @@ // Copyright 2022 The Cockroach Authors. // -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. // -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. package streampb diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 773bcf7ce62d..051a02fbc2c6 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -299,6 +299,7 @@ go_library( "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", + "//pkg/repstream", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", @@ -456,7 +457,6 @@ go_library( "//pkg/sql/vtable", "//pkg/storage", "//pkg/storage/enginepb", - "//pkg/streaming", "//pkg/testutils/serverutils", "//pkg/upgrade", "//pkg/util", diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 89a1632ece9e..6e899772acd9 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -43,7 +44,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -835,10 +835,10 @@ func (p *planner) resetPlanner( func (p *planner) GetReplicationStreamManager( ctx context.Context, ) (eval.ReplicationStreamManager, error) { - return streaming.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn()) + return repstream.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn()) } // GetStreamIngestManager returns a StreamIngestManager. func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngestManager, error) { - return streaming.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn()) + return repstream.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn()) } diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index d55f07281df3..30c4f627bfd2 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -31,7 +31,6 @@ go_library( deps = [ "//pkg/base", "//pkg/build", - "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/config/zonepb", "//pkg/geo", @@ -48,6 +47,7 @@ go_library( "//pkg/kv", "//pkg/kv/kvclient", "//pkg/kv/kvserver/kvserverbase", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/password", "//pkg/security/username", diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index 346d2b0596d8..0ba86b627b7f 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -14,8 +14,8 @@ import ( "context" gojson "encoding/json" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" diff --git a/pkg/sql/sem/eval/BUILD.bazel b/pkg/sql/sem/eval/BUILD.bazel index 02e5413c29de..b28b208ecd83 100644 --- a/pkg/sql/sem/eval/BUILD.bazel +++ b/pkg/sql/sem/eval/BUILD.bazel @@ -37,7 +37,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", - "//pkg/ccl/streamingccl/streampb", "//pkg/clusterversion", "//pkg/geo", "//pkg/geo/geopb", @@ -45,6 +44,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/kvserverbase", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", "//pkg/server/telemetry", diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 2439dc0ff821..e6928f84ee2e 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -17,11 +17,11 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"