Skip to content

Commit

Permalink
fix: format cast arguments with passed context (6.0.x) (#7032)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Feb 18, 2021
1 parent d6d9b57 commit 8c0d93d
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ public String visitLikePredicate(final LikePredicate node, final Context context
@Override
public String visitCast(final Cast node, final Context context) {
return "CAST"
+ "(" + process(node.getExpression(), context) + " AS " + node.getType() + ")";
+ "(" + process(node.getExpression(), context) + " AS "
+ process(node.getType(), context) + ")";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,22 @@ public void shouldFormatCast() {
assertThat(result, equalTo("CAST(1 AS DOUBLE)"));
}

@Test
public void shouldFormatCastToStruct() {
// Given:
final Cast cast = new Cast(
new StringLiteral("foo"),
new Type(SqlStruct.builder()
.field("field", SqlTypes.STRING).build())
);

// When:
final String result = ExpressionFormatter.formatExpression(cast, FormatOptions.none());

// Then:
assertThat(result, equalTo("CAST('foo' AS STRUCT<`field` STRING>)"));
}

@Test
public void shouldFormatSearchedCaseExpression() {
final SearchedCaseExpression expression = new SearchedCaseExpression(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, F0 STRUCT<F0 INTEGER, `f1` INTEGER>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `F0` STRUCT<`F0` INTEGER, `f1` INTEGER>",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n CAST(TEST.F0 AS STRUCT<F0 INTEGER, `f1` INTEGER>) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `KSQL_COL_0` STRUCT<`F0` INTEGER, `f1` INTEGER>",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` STRING KEY, `F0` STRUCT<`F0` INTEGER, `f1` INTEGER>"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "CAST(F0 AS STRUCT<F0 INTEGER, `f1` INTEGER>) AS KSQL_COL_0" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"version" : "6.0.2",
"timestamp" : 1613684374835,
"path" : "query-validation-tests/cast.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<F0 STRUCT<F0 INT, f1 INT>> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<KSQL_COL_0 STRUCT<F0 INT, f1 INT>> NOT NULL"
},
"testCase" : {
"name" : "struct to struct with same schema",
"inputs" : [ {
"topic" : "test_topic",
"key" : null,
"value" : {
"f0" : {
"f0" : 1,
"f1" : 3
}
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : null,
"value" : {
"KSQL_COL_0" : {
"F0" : 1,
"f1" : 3
}
}
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (ID STRING KEY, f0 STRUCT<F0 INT, `f1` INT>) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, cast(f0 as STRUCT<F0 INTEGER, `f1` INTEGER>) FROM TEST;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@
{
"name": "struct to struct with same schema",
"statements": [
"CREATE STREAM TEST (ID STRING KEY, f0 STRUCT<F0 INT>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ID, cast(f0 as STRUCT<F0 INTEGER>) FROM TEST;"
"CREATE STREAM TEST (ID STRING KEY, f0 STRUCT<F0 INT, `f1` INT>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ID, cast(f0 as STRUCT<F0 INTEGER, `f1` INTEGER>) FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"f0": {"f0": 1}}}
{"topic": "test_topic", "value": {"f0": {"f0": 1, "f1": 3}}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"KSQL_COL_0": {"F0": 1}}}
{"topic": "OUTPUT", "value": {"KSQL_COL_0": {"F0": 1, "f1": 3}}}
]
},
{
Expand Down

0 comments on commit 8c0d93d

Please sign in to comment.