From 513d57e8072509aaaad3cd7f8f322dbe42558fa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Tue, 26 Dec 2023 01:11:04 +0100 Subject: [PATCH 1/2] Better logs when doc processing fails. --- .../src/actors/doc_processor.rs | 91 +++++++++++-------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 269123ee3f0..d8c86faca8c 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -66,7 +66,9 @@ impl JsonDoc { ) -> Result { match json_value { JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)), - _ => Err(DocProcessorError::Parse), + _ => Err(DocProcessorError::Parsing( + "document must be a JSON object".to_string(), + )), } } @@ -79,21 +81,34 @@ impl JsonDoc { #[derive(Debug)] pub enum DocProcessorError { - Parse, - Schema, + DocMapperParsing(DocParsingError), + OltpTraceParsing(OtlpTraceError), + Parsing(String), #[cfg(feature = "vrl")] Transform(VrlTerminate), } +impl From for DocProcessorError { + fn from(error: OtlpTraceError) -> Self { + DocProcessorError::OltpTraceParsing(error) + } +} + +impl From for DocProcessorError { + fn from(error: DocParsingError) -> Self { + DocProcessorError::DocMapperParsing(error) + } +} + impl From for DocProcessorError { - fn from(_error: serde_json::Error) -> Self { - DocProcessorError::Parse + fn from(error: serde_json::Error) -> Self { + DocProcessorError::Parsing(error.to_string()) } } impl From for DocProcessorError { - fn from(_error: FromUtf8Error) -> Self { - DocProcessorError::Parse + fn from(error: FromUtf8Error) -> Self { + DocProcessorError::Parsing(error.to_string()) } } @@ -211,7 +226,7 @@ impl From> for JsonDocIterator { fn from(result: Result) -> Self { match result { Ok(json_doc) => Self::Spans(json_doc), - Err(_) => Self::One(Some(Err(DocProcessorError::Parse))), + Err(error) => Self::One(Some(Err(DocProcessorError::from(error)))), } } } @@ -226,9 +241,9 @@ pub struct DocProcessorCounters { /// - number of docs that could not be transformed. /// - number of docs for which the doc mapper returnd an error. /// - number of valid docs. - pub num_parse_errors: AtomicU64, + pub num_doc_parsing_errors: AtomicU64, pub num_transform_errors: AtomicU64, - pub num_schema_errors: AtomicU64, + pub num_oltp_trace_errors: AtomicU64, pub num_valid_docs: AtomicU64, /// Number of bytes that went through the indexer @@ -243,9 +258,9 @@ impl DocProcessorCounters { Self { index_id, source_id, - num_parse_errors: Default::default(), + num_doc_parsing_errors: Default::default(), num_transform_errors: Default::default(), - num_schema_errors: Default::default(), + num_oltp_trace_errors: Default::default(), num_valid_docs: Default::default(), num_bytes_total: Default::default(), } @@ -254,8 +269,8 @@ impl DocProcessorCounters { /// Returns the overall number of docs that went through the indexer (valid or not). pub fn num_processed_docs(&self) -> u64 { self.num_valid_docs.load(Ordering::Relaxed) - + self.num_parse_errors.load(Ordering::Relaxed) - + self.num_schema_errors.load(Ordering::Relaxed) + + self.num_doc_parsing_errors.load(Ordering::Relaxed) + + self.num_oltp_trace_errors.load(Ordering::Relaxed) + self.num_transform_errors.load(Ordering::Relaxed) } @@ -263,8 +278,8 @@ impl DocProcessorCounters { /// (For instance, because they were missing a required field or because their because /// their format was invalid) pub fn num_invalid_docs(&self) -> u64 { - self.num_parse_errors.load(Ordering::Relaxed) - + self.num_schema_errors.load(Ordering::Relaxed) + self.num_doc_parsing_errors.load(Ordering::Relaxed) + + self.num_oltp_trace_errors.load(Ordering::Relaxed) + self.num_transform_errors.load(Ordering::Relaxed) } @@ -284,13 +299,17 @@ impl DocProcessorCounters { pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) { let label = match error { - DocProcessorError::Parse => { - self.num_parse_errors.fetch_add(1, Ordering::Relaxed); - "parse_error" + DocProcessorError::DocMapperParsing(_) => { + self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed); + "doc_mapper_error" } - DocProcessorError::Schema => { - self.num_schema_errors.fetch_add(1, Ordering::Relaxed); - "schema_error" + DocProcessorError::OltpTraceParsing(_) => { + self.num_oltp_trace_errors.fetch_add(1, Ordering::Relaxed); + "otlp_trace_parsing_error" + } + DocProcessorError::Parsing(_) => { + self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed); + "json_parsing_error" } #[cfg(feature = "vrl")] DocProcessorError::Transform(_) => { @@ -365,7 +384,9 @@ impl DocProcessor { let timestamp = doc .get_first(timestamp_field) .and_then(|val| val.as_datetime()) - .ok_or(DocProcessorError::Schema)?; + .ok_or(DocProcessorError::from(DocParsingError::RequiredField( + "timestamp field is required".to_string(), + )))?; Ok(Some(timestamp)) } @@ -387,6 +408,7 @@ impl DocProcessor { processed_docs.push(processed_doc); } Err(error) => { + warn!(index_id=self.counters.index_id, source_id=self.counters.source_id, error=?error); self.counters.record_error(error, num_bytes as u64); } } @@ -396,16 +418,7 @@ impl DocProcessor { fn process_json_doc(&self, json_doc: JsonDoc) -> Result { let num_bytes = json_doc.num_bytes; - let (partition, doc) = self - .doc_mapper - .doc_from_json_obj(json_doc.json_obj) - .map_err(|error| { - warn!(index_id=self.counters.index_id, source_id=self.counters.source_id, error=?error); - match error { - DocParsingError::RequiredField(_) => DocProcessorError::Schema, - _ => DocProcessorError::Parse, - } - })?; + let (partition, doc) = self.doc_mapper.doc_from_json_obj(json_doc.json_obj)?; let timestamp_opt = self.extract_timestamp(&doc)?; Ok(ProcessedDoc { doc, @@ -585,9 +598,9 @@ mod tests { .state; assert_eq!(counters.index_id, index_id); assert_eq!(counters.source_id, source_id); - assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1); + assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 2); assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1); + assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387); @@ -978,9 +991,9 @@ mod tests_vrl { .state; assert_eq!(counters.index_id, index_id.to_string()); assert_eq!(counters.source_id, source_id.to_string()); - assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1); + assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 1); assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1); + assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 1); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397); @@ -1068,9 +1081,9 @@ mod tests_vrl { .state; assert_eq!(counters.index_id, index_id); assert_eq!(counters.source_id, source_id); - assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 0,); + assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 0,); assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,); - assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 0,); + assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0,); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,); From fadfc302fc3c22d7e859116463281b140723f545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Tue, 26 Dec 2023 21:52:41 +0100 Subject: [PATCH 2/2] Small log improvement. --- .../src/actors/doc_processor.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index d8c86faca8c..c7da729160a 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -35,6 +35,7 @@ use serde::Serialize; use serde_json::Value as JsonValue; use tantivy::schema::{Field, Value}; use tantivy::{DateTime, TantivyDocument}; +use thiserror::Error; use tokio::runtime::Handle; use tracing::warn; @@ -79,12 +80,16 @@ impl JsonDoc { } } -#[derive(Debug)] +#[derive(Error, Debug)] pub enum DocProcessorError { + #[error("doc mapper parsing error: {0}")] DocMapperParsing(DocParsingError), + #[error("OLTP trace parsing error: {0}")] OltpTraceParsing(OtlpTraceError), + #[error("doc parsing error: {0}")] Parsing(String), #[cfg(feature = "vrl")] + #[error("VRL transform error: {0}")] Transform(VrlTerminate), } @@ -309,7 +314,7 @@ impl DocProcessorCounters { } DocProcessorError::Parsing(_) => { self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed); - "json_parsing_error" + "parsing_error" } #[cfg(feature = "vrl")] DocProcessorError::Transform(_) => { @@ -408,7 +413,12 @@ impl DocProcessor { processed_docs.push(processed_doc); } Err(error) => { - warn!(index_id=self.counters.index_id, source_id=self.counters.source_id, error=?error); + warn!( + index_id = self.counters.index_id, + source_id = self.counters.source_id, + "{}", + error + ); self.counters.record_error(error, num_bytes as u64); } }