Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
90964: streamingccl: avoid passing `evalCtx`, `txn` as parameters to ingestion & replication funcs r=ZhouXing19 a=ZhouXing19

This PR is part of the effort to eliminate usages of `eval.Context.Txn`.
It moves 

1. the definition of `StreamIngestionManager` and `ReplicationStreamManager` under eval;
2. the implementation of `StreamIngestionManager` and `ReplicationStreamManager` via `sql.planner`.

The core changes are 

```
// GetReplicationStreamManager returns a ReplicationStreamManager.
func (p *planner) GetReplicationStreamManager(
	ctx context.Context,
) (eval.ReplicationStreamManager, error) {
	return streaming.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn())
}

// GetStreamIngestManager returns a StreamIngestManager.
func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngestManager, error) {
	return streaming.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn())
}
```

so that the functions under these 2 interfaces run upon `eval.Context` and `kv.Txn` from the `sql.planner`. 

Follow-up: 

- [ ] Pass internal executor from planner too, rather than using `register.ex`.

informs #90923 

91249: acceptance: deflake `TestDockerCLI/test_txn_prompt` r=rafiss a=renatolabs

That test would sometimes fail because of the semantics of `expect` and `send` when the expected string was previously written using `send`.

When `expect` is called, the buffer looked at includes content previously sent using `send`. This means that if one runs `send "foo"; expect foo`, the `expect` call will match instataneously even if the program's output after the send does not contain `foo`.

In the case of the test fixed here, we are supposed to expect for the new prompt to be printed after setting it with `\set prompt1`. In order to properly check whether the prompt changed, this PR changes the prompt `expect` call to use a regular expression that matches the new prompt only if it sits in the beginning of a line.

Prior to this commit, since the `expect` call would return immediately, there was a chance the `send "SET DATABASE"` command could run before the cockroach CLI had the chance to print the new prompt, leading to the following error:

```
abcdefaultdbdef \set prompt1 woo
SET database
woo  ->
.221103 18:13:35.539667600 EXPECT TEST: TIMEOUT WAITING FOR "
 -> "
non-zero exit code: 1
```

Epic: None
Release note: None

91401: spanconfigsqltranslator: add sqlutil.InternalExecutor to SQLTranslator r=arulajmani a=ZhouXing19

This commit is part of the effort of having an internal executor better bound to its outer txn if there's one.

The next step after this commit is to replace the executor used in `s.ptsProvider.GetState()` in `SQLTranslator.Translate()` to the one hanging off `SQLTranslator`.

Informs: #91004

Release note: None

91423: roachpb: fix bug when logging lease in NLE r=ajwerner a=ajwerner

We were logging `lease holder unknown` when the deprecated field was not populated.

Epic: None

Release note: None

91436: multitenant: add admin function `RangeIterator failed to seek` test cases r=rafiss a=ecwall

refs #91434

This change adds test cases for admin functions (see #91434) that fail because of a `RangeIterator failed to seek` error once the multitenant check is bypassed. This needs to be addressed before those admin functions can be supported for secondary tenants.

Release note: None

91508: logictest: fix flake in fk due to sequence non-determinism r=ajwerner a=ajwerner

See [here](https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_BazelEssentialCi/7392167?showRootCauses=false&expandBuildChangesSection=true&expandBuildProblemsSection=true&expandBuildTestsSection=true). This patch stops showing the sequence column while still relying on its ordering properties.

Epic: None

Release note: None

91515: kvserver: return DeprecatedLeaseHolder field in NLHEs r=arulajmani a=arulajmani

v22.1 binaries assume that the leaseholder is unknown when logging NLHE errors if the (Deprecated)LeaseHolder field is unset -- regardless of if the Lease is set or not. We broke this logging in 0402f47 (for mixed version clusters) when we stopped shipping back leaseholder information (in favour of only shipping lease information) on NLHEs. This patch fixes this by populating the (Deprecated)LeaseHolder field when constructing NLHEs.

Release note: None

Co-authored-by: Jane Xing <[email protected]>
Co-authored-by: Renato Costa <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
6 people committed Nov 8, 2022
8 parents 12ac878 + 431b1f4 + 077b250 + 3d80db3 + d55a7a7 + ba5edd2 + 7cca6dd + 134de35 commit 2e90394
Show file tree
Hide file tree
Showing 42 changed files with 494 additions and 281 deletions.
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/ccl/backupccl/backuppb/backup.go | `sz`
pkg/ccl/backupccl/backuppb/backup.go | `timing`
pkg/ccl/streamingccl/streampb/streamid.go | `StreamID`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/bulk_metrics.go | `sz`
Expand Down Expand Up @@ -51,7 +52,6 @@ pkg/storage/enginepb/mvcc.go | `TxnEpoch`
pkg/storage/enginepb/mvcc.go | `TxnSeq`
pkg/storage/enginepb/mvcc3.go | `*MVCCStats`
pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta`
pkg/streaming/api.go | `StreamID`
pkg/util/hlc/timestamp.go | `ClockTimestamp`
pkg/util/hlc/timestamp.go | `LegacyTimestamp`
pkg/util/hlc/timestamp.go | `Timestamp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_test(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/sqlutil",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -180,10 +181,10 @@ func TestDataDriven(t *testing.T) {

var records []spanconfig.Record
sqlTranslatorFactory := tenant.SpanConfigSQLTranslatorFactory().(*spanconfigsqltranslator.Factory)
err := sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor,
) error {
sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, descsCol)
sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol)
var err error
records, _, err = sqlTranslator.Translate(ctx, descIDs, generateSystemSpanConfigs)
require.NoError(t, err)
Expand Down Expand Up @@ -212,10 +213,10 @@ func TestDataDriven(t *testing.T) {
case "full-translate":
sqlTranslatorFactory := tenant.SpanConfigSQLTranslatorFactory().(*spanconfigsqltranslator.Factory)
var records []spanconfig.Record
err := sql.DescsTxn(ctx, &execCfg, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
err := execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor,
) error {
sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, descsCol)
sqlTranslator := sqlTranslatorFactory.NewSQLTranslator(txn, ie, descsCol)
var err error
records, _, err = spanconfig.FullTranslate(ctx, sqlTranslator)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *partitionedStreamClient) Create(
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
return streaming.InvalidStreamID,
return streampb.InvalidStreamID,
errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String())
}

Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,28 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

