Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bytes for record keys instead of String #157

Merged
merged 27 commits into from
Jun 27, 2022

Conversation

ajalab
Copy link
Contributor

@ajalab ajalab commented Jun 7, 2022

Fixes #126.

Related: #61 (previous attempt to support non-String keys, but abandoned)

Motivation

In these days, decaton has been used to subscribe arbitrary topics. Such topics may not have String as keys and DecatonTask as value.

now Decaton is kind of "general consumer framework" as it supports to consume messages in arbitrary-format, we think we should make Decaton to support also non-String keys.
#126 (comment)

Currently, however, Decaton does not fully support topics with non-String keys. One of the major unsupported features is retry queuing as reported in #126.

This is due to the previous design decision that Decaton should only subscribe topics whose contents are produced by DecatonClient with String keys.

Supporting only ascii-printable String was a design-decision that made in early days of Decaton that all tasks are assumed to be produced by DecatonClient.
#126 (comment)

Solution

To unlock the whole features of Decaton for non-String keys, this PR updates the key type from String to byte[]. This change is originally proposed by @kawamuray in #61 (comment).

Please note that this PR contains breaking changes, which update signature of public interfaces and classes like DecatonClient EDIT Based on the comment, we decided to keep using String for DecatonClient. Only decaton-processor classes and underlying producers (e.g., DecatonTaskProducer) will be affected.

Basically, this PR is made of two changes below.

  1. Add ArbitraryTopicTypeTest to reproduce the problem Retry fails if a non-ASCII string is used for the Kafka Record Key #126
  2. Replace String key definition with byte[]. If the hash of a key is needed for Map or Set, wrap the key with ByteBuffer.

TODO

@CLAassistant
Copy link

CLAassistant commented Jun 7, 2022

CLA assistant check
All committers have signed the CLA.

@ajalab ajalab force-pushed the use-bytes-for-keys branch from 50e5c2c to b3048ae Compare June 7, 2022 08:12
Copy link
Contributor

@mauhiz mauhiz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Have you considered how to handle the places where the key is used in toString?
  • Would there be a way to inject the serializer / deserializer and making things generic instead of using only raw bytes?

return true;
}

final String stringKey = new String(key, StandardCharsets.UTF_8);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the key cannot be read a String? this can happen. https://stackoverflow.com/questions/70667113/why-cant-i-decode-any-byte-using-utf-8#comment124924879_70667113

it looks like ignoreKeys should keep a set of byte arrays instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I assumed new String(key, StandardCharsets.UTF_8) never throws an exception. Instead, it silently accepts the malformed texts by converting some irregular values with a placeholder.

One commonly used approach in UTF-8 decoders is to replace any malformed UTF-8 sequence by a replacement character (U+FFFD), which looks a bit like an inverted question mark, or a similar symbol.
https://www.cl.cam.ac.uk/~mgk25/ucs/examples/UTF-8-test.txt

image

Also, I assumed that we support key denylist only when the target key is String,

  1. Support key blacklisting only if the key is the type of String (the filter just tries to instantiate String from whatever the byte array is and do comparison which might always fail when key is not a type of String).
    Make key deserializer of KafkaConsumer configurable #61 (comment)

If those assumptions are valid, I think we can just ignore non-UTF8 keys here because they will never match the configured keys stored in the denylist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It seems we might want to support other input for this denylist, maybe longs...
Let's leave the possibility open even if we don't implement it in this PR

@ajalab
Copy link
Contributor Author

ajalab commented Jun 7, 2022

Have you considered how to handle the places where the key is used in toString?

Ah not yet. In this sense, this change will make it hard to debug it since printed keys in log or elsewhere won't be human-readable anymore.

I'm not sure what's the most preferred way to display it (print in hex or try interpreting as UTF-8). Do you guys have any ideas about that?


Would there be a way to inject the serializer / deserializer and making things generic instead of using only raw bytes?

One of the solutions I came up with is to inject a serializer into DecatonClientImpl, so that we can replace put(byte[] key, ...) with put(K key, ...) where K is a type variable for keys. I just avoided doing this since I found the following comment.

However changing key as generic type T consequence the mess that almost all classes of decaton to be a generic class which we would wanna avoid.
#61 (comment)

However, adding a type variable in DecatonClientImpl and its related interfaces/classes may not be a significant change. Let me try implementing it as another commit for comparison.

@mauhiz
Copy link
Contributor

mauhiz commented Jun 7, 2022

almost all classes of decaton to be a generic class which we would wanna avoid

I don't see how that would be a bad thing, TBH.

Jokes aside, maybe we need to dissociate the "context key" (String, human readable key) and "record key" (bytes from Kafka), and just give a way to convert between them (by default : a StringSerializer / StringDeserializer, but could be Long serdes + toString)

Copy link
Contributor

@ocadaruma ocadaruma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tackling this!

