-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: fail scalable push query if error is detected in subscribed persistent query #7996
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. Just a couple comments
ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java
Outdated
Show resolved
Hide resolved
queryErrors.add(queryError); | ||
LOG.error( | ||
"Unhandled exception caught in streams thread {}. ({})", | ||
Thread.currentThread().getName(), | ||
errorType, | ||
e | ||
); | ||
listener.onError(this, queryError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, why was it required to move this call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was failing some tests in SecureIntegrationTest
that called assertQueryFailsWithUserError
. It seemed that the queryError
was not being added and so the tests failed, but when i moved the listener to the end they passed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We looked into this offline and this line can be reverted.
...b-engine/src/test/java/io/confluent/ksql/physical/scalablepush/ScalablePushRegistryTest.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadataImpl.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/query/TransientQueryQueue.java
Outdated
Show resolved
Hide resolved
queryErrors.add(queryError); | ||
LOG.error( | ||
"Unhandled exception caught in streams thread {}. ({})", | ||
Thread.currentThread().getName(), | ||
errorType, | ||
e | ||
); | ||
listener.onError(this, queryError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We looked into this offline and this line can be reverted.
Description
Listen for errors in the persistent query that a scalable push query is subscribed to and if one occurs, fail the scalable push query.
Testing done
Unit tests added, as well as manual testing to ensure that we see
Persistent query has error
on the CLI.Reviewer checklist