Skip to content

Commit

Permalink
Merge #126404
Browse files Browse the repository at this point in the history
126404: crosscluster/logical: prevent data looping via OriginID session variable r=dt a=msbutler

Previously, LDR would prevent data looping by spinning up rangefeeds with
filtering. This big hammer prevented LDR replicated data from appearing in
destination side changefeeds.

This patch replaces this data loop prevention strategy by 1) binding an
OriginID of 1 to the MVCCValueHeader of each replicated KV during ingestion; 2)
filtering these KVs with their OriginID value when these replicated KVs appear
as LDR source side rangefeed events.

To implement 1), the Internal Execetor in the LDR row processor now sets the
OriginIDForLogicalDataReplication session variable to 1, which has the effect
of binding OriginID=1 to each batch request header created by the
InternalExecutor's write queries. The request header value will be plumbed to
each KV's Value header in the kv layer.

To implement 2), source side rangefeeds are now initialized with the
WithEmitMatchingOriginIDs option, causing rangefeeds to only emit local writes,
with OriginID=0.

Note that a similar ingestion side plumbing strategy will be used for
OriginTimestamp even though each ingested row may have a different timestamp.
We can still bind the OriginTimestamp to the Internal Executor session before
each query because 1) each IE query creates a new session; 2) we do not plan to
use multi row insert statements during LDR ingestion via sql.

Fixes #126253

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Jul 1, 2024
2 parents 4ff422e + c7b720c commit 053bc17
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -596,17 +596,6 @@ func (t *txnBatch) HandleBatch(
} else {
var txnStats batchStats
err = t.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Note that we cannot use the DisableChangefeedReplication override
// option in LWW row processor because it only affects new txns, and
// we already have one.
// TODO(ssd): For now, we SetOmitInRangefeeds to
// prevent the data from being emitted back to the source.
// However, I don't think we want to do this in the long run.
// Rather, we want to store the inbound cluster ID and store that
// in a way that allows us to choose to filter it out from or not.
// Doing it this way means that you can't choose to run CDC just from
// one side and not the other.
txn.KV().SetOmitInRangefeeds()
txnStats = batchStats{}
for _, kv := range batch {
rowStats, err := t.rp.ProcessRow(ctx, txn, kv.KeyValue, kv.PrevValue)
Expand Down
24 changes: 16 additions & 8 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,24 @@ func (lww *sqlLastWriteWinsRowProcessor) ProcessRow(
var (
implicitTxnOverrides = sessiondata.InternalExecutorOverride{
AttributeToUser: true,
// TODO(ssd): we do this for now to prevent the data from being emitted back
// to the source. However, I don't think we want to do this in the long run.
// Rather, we want to store the inbound cluster ID and store that in a way
// that allows us to choose to filter it out from or not. Doing it this way
// means that you can't choose to run CDC just from one side and not the
// other.
DisableChangefeedReplication: true,

// The OriginIDForLogicalDataReplication session variable will bind the
// origin ID 1 to each per-statement batch request header sent by the
// internal executor. This metadata will be plumbed to the MVCCValueHeader
// of each written kv, and will be used by source side rangefeeds to filter
// these replicated events, preventing data looping.
//
// Note that a similar ingestion side plumbing strategy will be used for
// OriginTimestamp even though each ingested row may have a different
// timestamp. We can still bind the OriginTimestamp to the Internal Executor
// session before each query because 1) each IE query creates a new session;
// 2) we do not plan to use multi row insert statements during LDR ingestion
// via sql.
OriginIDForLogicalDataReplication: 1,
}
explicitTxnOverrides = sessiondata.InternalExecutorOverride{
AttributeToUser: true,
AttributeToUser: true,
OriginIDForLogicalDataReplication: 1,
}
)

Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/crosscluster/producer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,18 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
rangefeed.WithOnDeleteRange(s.onDeleteRange),
rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
rangefeed.WithOnValues(s.onValues),
rangefeed.WithFiltering(s.spec.WithFiltering),
rangefeed.WithDiff(s.spec.WithDiff),
rangefeed.WithInvoker(func(fn func() error) error { return fn() }),
}
if emitMetadata.Get(&s.execCfg.Settings.SV) {
opts = append(opts, rangefeed.WithOnMetadata(s.onMetadata))
}
if s.spec.Type == streampb.ReplicationType_LOGICAL {
// To prevent data looping during Logical Replication, only emit events that
// were written by the foreground workload, not from the LDR replication
// stream.
opts = append(opts, rangefeed.WithOriginIDsMatching(0))
}

initialTimestamp := s.spec.InitialScanTimestamp
s.frontier, err = span.MakeFrontier(s.spec.Spans...)
Expand Down

0 comments on commit 053bc17

Please sign in to comment.