Skip to content

Commit

Permalink
Merge #110433
Browse files Browse the repository at this point in the history
110433: streamingccl: remove special case in heartbeat code r=msbutler a=stevendanna

Previously, you could poll the produce side job status via the builtin used for heartbeating by sending a frontier of MaxTimestamp.

Since the addition of SHOW TENANT WITH REPLICATION STATS no non-test facing code actually used this feature.

It was confusing to always have to call the "NoHeartbeat" version of a function. Further, the special case in the heartbeat code was something that required special care when modifying the function.

Here, we remove this unused feature.

Old versions may still use this feature via now-retired builtins, so we return an error if we see an attempt to heartbeat with a frontier of MaxTimestamp.

Epic: CRDB-26968

Release note: none

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Sep 14, 2023
2 parents 0cde11b + eb0e245 commit 82ea947
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 104 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func GetStreamJobIds(
destTenantName).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))

stats := replicationutils.TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID))
stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID))
return int(stats.IngestionDetails.StreamID), int(tenantInfo.TenantReplicationJobID)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/replicationutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streamclient",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
Expand Down
44 changes: 1 addition & 43 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -133,7 +132,7 @@ func ScanSST(
return nil
}

func GetStreamIngestionStatsNoHeartbeat(
func GetStreamIngestionStats(
ctx context.Context,
streamIngestionDetails jobspb.StreamIngestionDetails,
jobProgress jobspb.Progress,
Expand Down Expand Up @@ -269,47 +268,6 @@ func fingerprintClustersByTable(
dstFingerprints)
}

func GetStreamIngestionStats(
ctx context.Context,
streamIngestionDetails jobspb.StreamIngestionDetails,
jobProgress jobspb.Progress,
) (*streampb.StreamIngestionStats, error) {
stats, err := GetStreamIngestionStatsNoHeartbeat(ctx, streamIngestionDetails, jobProgress)
if err != nil {
return nil, err
}
// No need to pass a db into this call because the StreamAddresses do not have
// an external connection url scheme.
client, err := streamclient.GetFirstActiveClient(ctx, stats.IngestionProgress.StreamAddresses, nil)
if err != nil {
return nil, err
}
streamStatus, err := client.Heartbeat(ctx, streampb.StreamID(stats.IngestionDetails.StreamID), hlc.MaxTimestamp)
if err != nil {
stats.ProducerError = err.Error()
} else {
stats.ProducerStatus = &streamStatus
}
return stats, client.Close(ctx)
}

func TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(
t *testing.T, ctx context.Context, sqlRunner *sqlutils.SQLRunner, ingestionJobID int,
) *streampb.StreamIngestionStats {
var payloadBytes []byte
var progressBytes []byte
var payload jobspb.Payload
var progress jobspb.Progress
stmt := fmt.Sprintf(`SELECT payload, progress FROM (%s)`, jobutils.InternalSystemJobsBaseQuery)
sqlRunner.QueryRow(t, stmt, ingestionJobID).Scan(&payloadBytes, &progressBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))
details := payload.GetStreamIngestion()
stats, err := GetStreamIngestionStatsNoHeartbeat(ctx, *details, progress)
require.NoError(t, err)
return stats
}

