-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support pgsql-kafka binding #1245
Conversation
@@ -191,6 +195,7 @@ | |||
<configuration> | |||
<excludes> | |||
<exclude>io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/types/**/*.class</exclude> | |||
<exclude>net/sf/jsqlparser/parser/*</exclude> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed? Does jsqlparser
generate some code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No but it was causing some IlligalClassFormatException in jacoco
java.lang.instrument.IllegalClassFormatException: Error while instrumenting net/sf/jsqlparser/parser/CCJSqlParserTokenManager with JaCoCo 0.8.11.202310140853/f33756c.
at org.jacoco.agent.rt.internal_4742761.CoverageTransformer.transform(CoverageTransformer.java:94)
at java.instrument/java.lang.instrument.ClassFileTransformer.transform(ClassFileTransformer.java:244)
at java.instrument/sun.instrument.TransformerManager.transform(TransformerManager.java:188)
at java.instrument/sun.instrument.InstrumentationImpl.transform(InstrumentationImpl.java:541)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1013)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:862)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:760)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:681)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:639)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at net.sf.jsqlparser.parser.CCJSqlParser.<init>(CCJSqlParser.java:42028)
at net.sf.jsqlparser.parser.CCJSqlParserManager.parse(CCJSqlParserManager.java:21)
this.kind = binding.kind; | ||
this.routes = binding.routes.stream().map(PgsqlKafkaRouteConfig::new).collect(toList()); | ||
|
||
this.catalog = supplyCatalog.apply(binding.catalogs.get(0).id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is catalog
is omitted?
Perhaps we can make it required
in the schema patch?
KAFKA_TOPIC_REQUEST_TIMEOUT_MS = config.property("kafka.topic.request.timeout.ms", 30000); | ||
KAFKA_CREATE_TOPICS_PARTITION_COUNT = config.property("kafka.create.topics.partition.count", 1); | ||
KAFKA_CREATE_TOPICS_REPLICAS = config.property("kafka.create.topics.replicas", (short) 1); | ||
KAFKA_AVRO_SCHEMA_NAMESPACE = config.property("kafka.avro.schema.namespace", "io.aklivity.zilla"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem like the right value for the avro namespace as it would more naturally be part of the application, not part of zilla directly.
Seems like it should perhaps default to the pgsql database
parameter instead, like dev
and perhaps this can be a pattern that would substitute the pgsql database
parameter into it, so we could configure it with something like "com.example.{database}"
and have it resolve to "com.example.dev"
. Then if there is no reference to {database}
in the pattern, it would be used verbatim as it is now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, good suggestion. I was not sure what you will recommend since com.example was too generic
command: | ||
for (PgsqlKafkaCommandType commandType : PgsqlKafkaCommandType.values()) | ||
{ | ||
if (Arrays.equals(commandType.value, value)) | ||
{ | ||
command = commandType; | ||
break command; | ||
} | ||
} | ||
|
||
return command; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each call to PgsqlKafkaCommandType.values()
reallocates the array of enums.
Perhaps we can use a static { ... }
initializer to create a static map of byte[]
to PgsqlKafkaCommandType
and then use that map here to lookup the byte[]
?
return receiver; | ||
} | ||
|
||
private void onDecodeCreateTopicCommand( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void onDecodeCreateTopicCommand( | |
private void decodeCreateTopicCommand( |
} | ||
} | ||
|
||
private void onDecodeUnknownCommand( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void onDecodeUnknownCommand( | |
private void decodeUnknownCommand( |
Description
Support pgsql-kafka binding.
Fixes #1058