feat: clarify key or value in (de)serialization processing log messages #6109

Merged
merged 5 commits into from
Sep 8, 2020
Changes from 2 commits
14 changes: 13 additions & 1 deletion docs/developer-guide/test-and-debug/processing-log.md
Original file line number Diff line number Diff line change
@@ -121,6 +121,11 @@ message.deserializationError (STRUCT)
: The contents of a message with type 0 (DESERIALIZATION_ERROR).
Logged when a deserializer fails to deserialize an {{ site.ak }} record.

+message.deserializationError.component (STRING)
Contributor Author

Not a huge fan of the name "component" but couldn't think of anything better. Hopefully someone else has a better suggestion?

Member

I poked around a bit and didn't see consensus on a good term, but I did see a few sources using the very technical term "part", like "key-part" and "value-part".

Contributor

I don't mind "component" or "part". How about "target"?

Contributor Author

Sure, updated to "target".


+: Either "key" or "value" representing the component of the row that
+failed to deserialize.

message.deserializationError.errorMessage (STRING)

: A string containing a human-readable error message detailing the
@@ -174,6 +179,11 @@ message.serializationError (STRUCT)

: The contents of a message with type 3 (SERIALIZATION_ERROR).
Logged when a serializer fails to serialize a ksqlDB row.

+message.serializationError.component (STRING)
+
+: Either "key" or "value" representing the component of the row that
+failed to serialize.

message.serializationError.errorMessage (STRING)

@@ -265,7 +275,7 @@ Field | Type
LOGGER | VARCHAR(STRING)
LEVEL | VARCHAR(STRING)
TIME | BIGINT
-MESSAGE | STRUCT<type INTEGER, deserializationError STRUCT<errorMessage VARCHAR(STRING), recordB64 VARCHAR(STRING), cause ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, ...>
+MESSAGE | STRUCT<type INTEGER, deserializationError STRUCT<component VARCHAR(STRING), errorMessage VARCHAR(STRING), recordB64 VARCHAR(STRING), cause ARRAY<VARCHAR(STRING)>, topic VARCHAR(STRING)>, ...>
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

@@ -281,6 +291,7 @@ ksql> CREATE STREAM PROCESSING_LOG_STREAM (
MESSAGE STRUCT<
`TYPE` INTEGER,
deserializationError STRUCT<
+component STRING,
errorMessage STRING,
recordB64 STRING,
cause ARRAY<STRING>,
@@ -292,6 +303,7 @@ ksql> CREATE STREAM PROCESSING_LOG_STREAM (
productionError STRUCT<
errorMessage STRING>,
serializationError STRUCT<
+component STRING,
errorMessage STRING,
record STRING,
cause ARRAY<STRING>,
Original file line number Diff line number Diff line change
@@ -24,13 +24,15 @@ public final class ProcessingLogMessageSchema {
private static final Schema CAUSE_SCHEMA =
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build();

+public static final String DESERIALIZATION_ERROR_FIELD_COMPONENT = "component";
public static final String DESERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String DESERIALIZATION_ERROR_FIELD_RECORD_B64 = "recordB64";
public static final String DESERIALIZATION_ERROR_FIELD_CAUSE = "cause";
public static final String DESERIALIZATION_ERROR_FIELD_TOPIC = "topic";

private static final Schema DESERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "DeserializationError")
+.field(DESERIALIZATION_ERROR_FIELD_COMPONENT, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_RECORD_B64, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
@@ -58,13 +60,15 @@ public final class ProcessingLogMessageSchema {
.optional()
.build();

+public static final String SERIALIZATION_ERROR_FIELD_COMPONENT = "component";
public static final String SERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String SERIALIZATION_ERROR_FIELD_RECORD = "record";
public static final String SERIALIZATION_ERROR_FIELD_CAUSE = "cause";
public static final String SERIALIZATION_ERROR_FIELD_TOPIC = "topic";

private static final Schema SERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "SerializationError")
+.field(SERIALIZATION_ERROR_FIELD_COMPONENT, Schema.OPTIONAL_STRING_SCHEMA)
.field(SERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(SERIALIZATION_ERROR_FIELD_RECORD, Schema.OPTIONAL_STRING_SCHEMA)
.field(SERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
Original file line number Diff line number Diff line change
@@ -186,10 +186,10 @@ public void shouldBuildCorrectStreamCreateDDL() {
       + "time BIGINT, "
       + "message STRUCT<"
       + "type INT, "
-      + "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+      + "deserializationError STRUCT<component VARCHAR, errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
       + "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
       + "productionError STRUCT<errorMessage VARCHAR>, "
-      + "serializationError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>"
+      + "serializationError STRUCT<component VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>"
+ ">"
+ ") WITH(KAFKA_TOPIC='processing_log_topic', VALUE_FORMAT='JSON');"));
}
Original file line number Diff line number Diff line change
@@ -18,9 +18,7 @@
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
-import io.confluent.ksql.util.ErrorMessageUtil;
import java.util.Base64;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -31,15 +29,18 @@ public class DeserializationError implements ProcessingLogger.ErrorMessage {
private final Throwable exception;
private final Optional<byte[]> record;
private final String topic;
+private final boolean isKey;

public DeserializationError(
final Throwable exception,
final Optional<byte[]> record,
-final String topic
+final String topic,
+final boolean isKey
) {
this.exception = requireNonNull(exception, "exception");
this.record = requireNonNull(record, "record");
this.topic = requireNonNull(topic, "topic");
+this.isKey = isKey;
}

@Override
@@ -62,27 +63,29 @@ public boolean equals(final Object o) {
final DeserializationError that = (DeserializationError) o;
return Objects.equals(exception, that.exception)
&& Objects.equals(record, that.record)
-&& Objects.equals(topic, that.topic);
+&& Objects.equals(topic, that.topic)
+&& isKey == that.isKey;
}

@Override
public int hashCode() {
-return Objects.hash(exception, record, topic);
+return Objects.hash(exception, record, topic, isKey);
Contributor

We should have a unit test covering this. Could you add one, please?

}

private Struct deserializationError(final ProcessingLogConfig config) {
final Struct deserializationError = new Struct(MessageType.DESERIALIZATION_ERROR.getSchema())
+.put(
+ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_COMPONENT,
+LoggingSerdeUtil.getRecordComponent(isKey))
.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_MESSAGE,
exception.getMessage())
.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE,
-getCause()
-)
+LoggingSerdeUtil.getCause(exception))
.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_TOPIC,
-topic
-);
+topic);

if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
deserializationError.put(
@@ -93,10 +96,4 @@ private Struct deserializationError(final ProcessingLogConfig config) {

return deserializationError;
}

-private List<String> getCause() {
-final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
-cause.remove(0);
-return cause;
-}
}
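The review comment above asks for a unit test covering the updated `equals` and `hashCode`. One way such a check could look is sketched below. It is self-contained and does not use the project's classes: `ErrorStandIn` is a hypothetical class that mirrors only the four fields shown in the diff, and `ErrorStandInCheck` is an illustrative helper. The project's own tests may use a different framework or helper.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in mirroring the fields of DeserializationError in the diff.
final class ErrorStandIn {

  private final Throwable exception;
  private final Optional<byte[]> record;
  private final String topic;
  private final boolean isKey;

  ErrorStandIn(
      final Throwable exception,
      final Optional<byte[]> record,
      final String topic,
      final boolean isKey
  ) {
    this.exception = Objects.requireNonNull(exception, "exception");
    this.record = Objects.requireNonNull(record, "record");
    this.topic = Objects.requireNonNull(topic, "topic");
    this.isKey = isKey;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final ErrorStandIn that = (ErrorStandIn) o;
    return Objects.equals(exception, that.exception)
        && Objects.equals(record, that.record)
        && Objects.equals(topic, that.topic)
        && isKey == that.isKey;
  }

  @Override
  public int hashCode() {
    return Objects.hash(exception, record, topic, isKey);
  }
}

// Illustrative helper, not part of the PR.
final class ErrorStandInCheck {

  private ErrorStandInCheck() {
  }

  // Returns true when instances that differ only in isKey are unequal, and
  // instances with identical fields are equal with matching hash codes.
  static boolean isKeyParticipatesInEquality() {
    final Throwable cause = new RuntimeException("boom");
    final Optional<byte[]> record = Optional.of(new byte[] {1, 2, 3});

    final ErrorStandIn key = new ErrorStandIn(cause, record, "topic", true);
    final ErrorStandIn sameKey = new ErrorStandIn(cause, record, "topic", true);
    final ErrorStandIn value = new ErrorStandIn(cause, record, "topic", false);

    return key.equals(sameKey)
        && key.hashCode() == sameKey.hashCode()
        && !key.equals(value);
  }
}
```

The sketch reuses the same `Throwable` and `Optional<byte[]>` instances on purpose, because neither type compares by content, so only the `isKey` flag varies between the objects being compared.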
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ public final class LoggingDeserializer<T> implements Deserializer<T> {

private final Deserializer<T> delegate;
private final ProcessingLogger processingLogger;
+private boolean isKey;

public LoggingDeserializer(
final Deserializer<T> delegate,
@@ -37,6 +38,7 @@ public LoggingDeserializer(

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
+this.isKey = isKey;
delegate.configure(configs, isKey);
}

@@ -59,7 +61,7 @@ public DelayedResult<T> tryDeserialize(final String topic, final byte[] bytes) {
} catch (final RuntimeException e) {
return new DelayedResult<T>(
e,
-new DeserializationError(e, Optional.ofNullable(bytes), topic),
+new DeserializationError(e, Optional.ofNullable(bytes), topic, isKey),
processingLogger
);
}
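In the hunks above, `isKey` is a non-final field that is only assigned in `configure`, so it keeps its default of `false` until `configure` has been called. The sketch below illustrates that behaviour without any Kafka dependency. `TaggingDeserializer`, `lastFailedComponent`, and `TaggingDeserializerDemo` are hypothetical names introduced for illustration, and a plain `Function` stands in for the delegate `Deserializer`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, dependency-free stand-in for the wrapper in the diff.
final class TaggingDeserializer<T> {

  private final Function<byte[], T> delegate;
  private boolean isKey; // stays false (reported as "value") until configure is called
  private String lastFailedComponent; // illustrative only, not part of the PR

  TaggingDeserializer(final Function<byte[], T> delegate) {
    this.delegate = delegate;
  }

  void configure(final Map<String, ?> configs, final boolean isKey) {
    this.isKey = isKey;
  }

  T deserialize(final byte[] bytes) {
    try {
      return delegate.apply(bytes);
    } catch (final RuntimeException e) {
      // Record which part of the record failed, then rethrow.
      lastFailedComponent = isKey ? "key" : "value";
      throw e;
    }
  }

  String lastFailedComponent() {
    return lastFailedComponent;
  }
}

// Illustrative driver, not part of the PR.
final class TaggingDeserializerDemo {

  private TaggingDeserializerDemo() {
  }

  // Runs one failing deserialization and returns the component that was recorded.
  static String failedComponent(final boolean callConfigure, final boolean isKey) {
    final TaggingDeserializer<String> deserializer = new TaggingDeserializer<String>(bytes -> {
      throw new IllegalStateException("cannot deserialize");
    });
    if (callConfigure) {
      deserializer.configure(Collections.emptyMap(), isKey);
    }
    try {
      deserializer.deserialize(new byte[0]);
    } catch (final IllegalStateException expected) {
      // The failure is expected; only the recorded component matters here.
    }
    return deserializer.lastFailedComponent();
  }
}
```

In this sketch a key deserializer that is never configured reports "value", which suggests the logged component is only as reliable as the `configure` call that precedes it.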
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.logging.processing;

import io.confluent.ksql.util.ErrorMessageUtil;
import java.util.List;

final class LoggingSerdeUtil {

private LoggingSerdeUtil() {
}

static List<String> getCause(final Throwable exception) {
final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
cause.remove(0);
Contributor

I know this is a cut and paste, but I've seen this remove(0) drop the actual cause of the error. For example, I've seen cases where the exception is a simple NullPointerException and the remove(0) has resulted in an empty cause list.

Personally, I'd remove the remove and inline this method.

Which would only leave getRecordComponent, which I'd also be tempted to remove, and just add appropriate "key" and "value" constants to ProcessingLogMessageSchema instead.

Contributor Author

The first error message (the one being removed by this line) is already captured in the errorMessage field, which is why it's purposefully left out of the cause field. It sounds like you're advocating for duplication of the initial error message, so that the cause field is complete?

return cause;
}

static String getRecordComponent(final boolean isKey) {
return isKey ? "key" : "value";
}
}
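The review discussion about `cause.remove(0)` can be illustrated with a small self-contained sketch. `collectMessages` below is a hypothetical, simplified stand-in for `ErrorMessageUtil.getErrorMessages`, assumed here to yield one entry per throwable in the cause chain with the outermost first. The real method may format its entries differently, so treat this only as an illustration of the list handling.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper, not part of the PR.
final class CauseListSketch {

  private CauseListSketch() {
  }

  // Hypothetical stand-in for ErrorMessageUtil.getErrorMessages: one entry per
  // throwable in the cause chain, outermost first. Falls back to toString()
  // when a throwable has no message (an assumption made for this sketch).
  static List<String> collectMessages(final Throwable exception) {
    final List<String> messages = new ArrayList<>();
    for (Throwable t = exception; t != null; t = t.getCause()) {
      messages.add(t.getMessage() == null ? t.toString() : t.getMessage());
    }
    return messages;
  }

  // Mirrors getCause in the diff: drop the entry for the outermost exception.
  static List<String> causeWithoutFirst(final Throwable exception) {
    final List<String> cause = collectMessages(exception);
    cause.remove(0);
    return cause;
  }
}
```

Under these assumptions, an exception with no nested cause always produces an empty list, while a wrapped exception keeps only the inner messages. Since the diff fills `errorMessage` from `exception.getMessage()`, a `NullPointerException` created with the no-argument constructor would leave `errorMessage` null as well, which may be the situation the reviewer describes.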
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ public final class LoggingSerializer<T> implements Serializer<T> {

private final Serializer<T> delegate;
private final ProcessingLogger processingLogger;
+private boolean isKey;

public LoggingSerializer(
final Serializer<T> delegate,
@@ -36,6 +37,7 @@ public LoggingSerializer(

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
+this.isKey = isKey;
delegate.configure(configs, isKey);
}

@@ -44,7 +46,7 @@ public byte[] serialize(final String topic, final T data) {
try {
return delegate.serialize(topic, data);
} catch (final RuntimeException e) {
-processingLogger.error(new SerializationError<>(e, Optional.of(data), topic));
+processingLogger.error(new SerializationError<>(e, Optional.of(data), topic, isKey));
throw e;
}
}
Original file line number Diff line number Diff line change
@@ -18,8 +18,6 @@
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
-import io.confluent.ksql.util.ErrorMessageUtil;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -30,15 +28,18 @@ public class SerializationError<T> implements ProcessingLogger.ErrorMessage {
private final Throwable exception;
private final Optional<T> record;
private final String topic;
+private final boolean isKey;

public SerializationError(
final Throwable exception,
final Optional<T> record,
-final String topic
+final String topic,
+final boolean isKey
) {
this.exception = requireNonNull(exception, "exception");
this.record = requireNonNull(record, "record");
this.topic = requireNonNull(topic, "topic");
+this.isKey = isKey;
}

@Override
@@ -58,30 +59,32 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
-final SerializationError<?> that = (SerializationError) o;
+final SerializationError<?> that = (SerializationError<?>) o;
return Objects.equals(exception, that.exception)
&& Objects.equals(record, that.record)
-&& Objects.equals(topic, that.topic);
+&& Objects.equals(topic, that.topic)
+&& isKey == that.isKey;
}

@Override
public int hashCode() {
-return Objects.hash(exception, record, topic);
+return Objects.hash(exception, record, topic, isKey);
Contributor

We should have a unit test covering this. Could you add one, please?

}

private Struct serializationError(final ProcessingLogConfig config) {
final Struct serializationError = new Struct(MessageType.SERIALIZATION_ERROR.getSchema())
+.put(
+ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_COMPONENT,
+LoggingSerdeUtil.getRecordComponent(isKey))
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_MESSAGE,
exception.getMessage())
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_CAUSE,
-getCause()
-)
+LoggingSerdeUtil.getCause(exception))
.put(
ProcessingLogMessageSchema.SERIALIZATION_ERROR_FIELD_TOPIC,
-topic
-);
+topic);

if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
serializationError.put(
@@ -92,10 +95,4 @@ private Struct serializationError(final ProcessingLogConfig config) {

return serializationError;
}

-private List<String> getCause() {
-final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
-cause.remove(0);
-return cause;
-}
}
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ public Struct deserialize(final String topic, final byte[] bytes) {
return struct;
} catch (final Exception e) {
throw new SerializationException(
-"Error deserializing DELIMITED message from topic: " + topic, e);
+"Error deserializing KAFKA message from topic: " + topic, e);
}
}
}