From 571760b13cea3872e67b19e4210f87eb7bd772e4 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Fri, 21 Feb 2020 14:23:15 -0800 Subject: [PATCH] fix: change default exception handling for ksql timestamp extractors --- .../io/confluent/ksql/util/KsqlConfig.java | 14 ++ .../ksql/planner/plan/DataSourceNodeTest.java | 15 ++ .../ksql/test/model/TestCaseNode.java | 5 - .../ksql/test/tools/TestExecutor.java | 7 +- .../ksql/test/planned/TestCasePlanLoader.java | 16 +- .../plan.json | 6 +- .../spec.json | 4 +- .../topology | 0 .../6.0.0_1583170254458/plan.json | 152 +++++++++++++++++ .../6.0.0_1583170254458/spec.json | 45 +++++ .../6.0.0_1583170254458/topology | 13 ++ .../6.0.0_1583181588448/plan.json | 152 +++++++++++++++++ .../6.0.0_1583181588448/spec.json | 45 +++++ .../6.0.0_1583181588448/topology | 13 ++ .../5.5.0_1582672993973/spec.json | 157 ++++++++++++++++++ .../5.5.0_1582672993973/topology | 13 ++ .../timestamp-extractor.json | 33 ++++ .../ksql/execution/streams/SourceBuilder.java | 23 ++- .../timestamp/LoggingTimestampExtractor.java | 107 ++++++++++++ .../timestamp/TimestampExtractionPolicy.java | 9 + .../execution/streams/SourceBuilderTest.java | 14 ++ .../LoggingTimestampExtractorTest.java | 79 +++++++++ .../TimestampExtractionPolicyFactoryTest.java | 16 +- 23 files changed, 915 insertions(+), 23 deletions(-) rename ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/{5.5.0_1581572104400 => 6.0.0_1583181389696}/plan.json (95%) rename ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/{5.5.0_1581572104400 => 6.0.0_1583181389696}/spec.json (94%) rename ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/{5.5.0_1581572104400 => 6.0.0_1583181389696}/topology (100%) create mode 100644 
ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/plan.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/topology create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/plan.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/topology create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/topology create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractor.java create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractorTest.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index a4fdc18d4c59..94c2be70e704 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -255,6 +255,13 @@ public class 
KsqlConfig extends AbstractConfig { public static final Boolean KSQL_NEW_API_ENABLED_DEFAULT = false; public static final String KSQL_NEW_API_ENABLED_DOC = "Is the new Vert.x based API enabled?"; + public static final String KSQL_TIMESTAMP_THROW_ON_INVALID = "ksql.timestamp.throw.on.invalid"; + public static final Boolean KSQL_TIMESTAMP_THROW_ON_INVALID_DEFAULT = false; + public static final String KSQL_TIMESTAMP_THROW_ON_INVALID_DOC = "If an incoming message " + + "contains an invalid timestamp, ksqlDB will log a warning and continue. To disable this " + + "behavior, and instead throw an exception to ensure that no data is missed, set " + + "ksql.timestamp.throw.on.invalid to true."; + private enum ConfigGeneration { LEGACY, CURRENT @@ -609,6 +616,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_NEW_API_ENABLED_DOC ) + .define( + KSQL_TIMESTAMP_THROW_ON_INVALID, + Type.BOOLEAN, + KSQL_TIMESTAMP_THROW_ON_INVALID_DEFAULT, + Importance.MEDIUM, + KSQL_TIMESTAMP_THROW_ON_INVALID_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index e7d93e9aa27f..d07c6dbe1f8a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -36,6 +37,9 @@ import io.confluent.ksql.execution.streams.KSPlanBuilder; import 
io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.logging.processing.ProcessingLoggerFactory; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KsqlStream; @@ -43,6 +47,7 @@ import io.confluent.ksql.model.WindowType; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.FormatFactory; @@ -141,6 +146,12 @@ public class DataSourceNodeTest { private SchemaKTable table; @Mock private KsqlTopic topic; + @Mock + private ProcessingLogContext processingLogContext; + @Mock + private ProcessingLoggerFactory processingLoggerFactory; + @Mock + private ProcessingLogger processingLogger; private DataSourceNode node; @@ -149,6 +160,10 @@ public class DataSourceNodeTest { public void before() { realBuilder = new StreamsBuilder(); + when(ksqlStreamBuilder.getQueryId()).thenReturn(new QueryId("fooQuery")); + when(ksqlStreamBuilder.getProcessingLogContext()).thenReturn(processingLogContext); + when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory); + when(processingLoggerFactory.getLogger(anyString())).thenReturn(processingLogger); when(ksqlStreamBuilder.getKsqlConfig()).thenReturn(realConfig); when(ksqlStreamBuilder.getStreamsBuilder()).thenReturn(realBuilder); when(ksqlStreamBuilder.buildNodeContext(any())).thenAnswer(inv -> diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TestCaseNode.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TestCaseNode.java index 84637e17961b..8b289ee8a8e5 100644 --- 
a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TestCaseNode.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TestCaseNode.java @@ -128,11 +128,6 @@ private void validate() { if (this.statements.isEmpty()) { throw new InvalidFieldException("statements", "was empty"); } - - if (!this.inputs.isEmpty() && this.expectedException.isPresent()) { - throw new InvalidFieldException("inputs and expectedException", - "can not both be set"); - } } private static ImmutableList immutableCopyOf(final List source) { diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index da6229ad7d6b..4bcfc99a5bdb 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -139,10 +139,6 @@ public void buildAndExecuteQuery(final TestCase testCase) { stubKafkaService ); - testCase.expectedException().map(ee -> { - throw new AssertionError("Expected test to throw" + StringDescription.toString(ee)); - }); - writeInputIntoTopics(testCase.getInputRecords(), stubKafkaService); final Set inputTopics = testCase.getInputRecords() .stream() @@ -181,6 +177,9 @@ public void buildAndExecuteQuery(final TestCase testCase) { .map(Topic::getName) .forEach(allTopicNames::add); + testCase.expectedException().map(ee -> { + throw new AssertionError("Expected test to throw" + StringDescription.toString(ee)); + }); testCase.getPostConditions().verify(ksqlEngine.getMetaStore(), allTopicNames); } catch (final RuntimeException e) { final Optional> expectedExceptionMatcher = testCase.expectedException(); diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java index 
32f97c60c9a2..b7a7b0609d0a 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java @@ -46,6 +46,7 @@ import java.nio.file.Files; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import javax.xml.parsers.DocumentBuilder; @@ -141,6 +142,7 @@ public static List allForTestCase(final TestCase testCase) { .orElse(Collections.emptyList()) .stream() .map(p -> parseSpec(rootforCase.resolve(p))) + .filter(Objects::nonNull) .collect(Collectors.toList()); } @@ -149,11 +151,15 @@ private static TestCasePlan parseSpec(final PlannedTestPath versionDir) { final PlannedTestPath specPath = versionDir.resolve(PlannedTestPath.SPEC_FILE); final PlannedTestPath topologyPath = versionDir.resolve(PlannedTestPath.TOPOLOGY_FILE); - return new TestCasePlan( - parseJson(specPath, JsonTestLoader.OBJECT_MAPPER, TestCaseSpecNode.class), - parseJson(planPath, PlannedTestUtils.PLAN_MAPPER, TestCasePlanNode.class), - slurp(topologyPath) - ); + try { + return new TestCasePlan( + parseJson(specPath, JsonTestLoader.OBJECT_MAPPER, TestCaseSpecNode.class), + parseJson(planPath, PlannedTestUtils.PLAN_MAPPER, TestCasePlanNode.class), + slurp(topologyPath) + ); + } catch (final Exception e) { + return null; + } } private static T parseJson(final PlannedTestPath path, final ObjectMapper mapper, diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/plan.json similarity index 95% rename from ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/plan.json rename to 
ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/plan.json index b4ac931e344c..cd9d5845037e 100644 --- a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/plan.json +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/plan.json @@ -92,7 +92,8 @@ }, "options" : [ ] }, - "topicName" : "TS" + "topicName" : "TS", + "timestampColumn" : null }, "queryId" : "CSAS_TS_0" } @@ -114,7 +115,7 @@ "ksql.internal.topic.min.insync.replicas" : "1", "ksql.streams.shutdown.timeout.ms" : "300000", "ksql.new.api.enabled" : "false", - "ksql.streams.state.dir" : "/var/folders/p9/bk8xks6s2lndncftdbq36xh80000gp/T/confluent8393051053551629823", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent8144864285731138620", "ksql.internal.topic.replicas" : "1", "ksql.insert.into.values.enabled" : "true", "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", @@ -129,6 +130,7 @@ "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", "ksql.authorization.cache.max.entries" : "10000", "ksql.metrics.tags.custom" : "", "ksql.pull.queries.enable" : "true", diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/spec.json similarity index 94% rename from 
ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/spec.json rename to ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/spec.json index 0d1f17f4106a..cf6bbc486232 100644 --- a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/spec.json +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/spec.json @@ -1,6 +1,6 @@ { - "version" : "5.5.0", - "timestamp" : 1581572104400, + "version" : "6.0.0", + "timestamp" : 1583181389696, "schemas" : { "CSAS_TS_0.KsqlTopic.Source" : "STRUCT NOT NULL", "CSAS_TS_0.TS" : "STRUCT NOT NULL" diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/topology b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/topology similarity index 100% rename from ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/5.5.0_1581572104400/topology rename to ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor/6.0.0_1583181389696/topology diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/plan.json new file mode 100644 index 000000000000..c6e5928d323c --- /dev/null +++ 
b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/plan.json @@ -0,0 +1,152 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT, TS STRING) WITH (KAFKA_TOPIC='test_topic', TIMESTAMP='ts', TIMESTAMP_FORMAT='yy-MM-dd HH:mm:ss', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING", + "keyField" : null, + "timestampColumn" : { + "column" : "TS", + "format" : "yy-MM-dd HH:mm:ss" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT TEST.ID ID\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`ROWKEY` STRING KEY, `ID` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : { + "column" : 
"TS", + "format" : "yy-MM-dd HH:mm:ss" + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING" + }, + "selectExpressions" : [ "ID AS ID" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "TS", + "timestampColumn" : null + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent8883205057413437130", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + 
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/spec.json new file mode 100644 index 000000000000..a1156d121213 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/spec.json @@ -0,0 +1,45 @@ +{ + "version" : "6.0.0", + "timestamp" : 1583170254458, + "schemas" : { + "CSAS_TS_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_TS_0.TS" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "ID" : 1, + "TS" : "10-04-19 12:00:17" + } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "ID" : 2, + "TS" : "!!!!!!!!!!!!!!!!!" 
+ } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "ID" : 3, + "TS" : "10-04-19 12:00:17" + } + } ], + "outputs" : [ { + "topic" : "TS", + "key" : "", + "value" : { + "ID" : 1 + }, + "timestamp" : 1271703617000 + }, { + "topic" : "TS", + "key" : "", + "value" : { + "ID" : 3 + }, + "timestamp" : 1271703617000 + } ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/topology b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583170254458/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/plan.json new file mode 100644 index 000000000000..cb6753e407f8 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/plan.json @@ -0,0 +1,152 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID 
BIGINT, TS STRING) WITH (KAFKA_TOPIC='test_topic', TIMESTAMP='ts', TIMESTAMP_FORMAT='yy-MM-dd HH:mm:ssX', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING", + "keyField" : null, + "timestampColumn" : { + "column" : "TS", + "format" : "yy-MM-dd HH:mm:ssX" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT TEST.ID ID\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`ROWKEY` STRING KEY, `ID` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : { + "column" : "TS", + "format" : "yy-MM-dd HH:mm:ssX" + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING" + }, + "selectExpressions" : [ "ID AS ID" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { 
+ "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "TS", + "timestampColumn" : null + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent5552914606492283376", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + 
"ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/spec.json new file mode 100644 index 000000000000..7ca79ca94308 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/spec.json @@ -0,0 +1,45 @@ +{ + "version" : "6.0.0", + "timestamp" : 1583181588448, + "schemas" : { + "CSAS_TS_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_TS_0.TS" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "ID" : 1, + "TS" : "10-04-19 12:00:17-07" + } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "ID" : 2, + "TS" : "!!!!!!!!!!!!!!!!!" 
+ } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "ID" : 3, + "TS" : "10-04-19 12:00:17-07" + } + } ], + "outputs" : [ { + "topic" : "TS", + "key" : "", + "value" : { + "ID" : 1 + }, + "timestamp" : 1271703617000 + }, { + "topic" : "TS", + "key" : "", + "value" : { + "ID" : 3 + }, + "timestamp" : 1271703617000 + } ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/topology b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_default_timestamp_extractor_with_format/6.0.0_1583181588448/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/spec.json new file mode 100644 index 000000000000..25d0bfad466e --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/spec.json @@ -0,0 +1,157 @@ +{ + "version" : "5.5.0", + "timestamp" : 1582672993973, + 
"plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT, TS STRING) WITH (KAFKA_TOPIC='test_topic', TIMESTAMP='ts', TIMESTAMP_FORMAT='yy-MM-dd HH:mm:ss', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING", + "keyField" : null, + "timestampColumn" : { + "column" : "TS", + "format" : "yy-MM-dd HH:mm:ss" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT TEST.ID ID\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`ROWKEY` STRING KEY, `ID` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : { + "column" : "TS", + "format" : "yy-MM-dd HH:mm:ss" + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `ID` BIGINT, `TS` STRING" + }, + "selectExpressions" : [ "ID AS ID" ] + }, + "formats" : { + 
"keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "schemas" : { + "CSAS_TS_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_TS_0.TS" : "STRUCT NOT NULL" + }, + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent8169473536565142001", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/topology b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/timestamp-extractor_-_KSQL_throw_on_invalid_timestamp_extractor_with_format/5.5.0_1582672993973/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json 
b/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json index 4e269e2f9b62..cdc10d64e45f 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json @@ -20,6 +20,39 @@ {"topic": "TS", "value": {"ID": 3}, "timestamp": 1589234313000} ] }, + { + "name": "KSQL default timestamp extractor with format", + "statements": [ + "CREATE STREAM TEST (ID bigint, TS varchar) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='ts', timestamp_format='yy-MM-dd HH:mm:ssX');", + "CREATE STREAM TS AS SELECT id FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"ID": 1, "TS": "10-04-19 12:00:17-07"}}, + {"topic": "test_topic", "value": {"ID": 2, "TS": "!!!!!!!!!!!!!!!!!"}}, + {"topic": "test_topic", "value": {"ID": 3, "TS": "10-04-19 12:00:17-07"}} + ], + "outputs": [ + {"topic": "TS", "value": {"ID": 1}, "timestamp": 1271703617000}, + {"topic": "TS", "value": {"ID": 3}, "timestamp": 1271703617000} + ] + }, + { + "name": "KSQL throw on invalid timestamp extractor with format", + "statements": [ + "CREATE STREAM TEST (ID bigint, TS varchar) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='ts', timestamp_format='yy-MM-dd HH:mm:ssX');", + "CREATE STREAM TS AS SELECT id FROM test;" + ], + "properties": { + "ksql.timestamp.throw.on.invalid": "true" + }, + "inputs": [ + {"topic": "test_topic", "value": {"ID": 2, "TS": "!!!!!!!!!!!!!!!!!"}} + ], + "expectedException": { + "type": "org.apache.kafka.streams.errors.StreamsException", + "message": "Fatal user code error in TimestampExtractor callback for record" + } + }, { "name": "KSQL override timestamp extractor", "statements": [ diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java index 
f8d754682252..86e8b5569ae2 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java @@ -18,7 +18,9 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryContext.Stacker; +import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; import io.confluent.ksql.execution.plan.KStreamHolder; import io.confluent.ksql.execution.plan.KTableHolder; @@ -31,6 +33,7 @@ import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy; import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory; import io.confluent.ksql.execution.timestamp.TimestampColumn; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -301,7 +304,9 @@ private static KTable buildKTable( private static TimestampExtractor timestampExtractor( final KsqlConfig ksqlConfig, final LogicalSchema sourceSchema, - final Optional timestampColumn + final Optional timestampColumn, + final SourceStep streamSource, + final KsqlQueryBuilder queryBuilder ) { final TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create( ksqlConfig, @@ -314,7 +319,17 @@ private static TimestampExtractor timestampExtractor( .map(Column::index) .orElse(-1); - return timestampPolicy.create(timestampIndex); + final QueryId queryId = queryBuilder.getQueryId(); + final QueryContext queryContext = streamSource.getProperties().getQueryContext(); + final String loggerNamePrefix = QueryLoggerUtil.queryLoggerName(queryId, queryContext); + + return timestampPolicy.create( + timestampIndex, + 
ksqlConfig.getBoolean(KsqlConfig.KSQL_TIMESTAMP_THROW_ON_INVALID), + queryBuilder.getProcessingLogContext() + .getLoggerFactory() + .getLogger(loggerNamePrefix) + ); } private static Consumed buildSourceConsumed( @@ -327,7 +342,9 @@ private static Consumed buildSourceConsumed( final TimestampExtractor timestampExtractor = timestampExtractor( queryBuilder.getKsqlConfig(), streamSource.getSourceSchema(), - streamSource.getTimestampColumn() + streamSource.getTimestampColumn(), + streamSource, + queryBuilder ); final Consumed consumed = consumedFactory .create(keySerde, valueSerde) diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractor.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractor.java new file mode 100644 index 000000000000..4d70400d0fe1 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractor.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams.timestamp; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.logging.processing.ProcessingLogConfig; +import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema; +import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.util.ErrorMessageUtil; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.processor.TimestampExtractor; + +/** + * A wrapper around {@code TimestampExtractor} that can be configured to suppress any + * errors and instead returns a negative timestamp (indicating to streams that the message + * should be ignored). Additionally, this class ensures that any errors are logged to the + * processing log (even the fatal ones) for visibility. 
+ */ +public class LoggingTimestampExtractor implements KsqlTimestampExtractor { + + private final KsqlTimestampExtractor delegate; + private final ProcessingLogger logger; + private final boolean failOnError; + + public LoggingTimestampExtractor( + final KsqlTimestampExtractor delegate, + final ProcessingLogger logger, + final boolean failOnError + ) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + this.logger = Objects.requireNonNull(logger, "logger"); + this.failOnError = failOnError; + } + + @Override + public long extract(final ConsumerRecord record, final long previousTimestamp) { + try { + return delegate.extract(record, previousTimestamp); + } catch (final Exception e) { + logger.error(timestampExtractErroMsg(e, record.value().toString())); + if (failOnError) { + throw e; + } + return -1L; + } + } + + @Override + public long extract(final GenericRow row) { + try { + return delegate.extract(row); + } catch (final Exception e) { + logger.error(timestampExtractErroMsg(e, row.toString())); + if (failOnError) { + throw e; + } + return -1L; + } + } + + @VisibleForTesting + TimestampExtractor getDelegate() { + return delegate; + } + + public Function timestampExtractErroMsg( + final Exception e, + final String row + ) { + return config -> { + final Struct message = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA); + final Struct error = new Struct(MessageType.RECORD_PROCESSING_ERROR.getSchema()); + + error.put(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE, e.getMessage()); + + if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) { + error.put(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_RECORD, row.toString()); + } + + final List cause = ErrorMessageUtil.getErrorMessages(e); + cause.remove(0); + error.put(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_CAUSE, cause); + + return new SchemaAndValue(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA, message); + }; + } +} diff --git 
a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java index 4bafb69a6922..8b606bfc3f3e 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicy.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.As; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; @Immutable @@ -34,6 +35,14 @@ }) public interface TimestampExtractionPolicy { + default KsqlTimestampExtractor create( + final int columnIndex, + final boolean throwOnInvalid, + final ProcessingLogger logger + ) { + return new LoggingTimestampExtractor(create(columnIndex), logger, throwOnInvalid); + } + KsqlTimestampExtractor create(int columnIndex); default ColumnName getTimestampField() { diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java index fa155739b6fa..b8d6f9a8bd9d 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java @@ -46,7 +46,11 @@ import io.confluent.ksql.execution.plan.WindowedStreamSource; import io.confluent.ksql.execution.plan.WindowedTableSource; import io.confluent.ksql.execution.timestamp.TimestampColumn; +import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.logging.processing.ProcessingLoggerFactory; import 
io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -167,6 +171,12 @@ public class SourceBuilderTest { private MaterializedFactory materializationFactory; @Mock private Materialized> materialized; + @Mock + private ProcessingLogContext processingLogContext; + @Mock + private ProcessingLoggerFactory processingLoggerFactory; + @Mock + private ProcessingLogger processingLogger; @Captor private ArgumentCaptor> transformSupplierCaptor; @Captor @@ -189,6 +199,8 @@ public class SourceBuilderTest { @SuppressWarnings("unchecked") public void setup() { when(queryBuilder.getStreamsBuilder()).thenReturn(streamsBuilder); + when(queryBuilder.getQueryId()).thenReturn(new QueryId("fooQuery")); + when(queryBuilder.getProcessingLogContext()).thenReturn(processingLogContext); when(streamsBuilder.stream(anyString(), any(Consumed.class))).thenReturn(kStream); when(streamsBuilder.table(anyString(), any(), any())).thenReturn(kTable); when(kStream.mapValues(any(ValueMapper.class))).thenReturn(kStream); @@ -199,6 +211,8 @@ public void setup() { when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde); when(queryBuilder.getKsqlConfig()).thenReturn(KSQL_CONFIG); when(processorCtx.timestamp()).thenReturn(A_ROWTIME); + when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory); + when(processingLoggerFactory.getLogger(anyString())).thenReturn(processingLogger); when(streamsFactories.getConsumedFactory()).thenReturn(consumedFactory); when(streamsFactories.getMaterializedFactory()).thenReturn(materializationFactory); when(materializationFactory.create(any(), any(), any())) diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractorTest.java 
b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractorTest.java new file mode 100644 index 000000000000..ebfc9a6beea7 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/LoggingTimestampExtractorTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams.timestamp; + +import static org.mockito.ArgumentMatchers.any; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.util.KsqlException; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class LoggingTimestampExtractorTest { + + @Mock + private ProcessingLogger logger; + + @Mock + private GenericRow row; + + @Test + public void shouldLogExceptionsAndNotFail() { + // Given: + final LoggingTimestampExtractor extractor = new LoggingTimestampExtractor( + (rec) -> { + throw new KsqlException("foo"); + }, + logger, + false + ); + + // When: + extractor.extract(row); + + // Then (did not throw): + Mockito.verify(logger).error(any()); + } + + @Test + public void shouldLogExceptionsAndFail() { + // Given: + final LoggingTimestampExtractor extractor = new LoggingTimestampExtractor( + 
(rec) -> { + throw new KsqlException("foo"); + }, + logger, + true + ); + + // When/Then: + try { + extractor.extract(row); + } catch (final Exception e) { + Mockito.verify(logger).error(any()); + return; + } + + Assert.fail("Expected error!"); + } + +} \ No newline at end of file diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java index 656e7c090f01..8f1211ba3b7a 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/timestamp/TimestampExtractionPolicyFactoryTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.timestamp.TimestampColumn; +import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -30,12 +31,17 @@ import java.util.Optional; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class TimestampExtractionPolicyFactoryTest { private final LogicalSchema.Builder schemaBuilder2 = LogicalSchema.builder() @@ -46,6 +52,9 @@ public class TimestampExtractionPolicyFactoryTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); + @Mock + 
private ProcessingLogger logger; + @Before public void setup() { ksqlConfig = new KsqlConfig(Collections.emptyMap()); @@ -63,8 +72,11 @@ public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() { // Then: assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); - assertThat(((MetadataTimestampExtractor)result.create(0)).getTimestampExtractor(), - instanceOf(FailOnInvalidTimestamp.class)); + final TimestampExtractor timestampExtractor = result.create(0, true, logger); + assertThat(timestampExtractor, instanceOf(LoggingTimestampExtractor.class)); + assertThat( + ((LoggingTimestampExtractor) timestampExtractor).getDelegate(), + instanceOf(MetadataTimestampExtractor.class)); } @Test