Overall strategy looks good though, here is my thoughts:

  • decaton-processor => +1 for changing ProcessingContext's key type to byte[] (which will be considered as breaking change).
    • As @mauhiz mentioned, making thing generic would be an option though, it could introduce another type parameter in DecatonProcessor, so the impact for users will be huge.
    • On the other hand, I suppose only limited users refer ProcessingContext#key in their code so only changing it to byte[] would be good compromise.
  • decaton-client => I think we can keep key type as String for DecatonClient.
    • Because DecatonClient is just a default client to produce decaton tasks in "standard" format, rather than general producer to produce tasks in arbitrary key/value serialization format

WDYT? @kawamuray

@kawamuray
Copy link
Contributor

kawamuray commented Jun 10, 2022

@ocadaruma Agree with your comment.

I don't see how that would be a bad thing, TBH.

Before I made a significant refactoring for class hierarchies, decaton was holding task type as generic parameter almost at all classes in objects hierarchy, while the type parameter is referred only to hold a field for most of the classes, and it was really messy.
I then refactoring the decaton code to look like today's which encapsulate type parameter to the ProcessPipeline, which is created by users themselves and passed to ProcessorsBuilder, then eventually stored in ProcessUnit but without the type parameter (ProcessPipeline<?>), so now the structure represents the fact that decaton's core classes which doesn't rely on the task type stay agnostic for the type of the task.

It might be an option to add another type parameter for key, to the ProcessPIpeline and objects under that, but that will make us need to move the key filtering (BlacklistedKeysFilter) form ProcessorSubscription level to much later part, after queueing for subpartition, which makes the key filtering feature less-effective by letting these tasks to be filtered out once to get into the queue before they gets dropped, and I think it is a problem since the key filtering feature is commonly used to drop tasks from abusive user account (key = userid). This is what making it difficult (to choice) here, and I think fixing byte[] type as a key, while expecting string keys for enabling key filtering feature would be a good compromise.

I'm happy to hear if there's a better idea though.

@ajalab
Copy link
Contributor Author

ajalab commented Jun 13, 2022

Thank you all for sharing your idea!

decaton-client => I think we can keep key type as String for DecatonClient.

I also agree for this strategy. Let me try to implement subsequent changes in this way.

@ajalab
Copy link
Contributor Author

ajalab commented Jun 13, 2022

decaton-client => I think we can keep key type as String for DecatonClient.

Added 33d026e to let DecatonClient use String instead of byte[] as the previous implementation does.

Please note that the underlying producer (DecatonTaskProducer) still has to use byte[] keys. This is because it's used by DecatonTaskRetryQueuingProcessor, which has to produce a retry task based on a record in an arbitrary topic.

I believe that user-impact due to this change is still small, since DecatonTaskProducer is not expected to be used by applications.

@ajalab ajalab requested a review from ocadaruma June 14, 2022 05:50
Copy link
Contributor

@ocadaruma ocadaruma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left few feedbacks.
Besides, could you fix below points as well?

key -> new ArrayList<>()).add(record);
}