type streamIngestManagerImpl struct{}
type streamIngestManagerImpl struct {
evalCtx *eval.Context
txn *kv.Txn
}

// CompleteStreamIngestion implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) CompleteStreamIngestion(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
ingestionJobID jobspb.JobID,
cutoverTimestamp hlc.Timestamp,
ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp,
) error {
return completeStreamIngestion(ctx, evalCtx, txn, ingestionJobID, cutoverTimestamp)
return completeStreamIngestion(ctx, r.evalCtx, r.txn, ingestionJobID, cutoverTimestamp)
}

// GetStreamIngestionStats implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) GetStreamIngestionStats(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, ingestionJobID jobspb.JobID,
ctx context.Context, ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return getStreamIngestionStats(ctx, evalCtx, txn, ingestionJobID)
return getStreamIngestionStats(ctx, r.evalCtx, r.txn, ingestionJobID)
}

func newStreamIngestManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context,
) (streaming.StreamIngestManager, error) {
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.StreamIngestManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
Expand All @@ -64,7 +63,7 @@ func newStreamIngestManagerWithPrivilegesCheck(
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &streamIngestManagerImpl{}, nil
return &streamIngestManagerImpl{evalCtx: evalCtx, txn: txn}, nil
}

func init() {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streampb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ go_proto_library(

go_library(
name = "streampb",
srcs = ["empty.go"],
srcs = [
"empty.go",
"streamid.go",
],
embed = [":streampb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb",
visibility = ["//visibility:public"],
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/streamingccl/streampb/streamid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streampb

// StreamID is the ID of a replication stream.
type StreamID int64

// SafeValue implements the redact.SafeValue interface.
func (j StreamID) SafeValue() {}

// InvalidStreamID is the zero value for StreamID corresponding to no stream.
const InvalidStreamID StreamID = 0
5 changes: 2 additions & 3 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -38,7 +37,7 @@ import (
)

type eventStream struct {
streamID streaming.StreamID
streamID streampb.StreamID
execCfg *sql.ExecutorConfig
spec streampb.StreamPartitionSpec
subscribedSpans roachpb.SpanGroup
Expand Down Expand Up @@ -515,7 +514,7 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
}

func streamPartition(
evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte,
evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte,
) (eval.ValueGenerator, error) {
if !evalCtx.SessionData().AvoidBuffering {
return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
Expand Down
42 changes: 19 additions & 23 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,49 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

type replicationStreamManagerImpl struct{}
type replicationStreamManagerImpl struct {
evalCtx *eval.Context
txn *kv.Txn
}

// StartReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStream(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
return startReplicationStreamJob(ctx, evalCtx, txn, tenantID)
ctx context.Context, tenantID uint64,
) (streampb.StreamID, error) {
return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) HeartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return heartbeatReplicationStream(ctx, evalCtx, streamID, frontier, txn)
return heartbeatReplicationStream(ctx, r.evalCtx, r.txn, streamID, frontier)
}

// StreamPartition implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StreamPartition(
evalCtx *eval.Context, streamID streaming.StreamID, opaqueSpec []byte,
streamID streampb.StreamID, opaqueSpec []byte,
) (eval.ValueGenerator, error) {
return streamPartition(evalCtx, streamID, opaqueSpec)
return streamPartition(r.evalCtx, streamID, opaqueSpec)
}

// GetReplicationStreamSpec implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
return getReplicationStreamSpec(ctx, evalCtx, txn, streamID)
return getReplicationStreamSpec(ctx, r.evalCtx, streamID)
}

// CompleteReplicationStream implements ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) CompleteReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
return completeReplicationStream(ctx, evalCtx, txn, streamID, successfulIngestion)
return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion)
}

func newReplicationStreamManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context,
) (streaming.ReplicationStreamManager, error) {
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.ReplicationStreamManager, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
Expand All @@ -81,14 +76,15 @@ func newReplicationStreamManagerWithPrivilegesCheck(
}

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)

