Commit 571760b

fix: change default exception handling for ksql timestamp extractors

agavra committed Mar 2, 2020
1 parent ce74cf8 commit 571760b
Showing 23 changed files with 915 additions and 23 deletions.
14 changes: 14 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -255,6 +255,13 @@ public class KsqlConfig extends AbstractConfig {
public static final Boolean KSQL_NEW_API_ENABLED_DEFAULT = false;
public static final String KSQL_NEW_API_ENABLED_DOC = "Is the new Vert.x based API enabled?";

public static final String KSQL_TIMESTAMP_THROW_ON_INVALID = "ksql.timestamp.throw.on.invalid";
public static final Boolean KSQL_TIMESTAMP_THROW_ON_INVALID_DEFAULT = false;
public static final String KSQL_TIMESTAMP_THROW_ON_INVALID_DOC = "If an incoming message "
+ "contains an invalid timestamp, ksqlDB will log a warning and continue. To disable this "
+ "behavior, and instead throw an exception to ensure that no data is missed, set "
+ "ksql.timestamp.skip.invalid to true.";

private enum ConfigGeneration {
LEGACY,
CURRENT
@@ -609,6 +616,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_NEW_API_ENABLED_DOC
)
.define(
KSQL_TIMESTAMP_THROW_ON_INVALID,
Type.BOOLEAN,
KSQL_TIMESTAMP_THROW_ON_INVALID_DEFAULT,
Importance.MEDIUM,
KSQL_TIMESTAMP_THROW_ON_INVALID_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
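The two hunks above register a new server config, ksql.timestamp.throw.on.invalid, which defaults to log-and-continue. As a rough sketch of what honoring such a flag looks like in a string-column timestamp extractor (illustrative only: the class below is hypothetical, and ksqlDB's real extractors report failures through a ProcessingLogger rather than a bare RuntimeException):

import java.text.SimpleDateFormat;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical sketch -- not the extractor class touched by this commit.
public class StringTimestampExtractorSketch implements TimestampExtractor {

  private final SimpleDateFormat format;
  private final boolean throwOnInvalid; // ksql.timestamp.throw.on.invalid

  public StringTimestampExtractorSketch(final String pattern, final boolean throwOnInvalid) {
    this.format = new SimpleDateFormat(pattern);
    this.throwOnInvalid = throwOnInvalid;
  }

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
    try {
      // Assumes the timestamp column has already been projected out of the row.
      return format.parse(String.valueOf(record.value())).getTime();
    } catch (final Exception e) {
      if (throwOnInvalid) {
        throw new RuntimeException("Invalid timestamp in record: " + record, e);
      }
      // A negative timestamp makes Kafka Streams log a warning and skip the
      // record -- the new log-and-continue default.
      return -1L;
    }
  }
}

With the default of false, bad rows are warned about and dropped; setting ksql.timestamp.throw.on.invalid=true in the server properties surfaces the parse failure as an exception instead, so no data is silently missed.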
@@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -36,13 +37,17 @@
import io.confluent.ksql.execution.streams.KSPlanBuilder;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
@@ -141,6 +146,12 @@ public class DataSourceNodeTest {
private SchemaKTable<Struct> table;
@Mock
private KsqlTopic topic;
@Mock
private ProcessingLogContext processingLogContext;
@Mock
private ProcessingLoggerFactory processingLoggerFactory;
@Mock
private ProcessingLogger processingLogger;

private DataSourceNode node;

@@ -149,6 +160,10 @@
public void before() {
realBuilder = new StreamsBuilder();

when(ksqlStreamBuilder.getQueryId()).thenReturn(new QueryId("fooQuery"));
when(ksqlStreamBuilder.getProcessingLogContext()).thenReturn(processingLogContext);
when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
when(processingLoggerFactory.getLogger(anyString())).thenReturn(processingLogger);
when(ksqlStreamBuilder.getKsqlConfig()).thenReturn(realConfig);
when(ksqlStreamBuilder.getStreamsBuilder()).thenReturn(realBuilder);
when(ksqlStreamBuilder.buildNodeContext(any())).thenAnswer(inv ->
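The new stubs exist because building the source step now resolves a ProcessingLogger, which the timestamp extractor uses to report unparseable rows. The production code walks roughly this chain (the logger name shown is illustrative, not the exact format ksqlDB uses):

final ProcessingLogger logger = ksqlStreamBuilder
    .getProcessingLogContext()
    .getLoggerFactory()
    .getLogger("fooQuery.KsqlTopic.Source"); // hypothetical logger name

If any link in that chain came back as an unstubbed (null-returning) mock, the test would fail with a NullPointerException before reaching its assertions, hence the four when(...) stubs above.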
@@ -128,11 +128,6 @@ private void validate() {
if (this.statements.isEmpty()) {
throw new InvalidFieldException("statements", "was empty");
}

if (!this.inputs.isEmpty() && this.expectedException.isPresent()) {
throw new InvalidFieldException("inputs and expectedException",
"can not both be set");
}
}

private static <T> ImmutableList<T> immutableCopyOf(final List<T> source) {
@@ -139,10 +139,6 @@ public void buildAndExecuteQuery(final TestCase testCase) {
stubKafkaService
);

testCase.expectedException().map(ee -> {
throw new AssertionError("Expected test to throw: " + StringDescription.toString(ee));
});

writeInputIntoTopics(testCase.getInputRecords(), stubKafkaService);
final Set<String> inputTopics = testCase.getInputRecords()
.stream()
@@ -181,6 +177,9 @@
.map(Topic::getName)
.forEach(allTopicNames::add);

testCase.expectedException().map(ee -> {
throw new AssertionError("Expected test to throw: " + StringDescription.toString(ee));
});
testCase.getPostConditions().verify(ksqlEngine.getMetaStore(), allTopicNames);
} catch (final RuntimeException e) {
final Optional<Matcher<Throwable>> expectedExceptionMatcher = testCase.expectedException();
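With the up-front validation removed (previous file) and the assertion deferred until after the inputs are written, a planned test may now declare both inputs and expectedException: the records are processed first, and the AssertionError fires only if no exception surfaced along the way. A hypothetical test-case entry exercising that combination (all field values are illustrative):

{
  "name" : "throw on invalid timestamp",
  "statements" : [ "CREATE STREAM TEST (ID BIGINT, TS STRING) WITH (KAFKA_TOPIC='test_topic', TIMESTAMP='ts', TIMESTAMP_FORMAT='yy-MM-dd HH:mm:ss', VALUE_FORMAT='JSON');" ],
  "properties" : { "ksql.timestamp.throw.on.invalid" : true },
  "inputs" : [ { "topic" : "test_topic", "value" : { "ID" : 2, "TS" : "not a timestamp" } } ],
  "expectedException" : { "type" : "io.confluent.ksql.util.KsqlException", "message" : "Failed to parse timestamp 'not a timestamp'" }
}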
@@ -46,6 +46,7 @@
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.xml.parsers.DocumentBuilder;
@@ -141,6 +142,7 @@ public static List<TestCasePlan> allForTestCase(final TestCase testCase) {
.orElse(Collections.emptyList())
.stream()
.map(p -> parseSpec(rootforCase.resolve(p)))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

@@ -149,11 +151,15 @@ private static TestCasePlan parseSpec(final PlannedTestPath versionDir) {
final PlannedTestPath specPath = versionDir.resolve(PlannedTestPath.SPEC_FILE);
final PlannedTestPath topologyPath = versionDir.resolve(PlannedTestPath.TOPOLOGY_FILE);

return new TestCasePlan(
parseJson(specPath, JsonTestLoader.OBJECT_MAPPER, TestCaseSpecNode.class),
parseJson(planPath, PlannedTestUtils.PLAN_MAPPER, TestCasePlanNode.class),
slurp(topologyPath)
);
try {
return new TestCasePlan(
parseJson(specPath, JsonTestLoader.OBJECT_MAPPER, TestCaseSpecNode.class),
parseJson(planPath, PlannedTestUtils.PLAN_MAPPER, TestCasePlanNode.class),
slurp(topologyPath)
);
} catch (final Exception e) {
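// Plans written by older ksqlDB versions may no longer deserialize.
// Return null so allForTestCase() can drop this plan (via the
// Objects::nonNull filter above) instead of failing the whole load.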
return null;
}
}

private static <T> T parseJson(final PlannedTestPath path, final ObjectMapper mapper,
@@ -92,7 +92,8 @@
},
"options" : [ ]
},
"topicName" : "TS"
"topicName" : "TS",
"timestampColumn" : null
},
"queryId" : "CSAS_TS_0"
}
@@ -114,7 +115,7 @@
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/p9/bk8xks6s2lndncftdbq36xh80000gp/T/confluent8393051053551629823",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent8144864285731138620",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
@@ -129,6 +130,7 @@
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
@@ -1,6 +1,6 @@
{
"version" : "5.5.0",
"timestamp" : 1581572104400,
"version" : "6.0.0",
"timestamp" : 1583181389696,
"schemas" : {
"CSAS_TS_0.KsqlTopic.Source" : "STRUCT<ID BIGINT> NOT NULL",
"CSAS_TS_0.TS" : "STRUCT<ID BIGINT> NOT NULL"
@@ -0,0 +1,152 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID BIGINT, TS STRING) WITH (KAFKA_TOPIC='test_topic', TIMESTAMP='ts', TIMESTAMP_FORMAT='yy-MM-dd HH:mm:ss', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING",
"keyField" : null,
"timestampColumn" : {
"column" : "TS",
"format" : "yy-MM-dd HH:mm:ss"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TS AS SELECT TEST.ID ID\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TS",
"schema" : "`ROWKEY` STRING KEY, `ID` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "TS",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TS",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TS"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : {
"column" : "TS",
"format" : "yy-MM-dd HH:mm:ss"
},
"sourceSchema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING"
},
"selectExpressions" : [ "ID AS ID" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "TS",
"timestampColumn" : null
},
"queryId" : "CSAS_TS_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent8883205057413437130",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,45 @@
{
"version" : "6.0.0",
"timestamp" : 1583170254458,
"schemas" : {
"CSAS_TS_0.KsqlTopic.Source" : "STRUCT<ID BIGINT, TS VARCHAR> NOT NULL",
"CSAS_TS_0.TS" : "STRUCT<ID BIGINT> NOT NULL"
},
"inputs" : [ {
"topic" : "test_topic",
"key" : "",
"value" : {
"ID" : 1,
"TS" : "10-04-19 12:00:17"
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : {
"ID" : 2,
"TS" : "!!!!!!!!!!!!!!!!!"
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : {
"ID" : 3,
"TS" : "10-04-19 12:00:17"
}
} ],
"outputs" : [ {
"topic" : "TS",
"key" : "",
"value" : {
"ID" : 1
},
"timestamp" : 1271703617000
}, {
"topic" : "TS",
"key" : "",
"value" : {
"ID" : 3
},
"timestamp" : 1271703617000
} ]
}
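The spec encodes the new default end to end: record 2's TS value cannot be parsed with the declared TIMESTAMP_FORMAT of 'yy-MM-dd HH:mm:ss', so the row is logged and skipped, and only IDs 1 and 3 appear in the outputs. A standalone sanity check of the parsing itself (illustrative; the exact epoch values depend on the JVM's default time zone):

import java.text.SimpleDateFormat;

public class TimestampParseDemo {
  public static void main(final String[] args) throws Exception {
    final SimpleDateFormat fmt = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
    // Records 1 and 3 parse cleanly into an event-time timestamp.
    System.out.println(fmt.parse("10-04-19 12:00:17").getTime());
    // Record 2 throws ParseException here; under the log-and-continue
    // default the row is skipped, which is why ID 2 has no output above.
    System.out.println(fmt.parse("!!!!!!!!!!!!!!!!!").getTime());
  }
}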
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: TS)
<-- Project
