Skip to content

Commit

Permalink
Use TryStreamExt for error handling of exporters
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Jul 5, 2022
1 parent f123f61 commit bd934b7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
11 changes: 6 additions & 5 deletions src/exporters/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
// limitations under the License.

use futures::stream;
use futures::StreamExt;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;

use crate::exceptions::Result;
use crate::BlockWorker;
Expand All @@ -36,7 +37,8 @@ impl BlockExporter {
pub async fn export(&self) -> Result<()> {
let jobs = self.numbers.chunks(self.ctx.get_batch_size()).len();
stream::iter(0..jobs)
.for_each_concurrent(self.ctx.get_max_worker(), |job| async move {
.map(Ok)
.try_for_each_concurrent(self.ctx.get_max_worker(), |job| async move {
let mut block_worker = BlockWorker::create(&self.ctx);
let mut chunks = self.numbers.chunks(self.ctx.get_batch_size());
let chunk = chunks.nth(job).unwrap();
Expand All @@ -53,9 +55,8 @@ impl BlockExporter {

// Receipts.
let receipt_worker = ReceiptExporter::create(&self.ctx, tx_hashes);
receipt_worker.export().await.unwrap();
receipt_worker.export().await
})
.await;
Ok(())
.await
}
}
10 changes: 6 additions & 4 deletions src/exporters/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use futures::stream;
use futures::stream::TryStreamExt;
use futures::StreamExt;
use web3::types::H256;

Expand All @@ -36,15 +37,16 @@ impl ReceiptExporter {
pub async fn export(&self) -> Result<()> {
let jobs = self.hashes.chunks(self.ctx.get_batch_size()).len();
stream::iter(0..jobs)
.for_each_concurrent(self.ctx.get_max_worker(), |job| async move {
.map(Ok)
.try_for_each_concurrent(self.ctx.get_max_worker(), |job| async move {
let mut worker = ReceiptWorker::create(&self.ctx);
let mut chunks = self.hashes.chunks(self.ctx.get_batch_size());
let chunk = chunks.nth(job).unwrap();

worker.push_batch(chunk.to_vec()).unwrap();
let _res = worker.execute().await.unwrap();
worker.execute().await?;
Ok(())
})
.await;
Ok(())
.await
}
}

0 comments on commit bd934b7

Please sign in to comment.