Skip to content

Commit

Permalink
add end-to-end test (#3454)
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas authored Apr 19, 2022
1 parent f9e6f79 commit d0d0cc1
Showing 23 changed files with 550 additions and 89 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -66,6 +66,17 @@ jobs:
command: test
args: --verbose --workspace --exclude graph-tests -- --nocapture

- name: Run runner tests
id: runner-tests-1
uses: actions-rs/cargo@v1
env:
RUSTFLAGS: "-C link-arg=-fuse-ld=lld -D warnings"
TESTS_GANACHE_HARD_WAIT_SECONDS: "30"
with:
command: test
args: --verbose --package graph-tests -- test_runner --nocapture
continue-on-error: true

integration-tests:
name: Run integration tests
strategy:
@@ -119,7 +130,7 @@ jobs:
TESTS_GANACHE_HARD_WAIT_SECONDS: "30"
with:
command: test
args: --verbose --package graph-tests -- --nocapture
args: --verbose --package graph-tests -- --nocapture --skip test_runner
continue-on-error: true
- name: Run integration tests (round 2)
id: integration-tests-2
@@ -131,7 +142,7 @@ jobs:
TESTS_GANACHE_HARD_WAIT_SECONDS: "30"
with:
command: test
args: --verbose --package graph-tests -- --nocapture
args: --verbose --package graph-tests -- --nocapture --skip test_runner

rustfmt:
name: Check rustfmt style
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -756,7 +756,7 @@ pub struct ProviderEthRpcMetrics {
}

impl ProviderEthRpcMetrics {
pub fn new(registry: Arc<impl MetricsRegistry>) -> Self {
pub fn new(registry: Arc<dyn MetricsRegistry>) -> Self {
let request_duration = registry
.new_histogram_vec(
"eth_rpc_request_duration",
108 changes: 75 additions & 33 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Error};
use graph::blockchain::BlockchainKind;
use graph::data::subgraph::UnifiedMappingApiVersion;
@@ -41,11 +42,74 @@ use crate::{
SubgraphEthRpcMetrics, TriggerFilter, ENV_VARS,
};
use crate::{network::EthereumNetworkAdapters, EthereumAdapter};
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor};
use graph::blockchain::block_stream::{BlockStream, BlockStreamBuilder, FirehoseCursor};

/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320];

pub struct EthereumStreamBuilder {}

#[async_trait]
impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
fn build_firehose(
&self,
chain: &Chain,
deployment: DeploymentLocator,
block_cursor: Option<String>,
start_blocks: Vec<BlockNumber>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
let requirements = filter.node_capabilities();
let adapter = chain
.triggers_adapter(&deployment, &requirements, unified_api_version)
.expect(&format!(
"no adapter for network {} with capabilities {}",
chain.name, requirements
));

let firehose_endpoint = match chain.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
};

let logger = chain
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
firehose_endpoint,
subgraph_current_block,
block_cursor,
firehose_mapper,
adapter,
filter,
start_blocks,
logger,
chain.registry.clone(),
)))
}

async fn build_polling(
&self,
_chain: Arc<Chain>,
_deployment: DeploymentLocator,
_start_blocks: Vec<BlockNumber>,
_subgraph_current_block: Option<BlockPtr>,
_filter: Arc<<Chain as Blockchain>::TriggerFilter>,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
todo!()
}
}

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
@@ -58,6 +122,7 @@ pub struct Chain {
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
reorg_threshold: BlockNumber,
pub is_ingestible: bool,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
}

