Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure prefix for internal Kafka fields #14224

Merged

Conversation

ssheikin
Copy link
Contributor

@ssheikin ssheikin commented Sep 20, 2022

Description

When we process Kafka topics, for internal usage we add some bunch of additional columns with some useful data and we fill them with data which is coming from Kafka (RecordSet), for example:
case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());
By default in current implementation this columns have some hardcoded names like:
_partition_id _message_corrupt e.t.c
So it could be a situation, that Kafka topic itself have some fields with similar name, and in this case during processing we could have a conflict (like two columns have the same name, for example _key)
So reason of this PR is to provide ability to tune internal column names with custom prefix like XX_my_prefix_XX_key, so this conflict could be more rare then with simple current default prefix _key
This PR will not resolve problem, but just postpone it until times when users will have more difficult column name, which will conflict with our internal column.
This change is backward compatible so for people who don't have colliding field names they don't need to change their existing queries to work with newer version.
The drawback is that this can't anticipate a collision ahead of time. It does however allow adminstrators to use a unique prefix unlikely to ever exist - e.g. __kafka_message_metadata_.

Thank you, @vlad-lyutenko and @hashhar for the description :)

Non-technical explanation

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(*) Release notes are required, with the following suggested text:

# Kafka
* Allow configuring the prefix for internal column names using `kafka.internal-column-prefix` catalog configuration property. The default value is `_` to maintain the same behaviour that exists today. ({issue}`14224`)

@cla-bot cla-bot bot added the cla-signed label Sep 20, 2022
@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch from ef8ea29 to 0f4785b Compare September 20, 2022 22:50
TIMESTAMP_MILLIS))
.buildOrThrow();
String prefix = kafkaConfig.getInternalFieldPrefix();
internalFields = Stream.of(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have this to preserve order of columns, correct?

Earlier code the order being preserved was mostly an impl detail of ImmutableMap that insertion order = iteration order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know about ordering. I guess it is not relevant here.

This code is tricky.
Once it looked like enum already. And was refactored this way to use dynamic value from TypeManager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say that order here does not matter. It's just a holder.
IIRC order matters for column handlers, and they are used within cursor.

@hashhar hashhar requested a review from Praveen2112 September 21, 2022 07:00
@vlad-lyutenko
Copy link
Contributor

vlad-lyutenko commented Sep 21, 2022

I tried to understand what is going in this PR, let me try to summarise it and please correct me if I am wrong or miss something.

When we process Kafka topics, for internal usage we add some bunch of additional columns with some useful data and we fill them with data which is coming from Kafka (RecordSet), for example:

case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());

By default in current implementation this columns have some hardcoded names like:

_partition_id _message_corrupt e.t.c

So it could be a situation, that Kafka topic itself have some fields with similar name, and in this case during processing we could have a conflict (like two columns have the same name, for example _key)

So reason of this PR is to provide ability to tune internal column names with custom prefix like XX_my_prefix_XX_key,
so this conflict could be more rare then with simple current default prefix _key

If my description is correct and I understand correctly, this PR will not resolve problem, but just postpone it until times when users will have more difficult column name, which will conflict with our internal column.

But maybe I missed smth, please correct me if I ma wrong

@hashhar
Copy link
Member

hashhar commented Sep 21, 2022

@vlad-lyutenko Exactly correct understanding.

this PR will not resolve problem, but just postpone it until times when users will have more difficult column name, which will conflict with our internal column.

True. This change has some benefits - it's backward compatible so for people who don't have colliding field names they don't need to change their existing queries to work with newer version.

The drawback is that this can't anticipate a collision ahead of time. It does however allow adminstrators to use a unique prefix unlikely to ever exist - e.g. __kafka_message_metadata_.

Is there some other solution you can think of? One idea which I was thinking was to expose all internal columns as a ROW type with multiple fields instead of separate column per field - that way you only have to worry about one collision.

@ssheikin
Copy link
Contributor Author

One idea which I was thinking was to expose all internal columns as a ROW type with multiple fields instead of separate column per field - that way you only have to worry about one collision.

The probability in O() notation is the same, however it's more complex implementation.
With 10 columns it's O(10 * n) = O(n)

@vlad-lyutenko
Copy link
Contributor

Is there some other solution you can think of? One idea which I was thinking was to expose all internal columns as a ROW type with multiple fields instead of separate column per field - that way you only have to worry about one collision.

I think in this case, we will lose backward compatibility for people who already have some queries,
I more think about solution of providing users with more detailed description what happens and how to resolve problem, using custom prefix.
Looks like current exception is coming somewhere from core/engine (not from connector).
So maybe we can add such check on column collision inside connector, with more detailed description, not to get any open issues/tickets from users in future.

@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch from 0f4785b to 7b64f31 Compare September 21, 2022 16:01
@hashhar
Copy link
Member

hashhar commented Sep 21, 2022

I more think about solution of providing users with more detailed description what happens and how to resolve problem, using custom prefix.

