Skip to content

Commit

Permalink
refactor(indexer): Update Indexer Framework 0.9.0 (IndexerShard) (#4245)
Browse files Browse the repository at this point in the history
* Add IndexerShard structure
* Bump version to 0.9.0
* update CHANGELOG

Also I've updated `example-indexer` to keep it up to date and changed the commented example of `StreamerMessage` it was a bit outdated.
  • Loading branch information
khorolets authored Apr 26, 2021
1 parent c116d83 commit 379f87e
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 211 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 1 addition & 8 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2027,14 +2027,7 @@ impl Chain {
block_hash: &CryptoHash,
) -> Result<HashMap<ShardId, Vec<ExecutionOutcomeWithIdAndProof>>, Error> {
let block = self.get_block(block_hash)?;
let block_height = block.header().height();
let chunk_headers = block
.chunks()
.iter()
.filter_map(
|h| if h.height_included() == block_height { Some(h.clone()) } else { None },
)
.collect::<Vec<_>>();
let chunk_headers = block.chunks().iter().cloned().collect::<Vec<_>>();

let mut res = HashMap::new();
for chunk_header in chunk_headers {
Expand Down
20 changes: 20 additions & 0 deletions chain/indexer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## 0.9.0

* Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt`
* `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway,
so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system
* Introduce `IndexerExecutionOutcomeWithOptionalReceipt` which is the same as `IndexerExecutionOutcomeWithReceipt`
but with optional `receipt` field.

## Breaking changes

* `IndexerChunkView` doesn't contain field `receipt_execution_outcomes` anymore, this field has been moved to `IndexerShard`
* `StreamerMessage` structure was aligned more with NEAR Protocol specification and now looks like:
```
StreamerMessage {
block: BlockView,
shards: Vec<IndexerShard>,
state_changes: StateChangesView,
}
```

## 0.8.1

* Add `InitConfigArgs` and `indexer_init_configs`
Expand Down
1 change: 1 addition & 0 deletions chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tokio = { version = "1.1", features = ["time", "sync"] }

neard = { path = "../../neard" }
near-client = { path = "../client" }
near-chain-configs = { path = "../../core/chain-configs" }
near-crypto = { path = "../../core/crypto" }
near-primitives = { path = "../../core/primitives" }
node-runtime = { path = "../../runtime/runtime" }
4 changes: 2 additions & 2 deletions chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub use neard::{get_default_home, init_configs, NearConfig};
mod streamer;

pub use self::streamer::{
IndexerChunkView, IndexerExecutionOutcomeWithReceipt, IndexerTransactionWithOutcome,
IndexerChunkView, IndexerExecutionOutcomeWithOptionalReceipt,
IndexerExecutionOutcomeWithReceipt, IndexerShard, IndexerTransactionWithOutcome,
StreamerMessage,
};
pub use near_primitives;
Expand Down Expand Up @@ -102,7 +103,6 @@ impl Indexer {
actix::spawn(streamer::start(
self.view_client.clone(),
self.client.clone(),
self.near_config.clone(),
self.indexer_config.clone(),
sender,
));
Expand Down
28 changes: 21 additions & 7 deletions chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use near_primitives::hash::CryptoHash;
pub use near_primitives::{types, views};

use super::errors::FailedToFetchData;
use super::types::IndexerExecutionOutcomeWithReceipt;
use super::types::IndexerExecutionOutcomeWithOptionalReceipt;
use super::INDEXER;

pub(crate) async fn fetch_status(
Expand Down Expand Up @@ -76,12 +76,12 @@ async fn fetch_single_chunk(
}

/// Fetch all ExecutionOutcomeWithId for current block
/// Returns a HashMap where the key is Receipt id or Transaction hash and the value is ExecutionOutcome wth id and proof
/// Returns a HashMap where the key is shard id IndexerExecutionOutcomeWithOptionalReceipt
pub(crate) async fn fetch_outcomes(
client: &Addr<near_client::ViewClientActor>,
block_hash: CryptoHash,
) -> Result<
HashMap<near_primitives::types::ShardId, Vec<IndexerExecutionOutcomeWithReceipt>>,
HashMap<near_primitives::types::ShardId, Vec<IndexerExecutionOutcomeWithOptionalReceipt>>,
FailedToFetchData,
> {
let outcomes = client
Expand All @@ -91,10 +91,10 @@ pub(crate) async fn fetch_outcomes(

let mut shard_execution_outcomes_with_receipts: HashMap<
near_primitives::types::ShardId,
Vec<IndexerExecutionOutcomeWithReceipt>,
Vec<IndexerExecutionOutcomeWithOptionalReceipt>,
> = HashMap::new();
for (shard_id, shard_outcomes) in outcomes {
let mut outcomes_with_receipts: Vec<IndexerExecutionOutcomeWithReceipt> = vec![];
let mut outcomes_with_receipts: Vec<IndexerExecutionOutcomeWithOptionalReceipt> = vec![];
for outcome in shard_outcomes {
let receipt = match fetch_receipt_by_id(&client, outcome.id).await {
Ok(res) => res,
Expand All @@ -108,8 +108,10 @@ pub(crate) async fn fetch_outcomes(
None
}
};
outcomes_with_receipts
.push(IndexerExecutionOutcomeWithReceipt { execution_outcome: outcome, receipt });
outcomes_with_receipts.push(IndexerExecutionOutcomeWithOptionalReceipt {
execution_outcome: outcome,
receipt,
});
}
shard_execution_outcomes_with_receipts.insert(shard_id, outcomes_with_receipts);
}
Expand Down Expand Up @@ -148,3 +150,15 @@ pub(crate) async fn fetch_chunks(

Ok(response)
}

pub(crate) async fn fetch_protocol_config(
client: &Addr<near_client::ViewClientActor>,
block_hash: near_primitives::hash::CryptoHash,
) -> Result<near_chain_configs::ProtocolConfigView, FailedToFetchData> {
Ok(client
.send(near_client::GetProtocolConfig(types::BlockReference::from(types::BlockId::Hash(
block_hash,
))))
.await?
.map_err(|err| FailedToFetchData::String(err.to_string()))?)
}
68 changes: 52 additions & 16 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use self::fetchers::{
fetch_status,
};
pub use self::types::{
IndexerChunkView, IndexerExecutionOutcomeWithReceipt, IndexerTransactionWithOutcome,
IndexerChunkView, IndexerExecutionOutcomeWithOptionalReceipt,
IndexerExecutionOutcomeWithReceipt, IndexerShard, IndexerTransactionWithOutcome,
StreamerMessage,
};
use self::utils::convert_transactions_sir_into_local_receipts;
use crate::streamer::fetchers::fetch_protocol_config;

mod errors;
mod fetchers;
Expand All @@ -35,7 +37,6 @@ const INTERVAL: Duration = Duration::from_millis(500);
async fn build_streamer_message(
client: &Addr<near_client::ViewClientActor>,
block: views::BlockView,
near_config: &neard::NearConfig,
) -> Result<StreamerMessage, FailedToFetchData> {
let chunks_to_fetch = block
.chunks
Expand All @@ -50,16 +51,30 @@ async fn build_streamer_message(
.collect::<Vec<_>>();
let chunks = fetch_chunks(&client, chunks_to_fetch).await?;

let mut local_receipts: Vec<views::ReceiptView> = vec![];
let protocol_config_view = fetch_protocol_config(&client, block.header.hash).await?;
let num_shards = protocol_config_view.num_block_producer_seats_per_shard.len()
as near_primitives::types::NumShards;

let mut shards_outcomes = fetch_outcomes(&client, block.header.hash).await?;
let mut indexer_chunks: Vec<IndexerChunkView> = vec![];
let mut indexer_shards: Vec<IndexerShard> = vec![];

for shard_id in 0..num_shards {
indexer_shards.push(IndexerShard {
shard_id,
chunk: None,
receipt_execution_outcomes: vec![],
})
}

for chunk in chunks {
let views::ChunkView { transactions, author, header, receipts: chunk_non_local_receipts } =
chunk;

let mut outcomes = shards_outcomes
.remove(&header.shard_id)
.expect("Execution outcomes for given shard should be present");

// Take execution outcomes for receipts from the vec and keep only the ones for transactions
let mut receipt_outcomes = outcomes.split_off(transactions.len());

let indexer_transactions = transactions
Expand All @@ -76,7 +91,7 @@ async fn build_streamer_message(

let chunk_local_receipts = convert_transactions_sir_into_local_receipts(
&client,
near_config,
&protocol_config_view,
indexer_transactions
.iter()
.filter(|tx| tx.transaction.signer_id == tx.transaction.receiver_id)
Expand All @@ -85,13 +100,8 @@ async fn build_streamer_message(
)
.await?;

local_receipts.extend_from_slice(&chunk_local_receipts);

let mut chunk_receipts = chunk_local_receipts;
chunk_receipts.extend(chunk_non_local_receipts);

// Add local receipts to corresponding outcomes
for receipt in &local_receipts {
for receipt in &chunk_local_receipts {
if let Some(outcome) = receipt_outcomes
.iter_mut()
.find(|outcome| outcome.execution_outcome.id == receipt.receipt_id)
Expand All @@ -101,18 +111,45 @@ async fn build_streamer_message(
}
}

indexer_chunks.push(IndexerChunkView {
let mut chunk_receipts = chunk_local_receipts;
chunk_receipts.extend(chunk_non_local_receipts);

let shard_id = header.shard_id.clone() as usize;

indexer_shards[shard_id].receipt_execution_outcomes = receipt_outcomes
.into_iter()
.map(|IndexerExecutionOutcomeWithOptionalReceipt { execution_outcome, receipt }| {
IndexerExecutionOutcomeWithReceipt {
execution_outcome,
receipt: receipt.expect("`receipt` must be present at this moment"),
}
})
.collect();

// Put the chunk into corresponding indexer shard
indexer_shards[shard_id].chunk = Some(IndexerChunkView {
author,
header,
transactions: indexer_transactions,
receipts: chunk_receipts,
receipt_execution_outcomes: receipt_outcomes,
});
}

// Ideally we expect `shards_outcomes` to be empty by this time, but if something went wrong with
// chunks and we end up with non-empty `shards_outcomes` we want to be sure we put them into IndexerShard
// That might happen before the fix https://github.com/near/nearcore/pull/4228
for (shard_id, outcomes) in shards_outcomes {
indexer_shards[shard_id as usize].receipt_execution_outcomes.extend(
outcomes.into_iter().map(|outcome| IndexerExecutionOutcomeWithReceipt {
execution_outcome: outcome.execution_outcome,
receipt: outcome.receipt.expect("`receipt` must be present at this moment"),
}),
)
}

let state_changes = fetch_state_changes(&client, block.header.hash).await?;

Ok(StreamerMessage { block, chunks: indexer_chunks, state_changes })
Ok(StreamerMessage { block, shards: indexer_shards, state_changes })
}

/// Function that starts Streamer's busy loop. Every half a seconds it fetches the status
Expand All @@ -122,7 +159,6 @@ async fn build_streamer_message(
pub(crate) async fn start(
view_client: Addr<near_client::ViewClientActor>,
client: Addr<near_client::ClientActor>,
near_config: neard::NearConfig,
indexer_config: IndexerConfig,
blocks_sink: mpsc::Sender<StreamerMessage>,
) {
Expand Down Expand Up @@ -180,7 +216,7 @@ pub(crate) async fn start(
);
for block_height in start_syncing_block_height..=latest_block_height {
if let Ok(block) = fetch_block_by_height(&view_client, block_height).await {
let response = build_streamer_message(&view_client, block, &near_config).await;
let response = build_streamer_message(&view_client, block).await;

match response {
Ok(streamer_message) => {
Expand Down
20 changes: 16 additions & 4 deletions chain/indexer/src/streamer/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use near_primitives::{types, views};
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamerMessage {
pub block: views::BlockView,
pub chunks: Vec<IndexerChunkView>,
pub shards: Vec<IndexerShard>,
pub state_changes: views::StateChangesView,
}

Expand All @@ -17,17 +17,29 @@ pub struct IndexerChunkView {
pub header: views::ChunkHeaderView,
pub transactions: Vec<IndexerTransactionWithOutcome>,
pub receipts: Vec<views::ReceiptView>,
pub receipt_execution_outcomes: Vec<IndexerExecutionOutcomeWithReceipt>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexerTransactionWithOutcome {
pub transaction: views::SignedTransactionView,
pub outcome: IndexerExecutionOutcomeWithReceipt,
pub outcome: IndexerExecutionOutcomeWithOptionalReceipt,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexerExecutionOutcomeWithReceipt {
pub struct IndexerExecutionOutcomeWithOptionalReceipt {
pub execution_outcome: views::ExecutionOutcomeWithIdView,
pub receipt: Option<views::ReceiptView>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexerExecutionOutcomeWithReceipt {
pub execution_outcome: views::ExecutionOutcomeWithIdView,
pub receipt: views::ReceiptView,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IndexerShard {
pub shard_id: types::ShardId,
pub chunk: Option<IndexerChunkView>,
pub receipt_execution_outcomes: Vec<IndexerExecutionOutcomeWithReceipt>,
}
6 changes: 3 additions & 3 deletions chain/indexer/src/streamer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::IndexerTransactionWithOutcome;

pub(crate) async fn convert_transactions_sir_into_local_receipts(
client: &Addr<near_client::ViewClientActor>,
near_config: &neard::NearConfig,
protocol_config: &near_chain_configs::ProtocolConfigView,
txs: Vec<&IndexerTransactionWithOutcome>,
block: &views::BlockView,
) -> Result<Vec<views::ReceiptView>, FailedToFetchData> {
Expand All @@ -22,7 +22,7 @@ pub(crate) async fn convert_transactions_sir_into_local_receipts(
txs.into_iter()
.map(|tx| {
let cost = tx_cost(
&near_config.genesis.config.runtime_config.transaction_costs,
&protocol_config.runtime_config.transaction_costs,
&near_primitives::transaction::Transaction {
signer_id: tx.transaction.signer_id.clone(),
public_key: tx.transaction.public_key.clone(),
Expand All @@ -41,7 +41,7 @@ pub(crate) async fn convert_transactions_sir_into_local_receipts(
},
prev_block_gas_price,
true,
near_config.genesis.config.protocol_version,
protocol_config.clone().protocol_version,
);
views::ReceiptView {
predecessor_id: tx.transaction.signer_id.clone(),
Expand Down
Loading

0 comments on commit 379f87e

Please sign in to comment.