diff --git a/CHANGELOG.md b/CHANGELOG.md index 810d78da6..9828627c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -153,6 +153,8 @@ ### Unreleased +- Load previous logs before subscribing to new logs in case fromBlock is set + [1264](https://github.com/gakonst/ethers-rs/pull/1264) - Add retries to the pending transaction future [1221](https://github.com/gakonst/ethers-rs/pull/1221) - Add support for basic and bearer authentication in http and non-wasm websockets. diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 1bdfe2006..6dab10d7e 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -16,9 +16,9 @@ use ethers_core::{ types::{ transaction::{eip2718::TypedTransaction, eip2930::AccessListWithGasUsed}, Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, EIP1186ProofResponse, FeeHistory, - Filter, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, TraceType, - Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, TxpoolInspect, - TxpoolStatus, H256, U256, U64, + Filter, FilterBlockOption, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, + TraceType, Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, + TxpoolInspect, TxpoolStatus, H256, U256, U64, }, utils, }; @@ -1102,9 +1102,29 @@ impl Middleware for Provider

{ where P: PubsubClient, { + let logs = match filter.block_option { + FilterBlockOption::Range { from_block, to_block: _ } => { + if from_block.is_none() { + Ok(vec![]) + } else { + self.get_logs(filter).await + } + } + FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await, + }; + + if logs.is_err() { + return Err(logs.err().unwrap()) + } + + let loaded_logs = logs.unwrap(); + let logs = utils::serialize(&"logs"); // TODO: Make this a static let filter = utils::serialize(filter); - self.subscribe([logs, filter]).await + self.subscribe([logs, filter]).await.map(|mut stream| { + stream.set_loaded_elements(loaded_logs); + stream + }) } async fn fee_history + Send + Sync>( diff --git a/ethers-providers/src/pubsub.rs b/ethers-providers/src/pubsub.rs index 9aed8252c..79548612f 100644 --- a/ethers-providers/src/pubsub.rs +++ b/ethers-providers/src/pubsub.rs @@ -31,6 +31,8 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> { /// The subscription's installed id on the ethereum node pub id: U256, + loaded_elements: Vec, + provider: &'a Provider

, #[pin] @@ -54,13 +56,17 @@ where pub fn new(id: U256, provider: &'a Provider

) -> Result { // Call the underlying PubsubClient's subscribe let rx = provider.as_ref().subscribe(id)?; - Ok(Self { id, provider, rx, ret: PhantomData }) + Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: vec![] }) } /// Unsubscribes from the subscription. pub async fn unsubscribe(&self) -> Result { self.provider.unsubscribe(self.id).await } + + pub fn set_loaded_elements(&mut self, loaded_elements: Vec) { + self.loaded_elements = loaded_elements; + } } // Each subscription item is a serde_json::Value which must be decoded to the @@ -74,6 +80,11 @@ where type Item = R; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + if self.loaded_elements.len() > 0 { + let next_element = self.get_mut().loaded_elements.remove(0); + return Poll::Ready(Some(next_element)) + } + let this = self.project(); match futures_util::ready!(this.rx.poll_next(ctx)) { Some(item) => match serde_json::from_str(item.get()) {