impl std::fmt::Debug for Chain {
@@ -78,6 +143,7 @@ impl Chain {
firehose_endpoints: FirehoseEndpoints,
eth_adapters: EthereumNetworkAdapters,
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
reorg_threshold: BlockNumber,
is_ingestible: bool,
) -> Self {
@@ -91,6 +157,7 @@ impl Chain {
chain_store,
call_cache,
chain_head_update_listener,
block_stream_builder,
reorg_threshold,
is_ingestible,
}
@@ -178,40 +245,15 @@ impl Blockchain for Chain {
filter: Arc<Self::TriggerFilter>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
let requirements = filter.node_capabilities();
let adapter = self
.triggers_adapter(&deployment, &requirements, unified_api_version)
.expect(&format!(
"no adapter for network {} with capabilities {}",
self.name, requirements
));

let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
};

let logger = self
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
firehose_endpoint,
subgraph_current_block,
self.block_stream_builder.build_firehose(
self,
deployment,
block_cursor,
firehose_mapper,
adapter,
filter,
start_blocks,
logger,
self.registry.clone(),
)))
subgraph_current_block,
filter,
unified_api_version,
)
}

async fn new_polling_block_stream(
2 changes: 1 addition & 1 deletion chain/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ pub use trigger::MappingTrigger;

pub mod chain;

mod network;
pub mod network;
mod trigger;

pub use crate::adapter::{
100 changes: 71 additions & 29 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints};
use graph::prelude::{MetricsRegistry, TryFutureExt};
use graph::{
anyhow,
anyhow::Result,
blockchain::{
block_stream::{
BlockStreamEvent, BlockWithTriggers, FirehoseError,
@@ -29,14 +30,74 @@ use crate::{
codec,
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::BlockStream;
use graph::blockchain::block_stream::{BlockStream, BlockStreamBuilder};

pub struct NearStreamBuilder {}

#[async_trait]
impl BlockStreamBuilder<Chain> for NearStreamBuilder {
fn build_firehose(
&self,
chain: &Chain,
deployment: DeploymentLocator,
block_cursor: Option<String>,
start_blocks: Vec<BlockNumber>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
let adapter = chain
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
.expect(&format!("no adapter for network {}", chain.name,));

let firehose_endpoint = match chain.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available")),
};

let logger = chain
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
firehose_endpoint,
subgraph_current_block,
block_cursor,
firehose_mapper,
adapter,
filter,
start_blocks,
logger,
chain.metrics_registry.clone(),
)))
}

async fn build_polling(
&self,
_chain: Arc<Chain>,
_deployment: DeploymentLocator,
_start_blocks: Vec<BlockNumber>,
_subgraph_current_block: Option<BlockPtr>,
_filter: Arc<<Chain as Blockchain>::TriggerFilter>,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
todo!()
}
}

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
firehose_endpoints: Arc<FirehoseEndpoints>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<dyn MetricsRegistry>,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
}

impl std::fmt::Debug for Chain {
@@ -52,13 +113,15 @@ impl Chain {
chain_store: Arc<dyn ChainStore>,
firehose_endpoints: FirehoseEndpoints,
metrics_registry: Arc<dyn MetricsRegistry>,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
) -> Self {
Chain {
logger_factory,
name,
firehose_endpoints: Arc::new(firehose_endpoints),
chain_store,
metrics_registry,
block_stream_builder,
}
}
}
@@ -108,36 +171,15 @@ impl Blockchain for Chain {
filter: Arc<Self::TriggerFilter>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
let adapter = self
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
.expect(&format!("no adapter for network {}", self.name,));

let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available")),
};

let logger = self
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
firehose_endpoint,
subgraph_current_block,
self.block_stream_builder.build_firehose(
self,
deployment,
block_cursor,
firehose_mapper,
adapter,
filter,
start_blocks,
logger,
self.metrics_registry.clone(),
)))
subgraph_current_block,
filter,
unified_api_version,
)
}

async fn new_polling_block_stream(
2 changes: 1 addition & 1 deletion chain/near/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[path = "protobuf/sf.near.codec.v1.rs"]
mod pbcodec;
pub mod pbcodec;

use graph::{
blockchain::Block as BlockchainBlock,
4 changes: 2 additions & 2 deletions chain/near/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod adapter;
mod capabilities;
mod chain;
mod codec;
pub mod codec;
mod data_source;
mod runtime;
mod trigger;

pub use crate::chain::Chain;
pub use codec::Block;
pub use crate::chain::NearStreamBuilder;
pub use codec::HeaderOnlyBlock;
Loading

0 comments on commit d0d0cc1

Please sign in to comment.