From cf49a69319d4f2aef75c506c7db8b7696ed5d111 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 22 Nov 2022 07:57:15 +0000 Subject: [PATCH] streamingccl: avoid jobExecContext.Txn() This appears to always be nil. As a method it doesn't really even make sense, since it is unclear would such a transaction commit. Release note: None --- .../streamingest/stream_ingestion_job.go | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index b8239ec8a237..840b01f1ab1c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -513,17 +513,19 @@ func (s *streamIngestionResumer) OnFailOrCancel( details := s.job.Details().(jobspb.StreamIngestionDetails) s.cancelProducerJob(ctx, details) - tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), details.DestinationTenantID) - if err != nil { - return errors.Wrap(err, "fetch tenant info") - } + return jobExecCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), txn, details.DestinationTenantID) + if err != nil { + return errors.Wrap(err, "fetch tenant info") + } - tenInfo.TenantReplicationJobID = 0 - if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), tenInfo); err != nil { - return errors.Wrap(err, "update tenant record") - } + tenInfo.TenantReplicationJobID = 0 + if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), txn, tenInfo); err != nil { + return errors.Wrap(err, "update tenant record") + } - return nil + return nil + }) } func (s *streamIngestionResumer) ForceRealSpan() bool { return true }