changefeedccl: support inline results over pgwire

    CREATE CHANGEFEED FOR <target> WITH <...>

Notably the `INTO <sink>` clause is omitted. This triggers a fairly
radical change in behavior. Instead of setting up a system.job to emit
to a sink in the background and returning immediately with the job ID,
the `CREATE CHANGEFEED` blocks forever and returns all changes as rows
directly over pgwire. The types of these rows are `(topic STRING, key
BYTES, value BYTES)` and they correspond exactly to what would be
emitted to a sink.

Because everything is funneled through the SQL gateway and the changefeed
dies if the client connection dies, there's no reason to run this as a system
job. So we don't. This means the client is responsible for keeping track
of the latest resolved timestamp emitted and, on errors, reconnecting
with the `cursor=<timestamp>` option.
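
For illustration, a client might drive this like so (a minimal sketch; the
connection string, table name, and cursor handling are assumptions for the
example, not part of this change, and resolved timestamps are not yet emitted
inline, per the TODO in emitRows):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed pgwire driver; any postgres driver works
)

func main() {
	// Hypothetical connection string for a local CockroachDB node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/d?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The client owns resumption: remember the latest resolved timestamp
	// seen and pass it back as cursor=<timestamp> after any disconnect.
	cursor := ""
	for {
		stmt := `CREATE CHANGEFEED FOR bank`
		if cursor != "" {
			stmt = fmt.Sprintf(`CREATE CHANGEFEED FOR bank WITH cursor = '%s'`, cursor)
		}
		// Blocks forever, streaming (table, key, value) rows as they arrive.
		rows, err := db.Query(stmt)
		if err != nil {
			log.Printf("changefeed disconnected, retrying: %v", err)
			continue
		}
		for rows.Next() {
			var table string
			var key, value []byte
			if err := rows.Scan(&table, &key, &value); err != nil {
				log.Printf("scan error: %v", err)
				break
			}
			fmt.Printf("%s: %s -> %s\n", table, key, value)
		}
		_ = rows.Close()
	}
}
```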

This also "happens" to be exactly the interface we need for most of the
unit tests, so switch them over to it. This allows the removal of the
timing-based aspects of the tests, which should reduce some of the flake
problems these tests have been having.

NB: The system still internally buffers results, so changes are not
returned immediately, which makes this currently quite awkward and
surprising to use as a client of CockroachDB. The fix initially seems
like a decent amount of work, so it'll wait for a followup PR. (The tests
work around it with a hack that sets `ConnResultsBufferBytes` to a very
small value in the test servers.)
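
The workaround looks roughly like this in a test (a sketch only; it assumes
`ConnResultsBufferBytes` is settable directly on `base.TestServerArgs`, which
may not match where the knob actually lives):

```go
package changefeedccl

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestChangefeedInline(t *testing.T) {
	ctx := context.Background()
	args := base.TestServerArgs{
		UseDatabase: "d",
		// Assumption: flush results to the client once a few bytes are
		// buffered, instead of the default ~16KiB, so inline changefeed
		// rows show up promptly.
		ConnResultsBufferBytes: 16,
	}
	s, sqlDB, _ := serverutils.StartServer(t, args)
	defer s.Stopper().Stop(ctx)
	_ = sqlDB // issue CREATE CHANGEFEED FOR ... against sqlDB here
}
```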

Closes #26661
Closes #26682

Release note: None
danhhz committed Jun 18, 2018
1 parent c461ab7 commit 923ee38
Showing 13 changed files with 517 additions and 338 deletions.
6 changes: 5 additions & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
@@ -908,7 +908,7 @@ create_sequence_stmt ::=
 	| 'CREATE' 'SEQUENCE' 'IF' 'NOT' 'EXISTS' sequence_name opt_sequence_option_list
 
 create_changefeed_stmt ::=
-	'CREATE' 'CHANGEFEED' 'FOR' targets 'INTO' string_or_placeholder opt_with_options
+	'CREATE' 'CHANGEFEED' 'FOR' targets opt_changefeed_sink opt_with_options
 
 statistics_name ::=
 	name
@@ -1255,6 +1255,10 @@ opt_sequence_option_list ::=
 	sequence_option_list
 	|
 
+opt_changefeed_sink ::=
+	'INTO' string_or_placeholder
+	|
+
 cte_list ::=
 	( common_table_expr ) ( ( ',' common_table_expr ) )*
14 changes: 6 additions & 8 deletions pkg/ccl/changefeedccl/bench_test.go
@@ -10,16 +10,15 @@ package changefeedccl
 
 import (
 	"context"
-	"fmt"
 	"testing"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/util/hlc"
-
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -61,13 +60,12 @@ func BenchmarkChangefeed(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		k := newTestKafkaProducer()
-		hookName := fmt.Sprintf(`%s-%d`, b.Name(), i)
-		testProducersHook[hookName] = k
+		resultsCh := make(chan tree.Datums, 1)
 
 		b.StartTimer()
-		cancelFeed := createBenchmarkChangefeed(ctx, s, feedClock, `d`, `bank`, `kafka://`+hookName)
-		for rows := 0; rows < numRows; rows += len(k.WaitUntilNewMessages()) {
+		cancelFeed := createBenchmarkChangefeed(ctx, s, feedClock, `d`, `bank`, resultsCh)
+		for rows := 0; rows < numRows; rows++ {
+			<-resultsCh
 		}
 		b.StopTimer()
53 changes: 22 additions & 31 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -14,8 +14,6 @@ import (
 	"net/url"
 	"time"
 
-	"github.com/Shopify/sarama"
-
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -75,7 +73,7 @@ func runChangefeedFlow(
 	execCfg *sql.ExecutorConfig,
 	details jobspb.ChangefeedDetails,
 	progress jobspb.ChangefeedProgress,
-	startedCh chan<- tree.Datums,
+	resultsCh chan<- tree.Datums,
 	progressedFn func(context.Context, jobs.ProgressedFn) error,
 ) error {
 	details, err := validateChangefeed(details)
@@ -103,7 +101,7 @@
 	// TODO(dan): Make this into a DistSQL flow.
 	changedKVsFn := exportRequestPoll(execCfg, details, progress)
 	rowsFn := kvsToRows(execCfg, details, changedKVsFn)
-	emitRowsFn, closeFn, err := emitRows(details, jobProgressedFn, rowsFn)
+	emitRowsFn, closeFn, err := emitRows(details, jobProgressedFn, rowsFn, resultsCh)
 	if err != nil {
 		return err
 	}
@@ -113,14 +111,6 @@
 		}
 	}()
 
-	// We abuse the job's results channel to make CREATE CHANGEFEED wait for
-	// this before returning to the user to ensure the setup went okay. Job
-	// resumption doesn't have the same hack, but at the moment ignores results
-	// and so is currently okay. Return nil instead of anything meaningful so
-	// that if we start doing anything with the results returned by resumed
-	// jobs, then it breaks instead of returning nonsense.
-	startedCh <- tree.Datums(nil)
-
 	for {
 		if err := emitRowsFn(ctx); err != nil {
 			return err
@@ -291,22 +281,33 @@ func emitRows(
 	details jobspb.ChangefeedDetails,
 	jobProgressedFn func(context.Context, hlc.Timestamp) error,
 	inputFn func(context.Context) ([]emitRow, error),
+	resultsCh chan<- tree.Datums,
 ) (emitFn func(context.Context) error, closeFn func() error, err error) {
-	var kafkaTopicPrefix string
-	var producer sarama.SyncProducer
+	var sink Sink
 
 	sinkURI, err := url.Parse(details.SinkURI)
 	if err != nil {
 		return nil, nil, err
 	}
 	switch sinkURI.Scheme {
+	case sinkSchemeChannel:
+		sink = &channelSink{resultsCh: resultsCh}
+		closeFn = sink.Close
 	case sinkSchemeKafka:
-		kafkaTopicPrefix = sinkURI.Query().Get(sinkParamTopicPrefix)
-		producer, err = getKafkaProducer(sinkURI.Host)
+		kafkaTopicPrefix := sinkURI.Query().Get(sinkParamTopicPrefix)
+		sink, err = getKafkaSink(kafkaTopicPrefix, sinkURI.Host)
 		if err != nil {
 			return nil, nil, err
 		}
-		closeFn = producer.Close
+		closeFn = sink.Close
+
+		// We abuse the job's results channel to make CREATE CHANGEFEED wait for
+		// this before returning to the user to ensure the setup went okay. Job
+		// resumption doesn't have the same hack, but at the moment ignores results
+		// and so is currently okay. Return nil instead of anything meaningful so
+		// that if we start doing anything with the results returned by resumed
+		// jobs, then it breaks instead of returning nonsense.
+		resultsCh <- tree.Datums(nil)
 	default:
 		return nil, nil, errors.Errorf(`unsupported sink: %s`, sinkURI.Scheme)
 	}
@@ -351,26 +352,16 @@
 			log.Infof(ctx, `row %s -> %s`, key.String(), value.String())
 		}
 
-		message := &sarama.ProducerMessage{
-			Topic: kafkaTopicPrefix + input.tableDesc.Name,
-			Key:   sarama.ByteEncoder(key.Bytes()),
-			Value: sarama.ByteEncoder(value.Bytes()),
-		}
-		if _, _, err := producer.SendMessage(message); err != nil {
-			return errors.Wrapf(err, `sending message to kafka topic %s`, message.Topic)
+		topic := input.tableDesc.Name
+		if err := sink.EmitRow(ctx, topic, key.Bytes(), value.Bytes()); err != nil {
+			return err
 		}
 	}
 	if input.resolved != (hlc.Timestamp{}) {
 		if err := jobProgressedFn(ctx, input.resolved); err != nil {
 			return err
 		}
-
-		// TODO(dan): HACK for testing. We call SendMessages with nil to
-		// indicate to the test that a full poll finished. Figure out
-		// something better.
-		if err := producer.SendMessages(nil); err != nil {
-			return err
-		}
+		// TODO(dan): Emit resolved timestamps to the sink.
 	}
 }
 return nil
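
The hunks above replace direct sarama calls with a `Sink` abstraction whose
definition isn't visible in this diff. Reconstructed from the call sites
(`EmitRow`, `Close`, `getKafkaSink`), it plausibly looks like the following
sketch; any name or signature beyond those call sites is an assumption:

```go
package changefeedccl

import (
	"context"

	"github.com/Shopify/sarama"
)

// Sink abstracts where a changefeed emits rows: Kafka when an INTO clause
// was given, or the pgwire results channel when it was not.
type Sink interface {
	// EmitRow delivers one changed row to the sink.
	EmitRow(ctx context.Context, topic string, key, value []byte) error
	Close() error
}

// kafkaSink wraps the sarama producer that the old code drove directly.
type kafkaSink struct {
	topicPrefix string
	producer    sarama.SyncProducer
}

func getKafkaSink(topicPrefix, bootstrapServers string) (Sink, error) {
	cfg := sarama.NewConfig()
	// SyncProducer requires successes and errors to be returned.
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	producer, err := sarama.NewSyncProducer([]string{bootstrapServers}, cfg)
	if err != nil {
		return nil, err
	}
	return &kafkaSink{topicPrefix: topicPrefix, producer: producer}, nil
}

func (s *kafkaSink) EmitRow(_ context.Context, topic string, key, value []byte) error {
	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
		Topic: s.topicPrefix + topic,
		Key:   sarama.ByteEncoder(key),
		Value: sarama.ByteEncoder(value),
	})
	return err
}

func (s *kafkaSink) Close() error {
	return s.producer.Close()
}
```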
53 changes: 43 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -39,6 +39,7 @@ const (
 	optEnvelopeKeyOnly envelopeType = `key_only`
 	optEnvelopeRow     envelopeType = `row`
 
+	sinkSchemeChannel    = ``
 	sinkSchemeKafka      = `kafka`
 	sinkParamTopicPrefix = `topic_prefix`
 )
@@ -57,19 +58,39 @@ func changefeedPlanHook(
 		return nil, nil, nil, nil
 	}
 
-	sinkURIFn, err := p.TypeAsString(changefeedStmt.SinkURI, `CREATE CHANGEFEED`)
-	if err != nil {
-		return nil, nil, nil, err
+	var sinkURIFn func() (string, error)
+	var header sqlbase.ResultColumns
+	unspecifiedSink := changefeedStmt.SinkURI == nil
+	if unspecifiedSink {
+		// An unspecified sink triggers a fairly radical change in behavior.
+		// Instead of setting up a system.job to emit to a sink in the
+		// background and returning immediately with the job ID, the `CREATE
+		// CHANGEFEED` blocks forever and returns all changes as rows directly
+		// over pgwire. The types of these rows are `(topic STRING, key BYTES,
+		// value BYTES)` and they correspond exactly to what would be emitted to
+		// a sink.
+		sinkURIFn = func() (string, error) { return ``, nil }
+		header = sqlbase.ResultColumns{
+			{Name: "table", Typ: types.String},
+			{Name: "key", Typ: types.Bytes},
+			{Name: "value", Typ: types.Bytes},
+		}
+	} else {
+		var err error
+		sinkURIFn, err = p.TypeAsString(changefeedStmt.SinkURI, `CREATE CHANGEFEED`)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+		header = sqlbase.ResultColumns{
+			{Name: "job_id", Typ: types.Int},
+		}
 	}
 
 	optsFn, err := p.TypeAsStringOpts(changefeedStmt.Options, changefeedOptionExpectValues)
 	if err != nil {
 		return nil, nil, nil, err
 	}
 
-	header := sqlbase.ResultColumns{
-		{Name: "job_id", Typ: types.Int},
-	}
 	fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
 		ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
 		defer tracing.FinishSpan(span)
@@ -78,6 +99,11 @@
 		if err != nil {
 			return err
 		}
+		if !unspecifiedSink && sinkURI == `` {
+			// Error if someone specifies an INTO with the empty string. We've
+			// already sent the wrong result column headers.
+			return errors.New(`omit the SINK clause for inline results`)
+		}
 
 		opts, err := optsFn()
 		if err != nil {
@@ -118,6 +144,15 @@
 			Opts:    opts,
 			SinkURI: sinkURI,
 		}
+		progress := jobspb.ChangefeedProgress{
+			Highwater: highwater,
+		}
+
+		if details.SinkURI == `` {
+			return runChangefeedFlow(
+				ctx, p.ExecCfg(), details, progress, resultsCh, nil, /* progressedFn */
+			)
+		}
 
 		// Make a channel for runChangefeedFlow to signal once everything has
 		// been setup okay. This intentionally abuses what would normally be
@@ -132,10 +167,8 @@
 			}
 			return sqlDescIDs
 		}(),
-		Details: details,
-		Progress: jobspb.ChangefeedProgress{
-			Highwater: highwater,
-		},
+		Details:  details,
+		Progress: progress,
 	})
 	if err != nil {
 		return err
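
The `channelSink` referenced above is likewise not shown in the visible
hunks. Here is a hedged sketch of how it might deliver rows matching the
inline `(table, key, value)` header; the exact datum conversion and channel
handling are assumptions:

```go
package changefeedccl

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// channelSink implements Sink by pushing each row onto the plan hook's
// results channel, which the sql layer then streams back over pgwire.
type channelSink struct {
	resultsCh chan<- tree.Datums
}

func (s *channelSink) EmitRow(ctx context.Context, topic string, key, value []byte) error {
	// One datum per result column in the inline header: table, key, value.
	row := tree.Datums{
		tree.NewDString(topic),
		tree.NewDBytes(tree.DBytes(key)),
		tree.NewDBytes(tree.DBytes(value)),
	}
	select {
	case s.resultsCh <- row:
		return nil
	case <-ctx.Done():
		// Give up if the client connection (and so the flow) has gone away.
		return ctx.Err()
	}
}

func (s *channelSink) Close() error {
	return nil // nothing to release; the channel is owned by the caller
}
```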