diff --git a/ethetl/src/etl/mod.rs b/ethetl/src/etl/mod.rs index 64bb6f6..1911814 100644 --- a/ethetl/src/etl/mod.rs +++ b/ethetl/src/etl/mod.rs @@ -25,6 +25,9 @@ pub use pipeline::Pipeline; pub use stream::StreamEtl; pub use worker::Worker; +// The syncing status file. +pub static SYNCING_STATUS_FILE: &str = "mars_syncing_status.json"; + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] struct SyncingStatus { start: usize, diff --git a/ethetl/src/etl/normal.rs b/ethetl/src/etl/normal.rs index 7874c90..4235c65 100644 --- a/ethetl/src/etl/normal.rs +++ b/ethetl/src/etl/normal.rs @@ -18,8 +18,7 @@ use log::info; use crate::contexts::ContextRef; use crate::etl::Batch; use crate::etl::SyncingStatus; - -static NORMAL_SYNCING_STATUS_FILE: &str = "mars_normal_syncing_status.json"; +use crate::etl::SYNCING_STATUS_FILE; pub struct NormalEtl { ctx: ContextRef, @@ -37,21 +36,19 @@ impl NormalEtl { // Fetch syncing file. { let op = self.ctx.get_storage(); - if let Ok(data) = op.object(NORMAL_SYNCING_STATUS_FILE).read().await { + if let Ok(data) = op.object(SYNCING_STATUS_FILE).read().await { let prev_syncing_status: SyncingStatus = serde_json::from_slice(&data)?; start = prev_syncing_status.end + 1; info!( "Found normal syncing status file={}, status={:?}", - NORMAL_SYNCING_STATUS_FILE, prev_syncing_status + SYNCING_STATUS_FILE, prev_syncing_status ); } } if start <= end { let batch = Batch::create(self.ctx.clone()); - batch - .syncing(start, end, NORMAL_SYNCING_STATUS_FILE) - .await?; + batch.syncing(start, end, SYNCING_STATUS_FILE).await?; } Ok(()) diff --git a/ethetl/src/etl/stream.rs b/ethetl/src/etl/stream.rs index 3ba79d0..70fa456 100644 --- a/ethetl/src/etl/stream.rs +++ b/ethetl/src/etl/stream.rs @@ -22,8 +22,7 @@ use crate::chains::eth::BlockNumber; use crate::contexts::ContextRef; use crate::etl::Batch; use crate::etl::SyncingStatus; - -static STREAM_SYNCING_STATUS_FILE: &str = "mars_stream_syncing_status.json"; +use crate::etl::SYNCING_STATUS_FILE; pub struct StreamEtl { ctx: ContextRef, @@ -40,12 +39,12 @@ impl StreamEtl { // Fetch syncing file. { let op = self.ctx.get_storage(); - if let Ok(data) = op.object(STREAM_SYNCING_STATUS_FILE).read().await { + if let Ok(data) = op.object(SYNCING_STATUS_FILE).read().await { let prev_syncing_status: SyncingStatus = serde_json::from_slice(&data)?; start = prev_syncing_status.end + 1; info!( "Found syncing status file={}, status={:?}", - STREAM_SYNCING_STATUS_FILE, prev_syncing_status + SYNCING_STATUS_FILE, prev_syncing_status ); } } @@ -63,9 +62,7 @@ impl StreamEtl { }; if start <= end { let batch = Batch::create(self.ctx.clone()); - batch - .syncing(start, end, STREAM_SYNCING_STATUS_FILE) - .await?; + batch.syncing(start, end, SYNCING_STATUS_FILE).await?; start = end + 1; } }