Skip to content

Commit

Permalink
feat: Implement user defined delimiter for value format (#3393)
Browse files Browse the repository at this point in the history
This PR implements configurable delimiters for value formats.
  • Loading branch information
purplefox authored Sep 26, 2019
1 parent 3aaeb73 commit b84d0aa
Show file tree
Hide file tree
Showing 38 changed files with 717 additions and 112 deletions.
20 changes: 20 additions & 0 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ The WITH clause supports the following properties:
| | set, then the default Kafka cluster configuration for replicas will be used for creating a |
| | new topic. |
+-------------------------+--------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Specifies the single character to use as the delimiter; |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+--------------------------------------------------------------------------------------------+
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka |
| | message value, you may set this property to associate the corresponding field/column with |
| | the implicit ``ROWKEY`` column (message key). |
Expand Down Expand Up @@ -470,6 +475,11 @@ The WITH clause supports the following properties:
| | set, then the default Kafka cluster configuration for replicas will be used for creating a |
| | new topic. |
+-------------------------+--------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Specifies the single character to use as the delimiter; |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+--------------------------------------------------------------------------------------------+
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka |
| | message value, you may set this property to associate the corresponding field/column with |
| | the implicit ``ROWKEY`` column (message key). |
Expand Down Expand Up @@ -587,6 +597,11 @@ The WITH clause for the result supports the following properties:
| | If this property is not set, then the format of the input stream/table is used. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Specifies the single character to use as the delimiter; |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number |
| | of partitions of the input stream/table will be used. In join queries, the property values are taken |
| | from the left-side stream or table. |
Expand Down Expand Up @@ -694,6 +709,11 @@ The WITH clause supports the following properties:
| | If this property is not set, then the format of the input stream/table is used. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Specifies the single character to use as the delimiter; |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number |
| | of partitions of the input stream/table will be used. In join queries, the property values are taken |
| | from the left-side stream or table. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private static Serde<GenericRow> getJsonSerdeHelper(
final org.apache.kafka.connect.data.Schema schema
) {
return getGenericRowSerde(
FormatInfo.of(Format.JSON, Optional.empty()),
FormatInfo.of(Format.JSON),
schema,
() -> null
);
Expand All @@ -170,7 +170,7 @@ private static Serde<GenericRow> getAvroSerde(
final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

return getGenericRowSerde(
FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema")),
FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema"), Optional.empty()),
schema,
() -> schemaRegistryClient
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -32,6 +33,39 @@ public final class ConfigValidators {
private ConfigValidators() {
}

/**
 * Validator that tests the STRING property can be parsed by the supplied {@code parser}.
 *
 * <p>The value is cast to {@code String} and handed to the parser as-is, including
 * {@code null}. Any exception the parser throws is reported as a {@link ConfigException}
 * whose message names the property and whose cause is the parser's exception.
 *
 * @param parser the parser.
 * @return the validator
 */
public static Validator parses(final Function<String, ?> parser) {
  return (name, val) -> {
    // A non-String value means the validator was attached to a non-STRING definition,
    // which is a programming error rather than a bad user-supplied value.
    if (val != null && !(val instanceof String)) {
      throw new IllegalArgumentException("validator should only be used with STRING defs");
    }
    try {
      parser.apply((String) val);
    } catch (final Exception e) {
      final ConfigException invalid =
          new ConfigException("Configuration " + name + " is invalid: " + e.getMessage());
      // ConfigException has no cause-accepting constructor, so attach the cause explicitly
      // to keep the parser's stack trace available for debugging.
      invalid.initCause(e);
      throw invalid;
    }
  };
}

/**
 * Wraps a validator so that {@code null} values are accepted without being checked.
 *
 * @param delegate the validator invoked for every non-null value.
 * @return a validator that ignores nulls and otherwise defers to {@code delegate}.
 */
public static Validator nullsAllowed(final Validator delegate) {
  return (name, value) -> {
    if (value != null) {
      delegate.ensureValid(name, value);
    }
  };
}

public static <T extends Enum<T>> Validator enumValues(final Class<T> enumClass) {
final String[] enumValues = EnumSet.allOf(enumClass)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.properties.with;

import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.serde.Delimiter;
import io.confluent.ksql.serde.Format;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand All @@ -40,6 +41,8 @@ public final class CommonCreateConfigs {
public static final String VALUE_FORMAT_PROPERTY = "VALUE_FORMAT";
public static final String WRAP_SINGLE_VALUE = "WRAP_SINGLE_VALUE";

public static final String VALUE_DELIMITER_PROPERTY = "VALUE_DELIMITER";

static void addToConfigDef(
final ConfigDef configDef,
final boolean topicNameRequired,
Expand Down Expand Up @@ -117,7 +120,18 @@ static void addToConfigDef(
ConfigDef.Type.STRING,
null,
Importance.LOW,
"The fully qualified name of the Avro schema to use");
"The fully qualified name of the Avro schema to use"
)
.define(
VALUE_DELIMITER_PROPERTY,
ConfigDef.Type.STRING,
null,
ConfigValidators.nullsAllowed(ConfigValidators.parses(Delimiter::of)),
Importance.LOW,
"The delimiter to use when VALUE_FORMAT='DELIMITED'. Supports single "
+ "character to be a delimiter, defaults to ','. For space and tab delimited values "
+ "you must use the special values 'SPACE' or 'TAB', not an actual space or tab "
+ "character.");
}

private CommonCreateConfigs() {
Expand Down
99 changes: 99 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/serde/Delimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;

/**
 * A single character used as a value delimiter.
 *
 * <p>Instances are immutable and are equal when they wrap the same character.
 */
@Immutable
public final class Delimiter {

  /** Symbolic names accepted by {@link #of(String)} in place of a literal character. */
  private static final Map<String, Character> NAMED_DELIMITERS = ImmutableMap
      .<String, Character>builder()
      .put("TAB", '\t')
      .put("SPACE", ' ')
      .build();

  /** Comma-separated list of the supported names, used in error messages. */
  private static final String NAMED_DELIMITERS_STRING =
      StringUtils.join(NAMED_DELIMITERS.keySet(), ",");

  private final char delimiter;

  /**
   * Creates a delimiter from a character.
   *
   * @param ch the delimiter character.
   * @return the delimiter.
   */
  public static Delimiter of(final char ch) {
    return new Delimiter(ch);
  }

  private Delimiter(final char delimiter) {
    this.delimiter = delimiter;
  }

  /**
   * Parses a delimiter from its textual form.
   *
   * <p>The text must be either a single character or one of the names {@code TAB} or
   * {@code SPACE}, matched exactly as written. Text that is empty after trimming is
   * rejected, which includes a literal single space or tab character.
   *
   * @param str the text to parse.
   * @return the delimiter.
   * @throws NullPointerException if {@code str} is null.
   * @throws IllegalArgumentException if {@code str} is blank, or is longer than one
   *     character and is not a supported name.
   */
  public static Delimiter of(final String str) {
    Objects.requireNonNull(str, "str");

    // The blank check comes before the single-character check on purpose: a literal
    // space or tab is refused in favour of the explicit names.
    if (str.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "Delimiter cannot be empty, if you meant to have a tab or space for delimiter, please "
              + "use the special values 'TAB' or 'SPACE'"
              + System.lineSeparator()
              + "Example valid value: ';'"
      );
    }

    if (str.length() == 1) {
      return new Delimiter(str.charAt(0));
    }

    final Character named = NAMED_DELIMITERS.get(str);
    if (named == null) {
      throw new IllegalArgumentException(
          "Invalid delimiter value: '" + str
              + "'. Delimiter must be a single character or "
              + NAMED_DELIMITERS_STRING
              + System.lineSeparator()
              + "Example valid value: ';'"
      );
    }
    return new Delimiter(named);
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final Delimiter that = (Delimiter) o;
    return delimiter == that.delimiter;
  }

  @Override
  public int hashCode() {
    // Hash the primitive directly; avoids boxing and the varargs array of Objects.hash.
    return Character.hashCode(delimiter);
  }

  /**
   * Returns the delimiter as a one-character string.
   *
   * @return the raw delimiter character as a string.
   */
  @Override
  public String toString() {
    return String.valueOf(delimiter);
  }

  /**
   * Returns the delimiter character.
   *
   * @return the delimiter character.
   */
  public char getDelimiter() {
    return delimiter;
  }
}
34 changes: 25 additions & 9 deletions ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ public final class FormatInfo {

private final Format format;
private final Optional<String> avroFullSchemaName;
private final Optional<Delimiter> delimiter;

public static FormatInfo of(final Format format) {
return FormatInfo.of(format, Optional.empty());
return FormatInfo.of(format, Optional.empty(), Optional.empty());
}

public static FormatInfo of(
final Format format,
final Optional<String> avroFullSchemaName
) {
return new FormatInfo(format, avroFullSchemaName);
final Optional<String> avroFullSchemaName,
final Optional<Delimiter> valueDelimiter) {
return new FormatInfo(format, avroFullSchemaName, valueDelimiter);
}

private FormatInfo(
final Format format,
final Optional<String> avroFullSchemaName
final Optional<String> avroFullSchemaName,
final Optional<Delimiter> delimiter
) {
this.format = Objects.requireNonNull(format, "format");
this.avroFullSchemaName = Objects.requireNonNull(avroFullSchemaName, "avroFullSchemaName");
Expand All @@ -51,9 +53,17 @@ private FormatInfo(
throw new KsqlException("Full schema name only supported with AVRO format");
}

if (avroFullSchemaName.map(name -> name.trim().isEmpty()).orElse(false)) {
throw new KsqlException("Schema name can not be empty");
if (format == Format.AVRO
&& avroFullSchemaName.map(name -> name.trim().isEmpty()).orElse(false)) {
throw new KsqlException("Schema name cannot be empty");
}

this.delimiter = Objects.requireNonNull(delimiter, "delimiter");

if (format != Format.DELIMITED && delimiter.isPresent()) {
throw new KsqlException("Delimeter only supported with DELIMITED format");
}

}

public Format getFormat() {
Expand All @@ -64,6 +74,10 @@ public Optional<String> getAvroFullSchemaName() {
return avroFullSchemaName;
}

/**
 * Returns the value delimiter held by this format info.
 *
 * @return the delimiter, or empty if none was supplied. The constructor rejects a
 *     delimiter for any format other than {@code Format.DELIMITED}.
 */
public Optional<Delimiter> getDelimiter() {
return delimiter;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -74,19 +88,21 @@ public boolean equals(final Object o) {
}
final FormatInfo that = (FormatInfo) o;
return format == that.format
&& Objects.equals(avroFullSchemaName, that.avroFullSchemaName);
&& Objects.equals(avroFullSchemaName, that.avroFullSchemaName)
&& Objects.equals(delimiter, that.delimiter);
}

@Override
public int hashCode() {
return Objects.hash(format, avroFullSchemaName);
return Objects.hash(format, avroFullSchemaName, delimiter);
}

/**
 * Renders the format, Avro schema name and delimiter for logging and debugging.
 *
 * @return a string of the form {@code FormatInfo{format=..., avroFullSchemaName=...,
 *     delimiter=...}}.
 */
@Override
public String toString() {
  return "FormatInfo{format=" + format
      + ", avroFullSchemaName=" + avroFullSchemaName
      + ", delimiter=" + delimiter
      + "}";
}
}
11 changes: 0 additions & 11 deletions ksql-common/src/main/java/io/confluent/ksql/serde/KeyFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,6 @@ public static KeyFormat nonWindowed(final FormatInfo format) {
return new KeyFormat(format, Optional.empty());
}

public static KeyFormat windowed(
final Format format,
final Optional<String> avroSchemaName,
final WindowInfo windowInfo
) {
return new KeyFormat(
FormatInfo.of(format, avroSchemaName),
Optional.of(windowInfo)
);
}

public static KeyFormat windowed(
final FormatInfo format,
final WindowInfo windowInfo
Expand Down
Loading

0 comments on commit b84d0aa

Please sign in to comment.