Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make stream and column names case-insensitive in /inserts-stream #5591

Merged
merged 7 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,14 @@ public void shouldHandleErrorResponseFromTerminatePushQuery() {
public void shouldInsertInto() throws Exception {
// Given
final KsqlObject insertRow = new KsqlObject()
.put("STR", "HELLO")
.put("LONG", 100L)
.put("str", "HELLO") // Column names are case-insensitive
.put("`LONG`", 100L) // Quotes may be used to preserve case-sensitivity
.put("DEC", new BigDecimal("13.31"))
.put("ARRAY", new KsqlArray().add("v1").add("v2"))
.put("MAP", new KsqlObject().put("some_key", "a_value").put("another_key", ""));

// When
client.insertInto(EMPTY_TEST_STREAM, insertRow).get();
client.insertInto(EMPTY_TEST_STREAM.toLowerCase(), insertRow).get(); // Stream name is case-insensitive

// Then: should receive new row
final String query = "SELECT * FROM " + EMPTY_TEST_STREAM + " EMIT CHANGES LIMIT 1;";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.ServerUtils;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
Expand Down Expand Up @@ -48,18 +49,28 @@ public InsertsStreamEndpoint(final KsqlEngine ksqlEngine, final KsqlConfig ksqlC
this.reservedInternalTopics = reservedInternalTopics;
}

public InsertsStreamSubscriber createInsertsSubscriber(final String target,
public InsertsStreamSubscriber createInsertsSubscriber(final String caseInsensitiveTarget,
final JsonObject properties,
final Subscriber<InsertResult> acksSubscriber, final Context context,
final WorkerExecutor workerExecutor,
final ServiceContext serviceContext) {
VertxUtils.checkIsWorker();

if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_INSERT_INTO_VALUES_ENABLED)) {
throw new KsqlApiException("The server has disabled INSERT INTO ... VALUES functionality. "
+ "To enable it, restart your ksqlDB server "
+ "with 'ksql.insert.into.values.enabled'=true",
ERROR_CODE_BAD_REQUEST);
}

final String target;
try {
target = ServerUtils.getIdentifierText(caseInsensitiveTarget);
} catch (IllegalArgumentException e) {
throw new KsqlApiException(
"Invalid target name: " + e.getMessage(), ERROR_CODE_BAD_STATEMENT);
}

final DataSource dataSource = getDataSource(ksqlEngine.getMetaStore(),
SourceName.of(target));
if (dataSource.getDataSourceType() == DataSourceType.KTABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.api.impl;

import static io.confluent.ksql.api.impl.KeyValueExtractor.convertColumnNameCase;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
Expand Down Expand Up @@ -132,9 +134,11 @@ protected void afterSubscribe(final Subscription subscription) {
}

@Override
protected void handleValue(final JsonObject jsonObject) {
protected void handleValue(final JsonObject jsonObjectWithCaseInsensitiveFields) {

try {
final JsonObject jsonObject = convertColumnNameCase(jsonObjectWithCaseInsensitiveFields);

final Struct key = extractKey(jsonObject);
final GenericRow values = extractValues(jsonObject);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.api.server.ServerUtils;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand All @@ -30,6 +31,7 @@
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

Expand Down Expand Up @@ -69,6 +71,28 @@ public static GenericRow extractValues(final JsonObject values, final LogicalSch
return GenericRow.fromList(vals);
}

static JsonObject convertColumnNameCase(final JsonObject jsonObjectWithCaseInsensitiveFields) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of iterating through the entries again in here and having an extra method, it would be simpler to call ServerUtils.getIdentifierText in the existing extractValues method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we'd also have to do the conversion in extractKey() too, right? I think it's better to do it once upfront, rather than making both extractValues() and extractKey() responsible for converting.

I'm going to merge this for now; can open a follow-up with the requested changes if I've misunderstood.

final JsonObject jsonObject = new JsonObject();

for (Map.Entry<String, Object> entry :
jsonObjectWithCaseInsensitiveFields.getMap().entrySet()) {
final String key;
try {
key = ServerUtils.getIdentifierText(entry.getKey());
} catch (IllegalArgumentException e) {
throw new KsqlApiException(
String.format("Invalid column name. Column: %s. Reason: %s",
entry.getKey(), e.getMessage()),
Errors.ERROR_CODE_BAD_REQUEST
);
}

jsonObject.put(key, entry.getValue());
}

return jsonObject;
}

private static Object coerceObject(
final Object value,
final SqlType sqlType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ public static String convertCommaSeparatedWilcardsToRegex(final String csv) {
return out.toString();
}

// See ParserUtil#getIdentifierText()
public static String getIdentifierText(final String text) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not great that this logic is repeated here and also in the method of the same name in ParserUtil.java. I couldn't think of a way to combine them, though, since the ParserUtil method is specific to the AST.

if (text.isEmpty()) {
return "";
}

final char firstChar = text.charAt(0);
if (firstChar == '`' || firstChar == '"') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really also support double quotes for escaping? Thought it was just backtick.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also support double quotes. Here's the relevant code:

if (context instanceof SqlBaseParser.QuotedIdentifierAlternativeContext) {
return unquote(context.getText(), "\"");
} else if (context instanceof SqlBaseParser.BackQuotedIdentifierContext) {
return unquote(context.getText(), "`");

I also sanity checked manually.

return unquote(text, firstChar);
}

return text.toUpperCase();
}

public static boolean checkHttp2(final RoutingContext routingContext) {
if (routingContext.request().version() != HttpVersion.HTTP_2) {
routingContext.fail(BAD_REQUEST.code(),
Expand Down Expand Up @@ -131,4 +145,28 @@ static Void handleEndpointException(
ERROR_CODE_SERVER_ERROR));
return null;
}

/**
 * Strips the surrounding quote characters from {@code value} and collapses each
 * doubled (escaped) interior quote into a single quote character.
 *
 * @param value the quoted text; must begin with {@code quote}
 * @param quote the quote character ({@code `} or {@code "})
 * @return the unquoted text with escape sequences resolved
 * @throws IllegalStateException if the value does not begin with the quote
 *         (caller bug — the caller dispatches on the first character)
 * @throws IllegalArgumentException if the closing quote is missing or an
 *         interior quote is not escaped by doubling
 */
private static String unquote(final String value, final char quote) {
  if (value.charAt(0) != quote) {
    throw new IllegalStateException("Value must begin with quote");
  }

  final int closing = value.length() - 1;
  if (value.charAt(closing) != quote || value.length() < 2) {
    throw new IllegalArgumentException("Expected matching quote at end of value");
  }

  // Walk the interior, copying characters and collapsing each escaped pair
  // ("" or ``) into one quote; a lone interior quote is malformed.
  final StringBuilder unquoted = new StringBuilder(value.length());
  int pos = 1;
  while (pos < closing) {
    final char current = value.charAt(pos);
    if (current != quote) {
      unquoted.append(current);
      pos++;
    } else {
      if (value.charAt(pos + 1) != quote || pos + 1 == closing) {
        throw new IllegalArgumentException("Un-escaped quote in middle of value at index " + pos);
      }
      unquoted.append(quote);
      pos += 2;
    }
  }

  return unquoted.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -221,7 +223,7 @@ public void shouldExecutePushQueryNoLimit() throws Exception {
} catch (Throwable t) {
return Integer.MAX_VALUE;
}
}, is(6));
}, greaterThanOrEqualTo(6));

// The response shouldn't have ended yet
assertThat(writeStream.isEnded(), is(false));
Expand Down Expand Up @@ -416,6 +418,79 @@ public void shouldInsertWithMissingValueField() {
shouldInsert(row);
}

@Test
public void shouldInsertWithCaseInsensitivity() {
  // Given: stream name and column names supplied in lower case
  final String streamName = TEST_STREAM.toLowerCase();
  final JsonObject insertRow = new JsonObject()
      .put("str", "HELLO")
      .put("dec", 12.21) // JsonObject does not accept BigDecimal
      .put("array", new JsonArray().add("a").add("b"))
      .put("map", new JsonObject().put("k1", "v1").put("k2", "v2"));

  // Then: the insert succeeds because unquoted identifiers are case-insensitive
  shouldInsert(streamName, insertRow);
}

@Test
public void shouldTreatInsertTargetAsCaseSensitiveIfQuotedWithBackticks() {
  // Given: a backtick-quoted, lower-cased stream name (quoting preserves case)
  final String streamName = "`" + TEST_STREAM.toLowerCase() + "`";
  final JsonObject insertRow = new JsonObject();
  insertRow.put("STR", "HELLO");
  insertRow.put("LONG", 1000L);
  insertRow.put("DEC", 12.21); // JsonObject does not accept BigDecimal
  insertRow.put("ARRAY", new JsonArray().add("a").add("b"));
  insertRow.put("MAP", new JsonObject().put("k1", "v1").put("k2", "v2"));

  // Then: request fails because stream name is invalid
  shouldRejectInsertRequest(streamName, insertRow,
      "Cannot insert values into an unknown stream: " + streamName);
}

@Test
public void shouldTreatInsertTargetAsCaseSensitiveIfQuotedWithDoubleQuotes() {
  // Given: a double-quoted, lower-cased stream name (quoting preserves case)
  final String streamName = "\"" + TEST_STREAM.toLowerCase() + "\"";
  final JsonObject insertRow = new JsonObject();
  insertRow.put("STR", "HELLO");
  insertRow.put("LONG", 1000L);
  insertRow.put("DEC", 12.21); // JsonObject does not accept BigDecimal
  insertRow.put("ARRAY", new JsonArray().add("a").add("b"));
  insertRow.put("MAP", new JsonObject().put("k1", "v1").put("k2", "v2"));

  // Then: request fails because stream name is invalid
  // (the server reports the unknown name back in canonical backtick form)
  shouldRejectInsertRequest(streamName, insertRow,
      "Cannot insert values into an unknown stream: `" + TEST_STREAM.toLowerCase() + "`");
}

@Test
public void shouldTreatInsertColumnNamesAsCaseSensitiveIfQuotedWithBackticks() {
  // Given: the key column quoted with backticks in lower case, so it does NOT
  // match the upper-case key column of the schema
  final JsonObject insertRow = new JsonObject();
  insertRow.put("`str`", "HELLO");
  insertRow.put("LONG", 1000L);
  insertRow.put("DEC", 12.21); // JsonObject does not accept BigDecimal
  insertRow.put("ARRAY", new JsonArray().add("a").add("b"));
  insertRow.put("MAP", new JsonObject().put("k1", "v1").put("k2", "v2"));

  // Then: request fails because column name is incorrect
  shouldFailToInsert(insertRow, ERROR_CODE_BAD_REQUEST, "Key field must be specified: STR");
}

@Test
public void shouldTreatInsertColumnNamesAsCaseSensitiveIfQuotedWithDoubleQuotes() {
// Given:
JsonObject row = new JsonObject()
.put("\"str\"", "HELLO")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about backtick?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

.put("LONG", 1000L)
.put("DEC", 12.21) // JsonObject does not accept BigDecimal
.put("ARRAY", new JsonArray().add("a").add("b"))
.put("MAP", new JsonObject().put("k1", "v1").put("k2", "v2"));

// Then: request fails because column name is incorrect
shouldFailToInsert(row, ERROR_CODE_BAD_REQUEST, "Key field must be specified: STR");
}

@Test
public void shouldExecutePushQueryFromLatestOffset() {

Expand Down Expand Up @@ -494,15 +569,7 @@ private QueryResponse executeQuery(final String sql) {
}

private void shouldFailToInsert(final JsonObject row, final int errorCode, final String message) {
JsonObject properties = new JsonObject();
JsonObject requestBody = new JsonObject()
.put("target", TEST_STREAM).put("properties", properties);
Buffer bodyBuffer = requestBody.toBuffer();
bodyBuffer.appendString("\n");

bodyBuffer.appendBuffer(row.toBuffer()).appendString("\n");

HttpResponse<Buffer> response = sendRequest("/inserts-stream", bodyBuffer);
final HttpResponse<Buffer> response = makeInsertsRequest(TEST_STREAM, row);

assertThat(response.statusCode(), is(200));

Expand All @@ -515,15 +582,11 @@ private void shouldFailToInsert(final JsonObject row, final int errorCode, final
}

private void shouldInsert(final JsonObject row) {
JsonObject properties = new JsonObject();
JsonObject requestBody = new JsonObject()
.put("target", TEST_STREAM).put("properties", properties);
Buffer bodyBuffer = requestBody.toBuffer();
bodyBuffer.appendString("\n");

bodyBuffer.appendBuffer(row.toBuffer()).appendString("\n");
shouldInsert(TEST_STREAM, row);
}

HttpResponse<Buffer> response = sendRequest("/inserts-stream", bodyBuffer);
private void shouldInsert(final String target, final JsonObject row) {
HttpResponse<Buffer> response = makeInsertsRequest(target, row);

assertThat(response.statusCode(), is(200));

Expand All @@ -532,6 +595,29 @@ private void shouldInsert(final JsonObject row) {
assertThat(insertsResponse.error, is(nullValue()));
}

/**
 * Sends an inserts-stream request for {@code target}/{@code row} and asserts the
 * server rejects it outright with HTTP 400 and an error body whose message
 * contains {@code message}. (Contrast with shouldFailToInsert, which expects a
 * 200 response carrying a per-row error.)
 */
private void shouldRejectInsertRequest(final String target, final JsonObject row, final String message) {
  // When:
  final HttpResponse<Buffer> response = makeInsertsRequest(target, row);

  // Then:
  assertThat(response.statusCode(), is(400));
  assertThat(response.statusMessage(), is("Bad Request"));

  final QueryResponse errorResponse = new QueryResponse(response.bodyAsString());
  assertThat(errorResponse.responseObject.getInteger("error_code"), is(ERROR_CODE_BAD_STATEMENT));
  assertThat(errorResponse.responseObject.getString("message"), containsString(message));
}

/**
 * Builds and sends an /inserts-stream request body: a newline-delimited stream
 * consisting of the JSON header ({@code target} + empty {@code properties})
 * followed by the single {@code row} to insert.
 */
private HttpResponse<Buffer> makeInsertsRequest(final String target, final JsonObject row) {
  final JsonObject requestHeader = new JsonObject()
      .put("target", target)
      .put("properties", new JsonObject());

  // Each JSON object in the stream is terminated by a newline.
  final Buffer bodyBuffer = requestHeader.toBuffer()
      .appendString("\n")
      .appendBuffer(row.toBuffer())
      .appendString("\n");

  return sendRequest("/inserts-stream", bodyBuffer);
}

private WebClient createClient() {
WebClientOptions options = new WebClientOptions().
setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false)
Expand Down