This I agree with - see #14224 (comment)

@Praveen2112
Copy link
Member

What if we could generate the name of these meta-columns based on the table structure. Like if we have column _timestamp_n then we could generate the metacolumn name as _timestamp_(n+1) it would be backward compatible and will also avoid the conflict if it happens anytime in the future ?

@ssheikin
Copy link
Contributor Author

What if we could generate the name of these meta-columns based on the table structure. Like if we have column _timestamp_n then we could generate the metacolumn name as _timestamp_(n+1) it would be backward compatible and will also avoid the conflict if it happens anytime in the future ?

If these internal columns are used by endusers (and according to docs it's possible to query them) I think it's no-go as most probably clients rely on some concrete names.

@hashhar
Copy link
Member

hashhar commented Sep 22, 2022

I think the current solution is the best we can do for now while also not breaking user queries.

An alternative might be to expose these columns using prefixes which are invalid for the target system - in Hive that's $ but AFAIK in Kafka connector there are no limitations on field names other than for Avro which requires prefix to be [A-Za-z_].

@hashhar
Copy link
Member

hashhar commented Sep 22, 2022

LGTM % remaining comments. Please let me know when addressed @ssheikin

@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch 4 times, most recently from 59ce82c to 47d03b5 Compare September 22, 2022 13:19
@ssheikin
Copy link
Contributor Author

Looks like current exception is coming somewhere from core/engine (not from connector).
So maybe we can add such check on column collision inside connector, with more detailed description, not to get any open issues/tickets from users in future.

Current exception comes from connector.
I've Improved exception message.

@ssheikin ssheikin requested a review from hashhar September 22, 2022 13:21
@ssheikin
Copy link
Contributor Author

@hashhar @vlad-lyutenko @Praveen2112 All comments are addressed. Please take a look one more time.

@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch from 47d03b5 to e6113b4 Compare September 22, 2022 13:46
@vlad-lyutenko
Copy link
Contributor

Looks like current exception is coming somewhere from core/engine (not from connector).
So maybe we can add such check on column collision inside connector, with more detailed description, not to get any open issues/tickets from users in future.

Current exception comes from connector. I've Improved exception message.

Ok, thx, now I see it was from buildOrThrow of immutable map.

@vlad-lyutenko
Copy link
Contributor

vlad-lyutenko commented Sep 22, 2022

I am ok with changes

Copy link
Member

@hashhar hashhar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM % comments.

@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch 2 times, most recently from 1047da0 to 22d2fd0 Compare October 5, 2022 12:03
- switch expressions
- unused parameters
- simplify loops
- inline effectively final variables
- use early return
When we process Kafka topics, for internal usage we add some bunch of
additional columns with some useful data and we fill them with data
which is coming from Kafka (RecordSet), for example:
`case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());`
By default in current implementation these columns have some hardcoded
names like: `_partition_id` `_message_corrupt` e.t.c
So it could be a situation, that Kafka topic itself have some fields
with similar name, and in this case during processing we could have a
conflict (like two columns have the same name, for example `_key`)
@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch from 22d2fd0 to af75dfa Compare October 11, 2022 15:41
@ssheikin
Copy link
Contributor Author

@wendigo please approive

@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch from af75dfa to 8e0dad2 Compare October 12, 2022 11:33
Copy link
Member

@hashhar hashhar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will merge once CI is done.

When we process Kafka topics, for internal usage we add some bunch of
additional columns with some useful data and we fill them with data
which is coming from Kafka (RecordSet), for example:
`case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset());`
By default in current implementation these columns have some hardcoded
names like: `_partition_id` `_message_corrupt` e.t.c
So it could be a situation, that Kafka topic itself have some fields
with similar name, and in this case during processing we could have a
conflict (like two columns have the same name, for example `_key`)

Reason of this PR is to provide ability to tune internal column names
with custom prefix like `XX_my_prefix_XX_`, so this conflict could be
more rare then with simple current default prefix `_`
This PR will not resolve problem, but just postpone it until times when
users will have more difficult column name, which will conflict with our
internal column.
This change is backward compatible so for people who don't have
colliding field names they don't need to change their existing queries
to work with newer version.
The drawback is that this can't anticipate a collision ahead of time.
It does however allow adminstrators to use a unique prefix unlikely to
ever exist - e.g. `__kafka_message_metadata_`.
@ssheikin
Copy link
Contributor Author

image
Restarting checks

@ssheikin ssheikin force-pushed the ssheikin/56/oss/kafka_internal_column_prefix branch from 8e0dad2 to bd40ee5 Compare October 13, 2022 09:27
@hashhar hashhar merged commit f5c12ae into trinodb:master Oct 17, 2022
@hashhar hashhar mentioned this pull request Oct 17, 2022
@github-actions github-actions bot added this to the 401 milestone Oct 17, 2022
@ssheikin ssheikin deleted the ssheikin/56/oss/kafka_internal_column_prefix branch April 12, 2024 09:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

8 participants