Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: serialize/deserialize plans from qtt #4080

Merged
merged 1 commit into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.parser.json.KsqlParserSerializationModule;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;

public final class PlanJsonMapper {
private PlanJsonMapper() {
Expand All @@ -33,7 +36,10 @@ public static ObjectMapper create() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModules(
new Jdk8Module(),
new JavaTimeModule()
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
);
mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.execution.context.QueryContext;
import java.io.IOException;
import org.junit.Test;

public class QueryContextTest {
private static final ObjectMapper MAPPER = new ObjectMapper();

private final QueryContext.Stacker contextStacker = new QueryContext.Stacker().push("node");
private final QueryContext queryContext = contextStacker.getQueryContext();

Expand All @@ -51,4 +56,26 @@ public void shouldGenerateNewContextOnPush() {
assertQueryContext(childContext, "node", "child");
assertQueryContext(grandchildContext, "node", "child", "grandchild");
}

@Test
public void shouldSerializeCorrectly() throws IOException {
// Given:
final QueryContext context = contextStacker.push("child").getQueryContext();

// When:
final String serialized = MAPPER.writeValueAsString(context);

// Then:
assertThat(serialized, is("\"node/child\""));
}

@Test
public void shouldDeserializeCorrectly() throws IOException {
// When:
final QueryContext deserialized = MAPPER.readValue("\"node/child\"", QueryContext.class);

// Then:
final QueryContext expected = contextStacker.push("child").getQueryContext();
assertThat(deserialized, is(expected));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.execution.context;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
Expand Down Expand Up @@ -43,12 +44,12 @@ private QueryContext(List<String> context) {
}
}

@SuppressWarnings("unused")// Invoked via reflection by Jackson
@JsonCreator
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason Jackson complains it can't deserialize into ImmutableList when using the JsonCreator (even though it ultimately just calls this constructor). So I switched this class to use a custom deserializer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because there is a getter called getContext, so its checking out the private final ImmutableList<String> context; field. Mark getContext as @JsonIgnore and you'll be golden.

private QueryContext(final String context) {
private QueryContext(String context) {
this(ImmutableList.copyOf(context.split(DELIMITER)));
}

@JsonIgnore
public List<String> getContext() {
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.engine.SqlFormatInjector;
import io.confluent.ksql.engine.StubInsertValuesExecutor;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
Expand All @@ -41,12 +44,14 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector;
import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.test.TestFrameworkException;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.tools.stubs.StubKafkaService;
import io.confluent.ksql.test.utils.SerdeUtil;
Expand All @@ -55,6 +60,7 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -71,6 +77,8 @@
public final class TestExecutorUtil {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final ObjectMapper PLAN_MAPPER = PlanJsonMapper.create();

private TestExecutorUtil() {
}

Expand Down Expand Up @@ -275,7 +283,6 @@ private static List<PersistentQueryAndSortedSources> execute(
.collect(Collectors.toList());
}


@SuppressWarnings({"rawtypes", "unchecked"})
private static ExecuteResultAndSortedSources execute(
final KsqlExecutionContext executionContext,
Expand Down Expand Up @@ -308,7 +315,7 @@ private static ExecuteResultAndSortedSources execute(

final ExecuteResult executeResult;
try {
executeResult = executionContext.execute(executionContext.getServiceContext(), reformatted);
executeResult = executeConfiguredStatement(executionContext, reformatted);
} catch (final KsqlStatementException statementException) {
// use the original statement text in the exception so that tests
// can easily check that the failed statement is the input statement
Expand Down Expand Up @@ -339,6 +346,31 @@ private static ExecuteResultAndSortedSources execute(
Optional.empty());
}

@SuppressWarnings("unchecked")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't think this suppression is needed

private static ExecuteResult executeConfiguredStatement(
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> stmt) {
final ConfiguredKsqlPlan configuredPlan;
try {
configuredPlan = buildConfiguredPlan(executionContext, stmt);
} catch (final IOException e) {
throw new TestFrameworkException("Error (de)serializing plan: " + e.getMessage(), e);
}
return executionContext.execute(executionContext.getServiceContext(), configuredPlan);
}

private static ConfiguredKsqlPlan buildConfiguredPlan(
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> stmt
) throws IOException {
final KsqlPlan plan = executionContext.plan(executionContext.getServiceContext(), stmt);
final String serialized = PLAN_MAPPER.writeValueAsString(plan);
return ConfiguredKsqlPlan.of(
PLAN_MAPPER.readValue(serialized, KsqlPlan.class),
stmt.getOverrides(),
stmt.getConfig());
}

private static Optional<Long> getWindowSize(final Query query) {
return query.getWindow().flatMap(window -> window
.getKsqlWindowExpression()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;
package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;

public class KsqlTypesDeserializationModule extends SimpleModule {

public KsqlTypesDeserializationModule() {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
public KsqlTypesDeserializationModule(boolean withImplicitColumns) {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer(withImplicitColumns));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;
package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
Expand All @@ -25,6 +25,11 @@
import java.io.IOException;

final class LogicalSchemaDeserializer extends JsonDeserializer<LogicalSchema> {
final boolean withImplicitColumns;

LogicalSchemaDeserializer(final boolean withImplicitColumns) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this because when deserializing ddl schemas we do want the implicit columns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of interest, is it just ROWTIME you need? I'm hoping to completely remove the concept of 'implicit' column soon by:

  1. making ROWTIME a pseudo column
  2. explicitly adding ROWTIME in CT/CS statements if a key column is not provided.

Copy link
Contributor Author

@rodesai rodesai Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicitly adding ROWTIME in CT/CS statements if a key column is not provided

you mean ROWKEY?

this.withImplicitColumns = withImplicitColumns;
}

@Override
public LogicalSchema deserialize(
Expand All @@ -36,6 +41,6 @@ public LogicalSchema deserialize(

final TableElements tableElements = SchemaParser.parse(text, TypeRegistry.EMPTY);

return tableElements.toLogicalSchema(false);
return tableElements.toLogicalSchema(withImplicitColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;
package io.confluent.ksql.parser.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -32,7 +32,7 @@ public class LogicalSchemaDeserializerTest {

@BeforeClass
public static void classSetUp() {
MAPPER.registerModule(new TestModule());
MAPPER.registerModule(new TestModule(false));
}

@Test
Expand Down Expand Up @@ -83,10 +83,29 @@ public void shouldDeserializeSchemaWithKeyAfterValue() throws Exception {
.build()));
}

@Test
public void shouldAddImplicitColumns() throws Exception {
// Given:
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new TestModule(true));
final String json = "\"`v0` INTEGER\"";

// When:
final LogicalSchema schema = mapper.readValue(json, LogicalSchema.class);

// Then:
assertThat(schema, is(LogicalSchema.builder()
.valueColumn(ColumnName.of("v0"), SqlTypes.INTEGER)
.build()));
}

private static class TestModule extends SimpleModule {

private TestModule() {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
private TestModule(boolean withImplicitColumns) {
addDeserializer(
LogicalSchema.class,
new LogicalSchemaDeserializer(withImplicitColumns)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
Expand All @@ -35,7 +35,7 @@ public final class KsqlClient implements AutoCloseable {
static {
JsonMapper.INSTANCE.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonMapper.INSTANCE.mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule());
JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule(false));
}

private final Client httpClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.Arrays;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class TableRowsEntityTest {
MAPPER = new ObjectMapper();
MAPPER.registerModule(new Jdk8Module());
MAPPER.registerModule(new KsqlTypesSerializationModule());
MAPPER.registerModule(new KsqlTypesDeserializationModule());
MAPPER.registerModule(new KsqlTypesDeserializationModule(false));
}

@Test(expected = IllegalArgumentException.class)
Expand Down