Skip to content

Commit

Permalink
streamingccl: support sst event in random stream client
Browse files Browse the repository at this point in the history
Previously random stream client lacks test coverage
for AddSSTable operation, this PR enables random stream
client to generate and validate random SSTableEvents.

This PR also cleans up the InterceptableStreamClient
to avoid unnecessary abstraction, making code cleaner.

Release note : None

Release justification: low risk, high benefit changes
to existing functionality
  • Loading branch information
gh-casper committed Aug 24, 2022
1 parent b5b33c8 commit 7a8f501
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 203 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Event interface {

// GetResolvedSpans returns a list of span-time pairs indicating the time for
// which all KV events within that span has been emitted.
GetResolvedSpans() *[]jobspb.ResolvedSpan
GetResolvedSpans() []jobspb.ResolvedSpan
}

// kvEvent is a key value pair that needs to be ingested.
Expand Down Expand Up @@ -81,7 +81,7 @@ func (kve kvEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}

// GetResolvedSpans implements the Event interface.
func (kve kvEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
func (kve kvEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

Expand Down Expand Up @@ -111,7 +111,7 @@ func (sste sstableEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}

// GetResolvedSpans implements the Event interface.
func (sste sstableEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
func (sste sstableEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

Expand Down Expand Up @@ -143,7 +143,7 @@ func (dre delRangeEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}

// GetResolvedSpans implements the Event interface.
func (dre delRangeEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
func (dre delRangeEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

Expand Down Expand Up @@ -178,8 +178,8 @@ func (ce checkpointEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}

// GetResolvedSpans implements the Event interface.
func (ce checkpointEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
return &ce.resolvedSpans
func (ce checkpointEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return ce.resolvedSpans
}

// MakeKVEvent creates an Event from a KV.
Expand Down
10 changes: 4 additions & 6 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ func TestGetFirstActiveClient(t *testing.T) {
defer func() {
require.NoError(t, client.Close(context.Background()))
}()
interceptable, ok := client.(InterceptableStreamClient)
require.True(t, ok)

streamAddresses := []string{
"randomgen://test0/",
Expand All @@ -142,7 +140,7 @@ func TestGetFirstActiveClient(t *testing.T) {
}

// Track dials and error for all but test3 and test4
interceptable.RegisterDialInterception(func(streamURL *url.URL) error {
client.RegisterDialInterception(func(streamURL *url.URL) error {
addr := streamURL.String()
addressDialCount[addr]++
if addr != streamAddresses[3] && addr != streamAddresses[4] {
Expand All @@ -151,7 +149,7 @@ func TestGetFirstActiveClient(t *testing.T) {
return nil
})

client, err := GetFirstActiveClient(context.Background(), streamAddresses)
activeClient, err := GetFirstActiveClient(context.Background(), streamAddresses)
require.NoError(t, err)

// Should've dialed the valid schemes up to the 5th one where it should've
Expand All @@ -165,7 +163,7 @@ func TestGetFirstActiveClient(t *testing.T) {
require.Equal(t, 0, addressDialCount[streamAddresses[6]])

// The 5th should've succeded as it was a valid scheme and succeeded Dial
require.Equal(t, client.(*randomStreamClient).streamURL.String(), streamAddresses[4])
require.Equal(t, activeClient.(*RandomStreamClient).streamURL.String(), streamAddresses[4])
}

// ExampleClientUsage serves as documentation to indicate how a stream
Expand Down Expand Up @@ -243,7 +241,7 @@ func ExampleClient() {
case streamingccl.CheckpointEvent:
ingested.Lock()
minTS := hlc.MaxTimestamp
for _, rs := range *event.GetResolvedSpans() {
for _, rs := range event.GetResolvedSpans() {
if rs.Timestamp.Less(minTS) {
minTS = rs.Timestamp
}
Expand Down
Loading

0 comments on commit 7a8f501

Please sign in to comment.