diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index c86d6bc2770a..a1131a5fd652 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -16,6 +16,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
+changefeed.protect_timestamp.expire_after duration 72h0m0s automatically expire protected timestamps older than this threshold. 0 disables expiration. Note: this setting should not be too small; it should be larger than changefeed.protect_timestamp_interval
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks. Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 00ad25da61df..7c5e4591400b 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -22,6 +22,7 @@
changefeed.event_consumer_workers
| integer | 0 | the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled |
changefeed.fast_gzip.enabled
| boolean | true | use fast gzip implementation |
changefeed.node_throttle_config
| string |
| specifies node level throttling configuration for all changefeeds |
+changefeed.protect_timestamp.expire_after
| duration | 72h0m0s | automatically expire protected timestamps older than this threshold. 0 disables expiration. Note: this setting should not be too small; it should be larger than changefeed.protect_timestamp_interval |
changefeed.schema_feed.read_with_priority_after
| duration | 1m0s | retry with high priority if we were not able to read descriptors for too long; 0 disables |
cloudstorage.azure.concurrent_upload_buffers
| integer | 1 | controls the number of concurrent buffers that will be used by the Azure client when uploading chunks. Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload |
cloudstorage.http.custom_ca
| string |
| custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage |
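The note in the new setting's description encodes an ordering constraint between two cluster settings. A minimal sketch of checking that constraint, not part of this patch (the helper name, error text, and the 10m example interval are mine):

```go
package main

import (
	"fmt"
	"time"
)

// checkPTSExpiry is a hypothetical validation helper (not in this patch):
// the description above says expire_after should be larger than
// changefeed.protect_timestamp_interval, and that 0 disables expiration.
func checkPTSExpiry(expireAfter, protectInterval time.Duration) error {
	if expireAfter == 0 {
		return nil // expiration disabled; nothing to compare
	}
	if expireAfter <= protectInterval {
		return fmt.Errorf(
			"expire_after (%s) should be larger than protect_timestamp_interval (%s)",
			expireAfter, protectInterval)
	}
	return nil
}

func main() {
	// The 72h default against an example 10m protect interval.
	fmt.Println(checkPTSExpiry(72*time.Hour, 10*time.Minute)) // <nil>
}
```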
diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go
index 4b842036e856..459b3765725c 100644
--- a/pkg/ccl/changefeedccl/changefeed.go
+++ b/pkg/ccl/changefeedccl/changefeed.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"encoding/json"
+	"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -115,11 +116,13 @@ func createProtectedTimestampRecord(
jobID jobspb.JobID,
targets changefeedbase.Targets,
resolved hlc.Timestamp,
+ expiration time.Duration,
progress *jobspb.ChangefeedProgress,
) *ptpb.Record {
progress.ProtectedTimestampRecord = uuid.MakeV4()
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)
+ targetToProtect.Expiration = expiration
log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
return jobsprotectedts.MakeRecord(
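The hunk above threads the configured expiration into the record's target via targetToProtect.Expiration. A rough sketch of the intended semantics, under my assumption (not spelled out in this hunk) that a record older than the threshold becomes eligible to lapse, with 0 disabling expiration as the docs state:

```go
package main

import (
	"fmt"
	"time"
)

// expired is a hypothetical predicate illustrating the field's semantics:
// a zero duration disables expiration; otherwise a record that has existed
// longer than the threshold is eligible to lapse.
func expired(recordedAt time.Time, expiration time.Duration, now time.Time) bool {
	if expiration == 0 {
		return false
	}
	return now.Sub(recordedAt) > expiration
}

func main() {
	recordedAt := time.Now().Add(-80 * time.Hour)
	fmt.Println(expired(recordedAt, 72*time.Hour, time.Now())) // true: older than 72h
}
```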
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 5b1e487e0984..77429758ba9e 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1368,8 +1368,11 @@ func (cf *changeFrontier) manageProtectedTimestamps(
}
recordID := progress.ProtectedTimestampRecord
+ expiration := changefeedbase.PTSExpiresAfter.Get(&cf.flowCtx.Cfg.Settings.SV)
if recordID == uuid.Nil {
- ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
+ ptr := createProtectedTimestampRecord(
+ ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, expiration, progress,
+ )
if err := pts.Protect(ctx, ptr); err != nil {
return err
}
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index a456d8ce8855..c8d5a14341bd 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -270,6 +270,7 @@ func changefeedPlanHook(
jobID,
AllTargets(details),
details.StatementTime,
+ changefeedbase.PTSExpiresAfter.Get(&p.ExecCfg().Settings.SV),
progress.GetChangefeed(),
)
@@ -1206,7 +1207,10 @@ func (b *changefeedResumer) OnPauseRequest(
return nil
}
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
- ptr := createProtectedTimestampRecord(ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved, cp)
+ ptr := createProtectedTimestampRecord(
+ ctx, execCfg.Codec, b.job.ID(), AllTargets(details), *resolved,
+ changefeedbase.PTSExpiresAfter.Get(&execCfg.Settings.SV), cp,
+ )
return pts.Protect(ctx, ptr)
}
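OnPauseRequest writes the protection record with the same expiration, so a changefeed paused for longer than expire_after can lose its protection. A hypothetical helper (name and shape are mine, not from the patch) for the latest safe resume time:

```go
package main

import (
	"fmt"
	"time"
)

// resumeDeadline is a hypothetical helper: the latest wall-clock time a
// paused changefeed should be resumed before its protected timestamp record
// becomes eligible for expiration. ok is false when expiration is disabled.
func resumeDeadline(pausedAt time.Time, expireAfter time.Duration) (time.Time, bool) {
	if expireAfter == 0 {
		return time.Time{}, false
	}
	return pausedAt.Add(expireAfter), true
}

func main() {
	if deadline, ok := resumeDeadline(time.Now(), 72*time.Hour); ok {
		fmt.Println("resume before:", deadline)
	}
}
```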
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 5ca4e75848f0..5249f2d116f8 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -5411,6 +5411,7 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
require.NotEqual(t, uuid.Nil, details.ProtectedTimestampRecord)
r, err := pts.GetRecord(ctx, details.ProtectedTimestampRecord)
require.NoError(t, err)
+ require.Equal(t, changefeedbase.PTSExpiresAfter.Get(&serverCfg.Settings.SV), r.Target.Expiration)
require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
} else {
require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index abb8292b3867..c351fc86d32c 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -211,6 +211,16 @@ var ProtectTimestampInterval = settings.RegisterDurationSetting(
settings.PositiveDuration,
)
+// PTSExpiresAfter sets the expiration for protected timestamp records.
+var PTSExpiresAfter = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "changefeed.protect_timestamp.expire_after",
+	"automatically expire protected timestamps older than this threshold. 0 disables expiration. "+
+	"Note: this setting should not be too small; it should be larger than changefeed.protect_timestamp_interval",
+	72*time.Hour, // a very generous default in case a job is paused in error.
+ settings.NonNegativeDuration,
+).WithPublic()
+
// BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
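For reference, the 72*time.Hour default registered here is exactly what renders as 72h0m0s in the generated docs above; a runnable check:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// time.Duration's String form of the registered default is the value
	// shown in settings-for-tenants.txt and settings.html above.
	fmt.Println(72 * time.Hour) // 72h0m0s
}
```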