From b6808268f78f0bdb023ac631a5c61d7d0b7ea137 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Mon, 8 Nov 2021 13:57:59 -0500 Subject: [PATCH 1/4] First working implementation of Solana --- Cargo.lock | 29 ++ Cargo.toml | 2 + chain/solana/Cargo.toml | 25 + chain/solana/build.rs | 8 + chain/solana/proto/codec.proto | 142 ++++++ chain/solana/src/adapter.rs | 42 ++ chain/solana/src/capabilities.rs | 37 ++ chain/solana/src/chain.rs | 299 +++++++++++ chain/solana/src/codec.rs | 52 ++ chain/solana/src/data_source.rs | 406 +++++++++++++++ chain/solana/src/lib.rs | 10 + chain/solana/src/protobuf/google.protobuf.rs | 1 + .../solana/src/protobuf/sf.solana.codec.v1.rs | 175 +++++++ chain/solana/src/runtime/abi.rs | 439 ++++++++++++++++ chain/solana/src/runtime/generated.rs | 306 +++++++++++ chain/solana/src/runtime/mod.rs | 6 + chain/solana/src/runtime/runtime_adapter.rs | 11 + chain/solana/src/trigger.rs | 482 ++++++++++++++++++ core/Cargo.toml | 1 + core/src/subgraph/instance_manager.rs | 7 + core/src/subgraph/registrar.rs | 14 + graph/src/blockchain/mod.rs | 5 + graph/src/blockchain/types.rs | 10 +- graph/src/firehose/endpoints.rs | 6 +- graph/src/runtime/mod.rs | 15 + node/Cargo.toml | 1 + node/src/main.rs | 77 ++- server/index-node/Cargo.toml | 1 + server/index-node/src/resolver.rs | 16 + 29 files changed, 2621 insertions(+), 4 deletions(-) create mode 100644 chain/solana/Cargo.toml create mode 100644 chain/solana/build.rs create mode 100644 chain/solana/proto/codec.proto create mode 100644 chain/solana/src/adapter.rs create mode 100644 chain/solana/src/capabilities.rs create mode 100644 chain/solana/src/chain.rs create mode 100644 chain/solana/src/codec.rs create mode 100644 chain/solana/src/data_source.rs create mode 100644 chain/solana/src/lib.rs create mode 100644 chain/solana/src/protobuf/google.protobuf.rs create mode 100644 chain/solana/src/protobuf/sf.solana.codec.v1.rs create mode 100644 chain/solana/src/runtime/abi.rs create mode 100644 chain/solana/src/runtime/generated.rs create mode 100644 chain/solana/src/runtime/mod.rs create mode 100644 chain/solana/src/runtime/runtime_adapter.rs create mode 100644 chain/solana/src/trigger.rs diff --git a/Cargo.lock b/Cargo.lock index b678098d167..7ad67d69d9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,6 +191,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base58" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6107fe1be6682a68940da878d9e9f5e90ca5745b3dec9fd1bb393c8777d4f581" + [[package]] name = "base64" version = "0.12.3" @@ -1522,6 +1528,26 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "graph-chain-solana" +version = "0.24.1" +dependencies = [ + "base58", + "base64 0.13.0", + "diesel", + "graph", + "graph-core", + "graph-runtime-derive", + "graph-runtime-wasm", + "graph-store-postgres", + "pretty_assertions 0.7.2", + "prost", + "prost-types", + "serde", + "test-store", + "tonic-build", +] + [[package]] name = "graph-core" version = "0.25.0" @@ -1537,6 +1563,7 @@ dependencies = [ "graph", "graph-chain-ethereum", "graph-chain-near", + "graph-chain-solana", "graph-mock", "graph-runtime-wasm", "graphql-parser", @@ -1596,6 +1623,7 @@ dependencies = [ "graph", "graph-chain-ethereum", "graph-chain-near", + "graph-chain-solana", "graph-core", "graph-graphql", "graph-runtime-wasm", @@ -1690,6 +1718,7 @@ dependencies = [ "graph", "graph-chain-ethereum", "graph-chain-near", + "graph-chain-solana", "graph-graphql", "graphql-parser", "http 0.2.5", diff --git a/Cargo.toml b/Cargo.toml index 66edf4c25b7..ec5e182632a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,8 @@ members = [ "core", "chain/ethereum", + "chain/near", + "chain/solana", "graphql", "mock", "node", diff --git a/chain/solana/Cargo.toml b/chain/solana/Cargo.toml new file mode 100644 index 00000000000..a8b535f2079 --- /dev/null +++ b/chain/solana/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "graph-chain-solana" +version = "0.24.1" +edition = "2018" + +[build-dependencies] +tonic-build = "0.5.1" + +[dependencies] +base64 = "0.13" +graph = { path = "../../graph" } +prost = "0.8.0" +prost-types = "0.8.0" +serde = "1.0" +base58 = "0.2.0" + +graph-runtime-wasm = { path = "../../runtime/wasm" } +graph-runtime-derive = { path = "../../runtime/derive" } + +[dev-dependencies] +diesel = { version = "1.4.7", features = ["postgres", "serde_json", "numeric", "r2d2"] } +graph-core = { path = "../../core" } +graph-store-postgres = { path = "../../store/postgres" } +pretty_assertions = "0.7.2" +test-store = { path = "../../store/test-store" } diff --git a/chain/solana/build.rs b/chain/solana/build.rs new file mode 100644 index 00000000000..0ac59996bb0 --- /dev/null +++ b/chain/solana/build.rs @@ -0,0 +1,8 @@ +fn main() { + println!("cargo:rerun-if-changed=proto"); + tonic_build::configure() + .out_dir("src/protobuf") + .format(true) + .compile(&["proto/codec.proto"], &["proto"]) + .expect("Failed to compile StreamingFast Solana proto(s)"); +} diff --git a/chain/solana/proto/codec.proto b/chain/solana/proto/codec.proto new file mode 100644 index 00000000000..14c94fb8d6c --- /dev/null +++ b/chain/solana/proto/codec.proto @@ -0,0 +1,142 @@ +syntax = "proto3"; + +package sf.solana.codec.v1; + +option go_package = "github.com/streamingfast/sf-solana/pb/sf/solana/codec/v1;pbcodec"; + +import "google/protobuf/any.proto"; + +message Block { + bytes id = 1; // corresponds to the Slot id (or hash) + uint64 number = 2; // corresponds to the Slot number for this block + uint32 version = 3; + bytes previous_id = 4; // corresponds to the previous_blockhash, might skip some slots, so beware + uint64 previous_block = 5; + uint64 genesis_unix_timestamp = 6; + uint64 clock_unix_timestamp = 7; + uint64 root_num = 8; + + bytes last_entry_hash = 9; + + repeated Transaction transactions = 10; + uint32 transaction_count = 11; + + bool has_split_account_changes = 12; + string account_changes_file_ref = 13; +} + +message Batch { + repeated Transaction transactions = 1; +} + +// Bundled in separate files, referenced by `account_changes_file_ref` +message AccountChangesBundle { + // Maps to the index of the `repeated` field for Block::transactions + repeated AccountChangesPerTrxIndex transactions = 1; +} + +message AccountChangesPerTrxIndex { + bytes TrxId = 1; + + // Maps to the index within the `repeated` field of the proto for + // Transaction::instructions + repeated AccountChangesPerInstruction instructions = 2; +} + +message AccountChangesPerInstruction { + // Data to be put in Instruction::account_changes + repeated AccountChange changes = 1; +} + +message Transaction { + // The transaction ID corresponds to the _first_ + // signature. Additional signatures are in `additional_signatures`. + bytes id = 1; + + // Index from within a single Slot, deterministically ordered to the + // best of our ability using the transaction ID as a sort key for + // the batch of transactions executed in parallel. + uint64 index = 2; + + repeated bytes additional_signatures = 3; + + MessageHeader header = 4; + // From the original Message object + repeated bytes account_keys = 5; + // From the original Message object + bytes recent_blockhash = 6; + + // What follows Once executed these can be set: + repeated string log_messages = 7; + // Instructions, containing both top-level and nested transactions + repeated Instruction instructions = 8; + + bool failed = 9; + TransactionError error = 10; +} + +message MessageHeader { + uint32 num_required_signatures = 1; + uint32 num_readonly_signed_accounts = 2; + uint32 num_readonly_unsigned_accounts = 3; +} + + +/** +- instr1 (id=1, parent=0) +- instr2 (id=2, parent=0) (pubkey1 is writable) + - instr3 (id=3, parent=2) (pubkey1 is writable) + - instr4 (id=4, parent=3) (pubkey1 is writable) + - instr5 (id=5, parent=4) (pubkey1 is writable, mutates pubkey1) + collect delta of pubkey1 + collect delta of pubkey1 ONLY IF CHANGED AGAIN, from last time we took a snapshot of it. + collect delta of pubkey1 +- instr6 (id=6, parent=0) + */ + +message Instruction { + bytes program_id = 3; + repeated bytes account_keys = 4; + bytes data = 5; + + // What follows is execution trace data, could be empty for un-executed transactions. + + uint32 ordinal = 6; + uint32 parent_ordinal = 7; + uint32 depth = 8; + + repeated BalanceChange balance_changes = 9; + repeated AccountChange account_changes = 10; + + bool failed = 15; + InstructionError error = 16; +} + +message BalanceChange { + bytes pubkey = 1; + uint64 prev_lamports = 2; + uint64 new_lamports = 3; +} + +message AccountChange { + bytes pubkey = 1; + bytes prev_data = 2; + bytes new_data = 3; + uint64 new_data_length = 4; +} + +message TransactionError { + string error = 2; +} + +message TransactionInstructionError { + string error = 2; +} + +message InstructionError { + string error = 2; +} + +message InstructionErrorCustom { + string error = 2; +} \ No newline at end of file diff --git a/chain/solana/src/adapter.rs b/chain/solana/src/adapter.rs new file mode 100644 index 00000000000..bf55d6148d3 --- /dev/null +++ b/chain/solana/src/adapter.rs @@ -0,0 +1,42 @@ +use crate::capabilities::NodeCapabilities; +use crate::{data_source::DataSource, Chain}; +use graph::blockchain as bc; +use graph::prelude::*; + +#[derive(Clone, Debug, Default)] +pub struct TriggerFilter { + pub(crate) block: SolanaBlockFilter, +} + +impl bc::TriggerFilter for TriggerFilter { + fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) { + self.block + .extend(SolanaBlockFilter::from_data_sources(data_sources)); + } + + fn node_capabilities(&self) -> NodeCapabilities { + NodeCapabilities {} + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct SolanaBlockFilter { + pub trigger_every_block: bool, +} + +impl SolanaBlockFilter { + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { + iter.into_iter() + .filter(|data_source| data_source.source.program_id.is_some()) + .fold(Self::default(), |mut filter_opt, _data_source| { + filter_opt.extend(Self { + trigger_every_block: true, + }); + filter_opt + }) + } + + pub fn extend(&mut self, other: SolanaBlockFilter) { + self.trigger_every_block = self.trigger_every_block || other.trigger_every_block; + } +} diff --git a/chain/solana/src/capabilities.rs b/chain/solana/src/capabilities.rs new file mode 100644 index 00000000000..2f2d0ff1aba --- /dev/null +++ b/chain/solana/src/capabilities.rs @@ -0,0 +1,37 @@ +use graph::{anyhow::Error, impl_slog_value}; +use std::cmp::{Ordering, PartialOrd}; +use std::fmt; +use std::str::FromStr; + +use crate::data_source::DataSource; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct NodeCapabilities {} + +impl PartialOrd for NodeCapabilities { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + +impl FromStr for NodeCapabilities { + type Err = Error; + + fn from_str(_s: &str) -> Result { + Ok(NodeCapabilities {}) + } +} + +impl fmt::Display for NodeCapabilities { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("solana") + } +} + +impl_slog_value!(NodeCapabilities, "{}"); + +impl graph::blockchain::NodeCapabilities for NodeCapabilities { + fn from_data_sources(_data_sources: &[DataSource]) -> Self { + NodeCapabilities {} + } +} diff --git a/chain/solana/src/chain.rs b/chain/solana/src/chain.rs new file mode 100644 index 00000000000..5b2c17dee70 --- /dev/null +++ b/chain/solana/src/chain.rs @@ -0,0 +1,299 @@ +use graph::blockchain::{Block, BlockchainKind}; +use graph::cheap_clone::CheapClone; +use graph::data::subgraph::UnifiedMappingApiVersion; +use graph::firehose::FirehoseEndpoints; +use graph::prelude::StopwatchMetrics; +use graph::{ + anyhow, + blockchain::{ + block_stream::{ + BlockStreamEvent, BlockStreamMetrics, BlockWithTriggers, FirehoseError, + FirehoseMapper as FirehoseMapperTrait, TriggersAdapter as TriggersAdapterTrait, + }, + firehose_block_stream::FirehoseBlockStream, + BlockHash, BlockPtr, Blockchain, IngestorError, + }, + components::store::DeploymentLocator, + firehose::{self as firehose, ForkStep}, + prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory}, +}; +use prost::Message; +use std::sync::Arc; + +use crate::adapter::TriggerFilter; +use crate::capabilities::NodeCapabilities; +use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate}; +use crate::runtime::RuntimeAdapter; +use crate::trigger::{self, SolanaTrigger}; +use crate::{ + codec, + data_source::{DataSource, UnresolvedDataSource}, +}; +use graph::blockchain::block_stream::BlockStream; +use graph::components::store::WritableStore; + +pub struct Chain { + logger_factory: LoggerFactory, + name: String, + firehose_endpoints: Arc, + chain_store: Arc, +} + +impl std::fmt::Debug for Chain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "chain: near") + } +} + +impl Chain { + pub fn new( + logger_factory: LoggerFactory, + name: String, + chain_store: Arc, + firehose_endpoints: FirehoseEndpoints, + ) -> Self { + Chain { + logger_factory, + name, + firehose_endpoints: Arc::new(firehose_endpoints), + chain_store, + } + } +} + +#[async_trait] +impl Blockchain for Chain { + const KIND: BlockchainKind = BlockchainKind::Solana; + type Block = codec::Block; + type DataSource = DataSource; + type UnresolvedDataSource = UnresolvedDataSource; + type DataSourceTemplate = DataSourceTemplate; + type UnresolvedDataSourceTemplate = UnresolvedDataSourceTemplate; + type TriggersAdapter = TriggersAdapter; + type TriggerData = crate::trigger::SolanaTrigger; + type MappingTrigger = crate::trigger::SolanaTrigger; + type TriggerFilter = crate::adapter::TriggerFilter; + type NodeCapabilities = crate::capabilities::NodeCapabilities; + type RuntimeAdapter = RuntimeAdapter; + + fn triggers_adapter( + &self, + _loc: &DeploymentLocator, + _capabilities: &Self::NodeCapabilities, + _unified_api_version: UnifiedMappingApiVersion, + _stopwatch_metrics: StopwatchMetrics, + ) -> Result, Error> { + let adapter = TriggersAdapter {}; + Ok(Arc::new(adapter)) + } + + async fn new_firehose_block_stream( + &self, + deployment: DeploymentLocator, + store: Arc, + start_blocks: Vec, + filter: Arc, + metrics: Arc, + unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, Error> { + let adapter = self + .triggers_adapter( + &deployment, + &NodeCapabilities {}, + unified_api_version.clone(), + metrics.stopwatch.clone(), + ) + .expect(&format!("no adapter for network {}", self.name,)); + + let firehose_endpoint = match self.firehose_endpoints.random() { + Some(e) => e.clone(), + None => return Err(anyhow::format_err!("no firehose endpoint available",)), + }; + + let logger = self + .logger_factory + .subgraph_logger(&deployment) + .new(o!("component" => "FirehoseBlockStream")); + + let firehose_mapper = Arc::new(FirehoseMapper {}); + let firehose_cursor = store.block_cursor(); + + Ok(Box::new(FirehoseBlockStream::new( + firehose_endpoint, + firehose_cursor, + firehose_mapper, + adapter, + filter, + start_blocks, + logger, + ))) + } + + async fn new_polling_block_stream( + &self, + _deployment: DeploymentLocator, + _writable: Arc, + _start_blocks: Vec, + _subgraph_start_block: Option, + _filter: Arc, + _metrics: Arc, + _unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, Error> { + panic!("SOLANA does not support polling block stream") + } + + fn chain_store(&self) -> Arc { + self.chain_store.clone() + } + + async fn block_pointer_from_number( + &self, + _logger: &Logger, + _number: BlockNumber, + ) -> Result { + // FIXME (Solana): Hmmm, what to do with this? + Ok(BlockPtr { + hash: BlockHash::from(vec![0xff; 32]), + number: 0, + }) + } + + fn runtime_adapter(&self) -> Arc { + Arc::new(RuntimeAdapter {}) + } + + fn is_firehose_supported(&self) -> bool { + true + } +} + +pub struct TriggersAdapter {} + +#[async_trait] +impl TriggersAdapterTrait for TriggersAdapter { + fn ancestor_block( + &self, + _ptr: BlockPtr, + _offset: BlockNumber, + ) -> Result, Error> { + // FIXME (Solana): Might not be necessary for Solana support for now + Ok(None) + } + + async fn scan_triggers( + &self, + _from: BlockNumber, + _to: BlockNumber, + _filter: &TriggerFilter, + ) -> Result>, Error> { + // FIXME (Solana): Scanning triggers makes little sense in Firehose approach, let's see + Ok(vec![]) + } + + async fn triggers_in_block( + &self, + _logger: &Logger, + block: codec::Block, + _filter: &TriggerFilter, + ) -> Result, Error> { + let shared_block = Arc::new(block.clone()); + let instructions = block.transactions.iter().flat_map(|transaction| { + //let transaction_id = transaction.id.clone(); + let b = shared_block.clone(); + let tx = transaction.clone(); + transaction.instructions.iter().flat_map(move |instruction| { + Some(trigger::InstructionWithInfo { + instruction: instruction.clone(), + block_num: b.number, + block_id: b.id.clone(), + transaction_id: tx.id.clone(), + }) + }) + }); + + let mut trigger_data: Vec<_> = instructions + .map(|i| SolanaTrigger::Instruction(Arc::new(i))) + .collect(); + trigger_data.push(SolanaTrigger::Block(shared_block.cheap_clone())); + + Ok(BlockWithTriggers::new(block, trigger_data)) + } + + async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result { + // FIXME (Solana): Might not be necessary for Solana support for now + Ok(true) + } + + /// Panics if `block` is genesis. + /// But that's ok since this is only called when reverting `block`. + async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error> { + // FIXME (NEAR): Might not be necessary for NEAR support for now + Ok(Some(BlockPtr { + hash: BlockHash::from(vec![0xff; 32]), + number: block.number.saturating_sub(1), + })) + } +} + +pub struct FirehoseMapper {} + +#[async_trait] +impl FirehoseMapperTrait for FirehoseMapper { + async fn to_block_stream_event( + &self, + logger: &Logger, + response: &firehose::Response, + adapter: &TriggersAdapter, + filter: &TriggerFilter, + ) -> Result, FirehoseError> { + let step = ForkStep::from_i32(response.step).unwrap_or_else(|| { + panic!( + "unknown step i32 value {}, maybe you forgot update & re-regenerate the protobuf definitions?", + response.step + ) + }); + let any_block = response + .block + .as_ref() + .expect("block payload information should always be present"); + + // Right now, this is done in all cases but in reality, with how the BlockStreamEvent::Revert + // is defined right now, only block hash and block number is necessary. However, this information + // is not part of the actual bstream::BlockResponseV2 payload. As such, we need to decode the full + // block which is useless. + // + // Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe + // define a slimmed down stuct that would decode only a few fields and ignore all the rest. + let block = codec::Block::decode(any_block.value.as_ref())?; + + use ForkStep::*; + match step { + StepNew => Ok(BlockStreamEvent::ProcessBlock( + adapter.triggers_in_block(logger, block, filter).await?, + Some(response.cursor.clone()), + )), + + StepUndo => { + // let header = block.header(); + // let parent_ptr = header + // .parent_ptr() + // .expect("Genesis block should never be reverted"); + //todo: ^^^^^^^^^^^^^^^^^^^^^^^^^ + + Ok(BlockStreamEvent::Revert( + block.ptr(), + Some(response.cursor.clone()), + None, //Some(parent_ptr), //todo: <----??? + )) + } + + StepIrreversible => { + panic!("irreversible step is not handled and should not be requested in the Firehose request") + } + + StepUnknown => { + panic!("unknown step should not happen in the Firehose response") + } + } + } +} diff --git a/chain/solana/src/codec.rs b/chain/solana/src/codec.rs new file mode 100644 index 00000000000..2b5c31edc2e --- /dev/null +++ b/chain/solana/src/codec.rs @@ -0,0 +1,52 @@ +#[path = "protobuf/sf.solana.codec.v1.rs"] +mod pbcodec; + +use graph::{blockchain::Block as BlockchainBlock, blockchain::BlockPtr, prelude::BlockNumber}; +use std::convert::TryFrom; + +pub use pbcodec::*; + +impl Block { + pub fn parent_ptr(&self) -> Option { + return None; + // if self.previous_id.len() == 0 { + // return None; + // } + // + // Some( + // BlockPtr::try_from((self.previous_id.as_ref(), self.number)) + // .expect("invalid block's hash"), + // ) + //todo: ^^^^^ + } +} + +impl From for BlockPtr { + fn from(b: Block) -> BlockPtr { + (&b).into() + } +} + +impl<'a> From<&'a Block> for BlockPtr { + fn from(b: &'a Block) -> BlockPtr { + // let hash = BlockHash::from(b.id) + // .expect(&format!("id {} should be a valid BlockHash", &b.id,)); + + BlockPtr::try_from((b.id.as_slice(), i64::try_from(b.number).unwrap())) + .expect("invalid block's hash") + } +} + +impl BlockchainBlock for Block { + fn ptr(&self) -> BlockPtr { + self.into() + } + + fn parent_ptr(&self) -> Option { + self.parent_ptr() + } + + fn number(&self) -> i32 { + BlockNumber::try_from(self.number).expect("invalid block's height") + } +} diff --git a/chain/solana/src/data_source.rs b/chain/solana/src/data_source.rs new file mode 100644 index 00000000000..e9a5759acd1 --- /dev/null +++ b/chain/solana/src/data_source.rs @@ -0,0 +1,406 @@ +use base58::ToBase58; +use graph::blockchain::{Block, TriggerWithHandler}; +use graph::components::store::StoredDynamicDataSource; +use graph::data::subgraph::DataSourceContext; +use graph::prelude::SubgraphManifestValidationError; +use graph::{ + anyhow::{anyhow, Error}, + blockchain::{self, Blockchain}, + prelude::{ + async_trait, info, BlockNumber, CheapClone, DataSourceTemplateInfo, Deserialize, Link, + LinkResolver, Logger, + }, + semver, +}; +use std::collections::BTreeMap; +use std::{convert::TryFrom, sync::Arc}; + +use crate::chain::Chain; +use crate::trigger::SolanaTrigger; + +pub const SOLANA_KIND: &str = "solana"; + +/// Runtime representation of a data source. +#[derive(Clone, Debug)] +pub struct DataSource { + pub kind: String, + pub network: Option, + pub name: String, + pub(crate) source: Source, + pub mapping: Mapping, + pub context: Arc>, + pub creation_block: Option, +} + +impl blockchain::DataSource for DataSource { + fn address(&self) -> Option<&[u8]> { + self.source.program_id.as_ref().map(String::as_bytes) + } + + fn start_block(&self) -> BlockNumber { + self.source.start_block + } + + fn name(&self) -> &str { + &self.name + } + + fn kind(&self) -> &str { + &self.kind + } + + fn network(&self) -> Option<&str> { + self.network.as_ref().map(|s| s.as_str()) + } + + fn context(&self) -> Arc> { + self.context.cheap_clone() + } + + fn creation_block(&self) -> Option { + self.creation_block + } + + fn api_version(&self) -> semver::Version { + self.mapping.api_version.clone() + } + + fn runtime(&self) -> &[u8] { + self.mapping.runtime.as_ref() + } + + fn match_and_decode( + &self, + trigger: &::TriggerData, + block: Arc<::Block>, + _logger: &Logger, + ) -> Result>, Error> { + if self.source.start_block > block.number() { + return Ok(None); + } + + let handler = match trigger { + // A block trigger matches if a block handler is present. + SolanaTrigger::Block(_) => match self.handler_for_block() { + Some(handler) => &handler.handler, + None => return Ok(None), + }, + + SolanaTrigger::Instruction(instruction_with_block) => { + let pid = &instruction_with_block.instruction.program_id; + let encoded_instruction_pid = pid.as_slice().to_base58(); + + if Some(encoded_instruction_pid) != self.source.program_id { + return Ok(None); + } + + match self.handler_for_instruction() { + Some(handler) => &handler.handler, + None => return Ok(None), + } + } + }; + + Ok(Some(TriggerWithHandler::new( + trigger.cheap_clone(), + handler.to_owned(), + ))) + } + + fn is_duplicate_of(&self, other: &Self) -> bool { + let DataSource { + kind, + network, + name, + source, + mapping, + context, + + // The creation block is ignored for detection duplicate data sources. + // Contract ABI equality is implicit in `source` and `mapping.abis` equality. + creation_block: _, + } = self; + + // mapping_request_sender, host_metrics, and (most of) host_exports are operational structs + // used at runtime but not needed to define uniqueness; each runtime host should be for a + // unique data source. + kind == &other.kind + && network == &other.network + && name == &other.name + && source == &other.source + && mapping.block_handlers == other.mapping.block_handlers + && context == &other.context + } + + fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource { + // FIXME (Solana): Implement me! + todo!() + } + + fn from_stored_dynamic_data_source( + _templates: &BTreeMap<&str, &DataSourceTemplate>, + _stored: StoredDynamicDataSource, + ) -> Result { + // FIXME (Solana): Implement me correctly + todo!() + } + + fn validate(&self) -> Vec { + let mut errors = Vec::new(); + + if self.kind != SOLANA_KIND { + errors.push(anyhow!( + "data source has invalid `kind`, expected {} but found {}", + SOLANA_KIND, + self.kind + )) + } + + // Validate that there is a `source` address if there are instruction handlers + let no_source_address = self.address().is_none(); + let has_instruction_handlers = !self.mapping.instruction_handlers.is_empty(); + if no_source_address && has_instruction_handlers { + errors.push(SubgraphManifestValidationError::SourceAddressRequired.into()); + }; + + // Validate that there are no more than one of both block handlers and receipt handlers + if self.mapping.block_handlers.len() > 1 { + errors.push(anyhow!("data source has duplicated block handlers")); + } + if self.mapping.instruction_handlers.len() > 1 { + errors.push(anyhow!("data source has duplicated receipt handlers")); + } + + errors + } +} + +impl DataSource { + fn from_manifest( + kind: String, + network: Option, + name: String, + source: Source, + mapping: Mapping, + context: Option, + ) -> Result { + // Data sources in the manifest are created "before genesis" so they have no creation block. + let creation_block = None; + + Ok(DataSource { + kind, + network, + name, + source, + mapping, + context: Arc::new(context), + creation_block, + }) + } + + fn handler_for_block(&self) -> Option<&MappingBlockHandler> { + self.mapping.block_handlers.first() + } + + fn handler_for_instruction(&self) -> Option<&MappingInstructionHandler> { + self.mapping.instruction_handlers.first() + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Deserialize)] +pub struct UnresolvedDataSource { + pub kind: String, + pub network: Option, + pub name: String, + pub(crate) source: Source, + pub mapping: UnresolvedMapping, + pub context: Option, +} + +#[async_trait] +impl blockchain::UnresolvedDataSource for UnresolvedDataSource { + async fn resolve( + self, + resolver: &impl LinkResolver, + logger: &Logger, + ) -> Result { + let UnresolvedDataSource { + kind, + network, + name, + source, + mapping, + context, + } = self; + + info!(logger, "Resolve data source"; "name" => &name, "source" => &source.start_block); + + let mapping = mapping.resolve(&*resolver, logger).await?; + + DataSource::from_manifest(kind, network, name, source, mapping, context) + } +} + +impl TryFrom> for DataSource { + type Error = Error; + + fn try_from(_info: DataSourceTemplateInfo) -> Result { + Err(anyhow!("Near subgraphs do not support templates")) + + // How this might be implemented if/when Near gets support for templates: + // let DataSourceTemplateInfo { + // template, + // params, + // context, + // creation_block, + // } = info; + + // let account = params + // .get(0) + // .with_context(|| { + // format!( + // "Failed to create data source from template `{}`: account parameter is missing", + // template.name + // ) + // })? + // .clone(); + + // Ok(DataSource { + // kind: template.kind, + // network: template.network, + // name: template.name, + // source: Source { + // account, + // start_block: 0, + // }, + // mapping: template.mapping, + // context: Arc::new(context), + // creation_block: Some(creation_block), + // }) + } +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +pub struct BaseDataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub mapping: M, +} + +pub type UnresolvedDataSourceTemplate = BaseDataSourceTemplate; +pub type DataSourceTemplate = BaseDataSourceTemplate; + +#[async_trait] +impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTemplate { + async fn resolve( + self, + resolver: &impl LinkResolver, + logger: &Logger, + ) -> Result { + let UnresolvedDataSourceTemplate { + kind, + network, + name, + mapping, + } = self; + + info!(logger, "Resolve data source template"; "name" => &name); + + Ok(DataSourceTemplate { + kind, + network, + name, + mapping: mapping.resolve(resolver, logger).await?, + }) + } +} + +impl blockchain::DataSourceTemplate for DataSourceTemplate { + fn api_version(&self) -> semver::Version { + self.mapping.api_version.clone() + } + + fn runtime(&self) -> &[u8] { + self.mapping.runtime.as_ref() + } + + fn name(&self) -> &str { + &self.name + } +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnresolvedMapping { + pub api_version: String, + pub language: String, + pub entities: Vec, + #[serde(default)] + pub block_handlers: Vec, + #[serde(default)] + pub instruction_handlers: Vec, + pub file: Link, +} + +impl UnresolvedMapping { + pub async fn resolve( + self, + resolver: &impl LinkResolver, + logger: &Logger, + ) -> Result { + let UnresolvedMapping { + api_version, + language, + entities, + block_handlers, + instruction_handlers, + file: link, + } = self; + + let api_version = semver::Version::parse(&api_version)?; + + info!(logger, "Resolve mapping"; "link" => &link.link); + let module_bytes = resolver.cat(logger, &link).await?; + + Ok(Mapping { + api_version, + language, + entities, + block_handlers, + instruction_handlers, + runtime: Arc::new(module_bytes), + link, + }) + } +} + +#[derive(Clone, Debug)] +pub struct Mapping { + pub api_version: semver::Version, + pub language: String, + pub entities: Vec, + pub block_handlers: Vec, + pub instruction_handlers: Vec, + pub runtime: Arc>, + pub link: Link, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct MappingBlockHandler { + pub handler: String, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct MappingInstructionHandler { + handler: String, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub(crate) struct Source { + // A data source that does not have an account can only have block handlers. + #[serde(rename = "programId", default)] + pub(crate) program_id: Option, + #[serde(rename = "startBlock", default)] + pub(crate) start_block: BlockNumber, +} diff --git a/chain/solana/src/lib.rs b/chain/solana/src/lib.rs new file mode 100644 index 00000000000..a497e77bf9d --- /dev/null +++ b/chain/solana/src/lib.rs @@ -0,0 +1,10 @@ +mod adapter; +mod capabilities; +mod chain; +mod codec; +mod data_source; +mod runtime; +mod trigger; + +pub use crate::chain::Chain; +pub use codec::Block; diff --git a/chain/solana/src/protobuf/google.protobuf.rs b/chain/solana/src/protobuf/google.protobuf.rs new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/chain/solana/src/protobuf/google.protobuf.rs @@ -0,0 +1 @@ + diff --git a/chain/solana/src/protobuf/sf.solana.codec.v1.rs b/chain/solana/src/protobuf/sf.solana.codec.v1.rs new file mode 100644 index 00000000000..7a297701374 --- /dev/null +++ b/chain/solana/src/protobuf/sf.solana.codec.v1.rs @@ -0,0 +1,175 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Block { + /// corresponds to the Slot id (or hash) + #[prost(bytes = "vec", tag = "1")] + pub id: ::prost::alloc::vec::Vec, + /// corresponds to the Slot number for this block + #[prost(uint64, tag = "2")] + pub number: u64, + #[prost(uint32, tag = "3")] + pub version: u32, + /// corresponds to the previous_blockhash, might skip some slots, so beware + #[prost(bytes = "vec", tag = "4")] + pub previous_id: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "5")] + pub previous_block: u64, + #[prost(uint64, tag = "6")] + pub genesis_unix_timestamp: u64, + #[prost(uint64, tag = "7")] + pub clock_unix_timestamp: u64, + #[prost(uint64, tag = "8")] + pub root_num: u64, + #[prost(bytes = "vec", tag = "9")] + pub last_entry_hash: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "10")] + pub transactions: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "11")] + pub transaction_count: u32, + #[prost(bool, tag = "12")] + pub has_split_account_changes: bool, + #[prost(string, tag = "13")] + pub account_changes_file_ref: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Batch { + #[prost(message, repeated, tag = "1")] + pub transactions: ::prost::alloc::vec::Vec, +} +/// Bundled in separate files, referenced by `account_changes_file_ref` +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AccountChangesBundle { + /// Maps to the index of the `repeated` field for Block::transactions + #[prost(message, repeated, tag = "1")] + pub transactions: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AccountChangesPerTrxIndex { + #[prost(bytes = "vec", tag = "1")] + pub trx_id: ::prost::alloc::vec::Vec, + /// Maps to the index within the `repeated` field of the proto for + /// Transaction::instructions + #[prost(message, repeated, tag = "2")] + pub instructions: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AccountChangesPerInstruction { + /// Data to be put in Instruction::account_changes + #[prost(message, repeated, tag = "1")] + pub changes: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Transaction { + /// The transaction ID corresponds to the _first_ + /// signature. Additional signatures are in `additional_signatures`. + #[prost(bytes = "vec", tag = "1")] + pub id: ::prost::alloc::vec::Vec, + /// Index from within a single Slot, deterministically ordered to the + /// best of our ability using the transaction ID as a sort key for + /// the batch of transactions executed in parallel. + #[prost(uint64, tag = "2")] + pub index: u64, + #[prost(bytes = "vec", repeated, tag = "3")] + pub additional_signatures: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(message, optional, tag = "4")] + pub header: ::core::option::Option, + /// From the original Message object + #[prost(bytes = "vec", repeated, tag = "5")] + pub account_keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// From the original Message object + #[prost(bytes = "vec", tag = "6")] + pub recent_blockhash: ::prost::alloc::vec::Vec, + /// What follows Once executed these can be set: + #[prost(string, repeated, tag = "7")] + pub log_messages: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Instructions, containing both top-level and nested transactions + #[prost(message, repeated, tag = "8")] + pub instructions: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "9")] + pub failed: bool, + #[prost(message, optional, tag = "10")] + pub error: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MessageHeader { + #[prost(uint32, tag = "1")] + pub num_required_signatures: u32, + #[prost(uint32, tag = "2")] + pub num_readonly_signed_accounts: u32, + #[prost(uint32, tag = "3")] + pub num_readonly_unsigned_accounts: u32, +} +//* +//- instr1 (id=1, parent=0) +//- instr2 (id=2, parent=0) (pubkey1 is writable) +//- instr3 (id=3, parent=2) (pubkey1 is writable) +//- instr4 (id=4, parent=3) (pubkey1 is writable) +//- instr5 (id=5, parent=4) (pubkey1 is writable, mutates pubkey1) +//collect delta of pubkey1 +//collect delta of pubkey1 ONLY IF CHANGED AGAIN, from last time we took a snapshot of it. +//collect delta of pubkey1 +//- instr6 (id=6, parent=0) + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Instruction { + #[prost(bytes = "vec", tag = "3")] + pub program_id: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", repeated, tag = "4")] + pub account_keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", tag = "5")] + pub data: ::prost::alloc::vec::Vec, + // What follows is execution trace data, could be empty for un-executed transactions. + #[prost(uint32, tag = "6")] + pub ordinal: u32, + #[prost(uint32, tag = "7")] + pub parent_ordinal: u32, + #[prost(uint32, tag = "8")] + pub depth: u32, + #[prost(message, repeated, tag = "9")] + pub balance_changes: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "10")] + pub account_changes: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "15")] + pub failed: bool, + #[prost(message, optional, tag = "16")] + pub error: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BalanceChange { + #[prost(bytes = "vec", tag = "1")] + pub pubkey: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "2")] + pub prev_lamports: u64, + #[prost(uint64, tag = "3")] + pub new_lamports: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AccountChange { + #[prost(bytes = "vec", tag = "1")] + pub pubkey: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub prev_data: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "3")] + pub new_data: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "4")] + pub new_data_length: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionError { + #[prost(string, tag = "2")] + pub error: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionInstructionError { + #[prost(string, tag = "2")] + pub error: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InstructionError { + #[prost(string, tag = "2")] + pub error: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InstructionErrorCustom { + #[prost(string, tag = "2")] + pub error: ::prost::alloc::string::String, +} diff --git a/chain/solana/src/runtime/abi.rs b/chain/solana/src/runtime/abi.rs new file mode 100644 index 00000000000..b7bb2d2ca8b --- /dev/null +++ b/chain/solana/src/runtime/abi.rs @@ -0,0 +1,439 @@ +use crate::codec; +use crate::trigger::InstructionWithInfo; +use graph::anyhow::anyhow; + +use crate::codec::{InstructionError, MessageHeader, TransactionError}; +use graph::runtime::{asc_new, AscHeap, DeterministicHostError, ToAscObj}; +use graph_runtime_wasm::asc_abi::class::Array; + +pub(crate) use super::generated::*; + +impl ToAscObj for codec::Block { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscBlock { + id: asc_new(heap, self.id.as_slice())?, + number: self.number, + version: self.version, + previous_id: asc_new(heap, self.previous_id.as_slice())?, + previous_block: self.previous_block, + genesis_unix_timestamp: self.genesis_unix_timestamp, + clock_unix_timestamp: self.clock_unix_timestamp, + root_num: self.root_num, + last_entry_hash: asc_new(heap, self.last_entry_hash.as_slice())?, + transactions: asc_new(heap, &self.transactions)?, + transaction_count: self.transaction_count, + has_split_account_changes: self.has_split_account_changes, + account_changes_file_ref: asc_new(heap, &self.account_changes_file_ref)?, + _padding: Padding3::new(), + }) + } +} + +impl ToAscObj for codec::Instruction { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscInstruction { + program_id: asc_new(heap, self.program_id.as_slice())?, + account_keys: asc_new(heap, &self.account_keys)?, + data: asc_new(heap, self.data.as_slice())?, + ordinal: self.ordinal, + parent_ordinal: self.parent_ordinal, + depth: self.depth, + balance_changes: asc_new(heap, &self.balance_changes)?, + account_changes: asc_new(heap, &self.account_changes)?, + error: asc_new(heap, &self.error)?, + failed: false, + _padding: Padding3::new(), + }) + } +} + +impl ToAscObj for Option { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + match self { + None => { + let blank = InstructionError { + error: String::from(""), + }; + Ok(blank.to_asc_obj(heap)?) + } + Some(e) => Ok(e.to_asc_obj(heap)?), + } + } +} + +impl ToAscObj for codec::MessageHeader { + fn to_asc_obj( + &self, + _heap: &mut H, + ) -> Result { + Ok(AscMessageHeader { + num_required_signatures: self.num_required_signatures, + num_readonly_signed_accounts: self.num_readonly_signed_accounts, + num_readonly_unsigned_accounts: self.num_readonly_unsigned_accounts, + _padding: 0, + }) + } +} + +impl ToAscObj for codec::Transaction { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscTransaction { + id: asc_new(heap, self.id.as_slice())?, + index: self.index, + additional_signatures: asc_new(heap, &self.additional_signatures)?, + header: asc_new(heap, &self.header)?, + account_keys: asc_new(heap, &self.account_keys)?, + recent_blockhash: asc_new(heap, self.recent_blockhash.as_slice())?, + log_messages: asc_new(heap, &self.log_messages)?, + instructions: asc_new(heap, &self.instructions)?, + failed: self.failed, + error: asc_new(heap, &self.error)?, + _padding: Padding7::new(), + }) + } +} + +impl ToAscObj for Option { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + match self { + None => { + let blank = MessageHeader { + num_required_signatures: 0, + num_readonly_signed_accounts: 0, + num_readonly_unsigned_accounts: 0, + }; + Ok(blank.to_asc_obj(heap)?) + } + Some(e) => Ok(e.to_asc_obj(heap)?), + } + } +} + +impl ToAscObj for Option { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + match self { + None => { + let blank = TransactionError { + error: String::from(""), + }; + Ok(blank.to_asc_obj(heap)?) + } + Some(e) => Ok(e.to_asc_obj(heap)?), + } + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + let content: Result, _> = self.iter().map(|x| asc_new(heap, x)).collect(); + let content = content?; + Ok(AscTransactionArray(Array::new(&*content, heap)?)) + } +} + +impl ToAscObj for codec::TransactionError { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscTransactionError { + error: asc_new(heap, &self.error)?, + }) + } +} + +impl ToAscObj for codec::InstructionError { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscInstructionError { + error: asc_new(heap, &self.error)?, + }) + } +} + +impl ToAscObj for codec::BalanceChange { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscBalanceChange { + prev_lamports: self.prev_lamports, + new_lamports: self.new_lamports, + pub_key: asc_new(heap, self.pubkey.as_slice())?, + _padding: 0, + }) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + let content: Result, _> = self.iter().map(|x| asc_new(heap, x)).collect(); + let content = content?; + Ok(AscBalanceChangeArray(Array::new(&*content, heap)?)) + } +} + +impl ToAscObj for codec::AccountChange { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscAccountChange { + pub_key: asc_new(heap, self.pubkey.as_slice())?, + prev_data: asc_new(heap, self.prev_data.as_slice())?, + new_data: asc_new(heap, self.new_data.as_slice())?, + new_data_length: self.new_data_length, + _padding: 0, + }) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + let content: Result, _> = self.iter().map(|x| asc_new(heap, x)).collect(); + let content = content?; + Ok(AscAccountChangeArray(Array::new(&*content, heap)?)) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + let content: Result, _> = self.iter().map(|x| asc_new(heap, x)).collect(); + let content = content?; + Ok(AscInstructionArray(Array::new(&*content, heap)?)) + } +} + +impl ToAscObj for InstructionWithInfo { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + Ok(AscInstructionWithInfo { + block_num: self.block_num, + instruction: asc_new(heap, &self.instruction)?, + block_id: asc_new(heap, self.block_id.as_slice())?, + transaction_id: asc_new(heap, self.transaction_id.as_slice())?, + _padding: 0, + }) + } +} + +impl ToAscObj for Vec> { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + let content: Result, _> = self + .iter() + .map(|x| { + asc_new(heap, x.as_slice()) + // asc_new(heap, slice.as_ref())? + }) + .collect(); + let content = content?; + Ok(AscHashArray(Array::new(&*content, heap)?)) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + ) -> Result { + let content: Result, _> = self + .iter() + .map(|x| { + asc_new(heap, x) + // asc_new(heap, slice.as_ref())? + }) + .collect(); + let content = content?; + Ok(AscStringArray(Array::new(&*content, heap)?)) + } +} + +#[test] +fn block_to_asc_ptr() { + use graph::data::subgraph::API_VERSION_0_0_5; + use graph::prelude::hex; + + let mut heap = BytesHeap::new(API_VERSION_0_0_5); + let block = codec::Block { + id: hex::decode("01").expect(format!("Invalid hash value").as_ref()), + number: 0, + version: 0, + previous_id: vec![], + previous_block: 0, + genesis_unix_timestamp: 0, + clock_unix_timestamp: 0, + root_num: 0, + last_entry_hash: vec![], + transactions: vec![], + transaction_count: 0, + has_split_account_changes: false, + account_changes_file_ref: "".to_string(), + }; + + let asc_block = asc_new(&mut heap, &block); + assert!(asc_block.is_ok()); +} + +#[test] +fn transaction_to_asc_ptr() { + use graph::data::subgraph::API_VERSION_0_0_5; + use graph::prelude::hex; + + let mut heap = BytesHeap::new(API_VERSION_0_0_5); + let transaction = codec::Transaction { + id: vec![], + index: 0, + additional_signatures: vec![], + header: None, + account_keys: vec![], + recent_blockhash: vec![], + log_messages: vec![], + instructions: vec![], + failed: false, + error: None, + }; + + let asc_transaction = asc_new(&mut heap, &transaction); + assert!(asc_transaction.is_ok()); +} + +#[test] +fn instruction_to_asc_ptr() { + use graph::data::subgraph::API_VERSION_0_0_5; + use graph::prelude::hex; + use prost::Message; + + let balance_change_1 = codec::BalanceChange { + pubkey: hex::decode("01").expect(format!("Invalid hash value").as_ref()), + prev_lamports: 1, + new_lamports: 2, + }; + + let account_change_1 = codec::AccountChange { + pubkey: hex::decode("01").expect(format!("Invalid hash value").as_ref()), + prev_data: hex::decode("02").expect(format!("Invalid hash value").as_ref()), + new_data: hex::decode("03").expect(format!("Invalid hash value").as_ref()), + new_data_length: 1, + }; + + let mut heap = BytesHeap::new(API_VERSION_0_0_5); + let instruction = codec::Instruction { + program_id: hex::decode("01").expect(format!("Invalid hash value").as_ref()), + account_keys: vec![ + hex::decode("01").unwrap().encode_to_vec(), + hex::decode("02").unwrap().encode_to_vec(), + ], + data: hex::decode("01").expect(format!("Invalid hash value").as_ref()), + ordinal: 1, + parent_ordinal: 2, + depth: 42, + balance_changes: vec![balance_change_1], + account_changes: vec![account_change_1], + failed: false, + error: None, + }; + + let asc_instruction = asc_new(&mut heap, &instruction); + assert!(asc_instruction.is_ok()); +} + +struct BytesHeap { + api_version: graph::semver::Version, + memory: Vec, +} + +#[test] +impl BytesHeap { + fn new(api_version: graph::semver::Version) -> Self { + Self { + api_version, + memory: vec![], + } + } +} + +impl AscHeap for BytesHeap { + fn raw_new(&mut self, bytes: &[u8]) -> Result { + self.memory.extend_from_slice(bytes); + Ok((self.memory.len() - bytes.len()) as u32) + } + + fn get(&self, offset: u32, size: u32) -> Result, DeterministicHostError> { + let memory_byte_count = self.memory.len(); + if memory_byte_count == 0 { + return Err(DeterministicHostError::from(anyhow!( + "No memory is allocated" + ))); + } + + let start_offset = offset as usize; + let end_offset_exclusive = start_offset + size as usize; + + if start_offset >= memory_byte_count { + return Err(DeterministicHostError::from(anyhow!( + "Start offset {} is outside of allocated memory, max offset is {}", + start_offset, + memory_byte_count - 1 + ))); + } + + if end_offset_exclusive > memory_byte_count { + return Err(DeterministicHostError::from(anyhow!( + "End of offset {} is outside of allocated memory, max offset is {}", + end_offset_exclusive, + memory_byte_count - 1 + ))); + } + + return Ok(Vec::from(&self.memory[start_offset..end_offset_exclusive])); + } + + fn api_version(&self) -> graph::semver::Version { + self.api_version.clone() + } + + fn asc_type_id( + &mut self, + type_id_index: graph::runtime::IndexForAscTypeId, + ) -> Result { + // Not totally clear what is the purpose of this method, why not a default implementation here? + Ok(type_id_index as u32) + } +} diff --git a/chain/solana/src/runtime/generated.rs b/chain/solana/src/runtime/generated.rs new file mode 100644 index 00000000000..2c87a9705b1 --- /dev/null +++ b/chain/solana/src/runtime/generated.rs @@ -0,0 +1,306 @@ +// use diesel::dsl::Asc; +use graph::runtime::{AscIndexId, AscPtr, AscType, DeterministicHostError, IndexForAscTypeId}; +use graph::semver::Version; +use graph_runtime_derive::AscType; +use graph_runtime_wasm::asc_abi::class::{Array, AscString, Uint8Array}; + +pub(crate) type AscHash = Uint8Array; + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscBlock { + pub number: u64, + pub previous_block: u64, + pub genesis_unix_timestamp: u64, + pub clock_unix_timestamp: u64, + pub root_num: u64, + pub transaction_count: u32, + pub version: u32, + pub id: AscPtr, + pub previous_id: AscPtr, + pub last_entry_hash: AscPtr, + pub transactions: AscPtr, + pub account_changes_file_ref: AscPtr, + pub has_split_account_changes: bool, + pub _padding: Padding3, +} + +impl AscIndexId for AscBlock { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaBlock; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscInstruction { + pub ordinal: u32, + pub parent_ordinal: u32, + pub depth: u32, + pub program_id: AscPtr, + pub account_keys: AscPtr, + pub data: AscPtr, + pub balance_changes: AscPtr, + pub account_changes: AscPtr, + pub error: AscPtr, + pub failed: bool, + pub _padding: Padding3, +} + +// special type for the rare occurance where we need to pad three bytes +pub(crate) struct Padding3([u8; 3]); + +impl Padding3 { + pub fn new() -> Self { + Padding3 { 0: [0, 0, 0] } + } +} + +impl AscType for Padding3 { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + Ok(vec![0, 0, 0]) + } + + fn from_asc_bytes( + _asc_obj: &[u8], + _api_version: &Version, + ) -> Result { + Ok(Padding3 { 0: [0, 0, 0] }) + } +} + +// special type for the rare occurance where we need to pad three bytes +pub(crate) struct Padding7([u8; 7]); + +impl Padding7 { + pub fn new() -> Self { + Padding7 { + 0: [0, 0, 0, 0, 0, 0, 0], + } + } +} + +impl AscType for Padding7 { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + Ok(vec![0, 0, 0, 0, 0, 0, 0]) + } + + fn from_asc_bytes( + _asc_obj: &[u8], + _api_version: &Version, + ) -> Result { + Ok(Padding7 { + 0: [0, 0, 0, 0, 0, 0, 0], + }) + } +} + +impl AscIndexId for AscInstruction { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaInstruction; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscInstructionWithInfo { + pub block_num: u64, + pub instruction: AscPtr, + pub block_id: AscPtr, + pub transaction_id: AscPtr, + pub _padding: u32, +} + +impl AscIndexId for AscInstructionWithInfo { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaInstruction; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscTransaction { + pub index: u64, + pub id: AscPtr, + pub additional_signatures: AscPtr, + pub header: AscPtr, + pub account_keys: AscPtr, + pub recent_blockhash: AscPtr, + pub log_messages: AscPtr, + pub instructions: AscPtr, + pub error: AscPtr, + pub failed: bool, + pub _padding: Padding7, +} + +impl AscIndexId for AscTransaction { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaTransaction; +} + +pub struct AscTransactionArray(pub(crate) Array>); + +impl AscType for AscTransactionArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscTransactionArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaArrayTransaction; +} + +pub struct AscInstructionArray(pub(crate) Array>); + +impl AscType for AscInstructionArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscInstructionArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaArrayInstruction; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscMessageHeader { + pub num_required_signatures: u32, + pub num_readonly_signed_accounts: u32, + pub num_readonly_unsigned_accounts: u32, + pub _padding: u32, +} + +impl AscIndexId for AscMessageHeader { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaMessageHeader; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscTransactionError { + pub error: AscPtr, +} + +impl AscIndexId for AscTransactionError { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaTransactionError; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscInstructionError { + pub error: AscPtr, +} + +impl AscIndexId for AscInstructionError { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaInstructionError; +} + +pub struct AscHashArray(pub(crate) Array>); + +impl AscType for AscHashArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscStringArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaArrayHash; +} + +pub struct AscStringArray(pub(crate) Array>); + +impl AscType for AscStringArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscHashArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaArrayString; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscBalanceChange { + pub prev_lamports: u64, + pub new_lamports: u64, + pub pub_key: AscPtr, + pub _padding: u32, +} + +impl AscIndexId for AscBalanceChange { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaBalanceChange; +} + +pub struct AscBalanceChangeArray(pub(crate) Array>); + +impl AscType for AscBalanceChangeArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscBalanceChangeArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaBalanceChangeArray; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscAccountChange { + pub pub_key: AscPtr, + pub prev_data: AscPtr, + pub new_data: AscPtr, + pub _padding: u32, // in order to put next field on 8-byte boundary + pub new_data_length: u64, +} + +impl AscIndexId for AscAccountChange { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaAccountChange; +} + +pub struct AscAccountChangeArray(pub(crate) Array>); + +impl AscType for AscAccountChangeArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscAccountChangeArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::SolanaAccountChangeArray; +} diff --git a/chain/solana/src/runtime/mod.rs b/chain/solana/src/runtime/mod.rs new file mode 100644 index 00000000000..f44391caffd --- /dev/null +++ b/chain/solana/src/runtime/mod.rs @@ -0,0 +1,6 @@ +pub use runtime_adapter::RuntimeAdapter; + +pub mod abi; +pub mod runtime_adapter; + +mod generated; diff --git a/chain/solana/src/runtime/runtime_adapter.rs b/chain/solana/src/runtime/runtime_adapter.rs new file mode 100644 index 00000000000..c5fa9e15059 --- /dev/null +++ b/chain/solana/src/runtime/runtime_adapter.rs @@ -0,0 +1,11 @@ +use crate::{data_source::DataSource, Chain}; +use blockchain::HostFn; +use graph::{anyhow::Error, blockchain}; + +pub struct RuntimeAdapter {} + +impl blockchain::RuntimeAdapter for RuntimeAdapter { + fn host_fns(&self, _ds: &DataSource) -> Result, Error> { + Ok(vec![]) + } +} diff --git a/chain/solana/src/trigger.rs b/chain/solana/src/trigger.rs new file mode 100644 index 00000000000..3cbda1fdb37 --- /dev/null +++ b/chain/solana/src/trigger.rs @@ -0,0 +1,482 @@ +use graph::blockchain; +use graph::blockchain::Block; +use graph::blockchain::TriggerData; +use graph::cheap_clone::CheapClone; +use graph::prelude::hex; +use graph::prelude::web3::types::H256; +use graph::prelude::BlockNumber; +use graph::runtime::asc_new; +use graph::runtime::AscHeap; +use graph::runtime::AscPtr; +use graph::runtime::DeterministicHostError; +use std::{cmp::Ordering, sync::Arc}; + +use crate::codec; + +// Logging the block is too verbose, so this strips the block from the trigger for Debug. +impl std::fmt::Debug for SolanaTrigger { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[derive(Debug)] + pub enum MappingTriggerWithoutBlock<'a> { + Block, + Instruction { instruction: &'a codec::Instruction }, + } + + let trigger_without_block = match self { + SolanaTrigger::Block(_) => MappingTriggerWithoutBlock::Block, + SolanaTrigger::Instruction(instruction_with_block) => { + MappingTriggerWithoutBlock::Instruction { + instruction: &instruction_with_block.instruction, + } + } + }; + + write!(f, "{:?}", trigger_without_block) + } +} + +impl blockchain::MappingTrigger for SolanaTrigger { + fn to_asc_ptr(self, heap: &mut H) -> Result, DeterministicHostError> { + Ok(match self { + SolanaTrigger::Block(block) => asc_new(heap, block.as_ref())?.erase(), + SolanaTrigger::Instruction(instruction_with_block) => { + asc_new(heap, instruction_with_block.as_ref())?.erase() + } + }) + } +} + +#[derive(Clone)] +pub enum SolanaTrigger { + Block(Arc), + Instruction(Arc), +} + +impl CheapClone for SolanaTrigger { + fn cheap_clone(&self) -> SolanaTrigger { + match self { + SolanaTrigger::Block(block) => SolanaTrigger::Block(block.cheap_clone()), + SolanaTrigger::Instruction(instruction_with_block) => { + SolanaTrigger::Instruction(instruction_with_block.cheap_clone()) + } + } + } +} + +impl PartialEq for SolanaTrigger { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Block(a_ptr), Self::Block(b_ptr)) => a_ptr == b_ptr, + (Self::Instruction(a), Self::Instruction(b)) => { + let i = &a.instruction; + let j = &b.instruction; + + return i.program_id == j.program_id + && i.ordinal == j.ordinal + && i.parent_ordinal == j.parent_ordinal + && i.depth == j.depth; + } + + (Self::Block(_), Self::Instruction(_)) | (Self::Instruction(_), Self::Block(_)) => { + false + } + } + } +} + +impl Eq for SolanaTrigger {} + +impl SolanaTrigger { + pub fn block_number(&self) -> BlockNumber { + match self { + SolanaTrigger::Block(block) => block.number(), + SolanaTrigger::Instruction(instruction_with_info) => instruction_with_info.number(), + } + } + + pub fn block_hash(&self) -> H256 { + match self { + SolanaTrigger::Block(block) => block.ptr().hash_as_h256(), + SolanaTrigger::Instruction(instruction_with_block) => { + H256::from_slice(instruction_with_block.block_id.as_slice()) + } + } + } +} + +impl Ord for SolanaTrigger { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + // Keep the order when comparing two block triggers + (Self::Block(..), Self::Block(..)) => Ordering::Equal, + + // Block triggers always come last + (Self::Block(..), _) => Ordering::Greater, + (_, Self::Block(..)) => Ordering::Less, + + // We assumed the provide instructions are ordered correctly, so we say they + // are equal here and array ordering will be used. + (Self::Instruction(..), Self::Instruction(..)) => Ordering::Equal, + } + } +} + +impl PartialOrd for SolanaTrigger { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl TriggerData for SolanaTrigger { + fn error_context(&self) -> std::string::String { + match self { + SolanaTrigger::Block(..) => { + format!("Block #{} ({})", self.block_number(), self.block_hash()) + } + + SolanaTrigger::Instruction(instruction_with_block) => { + format!( + "Instruction #{} (from #{}) for program {} (Block #{} ({})", + instruction_with_block.instruction.ordinal, + instruction_with_block.instruction.parent_ordinal, + hex::encode(&instruction_with_block.instruction.program_id), + self.block_number(), + self.block_hash() + ) + } + } + } +} + +pub struct InstructionWithInfo { + pub instruction: codec::Instruction, + pub block_num: u64, + pub block_id: Vec, + pub transaction_id: Vec, +} + +impl InstructionWithInfo { + pub fn number(&self) -> BlockNumber { + let num = self.block_num as i32; + num.into() + } +} + +// #[cfg(test)] +// mod tests { +// use std::convert::TryFrom; + +// use super::*; + +// use graph::{ +// anyhow::anyhow, +// data::subgraph::API_VERSION_0_0_5, +// prelude::{hex, BigInt}, +// }; + +// #[test] +// fn block_trigger_to_asc_ptr() { +// let mut heap = BytesHeap::new(API_VERSION_0_0_5); +// let trigger = SolanaTrigger::Block(Arc::new(block())); + +// let result = blockchain::MappingTrigger::to_asc_ptr(trigger, &mut heap); +// assert!(result.is_ok()); +// } + +// #[test] +// fn receipt_trigger_to_asc_ptr() { +// let mut heap = BytesHeap::new(API_VERSION_0_0_5); +// let trigger = SolanaTrigger::Instruction(Arc::new(ReceiptWithOutcome { +// block: Arc::new(block()), +// outcome: execution_outcome_with_id().unwrap(), +// receipt: receipt().unwrap(), +// })); + +// let result = blockchain::MappingTrigger::to_asc_ptr(trigger, &mut heap); +// assert!(result.is_ok()); +// } + +// fn block() -> codec::BlockWrapper { +// codec::BlockWrapper { +// block: Some(codec::Block { +// author: "test".to_string(), +// header: Some(codec::BlockHeader { +// height: 2, +// prev_height: 1, +// epoch_id: hash("01"), +// next_epoch_id: hash("02"), +// hash: hash("01"), +// prev_hash: hash("00"), +// prev_state_root: hash("bb00010203"), +// chunk_receipts_root: hash("bb00010203"), +// chunk_headers_root: hash("bb00010203"), +// chunk_tx_root: hash("bb00010203"), +// outcome_root: hash("cc00010203"), +// chunks_included: 1, +// challenges_root: hash("aa"), +// timestamp: 100, +// timestamp_nanosec: 0, +// random_value: hash("010203"), +// validator_proposals: vec![], +// chunk_mask: vec![], +// gas_price: big_int(10), +// block_ordinal: 0, +// validator_reward: big_int(100), +// total_supply: big_int(1_000), +// challenges_result: vec![], +// last_final_block: hash("00"), +// last_ds_final_block: hash("00"), +// next_bp_hash: hash("bb"), +// block_merkle_root: hash("aa"), +// epoch_sync_data_hash: vec![0x00, 0x01], +// approvals: vec![], +// signature: None, +// latest_protocol_version: 0, +// }), +// chunks: vec![chunk_header().unwrap()], +// }), +// shards: vec![codec::IndexerShard { +// shard_id: 0, +// chunk: Some(codec::IndexerChunk { +// author: "near".to_string(), +// header: chunk_header(), +// transactions: vec![codec::IndexerTransactionWithOutcome { +// transaction: Some(codec::SignedTransaction { +// signer_id: "signer".to_string(), +// public_key: Some(codec::PublicKey { bytes: vec![] }), +// nonce: 1, +// receiver_id: "receiver".to_string(), +// actions: vec![], +// signature: Some(codec::Signature { +// r#type: 1, +// bytes: vec![], +// }), +// hash: hash("bb"), +// }), +// outcome: Some(codec::IndexerExecutionOutcomeWithOptionalReceipt { +// execution_outcome: execution_outcome_with_id(), +// receipt: receipt(), +// }), +// }], +// receipts: vec![receipt().unwrap()], +// }), +// receipt_execution_outcomes: vec![codec::IndexerExecutionOutcomeWithReceipt { +// execution_outcome: execution_outcome_with_id(), +// receipt: receipt(), +// }], +// }], +// state_changes: vec![], +// } +// } + +// fn receipt() -> Option { +// Some(codec::Receipt { +// predecessor_id: "genesis.near".to_string(), +// receiver_id: "near".to_string(), +// receipt_id: hash("dead"), +// receipt: Some(codec::receipt::Receipt::Action(codec::ReceiptAction { +// signer_id: "near".to_string(), +// signer_public_key: Some(codec::PublicKey { bytes: vec![] }), +// gas_price: big_int(2), +// output_data_receivers: vec![], +// input_data_ids: vec![], +// actions: vec![ +// codec::Action { +// action: Some(codec::action::Action::CreateAccount( +// codec::CreateAccountAction {}, +// )), +// }, +// codec::Action { +// action: Some(codec::action::Action::DeployContract( +// codec::DeployContractAction { +// code: "/6q7zA==".to_string(), +// }, +// )), +// }, +// codec::Action { +// action: Some(codec::action::Action::FunctionCall( +// codec::FunctionCallAction { +// method_name: "func".to_string(), +// args: "e30=".to_string(), +// gas: 1000, +// deposit: big_int(100), +// }, +// )), +// }, +// codec::Action { +// action: Some(codec::action::Action::Transfer(codec::TransferAction { +// deposit: big_int(100), +// })), +// }, +// codec::Action { +// action: Some(codec::action::Action::Stake(codec::StakeAction { +// stake: big_int(100), +// public_key: Some(codec::PublicKey { bytes: vec![] }), +// })), +// }, +// codec::Action { +// action: Some(codec::action::Action::AddKey(codec::AddKeyAction { +// public_key: Some(codec::PublicKey { bytes: vec![] }), +// access_key: Some(codec::AccessKey { +// nonce: 1, +// permission: Some(codec::AccessKeyPermission { +// permission: Some( +// codec::access_key_permission::Permission::FullAccess( +// codec::FullAccessPermission {}, +// ), +// ), +// }), +// }), +// })), +// }, +// codec::Action { +// action: Some(codec::action::Action::DeleteKey(codec::DeleteKeyAction { +// public_key: Some(codec::PublicKey { bytes: vec![] }), +// })), +// }, +// codec::Action { +// action: Some(codec::action::Action::DeleteAccount( +// codec::DeleteAccountAction { +// beneficiary_id: "suicided.near".to_string(), +// }, +// )), +// }, +// ], +// })), +// }) +// } + +// fn chunk_header() -> Option { +// Some(codec::ChunkHeader { +// chunk_hash: vec![0x00], +// prev_block_hash: vec![0x01], +// outcome_root: vec![0x02], +// prev_state_root: vec![0x03], +// encoded_merkle_root: vec![0x04], +// encoded_length: 1, +// height_created: 2, +// height_included: 3, +// shard_id: 4, +// gas_used: 5, +// gas_limit: 6, +// validator_reward: big_int(7), +// balance_burnt: big_int(7), +// outgoing_receipts_root: vec![0x07], +// tx_root: vec![0x08], +// validator_proposals: vec![codec::ValidatorStake { +// account_id: "account".to_string(), +// public_key: public_key("aa"), +// stake: big_int(10), +// }], +// signature: Some(codec::Signature { +// r#type: 0, +// bytes: vec![], +// }), +// }) +// } + +// fn execution_outcome_with_id() -> Option { +// Some(codec::ExecutionOutcomeWithIdView { +// proof: Some(codec::MerklePath { path: vec![] }), +// block_hash: hash("aa"), +// id: hash("beef"), +// outcome: execution_outcome(), +// }) +// } + +// fn execution_outcome() -> Option { +// Some(codec::ExecutionOutcome { +// logs: vec!["string".to_string()], +// receipt_ids: vec![], +// gas_burnt: 1, +// tokens_burnt: big_int(2), +// executor_id: "near".to_string(), +// status: Some(codec::execution_outcome::Status::SuccessValue( +// codec::SuccessValueExecutionStatus { +// value: "/6q7zA==".to_string(), +// }, +// )), +// }) +// } + +// fn big_int(input: u64) -> Option { +// let value = +// BigInt::try_from(input).expect(format!("Invalid BigInt value {}", input).as_ref()); +// let bytes = value.to_signed_bytes_le(); + +// Some(codec::BigInt { bytes }) +// } + +// fn hash(input: &str) -> Option { +// Some(codec::CryptoHash { +// bytes: hex::decode(input).expect(format!("Invalid hash value {}", input).as_ref()), +// }) +// } + +// fn public_key(input: &str) -> Option { +// Some(codec::PublicKey { +// bytes: hex::decode(input).expect(format!("Invalid PublicKey value {}", input).as_ref()), +// }) +// } + +// struct BytesHeap { +// api_version: graph::semver::Version, +// memory: Vec, +// } + +// impl BytesHeap { +// fn new(api_version: graph::semver::Version) -> Self { +// Self { +// api_version, +// memory: vec![], +// } +// } +// } + +// impl AscHeap for BytesHeap { +// fn raw_new(&mut self, bytes: &[u8]) -> Result { +// self.memory.extend_from_slice(bytes); +// Ok((self.memory.len() - bytes.len()) as u32) +// } + +// fn get(&self, offset: u32, size: u32) -> Result, DeterministicHostError> { +// let memory_byte_count = self.memory.len(); +// if memory_byte_count == 0 { +// return Err(DeterministicHostError(anyhow!("No memory is allocated"))); +// } + +// let start_offset = offset as usize; +// let end_offset_exclusive = start_offset + size as usize; + +// if start_offset >= memory_byte_count { +// return Err(DeterministicHostError(anyhow!( +// "Start offset {} is outside of allocated memory, max offset is {}", +// start_offset, +// memory_byte_count - 1 +// ))); +// } + +// if end_offset_exclusive > memory_byte_count { +// return Err(DeterministicHostError(anyhow!( +// "End of offset {} is outside of allocated memory, max offset is {}", +// end_offset_exclusive, +// memory_byte_count - 1 +// ))); +// } + +// return Ok(Vec::from(&self.memory[start_offset..end_offset_exclusive])); +// } + +// fn api_version(&self) -> graph::semver::Version { +// self.api_version.clone() +// } + +// fn asc_type_id( +// &mut self, +// type_id_index: graph::runtime::IndexForAscTypeId, +// ) -> Result { +// // Not totally clear what is the purpose of this method, why not a default implementation here? +// Ok(type_id_index as u32) +// } +// } +// } diff --git a/core/Cargo.toml b/core/Cargo.toml index b9cf9e85fa2..c592ac36141 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,6 +15,7 @@ graph = { path = "../graph" } # finished as long as this dependency exists graph-chain-ethereum = { path = "../chain/ethereum" } graph-chain-near = { path = "../chain/near" } +graph-chain-solana = { path = "../chain/solana" } lazy_static = "1.2.0" lru_time_cache = "0.11" semver = "1.0.3" diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index f1734d1a0a7..46a80c17f22 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -207,6 +207,13 @@ where ) .await } + BlockchainKind::Solana => { + instance_manager + .start_subgraph_inner::( + logger, loc, manifest, stop_block, + ) + .await + } } }; // Perform the actual work of starting the subgraph in a separate diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 8ff2763fcc5..f8b7b7acb97 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -323,6 +323,20 @@ where ) .await? } + BlockchainKind::Solana => { + create_subgraph_version::( + &logger, + self.store.clone(), + self.chains.cheap_clone(), + name.clone(), + hash.cheap_clone(), + raw, + node_id, + self.version_switching_mode, + self.resolver.cheap_clone(), + ) + .await? + } }; debug!( diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 4fe2e223f56..84fb863bb1d 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -301,6 +301,9 @@ pub enum BlockchainKind { /// NEAR chains (Mainnet, Testnet) or chains that are compatible Near, + + /// Solana chains (Mainnet, Testnet) or chains that are compatible + Solana, } impl fmt::Display for BlockchainKind { @@ -308,6 +311,7 @@ impl fmt::Display for BlockchainKind { let value = match self { BlockchainKind::Ethereum => "ethereum", BlockchainKind::Near => "near", + BlockchainKind::Solana => "solana", }; write!(f, "{}", value) } @@ -320,6 +324,7 @@ impl FromStr for BlockchainKind { match s { "ethereum" => Ok(BlockchainKind::Ethereum), "near" => Ok(BlockchainKind::Near), + "solana" => Ok(BlockchainKind::Solana), _ => Err(anyhow!("unknown blockchain kind {}", s)), } } diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index ffa168f0de6..860d718be81 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -170,9 +170,17 @@ impl TryFrom<(&str, i64)> for BlockPtr { type Error = anyhow::Error; fn try_from((hash, number): (&str, i64)) -> Result { + BlockPtr::try_from((hash, number as u64)) + } +} + +impl TryFrom<(&str, u64)> for BlockPtr { + type Error = anyhow::Error; + + fn try_from((hash, number): (&str, u64)) -> Result { let hash = hash.trim_start_matches("0x"); let hash = H256::from_str(hash) - .map_err(|e| anyhow!("Cannot parse H256 value from string `{}`: {}", hash, e))?; + .context(anyhow!("Cannot parse H256 value from string `{}`", hash))?; Ok(BlockPtr::from((hash, number))) } diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 4aadce56a6b..6d2f10154e1 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -92,10 +92,12 @@ impl FirehoseEndpoint { }, ); - debug!(logger, "Connecting to firehose to retrieve genesis block"); + debug!(logger, "Connecting to firehose to retrieve genesis block, start block 118081400, stop block 119081400"); + let response_stream = client .blocks(firehose::Request { - start_block_num: 0, + start_block_num: 118081400, + stop_block_num: 119081400, fork_steps: vec![ForkStep::StepIrreversible as i32], ..Default::default() }) diff --git a/graph/src/runtime/mod.rs b/graph/src/runtime/mod.rs index c1c8e92c2f3..f88d4033514 100644 --- a/graph/src/runtime/mod.rs +++ b/graph/src/runtime/mod.rs @@ -240,6 +240,21 @@ pub enum IndexForAscTypeId { NearChunkHeader = 84, NearBlock = 85, NearReceiptWithOutcome = 86, + SolanaBlock = 87, + SolanaInstruction = 88, + SolanaInstructionWithBlock = 89, + SolanaTransaction = 90, + SolanaMessageHeader = 91, + SolanaTransactionError = 92, + SolanaInstructionError = 93, + SolanaArrayTransaction = 94, + SolanaArrayInstruction = 95, + SolanaArrayHash = 96, + SolanaArrayString = 97, + SolanaBalanceChange = 98, + SolanaBalanceChangeArray = 99, + SolanaAccountChange = 100, + SolanaAccountChangeArray = 101, } impl ToAscObj for IndexForAscTypeId { diff --git a/node/Cargo.toml b/node/Cargo.toml index f7fb2239a41..7faf999439c 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -25,6 +25,7 @@ graph = { path = "../graph" } graph-core = { path = "../core" } graph-chain-ethereum = { path = "../chain/ethereum" } graph-chain-near = { path = "../chain/near" } +graph-chain-solana = { path = "../chain/solana" } graph-graphql = { path = "../graphql" } graph-runtime-wasm = { path = "../runtime/wasm" } graph-server-http = { path = "../server/http" } diff --git a/node/src/main.rs b/node/src/main.rs index 52f32be00c8..72a7fc72535 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -10,6 +10,7 @@ use graph::prelude::{IndexNodeServer as _, JsonRpcServer as _, *}; use graph::prometheus::Registry; use graph_chain_ethereum as ethereum; use graph_chain_near::{self as near, HeaderOnlyBlock as NearFirehoseHeaderOnlyBlock}; +use graph_chain_solana::{self as solana, Block as SolanaFirehoseBlock}; use graph_core::{ LinkResolver, MetricsRegistry, SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar as IpfsSubgraphRegistrar, @@ -199,6 +200,7 @@ async fn main() { let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; + let (near_networks, near_idents) = connect_firehose_networks::( &logger, @@ -208,7 +210,19 @@ async fn main() { ) .await; - let network_identifiers = ethereum_idents.into_iter().chain(near_idents).collect(); + let (solana_networks, solana_idents) = connect_firehose_networks::( + &logger, + firehose_networks_by_kind + .remove(&BlockchainKind::Solana) + .unwrap_or_else(|| FirehoseNetworks::new()), + ) + .await; + + let network_identifiers = ethereum_idents + .into_iter() + .chain(near_idents.into_iter()) + .chain(solana_idents.into_iter()) + .collect(); let network_store = store_builder.network_store(network_identifiers); let ethereum_chains = ethereum_networks_as_chains( @@ -231,6 +245,14 @@ async fn main() { &logger_factory, ); + let solana_chains = solana_networks_as_chains( + &mut blockchain_map, + &logger, + &solana_networks, + network_store.as_ref(), + &logger_factory, + ); + let blockchain_map = Arc::new(blockchain_map); let load_manager = Arc::new(LoadManager::new( @@ -279,6 +301,11 @@ async fn main() { &network_store, near_chains, ); + start_firehose_block_ingestor::<_, SolanaFirehoseBlock>( + &logger, + &network_store, + solana_chains, + ); // Start a task runner let mut job_runner = graph::util::jobs::Runner::new(&logger); @@ -544,6 +571,54 @@ fn near_networks_as_chains( HashMap::from_iter(chains) } +/// Return the hashmap of SOLANA chains and also add them to `blockchain_map`. +fn solana_networks_as_chains( + blockchain_map: &mut BlockchainMap, + logger: &Logger, + firehose_networks: &FirehoseNetworks, + store: &Store, + logger_factory: &LoggerFactory, +) -> HashMap> { + let chains: Vec<_> = firehose_networks + .networks + .iter() + .filter_map(|(chain_id, endpoints)| { + store + .block_store() + .chain_store(chain_id) + .map(|chain_store| (chain_id, chain_store, endpoints)) + .or_else(|| { + error!( + logger, + "No store configured for SOLANA chain {}; ignoring this chain", chain_id + ); + None + }) + }) + .map(|(chain_id, chain_store, endpoints)| { + ( + chain_id.clone(), + FirehoseChain { + chain: Arc::new(solana::Chain::new( + logger_factory.clone(), + chain_id.clone(), + chain_store, + endpoints.clone(), + )), + firehose_endpoints: endpoints.clone(), + }, + ) + }) + .collect(); + + for (chain_id, firehose_chain) in chains.iter() { + blockchain_map + .insert::(chain_id.clone(), firehose_chain.chain.clone()) + } + + HashMap::from_iter(chains) +} + fn start_block_ingestor( logger: &Logger, logger_factory: &LoggerFactory, diff --git a/server/index-node/Cargo.toml b/server/index-node/Cargo.toml index 89aee3b0b4b..62ca879be5e 100644 --- a/server/index-node/Cargo.toml +++ b/server/index-node/Cargo.toml @@ -9,6 +9,7 @@ graph = { path = "../../graph" } graph-graphql = { path = "../../graphql" } graph-chain-ethereum = { path = "../../chain/ethereum" } graph-chain-near = { path = "../../chain/near" } +graph-chain-solana = { path = "../../chain/solana" } graphql-parser = "0.4.0" http = "0.2" hyper = "0.14" diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 3cccfb1ad6f..7be5dd92281 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -239,6 +239,22 @@ where unvalidated_subgraph_manifest, )? } + BlockchainKind::Solana => { + let unvalidated_subgraph_manifest = + UnvalidatedSubgraphManifest::::resolve( + deployment_hash, + raw, + self.link_resolver.clone(), + &self.logger, + MAX_SPEC_VERSION.clone(), + ) + .await?; + + validate_and_extract_features( + &self.subgraph_store, + unvalidated_subgraph_manifest, + )? + } } }; From 6a8653eacab5c047ae5445f9fb32f9694cb84b53 Mon Sep 17 00:00:00 2001 From: colindickson Date: Tue, 8 Feb 2022 10:33:18 -0500 Subject: [PATCH 2/4] wip: pr changes --- chain/solana/src/chain.rs | 36 ++-- chain/solana/src/codec.rs | 28 +-- chain/solana/src/data_source.rs | 39 +--- chain/solana/src/runtime/abi.rs | 1 - chain/solana/src/trigger.rs | 319 -------------------------------- graph/src/firehose/endpoints.rs | 3 +- 6 files changed, 32 insertions(+), 394 deletions(-) diff --git a/chain/solana/src/chain.rs b/chain/solana/src/chain.rs index 5b2c17dee70..88592211004 100644 --- a/chain/solana/src/chain.rs +++ b/chain/solana/src/chain.rs @@ -41,7 +41,7 @@ pub struct Chain { impl std::fmt::Debug for Chain { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "chain: near") + write!(f, "chain: solana") } } @@ -139,7 +139,7 @@ impl Blockchain for Chain { _metrics: Arc, _unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { - panic!("SOLANA does not support polling block stream") + panic!("Solana does not support polling block stream") } fn chain_store(&self) -> Arc { @@ -151,7 +151,17 @@ impl Blockchain for Chain { _logger: &Logger, _number: BlockNumber, ) -> Result { - // FIXME (Solana): Hmmm, what to do with this? + // FIXME (Solana): Once https://github.com/graphprotocol/graph-node/pull/3174/ is merged, we can rebase and do: + // let firehose_endpoint = match self.firehose_endpoints.random() { + // Some(e) => e.clone(), + // None => return Err(anyhow::format_err!("no firehose endpoint available").into()), + // }; + // + // firehose_endpoint + // .block_ptr_for_number::(logger, number) + // .map_err(Into::into) + // .await + Ok(BlockPtr { hash: BlockHash::from(vec![0xff; 32]), number: 0, @@ -176,8 +186,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _ptr: BlockPtr, _offset: BlockNumber, ) -> Result, Error> { - // FIXME (Solana): Might not be necessary for Solana support for now - Ok(None) + panic!("Should never be called since FirehoseBlockStream cannot resolve it") } async fn scan_triggers( @@ -186,8 +195,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _to: BlockNumber, _filter: &TriggerFilter, ) -> Result>, Error> { - // FIXME (Solana): Scanning triggers makes little sense in Firehose approach, let's see - Ok(vec![]) + panic!("Should never be called since not used by FirehoseBlockStream") } async fn triggers_in_block( @@ -198,7 +206,6 @@ impl TriggersAdapterTrait for TriggersAdapter { ) -> Result, Error> { let shared_block = Arc::new(block.clone()); let instructions = block.transactions.iter().flat_map(|transaction| { - //let transaction_id = transaction.id.clone(); let b = shared_block.clone(); let tx = transaction.clone(); transaction.instructions.iter().flat_map(move |instruction| { @@ -220,14 +227,12 @@ impl TriggersAdapterTrait for TriggersAdapter { } async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result { - // FIXME (Solana): Might not be necessary for Solana support for now - Ok(true) + panic!("Should never be called since not used by FirehoseBlockStream") } /// Panics if `block` is genesis. /// But that's ok since this is only called when reverting `block`. async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error> { - // FIXME (NEAR): Might not be necessary for NEAR support for now Ok(Some(BlockPtr { hash: BlockHash::from(vec![0xff; 32]), number: block.number.saturating_sub(1), @@ -274,16 +279,11 @@ impl FirehoseMapperTrait for FirehoseMapper { )), StepUndo => { - // let header = block.header(); - // let parent_ptr = header - // .parent_ptr() - // .expect("Genesis block should never be reverted"); - //todo: ^^^^^^^^^^^^^^^^^^^^^^^^^ - + let parent_ptr = block.parent_ptr().expect("Genesis block should never be reverted"); Ok(BlockStreamEvent::Revert( block.ptr(), Some(response.cursor.clone()), - None, //Some(parent_ptr), //todo: <----??? + Some(parent_ptr), )) } diff --git a/chain/solana/src/codec.rs b/chain/solana/src/codec.rs index 2b5c31edc2e..06bd4d568e3 100644 --- a/chain/solana/src/codec.rs +++ b/chain/solana/src/codec.rs @@ -8,30 +8,20 @@ pub use pbcodec::*; impl Block { pub fn parent_ptr(&self) -> Option { - return None; - // if self.previous_id.len() == 0 { - // return None; - // } - // - // Some( - // BlockPtr::try_from((self.previous_id.as_ref(), self.number)) - // .expect("invalid block's hash"), - // ) - //todo: ^^^^^ - } -} - -impl From for BlockPtr { - fn from(b: Block) -> BlockPtr { - (&b).into() + if self.previous_id.len() == 0 { + return None; + } + + let hash = String::from_utf8(self.previous_id.clone()).expect("could not decode block hash"); + Some( + BlockPtr::try_from((hash.as_str(), self.number)) + .expect("invalid block hash"), + ) } } impl<'a> From<&'a Block> for BlockPtr { fn from(b: &'a Block) -> BlockPtr { - // let hash = BlockHash::from(b.id) - // .expect(&format!("id {} should be a valid BlockHash", &b.id,)); - BlockPtr::try_from((b.id.as_slice(), i64::try_from(b.number).unwrap())) .expect("invalid block's hash") } diff --git a/chain/solana/src/data_source.rs b/chain/solana/src/data_source.rs index e9a5759acd1..14c41842d97 100644 --- a/chain/solana/src/data_source.rs +++ b/chain/solana/src/data_source.rs @@ -87,10 +87,10 @@ impl blockchain::DataSource for DataSource { }, SolanaTrigger::Instruction(instruction_with_block) => { - let pid = &instruction_with_block.instruction.program_id; - let encoded_instruction_pid = pid.as_slice().to_base58(); + let program_id = &instruction_with_block.instruction.program_id; + let encoded_program_id = program_id.as_slice().to_base58(); - if Some(encoded_instruction_pid) != self.source.program_id { + if Some(encoded_program_id) != self.source.program_id { return Ok(None); } @@ -245,38 +245,7 @@ impl TryFrom> for DataSource { type Error = Error; fn try_from(_info: DataSourceTemplateInfo) -> Result { - Err(anyhow!("Near subgraphs do not support templates")) - - // How this might be implemented if/when Near gets support for templates: - // let DataSourceTemplateInfo { - // template, - // params, - // context, - // creation_block, - // } = info; - - // let account = params - // .get(0) - // .with_context(|| { - // format!( - // "Failed to create data source from template `{}`: account parameter is missing", - // template.name - // ) - // })? - // .clone(); - - // Ok(DataSource { - // kind: template.kind, - // network: template.network, - // name: template.name, - // source: Source { - // account, - // start_block: 0, - // }, - // mapping: template.mapping, - // context: Arc::new(context), - // creation_block: Some(creation_block), - // }) + Err(anyhow!("Solana subgraphs do not support templates")) } } diff --git a/chain/solana/src/runtime/abi.rs b/chain/solana/src/runtime/abi.rs index b7bb2d2ca8b..399354aa5a8 100644 --- a/chain/solana/src/runtime/abi.rs +++ b/chain/solana/src/runtime/abi.rs @@ -260,7 +260,6 @@ impl ToAscObj for Vec> { .iter() .map(|x| { asc_new(heap, x.as_slice()) - // asc_new(heap, slice.as_ref())? }) .collect(); let content = content?; diff --git a/chain/solana/src/trigger.rs b/chain/solana/src/trigger.rs index 3cbda1fdb37..c5a10d1186d 100644 --- a/chain/solana/src/trigger.rs +++ b/chain/solana/src/trigger.rs @@ -161,322 +161,3 @@ impl InstructionWithInfo { num.into() } } - -// #[cfg(test)] -// mod tests { -// use std::convert::TryFrom; - -// use super::*; - -// use graph::{ -// anyhow::anyhow, -// data::subgraph::API_VERSION_0_0_5, -// prelude::{hex, BigInt}, -// }; - -// #[test] -// fn block_trigger_to_asc_ptr() { -// let mut heap = BytesHeap::new(API_VERSION_0_0_5); -// let trigger = SolanaTrigger::Block(Arc::new(block())); - -// let result = blockchain::MappingTrigger::to_asc_ptr(trigger, &mut heap); -// assert!(result.is_ok()); -// } - -// #[test] -// fn receipt_trigger_to_asc_ptr() { -// let mut heap = BytesHeap::new(API_VERSION_0_0_5); -// let trigger = SolanaTrigger::Instruction(Arc::new(ReceiptWithOutcome { -// block: Arc::new(block()), -// outcome: execution_outcome_with_id().unwrap(), -// receipt: receipt().unwrap(), -// })); - -// let result = blockchain::MappingTrigger::to_asc_ptr(trigger, &mut heap); -// assert!(result.is_ok()); -// } - -// fn block() -> codec::BlockWrapper { -// codec::BlockWrapper { -// block: Some(codec::Block { -// author: "test".to_string(), -// header: Some(codec::BlockHeader { -// height: 2, -// prev_height: 1, -// epoch_id: hash("01"), -// next_epoch_id: hash("02"), -// hash: hash("01"), -// prev_hash: hash("00"), -// prev_state_root: hash("bb00010203"), -// chunk_receipts_root: hash("bb00010203"), -// chunk_headers_root: hash("bb00010203"), -// chunk_tx_root: hash("bb00010203"), -// outcome_root: hash("cc00010203"), -// chunks_included: 1, -// challenges_root: hash("aa"), -// timestamp: 100, -// timestamp_nanosec: 0, -// random_value: hash("010203"), -// validator_proposals: vec![], -// chunk_mask: vec![], -// gas_price: big_int(10), -// block_ordinal: 0, -// validator_reward: big_int(100), -// total_supply: big_int(1_000), -// challenges_result: vec![], -// last_final_block: hash("00"), -// last_ds_final_block: hash("00"), -// next_bp_hash: hash("bb"), -// block_merkle_root: hash("aa"), -// epoch_sync_data_hash: vec![0x00, 0x01], -// approvals: vec![], -// signature: None, -// latest_protocol_version: 0, -// }), -// chunks: vec![chunk_header().unwrap()], -// }), -// shards: vec![codec::IndexerShard { -// shard_id: 0, -// chunk: Some(codec::IndexerChunk { -// author: "near".to_string(), -// header: chunk_header(), -// transactions: vec![codec::IndexerTransactionWithOutcome { -// transaction: Some(codec::SignedTransaction { -// signer_id: "signer".to_string(), -// public_key: Some(codec::PublicKey { bytes: vec![] }), -// nonce: 1, -// receiver_id: "receiver".to_string(), -// actions: vec![], -// signature: Some(codec::Signature { -// r#type: 1, -// bytes: vec![], -// }), -// hash: hash("bb"), -// }), -// outcome: Some(codec::IndexerExecutionOutcomeWithOptionalReceipt { -// execution_outcome: execution_outcome_with_id(), -// receipt: receipt(), -// }), -// }], -// receipts: vec![receipt().unwrap()], -// }), -// receipt_execution_outcomes: vec![codec::IndexerExecutionOutcomeWithReceipt { -// execution_outcome: execution_outcome_with_id(), -// receipt: receipt(), -// }], -// }], -// state_changes: vec![], -// } -// } - -// fn receipt() -> Option { -// Some(codec::Receipt { -// predecessor_id: "genesis.near".to_string(), -// receiver_id: "near".to_string(), -// receipt_id: hash("dead"), -// receipt: Some(codec::receipt::Receipt::Action(codec::ReceiptAction { -// signer_id: "near".to_string(), -// signer_public_key: Some(codec::PublicKey { bytes: vec![] }), -// gas_price: big_int(2), -// output_data_receivers: vec![], -// input_data_ids: vec![], -// actions: vec![ -// codec::Action { -// action: Some(codec::action::Action::CreateAccount( -// codec::CreateAccountAction {}, -// )), -// }, -// codec::Action { -// action: Some(codec::action::Action::DeployContract( -// codec::DeployContractAction { -// code: "/6q7zA==".to_string(), -// }, -// )), -// }, -// codec::Action { -// action: Some(codec::action::Action::FunctionCall( -// codec::FunctionCallAction { -// method_name: "func".to_string(), -// args: "e30=".to_string(), -// gas: 1000, -// deposit: big_int(100), -// }, -// )), -// }, -// codec::Action { -// action: Some(codec::action::Action::Transfer(codec::TransferAction { -// deposit: big_int(100), -// })), -// }, -// codec::Action { -// action: Some(codec::action::Action::Stake(codec::StakeAction { -// stake: big_int(100), -// public_key: Some(codec::PublicKey { bytes: vec![] }), -// })), -// }, -// codec::Action { -// action: Some(codec::action::Action::AddKey(codec::AddKeyAction { -// public_key: Some(codec::PublicKey { bytes: vec![] }), -// access_key: Some(codec::AccessKey { -// nonce: 1, -// permission: Some(codec::AccessKeyPermission { -// permission: Some( -// codec::access_key_permission::Permission::FullAccess( -// codec::FullAccessPermission {}, -// ), -// ), -// }), -// }), -// })), -// }, -// codec::Action { -// action: Some(codec::action::Action::DeleteKey(codec::DeleteKeyAction { -// public_key: Some(codec::PublicKey { bytes: vec![] }), -// })), -// }, -// codec::Action { -// action: Some(codec::action::Action::DeleteAccount( -// codec::DeleteAccountAction { -// beneficiary_id: "suicided.near".to_string(), -// }, -// )), -// }, -// ], -// })), -// }) -// } - -// fn chunk_header() -> Option { -// Some(codec::ChunkHeader { -// chunk_hash: vec![0x00], -// prev_block_hash: vec![0x01], -// outcome_root: vec![0x02], -// prev_state_root: vec![0x03], -// encoded_merkle_root: vec![0x04], -// encoded_length: 1, -// height_created: 2, -// height_included: 3, -// shard_id: 4, -// gas_used: 5, -// gas_limit: 6, -// validator_reward: big_int(7), -// balance_burnt: big_int(7), -// outgoing_receipts_root: vec![0x07], -// tx_root: vec![0x08], -// validator_proposals: vec![codec::ValidatorStake { -// account_id: "account".to_string(), -// public_key: public_key("aa"), -// stake: big_int(10), -// }], -// signature: Some(codec::Signature { -// r#type: 0, -// bytes: vec![], -// }), -// }) -// } - -// fn execution_outcome_with_id() -> Option { -// Some(codec::ExecutionOutcomeWithIdView { -// proof: Some(codec::MerklePath { path: vec![] }), -// block_hash: hash("aa"), -// id: hash("beef"), -// outcome: execution_outcome(), -// }) -// } - -// fn execution_outcome() -> Option { -// Some(codec::ExecutionOutcome { -// logs: vec!["string".to_string()], -// receipt_ids: vec![], -// gas_burnt: 1, -// tokens_burnt: big_int(2), -// executor_id: "near".to_string(), -// status: Some(codec::execution_outcome::Status::SuccessValue( -// codec::SuccessValueExecutionStatus { -// value: "/6q7zA==".to_string(), -// }, -// )), -// }) -// } - -// fn big_int(input: u64) -> Option { -// let value = -// BigInt::try_from(input).expect(format!("Invalid BigInt value {}", input).as_ref()); -// let bytes = value.to_signed_bytes_le(); - -// Some(codec::BigInt { bytes }) -// } - -// fn hash(input: &str) -> Option { -// Some(codec::CryptoHash { -// bytes: hex::decode(input).expect(format!("Invalid hash value {}", input).as_ref()), -// }) -// } - -// fn public_key(input: &str) -> Option { -// Some(codec::PublicKey { -// bytes: hex::decode(input).expect(format!("Invalid PublicKey value {}", input).as_ref()), -// }) -// } - -// struct BytesHeap { -// api_version: graph::semver::Version, -// memory: Vec, -// } - -// impl BytesHeap { -// fn new(api_version: graph::semver::Version) -> Self { -// Self { -// api_version, -// memory: vec![], -// } -// } -// } - -// impl AscHeap for BytesHeap { -// fn raw_new(&mut self, bytes: &[u8]) -> Result { -// self.memory.extend_from_slice(bytes); -// Ok((self.memory.len() - bytes.len()) as u32) -// } - -// fn get(&self, offset: u32, size: u32) -> Result, DeterministicHostError> { -// let memory_byte_count = self.memory.len(); -// if memory_byte_count == 0 { -// return Err(DeterministicHostError(anyhow!("No memory is allocated"))); -// } - -// let start_offset = offset as usize; -// let end_offset_exclusive = start_offset + size as usize; - -// if start_offset >= memory_byte_count { -// return Err(DeterministicHostError(anyhow!( -// "Start offset {} is outside of allocated memory, max offset is {}", -// start_offset, -// memory_byte_count - 1 -// ))); -// } - -// if end_offset_exclusive > memory_byte_count { -// return Err(DeterministicHostError(anyhow!( -// "End of offset {} is outside of allocated memory, max offset is {}", -// end_offset_exclusive, -// memory_byte_count - 1 -// ))); -// } - -// return Ok(Vec::from(&self.memory[start_offset..end_offset_exclusive])); -// } - -// fn api_version(&self) -> graph::semver::Version { -// self.api_version.clone() -// } - -// fn asc_type_id( -// &mut self, -// type_id_index: graph::runtime::IndexForAscTypeId, -// ) -> Result { -// // Not totally clear what is the purpose of this method, why not a default implementation here? -// Ok(type_id_index as u32) -// } -// } -// } diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 6d2f10154e1..822fdbc736f 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -96,8 +96,7 @@ impl FirehoseEndpoint { let response_stream = client .blocks(firehose::Request { - start_block_num: 118081400, - stop_block_num: 119081400, + start_block_num: 0, fork_steps: vec![ForkStep::StepIrreversible as i32], ..Default::default() }) From 7c8d73d8dbf7188a7ed6d54247dfe6574aa3cc95 Mon Sep 17 00:00:00 2001 From: billettc Date: Wed, 9 Feb 2022 14:26:57 -0500 Subject: [PATCH 3/4] added logs to instruction --- chain/solana/proto/codec.proto | 2 ++ chain/solana/src/protobuf/sf.solana.codec.v1.rs | 2 ++ chain/solana/src/runtime/abi.rs | 8 ++------ chain/solana/src/runtime/generated.rs | 2 ++ 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/chain/solana/proto/codec.proto b/chain/solana/proto/codec.proto index 14c94fb8d6c..58e55b29c6a 100644 --- a/chain/solana/proto/codec.proto +++ b/chain/solana/proto/codec.proto @@ -108,6 +108,8 @@ message Instruction { repeated BalanceChange balance_changes = 9; repeated AccountChange account_changes = 10; + repeated string logs = 11; + bool failed = 15; InstructionError error = 16; } diff --git a/chain/solana/src/protobuf/sf.solana.codec.v1.rs b/chain/solana/src/protobuf/sf.solana.codec.v1.rs index 7a297701374..bf5e3c1c0e8 100644 --- a/chain/solana/src/protobuf/sf.solana.codec.v1.rs +++ b/chain/solana/src/protobuf/sf.solana.codec.v1.rs @@ -128,6 +128,8 @@ pub struct Instruction { pub balance_changes: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "10")] pub account_changes: ::prost::alloc::vec::Vec, + #[prost(string, repeated, tag = "11")] + pub logs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(bool, tag = "15")] pub failed: bool, #[prost(message, optional, tag = "16")] diff --git a/chain/solana/src/runtime/abi.rs b/chain/solana/src/runtime/abi.rs index 399354aa5a8..ae93f21886c 100644 --- a/chain/solana/src/runtime/abi.rs +++ b/chain/solana/src/runtime/abi.rs @@ -46,6 +46,7 @@ impl ToAscObj for codec::Instruction { depth: self.depth, balance_changes: asc_new(heap, &self.balance_changes)?, account_changes: asc_new(heap, &self.account_changes)?, + log_messages: asc_new(heap, &self.logs)?, error: asc_new(heap, &self.error)?, failed: false, _padding: Padding3::new(), @@ -256,12 +257,7 @@ impl ToAscObj for Vec> { &self, heap: &mut H, ) -> Result { - let content: Result, _> = self - .iter() - .map(|x| { - asc_new(heap, x.as_slice()) - }) - .collect(); + let content: Result, _> = self.iter().map(|x| asc_new(heap, x.as_slice())).collect(); let content = content?; Ok(AscHashArray(Array::new(&*content, heap)?)) } diff --git a/chain/solana/src/runtime/generated.rs b/chain/solana/src/runtime/generated.rs index 2c87a9705b1..5e4c8e576ab 100644 --- a/chain/solana/src/runtime/generated.rs +++ b/chain/solana/src/runtime/generated.rs @@ -40,11 +40,13 @@ pub(crate) struct AscInstruction { pub data: AscPtr, pub balance_changes: AscPtr, pub account_changes: AscPtr, + pub log_messages: AscPtr, pub error: AscPtr, pub failed: bool, pub _padding: Padding3, } + // special type for the rare occurance where we need to pad three bytes pub(crate) struct Padding3([u8; 3]); From 604ff86ce56fcb4326a92a79eb8df1995d45d854 Mon Sep 17 00:00:00 2001 From: billettc Date: Thu, 10 Feb 2022 14:15:07 -0500 Subject: [PATCH 4/4] change a log --- graph/src/firehose/endpoints.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 822fdbc736f..d25fd7a496c 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -92,7 +92,7 @@ impl FirehoseEndpoint { }, ); - debug!(logger, "Connecting to firehose to retrieve genesis block, start block 118081400, stop block 119081400"); + debug!(logger, "Connecting to firehose to retrieve genesis block, start block 120087500, stop block 120087500"); let response_stream = client .blocks(firehose::Request {