Skip to content

Commit

Permalink
Merge pull request #1885 from michalrus/LW-11112-file-scheme-urls
Browse files Browse the repository at this point in the history
feat: support `file://` URLs for snapshot locations
  • Loading branch information
jpraynaud authored Aug 23, 2024
2 parents 33ee2fd + 6ffd08c commit b3faff0
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ mithril-infra/terraform.tfstate*
mithril-infra/*.tfvars
justfile

# Outputs of nix-build:
result
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ As a minor extension, we have adopted a slightly different versioning convention

- Support for Mithril nodes footprint support in Prometheus monitoring in infrastructure
- Add support for custom HTTP headers in Mithril client WASM library
- Support `file://` URLs for snapshot locations

- **UNSTABLE** Cardano stake distribution certification:

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-client"
version = "0.8.14"
version = "0.8.15"
description = "Mithril client library"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
113 changes: 95 additions & 18 deletions mithril-client/src/snapshot_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use futures::StreamExt;
use reqwest::Url;
use reqwest::{Response, StatusCode};
use slog::{debug, Logger};
use std::fs;
use std::path::Path;
use tokio::fs::File;
use tokio::io::AsyncReadExt;

#[cfg(test)]
use mockall::automock;
Expand Down Expand Up @@ -78,6 +82,74 @@ impl HttpSnapshotDownloader {
status_code => Err(anyhow!("Unhandled error {status_code}")),
}
}

fn file_scheme_to_local_path(file_url: &str) -> Option<String> {
Url::parse(file_url)
.ok()
.filter(|url| url.scheme() == "file")
.and_then(|url| url.to_file_path().ok())
.map(|path| path.to_string_lossy().into_owned())
}

async fn download_local_file<F, Fut>(
&self,
local_path: &str,
sender: &flume::Sender<Vec<u8>>,
report_progress: F,
) -> MithrilResult<()>
where
F: Fn(u64) -> Fut,
Fut: std::future::Future<Output = ()>,
{
// Stream the `location` directly from the local filesystem
let mut downloaded_bytes: u64 = 0;
let mut file = File::open(local_path).await?;

loop {
// We can either allocate here each time, or clone a shared buffer into sender.
// A larger read buffer is faster, less context switches:
let mut buffer = vec![0; 16 * 1024 * 1024];
let bytes_read = file.read(&mut buffer).await?;
if bytes_read == 0 {
break;
}
buffer.truncate(bytes_read);
sender.send_async(buffer).await.with_context(|| {
format!(
"Local file read: could not write {} bytes to stream.",
bytes_read
)
})?;
downloaded_bytes += bytes_read as u64;
report_progress(downloaded_bytes).await
}
Ok(())
}

async fn download_remote_file<F, Fut>(
&self,
location: &str,
sender: &flume::Sender<Vec<u8>>,
report_progress: F,
) -> MithrilResult<()>
where
F: Fn(u64) -> Fut,
Fut: std::future::Future<Output = ()>,
{
let mut downloaded_bytes: u64 = 0;
let mut remote_stream = self.get(location).await?.bytes_stream();
while let Some(item) = remote_stream.next().await {
let chunk = item.with_context(|| "Download: Could not read from byte stream")?;

sender.send_async(chunk.to_vec()).await.with_context(|| {
format!("Download: could not write {} bytes to stream.", chunk.len())
})?;

downloaded_bytes += chunk.len() as u64;
report_progress(downloaded_bytes).await
}
Ok(())
}
}

#[cfg_attr(test, automock)]
Expand All @@ -97,8 +169,6 @@ impl SnapshotDownloader for HttpSnapshotDownloader {
.context("Download-Unpack: prerequisite error"),
)?;
}
let mut downloaded_bytes: u64 = 0;
let mut remote_stream = self.get(location).await?.bytes_stream();
let (sender, receiver) = flume::bounded(5);

let dest_dir = target_dir.to_path_buf();
Expand All @@ -107,21 +177,22 @@ impl SnapshotDownloader for HttpSnapshotDownloader {
unpacker.unpack_snapshot(receiver, compression_algorithm, &dest_dir)
});

while let Some(item) = remote_stream.next().await {
let chunk = item.with_context(|| "Download: Could not read from byte stream")?;

sender.send_async(chunk.to_vec()).await.with_context(|| {
format!("Download: could not write {} bytes to stream.", chunk.len())
})?;

downloaded_bytes += chunk.len() as u64;
let report_progress = |downloaded_bytes: u64| async move {
self.feedback_sender
.send_event(MithrilEvent::SnapshotDownloadProgress {
download_id: download_id.to_owned(),
downloaded_bytes,
size: snapshot_size,
})
.await
};

if let Some(local_path) = Self::file_scheme_to_local_path(location) {
self.download_local_file(&local_path, &sender, report_progress)
.await?;
} else {
self.download_remote_file(location, &sender, report_progress)
.await?;
}

drop(sender); // Signal EOF
Expand All @@ -143,15 +214,21 @@ impl SnapshotDownloader for HttpSnapshotDownloader {
async fn probe(&self, location: &str) -> MithrilResult<()> {
debug!(self.logger, "HEAD Snapshot location='{location}'.");

let request_builder = self.http_client.head(location);
let response = request_builder.send().await.with_context(|| {
format!("Cannot perform a HEAD for snapshot at location='{location}'")
})?;
if let Some(local_path) = Self::file_scheme_to_local_path(location) {
fs::metadata(local_path)
.with_context(|| format!("Local snapshot location='{location}' not found"))
.map(drop)
} else {
let request_builder = self.http_client.head(location);
let response = request_builder.send().await.with_context(|| {
format!("Cannot perform a HEAD for snapshot at location='{location}'")
})?;

match response.status() {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(anyhow!("Snapshot location='{location} not found")),
status_code => Err(anyhow!("Unhandled error {status_code}")),
match response.status() {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(anyhow!("Snapshot location='{location} not found")),
status_code => Err(anyhow!("Unhandled error {status_code}")),
}
}
}
}

0 comments on commit b3faff0

Please sign in to comment.