Skip to content

Commit

Permalink
streamccl,sql: moving StreamIngestionManager and ReplicationStreamMan…
Browse files Browse the repository at this point in the history
…ager to eval

This commit:

1. moves the definition of StreamIngestionManager and ReplicationStreamManager
to eval;
2. has planner implements functions in StreamIngestionManager and
ReplicationStreamManager, so that they won't take eval.Context and evalCtx.Txn
as parameters.

Release note: None
  • Loading branch information
ZhouXing19 committed Nov 3, 2022
1 parent cb5d760 commit 431b1f4
Show file tree
Hide file tree
Showing 27 changed files with 243 additions and 194 deletions.
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/ccl/backupccl/backuppb/backup.go | `sz`
pkg/ccl/backupccl/backuppb/backup.go | `timing`
pkg/ccl/streamingccl/streampb/streamid.go | `StreamID`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/bulk_metrics.go | `sz`
Expand Down Expand Up @@ -51,7 +52,6 @@ pkg/storage/enginepb/mvcc.go | `TxnEpoch`
pkg/storage/enginepb/mvcc.go | `TxnSeq`
pkg/storage/enginepb/mvcc3.go | `*MVCCStats`
pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta`
pkg/streaming/api.go | `StreamID`
pkg/util/hlc/timestamp.go | `ClockTimestamp`
pkg/util/hlc/timestamp.go | `LegacyTimestamp`
pkg/util/hlc/timestamp.go | `Timestamp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *partitionedStreamClient) Create(
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
return streaming.InvalidStreamID,
return streampb.InvalidStreamID,
errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String())
}

Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,28 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

type streamIngestManagerImpl struct{}
type streamIngestManagerImpl struct {
evalCtx *eval.Context
txn *kv.Txn
}

// CompleteStreamIngestion implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) CompleteStreamIngestion(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
ingestionJobID jobspb.JobID,
cutoverTimestamp hlc.Timestamp,
ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
) error {
return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp)
return completeStreamIngestion(ctx, r.evalCtx, r.txn, ingestionJobID, cutoverTimestamp)
}

// GetStreamIngestionStats implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) GetStreamIngestionStats(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
ctx context.Context, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID)
return getStreamIngestionStats(ctx, r.evalCtx, r.txn, ingestionJobID)
}

func newStreamIngestManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context,
) (streaming.StreamIngestManager, error) {
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.StreamIngestManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
Expand All @@ -64,7 +63,7 @@ func newStreamIngestManagerWithPrivilegesCheck(
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &streamIngestManagerImpl{}, nil
return &streamIngestManagerImpl{evalCtx: evalCtx, txn: txn}, nil
}

func init() {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streampb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ go_proto_library(

go_library(
name = "streampb",
srcs = ["empty.go"],
srcs = [
"empty.go",
"streamid.go",
],
embed = [":streampb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb",
visibility = ["//visibility:public"],
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/streamingccl/streampb/streamid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streampb

// StreamID is the ID of a replication stream.
type StreamID int64

// SafeValue implements the redact.SafeValue interface.
func (j StreamID) SafeValue() {}

// InvalidStreamID is the zero value for StreamID corresponding to no stream.
const InvalidStreamID StreamID = 0
5 changes: 2 additions & 3 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -38,7 +37,7 @@ import (
)

type eventStream struct {
streamID streaming.StreamID
streamID streampb.StreamID
execCfg *sql.ExecutorConfig
spec streampb.StreamPartitionSpec
subscribedSpans roachpb.SpanGroup
Expand Down Expand Up @@ -515,7 +514,7 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
}

func streamPartition(
evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte,
evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte,
) (eval.ValueGenerator, error) {
if !evalCtx.SessionData().AvoidBuffering {
return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
Expand Down
42 changes: 19 additions & 23 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,49 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

type replicationStreamManagerImpl struct{}
type replicationStreamManagerImpl struct {
evalCtx *eval.Context
txn *kv.Txn
}

// StartReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStream(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
return startReplicationStreamJob(ctx, evalCtx, txn, tenantID)
ctx context.Context, tenantID uint64,
) (streampb.StreamID, error) {
return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) HeartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn)
return heartbeatReplicationStream(ctx, r.evalCtx, r.txn, streamID, frontier)
}

// StreamPartition implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StreamPartition(
evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte,
streamID streampb.StreamID, opaqueSpec []byte,
) (eval.ValueGenerator, error) {
return streamPartition(evalCtx, streamID, opaqueSpec)
return streamPartition(r.evalCtx, streamID, opaqueSpec)
}

// GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
return getReplicationStreamSpec(ctx, evalCtx, txn, streamID)
return getReplicationStreamSpec(ctx, r.evalCtx, streamID)
}

// CompleteReplicationStream implements ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) CompleteReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion)
return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion)
}

func newReplicationStreamManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context,
) (streaming.ReplicationStreamManager, error) {
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.ReplicationStreamManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
Expand All @@ -81,14 +76,15 @@ func newReplicationStreamManagerWithPrivilegesCheck(
}

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)

enterpriseCheckErr := utilccl.CheckEnterpriseEnabled(
execCfg.Settings, execCfg.NodeInfo.LogicalClusterID(), execCfg.Organization(), "REPLICATION")
if enterpriseCheckErr != nil {
return nil, pgerror.Wrap(enterpriseCheckErr,
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &replicationStreamManagerImpl{}, nil
return &replicationStreamManagerImpl{evalCtx: evalCtx, txn: txn}, nil
}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -44,15 +43,15 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) {
require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData))
}

getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) {
getManagerForUser := func(u string) (eval.ReplicationStreamManager, error) {
sqlUser, err := username.MakeSQLUsernameFromUserInput(u, username.PurposeValidation)
require.NoError(t, err)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
txn := kvDB.NewTxn(ctx, "test")
p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData)
defer cleanup()
ec := p.(interface{ EvalContext() *eval.Context }).EvalContext()
return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec)
return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec, txn)
}

for _, tc := range []struct {
Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -36,24 +35,24 @@ import (
// 2. TODO(casper): Updates the protected timestamp for spans being replicated
func startReplicationStreamJob(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
) (streampb.StreamID, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx)

if err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}

if !hasAdminRole {
return streaming.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
}

registry := execConfig.JobRegistry
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID)
if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}

ptp := execConfig.ProtectedTimestampProvider
Expand All @@ -68,9 +67,9 @@ func startReplicationStreamJob(
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)

if err := ptp.Protect(ctx, txn, pts); err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}
return streaming.StreamID(jr.JobID), nil
return streampb.StreamID(jr.JobID), nil
}

// Convert the producer job's status into corresponding replication
Expand Down Expand Up @@ -99,7 +98,7 @@ func updateReplicationStreamProgress(
expiration time.Time,
ptsProvider protectedts.Provider,
registry *jobs.Registry,
streamID streaming.StreamID,
streamID streampb.StreamID,
consumedTime hlc.Timestamp,
txn *kv.Txn,
) (status streampb.StreamReplicationStatus, err error) {
Expand Down Expand Up @@ -157,9 +156,9 @@ func updateReplicationStreamProgress(
func heartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
streamID streampb.StreamID,
frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
Expand Down Expand Up @@ -198,7 +197,7 @@ func heartbeatReplicationStream(

// getReplicationStreamSpec gets a replication stream specification for the specified stream.
func getReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
// Returns error if the replication stream is not active
Expand Down Expand Up @@ -257,7 +256,7 @@ func completeReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
streamID streampb.StreamID,
successfulIngestion bool,
) error {
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ go_library(
"//pkg/sql/vtable",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/streaming",
"//pkg/testutils/serverutils",
"//pkg/upgrade",
"//pkg/util",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func runPlanInsidePlan(
ctx, evalCtx, &plannerCopy, params.p.txn, distributeType)
planCtx.planner.curPlan.planComponents = *plan
planCtx.ExtendedEvalCtx.Planner = &plannerCopy
planCtx.ExtendedEvalCtx.StreamManagerFactory = &plannerCopy
planCtx.stmtType = recv.stmtType

params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestRandomizedCast(t *testing.T) {
evalCtx := eval.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
evalCtx.Planner = &faketreeeval.DummyEvalPlanner{}
evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{}
rng, _ := randutil.NewTestRand()

getValidSupportedCast := func() (from, to *types.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestMain(m *testing.M) {
testMemAcc = &memAcc
evalCtx := eval.MakeTestingEvalContext(st)
evalCtx.Planner = &faketreeeval.DummyEvalPlanner{}
evalCtx.StreamManagerFactory = &faketreeeval.DummyStreamManagerFactory{}
testColumnFactory = coldataext.NewExtendedColumnFactory(&evalCtx)
testAllocator = colmem.NewAllocator(ctx, testMemAcc, testColumnFactory)
defer testMemAcc.Close(ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
*evalCtx = extendedEvalContext{
Context: eval.Context{
Planner: p,
StreamManagerFactory: p,
PrivilegedAccessor: p,
SessionAccessor: p,
JobExecContext: p,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func (ds *ServerImpl) setupFlow(
Locality: ds.ServerConfig.Locality,
Tracer: ds.ServerConfig.Tracer,
Planner: &faketreeeval.DummyEvalPlanner{Monitor: monitor},
StreamManagerFactory: &faketreeeval.DummyStreamManagerFactory{},
PrivilegedAccessor: &faketreeeval.DummyPrivilegedAccessor{},
SessionAccessor: &faketreeeval.DummySessionAccessor{},
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Expand Down
Loading

0 comments on commit 431b1f4

Please sign in to comment.