fix: change default exception handling for timestamp extractors #4632

Merged on Mar 3, 2020 (1 commit)

Conversation

Contributor

@agavra agavra commented Feb 26, 2020

fixes #2609

Description

ksqlDB currently fails violently whenever a timestamp cannot be extracted from a record. Perhaps worse, it also logs nothing to the processing log, which means that users of the cloud offering of ksql cannot see what happened to their queries.

This PR addresses both of those concerns and gives users the ability to toggle whether they want the failure to be violent or graceful so that they can choose not to lose data.

Testing done

New unit tests and end to end testing:

ksql> PRINT ts FROM BEGINNING;
Value format: JSON or KAFKA_STRING
rowtime: 2/25/20 3:11:47 PM PST, key: <null>, value: {"TS":"10-04-19 12:00:17"}
rowtime: 2/25/20 3:11:52 PM PST, key: <null>, value: {"TS":"10-04-19 X 12:00:17"}
rowtime: 2/25/20 3:11:55 PM PST, key: <null>, value: {"TS":"10-04-19 12:00:17"}

ksql> CREATE STREAM ts (ts VARCHAR) WITH (value_format='json', kafka_topic='ts', partitions=1, timestamp='ts', timestamp_format='yy-MM-dd HH:mm:ss');

ksql> SELECT * FROM ts EMIT CHANGES;
+--------------------------------------------------------------+--------------------------------------------------------------+--------------------------------------------------------------+
|ROWTIME                                                       |ROWKEY                                                        |TS                                                            |
+--------------------------------------------------------------+--------------------------------------------------------------+--------------------------------------------------------------+
|1271703617000                                                 |null                                                          |10-04-19 12:00:17                                             |
|1271703617000                                                 |null                                                          |10-04-19 12:00:17                                             |
^CQuery terminated

ksql> SELECT * FROM KSQL_PROCESSING_LOG EMIT CHANGES;
+--------------+--------------+--------------+--------------+--------------+--------------+
|ROWTIME       |ROWKEY        |LOGGER        |LEVEL         |TIME          |MESSAGE       |
+--------------+--------------+--------------+--------------+--------------+--------------+
|1582672325285 |null          |processing.211|ERROR         |1582672325277 |{TYPE=0, DESER|
|              |              |78117353354788|              |              |IALIZATIONERRO|
|              |              |25            |              |              |R={ERRORMESSAG|
|              |              |              |              |              |E=Text '10-04-|
|              |              |              |              |              |19 X 12:00:17'|
|              |              |              |              |              | could not be |
|              |              |              |              |              |parsed at inde|
|              |              |              |              |              |x 9, RECORDB64|
|              |              |              |              |              |=null, CAUSE=[|
|              |              |              |              |              |]}, RECORDPROC|
|              |              |              |              |              |ESSINGERROR=nu|
|              |              |              |              |              |ll, PRODUCTION|
|              |              |              |              |              |ERROR=null}   |
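
For reference, the parse failure in the processing log above can be reproduced outside ksqlDB with plain `java.time`. This is only a sketch: `TimestampFormatDemo` and `toEpochMillis` are hypothetical names, and the `America/Los_Angeles` zone is an assumption based on the PST rowtimes printed above, not something taken from the ksqlDB code.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper, not ksqlDB code: turns a string column into epoch millis
// using the same pattern as the timestamp_format property in the test above.
class TimestampFormatDemo {
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yy-MM-dd HH:mm:ss");

  // Assumption: the text is interpreted as local time in the given zone.
  static long toEpochMillis(String text, ZoneId zone) {
    return LocalDateTime.parse(text, FORMAT).atZone(zone).toInstant().toEpochMilli();
  }

  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("America/Los_Angeles");

    // Well-formed value: parses cleanly.
    System.out.println(toEpochMillis("10-04-19 12:00:17", zone));

    // Malformed value: the stray "X" sits where the hour digits are expected.
    try {
      toEpochMillis("10-04-19 X 12:00:17", zone);
    } catch (DateTimeParseException e) {
      System.out.println(e.getMessage() + " (error index " + e.getErrorIndex() + ")");
    }
  }
}
```

Under that zone assumption the well-formed value maps to the ROWTIME shown in the query output, and the malformed one fails at index 9, which is the position reported in the ERRORMESSAGE field.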

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra requested a review from rodesai February 26, 2020 00:53
@agavra agavra requested a review from a team as a code owner February 26, 2020 00:53
if (failOnError) {
  throw e;
}
return -1L;
Member

Will this allow KSQL to display the row with ROWTIME = -1 in case of a failure?

I'm trying to understand how FailOnInvalidTimestamp will work in this situation. If a -1 is found on the row, FailOnInvalidTimestamp will throw an exception, which this code will catch and log to the processing log. But it then returns -1 back to the stream, so the stream will set it as ROWTIME, won't it?

Contributor Author

Any negative timestamp causes the event to be ignored by Kafka Streams. So if this wraps FailOnInvalidTimestamp and you set ksql.timestamp.throw.on.invalid to false, it will suppress the throw and the message will simply be ignored.
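
To make the behaviour in this thread concrete, here is a minimal sketch of the wrapping pattern. Every name in it (`LoggingTimestampExtractor`, `keepValid`, the in-memory `processingLog` list) is hypothetical and stands in for the real classes; in particular, `keepValid` only imitates the "negative timestamps are ignored" behaviour described above and is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Illustrative sketch only: these names are hypothetical, not the classes in this PR.
class LoggingTimestampExtractor<R> {
  private final ToLongFunction<R> delegate;  // the wrapped extractor
  private final boolean failOnError;         // plays the role of ksql.timestamp.throw.on.invalid
  final List<String> processingLog = new ArrayList<>();  // stand-in for the processing log

  LoggingTimestampExtractor(ToLongFunction<R> delegate, boolean failOnError) {
    this.delegate = delegate;
    this.failOnError = failOnError;
  }

  long extract(R record) {
    try {
      return delegate.applyAsLong(record);
    } catch (RuntimeException e) {
      // Record the failure whether or not it is rethrown.
      processingLog.add("Failed to extract timestamp from " + record + ": " + e.getMessage());
      if (failOnError) {
        throw e;
      }
      return -1L;  // negative timestamp: the caller is expected to skip the record
    }
  }

  // Imitates a consumer that skips records with negative timestamps.
  List<R> keepValid(List<R> records) {
    List<R> kept = new ArrayList<>();
    for (R record : records) {
      if (extract(record) >= 0) {
        kept.add(record);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    LoggingTimestampExtractor<String> lenient =
        new LoggingTimestampExtractor<String>(Long::parseLong, false);
    System.out.println(lenient.keepValid(Arrays.asList("1", "x", "3")));
    System.out.println(lenient.processingLog);
  }
}
```

With `failOnError` set to false the bad record is logged and dropped while the others pass through; with it set to true the same record is still logged, but the exception propagates to the caller.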

return message(exception, () -> record.map(Base64.getEncoder()::encodeToString).orElse(null));
}

public static Function<ProcessingLogConfig, SchemaAndValue> deserializationErrorMsg(
Contributor

I'd use a different message type for this error. Deserialization error messages generally mean the serde couldn't deserialize the record in the topic, and contain a base64 encoded representation of the data inside (in a field called recordB64). The data written out here represents something different - KSQL's view of the row once it's been deserialized.

Contributor

You should be able to use the existing record processing error (EngineProcessingLogMessageFactory.recordProcessingError)

Contributor Author

I was debating which to do, since this felt like a type of deserialization error to me, but I'll update it to a new type (I can see both sides).
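
As a small illustration of the distinction raised in this thread, the sketch below shows what a base64-encoded raw record (the kind of value the recordB64 field is described as holding) looks like. `RecordEncodingDemo` and its `recordB64` method are hypothetical names; only the `Optional`/`Base64` expression mirrors the snippet under review.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical demo class; not part of the ksqlDB code base.
class RecordEncodingDemo {
  // Encodes the raw bytes read from the topic, or returns null when there are none.
  static String recordB64(Optional<byte[]> record) {
    return record.map(Base64.getEncoder()::encodeToString).orElse(null);
  }

  public static void main(String[] args) {
    byte[] raw = "{\"TS\":\"10-04-19 X 12:00:17\"}".getBytes(StandardCharsets.UTF_8);
    // Raw bytes are available when the serde itself fails.
    System.out.println(recordB64(Optional.of(raw)));
    // No raw bytes to encode: the field ends up null.
    System.out.println(recordB64(Optional.empty()));
  }
}
```

A timestamp extraction failure happens after deserialization, so what the code has in hand is the deserialized row rather than raw bytes, which is the reason given above for not reusing the deserialization error message type.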

@agavra agavra force-pushed the timestamp_extractor branch from bd10cfb to ba8cea0 Compare February 28, 2020 23:22
@agavra agavra requested a review from JimGalasyn as a code owner February 28, 2020 23:22
@agavra agavra changed the base branch from 5.5.x to master February 28, 2020 23:23
@agavra agavra requested review from rodesai and spena February 28, 2020 23:23
Member

@JimGalasyn JimGalasyn left a comment

Nice!

@agavra agavra force-pushed the timestamp_extractor branch 3 times, most recently from 571760b to a28de97 Compare March 2, 2020 21:11
@agavra agavra force-pushed the timestamp_extractor branch from a28de97 to 2272e45 Compare March 2, 2020 22:53
Contributor

@rodesai rodesai left a comment

LGTM!

@agavra agavra changed the title from "fix: change default exception handling for ksql timestamp extractors" to "fix: change default exception handling for timestamp extractors" Mar 3, 2020
@agavra agavra merged commit 1576af0 into confluentinc:master Mar 3, 2020
@agavra agavra deleted the timestamp_extractor branch March 3, 2020 00:44
Development

Successfully merging this pull request may close these issues.

Fatal user code error in TimestampExtractor
4 participants