enterpriseCheckErr := utilccl.CheckEnterpriseEnabled(
execCfg.Settings, execCfg.NodeInfo.LogicalClusterID(), execCfg.Organization(), "REPLICATION")
if enterpriseCheckErr != nil {
return nil, pgerror.Wrap(enterpriseCheckErr,
pgcode.InsufficientPrivilege, "replication requires enterprise license")
}

return &replicationStreamManagerImpl{}, nil
return &replicationStreamManagerImpl{evalCtx: evalCtx, txn: txn}, nil
}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -44,15 +43,15 @@ func TestReplicationManagerRequiresAdminRole(t *testing.T) {
require.NoError(t, protoutil.Unmarshal(sessionSerialized, &sessionData))
}

getManagerForUser := func(u string) (streaming.ReplicationStreamManager, error) {
getManagerForUser := func(u string) (eval.ReplicationStreamManager, error) {
sqlUser, err := username.MakeSQLUsernameFromUserInput(u, username.PurposeValidation)
require.NoError(t, err)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
txn := kvDB.NewTxn(ctx, "test")
p, cleanup := sql.NewInternalPlanner("test", txn, sqlUser, &sql.MemoryMetrics{}, &execCfg, sessionData)
defer cleanup()
ec := p.(interface{ EvalContext() *eval.Context }).EvalContext()
return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec)
return newReplicationStreamManagerWithPrivilegesCheck(ctx, ec, txn)
}

for _, tc := range []struct {
Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -36,24 +35,24 @@ import (
// 2. TODO(casper): Updates the protected timestamp for spans being replicated
func startReplicationStreamJob(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantID uint64,
) (streaming.StreamID, error) {
) (streampb.StreamID, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx)

if err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}

if !hasAdminRole {
return streaming.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
}

registry := execConfig.JobRegistry
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
ptsID := uuid.MakeV4()
jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID)
if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}

ptp := execConfig.ProtectedTimestampProvider
Expand All @@ -68,9 +67,9 @@ func startReplicationStreamJob(
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)

if err := ptp.Protect(ctx, txn, pts); err != nil {
return streaming.InvalidStreamID, err
return streampb.InvalidStreamID, err
}
return streaming.StreamID(jr.JobID), nil
return streampb.StreamID(jr.JobID), nil
}

// Convert the producer job's status into corresponding replication
Expand Down Expand Up @@ -99,7 +98,7 @@ func updateReplicationStreamProgress(
expiration time.Time,
ptsProvider protectedts.Provider,
registry *jobs.Registry,
streamID streaming.StreamID,
streamID streampb.StreamID,
consumedTime hlc.Timestamp,
txn *kv.Txn,
) (status streampb.StreamReplicationStatus, err error) {
Expand Down Expand Up @@ -157,9 +156,9 @@ func updateReplicationStreamProgress(
func heartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
streamID streaming.StreamID,
frontier hlc.Timestamp,
txn *kv.Txn,
streamID streampb.StreamID,
frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
Expand Down Expand Up @@ -198,7 +197,7 @@ func heartbeatReplicationStream(

// getReplicationStreamSpec gets a replication stream specification for the specified stream.
func getReplicationStreamSpec(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, streamID streaming.StreamID,
ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
// Returns error if the replication stream is not active
Expand Down Expand Up @@ -257,7 +256,7 @@ func completeReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
txn *kv.Txn,
streamID streaming.StreamID,
streamID streampb.StreamID,
successfulIngestion bool,
) error {
registry := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).JobRegistry
Expand Down
10 changes: 9 additions & 1 deletion pkg/cli/interactive_tests/common.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ proc eexpect {text} {
}
}

# eexpect_re is like eexpect, but takes a regular expression argument
# instead of a text string
proc eexpect_re {text} {
expect {
-re $text {}
timeout { handle_timeout $text }
}
}

# Convenience function that sends Ctrl+C to the monitored process.
proc interrupt {} {
report "INTERRUPT TO FOREGROUND PROCESS"
Expand Down Expand Up @@ -178,4 +187,3 @@ proc stop_tenant {tenant_id argv} {

report "END STOP TENANT $tenant_id"
}

Loading

0 comments on commit 2e90394

Please sign in to comment.