func TestingGetStreamIngestionStatsFromReplicationJob(
t *testing.T, ctx context.Context, sqlRunner *sqlutils.SQLRunner, ingestionJobID int,
) *streampb.StreamIngestionStats {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ go_test(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func alterTenantJobCutover(
// that embeds a priviledge check which is already completed.
//
// Check that the timestamp is above our retained timestamp.
stats, err := replicationutils.GetStreamIngestionStatsNoHeartbeat(ctx, details, progress)
stats, err := replicationutils.GetStreamIngestionStats(ctx, details, progress)
if err != nil {
return hlc.Timestamp{}, err
}
Expand Down
63 changes: 42 additions & 21 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -64,7 +66,6 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {

require.NotNil(t, stats.ReplicationLagInfo)
require.True(t, srcTime.LessEq(stats.ReplicationLagInfo.MinIngestedTimestamp))
require.Equal(t, "", stats.ProducerError)

// Make producer job easily times out
c.SrcSysSQL.ExecMultiple(t, replicationtestutils.ConfigureClusterSettings(map[string]string{
Expand Down Expand Up @@ -333,6 +334,24 @@ func TestTenantStreamingCheckpoint(t *testing.T) {

}

func requireReleasedProducerPTSRecord(
t *testing.T,
ctx context.Context,
srv serverutils.ApplicationLayerInterface,
producerJobID jobspb.JobID,
) {
t.Helper()
job, err := srv.JobRegistry().(*jobs.Registry).LoadJob(ctx, producerJobID)
require.NoError(t, err)
ptsRecordID := job.Payload().Details.(*jobspb.Payload_StreamReplication).StreamReplication.ProtectedTimestampRecordID
ptsProvider := srv.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
err = srv.InternalDB().(descs.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
_, err := ptsProvider.WithTxn(txn).GetRecord(ctx, ptsRecordID)
return err
})
require.ErrorIs(t, err, protectedts.ErrNotExists)
}

func TestTenantStreamingCancelIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -362,9 +381,7 @@ func TestTenantStreamingCancelIngestion(t *testing.T) {
jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))

// Check if the producer job has released protected timestamp.
stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, c.DestSysSQL, ingestionJobID)
require.NotNil(t, stats.ProducerStatus)
require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)
requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer.ApplicationLayer(), jobspb.JobID(producerJobID))

// Check if dest tenant key ranges are not cleaned up.
destTenantSpan := keys.MakeTenantSpan(args.DestTenantID)
Expand Down Expand Up @@ -432,9 +449,7 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))

// Check if the producer job has released protected timestamp.
stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, c.DestSysSQL, ingestionJobID)
require.NotNil(t, stats.ProducerStatus)
require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)
requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer.ApplicationLayer(), jobspb.JobID(producerJobID))

// Wait for the GC job to finish
c.DestSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")
Expand Down Expand Up @@ -785,19 +800,27 @@ func TestProtectedTimestampManagement(t *testing.T) {
// waitForProducerProtection asserts that there is a PTS record protecting
// the source tenant. We ensure the PTS record is protecting a timestamp
// greater or equal to the frontier we know we have replicated up until.
waitForProducerProtection := func(c *replicationtestutils.TenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
waitForProducerProtection := func(c *replicationtestutils.TenantStreamingClusters, frontier hlc.Timestamp, producerJobID int) {
testutils.SucceedsSoon(t, func() error {
stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, c.DestSysSQL, replicationJobID)
if stats.ProducerStatus == nil {
return errors.New("nil ProducerStatus")
srv := c.SrcSysServer.ApplicationLayer()
job, err := srv.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobspb.JobID(producerJobID))
if err != nil {
return err
}
if stats.ProducerStatus.ProtectedTimestamp == nil {
return errors.New("nil ProducerStatus.ProtectedTimestamp")
ptsRecordID := job.Payload().Details.(*jobspb.Payload_StreamReplication).StreamReplication.ProtectedTimestampRecordID
ptsProvider := srv.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider

var ptsRecord *ptpb.Record
if err := srv.InternalDB().(descs.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var err error
ptsRecord, err = ptsProvider.WithTxn(txn).GetRecord(ctx, ptsRecordID)
return err
}); err != nil {
return err
}
pts := *stats.ProducerStatus.ProtectedTimestamp
if pts.Less(frontier) {
if ptsRecord.Timestamp.Less(frontier) {
return errors.Newf("protection is at %s, expected to be >= %s",
pts.String(), frontier.String())
ptsRecord.Timestamp.String(), frontier.String())
}
return nil
})
Expand Down Expand Up @@ -868,7 +891,7 @@ func TestProtectedTimestampManagement(t *testing.T) {

// Check that the producer and replication job have written a protected
// timestamp.
waitForProducerProtection(c, now, replicationJobID)
waitForProducerProtection(c, now, producerJobID)
checkDestinationProtection(c, now, replicationJobID)

now2 := now.Add(time.Second.Nanoseconds(), 0)
Expand All @@ -877,7 +900,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
// protected timestamp record has also been updated on the destination
// cluster. This update happens in the same txn in which we update the
// replication job's progress.
waitForProducerProtection(c, now2, replicationJobID)
waitForProducerProtection(c, now2, producerJobID)
checkDestinationProtection(c, now2, replicationJobID)

if pauseBeforeTerminal {
Expand All @@ -902,9 +925,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
}

// Check if the producer job has released protected timestamp.
stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, c.DestSysSQL, replicationJobID)
require.NotNil(t, stats.ProducerStatus)
require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)
requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer.ApplicationLayer(), jobspb.JobID(producerJobID))

