Skip to content

Commit

Permalink
chore: split JSON mappers (#5066)
Browse files Browse the repository at this point in the history
`JsonMapper` is used for many things:
 * The JSON Rest API.
 * Processing Logger JSON.
 * Deserializing JSON in a few UDFs.
 * Connect JSON handling.
 * The QTT / Test framework specs.

When ever any one of those things needs the a change to the mapper it affects all the other uses, often unintentionally. Better to split `JsonMapper` into usage specific classes.

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Apr 15, 2020
1 parent 3f16844 commit 0b27b3f
Show file tree
Hide file tree
Showing 57 changed files with 453 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import io.confluent.ksql.cli.console.table.builder.TablesListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder;
import io.confluent.ksql.cli.console.table.builder.TypeListTableBuilder;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
Expand Down Expand Up @@ -122,6 +122,7 @@ public class Console implements Closeable {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Logger log = LoggerFactory.getLogger(Console.class);
private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get();

private static final ClassHandlerMap1<KsqlEntity, Console> PRINT_HANDLERS =
HandlerMaps.forClass(KsqlEntity.class).withArgType(Console.class)
Expand Down Expand Up @@ -189,7 +190,6 @@ private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter
}
}

private final ObjectMapper objectMapper;
private final Map<String, CliSpecificCommand> cliSpecificCommands;
private final KsqlTerminal terminal;
private final RowCaptor rowCaptor;
Expand Down Expand Up @@ -234,7 +234,6 @@ public Console(
this.terminal = Objects.requireNonNull(terminal, "terminal");
this.rowCaptor = Objects.requireNonNull(rowCaptor, "rowCaptor");
this.cliSpecificCommands = Maps.newLinkedHashMap();
this.objectMapper = JsonMapper.INSTANCE.mapper;
this.config = new CliConfig(ImmutableMap.of());
}

Expand Down Expand Up @@ -826,7 +825,7 @@ private void printAsJson(final Object o) {
}

try {
objectMapper.writerWithDefaultPrettyPrinter().writeValue(writer(), o);
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValue(writer(), o);
writer().println();
flush();
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.ErrorEntity;
import java.io.IOException;

public class ErrorEntityTableBuilder implements TableBuilder<ErrorEntity> {

private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper;
private static final ObjectMapper MAPPER = ApiJsonMapper.INSTANCE.get();

@Override
public Table buildTable(final ErrorEntity entity) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.logging.processing;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.json.StructSerializationModule;

/**
* Object mapper used for the processing logger.
*/
public enum ProcessingLoggingJsonMapper {

INSTANCE;

private final ObjectMapper mapper = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.registerModule(new Jdk8Module())
.registerModule(new StructSerializationModule())
.registerModule(new KsqlTypesSerializationModule())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));

public ObjectMapper get() {
return mapper;
}
}
27 changes: 0 additions & 27 deletions ksqldb-common/src/test/java/io/confluent/ksql/GenericRowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.json.JsonMapper;
import java.math.BigDecimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand All @@ -34,8 +32,6 @@

public class GenericRowTest {

private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper;

private final Schema addressSchema = SchemaBuilder.struct()
.field("NUMBER", Schema.OPTIONAL_INT64_SCHEMA)
.field("STREET", Schema.OPTIONAL_STRING_SCHEMA)
Expand Down Expand Up @@ -188,27 +184,4 @@ public void testEquals() {
)
.testEquals();
}

