feat(ksql-connect): poll connect-configs and auto register sources #3178
Conversation
@agavra I have a couple of questions about the design before I start reviewing the PR.
Thanks @hjafarpour - answers in line.
Correct for both of these. I don't think we should distinguish between what was created by KSQL and what was not.
I talked about this with @rmoff just earlier today. As it stands, we're going to make this decision per "blessed" connector (e.g. JDBC will always use TABLE, and Kinesis will always use STREAM). I think this is an acceptable first step as long as we allow users to reimport things as the other type if we made a mistake. In the future, I think we should bleed the stream/table duality into Connect, and allow the connectors to specify what it is that they are creating.
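The per-"blessed"-connector decision described above could look something like the following sketch. The class name, method, and the connector-class strings checked here are hypothetical illustrations, not code from this PR:

```java
// Hypothetical sketch of the per-"blessed"-connector decision: each known
// connector maps to a fixed source type. The substring checks are
// illustrative placeholders, not the PR's actual logic.
public final class BlessedConnectors {
    public enum SourceType { STREAM, TABLE }

    public static SourceType sourceTypeFor(final String connectorClass) {
        if (connectorClass.contains("Jdbc")) {
            return SourceType.TABLE;   // JDBC snapshots are modeled as tables
        }
        if (connectorClass.contains("Kinesis")) {
            return SourceType.STREAM;  // Kinesis records are modeled as streams
        }
        throw new IllegalArgumentException("Not a blessed connector: " + connectorClass);
    }
}
```

Allowing users to reimport a source as the other type would then amount to overriding this default mapping.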
Hmm, I think we indeed should distinguish between KSQL-generated connectors and other ones. Otherwise, we would have significant complexity in managing KSQL-generated connectors. For instance, should we be able to drop a connector that was created outside KSQL?
Talked offline so I'm summarizing discussion here. There are two different concerns baked into that:
I think the security aspect should be handled transparently by some RBAC-like system and shouldn't impact the design choices here. Viewing, creating and dropping connectors should all go through the principal of the client issuing the command. UI bloat is another issue: most other systems (e.g. Hive, Presto, etc.) require each script to declare what it wants to use, and do not make sources immediately usable. The approach suggested in this PR could cause too many streams/tables to show up. Since that feature is an addition, we can implement it in a follow-up PR.
Two more questions before LGTM! :)
```java
 * {@link ConnectPollingService} to digest and register with KSQL.
 *
 * <p>On startup, this service reads the connect configuration topic from the
 * beginning to make sure that it reconstructs the necessary state.</p>
```
What happens when connectors are terminated? Does Connect write such information to this topic? How do we deal with such scenarios where external systems create and terminate connectors?
Good questions! Answers inline:
What happens when connectors are terminated? Does Connect write such information to this topic?
Nothing happens, because Connect doesn't write any such information to the topic. More importantly, Connect doesn't delete the topics it already created, so I'm not sure what the expected behavior would be.
In the future, when we support SHOW CONNECTORS, terminated connectors will not show up.
How do we deal with such scenarios where external systems create and terminate connectors?
The beautiful thing about this design is that everything is decoupled, so whether the connector is created internally or externally, nothing changes!
```java
    topic, connector.getName(), source);

final Builder<String, Literal> builder = ImmutableMap.<String, Literal>builder()
    .put(CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral(topic))
    .put(CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("AVRO"))
```
Are we always importing data in AVRO format? What if we want other formats?
For now, yes, since Schema Registry only supports AVRO. When it supports other types, hopefully there will be some API that lets me look up the format and inject it here.
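If Schema Registry ever exposes the schema format, the hardcoded `"AVRO"` literal could be replaced by a lookup along these lines. The helper and the schema-type strings are assumptions, not an existing Schema Registry API:

```java
// Hypothetical sketch: choose the VALUE_FORMAT from a schema type reported by
// Schema Registry, falling back to AVRO. No such lookup exists as of this PR.
public final class ValueFormats {
    public static String forSchemaType(final String schemaType) {
        if (schemaType == null) {
            return "AVRO"; // Schema Registry only supports AVRO today
        }
        switch (schemaType) {
            case "JSON":     return "JSON";
            case "PROTOBUF": return "PROTOBUF";
            default:         return "AVRO";
        }
    }
}
```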
LGTM!
As we discussed, we can revisit the design decisions after the MVP.
Thanks, @agavra! Left a bunch of comments inline.
```java
final String name = connector.getName();
final String source = connector.mapToSource(topic).toUpperCase();

// if the meta store already contains the source, don't send the extra command
```
Should we be looking for sources that use `topic` rather than the exact source name the connector would use? E.g., if a user creates a stream and topic and then starts a connector, should we automatically create another stream? (I honestly don't know the answer here.)
That might be a way to go about it, but it requires adding an index in the metastore. For now, I just want to make sure I'm not spamming the command topic with unnecessary commands, and this serves that purpose. We can always change it later; it would be mostly backwards compatible.
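The guard discussed above (skip the `CREATE` command when the source name already exists) can be sketched with a plain set instead of the real metastore. The class and method names are illustrative, not the PR's code:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for the metastore check: issue a CREATE command only
// the first time a source name is seen, so the command topic is not spammed
// with duplicate commands on every polling pass.
public final class SourceRegistry {
    private final Set<String> knownSources = new HashSet<>();

    /** Returns true only on the first sighting of a source name
     *  (case-insensitive, since KSQL upper-cases source names). */
    public boolean shouldIssueCreate(final String sourceName) {
        return knownSources.add(sourceName.toUpperCase());
    }
}
```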
```java
@Override
protected Scheduler scheduler() {
  return Scheduler.newFixedRateSchedule(0, INTERVAL_S, TimeUnit.SECONDS);
```
One potential improvement could be to poll with a backoff in such a way that if we add a new connector we can poll with a short delay and back off until we hit the steady state interval. Out of scope for this change though.
Good suggestion - I was thinking of adding a whole suite of things to improve this (might change it from a scheduled service to something more custom that wakes up either every N seconds or when we add a connector, something like a blocking queue).
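The backoff suggested above could be as simple as doubling the delay after each idle poll and resetting it when a connector is added. This is a sketch of the idea only; the class and constants are hypothetical:

```java
// Sketch of the suggested polling backoff: poll quickly right after a new
// connector is registered, then back off toward the steady-state interval.
// Names and constants are hypothetical, not part of this PR.
public final class BackoffInterval {
    private static final long MIN_INTERVAL_S = 1;   // short delay after a change
    private static final long MAX_INTERVAL_S = 60;  // steady-state interval

    /** Doubles the delay after each idle poll, capped at the steady state. */
    public static long next(final long currentIntervalS) {
        return Math.min(currentIntervalS * 2, MAX_INTERVAL_S);
    }

    /** Reset to the short delay whenever a new connector is registered. */
    public static long reset() {
        return MIN_INTERVAL_S;
    }
}
```

Note that Guava's `Scheduler.newFixedRateSchedule` (used in the snippet above) produces a fixed rate, so adopting this would mean a custom scheduler, in line with the blocking-queue idea mentioned in the reply.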
```java
    }
}

private static Connector jdbc(final Map<String, String> properties) {
```
I think this fits better in its own class. Otherwise this will get out-of-hand as we add more connectors.
Let's handle this problem as it comes. I was considering the best way to do it, and I'm not sure that having a class per Connector will make it any cleaner; in fact, I think it might make the amount of boilerplate annoying to refactor. I may be wrong, but refactoring in the future is easy :)
```java
private static <T> ResponseHandler<ConnectResponse<T>> createHandler(

@SuppressWarnings("unchecked")
@Override
public ConnectResponse<List<String>> connectors() {
```
We should bake in some retries here for network/5xx errors. Ditto below.
I will do this in a follow-up PR
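A minimal version of the retries requested above could wrap each Connect REST call in a bounded retry loop. This is a sketch under the assumption that transient failures surface as runtime exceptions; the helper is hypothetical and not the follow-up PR's code:

```java
import java.util.function.Supplier;

// Hypothetical retry helper for transient network/5xx failures, as suggested
// in the review. Retries the call up to maxAttempts times, rethrowing the
// last failure if none succeed.
public final class Retries {
    public static <T> T withRetries(final Supplier<T> call, final int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (final RuntimeException e) {
                last = e; // assume transient; try again
            }
        }
        throw last;
    }
}
```

A production version would also distinguish retryable errors (5xx, timeouts) from non-retryable ones (4xx) and add a delay between attempts.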
LGTM!
Description
This PR implements the most bare-bones mechanism to automatically import tables created by a JDBC connector. Note that this PR is incremental: it is the minimum chunk I felt I could implement and put out in a single PR (see the Future Work section for upcoming PRs). Because of this there are lots of limitations, BUT if you look at the Testing Done section it makes for a pretty 💣 💥 demo! What's left makes it much more robust.
Design
There are two main services introduced:

- `ConnectConfigService` is in charge of listening to the `connect-configs` topic (the name is configurable) and polling the `/connectors` endpoint, extracting "known" connector configurations and passing them to the `ConnectPollingService`. As of now, the JDBC source connector is the only known connector.
- `ConnectPollingService` is a scheduled service that runs periodically, scanning all Kafka topics to see whether any of them could have been created by a connector that was passed in by `ConnectConfigService`. If one could have been, it issues a `CREATE TABLE` request to the KSQL endpoint.

The diagram above describes the flow of creating a connector and having it automatically imported into KSQL (including what was implemented in #3149).
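The `ConnectPollingService` pass described above can be sketched as matching topic names against the topic prefixes of known connectors. Everything below (the class, the prefix-set parameter, the statement template) is an illustration, not the PR's actual code:

```java
import java.util.Set;

// Illustrative sketch of one ConnectPollingService pass: for each Kafka topic,
// decide whether a known connector could have created it, and if so produce
// the CREATE TABLE statement to send to the KSQL endpoint.
public final class PollingPass {
    /** Returns the CREATE TABLE statement for a topic, or null if no known
     *  connector's topic prefix claims it. */
    public static String statementFor(final String topic, final Set<String> knownPrefixes) {
        for (final String prefix : knownPrefixes) {
            if (topic.startsWith(prefix)) {
                final String source = topic.toUpperCase(); // KSQL upper-cases source names
                return "CREATE TABLE " + source
                    + " WITH (kafka_topic='" + topic + "', value_format='AVRO');";
            }
        }
        return null;
    }
}
```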
Beyond that, the following classes were changed:

- The `KsqlConnect` class simply wraps the two classes described above into one easy-to-pass-around component.
- The `Connector` class models information specific to each connector (e.g. the `topic.prefix` config for the JDBC connector), and `Connectors` helps create those.
- `CreateConfigs` (the WITH clause) now accepts metadata describing which connector created the source. This is not used as of this PR, but it was straightforward enough that removing it would have been annoying.

Distributed System Concerns
Since multiple servers will be running this at the same time, we make sure that only one is in charge by having them all share a `group.id` when reading from `connect-configs`. If a server becomes the one assigned to read from `connect-configs`, it will reconstruct the entire state by calling `/connectors` and reading data from Connect.
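The shared-`group.id` approach above relies on Kafka's consumer group assignment: all servers join the same group, so the `connect-configs` partition is assigned to exactly one of them at a time. A sketch of what such a consumer configuration could look like (the group name and broker address are example values, not the PR's actual configuration):

```java
import java.util.Properties;

// Example consumer properties for the shared-group.id approach: every KSQL
// server uses the same group.id, so Kafka hands the connect-configs partition
// to a single server; on reassignment it replays the topic from the start.
public final class ConfigTopicConsumerProps {
    public static Properties create() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example address
        props.put("group.id", "ksql-connect-configs");    // shared across all KSQL servers
        props.put("auto.offset.reset", "earliest");       // reconstruct full state on assignment
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```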
cc @spena - since this is asynchronous, the KSQL principal will be the one who creates the table from the connect topic. Do you have any suggestions here with regards to the ksql security model?
Future Work

- If you `DROP <SOURCE>`, it will be re-created the next time `ConnectPollingService` runs.
- … the `connect-configs` topic. When we do that, `ConnectPollingService` can be removed.
- Add `DESCRIBE` functionality to connectors, which will leverage the `WITH` clause change in this PR. This will be in the returned response for `CREATE SOURCE CONNECTOR` for improved usability.
- `AWAIT <SOURCE>`, so that users can wait until a certain stream is imported into KSQL.

Testing done
Reviewer checklist