Generally, an indexer processor follow this flow:
- Receive a stream of Aptos transactions
- Extract data from the transactions
- Transform and merge the parsed data into a coherent, standardized schema
- Store the transformed data into a database
The Aptos Indexer SDK works by modeling each processor as a graph of independent steps. Each of the steps in the flow above is written as a Step
in the SDK, and the output of each Step
is connected to the input of the next Step
by a channel.
To your Cargo.toml
, add
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "{COMMIT_HASH}" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "{COMMIT_HASH}" }
We’ve created a Quickstart Guide to Aptos Indexer SDK which gets you setup and running an events processor that indexes events on the Aptos blockchain.
To create a step in the SDK, implement these traits:
-
Processable
#[async_trait] impl Processable for MyExtractorStep { type Input = Transaction; type Output = ExtractedDataModel; type RunType = AsyncRunType; // Processes a batch of input items and returns a batch of output items. async fn process( &mut self, input: TransactionContext<Transaction>, ) -> Result<Option<TransactionContext<ExtractedDataModel>>, ProcessorError> { let extracted_data = ... // Extract data from items.data Ok(Some(TransactionContext { data: extracted_data, start_version: input.start_version, end_version: input.end_version, start_transaction_timestamp: input.start_transaction_timestamp, end_transaction_timestamp: input.end_transaction_timestamp, total_size_in_bytes: input.total_size_in_bytes, })) } }
-
NamedStep
impl NamedStep for MyExtractorStep { fn name(&self) -> String { "MyExtractorStep".to_string() } }
-
Either AsyncStep or PollableAsyncStep, which defines how the step will be run in the processor.
-
The most basic step is an
AsyncStep
, which processes a batch of input items and returns a batch of output items.#[async_trait] impl Processable for MyExtractorStep { ... type RunType = AsyncRunType; ... } impl AsyncStep for MyExtractorStep {}
-
A
PollableAsyncStep
does the same asAsyncStep
, but it also periodically polls its internal state and returns a batch of output items if available.#[async_trait] impl<T> Processable for MyPollStep<T> where Self: Sized + Send + 'static, T: Send + 'static, { ... type RunType = PollableAsyncRunType; ... } #[async_trait] impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T> where Self: Sized + Send + Sync + 'static, T: Send + 'static, { /// Returns the duration between polls fn poll_interval(&self) -> std::time::Duration { // Define duration } /// Polls the internal state and returns a batch of output items if available. async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> { // Define polling logic } }
-
The SDK provides several common steps to use in your processor.
TransactionStreamStep
provides a stream of Aptos transactions to the processorTimedBufferStep
buffers a batch of items and periodically polls to release the items to the next step
When ProcessorBuilder
connects two steps, a channel is created linking the two steps and the output of the first step becomes the input of the next step.
let (pb, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
first_step.into_runnable_step(),
)
.connect_to(second_step.into_runnable_step(), channel_size)
.connect_to(third_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
- Use aptos-indexer-processor-example as a starting point
- Add the new processor to ProcessorConfig and Processor
- Add the processor to RunnableConfig
To run the processor, we recommend using the example in aptos-indexer-processor-example and following this configuration guide.
- Fanout + ArcifyStep
- Fan in