-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: allow insert into table using HTTP/2 endpoint and Java API #8114
Conversation
fa30ecc
to
4481cbe
Compare
|
||
// Then: should receive new row | ||
final String query = "SELECT * FROM " + EMPTY_TEST_STREAM + " EMIT CHANGES LIMIT 1;"; | ||
final StreamedQueryResult streamedQueryResult = client.streamQuery(PULL_QUERY_ON_TABLE).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this is unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch 👍
@@ -657,22 +679,111 @@ public void shouldInsertInto() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void shouldHandleErrorResponseFromInsertInto() { | |||
public void shouldPullQueryTableAfterInsert() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A regular table should not be pull-queryable? Is the test name incorrect?
Overall, I am wondering what you try to do: insert into the table and verify that the data was written? For this case you could use a push query in the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, test name should be shouldNot...
I'm trying to test you can insert into table created using with
but pull query it will throw exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My question is really: this PR is about allowing INSERT INTO for tables -- however, INSERT INTO and pull-queries have nothing to do with each other. So what do we gain by issuing a pull query (that we know is not supported for this case anyway)? -- Overall, it seems that the second part of this test is unnecessary and the first part is covered by the next test that uses a push query to verify if the insert was successful. Thus, should we remove this test?
...api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void shouldPullQueryCTASTableAfterInsert() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this test buys us much?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to make sure direct insert to CTAS table will have no effect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "will not affected" exaclty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean you can insert to the table successfully but you won't see the result in pull request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean you can insert to the table successfully but you won't see the result in pull request
Well, while this is true, to me it's rather a design flaw / bug than a feature. Thus, I am not sure if it makes sense to test this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to Matthias: I don't think this test makes sense in this file. This test file is meant to test client-specific behavior. If the purpose of this test is to verify that INSERT VALUES does not update pull query state stores, then the test does not belong in this file. (In general, integration tests are (relatively) expensive so I'd err against adding redundant/duplicate tests.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I'll remove this
} | ||
|
||
@Test | ||
public void shouldPullQueryCTASTableAfterInsertStream() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
insert into stream
did always work, so it seems the test is actually not related to this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Want to test inserting to stream which backs CTAS table and it will be inserted to table. I'm trying to cover all possible ways to insert to a table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this closes a test gap, great. I was just wondering if this case was already covered or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Previously in integration test we have a CTAS table AGG_TABLE
. This test case's table is a new table created using WITH
and we are inserting to the topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto to my comment above -- I'm not convinced this test belongs in this file. I definitely think we should have a test that verifies that INSERT VALUES is allowed for tables, but I don't think we need to test auxiliary behaviors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can remove this as well
@@ -133,7 +133,7 @@ public boolean topicExists(final String topicName) { | |||
/** | |||
* Ensure topics with the given {@code topicNames} exist. | |||
* | |||
* <p>Topics will be creates, if they do not already exist, with a single partition and replica. | |||
* <p>Topics will be created, if they do not already exist, with a single partition and replica. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
Seems there is some spotbug issue:
Might be fixed already by removing the unused variable. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems there is some spotbug issue:
[ERROR] Medium: Dead store to streamedQueryResult in io.confluent.ksql.api.client.integration.ClientIntegrationTest.shouldInsertInto() [io.confluent.ksql.api.client.integration.ClientIntegrationTest] At ClientIntegrationTest.java:[line 660] DLS_DEAD_LOCAL_STORE
Might be fixed already by removing the unused variable.
Yeah. It's checkstyle failure and I fixed it
} | ||
|
||
@Test | ||
public void shouldPullQueryCTASTableAfterInsertStream() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Previously in integration test we have a CTAS table AGG_TABLE
. This test case's table is a new table created using WITH
and we are inserting to the topic
@@ -657,22 +678,111 @@ public void shouldInsertInto() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void shouldHandleErrorResponseFromInsertInto() { | |||
public void shouldHandlePullQueryTableExceptionAfterInsert() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you had a comment here in older commit but it's gone after I updated this file...
My question is really: this PR is about allowing INSERT INTO for tables -- however, INSERT INTO and pull-queries have nothing to do with each other. So what do we gain by issuing a pull query (that we know is not supported for this case anyway)? -- Overall, it seems that the second part of this test is unnecessary and the first part is covered by the next test that uses a push query to verify if the insert was successful. Thus, should we remove this test?
I guess this could be two test case:
- Insert into table
- Table created with
WITH
is not queryable.
I don't see 2 is covered here so added it. Maybe I can split it to 2 test cases to make it clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Call for second review @vcrfxia to sanity check, this part of the code is also new to me. -- In particular if we should/need to test "insert into" a table created via CTAS.
} | ||
|
||
@Test | ||
public void shouldPullQueryCTASTableAfterInsert() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean you can insert to the table successfully but you won't see the result in pull request
Well, while this is true, to me it's rather a design flaw / bug than a feature. Thus, I am not sure if it makes sense to test this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix @lihaosky ! Some comments about test coverage inline.
I also updated the PR title to be more informative to users for when it's eventually included in the changelog for a release.
@@ -591,24 +591,6 @@ public void shouldInsertInto() throws Exception { | |||
assertThat(testEndpoints.getLastTarget(), is("test-stream")); | |||
} | |||
|
|||
@Test | |||
public void shouldHandleErrorResponseFromInsertInto() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these tests being removed? They are validating that an informative 400 response error message is returned to the user. We can update the exception message to be more realistic (in light of the changes in this PR) but I think we should keep the test coverage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But an "insert into" should be supported now? What kind of error do you have in mind that we should test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of this test is to check that any error from the insert values endpoint is properly propagated to the user. If you look at the code for the endpoint, you'll see that there are plenty of other errors that could be triggered (besides attempting to insert into a table), e.g., trying to insert into a non-existent source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleting this test opens us up to potential regressions where the client does not properly propagate error messages from the server response to the user. We should not delete this coverage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @vcrfxia, do you mean we should also test handling error like these:
https://github.com/confluentinc/ksql/blob/master/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsStreamEndpoint.java#L61-L80
I agree we should cover those but that seems belong to another PR since these PR is about allowing insert to table. I can create another issue about above missing tests and fix those in a separate PR. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm saying we should have one test to validate that any error is properly propagated from the server to the user (via the client). We do not need to test every possible server error. The proper place for testing each possible server error would be in a unit/integration test for the server endpoint, not the client. The client tests should just contain a single test to validate that any error is properly propagated to the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should simply update the test highlighted in this comment (in this PR) to test a different error, rather than removing the test in this PR and opening a follow-up PR to add it back.
@@ -255,7 +259,12 @@ public static void setUpClass() throws Exception { | |||
AGG_SCHEMA | |||
); | |||
|
|||
final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString(); | |||
makeKsqlRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we move this up a few lines to be alongside the other makeKsqlRequest()
?
@@ -657,22 +678,103 @@ public void shouldInsertInto() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void shouldHandleErrorResponseFromInsertInto() { | |||
public void shouldHandlePullQueryTableException() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this test being added? Testing the specific error (query against a table this is not query-able) doesn't make sense in this class since it's not specific to the client. Regarding client behavior for this error, this test file already contains shouldHandleErrorResponseFromExecuteQuery()
and shouldHandleErrorResponseFromStreamQuery()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to cover more error cases. This is client behavior triggering exception by query non-queryable table. shouldHandleErrorResponseFromExecuteQuery
and shouldHandleErrorResponseFromStreamQuery
seem cover other behavior like query non-exist table. I feel query non-queryable table is also one of the cases. If you feel this is not needed, I can remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, whether the table is queryable or not is not relevant to the client. For client test coverage, we should simply check that the client properly propagates errors from the server to the user. This is not the right place to try exercising all possible server-side errors. That belongs in the server unit/integration tests instead.
} | ||
|
||
@Test | ||
public void shouldPushQueryTableAfterInsert() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing test shouldInsertInto()
already issues a push query after an insertInto()
call. What additional coverage does this test buy us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldInsertInto
insert into a stream EMPTY_TEST_STREAM
, this one insertInto
a table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If that's the purpose of this new test, can we rename the test to shouldInsertIntoTable()
or similar, in order to better capture what the test is for?
} | ||
|
||
@Test | ||
public void shouldPullQueryCTASTableAfterInsert() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to Matthias: I don't think this test makes sense in this file. This test file is meant to test client-specific behavior. If the purpose of this test is to verify that INSERT VALUES does not update pull query state stores, then the test does not belong in this file. (In general, integration tests are (relatively) expensive so I'd err against adding redundant/duplicate tests.)
} | ||
|
||
@Test | ||
public void shouldPullQueryCTASTableAfterInsertStream() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto to my comment above -- I'm not convinced this test belongs in this file. I definitely think we should have a test that verifies that INSERT VALUES is allowed for tables, but I don't think we need to test auxiliary behaviors.
@@ -657,22 +678,103 @@ public void shouldInsertInto() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void shouldHandleErrorResponseFromInsertInto() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is important test coverage (that error responses are properly exposed to the user). If we are removing the specific cause of the error used in the existing test, then let's replace the error with a different error thrown from the endpoint, rather than removing the test entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, I can create separate PR to add more tests catching different exceptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, I don't think we should be removing test coverage in this PR. Let me know if what I'm saying isn't making sense, I'm happy to sync offline.
@@ -858,25 +975,12 @@ public void shouldStreamInserts() throws Exception { | |||
assertThat(acksPublisher.isFailed(), is(false)); | |||
} | |||
|
|||
@Test | |||
public void shouldHandleErrorResponseFromStreamInserts() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above: this is important test coverage (that error responses are properly exposed to the user). If we are removing the specific cause of the error used in the existing test, then let's replace the error with a different error thrown from the endpoint, rather than removing the test entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mentioned above, I can add more case in different PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above: #8114 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lihaosky -- LGTM besides the minor comment inline.
); | ||
|
||
// Then | ||
assertThat(e.getCause(), instanceOf(KsqlClientException.class)); | ||
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); | ||
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); | ||
assertThat(e.getCause().getMessage(), containsString("Cannot insert values into an unknown stream")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's update the error message (in the actual server endpoint) so that it says "unknown stream/table" rather than "unknown stream".
@@ -89,7 +86,8 @@ private DataSource getDataSource( | |||
final DataSource dataSource = metaStore.getSource(sourceName); | |||
if (dataSource == null) { | |||
throw new KsqlApiException( | |||
"Cannot insert values into an unknown stream: " + sourceName, ERROR_CODE_BAD_STATEMENT); | |||
"Cannot insert values into an unknown stream/table: " + sourceName, | |||
ERROR_CODE_BAD_STATEMENT); | |||
} | |||
|
|||
if (dataSource.getKsqlTopic().getKeyFormat().isWindowed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also update the error message in this block to say stream/table
rather than just stream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM -- thanks @lihaosky !
Description
Inserting into tables using CLI is permitted but using Java API is not. Allow it in Java API to make the behavior consistent.
Fixes #7948
Changes
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsStreamEndpoint.java
.ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java
as they usetestEndpoint
and doesn't actually test anything in production endpoint.