Skip to content

Commit

Permalink
streamingccl: avoid jobExecContext.Txn()
Browse files Browse the repository at this point in the history
This appears to always be nil. As a method it doesn't really even make
sense, since it is unclear would such a transaction commit.

Release note: None
  • Loading branch information
stevendanna committed Nov 22, 2022
1 parent dca415e commit cf49a69
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,17 +513,19 @@ func (s *streamIngestionResumer) OnFailOrCancel(
details := s.job.Details().(jobspb.StreamIngestionDetails)
s.cancelProducerJob(ctx, details)

tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), details.DestinationTenantID)
if err != nil {
return errors.Wrap(err, "fetch tenant info")
}
return jobExecCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), txn, details.DestinationTenantID)
if err != nil {
return errors.Wrap(err, "fetch tenant info")
}

tenInfo.TenantReplicationJobID = 0
if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), tenInfo); err != nil {
return errors.Wrap(err, "update tenant record")
}
tenInfo.TenantReplicationJobID = 0
if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), txn, tenInfo); err != nil {
return errors.Wrap(err, "update tenant record")
}

return nil
return nil
})
}

func (s *streamIngestionResumer) ForceRealSpan() bool { return true }
Expand Down

0 comments on commit cf49a69

Please sign in to comment.