Skip to content

Commit

Permalink
Download snapshots by most recent ledger index (#780)
Browse files Browse the repository at this point in the history
* Implement partial snapshot downloads

* Nit

* Bump bee-ledger version

* Fix clippy

* Remove function with ambiguous name

* Address comments

* Small improvements

* Improve robustness

* Small improvements

* Feedback

* Rename to `is_consistent`

* More feedback

* Cosmetics

* f

Co-authored-by: Thibault Martinez <[email protected]>
Co-authored-by: Jochen Görtler <[email protected]>
  • Loading branch information
3 people authored Dec 7, 2021
1 parent b473277 commit bd3bc8c
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 65 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bee-api/bee-rest-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ keywords = [ "iota", "bee", "framework", "node", "api" ]
homepage = "https://www.iota.org"

[dependencies]
bee-ledger = { version = "0.5.0", path = "../../bee-ledger", default-features = false }
bee-ledger = { version = "0.6.0", path = "../../bee-ledger", default-features = false }
bee-message = { version = "0.1.5", path = "../../bee-message", default-features = false }
bee-protocol = { version = "0.1.1", path = "../../bee-protocol", default-features = false, optional = true }

Expand Down
13 changes: 13 additions & 0 deletions bee-ledger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Security -->

## 0.6.0 - 2021-12-07

### Added

- Download snapshots by most recent ledger index;
- Derive `Clone` for `Balance`, `OutputDiff`, `TreasuryDiff` and `Unspent`;

### Changed

- Decouple snapshot names and download URLs;
- Reduced number of dependencies features;
- Update bee-tangle version and change `MsTangle` to `Tangle`;

## 0.5.0 - 2021-08-30

### Added
Expand Down
4 changes: 3 additions & 1 deletion bee-ledger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bee-ledger"
version = "0.5.0"
version = "0.6.0"
authors = [ "IOTA Stiftung" ]
edition = "2021"
description = "All types and features required to compute and maintain the ledger state"
Expand All @@ -18,6 +18,7 @@ bee-storage = { version = "0.9.0", path = "../bee-storage/bee-storage", default-
bee-tangle = { version = "0.2.0", path = "../bee-tangle", default-features = false, optional = true }

async-trait = { version = "0.1.51", default-features = false, optional = true }
bytes = { version = "1.0.1", default-features = false, optional = true }
digest = { version = "0.9.0", default-features = false, optional = true }
futures = { version = "0.3.17", default-features = false, optional = true }
hashbrown = { version = "0.11.2", default-features = false, optional = true }
Expand All @@ -38,6 +39,7 @@ workers = [
"bee-storage",
"bee-tangle",
"async-trait",
"bytes",
"digest",
"futures",
"hashbrown",
Expand Down
3 changes: 3 additions & 0 deletions bee-ledger/src/types/snapshot/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct SnapshotHeader {
}

impl SnapshotHeader {
/// The length, in bytes, of a `SnapshotHeader`.
pub const LENGTH: usize = 26;

/// Returns the kind of a `SnapshotHeader`.
pub fn kind(&self) -> SnapshotKind {
self.kind
Expand Down
210 changes: 187 additions & 23 deletions bee-ledger/src/workers/snapshot/download.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,206 @@
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use crate::workers::snapshot::error::Error;
use crate::{
types::snapshot::SnapshotHeader,
workers::snapshot::{config::DownloadUrls, error::Error},
};

use log::{info, warn};
use bee_common::packable::Packable;
use bee_message::milestone::MilestoneIndex;

use std::path::Path;
use bytes::Buf;
use futures::{future::join_all, StreamExt};
use log::{debug, info, warn};
use reqwest::Response;

pub(crate) async fn download_snapshot_file(
path: &Path,
download_urls: impl Iterator<Item = &str>,
) -> Result<(), Error> {
use std::{io::Read, path::Path};

async fn download_snapshot_header(download_url: &str) -> Result<SnapshotHeader, Error> {
debug!("Downloading snapshot header {}...", download_url);

match reqwest::get(download_url).await.and_then(Response::error_for_status) {
Ok(res) => {
let mut stream = res.bytes_stream();
let mut bytes = Vec::<u8>::with_capacity(SnapshotHeader::LENGTH);

while let Some(chunk) = stream.next().await {
let mut chunk_reader = chunk.map_err(|_| Error::DownloadingFailed)?.reader();

let mut buf = Vec::new();
chunk_reader.read_to_end(&mut buf)?;
bytes.extend_from_slice(&buf);

if bytes.len() >= SnapshotHeader::LENGTH {
debug!("Downloaded snapshot header from {}.", download_url);

let mut slice: &[u8] = &bytes[..SnapshotHeader::LENGTH];

return Ok(SnapshotHeader::unpack(&mut slice)?);
}
}
}
Err(e) => warn!("Downloading snapshot header failed: {:?}.", e.to_string()),
}

Err(Error::DownloadingFailed)
}

struct SourceInformation<'a> {
urls: &'a DownloadUrls,
full_header: SnapshotHeader,
delta_header: Option<SnapshotHeader>,
}

impl<'a> SourceInformation<'a> {
async fn download_snapshots(
&self,
full_snapshot_path: &Path,
delta_snapshot_path: Option<&Path>,
) -> Result<(), Error> {
download_snapshot_file(full_snapshot_path, self.urls.full()).await?;

if let Some(delta_path) = delta_snapshot_path {
download_snapshot_file(delta_path, self.urls.delta()).await?;
}

Ok(())
}

fn index(&self) -> MilestoneIndex {
self.delta_header
.as_ref()
.map_or(self.full_header.sep_index(), SnapshotHeader::sep_index)
}

fn is_consistent(&self, wanted_network_id: u64) -> bool {
if self.full_header.network_id() != wanted_network_id {
warn!(
"Full snapshot network ID does not match ({} != {}): {}.",
self.full_header.network_id(),
wanted_network_id,
self.urls.full()
);
return false;
};

if let Some(delta_header) = self.delta_header.as_ref() {
if delta_header.network_id() != wanted_network_id {
warn!(
"Delta snapshot network ID does not match ({} != {}): {}.",
delta_header.network_id(),
wanted_network_id,
self.urls.delta()
);
return false;
};

if self.full_header.sep_index() > delta_header.sep_index() {
warn!(
"Full snapshot SEP index is bigger than delta snapshot SEP index ({} > {}): {}.",
self.full_header.sep_index(),
delta_header.sep_index(),
self.urls.full()
);
return false;
}

if self.full_header.sep_index() != delta_header.ledger_index() {
warn!(
"Full snapshot SEP index does not match the delta snapshot ledger index ({} != {}): {}.",
self.full_header.sep_index(),
delta_header.ledger_index(),
self.urls.full()
);
return false;
}
}

true
}
}

async fn gather_source_information(
download_delta: bool,
urls: &'_ DownloadUrls,
) -> Result<SourceInformation<'_>, Error> {
let full_header = download_snapshot_header(urls.full()).await?;
let delta_header = if download_delta {
Some(download_snapshot_header(urls.delta()).await?)
} else {
None
};

Ok(SourceInformation {
urls,
full_header,
delta_header,
})
}

async fn download_snapshot_file(path: &Path, download_url: &str) -> Result<(), Error> {
tokio::fs::create_dir_all(
path.parent()
.ok_or_else(|| Error::InvalidFilePath(format!("{}", path.display())))?,
)
.await
.map_err(|_| Error::InvalidFilePath(format!("{}", path.display())))?;

for url in download_urls {
info!("Downloading snapshot file {}...", url);

match reqwest::get(url).await.and_then(|res| res.error_for_status()) {
Ok(res) => {
tokio::io::copy(
&mut res.bytes().await.map_err(|_| Error::DownloadingFailed)?.as_ref(),
&mut tokio::fs::File::create(path).await?,
)
.await?;
break;
}
Err(e) => warn!("Downloading snapshot file failed with status code {:?}.", e.status()),
info!("Downloading snapshot file {}...", download_url);

match reqwest::get(download_url).await {
Ok(res) => {
tokio::io::copy(
&mut res.bytes().await.map_err(|_| Error::DownloadingFailed)?.as_ref(),
&mut tokio::fs::File::create(path).await?,
)
.await?;
}
Err(e) => warn!("Downloading snapshot file failed with status code {:?}.", e.status()),
}

if !path.exists() {
return Err(Error::NoDownloadSourceAvailable);
Ok(())
}

/// Tries to download the latest snapshot files from the sources specified in the `SnapshotConfig`.
///
/// * `wanted_network_id` - The id of the current network (typically the hash of the network name).
/// * `full_snapshot_path` - The location where the full snapshot will be stored.
/// * `full_snapshot_path` - The location where the delta snapshot will be stored.
/// * `download_urls` - The list of snapshot sources.
pub(crate) async fn download_latest_snapshot_files(
wanted_network_id: u64,
full_snapshot_path: &Path,
delta_snapshot_path: Option<&Path>,
download_urls: &[DownloadUrls],
) -> Result<(), Error> {
let download_delta = delta_snapshot_path.is_some();

let all_sources = join_all(
download_urls
.iter()
.map(|source| gather_source_information(download_delta, source)),
)
.await;

let mut available_sources = all_sources
.into_iter()
.flatten()
.filter(|source| source.is_consistent(wanted_network_id))
.collect::<Vec<SourceInformation>>();

// Sort all available sources so that the freshest is at the end.
available_sources.sort_by_key(SourceInformation::index);

while let Some(source) = available_sources.pop() {
if source
.download_snapshots(full_snapshot_path, delta_snapshot_path)
.await
.is_ok()
{
return Ok(());
}
}

Ok(())
Err(Error::NoDownloadSourceAvailable)
}
29 changes: 16 additions & 13 deletions bee-ledger/src/workers/snapshot/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use crate::types::snapshot::SnapshotKind;
use crate::types::{snapshot::SnapshotKind, Error as TypesError};

use bee_message::milestone::MilestoneIndex;

Expand All @@ -11,41 +11,44 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
/// I/O error happened.
#[error("I/O error happened: {0}")]
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// Types error.
#[error("types error: {0}")]
Types(#[from] TypesError),
/// Unexpected snapshot kind.
#[error("Unexpected snapshot kind: expected {0:?}, read {1:?}")]
#[error("unexpected snapshot kind: expected {0:?}, read {1:?}")]
UnexpectedSnapshotKind(SnapshotKind, SnapshotKind),
/// Downloading failed.
#[error("Downloading failed")]
#[error("downloading failed")]
DownloadingFailed,
/// No snapshot download source available.
#[error("No snapshot download source available")]
#[error("no snapshot download source available")]
NoDownloadSourceAvailable,
/// Invalid file path.
#[error("Invalid file path: {0}")]
#[error("invalid file path: {0}")]
InvalidFilePath(String),
/// Network id mismatch between configuration and snapshot.
#[error("Network id mismatch between configuration and snapshot: {0} != {1}")]
#[error("network id mismatch between configuration and snapshot: {0} != {1}")]
NetworkIdMismatch(u64, u64),
/// Inconsistency between ledger index and sep index.
#[error("Inconsistency between ledger index {0} and sep index {1}")]
#[error("inconsistency between ledger index {0} and sep index {1}")]
LedgerSepIndexesInconsistency(MilestoneIndex, MilestoneIndex),
/// Invalid milestone diffs count.
#[error("Invalid milestone diffs count: expected {0}, read {1}")]
#[error("invalid milestone diffs count: expected {0}, read {1}")]
InvalidMilestoneDiffsCount(usize, usize),
/// Only a delta snapshot file exists without a full snapshot file.
#[error(
"Only a delta snapshot file exists without a full snapshot file. Remove the delta snapshot file and restart"
"only a delta snapshot file exists without a full snapshot file (remove the delta snapshot file and restart)"
)]
OnlyDeltaSnapshotFileExists,
/// Unexpected milestone diff index.
#[error("Unexpected milestone diff index: {0:?}")]
#[error("unexpected milestone diff index: {0:?}")]
UnexpectedMilestoneDiffIndex(MilestoneIndex),
/// Missing consumed treasury.
#[error("Missing consumed treasury")]
#[error("missing consumed treasury")]
MissingConsumedTreasury,
/// Remaining bytes in file.
#[error("Remaining bytes in file")]
#[error("remaining bytes in file")]
RemainingBytes,
}
Loading

0 comments on commit bd3bc8c

Please sign in to comment.