Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka insert prep #18485

Merged
merged 10 commits into from
Aug 9, 2023
Merged

Kafka insert prep #18485

merged 10 commits into from
Aug 9, 2023

Conversation

elonazoulay
Copy link
Member

@elonazoulay elonazoulay commented Aug 1, 2023

Description

Extracted commits from #16828
These are preparatory commits laying the groundwork for implementing insert into kafka.

Additional context and related issues

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Aug 1, 2023
@elonazoulay elonazoulay requested a review from kokosing August 1, 2023 14:19
@github-actions github-actions bot added the docs label Aug 1, 2023
Copy link
Member

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please improve PR title and description.

LGTM % comments

@@ -34,9 +34,9 @@ public DispatchingRowEncoderFactory(Map<String, RowEncoderFactory> factories)
this.factories = ImmutableMap.copyOf(requireNonNull(factories, "factories is null"));
}

public RowEncoder create(ConnectorSession session, String dataFormat, Optional<String> dataSchema, List<EncoderColumnHandle> columnHandles)
public RowEncoder create(ConnectorSession session, String dataFormat, Optional<String> dataSchema, List<EncoderColumnHandle> columnHandles, String topic, boolean isKey)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract some entity that would have things related to the row? That would have parameters like: String dataFormat, Optional<String> dataSchema, List<EncoderColumnHandle> columnHandles, String topic, boolean isKey. I don't want to create anything artificial, but something that represents some logical thing.

Also please use an enum instead of boolean.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, lmk what you think when you have a chance. Thanks for all the suggestions!

Copy link
Contributor

@ssheikin ssheikin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@@ -31,9 +32,9 @@ public DispatchingRowDecoderFactory(Map<String, RowDecoderFactory> factories)
this.factories = ImmutableMap.copyOf(factories);
}

public RowDecoder create(String dataFormat, Map<String, String> decoderParams, Set<DecoderColumnHandle> columns)
public RowDecoder create(String dataFormat, Map<String, String> decoderParams, Set<DecoderColumnHandle> columns, ConnectorSession session)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should session be the first parameter here too?

Copy link
Member Author

@elonazoulay elonazoulay Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks! We can extend the suggestion for encoder parameters and create a DecoderRowSpec. Update: modified the commit, lmk when you have a chance.

{
return readSchema(tableHandle.getKeyDataSchemaLocation(), tableHandle.getKeySubject());
}

@Override
public final Optional<String> readValueContentSchema(KafkaTableHandle tableHandle)
public final Optional<String> getValue(KafkaTableHandle tableHandle)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getValue -> getMessage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will do! Just for context: schema registry uses the term "value" for kafka "message" subject, not sure why since they're both confluent products. I'll change it shortly.

@elonazoulay elonazoulay force-pushed the kafka_insert_prep branch 2 times, most recently from efad910 to 80d421c Compare August 4, 2023 07:11
Copy link
Member

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

% single comment

@@ -58,7 +58,9 @@

public class AvroSchemaConverter
{
public static final String DUMMY_FIELD_NAME = "dummy";
public static final String DUMMY_FIELD_NAME = "$cdekvwhddlgixmua";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use random hardcoded characters. I liked $dummy more. Maybe $empty_field_marker?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What concerns do you have?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should field have random part and $dummy part, e.g. cdekvwhddlgixmua$dummy ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It leaks to users. It is even documented. So if it is documented it cannot be so cryptic, and it has to be more user (human) friendly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, will update, thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with $empty_field_marker - seems more descriptive than $dummy, lmk.

@@ -433,7 +433,7 @@ Inserts are not supported, and the only data format supported is AVRO.
* ``IGNORE`` - Ignore structs with no fields. This propagates to parents.
For example, an array of structs with no fields is ignored.
* ``FAIL`` - Fail the query if a struct with no fields is defined.
* ``DUMMY`` - Add a dummy boolean field called ``dummy``, which is null.
* ``DUMMY`` - Add a dummy boolean field name consisting of random characters ``$empty_field_marker``, which is null.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now, should replace DUMMY to something more meaningful? Maybe MARK?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, thanks!

Change the dummy field name to something
less likely to be used.
This is needed for a subsequent commit
adding functionality for handling records
with no fields.
This will be needed for encoding avro records
that use schema registry for providing the schema.
@kokosing kokosing merged commit 4dd4c9a into trinodb:master Aug 9, 2023
@kokosing
Copy link
Member

kokosing commented Aug 9, 2023

@elonazoulay Please provide a release notes.

@github-actions github-actions bot added this to the 423 milestone Aug 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

3 participants