changefeedccl: immediately stop sending webhook sink rows upon error
Previously, the sink waited until flushing to acknowledge HTTP errors, so any
messages sent between the initial error and the flush could end up out of
order. Now, errors are checked before each message is sent, and the sink is
restarted if one is detected, which preserves ordering.

Resolves #67772

Release note: None
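
To make the mechanism concrete, here is a minimal, self-contained Go sketch of the pattern, not the sink's actual implementation: `errorTracker` and its methods are simplified stand-ins for the inflight tracker in `sink_webhook.go`. Workers record the first failure in a one-slot buffered channel, and the emit path polls that channel without blocking before every send, so no further rows go out once an error has been observed.

```go
package main

import (
	"errors"
	"fmt"
)

// errorTracker is a simplified stand-in for the sink's inflight tracker:
// workers record the first error they hit in a one-slot buffered channel,
// and the emit path polls it without blocking before every send.
type errorTracker struct {
	errChan chan error
}

func newErrorTracker() *errorTracker {
	return &errorTracker{errChan: make(chan error, 1)}
}

// maybeSetError records err unless an earlier error is already buffered.
func (t *errorTracker) maybeSetError(err error) {
	if err == nil {
		return
	}
	select {
	case t.errChan <- err:
	default: // keep the first error; drop later ones
	}
}

// hasError returns a buffered error, if any, without blocking.
func (t *errorTracker) hasError() error {
	select {
	case err := <-t.errChan:
		return err
	default:
		return nil
	}
}

func main() {
	tracker := newErrorTracker()

	// A worker reports a failed HTTP send.
	tracker.maybeSetError(errors.New("webhook endpoint returned 500"))

	// Before emitting the next row, check for a prior error and bail out
	// instead of sending more (possibly out-of-order) messages.
	if err := tracker.hasError(); err != nil {
		fmt.Println("stop emitting and restart the sink:", err)
		return
	}
	fmt.Println("no error observed; safe to emit the next row")
}
```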
spiffyy99 committed Jul 23, 2021
1 parent 9fe684a commit 7659eb2
Showing 3 changed files with 157 additions and 44 deletions.
38 changes: 35 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -4011,7 +4011,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {

rnd, _ := randutil.NewPseudoRand()

var maxCheckopointSize int64
var maxCheckpointSize int64
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo(key INT PRIMARY KEY DEFAULT unique_rowid(), val INT)`)
@@ -4055,7 +4055,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
changefeedbase.FrontierCheckpointFrequency.Override(
context.Background(), &f.Server().ClusterSettings().SV, 10*time.Millisecond)
changefeedbase.FrontierCheckpointMaxBytes.Override(
context.Background(), &f.Server().ClusterSettings().SV, maxCheckopointSize)
context.Background(), &f.Server().ClusterSettings().SV, maxCheckpointSize)

registry := f.Server().JobRegistry().(*jobs.Registry)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms'`)
@@ -4138,7 +4138,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {

// TODO(ssd): Tenant testing disabled because of use of DB()
for _, sz := range []int64{100 << 20, 100} {
maxCheckopointSize = sz
maxCheckpointSize = sz
t.Run(fmt.Sprintf("enterprise-limit=%s", humanize.Bytes(uint64(sz))), enterpriseTest(testFn, feedTestNoTenants))
t.Run(fmt.Sprintf("cloudstorage-limit=%s", humanize.Bytes(uint64(sz))), cloudStorageTest(testFn, feedTestNoTenants))
t.Run(fmt.Sprintf("kafka-limit=%s", humanize.Bytes(uint64(sz))), kafkaTest(testFn, feedTestNoTenants))
@@ -4196,3 +4196,35 @@ func TestCheckpointFrequency(t *testing.T) {
require.Equal(t, completionTime, js.lastProgressUpdate)
require.False(t, js.progressUpdatesSkipped)
}

func TestChangefeedOrderingWithErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH updated`)
webhookFoo := foo.(*webhookFeed)
// retry, then fail, then restart changefeed and successfully send messages
webhookFoo.mockSink.SetStatusCodes(append(repeatStatusCode(
http.StatusInternalServerError,
defaultRetryConfig().MaxRetries+1),
[]int{http.StatusOK, http.StatusOK, http.StatusOK}...))
defer closeFeed(t, foo)

sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (1, 'b')`)
sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
assertPayloadsPerKeyOrderedStripTs(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
`foo: [1]->{"after": {"a": 1, "b": "b"}}`,
`foo: [1]->{"after": null}`,
})
}

// only used for webhook sink for now since it's the only testfeed where
// we can control the ordering of errors
t.Run(`webhook`, webhookTest(testFn))
}
61 changes: 46 additions & 15 deletions pkg/ccl/changefeedccl/sink_webhook.go
@@ -71,13 +71,13 @@ func encodePayloadWebhook(value []byte) ([]byte, error) {
}

type webhookSink struct {
ctx context.Context
workerCtx context.Context
url sinkURL
authHeader string
parallelism int
client *httputil.Client
workerGroup ctxgroup.Group
cancelFunc func()
exitWorkers func()
eventsChans []chan []byte
inflight *inflightTracker
retryCfg retry.Options
@@ -135,9 +135,9 @@ func makeWebhookSink(
ctx, cancel := context.WithCancel(ctx)

sink := &webhookSink{
ctx: ctx,
workerCtx: ctx,
authHeader: opts[changefeedbase.OptWebhookAuthHeader],
cancelFunc: cancel,
exitWorkers: cancel,
parallelism: parallelism,
retryCfg: retryOptions,
}
@@ -229,7 +229,7 @@ func defaultWorkerCount() int {

func (s *webhookSink) setupWorkers() {
s.eventsChans = make([]chan []byte, s.parallelism)
s.workerGroup = ctxgroup.WithContext(s.ctx)
s.workerGroup = ctxgroup.WithContext(s.workerCtx)
for i := 0; i < s.parallelism; i++ {
s.eventsChans[i] = make(chan []byte)
j := i
@@ -240,19 +240,21 @@ func (s *webhookSink) setupWorkers() {
}
}

// TODO (ryan min): Address potential ordering issue where errored message can
// be followed by successful messages. Solution is to immediately stop sending
// messages upon receiving a single error.
func (s *webhookSink) workerLoop(workerCh chan []byte) {
for {
select {
case <-s.ctx.Done():
case <-s.workerCtx.Done():
return
case msg := <-workerCh:
err := s.sendMessageWithRetries(s.ctx, msg)
err := s.sendMessageWithRetries(s.workerCtx, msg)
s.inflight.maybeSetError(err)
// reduce inflight count by one and reduce memory counter
s.inflight.FinishRequest(s.ctx, int64(len(msg)))
s.inflight.FinishRequest(s.workerCtx, int64(len(msg)))
// shut down all other workers immediately if error encountered
if err != nil {
s.exitWorkers()
return
}
}
}
}
@@ -347,6 +349,17 @@ func (i *inflightTracker) maybeSetError(err error) {
}
}

// hasError checks whether the inflightTracker has an error buffered and
// returns it if one exists.
func (i *inflightTracker) hasError() error {
var err error
select {
case err = <-i.errChan:
default:
}
return err
}

// StartRequest enqueues one inflight message to be flushed.
func (i *inflightTracker) StartRequest(ctx context.Context, bytes int64) error {
i.flushMu.Lock()
@@ -398,15 +411,23 @@ func (s *webhookSink) EmitRow(
return err
}

// check if error has been encountered and exit if needed
err = s.inflight.hasError()
if err != nil {
return err
}

err = s.inflight.StartRequest(ctx, int64(len(j)))
if err != nil {
return err
}

select {
// check the webhook sink context in case workers have been terminated
case <-s.workerCtx.Done():
return s.workerCtx.Err()
case <-ctx.Done():
return ctx.Err()
// Errors resulting from sending the message will be expressed in Flush.
case s.eventsChans[s.workerIndex(key)] <- j:
}
return nil
@@ -420,6 +441,16 @@ func (s *webhookSink) EmitResolvedTimestamp(
return err
}

select {
// check the webhook sink context in case workers have been terminated
case <-s.workerCtx.Done():
return s.workerCtx.Err()
// non-blocking check for error, restart changefeed if encountered
case err = <-s.inflight.errChan:
return err
default:
}

err = s.inflight.StartRequest(ctx, int64(len(j)))
if err != nil {
return err
@@ -431,18 +462,18 @@ func (s *webhookSink) EmitResolvedTimestamp(
err = s.sendMessageWithRetries(ctx, j)
s.inflight.maybeSetError(err)
s.inflight.FinishRequest(ctx, int64(len(j)))
return nil
return err
}

func (s *webhookSink) Flush(ctx context.Context) error {
return s.inflight.Wait(ctx)
}

func (s *webhookSink) Close() error {
s.cancelFunc()
s.exitWorkers()
// ignore errors here since we're closing the sink anyway
_ = s.workerGroup.Wait()
s.inflight.Close(s.ctx)
s.inflight.Close(s.workerCtx)
for _, eventsChan := range s.eventsChans {
close(eventsChan)
}
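
The "shut down all other workers immediately" behavior above can be illustrated with a small, hypothetical sketch of the shared-context pattern; `events`, `send`, and the worker loop here are invented for illustration and are not the sink's API. Every worker selects on one cancelable context, and the first worker to hit a send error cancels it, so the remaining workers stop before delivering any later message.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

func main() {
	// One cancelable context shared by all workers; canceling it plays the
	// role of exitWorkers in the sink.
	workerCtx, exitWorkers := context.WithCancel(context.Background())
	defer exitWorkers()

	events := make(chan string)
	var wg sync.WaitGroup

	// send stands in for an HTTP POST to the webhook endpoint.
	send := func(msg string) error {
		if msg == "bad" {
			return errors.New("send failed")
		}
		fmt.Println("sent:", msg)
		return nil
	}

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-workerCtx.Done():
					// Another worker failed (or the sink is closing);
					// exit without sending anything further.
					return
				case msg := <-events:
					if err := send(msg); err != nil {
						// Tear down every worker immediately so no later
						// message can be delivered after a failure.
						exitWorkers()
						return
					}
				}
			}
		}()
	}

	events <- "ok"
	events <- "bad" // triggers cancellation; the other workers exit too
	wg.Wait()
	fmt.Println("all workers stopped after the first error")
}
```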
