Skip to content

Commit

Permalink
Read from Storage in file source (#3869)
Browse files Browse the repository at this point in the history
* Make the StorageResolver available in SourceRuntimeArgs

* Implement slice stream on storages

* Address review comments

* Make adjustments to make tests pass

* Re-enable the security check on the local store

* Fix connectivity check

* Fix the FileSourceParams deserialization to support URIs

* Address review comments

* Fix missing end in local file storage get_slice_stream
  • Loading branch information
rdettai authored Oct 5, 2023
1 parent ce0ac1f commit 4fc31e5
Show file tree
Hide file tree
Showing 22 changed files with 332 additions and 65 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@ pub async fn run_index_checklist(
if let Some(source_config) = source_config_opt {
checks.push((
source_config.source_id.as_str(),
check_source_connectivity(source_config).await,
check_source_connectivity(storage_resolver, source_config).await,
));
} else {
for source_config in index_metadata.sources.values() {
checks.push((
source_config.source_id.as_str(),
check_source_connectivity(source_config).await,
check_source_connectivity(storage_resolver, source_config).await,
));
}
}
Expand Down
15 changes: 11 additions & 4 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError};
use serde_json::{json, Number, Value};
use tokio::time::{sleep, Duration};

use crate::helpers::{create_test_env, PACKAGE_BIN_NAME};
use crate::helpers::{create_test_env, upload_test_file, PACKAGE_BIN_NAME};

async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> {
let args = CreateIndexArgs {
Expand Down Expand Up @@ -893,9 +893,16 @@ async fn test_all_with_s3_localstack_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env)
.await
.unwrap();
let s3_path = upload_test_file(
test_env.storage_resolver.clone(),
test_env.resource_files["logs"].clone(),
"quickwit-integration-tests",
"sources/",
&append_random_suffix("test-all--cli-s3-localstack"),
)
.await;

local_ingest_docs(&s3_path, &test_env).await.unwrap();

// Cli search
let args = SearchIndexArgs {
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,25 @@ pub async fn create_test_env(
storage,
})
}

/// Uploads a local file to `s3://{bucket}/{prefix}` under the name `filename`
/// and returns the full location of the uploaded object as a `PathBuf`.
///
/// The local file is read entirely into memory before the destination storage
/// is resolved from `storage_resolver`. Every step unwraps its result, so this
/// helper panics if the file cannot be read, the storage cannot be resolved, or
/// the upload fails.
///
/// TODO: this should be part of the test env setup
pub async fn upload_test_file(
    storage_resolver: StorageResolver,
    local_src_path: PathBuf,
    bucket: &str,
    prefix: &str,
    filename: &str,
) -> PathBuf {
    let payload = tokio::fs::read(local_src_path).await.unwrap();

    // Location of the destination "directory": `s3://<bucket>/<prefix>`.
    let mut dir_location = PathBuf::from(r"s3://");
    dir_location.push(bucket);
    dir_location.push(prefix);

    let dir_uri = Uri::from_well_formed(dir_location.to_string_lossy());
    let storage = storage_resolver.resolve(&dir_uri).await.unwrap();

    // The object key is relative to the resolved directory.
    let object_key = PathBuf::from(filename);
    storage.put(&object_key, Box::new(payload)).await.unwrap();

    dir_location.join(filename)
}
11 changes: 7 additions & 4 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,16 @@ pub struct FileSourceParams {
pub filepath: Option<PathBuf>, //< If None read from stdin.
}

// Deserializing a filepath string into an absolute filepath.
/// Deserializing as a URI first to validate the input.
///
/// TODO: we might want to replace `PathBuf` with `Uri` directly in
/// `FileSourceParams`
fn absolute_filepath_from_str<'de, D>(deserializer: D) -> Result<Option<PathBuf>, D::Error>
where D: Deserializer<'de> {
let filepath_opt: Option<String> = Deserialize::deserialize(deserializer)?;
if let Some(filepath) = filepath_opt {
let uri = Uri::from_str(&filepath).map_err(D::Error::custom)?;
Ok(uri.filepath().map(|path| path.to_path_buf()))
Ok(Some(PathBuf::from(uri.as_str())))
} else {
Ok(None)
}
Expand Down Expand Up @@ -800,8 +803,8 @@ mod tests {
let uri = Uri::from_str("source-path.json").unwrap();
assert_eq!(
file_params.filepath.unwrap().as_path(),
uri.filepath().unwrap()
)
Path::new(uri.as_str())
);
}
}

Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl IndexService {
validate_identifier("Source ID", &source_id).map_err(|_| {
IndexServiceError::InvalidIdentifier(format!("invalid source ID: `{source_id}`"))
})?;
check_source_connectivity(&source_config)
check_source_connectivity(&self.storage_resolver, &source_config)
.await
.map_err(IndexServiceError::InvalidConfig)?;
self.metastore
Expand Down Expand Up @@ -368,7 +368,7 @@ mod tests {
#[tokio::test]
async fn test_create_index() {
let metastore = metastore_for_test();
let storage_resolver = StorageResolver::ram_for_test();
let storage_resolver = StorageResolver::ram_and_file_for_test();
let index_service = IndexService::new(metastore.clone(), storage_resolver);
let index_id = "test-index";
let index_uri = "ram://indexes/test-index";
Expand Down Expand Up @@ -404,7 +404,7 @@ mod tests {
#[tokio::test]
async fn test_delete_index() {
let metastore = metastore_for_test();
let storage_resolver = StorageResolver::ram_for_test();
let storage_resolver = StorageResolver::ram_and_file_for_test();
let storage = storage_resolver
.resolve(&Uri::for_test("ram://indexes/test-index"))
.await
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::Storage;
use quickwit_storage::{Storage, StorageResolver};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -431,6 +431,7 @@ impl IndexingPipeline {
metastore: self.params.metastore.clone(),
ingester_pool: self.params.ingester_pool.clone(),
queues_dir_path: self.params.queues_dir_path.clone(),
storage_resolver: self.params.source_storage_resolver.clone(),
}),
source_checkpoint,
))
Expand Down Expand Up @@ -567,6 +568,7 @@ pub struct IndexingPipelineParams {

// Source-related parameters
pub source_config: SourceConfig,
pub source_storage_resolver: StorageResolver,
pub ingester_pool: IngesterPool,
pub queues_dir_path: PathBuf,
pub event_broker: EventBroker,
Expand Down Expand Up @@ -678,6 +680,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -780,6 +783,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -858,6 +862,7 @@ mod tests {
pipeline_id,
doc_mapper,
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -982,6 +987,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(broken_mapper),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ impl IndexingService {
source_config,
ingester_pool: self.ingester_pool.clone(),
queues_dir_path: self.queue_dir_path.clone(),
source_storage_resolver: self.storage_resolver.clone(),

event_broker: self.event_broker.clone(),
};
Expand Down
66 changes: 43 additions & 23 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;
use std::io::SeekFrom;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::uri::Uri;
use quickwit_config::FileSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use serde::Serialize;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeekExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tracing::info;

use crate::actors::DocProcessor;
Expand All @@ -51,7 +52,7 @@ pub struct FileSource {
source_id: String,
params: FileSourceParams,
counters: FileSourceCounters,
reader: BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>,
reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
}

impl fmt::Debug for FileSource {
Expand Down Expand Up @@ -137,28 +138,34 @@ impl TypedSourceFactory for FileSourceFactory {
checkpoint: SourceCheckpoint,
) -> anyhow::Result<FileSource> {
let mut offset = 0;
let reader: Box<dyn AsyncRead + Send + Sync + Unpin> =
if let Some(filepath) = &params.filepath {
let mut file = File::open(&filepath).await.with_context(|| {
format!("failed to open source file `{}`", filepath.display())
})?;
let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
if let Some(Position::Offset(offset_str)) =
checkpoint.position_for_partition(&partition_id).cloned()
{
offset = offset_str.parse::<u64>()?;
file.seek(SeekFrom::Start(offset)).await?;
}
Box::new(file)
} else {
// We cannot use the checkpoint.
Box::new(tokio::io::stdin())
};
let reader: Box<dyn AsyncRead + Send + Unpin> = if let Some(filepath) = &params.filepath {
let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
if let Some(Position::Offset(offset_str)) =
checkpoint.position_for_partition(&partition_id).cloned()
{
offset = offset_str.parse::<usize>()?;
}
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
storage
.get_slice_stream(
file_name,
Range {
start: offset,
end: file_size,
},
)
.await?
} else {
// We cannot use the checkpoint.
Box::new(tokio::io::stdin())
};
let file_source = FileSource {
source_id: ctx.source_id().to_string(),
counters: FileSourceCounters {
previous_offset: offset,
current_offset: offset,
previous_offset: offset as u64,
current_offset: offset as u64,
num_lines_processed: 0,
},
reader: BufReader::new(reader),
Expand All @@ -168,6 +175,19 @@ impl TypedSourceFactory for FileSourceFactory {
}
}

/// Splits `filepath` into the URI of its parent directory and its final
/// component (the file name).
///
/// # Errors
///
/// Returns an error when:
/// - `filepath` has no parent,
/// - the parent is not valid Unicode,
/// - the parent cannot be parsed as a `Uri`,
/// - `filepath` has no final file name component.
pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
    let parent_dir = filepath
        .parent()
        .context("Parent directory could not be resolved")?;
    let parent_dir_str = parent_dir
        .to_str()
        .context("Path cannot be turned to string")?;
    let dir_uri = parent_dir_str.parse::<Uri>()?;

    let file_name = filepath
        .file_name()
        .context("Path does not appear to be a file")?;
    Ok((dir_uri, Path::new(file_name)))
}

#[cfg(test)]
mod tests {
use std::io::Write;
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ mod tests {
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse};
use quickwit_proto::ingest::{DocBatchV2, Shard};
use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse};
use quickwit_storage::StorageResolver;
use tokio::sync::watch;

use super::*;
Expand Down Expand Up @@ -531,6 +532,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down Expand Up @@ -608,6 +610,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down Expand Up @@ -760,6 +763,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down
Loading

0 comments on commit 4fc31e5

Please sign in to comment.