Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logs when doc processing fails. #4323

Merged
merged 2 commits into from
Dec 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 63 additions & 40 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use serde::Serialize;
use serde_json::Value as JsonValue;
use tantivy::schema::{Field, Value};
use tantivy::{DateTime, TantivyDocument};
use thiserror::Error;
use tokio::runtime::Handle;
use tracing::warn;

Expand Down Expand Up @@ -66,7 +67,9 @@ impl JsonDoc {
) -> Result<Self, DocProcessorError> {
match json_value {
JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)),
_ => Err(DocProcessorError::Parse),
_ => Err(DocProcessorError::Parsing(
"document must be a JSON object".to_string(),
)),
}
}

Expand All @@ -77,23 +80,40 @@ impl JsonDoc {
}
}

#[derive(Debug)]
#[derive(Error, Debug)]
pub enum DocProcessorError {
Parse,
Schema,
#[error("doc mapper parsing error: {0}")]
DocMapperParsing(DocParsingError),
#[error("OLTP trace parsing error: {0}")]
OltpTraceParsing(OtlpTraceError),
#[error("doc parsing error: {0}")]
Parsing(String),
#[cfg(feature = "vrl")]
#[error("VRL transform error: {0}")]
Transform(VrlTerminate),
}

impl From<OtlpTraceError> for DocProcessorError {
fn from(error: OtlpTraceError) -> Self {
DocProcessorError::OltpTraceParsing(error)
}
}

impl From<DocParsingError> for DocProcessorError {
fn from(error: DocParsingError) -> Self {
DocProcessorError::DocMapperParsing(error)
}
}

impl From<serde_json::Error> for DocProcessorError {
fn from(_error: serde_json::Error) -> Self {
DocProcessorError::Parse
fn from(error: serde_json::Error) -> Self {
DocProcessorError::Parsing(error.to_string())
}
}

impl From<FromUtf8Error> for DocProcessorError {
fn from(_error: FromUtf8Error) -> Self {
DocProcessorError::Parse
fn from(error: FromUtf8Error) -> Self {
DocProcessorError::Parsing(error.to_string())
}
}

Expand Down Expand Up @@ -211,7 +231,7 @@ impl From<Result<JsonSpanIterator, OtlpTraceError>> for JsonDocIterator {
fn from(result: Result<JsonSpanIterator, OtlpTraceError>) -> Self {
match result {
Ok(json_doc) => Self::Spans(json_doc),
Err(_) => Self::One(Some(Err(DocProcessorError::Parse))),
Err(error) => Self::One(Some(Err(DocProcessorError::from(error)))),
}
}
}
Expand All @@ -226,9 +246,9 @@ pub struct DocProcessorCounters {
/// - number of docs that could not be transformed.
/// - number of docs for which the doc mapper returnd an error.
/// - number of valid docs.
pub num_parse_errors: AtomicU64,
pub num_doc_parsing_errors: AtomicU64,
pub num_transform_errors: AtomicU64,
pub num_schema_errors: AtomicU64,
pub num_oltp_trace_errors: AtomicU64,
pub num_valid_docs: AtomicU64,

/// Number of bytes that went through the indexer
Expand All @@ -243,9 +263,9 @@ impl DocProcessorCounters {
Self {
index_id,
source_id,
num_parse_errors: Default::default(),
num_doc_parsing_errors: Default::default(),
num_transform_errors: Default::default(),
num_schema_errors: Default::default(),
num_oltp_trace_errors: Default::default(),
num_valid_docs: Default::default(),
num_bytes_total: Default::default(),
}
Expand All @@ -254,17 +274,17 @@ impl DocProcessorCounters {
/// Returns the overall number of docs that went through the indexer (valid or not).
pub fn num_processed_docs(&self) -> u64 {
self.num_valid_docs.load(Ordering::Relaxed)
+ self.num_parse_errors.load(Ordering::Relaxed)
+ self.num_schema_errors.load(Ordering::Relaxed)
+ self.num_doc_parsing_errors.load(Ordering::Relaxed)
+ self.num_oltp_trace_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
}

/// Returns the overall number of docs that were sent to the indexer but were invalid.
/// (For instance, because they were missing a required field or because their because
/// their format was invalid)
pub fn num_invalid_docs(&self) -> u64 {
self.num_parse_errors.load(Ordering::Relaxed)
+ self.num_schema_errors.load(Ordering::Relaxed)
self.num_doc_parsing_errors.load(Ordering::Relaxed)
+ self.num_oltp_trace_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
}

Expand All @@ -284,13 +304,17 @@ impl DocProcessorCounters {

pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
let label = match error {
DocProcessorError::Parse => {
self.num_parse_errors.fetch_add(1, Ordering::Relaxed);
"parse_error"
DocProcessorError::DocMapperParsing(_) => {
self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed);
"doc_mapper_error"
}
DocProcessorError::Schema => {
self.num_schema_errors.fetch_add(1, Ordering::Relaxed);
"schema_error"
DocProcessorError::OltpTraceParsing(_) => {
self.num_oltp_trace_errors.fetch_add(1, Ordering::Relaxed);
"otlp_trace_parsing_error"
}
DocProcessorError::Parsing(_) => {
self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed);
"parsing_error"
}
#[cfg(feature = "vrl")]
DocProcessorError::Transform(_) => {
Expand Down Expand Up @@ -365,7 +389,9 @@ impl DocProcessor {
let timestamp = doc
.get_first(timestamp_field)
.and_then(|val| val.as_datetime())
.ok_or(DocProcessorError::Schema)?;
.ok_or(DocProcessorError::from(DocParsingError::RequiredField(
"timestamp field is required".to_string(),
)))?;
Ok(Some(timestamp))
}

Expand All @@ -387,6 +413,12 @@ impl DocProcessor {
processed_docs.push(processed_doc);
}
Err(error) => {
warn!(
index_id = self.counters.index_id,
source_id = self.counters.source_id,
"{}",
error
);
self.counters.record_error(error, num_bytes as u64);
}
}
Expand All @@ -396,16 +428,7 @@ impl DocProcessor {
fn process_json_doc(&self, json_doc: JsonDoc) -> Result<ProcessedDoc, DocProcessorError> {
let num_bytes = json_doc.num_bytes;

let (partition, doc) = self
.doc_mapper
.doc_from_json_obj(json_doc.json_obj)
.map_err(|error| {
warn!(index_id=self.counters.index_id, source_id=self.counters.source_id, error=?error);
match error {
DocParsingError::RequiredField(_) => DocProcessorError::Schema,
_ => DocProcessorError::Parse,
}
})?;
let (partition, doc) = self.doc_mapper.doc_from_json_obj(json_doc.json_obj)?;
let timestamp_opt = self.extract_timestamp(&doc)?;
Ok(ProcessedDoc {
doc,
Expand Down Expand Up @@ -585,9 +608,9 @@ mod tests {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387);

Expand Down Expand Up @@ -978,9 +1001,9 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id.to_string());
assert_eq!(counters.source_id, source_id.to_string());
assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397);

Expand Down Expand Up @@ -1068,9 +1091,9 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,);
assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,);

Expand Down