@Test
public void shouldSerialize() throws Exception {
// Given:
final GenericRow original = genericRow(1, 2L, 3.0, Long.MAX_VALUE);

// When:
final String json = MAPPER.writeValueAsString(original);

// Then:
assertThat(json, is("{\"columns\":[1,2,3.0,9223372036854775807]}"));

// When:
final GenericRow result = MAPPER.readValue(json, GenericRow.class);

// Then:
assertThat(result, is(genericRow(
1,
2, // Note: int, not long, as JSON doesn't distinguish
BigDecimal.valueOf(3.0),// Note: decimal, not double, as Jackson is configured this way
Long.MAX_VALUE
)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.json.JsonMapper;
import org.junit.Test;

public class QueryIdTest {

private final ObjectMapper objectMapper = JsonMapper.INSTANCE.mapper;

@SuppressWarnings("UnstableApiUsage")
@Test
public void shouldImplementEqualsProperly() {
Expand All @@ -53,27 +49,4 @@ public void shouldPreserveCase() {
// Then:
assertThat(id.toString(), is("Mixed-Case-Id"));
}

@Test
public void shouldSerializeMaintainingCase() throws Exception {
// Given:
final QueryId id = new QueryId("Query-Id");

// When:
final String serialized = objectMapper.writeValueAsString(id);

assertThat(serialized, is("\"Query-Id\""));
}

@Test
public void shouldDeserializeMaintainingCase() throws Exception {
// Given:
final String serialized = "\"An-Id\"";

// When:
final QueryId deserialized = objectMapper.readValue(serialized, QueryId.class);

// Then:
assertThat(deserialized.toString(), is("An-Id"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,29 @@
import io.confluent.ksql.parser.json.KsqlParserSerializationModule;
import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule;

public final class PlanJsonMapper {
private PlanJsonMapper() {
}
/**
* The Json mapper used for serializing and deserializing to internal topics such as the command and
* config topics.
*/
public enum PlanJsonMapper {

INSTANCE;

private final ObjectMapper mapper = new ObjectMapper()
.registerModules(
new Jdk8Module(),
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
)
.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
.enable(DeserializationFeature.FAIL_ON_NULL_CREATOR_PROPERTIES)
.enable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE)
.setSerializationInclusion(Include.NON_EMPTY);

/**
* Create an ObjectMapper configured for serializing/deserializing a KSQL physical plan.
*
* @return ObjectMapper instance
*/
public static ObjectMapper create() {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModules(
new Jdk8Module(),
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
);
mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
mapper.enable(DeserializationFeature.FAIL_ON_NULL_CREATOR_PROPERTIES);
mapper.enable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
mapper.setSerializationInclusion(Include.NON_EMPTY);
public ObjectMapper get() {
return mapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.json.JsonMapper;
import java.io.IOException;
import java.util.EnumMap;
import java.util.Objects;
Expand All @@ -50,7 +49,7 @@ public class JsonArrayContains {
private static final JsonFactory PARSER_FACTORY = new JsonFactoryBuilder()
.disable(CANONICALIZE_FIELD_NAMES)
.build()
.setCodec(JsonMapper.INSTANCE.mapper);
.setCodec(UdfJsonMapper.INSTANCE.get());

private static final EnumMap<JsonToken, Predicate<Object>> TOKEN_COMPAT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.confluent.ksql.execution.function.UdfUtil;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.json.JsonPathTokenizer;
Expand All @@ -31,7 +30,7 @@

public class JsonExtractStringKudf implements Kudf {

private static final ObjectReader OBJECT_READER = JsonMapper.INSTANCE.mapper.reader();
private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader();
public static final FunctionName FUNCTION_NAME = FunctionName.of("EXTRACTJSONFIELD");

private List<String> tokens = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,28 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.json;
package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

public enum JsonMapper {
/**
* Shared Object mapper used by JSON processing UDFs
*/
public enum UdfJsonMapper {

INSTANCE;

public final ObjectMapper mapper =
new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
private final ObjectMapper mapper = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));

JsonMapper() {
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new StructSerializationModule());
mapper.registerModule(new KsqlTypesSerializationModule());
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
mapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
mapper.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
public ObjectMapper get() {
return mapper;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.services;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.confluent.ksql.json.KsqlTypesSerializationModule;
import io.confluent.ksql.json.StructSerializationModule;

/**
* Json Mapper used by Connect integration.
*/
public enum ConnectJsonMapper {

INSTANCE;

private final ObjectMapper mapper = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.registerModule(new Jdk8Module())
.registerModule(new StructSerializationModule())
.registerModule(new KsqlTypesSerializationModule())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));

public ObjectMapper get() {
return mapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.net.URI;
Expand Down Expand Up @@ -56,7 +55,8 @@
public class DefaultConnectClient implements ConnectClient {

private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectClient.class);
private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper;

private static final ObjectMapper MAPPER = ConnectJsonMapper.INSTANCE.get();

private static final String CONNECTORS = "/connectors";
private static final String STATUS = "/status";
Expand Down
Loading

0 comments on commit 0b27b3f

Please sign in to comment.