-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ksql 386 ksql functions non static #370
Ksql 386 ksql functions non static #370
Conversation
…uery termination.
…intermediate-topics-for-terminated-queries
build-tools/pom.xml
Outdated
@@ -20,6 +20,6 @@ | |||
<modelVersion>4.0.0</modelVersion> | |||
<groupId>io.confluent</groupId> | |||
<artifactId>build-tools</artifactId> | |||
<version>4.0.0-SNAPSHOT</version> | |||
<version>4.1.0-SNAPSHOT</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 4.1.0-SNAPSHOT? Shouldn't this branch remain on 4.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is 4.0.1 using kafka 11.0.1 where the adminclient is fixed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be 4.0.0. I think it was messed up when I synced while Alex was preparing the branches. Will fix it.
ksql-clickstream-demo/pom.xml
Outdated
@@ -23,7 +23,7 @@ | |||
<parent> | |||
<groupId>io.confluent.ksql</groupId> | |||
<artifactId>ksql-parent</artifactId> | |||
<version>4.0.0-SNAPSHOT</version> | |||
<version>4.1.0-SNAPSHOT</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, will fix it :)
@@ -48,6 +48,10 @@ | |||
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION = | |||
"ksql.sink.window.change.log.additional.retention.default"; | |||
|
|||
public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these need to be public? Looks like they could be package protected and don't need to be known by a user?
@@ -61,8 +64,8 @@ public KsqlContext() { | |||
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT); | |||
} | |||
KsqlConfig ksqlConfig = new KsqlConfig(streamsProperties); | |||
|
|||
topicClient = new KafkaTopicClientImpl(ksqlConfig.getKsqlAdminClientConfigProps()); | |||
adminClient = AdminClient.create(ksqlConfig.getKsqlAdminClientConfigProps()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it stands we can't test this class without starting up kafka etc. I suggest we change it so that we can unit test the class. We could add one or two static factory methods, i.e,
KsqlContext.create()
and KsqlContext.create(Map<String, Object> properties)
we than have a package private constructor KsqlContext(AdminClient, KafkaTopicClient, KsqlEngine)
We can then have unit tests of this class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I'll make the changes.
final MetaStore tempMetaStore) | ||
throws Exception { | ||
|
||
public List<QueryMetadata> planQueries(final boolean createNewAppId, final List<Pair<String, Statement>> statementList, final Map<String, Object> overriddenProperties, final MetaStore tempMetaStore) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with so many params and a long line length it would be good to have 1 param per line. Makes the diffs easier to read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
@@ -120,7 +123,7 @@ private Predicate getWindowedKeyPredicate() throws Exception { | |||
CodeGenRunner codeGenRunner = new CodeGenRunner(); | |||
ExpressionMetadata | |||
expressionEvaluator = | |||
codeGenRunner.buildCodeGenFromParseTree(filterExpression, schema); | |||
codeGenRunner.buildCodeGenFromParseTree(filterExpression, schema, ksqlFunctionRegistry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass KSqlFunctionRegistry
into constructor of CodeGenRunner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! Done :)
import java.util.List; | ||
import java.util.Set; | ||
|
||
public class CleanUpUtil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where this class is used and there are no tests for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will use it in the clean up task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code shouldn't be added without tests
|
||
public KafkaTopicClientImpl(Map<String, Object> adminClientConfig) { | ||
this.adminClientConfig = adminClientConfig; | ||
public KafkaTopicClientImpl(AdminClient adminClient) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: final
} catch (InterruptedException | ExecutionException e) { | ||
throw new KafkaResponseGetFailedException("Failed to describe kafka topics", e); | ||
} | ||
} | ||
|
||
public void deleteTopics(List<String> topicsToDelete) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tests for this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add test when I finish the clean up task which this method is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
hasDeleteErrors = true; | ||
} | ||
} | ||
if (hasDeleteErrors) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than having a flag it would be better to collect all the failures into a list and then throw an exception with all the topics that failed. Also, we shouldn't be throwing RuntimeException
- at the very least it should be KsqlException
, but perhaps a DeleteTopicsFailedException
or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, Done!
@@ -165,7 +172,7 @@ private static String formatQualifiedName(QualifiedName name) { | |||
Boolean unmangleNames) { | |||
StringBuilder builder = new StringBuilder("("); | |||
String name = node.getName().getSuffix(); | |||
KsqlFunction ksqlFunction = KsqlFunctions.getFunction(name); | |||
KsqlFunction ksqlFunction = ksqlFunctionRegistry.getFunction(name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to remove the ksql prefixing everywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yer, consider it done :)
@@ -46,12 +46,16 @@ | |||
import java.util.List; | |||
import java.util.Map; | |||
|
|||
public class KsqlFunctions { | |||
public class KsqlFunctionRegistry { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can rename this to FunctionRegistry - we have an an explosion of KSQLEverything
|
||
public class KafkaTopicClientImpl implements KafkaTopicClient { | ||
private static final Logger log = LoggerFactory.getLogger(KafkaTopicClient.class); | ||
private final Map<String, Object> adminClientConfig; | ||
private final AdminClient adminClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is 4.0.1 using kafka 11.0.1 where the adminclient is fixed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use 4.0.0 and it has Kafka 1.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the adminclient should be fixed there. I also reimported your changes to use singleton adminclinent
build-tools/pom.xml
Outdated
@@ -20,6 +20,6 @@ | |||
<modelVersion>4.0.0</modelVersion> | |||
<groupId>io.confluent</groupId> | |||
<artifactId>build-tools</artifactId> | |||
<version>4.0.0-SNAPSHOT</version> | |||
<version>4.1.0-SNAPSHOT</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is 4.0.1 using kafka 11.0.1 where the adminclient is fixed?
@@ -69,12 +72,12 @@ private PlanNode buildLogicalPlan(String queryStr) { | |||
analyzer.process(statements.get(0), new AnalysisContext(null)); | |||
AggregateAnalysis aggregateAnalysis = new AggregateAnalysis(); | |||
AggregateAnalyzer aggregateAnalyzer = new AggregateAnalyzer(aggregateAnalysis, | |||
analysis); | |||
analysis, ksqlFunctionRegistry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would make me happy to rename this to functionRegistry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely :) It's done!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hjafarpour, still no tests for CleanupUtil
and KafkaTopicClient.deleteTopics
@@ -50,7 +63,7 @@ public KsqlContext() { | |||
* | |||
* @param streamsProperties | |||
*/ | |||
public KsqlContext(Map<String, Object> streamsProperties) { | |||
private KsqlContext(Map<String, Object> streamsProperties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put the logic in here into the create(Map)
method and have it call the new constructor below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
return new KsqlContext(streamsProperties); | ||
} | ||
|
||
private KsqlContext() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
|
||
|
||
public static KsqlContext create() { | ||
return new KsqlContext(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delegation, i.e, create(Collections.emptyMap())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, made the changes.
@hjafarpour this PR doesn't look right. You have commits that are unrelated, i.e., those from @aayars |
@dguy the commits by alex came from a merge I had to do with master. |
@hjafarpour why would you need to merge master into 4.0.x? It should be the other way around |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This PR refactors the function meta data handling by using a Function registry.