@Override
public void doAssert() {
// Checks there's no overlap between two consecutive records' processing time
for (Entry<String, List<ProcessedRecord>> entry : records.entrySet()) {
for (Entry<ByteBuffer, List<ProcessedRecord>> entry : records.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should convert key to string when generating assert message to get readable result here: https://github.com/line/decaton/pull/157/files#diff-141d497c7aafcef2217814fbe1d2ea75d00a4089212776327360030a935d2278L56

}

@Test(timeout = 30000)
public void testPrintableAsciiStringKeyValue() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess String-key based scenario is already tested in other basic tests.
The intention to have another test here is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought that there were no test scenarios that produce String-key records without DecatonClient but an arbitrary Producer.

But it looks we already have such scenarios? Let me consider removing this one then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d79abd2 Replaced with byte key/value test case (or we don't need two tests?)

@@ -153,17 +154,17 @@ public void testSingleThreadProcessing() throws Exception {
// Note that this processing semantics is not be considered as Decaton specification which users can rely on.
// Rather, this is just a expected behavior based on current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
private final Map<String, List<TestTask>> produced = new HashMap<>();
private final Map<String, List<TestTask>> processed = new HashMap<>();
private final Map<ByteBuffer, List<TestTask>> produced = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteBuffer is mutable and its equals/hashcode methods depend on position/limit/remaining. Meaning, reading from it will affect findability in the map.

I don't know if is this overkill, but what about defining a new wrapper type, that
simply delegates equals/hashCode to Arrays.equals / Arrays.hashCode?

It could also cache hashCode (like String does)

And have a toString that just attempts to decode it with UTF-8

// Preceding isEmpty() check is for reducing tiny overhead applied for each contains() by calling
// Object#hashCode. Since ignoreKeys should be empty for most cases..
if (!ignoreKeys.isEmpty() && ignoreKeys.contains(record.key())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this again, I thought we might want to changes ignoreKeys to a Set<ByteBuffer> to avoid the charset decoding step (=encode only once when reading blacklist config)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is finally fixed by 6d3cc6c

ajalab added 4 commits June 20, 2022 16:26
For LoggingContext to print record keys in bytes as human-readable
String, this commit introduces a new interface `KeyFormatter` that
translates keys from bytes into `String`.
This interface has a canonical implementation that reads the byte key as
a UTF-8 byte sequence.

For other places, this commit just prevents the byte keys from being
printed, or prints it as String if it's inside a test case.
To avoid charset decoding step.
@ajalab
Copy link
Contributor Author

ajalab commented Jun 20, 2022

Addressed all of the comments above. Notably, we introduced two new classes here.

EDIT (2022-06-21) Those were removed #157 (comment)

TaskKey

This is a tiny wrapper of record keys in byte[]. It equips overridden hashCode with cache, and is used to represent a set or map of record keys instead of ByteBuffer. (cf. #157 (comment))

KeyFormatter

This is a functional interface that translates keys from TaskKey into String. By default, a canonical converter that recognizes the byte key as UTF-8 char sequence is used. Users can inject a custom key formatter via ProcessorsBuilder to print out non-String keys (e.g., Long) in the log.

@ajalab ajalab requested a review from ocadaruma June 20, 2022 08:50
@ajalab
Copy link
Contributor Author

ajalab commented Jun 20, 2022

Somehow I could only re-request a review of @ocadaruma but @mauhiz could you take a look too?

@ajalab
Copy link
Contributor Author

ajalab commented Jun 21, 2022

Based on the comments, I removed/replaced the following new classes.

@ajalab ajalab requested a review from ocadaruma June 21, 2022 09:10
@ajalab
Copy link
Contributor Author

ajalab commented Jun 21, 2022

Hmm, seems the CI failed because of unresolvable dependencies (due to CloudFlare incident?) and I have no right to rerun the workflow

* What went wrong:
A problem occurred configuring root project 'decaton'.
> Could not resolve all files for configuration ':classpath'.
   > Could not resolve me.champeau.jmh:jmh-gradle-plugin:0.6.6.
     Required by:
         project : > me.champeau.jmh:me.champeau.jmh.gradle.plugin:0.6.6
      > Could not resolve me.champeau.jmh:jmh-gradle-plugin:0.6.6.
         > Could not get resource 'https://plugins.gradle.org/m2/me/champeau/jmh/jmh-gradle-plugin/0.6.6/jmh-gradle-plugin-0.6.6.pom'.
            > Could not GET 'https://plugins-artifacts.gradle.org/me.champeau.jmh/jmh-gradle-plugin/0.6.6/562890d50aca0743c76181dfbc9fdc8a0750e8c47c978bfbb84472ea[41](https://github.com/line/decaton/runs/6979355735?check_suite_focus=true#step:4:42)1a0c5d/jmh-gradle-plugin-0.6.6.pom'. Received status code 500 from server: Internal Server Error
   > Could not resolve gradle.plugin.com.github.johnrengelman:shadow:7.1.1.
     Required by:
         project : > com.github.johnrengelman.shadow:com.github.johnrengelman.shadow.gradle.plugin:7.1.1
      > Skipped due to earlier error
   > Could not resolve io.franzbecker:gradle-lombok:5.0.0.
     Required by:
         project : > io.franzbecker.gradle-lombok:io.franzbecker.gradle-lombok.gradle.plugin:5.0.0
      > Skipped due to earlier error

Copy link
Contributor

@ocadaruma ocadaruma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left few trivial feedbacks but almost looks good!

}

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we delegate to a static, reusable method, e.g. for use in LoggingContext

Copy link
Contributor Author

@ajalab ajalab Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit afraid that adding a such static method like HashableByteArray.asString(byte[]) (note: toString can't be used due to name collision nope, we can use it) leads to give multiple responsibilities to the class.

It might be better to create a utility class with a static method ByteArrays.toString(byte[])? WDYT (cc. @ocadaruma)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method may support both null array and non-null array.

I'm also not sure if we can make UTF-8 conversion as if it's a canonical way (ByteArrays.toString) to convert byte[] into String... Just keep using String(byte[], Charset) here and there will be an option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway let me add ByteArrays.toString as PoC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added de1cb5f com.linecorp.decaton.processor.internal.ByteArrays with static toString method

@ajalab ajalab force-pushed the use-bytes-for-keys branch from f3a0b0b to 9d9db83 Compare June 22, 2022 09:00
@ajalab ajalab force-pushed the use-bytes-for-keys branch from de1cb5f to a288fc6 Compare June 22, 2022 09:05
@ajalab ajalab requested a review from ocadaruma June 22, 2022 09:11
Copy link
Contributor

@kawamuray kawamuray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Great progress, thanks!

Copy link
Contributor

@ocadaruma ocadaruma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left last comment

@ajalab ajalab requested a review from ocadaruma June 23, 2022 02:58
Copy link
Contributor

@ocadaruma ocadaruma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found one more point to fix....
I hope this is the last. :)

Copy link
Contributor

@ocadaruma ocadaruma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update.

LGTM!

@ocadaruma ocadaruma merged commit ae8dd85 into line:master Jun 27, 2022
@ocadaruma ocadaruma added the breaking change Breaking change for a public API label Jun 27, 2022
@ajalab ajalab deleted the use-bytes-for-keys branch June 28, 2022 04:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking change Breaking change for a public API
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Retry fails if a non-ASCII string is used for the Kafka Record Key
5 participants