changefeedccl: support a CSV format for changefeeds #79853
Conversation
Copy/pasting from slack for posterity: My thinking here is that resolved and diff don't make any sense as options for initial_scan=only anyway--we won't emit resolved timestamps and the before in initial_scan mode is meaningless. So disallow them for initial_scan=only, which in turn means we don't have to figure out how to encode them in csv right now. Then later if we need to support those options for csv, we can figure out an encoding scheme.
Reviewed 2 of 9 files at r1, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @sherman-grewal and @stevendanna)
pkg/ccl/changefeedccl/encoder.go, line 373 at r1 (raw file):
}

type csvEncoder struct {
Format number 3 (ish) strikes me as the right moment to break out different encoders into their own files.
pkg/ccl/changefeedccl/encoder.go, line 409 at r1 (raw file):
include := make(map[descpb.ColumnID]struct{}, len(family.ColumnIDs))
var yes struct{}
for _, colID := range family.ColumnIDs {
Could take advantage of the fact that this is restricted to initial_scan=only to cache this map--we shouldn't be seeing multiple versions of a table. (There might be 2? Not 3.)
pkg/ccl/changefeedccl/encoder.go, line 413 at r1 (raw file):
}

var csvRow []string
Is there a way to do this without building this slice? Does the csv writer support something like writer.WriteValue(tree.AsString(datum.Datum)) and then writer.EndOfRow() after we're done?
pkg/ccl/changefeedccl/encoder.go, line 439 at r1 (raw file):
	_ context.Context, _ string, resolved hlc.Timestamp,
) ([]byte, error) {
	return nil, nil
Probably better to error here.
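To illustrate the suggestion: since CSV is restricted to initial_scan='only', resolved timestamps should never be emitted, so reaching this method indicates a bug and deserves an explicit error rather than a silent (nil, nil). A minimal standalone sketch (the function and error names here are hypothetical, not the actual changefeedccl API):

```go
package main

import (
	"errors"
	"fmt"
)

// errCSVResolved is a hypothetical sentinel for the unsupported path:
// format=csv only runs with initial_scan='only', which never emits
// resolved timestamps, so hitting this encoder method is unexpected.
var errCSVResolved = errors.New("resolved timestamps are not supported with format=csv")

// encodeResolvedTimestamp sketches the suggested behavior: surface an
// error instead of returning (nil, nil) and masking the bug downstream.
func encodeResolvedTimestamp() ([]byte, error) {
	return nil, errCSVResolved
}

func main() {
	if _, err := encodeResolvedTimestamp(); err != nil {
		fmt.Println(err)
	}
}
```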
pkg/ccl/changefeedccl/changefeed_test.go, line 5868 at r1 (raw file):
sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')")
expectedMessages := []string{
So we're not expected to emit a header message, I take it? It'd be a little tricky but we could.
This is very nice. Few nits/questions...
Force-pushed from 1d800a3 to ee566a1
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @sherman-grewal, and @stevendanna)
pkg/ccl/changefeedccl/encoder.go, line 373 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Format number 3 (ish) strikes me as the right moment to break out different encoders into their own files.
yeah I totally agree - I added a new files for the three encoders. Thanks for the suggestion 😄
pkg/ccl/changefeedccl/encoder.go, line 409 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Could take advantage of the fact that this is restricted to initial_scan=only to cache this map--we shouldn't be seeing multiple versions of a table. (There might be 2? Not 3.)
Yeah I added a mechanism to cache the map (which is then rebuilt if we see a new version number). If we see another version, does it mean that we won't see rows with the previous version number? If we can still see both version numbers, I was considering storing the cache for each version we see (which is at most 2). Let me know what you think.
pkg/ccl/changefeedccl/encoder.go, line 413 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Is there a way to do this without building this slice? Does the csv writer support something like writer.WriteValue(tree.AsString(datum.Datum)) and then writer.EndOfRow() after we're done?
As mentioned on slack, there is no way to do this without a slice if we want to use the CSV writer package. I added a TODO to keep an eye on the performance of CSV encoding for large rows--if we incur a significant performance loss we can look into building our own CSV writer that writes values directly into the buffer rather than staging them in a slice and then writing the entire slice to the buffer.
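For context, the constraint discussed here comes from the standard-library-style csv writer API: it has no per-value WriteValue/EndOfRow methods, so each row must be staged in a []string before a single Write call. A minimal standalone sketch of that pattern, combined with the slice-reuse idea from the later review round (all names here are hypothetical, not the actual changefeedccl types):

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
)

// csvRowEncoder stages each row in a reusable []string before handing
// it to csv.Writer.Write, since the csv writer has no per-value API.
// Reusing one slice across rows avoids a per-row allocation.
type csvRowEncoder struct {
	buf    bytes.Buffer
	w      *csv.Writer
	csvRow []string // reused across rows; reset with csvRow[:0]
}

func newCSVRowEncoder(numCols int) *csvRowEncoder {
	e := &csvRowEncoder{csvRow: make([]string, 0, numCols)}
	e.w = csv.NewWriter(&e.buf)
	return e
}

// encodeRow renders one row of already-stringified datums as CSV bytes.
func (e *csvRowEncoder) encodeRow(datums []string) ([]byte, error) {
	e.buf.Reset()
	e.csvRow = e.csvRow[:0] // zero out, keeping the backing array
	e.csvRow = append(e.csvRow, datums...)
	if err := e.w.Write(e.csvRow); err != nil {
		return nil, err
	}
	e.w.Flush()
	if err := e.w.Error(); err != nil {
		return nil, err
	}
	return e.buf.Bytes(), nil
}

func main() {
	enc := newCSVRowEncoder(2)
	out, err := enc.encodeRow([]string{"1", "Alice"})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // prints: 1,Alice followed by a newline
}
```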
pkg/ccl/changefeedccl/encoder.go, line 439 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Probably better to error here.
Done - Thanks for the suggestion!
pkg/ccl/changefeedccl/changefeed_test.go, line 5868 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
So we're not expected to emit a header message, I take it? It'd be a little tricky but we could.
As mentioned on slack, we decided not to emit a header for a few reasons:
- The EXPORT statement does not do this
- We need to establish the product requirements for the behaviour of header messages in file-based sinks (e.g. do we re-emit the header message for each new file we create? or is it done once?)
- We need to establish the behaviour of re-emitting the header message if the changefeed was paused and then unpaused
I think that once we iron these issues out we can provide an option to include a header message.
Reviewed 3 of 11 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @sherman-grewal, and @stevendanna)
pkg/ccl/changefeedccl/encoder.go, line 409 at r1 (raw file):
Previously, sherman-grewal (Sherman) wrote…
Yeah I added a mechanism to cache the map (which is then rebuilt if we see a new version number). If we see another version, does it mean that we won't see rows with the previous version number? If we can still see both version numbers, I was considering storing the cache for each version we see (which is at most 2). Let me know what you think.
I'd just store the cache for each version. If it's possible to see multiple versions it's possible to see them out of order.
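To make the resolution concrete: because table versions can arrive out of order, the cache is keyed by descriptor version rather than holding only the latest version's map. A standalone sketch of that shape (names and the plain-int column IDs are hypothetical stand-ins for the real descpb types):

```go
package main

import "fmt"

// colSetCache caches the include-column set per table-descriptor
// version. Versions can be observed out of order, so each version
// keeps its own cached set instead of a single "latest" cache.
type colSetCache struct {
	byVersion map[uint64]map[int]struct{} // version -> set of column IDs
}

func newColSetCache() *colSetCache {
	return &colSetCache{byVersion: make(map[uint64]map[int]struct{})}
}

// includeSet returns the cached column set for a version, building it
// on first use. columnIDs stands in for family.ColumnIDs.
func (c *colSetCache) includeSet(version uint64, columnIDs []int) map[int]struct{} {
	if set, ok := c.byVersion[version]; ok {
		return set // cache hit: columnIDs is ignored
	}
	set := make(map[int]struct{}, len(columnIDs))
	for _, id := range columnIDs {
		set[id] = struct{}{}
	}
	c.byVersion[version] = set
	return set
}

func main() {
	c := newColSetCache()
	s1 := c.includeSet(1, []int{1, 2})
	s2 := c.includeSet(1, []int{1, 2, 3}) // hit: cached 2-column set returned
	fmt.Println(len(s1), len(s2))         // prints: 2 2
}
```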
pkg/ccl/changefeedccl/encoder_avro.go, line 131 at r2 (raw file):
// Get the raw SQL-formatted string for a table name
// and apply full_table_name and avro_schema_prefix options
func (e *confluentAvroEncoder) rawTableName(
Oh man did I really not delete this function when I was doing the topic refactor? Whoops. This should just be a call to TopicNamer.Name but I can do that in a separate PR.
Force-pushed from ee566a1 to 74881ab
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @sherman-grewal, and @stevendanna)
pkg/ccl/changefeedccl/encoder.go, line 409 at r1 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
I'd just store the cache for each version. If it's possible to see multiple versions it's possible to see them out of order.
Done
pkg/ccl/changefeedccl/encoder_avro.go, line 131 at r2 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Oh man did I really not delete this function when I was doing the topic refactor? Whoops. This should just be a call to TopicNamer.Name but I can do that in a separate PR.
ahh ok sounds good, thanks for catching that!!
Force-pushed from cf8232d to 45e5c77
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @sherman-grewal, and @stevendanna)
pkg/ccl/changefeedccl/encoder_csv.go, line 91 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
At the very least, we should memoize this array; store the csvRow array in the encoder struct, initialize it with csvRow: make([]string, 0, len(row.tableDesc.PublicColumns())), and then zero it out here: csvRow = csvRow[:0]
I completely agree--I implemented memoization for this slice in my latest commit. Thanks again for the suggestions 👍
pkg/ccl/changefeedccl/sink_webhook.go, line 137 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
looks like we are not updating some of the fields in the result -- such as mvcc timestamp.
ahh yes - thanks for catching this! 🙌
@sherman-grewal are you waiting for additional comments?
I'm giving it 👍; let's see if @HonoreDB has more comments though before merge.
LGTM after one comment is addressed, I'll pre-emptively approve it too.
Reviewed 2 of 9 files at r1, 6 of 11 files at r2, 2 of 2 files at r4, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @sherman-grewal, and @stevendanna)
pkg/ccl/changefeedccl/changefeed_test.go, line 5870 at r4 (raw file):
	`initial scan only with csv`:     `CREATE CHANGEFEED FOR foo WITH initial_scan_only, format = csv`,
	`initial backfill only with csv`: `CREATE CHANGEFEED FOR foo WITH initial_scan = 'only', format = csv`,
}
Mind also adding a test with one changefeed on two different tables? It'll be one encoder for both tables so there can be bugs there.
In this PR, we introduce a new CSV format for changefeeds. Note that this format is only supported with the initial_scan='only' option. For instance, one can now execute:

CREATE CHANGEFEED FOR foo WITH format=csv, initial_scan='only';

Release note (enterprise change): Support a CSV format for changefeeds, with the intention of supporting use cases similar to EXPORT. Only works with initial_scan='only', and does not work with diff/resolved options.
Force-pushed from 45e5c77 to f7ca7b1
Done, thanks for the reviews!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @sherman-grewal, and @stevendanna)
bors r+
Build failed (retrying...):
Build failed (retrying...):
Build succeeded:
blathers backport 22.1
Encountered an error creating backports. Some common things that can go wrong:
- You might need to create your backport manually using the backport tool.

error creating merge commit from f7ca7b1 to blathers/backport-release-22.1-79853: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 22.1 failed. See errors above.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.