// Check if the replication job has released protected timestamp.
checkNoDestinationProtection(c, replicationJobID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func getReplicationStatsAndStatus(
return nil, jobspb.ReplicationError.String(), err
}

stats, err := replicationutils.GetStreamIngestionStatsNoHeartbeat(ctx, details, job.Progress())
stats, err := replicationutils.GetStreamIngestionStats(ctx, details, job.Progress())
if err != nil {
return nil, jobspb.ReplicationError.String(), err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,6 @@ func testStreamReplicationStatus(
expectedStreamStatus.ProtectedTimestamp = &updatedFrontier
}
checkStreamStatus(t, updatedFrontier, expectedStreamStatus)
// Send a query.
// The expected protected timestamp is still 'updatedFrontier' as the protected
// timestamp doesn't get updated when this is a query.
checkStreamStatus(t, hlc.MaxTimestamp, expectedStreamStatus)
}

func TestReplicationStreamInitialization(t *testing.T) {
Expand Down
31 changes: 4 additions & 27 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ func updateReplicationStreamProgress(
}

// heartbeatReplicationStream updates replication stream progress and advances protected timestamp
// record to the specified frontier. If 'frontier' is hlc.MaxTimestamp, returns the producer job
// progress without updating it.
// record to the specified frontier.
func heartbeatReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
Expand All @@ -204,32 +203,10 @@ func heartbeatReplicationStream(
execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV)
expirationTime := timeutil.Now().Add(timeout)
// MaxTimestamp indicates not a real heartbeat, skip updating the producer
// job progress.
if frontier == hlc.MaxTimestamp {
var status streampb.StreamReplicationStatus
pj, err := execConfig.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn)
if jobs.HasJobNotFoundError(err) || testutils.IsError(err, "not found in system.jobs table") {
status.StreamStatus = streampb.StreamReplicationStatus_STREAM_INACTIVE
return status, nil
}
if err != nil {
return streampb.StreamReplicationStatus{}, err
}
status.StreamStatus = convertProducerJobStatusToStreamStatus(pj.Status())
payload := pj.Payload()
ptsRecord, err := execConfig.ProtectedTimestampProvider.WithTxn(txn).GetRecord(
ctx, payload.GetStreamReplication().ProtectedTimestampRecordID,
)
// Nil protected timestamp indicates it was not created or has been released.
if errors.Is(err, protectedts.ErrNotExists) {
return status, nil
}
if err != nil {
return streampb.StreamReplicationStatus{}, err
}
status.ProtectedTimestamp = &ptsRecord.Timestamp
return status, nil
// NB: We used to allow this as a no-op update to get
// the status. That code was removed.
return streampb.StreamReplicationStatus{}, pgerror.Newf(pgcode.InvalidParameterValue, "MaxTimestamp no longer accepted as frontier")
}

return updateReplicationStreamProgress(ctx,
Expand Down
7 changes: 2 additions & 5 deletions pkg/repstream/streampb/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,8 @@ message StreamReplicationStatus {
}

message StreamIngestionStats {
// The status of current stream producer job.
StreamReplicationStatus producer_status = 1;

// The error when trying to reach the current stream producer job.
string producer_error = 2;
reserved 1;
reserved 2;

// Stream ingestion details.
cockroach.sql.jobs.jobspb.StreamIngestionDetails ingestion_details = 3;
Expand Down

0 comments on commit 82ea947

Please sign in to comment.