Skip to content

Commit

Permalink
feat(providers): load previous logs before subscribing (gakonst#1264)
Browse files Browse the repository at this point in the history
* feat(providers): load previous logs before subscribing

Load previous logs and stream it back to the user before establishing
a new stream for streaming future logs.

Closes gakonst#988

* docs: add subscribe_logs example

* fix clippy errors

* refactor: use VecDeque and address review
  • Loading branch information
meetmangukiya authored May 16, 2022
1 parent 3df1527 commit b15d0f8
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@

### Unreleased

- Load previous logs before subscribing to new logs in case fromBlock is set
[1264](https://github.com/gakonst/ethers-rs/pull/1264)
- Add retries to the pending transaction future
[1221](https://github.com/gakonst/ethers-rs/pull/1221)
- Add support for basic and bearer authentication in http and non-wasm websockets.
Expand Down
27 changes: 22 additions & 5 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use ethers_core::{
types::{
transaction::{eip2718::TypedTransaction, eip2930::AccessListWithGasUsed},
Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, EIP1186ProofResponse, FeeHistory,
Filter, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, TraceType,
Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, TxpoolInspect,
TxpoolStatus, H256, U256, U64,
Filter, FilterBlockOption, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter,
TraceType, Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent,
TxpoolInspect, TxpoolStatus, H256, U256, U64,
},
utils,
};
Expand All @@ -28,7 +28,9 @@ use thiserror::Error;
use url::{ParseError, Url};

use futures_util::{lock::Mutex, try_join};
use std::{convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::VecDeque, convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration,
};
use tracing::trace;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -1102,9 +1104,24 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
where
P: PubsubClient,
{
let loaded_logs = match filter.block_option {
FilterBlockOption::Range { from_block, to_block: _ } => {
if from_block.is_none() {
vec![]
} else {
self.get_logs(filter).await?
}
}
FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await?,
};
let loaded_logs = VecDeque::from(loaded_logs);

let logs = utils::serialize(&"logs"); // TODO: Make this a static
let filter = utils::serialize(filter);
self.subscribe([logs, filter]).await
self.subscribe([logs, filter]).await.map(|mut stream| {
stream.set_loaded_elements(loaded_logs);
stream
})
}

async fn fee_history<T: Into<U256> + Send + Sync>(
Expand Down
14 changes: 13 additions & 1 deletion ethers-providers/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use pin_project::{pin_project, pinned_drop};
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use std::{
collections::VecDeque,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
Expand All @@ -31,6 +32,8 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node
pub id: U256,

loaded_elements: VecDeque<R>,

provider: &'a Provider<P>,

#[pin]
Expand All @@ -54,13 +57,17 @@ where
pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
// Call the underlying PubsubClient's subscribe
let rx = provider.as_ref().subscribe(id)?;
Ok(Self { id, provider, rx, ret: PhantomData })
Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() })
}

/// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
}

pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque<R>) {
self.loaded_elements = loaded_elements;
}
}

// Each subscription item is a serde_json::Value which must be decoded to the
Expand All @@ -74,6 +81,11 @@ where
type Item = R;

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.loaded_elements.is_empty() {
let next_element = self.get_mut().loaded_elements.pop_front();
return Poll::Ready(next_element)
}

let this = self.project();
match futures_util::ready!(this.rx.poll_next(ctx)) {
Some(item) => match serde_json::from_str(item.get()) {
Expand Down
34 changes: 34 additions & 0 deletions examples/subscribe_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use ethers::{abi::AbiDecode, prelude::*, utils::keccak256};
use eyre::Result;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let client =
Provider::<Ws>::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27")
.await?;
let client = Arc::new(client);

let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap();
println!("last_block: {}", last_block);

let erc20_transfer_filter = Filter::new()
.from_block(last_block - 25)
.topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)"))));

let mut stream = client.subscribe_logs(&erc20_transfer_filter).await?;

while let Some(log) = stream.next().await {
println!(
"block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}",
log.block_number,
log.transaction_hash,
log.address,
Address::from(log.topics[1]),
Address::from(log.topics[2]),
U256::decode(log.data)
);
}

Ok(())
}

0 comments on commit b15d0f8

Please sign in to comment.