Do not require Confluent libraries for Kafka connector if Protobuf features are not being used #17858
Conversation
The first 2 commits are alternative solutions to the same problem - please suggest which is better. Or should I keep both commits?
Force-pushed from 199e588 to 3a369f6.
Guice calls `Class#getDeclaredMethods` to discover the methods which require injection of an argument. This causes every class present in a method signature to be loaded. In 02cc332 a new method was added whose signature includes `ProtobufSchema`, which comes from a Confluent-licensed library that Trino doesn't ship with. Since that commit the Kafka connector fails to start with the following error, regardless of whether any Protobuf functionality is being used:

```
2023-04-30T05:33:54.710Z ERROR main io.trino.server.Server io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema
java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchema
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3402)
    at java.base/java.lang.Class.getDeclaredMethods(Class.java:2504)
    at com.google.inject.internal.DeclaredMembers.getDeclaredMethods(DeclaredMembers.java:48)
    at com.google.inject.spi.InjectionPoint.getDeclaredMethods(InjectionPoint.java:811)
    at com.google.inject.spi.InjectionPoint.getInjectionPoints(InjectionPoint.java:730)
    at com.google.inject.spi.InjectionPoint.forInstanceMethodsAndFields(InjectionPoint.java:430)
    at com.google.inject.internal.ConstructorBindingImpl.getInternalDependencies(ConstructorBindingImpl.java:177)
    at com.google.inject.internal.InjectorImpl.getInternalDependencies(InjectorImpl.java:670)
    at com.google.inject.internal.InjectorImpl.cleanup(InjectorImpl.java:627)
    at com.google.inject.internal.InjectorImpl.initializeJitBinding(InjectorImpl.java:613)
    at com.google.inject.internal.InjectorImpl.createJustInTimeBinding(InjectorImpl.java:943)
    at com.google.inject.internal.InjectorImpl.createJustInTimeBindingRecursive(InjectorImpl.java:863)
    at com.google.inject.internal.InjectorImpl.getJustInTimeBinding(InjectorImpl.java:300)
    at com.google.inject.internal.InjectorImpl.getBindingOrThrow(InjectorImpl.java:223)
    at com.google.inject.internal.InjectorImpl.getInternalFactory(InjectorImpl.java:949)
    at com.google.inject.internal.FactoryProxy.notify(FactoryProxy.java:48)
    at com.google.inject.internal.ProcessedBindingData.runCreationListeners(ProcessedBindingData.java:60)
    at com.google.inject.internal.InternalInjectorCreator.initializeStatically(InternalInjectorCreator.java:137)
    at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:110)
    at com.google.inject.Guice.createInjector(Guice.java:87)
    at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:268)
    at io.trino.plugin.kafka.KafkaConnectorFactory.create(KafkaConnectorFactory.java:71)
    at io.trino.connector.DefaultCatalogFactory.createConnector(DefaultCatalogFactory.java:221)
    at io.trino.connector.DefaultCatalogFactory.createCatalog(DefaultCatalogFactory.java:130)
    at io.trino.connector.LazyCatalogFactory.createCatalog(LazyCatalogFactory.java:45)
    at io.trino.connector.StaticCatalogManager.lambda$loadInitialCatalogs$1(StaticCatalogManager.java:158)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at java.base/java.util.concurrent.ExecutorCompletionService.submit(ExecutorCompletionService.java:184)
    at io.trino.util.Executors.executeUntilFailure(Executors.java:41)
    at io.trino.connector.StaticCatalogManager.loadInitialCatalogs(StaticCatalogManager.java:152)
    at io.trino.server.Server.doStart(Server.java:144)
    at io.trino.server.Server.lambda$start$0(Server.java:91)
    at io.trino.$gen.Trino_415____20230430_053321_1.run(Unknown Source)
    at io.trino.server.Server.start(Server.java:91)
    at io.trino.server.TrinoServer.main(TrinoServer.java:38)
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
    at io.trino.server.PluginClassLoader.loadClass(PluginClassLoader.java:128)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
    ... 39 more
```

This change resolves the issue by changing the method signature to not use a class which isn't always on the classpath. The next commit introduces some safeguards to prevent other occurrences of this issue.
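For illustration, here is a minimal, self-contained sketch of the failure mode. The class names below are hypothetical, not from the Trino codebase: if the class file for `OptionalDependency` is missing from the runtime classpath, merely calling `getDeclaredMethods()` on `Consumer` throws `NoClassDefFoundError`, because the JVM must resolve every type appearing in every declared method signature.

```java
import java.lang.reflect.Method;

public class GetDeclaredMethodsDemo
{
    // Hypothetical class standing in for ProtobufSchema: present at compile
    // time, but imagine its class file deleted from the runtime classpath.
    static class OptionalDependency {}

    static class Consumer
    {
        // The parameter type must be resolved when the JVM reflects over
        // Consumer's methods, even if this method is never invoked.
        public void configure(OptionalDependency dependency) {}
    }

    public static void main(String[] args)
    {
        // Throws NoClassDefFoundError if OptionalDependency cannot be loaded,
        // which is exactly what Guice triggers via DeclaredMembers.
        for (Method method : Consumer.class.getDeclaredMethods()) {
            System.out.println(method);
        }
    }
}
```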
Force-pushed from 3a369f6 to 2ddff91.
Nice!
Review threads (resolved):
- plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
- ...roduct-tests/src/main/java/io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.java
- .../java/io/trino/tests/product/launcher/env/environment/EnvMultinodeKafkaConfluentLicense.java (outdated)
Force-pushed from 2ddff91 to 5418146.
This avoids loading the Confluent `ProtobufSchemaParser` unless the Protobuf functionality is actually being used, avoiding the need to manually copy Confluent-licensed libraries onto the Kafka connector classpath.
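For context, here is a minimal sketch (not the actual `ConfluentModule` code; `SchemaParser` is a hypothetical stand-in interface) of how a Guice provider can defer loading a Confluent class: the class name appears only as a string inside the method body, so no injectable method signature forces the class to load at injector-creation time, and the `NoClassDefFoundError` can only occur on first use.

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;

// Hypothetical interface standing in for whatever Trino-side abstraction
// the parser is exposed through.
interface SchemaParser {}

public class LazyParserModule
        extends AbstractModule
{
    @Provides
    public SchemaParser createSchemaParser()
    {
        try {
            // Class.forName defers class loading from injector creation
            // time to the first use of this binding.
            return (SchemaParser) Class.forName(
                    "io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaParser")
                    .getConstructor()
                    .newInstance();
        }
        catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Confluent Protobuf libraries are not available", e);
        }
    }
}
```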
The message serializer being used is strictly coupled to the catalog being used, so the serializer also belongs in the `KafkaCatalog`. This is useful for an upcoming commit, which can then simplify the usages of `DataProvider` in some product tests.
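As a rough illustration of that design choice, here is a minimal sketch assuming `KafkaCatalog` is a test-side enum; the constants, catalog names, and serializer choices below are illustrative, not the actual Trino product-test code.

```java
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Each catalog carries the serializer it requires, so tests no longer need
// a separate DataProvider to pair catalogs with serializers.
public enum KafkaCatalog
{
    RAW("kafka", ByteArraySerializer.class),
    SCHEMA_REGISTRY("kafka_schema_registry", StringSerializer.class);

    private final String catalogName;
    private final Class<? extends Serializer<?>> serializerClass;

    KafkaCatalog(String catalogName, Class<? extends Serializer<?>> serializerClass)
    {
        this.catalogName = catalogName;
        this.serializerClass = serializerClass;
    }

    public String catalogName()
    {
        return catalogName;
    }

    public Class<? extends Serializer<?>> serializerClass()
    {
        return serializerClass;
    }
}
```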
The older approach was to copy the Confluent licensed libraries to the Kafka connector classpath for all product environments extending from `Kafka`. This meant that it was not possible to detect changes which introduce a hard dependency on the Confluent licensed libraries for Kafka connector startup. 02cc332 inadvertently made the Confluent libraries required for the Kafka connector to start up, and this was not caught by tests because all the product test environments already had the Confluent libraries present on the Kafka connector classpath. This commit instead introduces a new product test environment, `EnvMultinodeKafkaConfluentLicense`, which is the only product test environment that copies the Confluent licensed libraries onto the Kafka connector classpath. There is also a new test group, `kafka_confluent_license`, which now includes all the tests for the functionality which requires the Confluent licensed libraries.
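To show how a test would opt into the new group, here is a hedged sketch; the constant names, class name, and test body are illustrative, inferred only from the group name mentioned above.

```java
import org.testng.annotations.Test;

public class TestKafkaWithConfluentLicense
{
    // Group names as compile-time constants so they can be used in the
    // @Test annotation; only kafka_confluent_license comes from this PR.
    private static final String KAFKA_CONFLUENT_LICENSE = "kafka_confluent_license";
    private static final String PROFILE_SPECIFIC_TESTS = "profile_specific_tests";

    @Test(groups = {KAFKA_CONFLUENT_LICENSE, PROFILE_SPECIFIC_TESTS})
    public void testProtobufReads()
    {
        // Would exercise Protobuf functionality against the schema registry;
        // runs only when the kafka_confluent_license group is selected, i.e.
        // in the EnvMultinodeKafkaConfluentLicense environment where the
        // Confluent-licensed jars are on the connector classpath.
    }
}
```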
Force-pushed from 5418146 to 12eee83.
LGTM, well done! Thanks
Description
Fixes #17299
Additional context and related issues
Add product tests for Kafka connector with Confluent licensed libraries

The older approach was to copy the Confluent licensed libraries to the Kafka connector classpath for all product environments extending from `Kafka`. This meant that it was not possible to detect changes which introduce a hard dependency on the Confluent licensed libraries for Kafka connector startup. 02cc332 inadvertently made it so that the Confluent libraries became required for the Kafka connector to start up, and this was not caught by tests because all the product test environments already had the Confluent libraries present on the Kafka connector classpath.

This commit instead introduces a new product test environment, `EnvMultinodeKafkaConfluentLicense`, which is the only product test environment that copies the Confluent licensed libraries onto the Kafka connector classpath. There is also a new test group, `kafka_confluent_license`, which now includes all the tests for the
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: