Skip to content

Commit

Permalink
Refactors tx caching in ShieldedContext
Browse files Browse the repository at this point in the history
  • Loading branch information
grarco committed Jan 8, 2024
1 parent c7637ca commit 01fb2b4
Show file tree
Hide file tree
Showing 18 changed files with 49 additions and 93 deletions.
20 changes: 6 additions & 14 deletions apps/src/lib/node/ledger/shell/finalize_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ where
tx_event["hash"]
);
if is_committed_fee_unshield {
tx_event["is_valid_masp_tx"] = String::new();
tx_event["is_valid_masp_tx"] =
format!("{}", tx_index);
}
self.wl_storage.storage.tx_queue.push(TxInQueue {
tx: wrapper.expect("Missing expected wrapper"),
Expand All @@ -440,7 +441,8 @@ where
address::InternalAddress::Masp,
),
) {
tx_event["is_valid_masp_tx"] = String::new();
tx_event["is_valid_masp_tx"] =
format!("{}", tx_index);
}
changed_keys
.extend(result.changed_keys.iter().cloned());
Expand Down Expand Up @@ -468,18 +470,7 @@ where
.map(|ibc_event| {
// Add the IBC event besides the tx_event
let mut event = Event::from(ibc_event);
// Add the height for IBC event query
event["height"] = height.to_string();
if tx_event
.attributes
.contains_key("is_valid_masp_tx")
{
// Add the tx index for masp txs clients
// queries
// FIXME: review this
event["is_valid_masp_tx"] =
tx_index.to_string();
}
event
})
// eth bridge events
Expand Down Expand Up @@ -565,7 +556,8 @@ where
// The fee unshield operation could still have been
// committed
if is_committed_fee_unshield {
tx_event["is_valid_masp_tx"] = String::new();
tx_event["is_valid_masp_tx"] =
format!("{}", tx_index);
}
} else {
tx_event["code"] = ResultCode::WasmRuntimeError.into();
Expand Down
2 changes: 0 additions & 2 deletions core/src/types/ibc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,6 @@ pub fn get_shielded_transfer(
return Ok(None);
}

// FIXME: I should place the is_masp_tx attribute directly on the ibc event
// not in finalize block FIXME: maybe it's not possible
event
.attributes
.get("memo")
Expand Down
120 changes: 43 additions & 77 deletions sdk/src/masp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use thiserror::Error;
#[cfg(feature = "testing")]
use crate::error::EncodingError;
use crate::error::{Error, PinnedBalanceError, QueryError};
use crate::events::EventType;
use crate::io::Io;
use crate::proto::Tx;
use crate::queries::Client;
Expand Down Expand Up @@ -602,7 +601,7 @@ pub struct ShieldedContext<U: ShieldedUtils> {
#[borsh(skip)]
pub utils: U,
/// The last indexed transaction to be processed in this context
pub last_indexed: IndexedTx,
pub last_indexed: Option<IndexedTx>,
/// The commitment tree produced by scanning all transactions up to tx_pos
pub tree: CommitmentTree<Node>,
// FIXME: review these positions, what do they refer to?
Expand Down Expand Up @@ -635,10 +634,7 @@ impl<U: ShieldedUtils + Default> Default for ShieldedContext<U> {
fn default() -> ShieldedContext<U> {
ShieldedContext::<U> {
utils: U::default(),
last_indexed: IndexedTx {
height: BlockHeight::first(),
index: TxIndex::default(),
},
last_indexed: None,
tree: CommitmentTree::empty(),
pos_map: HashMap::default(),
nf_map: HashMap::default(),
Expand Down Expand Up @@ -725,14 +721,7 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
if !unknown_keys.is_empty() {
// Load all transactions accepted until this point
eprintln!("FETCHING AGAIN FROM INDEX 0 BECAUSE NEW KEY"); //FIXME: remove
txs = Self::fetch_shielded_transfers(
client,
IndexedTx {
height: BlockHeight::first(),
index: TxIndex::default(),
},
)
.await?;
txs = Self::fetch_shielded_transfers(client, None).await?;
tx_iter = txs.iter();
// Do this by constructing a shielding context only for unknown keys
let mut tx_ctx = Self {
Expand Down Expand Up @@ -774,9 +763,7 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
// FIXME: remove all the unwraps, we are in the sdk here
pub async fn fetch_shielded_transfers<C: Client + Sync>(
client: &C,
// FIXME: just pass the block heigh here? I always query block anyway
// FIXME: should probably also cached only the last block height? Yes
last_indexed_tx: IndexedTx,
last_indexed_tx: Option<IndexedTx>,
) -> Result<BTreeMap<IndexedTx, (Epoch, Transfer, Transaction)>, Error>
{
// Query for the last produced block height
Expand All @@ -786,27 +773,16 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {

eprintln!("ABOUT TO REQUEST PAGINATED RESULT"); //FIXME: remove
eprintln!("LAST BLOCK HEIGHT: {}", last_block_height); //FIXME: remove
eprintln!("LAST INDEXED HEIGHT: {}", last_indexed_tx.height); //FIXME: remove
eprintln!("LAST INDEXED HEIGHT: {:#?}", last_indexed_tx); //FIXME: remove
let mut shielded_txs = BTreeMap::new();
// Fetch all the transactions we do not have yet
// FIXME: this will actually query another time the already quiered
// previous last block height, should probably increase by one here.
// Actually, even queryin the same block again shouldn't be a problem
// cause I'll simply overwrite the entry in the map in the context
// FIXME: instead it's apparently a problem, the integratio ntests were
// failing becasue of this FIXME: the index starts from 1 so I
// need to check the last indexe height only in this case, for
// the other cases I can start from the following one
// FIXME: refator this if with methods in IndexedTx
let first_height_to_query = if last_indexed_tx.height.0 <= 1 {
1
} else {
last_indexed_tx.height.next_height().0
};
let first_height_to_query =
last_indexed_tx.map_or_else(|| 1, |last| last.height.0);
let first_idx_to_query =
last_indexed_tx.map_or_else(|| 0, |last| last.index.0 + 1);
for height in first_height_to_query..=last_block_height.0 {
eprintln!("IN HEIGHT {height} LOOP"); //FIXME: remove
// Get the valid masp transactions at the specified height
// FIXME: review if we really need extra key for ibc events

let epoch = query_epoch_at_height(client, height.into())
.await?
Expand All @@ -818,11 +794,6 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
})?;

eprintln!("REQUESTING BLOCK AT HEIGHT: {}", height); //FIXME: remove
// Paginate the results
// FIXME: I think I'm braking here even before doing the first
// transaction, or better I'm livelocking, the ledger runs but the
// client never submits transactions FIXME: I get to
// here
let txs_results = match client
.block_results(height)
.await
Expand All @@ -832,17 +803,36 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
// FIXME: imrpove this match
Some(events) => events
.into_iter()
.enumerate()
.filter(|(_idx, event)| {
// eprintln!("EVENT: {:#?}", event); //FIXME: remove
.filter_map(|event| {
// Filter only the tx events which are valid masp txs
// FIXME: probably no need the condition on the event
// type, it's redundant
(event.kind == EventType::Accepted.to_string()
|| event.kind == EventType::Applied.to_string())
&& event.attributes.iter().any(|attribute| {
&attribute.key == "is_valid_masp_tx"
})
// and that we haven't fetched yet
let tx_index =
event.attributes.iter().find_map(|attribute| {
if attribute.key == "is_valid_masp_tx" {
Some(TxIndex(
// FIXME: ok to unwrap here?
u32::from_str(&attribute.value)
.unwrap(),
))
} else {
None
}
});

match tx_index {
Some(idx) => {
if height == first_height_to_query {
if idx.0 >= first_idx_to_query {
Some((idx, event))
} else {
None
}
} else {
Some((idx, event))
}
}
None => None,
}
})
.collect::<Vec<_>>(),
None => {
Expand All @@ -859,25 +849,9 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
.block
.data;

// FIXME: but I don't get to here, I must break in between
eprintln!("SIZE OF RESPONSE: {}", txs_results.len()); //FIXME: remove
// FIXME: seems like we can't find the succesful previous masp
// transaction even though it has been flagged
// FIXME: I never get here becasuse it seems the result is always
// empty
for (idx, tx_event) in &txs_results {
eprintln!("FOUND TRANSACTION"); //FIXME: remove
// FIXME: could I also receive a block height smaller than
// the cached one?
// FIXME: I think this condition is useless because in case I
// just overwrite the entry in the hashmap
// if BlockHeight(height) == last_indexed_tx.height
// && tx_index <= last_indexed_tx.index
// {
// continue;
// }

let tx = Tx::try_from(block[*idx].as_ref())
for (idx, tx_event) in txs_results {
let tx = Tx::try_from(block[idx.0 as usize].as_ref())
.map_err(|e| Error::Other(e.to_string()))?;

let tx_header = tx.header();
Expand All @@ -894,9 +868,6 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
let hash = wrapper_header
.unshield_section_hash
.ok_or_else(|| {
// FIXME: error here
// FIXME: probably in MockNode I'm getting the
// wrong index
Error::Other(
"Missing expected fee unshielding section hash"
.to_string(),
Expand Down Expand Up @@ -998,7 +969,7 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
shielded_txs.insert(
IndexedTx {
height: height.into(),
index: TxIndex(*idx as u32),
index: idx,
},
(epoch, transfer, masp_transaction),
);
Expand Down Expand Up @@ -1231,7 +1202,7 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
change: -tx.amount.amount().change(),
},
);
self.last_indexed = indexed_tx;
self.last_indexed = Some(indexed_tx);

self.delta_map
.insert(indexed_tx, (epoch, transfer_delta, transaction_delta));
Expand Down Expand Up @@ -1701,9 +1672,7 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
Error::Other("Missing masp transaction".to_string())
})?,
Err(_) => {
// FIXME: add support for pinned ibc masp txs?
// FIXME: probably need to review also how we do it in
// fewtch_shielded_transfer
// FIXME: add support for pinned ibc masp txs? Yes
return Err(Error::Other("IBC Masp pinned tx".to_string()));
}
};
Expand Down Expand Up @@ -2238,7 +2207,6 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
let _ = self.save().await;
// Required for filtering out rejected transactions from Tendermint
// responses
// FIXME: here we query the reulst of only the last block
let block_results = rpc::query_results(client).await?;
let mut transfers = self.get_tx_deltas().clone();
// Construct the set of addresses relevant to user's query
Expand All @@ -2255,8 +2223,6 @@ impl<U: ShieldedUtils + MaybeSend + MaybeSync> ShieldedContext<U> {
for addr in relevant_addrs {
for prop in ["transfer.source", "transfer.target"] {
// Query transactions involving the current address
// FIXME: bnut it seems like here we query all transactions, not
// only those from the last block
let mut tx_query = Query::eq(prop, addr.encode());
// Elaborate the query if requested by the user
if let Some(token) = &query_token {
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 01fb2b4

Please sign in to comment.