Skip to content

Commit

Permalink
streamingccl: support DeleteRange in tenant stream replication
Browse files Browse the repository at this point in the history
This PR supports DeleteRange operation both in producer that
process DelRange from rangefeed and in stream ingestion
processor that ingest DelRange as SST into destination.

For version >= 22.2, SSTs can also contain range tombstones,
i.e., DelRange, this PR also supports ingesting them.

Ingestion processor uses a separate SST writer to ingest range
tombstones as SSTBatcher does not support adding MVCCRangeKeys.
yet.

This PR also cleans up the GenerationEvent that is no longer
a valid concept in the current consumer-tracked design.

Release note: None
  • Loading branch information
gh-casper committed Aug 3, 2022
1 parent b000151 commit dae6304
Show file tree
Hide file tree
Showing 19 changed files with 856 additions and 218 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ ALL_TESTS = [
"//pkg/ccl/streamingccl/streamclient:streamclient_test",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/streamingccl:streamingccl_test",
"//pkg/ccl/telemetryccl:telemetryccl_test",
"//pkg/ccl/testccl/sqlccl:sqlccl_test",
"//pkg/ccl/testccl/workload/schemachange:schemachange_test",
Expand Down Expand Up @@ -745,6 +746,7 @@ GO_TARGETS = [
"//pkg/ccl/streamingccl/streamproducer:streamproducer",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/streamingccl:streamingccl",
"//pkg/ccl/streamingccl:streamingccl_test",
"//pkg/ccl/telemetryccl:telemetryccl_test",
"//pkg/ccl/testccl/sqlccl:sqlccl_test",
"//pkg/ccl/testccl/workload/schemachange:schemachange_test",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (f rawEventFeed) run(
ctx context.Context,
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
eventC chan<- kvcoord.RangeFeedMessage,
) error {
var startAfter hlc.Timestamp
for _, s := range spans {
Expand All @@ -414,7 +414,7 @@ func (f rawEventFeed) run(
f = f[i:]
for i := range f {
select {
case eventC <- &f[i]:
case eventC <- kvcoord.RangeFeedMessage{RangeFeedEvent: &f[i]}:
case <-ctx.Done():
return ctx.Err()
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "streamingccl",
Expand All @@ -8,6 +8,7 @@ go_library(
"errors.go",
"event.go",
"settings.go",
"utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
Expand All @@ -16,7 +17,28 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
"//pkg/streaming",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "streamingccl_test",
srcs = ["utils_test.go"],
embed = [":streamingccl"],
deps = [
"//pkg/clusterversion",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/testutils/storageutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)

Expand Down
87 changes: 57 additions & 30 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,31 @@ const (
// SSTableEvent indicates that the SSTable field of an event holds an updated
// SSTable which needs to be ingested.
SSTableEvent
// DeleteRangeEvent indicates that the DeleteRange field of an event holds a
// DeleteRange which needs to be ingested.
DeleteRangeEvent
// CheckpointEvent indicates that GetResolvedSpans will be meaningful. The resolved
// timestamp indicates that all KVs have been emitted up to this timestamp.
CheckpointEvent
// GenerationEvent indicates that the stream should start ingesting with the
// updated topology.
GenerationEvent
)

// Event describes an event emitted by a cluster to cluster stream. Its Type
// field indicates which other fields are meaningful.
// TODO(casper): refactor this to use a protobuf message type that has one of
// union of event types below.
type Event interface {
// Type specifies which accessor will be meaningful.
Type() EventType

// GetKV returns a KV event if the EventType is KVEvent.
GetKV() *roachpb.KeyValue

// GetSSTable returns a SSTable event if the EventType is SSTable.
// GetSSTable returns a AddSSTable event if the EventType is SSTableEvent.
GetSSTable() *roachpb.RangeFeedSSTable

// GetDeleteRange returns a DeleteRange event if the EventType is DeleteRangeEvent.
GetDeleteRange() *roachpb.RangeFeedDeleteRange

// GetResolvedSpans returns a list of span-time pairs indicating the time for
// which all KV events within that span has been emitted.
GetResolvedSpans() *[]jobspb.ResolvedSpan
Expand Down Expand Up @@ -70,6 +75,11 @@ func (kve kvEvent) GetSSTable() *roachpb.RangeFeedSSTable {
return nil
}

// GetDeleteRange implements the Event interface.
func (kve kvEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface.
func (kve kvEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
return nil
Expand All @@ -95,66 +105,83 @@ func (sste sstableEvent) GetSSTable() *roachpb.RangeFeedSSTable {
return &sste.sst
}

// GetDeleteRange implements the Event interface.
func (sste sstableEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface.
func (sste sstableEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
return nil
}

var _ Event = sstableEvent{}

// checkpointEvent indicates that the stream has emitted every change for all
// keys in the span it is responsible for up until this timestamp.
type checkpointEvent struct {
resolvedSpans []jobspb.ResolvedSpan
// delRangeEvent is a DeleteRange event that needs to be ingested.
type delRangeEvent struct {
delRange roachpb.RangeFeedDeleteRange
}

var _ Event = checkpointEvent{}

// Type implements the Event interface.
func (ce checkpointEvent) Type() EventType {
return CheckpointEvent
func (dre delRangeEvent) Type() EventType {
return DeleteRangeEvent
}

// GetKV implements the Event interface.
func (ce checkpointEvent) GetKV() *roachpb.KeyValue {
func (dre delRangeEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetSSTable implements the Event interface.
func (ce checkpointEvent) GetSSTable() *roachpb.RangeFeedSSTable {
func (dre delRangeEvent) GetSSTable() *roachpb.RangeFeedSSTable {
return nil
}

// GetDeleteRange implements the Event interface.
func (dre delRangeEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
return &dre.delRange
}

// GetResolvedSpans implements the Event interface.
func (ce checkpointEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
return &ce.resolvedSpans
func (dre delRangeEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
return nil
}

// generationEvent indicates that the topology of the stream has changed.
type generationEvent struct{}
var _ Event = delRangeEvent{}

var _ Event = generationEvent{}
// checkpointEvent indicates that the stream has emitted every change for all
// keys in the span it is responsible for up until this timestamp.
type checkpointEvent struct {
resolvedSpans []jobspb.ResolvedSpan
}

var _ Event = checkpointEvent{}

// Type implements the Event interface.
func (ge generationEvent) Type() EventType {
return GenerationEvent
func (ce checkpointEvent) Type() EventType {
return CheckpointEvent
}

// GetKV implements the Event interface.
func (ge generationEvent) GetKV() *roachpb.KeyValue {
func (ce checkpointEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetSSTable implements the Event interface.
func (ge generationEvent) GetSSTable() *roachpb.RangeFeedSSTable {
func (ce checkpointEvent) GetSSTable() *roachpb.RangeFeedSSTable {
return nil
}

// GetResolvedSpans implements the Event interface.
func (ge generationEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
// GetDeleteRange implements the Event interface.
func (ce checkpointEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface.
func (ce checkpointEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
return &ce.resolvedSpans
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
Expand All @@ -165,12 +192,12 @@ func MakeSSTableEvent(sst roachpb.RangeFeedSSTable) Event {
return sstableEvent{sst: sst}
}

// MakeDeleteRangeEvent creates an Event from a DeleteRange.
func MakeDeleteRangeEvent(delRange roachpb.RangeFeedDeleteRange) Event {
return delRangeEvent{delRange: delRange}
}

// MakeCheckpointEvent creates an Event from a resolved timestamp.
func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event {
return checkpointEvent{resolvedSpans: resolvedSpans}
}

// MakeGenerationEvent creates an GenerationEvent.
func MakeGenerationEvent() Event {
return generationEvent{}
}
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ func ExampleClient() {
case streamingccl.SSTableEvent:
sst := event.GetSSTable()
fmt.Printf("sst: %s->%s@%d\n", sst.Span.String(), string(sst.Data), sst.WriteTS.WallTime)
case streamingccl.DeleteRangeEvent:
delRange := event.GetDeleteRange()
fmt.Printf("delRange: %s@%d\n", delRange.Span.String(), delRange.Timestamp.WallTime)
case streamingccl.CheckpointEvent:
ingested.Lock()
minTS := hlc.MaxTimestamp
Expand All @@ -248,8 +251,6 @@ func ExampleClient() {
ingested.ts.Forward(minTS)
ingested.Unlock()
fmt.Printf("resolved %d\n", minTS.WallTime)
case streamingccl.GenerationEvent:
fmt.Printf("received generation event")
default:
panic(fmt.Sprintf("unexpected event type %v", event.Type()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,13 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {
} else if len(streamEvent.Batch.KeyValues) > 0 {
event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues[0])
streamEvent.Batch.KeyValues = streamEvent.Batch.KeyValues[1:]
} else if len(streamEvent.Batch.DelRanges) > 0 {
event = streamingccl.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0])
streamEvent.Batch.DelRanges = streamEvent.Batch.DelRanges[1:]
}
if len(streamEvent.Batch.KeyValues) == 0 && len(streamEvent.Batch.Ssts) == 0 {
if len(streamEvent.Batch.KeyValues) == 0 &&
len(streamEvent.Batch.Ssts) == 0 &&
len(streamEvent.Batch.DelRanges) == 0 {
streamEvent.Batch = nil
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/streaming",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down Expand Up @@ -114,6 +115,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/json",
Expand Down
Loading

0 comments on commit dae6304

Please sign in to comment.