From 951add48c4fbbe2f0310a3a85d8686e5f8dd52f3 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Tue, 15 Nov 2022 16:46:43 +0000 Subject: [PATCH] ptsstorage: allow synthetic timestamps in pts storage Previously synthetic timestamps were causing failures in changefeeds if checkpoint contained a synthetic timestamps. Timestamp representation was parsed as decimal for storage which is not the case for synthetic timestamps. This commit changes pts storage to strip synthetic flag to mitigate the issue. Release note: None --- .../kvserver/protectedts/ptstorage/storage.go | 6 ++-- .../protectedts/ptstorage/storage_test.go | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 995b58e435aa..28bbf41e13bb 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -75,7 +75,7 @@ func (p *storage) UpdateTimestamp( ) error { row, err := p.ex.QueryRowEx(ctx, "protectedts-update", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, - updateTimestampQuery, id.GetBytesMut(), timestamp.AsOfSystemTime()) + updateTimestampQuery, id.GetBytesMut(), timestamp.WithSynthetic(false).AsOfSystemTime()) if err != nil { return errors.Wrapf(err, "failed to update record %v", id) } @@ -97,7 +97,7 @@ func (p *storage) deprecatedProtect( sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, protectQueryWithoutTarget, s.maxSpans, s.maxBytes, len(r.DeprecatedSpans), - r.ID, r.Timestamp.AsOfSystemTime(), + r.ID, r.Timestamp.WithSynthetic(false).AsOfSystemTime(), r.MetaType, meta, len(r.DeprecatedSpans), encodedSpans) if err != nil { @@ -177,7 +177,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, protectQuery, s.maxSpans, s.maxBytes, len(r.DeprecatedSpans), - r.ID, r.Timestamp.AsOfSystemTime(), + r.ID, r.Timestamp.WithSynthetic(false).AsOfSystemTime(), r.MetaType, meta, len(r.DeprecatedSpans), encodedTarget, encodedTarget) if err != nil { diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index be96b9d0aa60..09706e822b64 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -314,6 +314,35 @@ var testCases = []testCase{ }), }, }, + { + name: "Protect using synthetic timestamp", + ops: []op{ + funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) { + rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now().WithSynthetic(true), "", nil, tableTarget(42), + tableSpan(42)) + err := tCtx.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return tCtx.pts.Protect(ctx, txn, &rec) + }) + require.NoError(t, err) + // Synthetic should be reset when writing timestamps to make it + // compatible with underlying sql schema. + rec.Timestamp.Synthetic = false + tCtx.state.Records = append(tCtx.state.Records, rec) + tCtx.state.Version++ + tCtx.state.NumRecords++ + tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans)) + var encoded []byte + if tCtx.runWithDeprecatedSpans { + encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans}) + require.NoError(t, err) + } else { + encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()}) + require.NoError(t, err) + } + tCtx.state.TotalBytes += uint64(len(encoded)) + }), + }, + }, } type testContext struct {