diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/json/PlanJsonMapper.java b/ksql-engine/src/main/java/io/confluent/ksql/execution/json/PlanJsonMapper.java similarity index 80% rename from ksql-execution/src/main/java/io/confluent/ksql/execution/json/PlanJsonMapper.java rename to ksql-engine/src/main/java/io/confluent/ksql/execution/json/PlanJsonMapper.java index b913ac928823..b84d6e8d8f02 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/json/PlanJsonMapper.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/execution/json/PlanJsonMapper.java @@ -19,6 +19,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.confluent.ksql.json.KsqlTypesSerializationModule; +import io.confluent.ksql.parser.json.KsqlParserSerializationModule; +import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule; public final class PlanJsonMapper { private PlanJsonMapper() { @@ -33,7 +36,10 @@ public static ObjectMapper create() { ObjectMapper mapper = new ObjectMapper(); mapper.registerModules( new Jdk8Module(), - new JavaTimeModule() + new JavaTimeModule(), + new KsqlParserSerializationModule(), + new KsqlTypesSerializationModule(), + new KsqlTypesDeserializationModule(true) ); mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); mapper.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES); diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java similarity index 100% rename from ksql-execution/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java rename to ksql-engine/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/QueryContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/QueryContextTest.java index 49f70376f01d..ce6f9d5cbbc9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/QueryContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/QueryContextTest.java @@ -18,13 +18,18 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.execution.context.QueryContext; +import java.io.IOException; import org.junit.Test; public class QueryContextTest { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private final QueryContext.Stacker contextStacker = new QueryContext.Stacker().push("node"); private final QueryContext queryContext = contextStacker.getQueryContext(); @@ -51,4 +56,26 @@ public void shouldGenerateNewContextOnPush() { assertQueryContext(childContext, "node", "child"); assertQueryContext(grandchildContext, "node", "child", "grandchild"); } + + @Test + public void shouldSerializeCorrectly() throws IOException { + // Given: + final QueryContext context = contextStacker.push("child").getQueryContext(); + + // When: + final String serialized = MAPPER.writeValueAsString(context); + + // Then: + assertThat(serialized, is("\"node/child\"")); + } + + @Test + public void shouldDeserializeCorrectly() throws IOException { + // When: + final QueryContext deserialized = MAPPER.readValue("\"node/child\"", QueryContext.class); + + // Then: + final QueryContext expected = contextStacker.push("child").getQueryContext(); + assertThat(deserialized, is(expected)); + } } \ No newline at end of file diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/context/QueryContext.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/context/QueryContext.java index 50cdd7793670..b2f5cf886c7d 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/context/QueryContext.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/context/QueryContext.java @@ -16,6 +16,7 @@ package io.confluent.ksql.execution.context; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; @@ -43,12 +44,12 @@ private QueryContext(List context) { } } - @SuppressWarnings("unused")// Invoked via reflection by Jackson @JsonCreator - private QueryContext(final String context) { + private QueryContext(String context) { this(ImmutableList.copyOf(context.split(DELIMITER))); } + @JsonIgnore public List getContext() { return context; } diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java index f37bccbe9c60..e94b4edbf5fa 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java @@ -20,14 +20,17 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.engine.SqlFormatInjector; import io.confluent.ksql.engine.StubInsertValuesExecutor; +import io.confluent.ksql.execution.json.PlanJsonMapper; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; @@ -41,12 +44,14 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Relation; import io.confluent.ksql.parser.tree.Table; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector; import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.serde.Format; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.test.TestFrameworkException; import io.confluent.ksql.test.serde.SerdeSupplier; import io.confluent.ksql.test.tools.stubs.StubKafkaService; import io.confluent.ksql.test.utils.SerdeUtil; @@ -55,6 +60,7 @@ import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -71,6 +77,8 @@ public final class TestExecutorUtil { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling + private static final ObjectMapper PLAN_MAPPER = PlanJsonMapper.create(); + private TestExecutorUtil() { } @@ -275,7 +283,6 @@ private static List execute( .collect(Collectors.toList()); } - @SuppressWarnings({"rawtypes", "unchecked"}) private static ExecuteResultAndSortedSources execute( final KsqlExecutionContext executionContext, @@ -308,7 +315,7 @@ private static ExecuteResultAndSortedSources execute( final ExecuteResult executeResult; try { - executeResult = executionContext.execute(executionContext.getServiceContext(), reformatted); + executeResult = executeConfiguredStatement(executionContext, reformatted); } catch (final KsqlStatementException statementException) { // use the original statement text in the exception so that tests // can easily check that the failed statement is the input statement @@ -339,6 +346,31 @@ private static ExecuteResultAndSortedSources execute( Optional.empty()); } + @SuppressWarnings("unchecked") + private static ExecuteResult executeConfiguredStatement( + final KsqlExecutionContext executionContext, + final ConfiguredStatement stmt) { + final ConfiguredKsqlPlan configuredPlan; + try { + configuredPlan = buildConfiguredPlan(executionContext, stmt); + } catch (final IOException e) { + throw new TestFrameworkException("Error (de)serializing plan: " + e.getMessage(), e); + } + return executionContext.execute(executionContext.getServiceContext(), configuredPlan); + } + + private static ConfiguredKsqlPlan buildConfiguredPlan( + final KsqlExecutionContext executionContext, + final ConfiguredStatement stmt + ) throws IOException { + final KsqlPlan plan = executionContext.plan(executionContext.getServiceContext(), stmt); + final String serialized = PLAN_MAPPER.writeValueAsString(plan); + return ConfiguredKsqlPlan.of( + PLAN_MAPPER.readValue(serialized, KsqlPlan.class), + stmt.getOverrides(), + stmt.getConfig()); + } + private static Optional getWindowSize(final Query query) { return query.getWindow().flatMap(window -> window .getKsqlWindowExpression() diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java similarity index 85% rename from ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java rename to ksql-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java index 992010afd770..b1fc161fe2ab 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java @@ -13,14 +13,14 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.rest.client.json; +package io.confluent.ksql.parser.json; import com.fasterxml.jackson.databind.module.SimpleModule; import io.confluent.ksql.schema.ksql.LogicalSchema; public class KsqlTypesDeserializationModule extends SimpleModule { - public KsqlTypesDeserializationModule() { - addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer()); + public KsqlTypesDeserializationModule(boolean withImplicitColumns) { + addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer(withImplicitColumns)); } } diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/json/LogicalSchemaDeserializer.java similarity index 83% rename from ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java rename to ksql-parser/src/main/java/io/confluent/ksql/parser/json/LogicalSchemaDeserializer.java index 304c73ae3ec1..21ea5a9a8a2d 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/json/LogicalSchemaDeserializer.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.rest.client.json; +package io.confluent.ksql.parser.json; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; @@ -25,6 +25,11 @@ import java.io.IOException; final class LogicalSchemaDeserializer extends JsonDeserializer { + final boolean withImplicitColumns; + + LogicalSchemaDeserializer(final boolean withImplicitColumns) { + this.withImplicitColumns = withImplicitColumns; + } @Override public LogicalSchema deserialize( @@ -36,6 +41,6 @@ public LogicalSchema deserialize( final TableElements tableElements = SchemaParser.parse(text, TypeRegistry.EMPTY); - return tableElements.toLogicalSchema(false); + return tableElements.toLogicalSchema(withImplicitColumns); } } diff --git a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/json/LogicalSchemaDeserializerTest.java similarity index 78% rename from ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java rename to ksql-parser/src/test/java/io/confluent/ksql/parser/json/LogicalSchemaDeserializerTest.java index 11f6e13625f5..65f4e904bcf0 100644 --- a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/json/LogicalSchemaDeserializerTest.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.rest.client.json; +package io.confluent.ksql.parser.json; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -32,7 +32,7 @@ public class LogicalSchemaDeserializerTest { @BeforeClass public static void classSetUp() { - MAPPER.registerModule(new TestModule()); + MAPPER.registerModule(new TestModule(false)); } @Test @@ -83,10 +83,29 @@ public void shouldDeserializeSchemaWithKeyAfterValue() throws Exception { .build())); } + @Test + public void shouldAddImplicitColumns() throws Exception { + // Given: + final ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new TestModule(true)); + final String json = "\"`v0` INTEGER\""; + + // When: + final LogicalSchema schema = mapper.readValue(json, LogicalSchema.class); + + // Then: + assertThat(schema, is(LogicalSchema.builder() + .valueColumn(ColumnName.of("v0"), SqlTypes.INTEGER) + .build())); + } + private static class TestModule extends SimpleModule { - private TestModule() { - addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer()); + private TestModule(boolean withImplicitColumns) { + addDeserializer( + LogicalSchema.class, + new LogicalSchemaDeserializer(withImplicitColumns) + ); } } } diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlClient.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlClient.java index da01dc0dfec7..2f26d8542d38 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlClient.java +++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlClient.java @@ -20,8 +20,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.json.JsonMapper; +import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule; import io.confluent.ksql.properties.LocalProperties; -import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule; import java.net.URI; import java.util.Map; import java.util.Optional; @@ -35,7 +35,7 @@ public final class KsqlClient implements AutoCloseable { static { JsonMapper.INSTANCE.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); JsonMapper.INSTANCE.mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); - JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule()); + JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule(false)); } private final Client httpClient; diff --git a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityTest.java b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityTest.java index cd158bf1cf81..59bcd372cb4b 100644 --- a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityTest.java +++ b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityTest.java @@ -25,7 +25,7 @@ import io.confluent.ksql.json.KsqlTypesSerializationModule; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.query.QueryId; -import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule; +import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.util.Arrays; @@ -53,7 +53,7 @@ public class TableRowsEntityTest { MAPPER = new ObjectMapper(); MAPPER.registerModule(new Jdk8Module()); MAPPER.registerModule(new KsqlTypesSerializationModule()); - MAPPER.registerModule(new KsqlTypesDeserializationModule()); + MAPPER.registerModule(new KsqlTypesDeserializationModule(false)); } @Test(expected = IllegalArgumentException.class)