From 65c49e626b49a15fecdb9b3b1965d1bf21143cfa Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 25 Jan 2024 10:13:23 +0000 Subject: [PATCH] Fix Gzip file source --- .../src/actors/indexing_pipeline.rs | 52 ++++++++++++++----- .../src/source/file_source.rs | 7 --- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 2c2bdfe6ee9..769d8c4e946 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -636,7 +636,8 @@ mod tests { async fn test_indexing_pipeline_num_fails_before_success( mut num_fails: usize, - ) -> anyhow::Result { + test_file: &str, + ) -> anyhow::Result<()> { let universe = Universe::new(); let mut metastore = MetastoreServiceClient::mock(); metastore @@ -696,7 +697,7 @@ mod tests { max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), desired_num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")), + source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, input_format: SourceInputFormat::Json, }; @@ -728,23 +729,31 @@ mod tests { let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await; assert_eq!(pipeline_statistics.generation, 1); assert_eq!(pipeline_statistics.num_spawn_attempts, 1 + num_fails); - Ok(pipeline_exit_status.is_success()) + assert!(pipeline_exit_status.is_success()); + Ok(()) } #[tokio::test] async fn test_indexing_pipeline_retry_0() -> anyhow::Result<()> { - test_indexing_pipeline_num_fails_before_success(0).await?; - Ok(()) + test_indexing_pipeline_num_fails_before_success(0, "data/test_corpus.json").await } #[tokio::test] async fn test_indexing_pipeline_retry_1() -> anyhow::Result<()> { - test_indexing_pipeline_num_fails_before_success(1).await?; - Ok(()) + test_indexing_pipeline_num_fails_before_success(1, "data/test_corpus.json").await } #[tokio::test] - async fn test_indexing_pipeline_simple() -> anyhow::Result<()> { + async fn test_indexing_pipeline_retry_0_gz() -> anyhow::Result<()> { + test_indexing_pipeline_num_fails_before_success(0, "data/test_corpus.json.gz").await + } + + #[tokio::test] + async fn test_indexing_pipeline_retry_1_gz() -> anyhow::Result<()> { + test_indexing_pipeline_num_fails_before_success(1, "data/test_corpus.json.gz").await + } + + async fn indexing_pipeline_simple(test_file: &str) -> anyhow::Result<()> { let mut metastore = MetastoreServiceClient::mock(); metastore .expect_index_metadata() @@ -796,7 +805,7 @@ mod tests { max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), desired_num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")), + source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, input_format: SourceInputFormat::Json, }; @@ -833,6 +842,16 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_indexing_pipeline_simple() -> anyhow::Result<()> { + indexing_pipeline_simple("data/test_corpus.json").await + } + + #[tokio::test] + async fn test_indexing_pipeline_simple_gz() -> anyhow::Result<()> { + indexing_pipeline_simple("data/test_corpus.json.gz").await + } + #[tokio::test] async fn test_merge_pipeline_does_not_stop_on_indexing_pipeline_failure() { let mut mock_metastore = MetastoreServiceClient::mock(); @@ -930,8 +949,7 @@ mod tests { panic!("Pipeline was apparently not restarted."); } - #[tokio::test] - async fn test_indexing_pipeline_all_failures_handling() -> anyhow::Result<()> { + async fn indexing_pipeline_all_failures_handling(test_file: &str) -> anyhow::Result<()> { let mut metastore = MetastoreServiceClient::mock(); metastore .expect_index_metadata() @@ -981,7 +999,7 @@ mod tests { max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), desired_num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")), + source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, input_format: SourceInputFormat::Json, }; @@ -1040,4 +1058,14 @@ mod tests { universe.assert_quit().await; Ok(()) } + + #[tokio::test] + async fn test_indexing_pipeline_all_failures_handling() -> anyhow::Result<()> { + indexing_pipeline_all_failures_handling("data/test_corpus.json").await + } + + #[tokio::test] + async fn test_indexing_pipeline_all_failures_handling_gz() -> anyhow::Result<()> { + indexing_pipeline_all_failures_handling("data/test_corpus.json.gz").await + } } diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 78b1c08be36..908d415470e 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -153,13 +153,6 @@ impl TypedSourceFactory for FileSourceFactory { let (dir_uri, file_name) = dir_and_filename(filepath)?; let storage = ctx.storage_resolver.resolve(&dir_uri).await?; let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap(); - if offset > file_size { - return Err(anyhow::anyhow!( - "offset {} can't be greater than the file size {}", - offset, - file_size - )); - } // If it's a gzip file, we can't seek to a specific offset, we need to start from the // beginning of the file, decompress and skip the first `offset` bytes. if filepath.extension() == Some(OsStr::new("gz")) {