From 44b9535d259027bf1d15f267bbfa6374457d32e8 Mon Sep 17 00:00:00 2001 From: Ammar Abou Zor Date: Wed, 12 Jun 2024 15:12:29 +0200 Subject: [PATCH] Make parser async & Use with Arc wit Dlt parser & Adjustments on plugin hosts - Changing parser trait to async caused an error in the DLT parser because it returns type has a reference in it. Because of that we got a compiler error about that the results doesn't implements Send trait enough. - This Error is possible to be a bug in rust that would be fixed in the future. See issues: - https://github.com/rust-lang/rust/issues/64552 - https://github.com/rust-lang/rust/issues/96865 - For now I replaced the references with Arcs in the results of DLT-Parser - implements for parser Plugin hosts now awaits on the async call from the plugin instead of using `futures::executer::block_on()`. However, this change didn't improve the performance of the plugins --- application/apps/indexer/Cargo.lock | 1 + application/apps/indexer/parsers/Cargo.toml | 1 + .../apps/indexer/parsers/src/dlt/fmt.rs | 19 +- .../apps/indexer/parsers/src/dlt/mod.rs | 36 ++-- application/apps/indexer/parsers/src/lib.rs | 4 +- .../apps/indexer/parsers/src/someip.rs | 50 +++--- application/apps/indexer/parsers/src/text.rs | 42 +++-- .../apps/indexer/processor/src/export/mod.rs | 2 +- .../session/src/handlers/export_raw.rs | 4 +- .../session/src/handlers/observing/mod.rs | 4 +- .../apps/indexer/sources/src/producer.rs | 1 + .../wasm_plugin/host/src/wasm_parser.rs | 168 +++++++++--------- .../wasm_plugin/host/src/wasm_parser2.rs | 14 +- .../wasm_plugin/host/src/wasm_parser_inter.rs | 97 +++++----- 14 files changed, 225 insertions(+), 218 deletions(-) diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock index 36a395c543..9cfadbe8a4 100644 --- a/application/apps/indexer/Cargo.lock +++ b/application/apps/indexer/Cargo.lock @@ -2037,6 +2037,7 @@ dependencies = [ "someip-payload", "stringreader", "thiserror", + "tokio", ] [[package]] diff --git a/application/apps/indexer/parsers/Cargo.toml b/application/apps/indexer/parsers/Cargo.toml index 76156e55ce..b1777465f7 100644 --- a/application/apps/indexer/parsers/Cargo.toml +++ b/application/apps/indexer/parsers/Cargo.toml @@ -22,3 +22,4 @@ someip-payload = { git = "https://github.com/esrlabs/someip-payload" } [dev-dependencies] stringreader = "0.1.1" +tokio = { version = "1.24", features = ["full"] } diff --git a/application/apps/indexer/parsers/src/dlt/fmt.rs b/application/apps/indexer/parsers/src/dlt/fmt.rs index 52941eee2b..076bd5e9b3 100644 --- a/application/apps/indexer/parsers/src/dlt/fmt.rs +++ b/application/apps/indexer/parsers/src/dlt/fmt.rs @@ -30,6 +30,7 @@ use serde::ser::{Serialize, SerializeStruct, Serializer}; use std::{ fmt::{self, Formatter}, str, + sync::Arc, }; const DLT_COLUMN_SENTINAL: char = '\u{0004}'; @@ -194,13 +195,13 @@ impl From> for FormatOptions { } /// A dlt message that can be formatted with optional FIBEX data support -pub struct FormattableMessage<'a> { +pub struct FormattableMessage { pub message: Message, - pub fibex_metadata: Option<&'a FibexMetadata>, - pub options: Option<&'a FormatOptions>, + pub fibex_metadata: Option>, + pub options: Option>, } -impl<'a> Serialize for FormattableMessage<'a> { +impl Serialize for FormattableMessage { fn serialize(&self, serializer: S) -> Result where S: Serializer, @@ -286,7 +287,7 @@ impl<'a> Serialize for FormattableMessage<'a> { } } -impl<'a> From for FormattableMessage<'a> { +impl From for FormattableMessage { fn from(message: Message) -> Self { FormattableMessage { message, @@ -319,7 +320,7 @@ impl<'a> PrintableMessage<'a> { } } -impl<'a> FormattableMessage<'a> { +impl FormattableMessage { pub fn printable_parts<'b>( &'b self, ext_h_app_id: &'b str, @@ -462,7 +463,7 @@ impl<'a> FormattableMessage<'a> { } fn info_from_metadata<'b>(&'b self, id: u32, data: &[u8]) -> Option> { - let fibex = self.fibex_metadata?; + let fibex = self.fibex_metadata.as_ref()?; let md = extract_metadata(fibex, id, self.message.extended_header.as_ref())?; let msg_type: Option = message_type(&self.message, md.message_info.as_deref()); let app_id = md.application_id.as_deref().or_else(|| { @@ -511,7 +512,7 @@ impl<'a> FormattableMessage<'a> { } } -impl<'a> fmt::Display for FormattableMessage<'a> { +impl fmt::Display for FormattableMessage { /// will format dlt Message with those fields: /// ********* storage-header ******** /// date-time @@ -530,7 +531,7 @@ impl<'a> fmt::Display for FormattableMessage<'a> { /// payload fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { if let Some(h) = &self.message.storage_header { - let tz = self.options.map(|o| o.tz); + let tz = self.options.as_ref().map(|o| o.tz); match tz { Some(Some(tz)) => { write_tz_string(f, &h.timestamp, &tz)?; diff --git a/application/apps/indexer/parsers/src/dlt/mod.rs b/application/apps/indexer/parsers/src/dlt/mod.rs index 38e7b014ee..49b743af68 100644 --- a/application/apps/indexer/parsers/src/dlt/mod.rs +++ b/application/apps/indexer/parsers/src/dlt/mod.rs @@ -13,11 +13,11 @@ use dlt_core::{ parse::{dlt_consume_msg, dlt_message}, }; use serde::Serialize; -use std::{io::Write, ops::Range}; +use std::{io::Write, ops::Range, sync::Arc}; use self::{attachment::FtScanner, fmt::FormatOptions}; -impl LogMessage for FormattableMessage<'_> { +impl LogMessage for FormattableMessage { fn to_writer(&self, writer: &mut W) -> Result { let bytes = self.message.as_bytes(); let len = bytes.len(); @@ -66,10 +66,10 @@ impl LogMessage for RawMessage { #[derive(Default)] //TODO AAZ: This is the parser that should be replaced -pub struct DltParser<'m> { +pub struct DltParser { pub filter_config: Option, - pub fibex_metadata: Option<&'m FibexMetadata>, - pub fmt_options: Option<&'m FormatOptions>, + pub fibex_metadata: Option>, + pub fmt_options: Option>, pub with_storage_header: bool, ft_scanner: FtScanner, offset: usize, @@ -98,30 +98,30 @@ impl DltRangeParser { } } -impl<'m> DltParser<'m> { +impl DltParser { pub fn new( filter_config: Option, - fibex_metadata: Option<&'m FibexMetadata>, - fmt_options: Option<&'m FormatOptions>, + fibex_metadata: Option, + fmt_options: Option, with_storage_header: bool, ) -> Self { Self { filter_config, - fibex_metadata, + fibex_metadata: fibex_metadata.map(Arc::new), with_storage_header, - fmt_options, + fmt_options: fmt_options.map(Arc::new), ft_scanner: FtScanner::new(), offset: 0, } } } -impl<'m> Parser> for DltParser<'m> { - fn parse<'b>( +impl Parser for DltParser { + async fn parse<'a>( &mut self, - input: &'b [u8], + input: &'a [u8], timestamp: Option, - ) -> Result<(&'b [u8], Option>>), Error> { + ) -> Result<(&'a [u8], Option>), Error> { match dlt_message(input, self.filter_config.as_ref(), self.with_storage_header) .map_err(|e| Error::Parse(format!("{e}")))? { @@ -142,8 +142,8 @@ impl<'m> Parser> for DltParser<'m> { let msg = FormattableMessage { message: msg_with_storage_header, - fibex_metadata: self.fibex_metadata, - options: self.fmt_options, + fibex_metadata: self.fibex_metadata.clone(), + options: self.fmt_options.clone(), }; self.offset += input.len() - rest.len(); Ok(( @@ -160,7 +160,7 @@ impl<'m> Parser> for DltParser<'m> { } impl Parser for DltRangeParser { - fn parse<'b>( + async fn parse<'b>( &mut self, input: &'b [u8], _timestamp: Option, @@ -180,7 +180,7 @@ impl Parser for DltRangeParser { } impl Parser for DltRawParser { - fn parse<'b>( + async fn parse<'b>( &mut self, input: &'b [u8], _timestamp: Option, diff --git a/application/apps/indexer/parsers/src/lib.rs b/application/apps/indexer/parsers/src/lib.rs index 1b7fce8efe..608d2a2b12 100644 --- a/application/apps/indexer/parsers/src/lib.rs +++ b/application/apps/indexer/parsers/src/lib.rs @@ -25,7 +25,7 @@ pub enum ParseYield { MessageAndAttachment((T, Attachment)), } -impl From for ParseYield { +impl From for ParseYield { fn from(item: T) -> Self { Self::Message(item) } @@ -44,7 +44,7 @@ pub trait Parser { &mut self, input: &'a [u8], timestamp: Option, - ) -> Result<(&'a [u8], Option>), Error>; + ) -> impl std::future::Future>), Error>> + Send; } #[derive(Debug, Clone, Serialize)] diff --git a/application/apps/indexer/parsers/src/someip.rs b/application/apps/indexer/parsers/src/someip.rs index 1601017d78..9f0b14074d 100644 --- a/application/apps/indexer/parsers/src/someip.rs +++ b/application/apps/indexer/parsers/src/someip.rs @@ -49,7 +49,7 @@ unsafe impl Send for SomeipParser {} unsafe impl Sync for SomeipParser {} impl Parser for SomeipParser { - fn parse<'a>( + async fn parse<'a>( &mut self, input: &'a [u8], timestamp: Option, @@ -397,8 +397,8 @@ mod test { FibexParser::parse(vec![reader]).expect("parse failed") } - #[test] - fn parse_cookie_client() { + #[tokio::test] + async fn parse_cookie_client() { let input: &[u8] = &[ 0xFF, 0xFF, 0x00, 0x00, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x08, // length(u32) @@ -407,7 +407,7 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -418,8 +418,8 @@ mod test { } } - #[test] - fn parse_cookie_server() { + #[tokio::test] + async fn parse_cookie_server() { let input: &[u8] = &[ 0xFF, 0xFF, 0x80, 0x00, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x08, // length(u32) @@ -428,7 +428,7 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -439,8 +439,8 @@ mod test { } } - #[test] - fn parse_empty_rpc_message_no_model() { + #[tokio::test] + async fn parse_empty_rpc_message_no_model() { let input: &[u8] = &[ 0x01, 0x03, 0x80, 0x04, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x08, // length(u32) @@ -449,7 +449,7 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -462,8 +462,8 @@ mod test { } } - #[test] - fn parse_empty_rpc_message() { + #[tokio::test] + async fn parse_empty_rpc_message() { let input: &[u8] = &[ 0x01, 0x03, 0x80, 0x04, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x08, // length(u32) @@ -474,7 +474,7 @@ mod test { let model = test_model(); let mut parser = SomeipParser { model: Some(model) }; - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -487,8 +487,8 @@ mod test { } } - #[test] - fn parse_rpc_message_no_model() { + #[tokio::test] + async fn parse_rpc_message_no_model() { let input: &[u8] = &[ 0x01, 0x03, 0x80, 0x05, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x0A, // length(u32) @@ -498,7 +498,7 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -511,8 +511,8 @@ mod test { } } - #[test] - fn parse_rpc_message() { + #[tokio::test] + async fn parse_rpc_message() { let input: &[u8] = &[ 0x01, 0x03, 0x80, 0x05, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x0A, // length(u32) @@ -524,7 +524,7 @@ mod test { let model = test_model(); let mut parser = SomeipParser { model: Some(model) }; - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -537,8 +537,8 @@ mod test { } } - #[test] - fn parse_empty_sd_message() { + #[tokio::test] + async fn parse_empty_sd_message() { let input: &[u8] = &[ 0xFF, 0xFF, 0x81, 0x00, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x14, // length(u32) @@ -550,7 +550,7 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); @@ -563,8 +563,8 @@ mod test { } } - #[test] - fn parse_sd_message() { + #[tokio::test] + async fn parse_sd_message() { let input: &[u8] = &[ 0xFF, 0xFF, 0x81, 0x00, // serviceId(u16), methodId(u16) 0x00, 0x00, 0x00, 0x40, // length(u32) @@ -592,7 +592,7 @@ mod test { ]; let mut parser = SomeipParser::new(); - let (output, message) = parser.parse(input, None).unwrap(); + let (output, message) = parser.parse(input, None).await.unwrap(); assert!(output.is_empty()); diff --git a/application/apps/indexer/parsers/src/text.rs b/application/apps/indexer/parsers/src/text.rs index 6a09a52b9c..1a30af92c9 100644 --- a/application/apps/indexer/parsers/src/text.rs +++ b/application/apps/indexer/parsers/src/text.rs @@ -27,7 +27,7 @@ impl Parser for StringTokenizer where StringMessage: LogMessage, { - fn parse<'b>( + async fn parse<'b>( &mut self, input: &'b [u8], _timestamp: Option, @@ -54,22 +54,28 @@ where } } -#[test] -fn test_string_tokenizer() { - let mut parser = StringTokenizer {}; - let content = b"hello\nworld\n"; - let (rest_1, first_msg) = parser.parse(content, None).unwrap(); - match first_msg { - Some(ParseYield::Message(StringMessage { content })) if content.eq("hello") => {} - _ => panic!("First message did not match"), - } - println!("rest_1 = {:?}", String::from_utf8_lossy(rest_1)); - let (rest_2, second_msg) = parser.parse(rest_1, None).unwrap(); - match second_msg { - Some(ParseYield::Message(StringMessage { content })) if content.eq("world") => {} - _ => panic!("Second message did not match"), +#[cfg(test)] +mod tests { + + use super::*; + + #[tokio::test] + async fn test_string_tokenizer() { + let mut parser = StringTokenizer {}; + let content = b"hello\nworld\n"; + let (rest_1, first_msg) = parser.parse(content, None).await.unwrap(); + match first_msg { + Some(ParseYield::Message(StringMessage { content })) if content.eq("hello") => {} + _ => panic!("First message did not match"), + } + println!("rest_1 = {:?}", String::from_utf8_lossy(rest_1)); + let (rest_2, second_msg) = parser.parse(rest_1, None).await.unwrap(); + match second_msg { + Some(ParseYield::Message(StringMessage { content })) if content.eq("world") => {} + _ => panic!("Second message did not match"), + } + let (rest_3, third_msg) = parser.parse(rest_2, None).await.unwrap(); + println!("rest_3 = {:?}", String::from_utf8_lossy(rest_3)); + assert!(third_msg.is_none()); } - let (rest_3, third_msg) = parser.parse(rest_2, None).unwrap(); - println!("rest_3 = {:?}", String::from_utf8_lossy(rest_3)); - assert!(third_msg.is_none()); } diff --git a/application/apps/indexer/processor/src/export/mod.rs b/application/apps/indexer/processor/src/export/mod.rs index a591b76ad1..d541eb448e 100644 --- a/application/apps/indexer/processor/src/export/mod.rs +++ b/application/apps/indexer/processor/src/export/mod.rs @@ -48,7 +48,7 @@ pub async fn export_raw( cancel: &CancellationToken, ) -> Result where - T: LogMessage + Sized, + T: LogMessage, S: futures::Stream)> + Unpin, { trace!("export_raw, sections: {sections:?}"); diff --git a/application/apps/indexer/session/src/handlers/export_raw.rs b/application/apps/indexer/session/src/handlers/export_raw.rs index e81e20d59b..05ddfc54e4 100644 --- a/application/apps/indexer/session/src/handlers/export_raw.rs +++ b/application/apps/indexer/session/src/handlers/export_raw.rs @@ -159,8 +159,8 @@ async fn export( let fmt_options = Some(FormatOptions::from(settings.tz.as_ref())); let parser = DltParser::new( settings.filter_config.as_ref().map(|f| f.into()), - settings.fibex_metadata.as_ref(), - fmt_options.as_ref(), + settings.fibex_metadata.clone(), + fmt_options, settings.with_storage_header, ); let mut producer = MessageProducer::new(parser, source, None); diff --git a/application/apps/indexer/session/src/handlers/observing/mod.rs b/application/apps/indexer/session/src/handlers/observing/mod.rs index 9d06a1f7fd..a37de78cb8 100644 --- a/application/apps/indexer/session/src/handlers/observing/mod.rs +++ b/application/apps/indexer/session/src/handlers/observing/mod.rs @@ -172,8 +172,8 @@ pub async fn run_source( let fmt_options = Some(FormatOptions::from(settings.tz.as_ref())); let dlt_parser = DltParser::new( settings.filter_config.as_ref().map(|f| f.into()), - settings.fibex_metadata.as_ref(), - fmt_options.as_ref(), + settings.fibex_metadata.clone(), + fmt_options.clone(), settings.with_storage_header, ); let producer = MessageProducer::new(dlt_parser, source, rx_sde); diff --git a/application/apps/indexer/sources/src/producer.rs b/application/apps/indexer/sources/src/producer.rs index 08248852ec..8044d7a266 100644 --- a/application/apps/indexer/sources/src/producer.rs +++ b/application/apps/indexer/sources/src/producer.rs @@ -133,6 +133,7 @@ impl, D: ByteSource> MessageProducer { match self .parser .parse(self.byte_source.current_slice(), self.last_seen_ts) + .await { Ok((rest, Some(m))) => { let consumed = available - rest.len(); diff --git a/application/apps/indexer/wasm_plugin/host/src/wasm_parser.rs b/application/apps/indexer/wasm_plugin/host/src/wasm_parser.rs index fd75cfd52a..cd522fb2b5 100644 --- a/application/apps/indexer/wasm_plugin/host/src/wasm_parser.rs +++ b/application/apps/indexer/wasm_plugin/host/src/wasm_parser.rs @@ -171,7 +171,7 @@ impl WasmParser { } #[inline] - fn parse_with_list<'a>( + async fn parse_with_list<'a>( &mut self, input: &'a [u8], timestamp: Option, @@ -180,19 +180,17 @@ impl WasmParser { // In case of errors we send the whole slice again. This could be optimized to reduce // the calls to wasm None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { - let results = futures::executor::block_on( - self.parse_translate.interface0.parser().call_parse( - &mut self.store, - self.parser_res, - input, - timestamp, - ), - ) - //TODO: Change this after implementing error definitions - .map_err(|err| { - println!("TODO AAZ: Early Error: {err}"); - parsers::Error::Parse(err.to_string()) - })?; + let results = self + .parse_translate + .interface0 + .parser() + .call_parse(&mut self.store, self.parser_res, input, timestamp) + .await + //TODO: Change this after implementing error definitions + .map_err(|err| { + println!("TODO AAZ: Early Error: {err}"); + parsers::Error::Parse(err.to_string()) + })?; self.cache = results.into(); self.cache .pop_front() @@ -217,103 +215,101 @@ impl WasmParser { } #[inline] - fn parse_with_res<'a>( + async fn parse_with_res<'a>( &mut self, input: &'a [u8], timestamp: Option, ) -> Result<(&'a [u8], Option>), parsers::Error> { - let queue = &mut self.store.data_mut().queue; - let raw_res = match queue.pop_front() { - // In case of errors we send the whole slice again. This could be optimized to reduce - // the calls to wasm - None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { - futures::executor::block_on( - self.parse_translate.interface0.parser().call_parse_res( - &mut self.store, - self.parser_res, - input, - timestamp, - ), - ) - //TODO: Change this after implementing error definitions - .map_err(|err| { - println!("TODO AAZ: Early Error: {err}"); - parsers::Error::Parse(err.to_string()) - })?; - return self.parse_with_res(input, timestamp); - } - Some(res) => res, - }; - - match raw_res { - Ok(val) => { - let remain = &input[val.cursor as usize..]; - let yld = val.value.map(|y| y.into_parsers_yield()); - - Ok((remain, yld)) - } - Err(err) => { - let err = err.into_parsers_err(); - // println!("TODO AAZ: Error: {err}"); - Err(err) + loop { + let queue = &mut self.store.data_mut().queue; + let raw_res = match queue.pop_front() { + // In case of errors we send the whole slice again. This could be optimized to reduce + // the calls to wasm + None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { + self.parse_translate + .interface0 + .parser() + .call_parse_res(&mut self.store, self.parser_res, input, timestamp) + .await + //TODO: Change this after implementing error definitions + .map_err(|err| { + println!("TODO AAZ: Early Error: {err}"); + parsers::Error::Parse(err.to_string()) + })?; + continue; + } + Some(res) => res, + }; + + match raw_res { + Ok(val) => { + let remain = &input[val.cursor as usize..]; + let yld = val.value.map(|y| y.into_parsers_yield()); + + return Ok((remain, yld)); + } + Err(err) => { + let err = err.into_parsers_err(); + // println!("TODO AAZ: Error: {err}"); + return Err(err); + } } } } #[inline] - fn parse_with_res_rng<'a>( + async fn parse_with_res_rng<'a>( &mut self, input: &'a [u8], timestamp: Option, ) -> Result<(&'a [u8], Option>), parsers::Error> { - let queue = &mut self.store.data_mut().queue; - let raw_res = match queue.pop_front() { - // In case of errors we send the whole slice again. This could be optimized to reduce - // the calls to wasm - None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { - futures::executor::block_on( - self.parse_translate.interface0.parser().call_parse_res_rng( - &mut self.store, - self.parser_res, - input, - timestamp, - ), - ) - //TODO: Change this after implementing error definitions - .map_err(|err| { - println!("TODO AAZ: Early Error: {err}"); - parsers::Error::Parse(err.to_string()) - })?; - return self.parse_with_res_rng(input, timestamp); - } - Some(res) => res, - }; - - match raw_res { - Ok(val) => { - let remain = &input[val.cursor as usize..]; - let yld = val.value.map(|y| y.into_parsers_yield()); - - Ok((remain, yld)) - } - Err(err) => { - let err = err.into_parsers_err(); - // println!("TODO AAZ: Error: {err}"); - Err(err) + loop { + let queue = &mut self.store.data_mut().queue; + let raw_res = match queue.pop_front() { + // In case of errors we send the whole slice again. This could be optimized to reduce + // the calls to wasm + None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { + self.parse_translate + .interface0 + .parser() + .call_parse_res_rng(&mut self.store, self.parser_res, input, timestamp) + .await + //TODO: Change this after implementing error definitions + .map_err(|err| { + println!("TODO AAZ: Early Error: {err}"); + parsers::Error::Parse(err.to_string()) + })?; + continue; + } + Some(res) => res, + }; + + match raw_res { + Ok(val) => { + let remain = &input[val.cursor as usize..]; + let yld = val.value.map(|y| y.into_parsers_yield()); + + return Ok((remain, yld)); + } + Err(err) => { + let err = err.into_parsers_err(); + // println!("TODO AAZ: Error: {err}"); + return Err(err); + } } } } } impl Parser for WasmParser { - fn parse<'a>( + async fn parse<'a>( &mut self, input: &'a [u8], timestamp: Option, ) -> Result<(&'a [u8], Option>), parsers::Error> { //TODO AAZ: Currently I'm using parse_with_res because it has the best perfomance on my //machine, but I need to test the other approaches on another machine - self.parse_with_res(input, timestamp) + self.parse_with_res(input, timestamp).await // match self.method { // ParseMethod::ReturnVec => self.parse_with_list(input, timestamp), // ParseMethod::ResSingle => self.parse_with_res(input, timestamp), diff --git a/application/apps/indexer/wasm_plugin/host/src/wasm_parser2.rs b/application/apps/indexer/wasm_plugin/host/src/wasm_parser2.rs index 7593ffb7e1..0367d8d5b9 100644 --- a/application/apps/indexer/wasm_plugin/host/src/wasm_parser2.rs +++ b/application/apps/indexer/wasm_plugin/host/src/wasm_parser2.rs @@ -142,7 +142,7 @@ impl WasmParser2 { } impl Parser for WasmParser2 { - fn parse<'a>( + async fn parse<'a>( &mut self, input: &'a [u8], timestamp: Option, @@ -151,12 +151,12 @@ impl Parser for WasmParser2 { state.slice_ptr = input.as_ptr() as usize; state.slice_len = input.len(); - let raw_res = - futures::executor::block_on(self.parse_translate.interface0.parser().call_parse_next( - &mut self.store, - self.parser_res, - timestamp, - )) + let raw_res = self + .parse_translate + .interface0 + .parser() + .call_parse_next(&mut self.store, self.parser_res, timestamp) + .await .unwrap(); match raw_res { diff --git a/application/apps/indexer/wasm_plugin/host/src/wasm_parser_inter.rs b/application/apps/indexer/wasm_plugin/host/src/wasm_parser_inter.rs index 48d024db06..1a0b1db1c0 100644 --- a/application/apps/indexer/wasm_plugin/host/src/wasm_parser_inter.rs +++ b/application/apps/indexer/wasm_plugin/host/src/wasm_parser_inter.rs @@ -145,7 +145,7 @@ impl WasmParserInter { } #[inline] - fn parse_with_list<'a>( + async fn parse_with_list<'a>( &mut self, input: &'a [u8], timestamp: Option, @@ -154,16 +154,16 @@ impl WasmParserInter { // In case of errors we send the whole slice again. This could be optimized to reduce // the calls to wasm None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { - let results = futures::executor::block_on( - self.parse_translate - .interface0 - .call_parse(&mut self.store, input, timestamp), - ) - //TODO: Change this after implementing error definitions - .map_err(|err| { - println!("TODO AAZ: Early Error: {err}"); - parsers::Error::Parse(err.to_string()) - })?; + let results = self + .parse_translate + .interface0 + .call_parse(&mut self.store, input, timestamp) + .await + //TODO: Change this after implementing error definitions + .map_err(|err| { + println!("TODO AAZ: Early Error: {err}"); + parsers::Error::Parse(err.to_string()) + })?; self.cache = results.into(); self.cache .pop_front() @@ -188,60 +188,61 @@ impl WasmParserInter { } #[inline] - fn parse_with_res<'a>( + async fn parse_with_res<'a>( &mut self, input: &'a [u8], timestamp: Option, ) -> Result<(&'a [u8], Option>), parsers::Error> { - let queue = &mut self.store.data_mut().queue; - let raw_res = match queue.pop_front() { - // In case of errors we send the whole slice again. This could be optimized to reduce - // the calls to wasm - None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { - futures::executor::block_on(self.parse_translate.interface0.call_parse_res( - &mut self.store, - input, - timestamp, - )) - //TODO: Change this after implementing error definitions - .map_err(|err| { - println!("TODO AAZ: Early Error: {err}"); - parsers::Error::Parse(err.to_string()) - })?; - return self.parse_with_res(input, timestamp); - } - Some(res) => res, - }; - - match raw_res { - Ok(val) => { - let remain = &input[val.cursor as usize..]; - let yld = val.value.map(|y| y.into_parsers_yield()); - - Ok((remain, yld)) - } - Err(err) => { - let err = err.into_parsers_err(); - // println!("TODO AAZ: Error: {err}"); - Err(err) + loop { + let queue = &mut self.store.data_mut().queue; + let raw_res = match queue.pop_front() { + // In case of errors we send the whole slice again. This could be optimized to reduce + // the calls to wasm + None | Some(Err(Error::Parse(_))) | Some(Err(Error::Incomplete)) => { + self.parse_translate + .interface0 + .call_parse_res(&mut self.store, input, timestamp) + .await + //TODO: Change this after implementing error definitions + .map_err(|err| { + println!("TODO AAZ: Early Error: {err}"); + parsers::Error::Parse(err.to_string()) + })?; + continue; + } + Some(res) => res, + }; + + match raw_res { + Ok(val) => { + let remain = &input[val.cursor as usize..]; + let yld = val.value.map(|y| y.into_parsers_yield()); + + return Ok((remain, yld)); + } + Err(err) => { + let err = err.into_parsers_err(); + // println!("TODO AAZ: Error: {err}"); + return Err(err); + } } } } } impl Parser for WasmParserInter { - fn parse<'a>( + async fn parse<'a>( &mut self, input: &'a [u8], timestamp: Option, ) -> Result<(&'a [u8], Option>), parsers::Error> { //TODO AAZ: Currently I'm using parse_with_res because it has the best perfomance on my //machine, but I need to test the other approaches on another machine - self.parse_with_res(input, timestamp) + self.parse_with_res(input, timestamp).await // match self.method { - // ParseMethod::ReturnVec => self.parse_with_list(input, timestamp), - // ParseMethod::ResSingle => self.parse_with_res(input, timestamp), - // ParseMethod::ResRange => self.parse_with_res_rng(input, timestamp), + // ParseMethod::ReturnVec => self.parse_with_list(input, timestamp).await, + // ParseMethod::ResSingle => self.parse_with_res(input, timestamp).await, + // ParseMethod::ResRange => self.parse_with_res_rng(input, timestamp).await, // } } }