Add Kafka headers as column #4462
Thank you for your PR, great addition to the connector.
Minor suggestions, thanks for your work on this :)
presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java
@@ -269,4 +278,53 @@ public void close()
            kafkaConsumer.close();
        }
    }

    public static FieldValueProvider headerMapValueProvider(MapType varcharMapType, Headers headers)
I'd assume that empty headers are not a rare case, so we may want to cache the result for that case (an empty block).
@dain wdyt?
@0xE282B0 can you address this one, unless the assumption that empty headers are common is false?
sure.
Is there an easier way to get an empty MapBlock than creating a MapType and using a MapBlockBuilder?
MapType mapType = new MapType(VarcharType.VARCHAR, new ArrayType(VarbinaryType.VARBINARY),
MethodHandles.empty(methodType(Boolean.class, Block.class, int.class, long.class)),
MethodHandles.empty(methodType(Boolean.class, Block.class, int.class, Block.class, int.class)),
MethodHandles.empty(methodType(long.class, Object.class)),
MethodHandles.empty(methodType(long.class, Object.class)));
BlockBuilder mapBlockBuilder = new MapBlockBuilder(mapType, null, 0);
mapBlockBuilder.beginBlockEntry();
mapBlockBuilder.closeEntry();
Block emptyMapBlock = mapType.getObject(mapBlockBuilder, 0);
Not that I am aware of. Looks good to me.
Nit: can you rename createEmptyMapBlockProvider to createEmptyHeadersFieldProvider and EMPTY_MAP_BLOCK_PROVIDER to EMPTY_HEADERS_FIELD_PROVIDER?
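For readers following the caching suggestion in this thread, here is a minimal sketch of the idea: build the empty result once and hand out the same instance whenever a record has no headers. It does not use the connector's types. A `Supplier` stands in for `FieldValueProvider` and a plain `Map` stands in for the map block; both substitutions, and the `headersFieldProvider` helper, are assumptions made only for illustration. The names `EMPTY_HEADERS_FIELD_PROVIDER` and `createEmptyHeadersFieldProvider` are taken from the rename proposed above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in: Supplier plays the role of FieldValueProvider,
// and a plain Map plays the role of the map block.
final class EmptyHeadersCacheSketch
{
    // Built once and shared by every record that carries no headers.
    static final Supplier<Map<String, List<byte[]>>> EMPTY_HEADERS_FIELD_PROVIDER =
            createEmptyHeadersFieldProvider();

    private EmptyHeadersCacheSketch() {}

    static Supplier<Map<String, List<byte[]>>> createEmptyHeadersFieldProvider()
    {
        Map<String, List<byte[]>> empty = Collections.emptyMap();
        return () -> empty;
    }

    // Hypothetical helper: returns the cached provider for empty input,
    // otherwise a provider over an unmodifiable copy of the headers.
    static Supplier<Map<String, List<byte[]>>> headersFieldProvider(Map<String, List<byte[]>> headers)
    {
        if (headers.isEmpty()) {
            return EMPTY_HEADERS_FIELD_PROVIDER;
        }
        Map<String, List<byte[]>> copy = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        return () -> copy;
    }
}
```

The point of the constant is that the per-record path for empty headers allocates nothing; whether that matters in practice depends on how common header-less messages are in the topic.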
Also, please add a test for the case where we read the _headers column but the headers map is empty.
The naming sounds better, it is more specific.
The empty-header test isn't as straightforward as I wanted it to be, but it works. Since assertQuery does not work on maps and VARBINARY does not work with JSON, I converted the map type and then cast it to JSON to compare it with empty objects.
SELECT cast(transform_values(_headers,(k,v)->transform(v,x->from_utf8(x))) AS JSON)
Recommendations welcome 😉
I think the test could be a bit more readable if the headers topic also defined a field named id. It would make the assertions simpler.
Then, if the messages with empty headers have id equal to 1 and 2, the assertion would look like:
assertQuery("SELECT id FROM default." + headersTopic + " WHERE cardinality(_headers) = 0",
"VALUES (1), (2)");
The other test query could be more readable too if you used id in the WHERE clause.
WDYT?
Unfortunately we don't have an ID, but we can enumerate the messages based on the value of the message.
assertQuery("SELECT _message FROM default." + headersTopic + " WHERE cardinality(_headers) = 0",
"VALUES ('1'),('2')");
I was thinking of extending the schema with id, but using _message is totally fine too.
record = new ProducerRecord<>(topicName, null, "{}".getBytes(UTF_8));
record.headers()
        .add("foo", "bar".getBytes(UTF_8))
        .add("foo", null)
Thanks for taking care of null values.
Can a header key be null as well?
No, at least the Kafka Java client throws an exception if you try to create a Header with null as key.
See: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L32
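To make the answer above concrete, here is a small sketch of the kind of constructor check that rejects a null key while still accepting a null value. `SimpleHeader` is a hypothetical stand-in written for this note, not the Kafka class, and the exception message is illustrative; the linked `RecordHeader` source is the authority on the client's actual behavior.

```java
import java.util.Objects;

final class NullHeaderKeySketch
{
    private NullHeaderKeySketch() {}

    // Hypothetical stand-in for a Kafka header: the key is mandatory, the value may be null.
    static final class SimpleHeader
    {
        final String key;
        final byte[] value;

        SimpleHeader(String key, byte[] value)
        {
            // Fails fast with a NullPointerException when the key is null.
            this.key = Objects.requireNonNull(key, "Null header keys are not allowed");
            this.value = value; // null values are accepted
        }
    }
}
```

Because the key can never be null under such a check, code that reads headers into a map only has to deal with null values, which is what the `.add("foo", null)` case in the test above exercises.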
@0xE282B0 For me the responsibility split between the static enum InternalField and the runtime InternalFieldDescription is very counterintuitive.
Since we are forced to introduce InternalFieldDescription, which is built at runtime, it does not make sense to me to keep a rich InternalField with a subset of the column information.
Instead, I would suggest moving all the information to InternalFieldDescription and keeping the InternalField enum just as a name marker.
Given some naming changes it may look like this:
public class KafkaInternalFieldManager {
    enum InternalFieldKey {
        PARTITION_ID_FIELD,
        PARTITION_OFFSET_FIELD,
        MESSAGE_CORRUPT_FIELD,
        ...
    }

    public static class InternalField {
        private final InternalFieldKey key;
        private final String columnName;
        private final String comment;
        private final Type type;
        // constructor and getters (the fields are final, so no setters)
    }

    private final Map<InternalFieldKey, InternalField> internalFields;
}
We could even drop InternalFieldKey and just define constants to be used for matching based on columnName.
WDYT?
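As a rough illustration of the second variant (no key enum, string constants matched against columnName), the sketch below keeps the fields in a map keyed by column name. It is not the connector's code: the type is held as a plain string instead of a `Type` obtained from the `TypeManager`, and the class name, the `find` method, the column names, and the comments are all assumptions chosen for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of an internal-field registry keyed by column name.
final class InternalFieldsByNameSketch
{
    // Constants used for matching; the values are illustrative column names.
    static final String PARTITION_ID_FIELD = "_partition_id";
    static final String HEADERS_FIELD = "_headers";

    static final class InternalField
    {
        final String columnName;
        final String typeSignature; // stand-in for a Type resolved at runtime
        final String comment;

        InternalField(String columnName, String typeSignature, String comment)
        {
            this.columnName = columnName;
            this.typeSignature = typeSignature;
            this.comment = comment;
        }
    }

    private final Map<String, InternalField> internalFields = new LinkedHashMap<>();

    InternalFieldsByNameSketch()
    {
        // Built in the constructor, where runtime type information would be available.
        register(new InternalField(PARTITION_ID_FIELD, "bigint", "Partition id"));
        register(new InternalField(HEADERS_FIELD, "map(varchar,array(varbinary))", "Message headers"));
    }

    private void register(InternalField field)
    {
        internalFields.put(field.columnName, field);
    }

    Optional<InternalField> find(String columnName)
    {
        return Optional.ofNullable(internalFields.get(columnName));
    }
}
```

The trade-off against the enum version is that a typo in a string constant is only caught at runtime, while the enum gives compile-time checking of the keys.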
Thanks, looks good. Minor comments.
Thanks.
One more request for a test and we are good to go :)
Signed-off-by: Sven Pfennig <[email protected]>
The KafkaInternalFieldManager creates the internalFields map in the constructor, where the TypeManager can be used. Signed-off-by: Sven Pfennig <[email protected]>
Column definition has been added to KafkaInternalFieldDescription with map(VARCHAR,array(VARBINARY)) type from TypeManager. ValueProvider has been added to KafkaRecordSet. Signed-off-by: Sven Pfennig <[email protected]>
Thanks! I did not notice previously that you already made a change to the test :)
This PR adds the internal _headers field to the Kafka connector.
The column type is a map with String keys and array-of-byte[] values, i.e. map(VARCHAR, array(VARBINARY)).
To make review easier, the changes are split into two commits:
I would appreciate it if someone could review the code.
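The value side of the map is an array because a Kafka record may carry the same header key more than once, as the test record in this thread does with "foo". A minimal sketch of that grouping in plain Java follows; `SimpleHeader` and `groupByKey` are hypothetical names introduced here, and a `Map<String, List<byte[]>>` stands in for the map(VARCHAR, array(VARBINARY)) value that the connector builds with its own block builders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HeaderGroupingSketch
{
    private HeaderGroupingSketch() {}

    // Hypothetical stand-in for a Kafka header; the value may be null.
    static final class SimpleHeader
    {
        final String key;
        final byte[] value;

        SimpleHeader(String key, byte[] value)
        {
            this.key = key;
            this.value = value;
        }
    }

    // Collects every value seen for a key, in order of appearance, keeping null values.
    static Map<String, List<byte[]>> groupByKey(List<SimpleHeader> headers)
    {
        Map<String, List<byte[]>> grouped = new LinkedHashMap<>();
        for (SimpleHeader header : headers) {
            grouped.computeIfAbsent(header.key, ignored -> new ArrayList<>()).add(header.value);
        }
        return grouped;
    }
}
```

With the headers ("foo", "bar") and ("foo", null) from the test above, this yields a single "foo" entry whose list has two elements, the second of which is null. A record without headers yields an empty map, which is the case matched by cardinality(_headers) = 0 in the test query.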