diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 5d1e0517a..fd24b7972 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -12,6 +12,17 @@ service BlockStreamer { // Lists all current BlockStream processes rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); + + // Get info for an existing BlockStream process + rpc GetStream (GetStreamRequest) returns (StreamInfo); +} + +// Request message for getting a BlockStream +message GetStreamRequest { + // Account ID which the indexer is defined under + string account_id = 1; + // Name of the indexer + string function_name = 2; } // Request message for starting a BlockStream @@ -97,4 +108,26 @@ message StreamInfo { string function_name = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + // Contains health information for the Block Stream + Health health = 6; +} + +// Contains health information for the Block Stream +message Health { + // The processing state of the block stream + ProcessingState processing_state = 1; + // When the health info was last updated + uint64 updated_at_timestamp_secs = 2; +} + +enum ProcessingState { + UNSPECIFIED = 0; + // Not started, or has been stopped + IDLE = 1; + // Running as expected + RUNNING = 2; + // Waiting for some internal condition to be met before continuing + WAITING = 3; + // Stopped due to some unknown error + STALLED = 4; } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 04cd824a6..eff2a0d1e 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -1,7 +1,9 @@ +use std::cmp::Ordering; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::Poll; +use std::time::SystemTime; use anyhow::Context; use futures::StreamExt; @@ -52,16 +54,43 @@ impl Future for PollCounter { } pub struct Task { - handle: JoinHandle>, + stream_handle: JoinHandle>, + monitor_handle: JoinHandle<()>, cancellation_token: tokio_util::sync::CancellationToken, } +/// Represents the processing state of a block stream +#[derive(Clone)] +pub enum ProcessingState { + /// Block Stream is not currently active but can be started. Either has not been started or was + /// stopped. + Idle, + + /// Block Stream is actively processing blocks. + Running, + + /// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on + /// the Redis Stream. + Waiting, + + /// Block Stream has stalled due to an error or other condition. Must be manually + /// restarted. 
+ Stalled, +} + +#[derive(Clone)] +pub struct BlockStreamHealth { + pub processing_state: ProcessingState, + pub last_updated: SystemTime, +} + pub struct BlockStream { task: Option, pub indexer_config: IndexerConfig, pub chain_id: ChainId, pub version: u64, pub redis_stream: String, + health: Arc>, } impl BlockStream { @@ -77,23 +106,107 @@ impl BlockStream { chain_id, version, redis_stream, + health: Arc::new(Mutex::new(BlockStreamHealth { + processing_state: ProcessingState::Idle, + last_updated: SystemTime::now(), + })), } } - pub fn start( - &mut self, + pub fn health(&self) -> anyhow::Result { + match self.health.lock() { + Ok(health) => Ok(health.clone()), + Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)), + } + } + + fn start_health_monitoring_task(&self, redis: Arc) -> JoinHandle<()> { + tokio::spawn({ + let config = self.indexer_config.clone(); + let health = self.health.clone(); + let redis_stream = self.redis_stream.clone(); + + async move { + let mut last_processed_block = + redis.get_last_processed_block(&config).await.unwrap(); + + loop { + tokio::time::sleep(std::time::Duration::from_secs(15)).await; + + let new_last_processed_block = + if let Ok(block) = redis.get_last_processed_block(&config).await { + block + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to fetch last processed block" + ); + continue; + }; + + let stream_size = if let Ok(stream_size) = + redis.get_stream_length(redis_stream.clone()).await + { + stream_size + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to fetch stream size" + ); + continue; + }; + + let mut health_lock = if let Ok(health) = health.lock() { + health + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to acquire health lock" + ); + continue; + }; + + match new_last_processed_block.cmp(&last_processed_block) { + Ordering::Less => { + tracing::error!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Last processed block should not decrease" + ); + + health_lock.processing_state = ProcessingState::Stalled; + } + Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => { + health_lock.processing_state = ProcessingState::Waiting; + } + Ordering::Equal => { + health_lock.processing_state = ProcessingState::Stalled; + } + Ordering::Greater => { + health_lock.processing_state = ProcessingState::Running; + } + }; + + health_lock.last_updated = SystemTime::now(); + + last_processed_block = new_last_processed_block; + } + } + }) + } + + fn start_block_stream_task( + &self, start_block_height: near_indexer_primitives::types::BlockHeight, redis: Arc, reciever_blocks_processor: Arc, lake_s3_client: SharedLakeS3Client, - ) -> anyhow::Result<()> { - if self.task.is_some() { - return Err(anyhow::anyhow!("BlockStreamer has already been started",)); - } - - let cancellation_token = tokio_util::sync::CancellationToken::new(); - - let handle = tokio::spawn({ + cancellation_token: tokio_util::sync::CancellationToken, + ) -> JoinHandle> { + tokio::spawn({ let cancellation_token = cancellation_token.clone(); let indexer_config = self.indexer_config.clone(); let chain_id = self.chain_id.clone(); @@ -137,10 +250,35 @@ impl BlockStream { } } } - }); + }) + } + + pub fn start( + &mut self, + start_block_height: near_indexer_primitives::types::BlockHeight, + redis: Arc, + 
reciever_blocks_processor: Arc,
+        lake_s3_client: SharedLakeS3Client,
+    ) -> anyhow::Result<()> {
+        if self.task.is_some() {
+            return Err(anyhow::anyhow!("BlockStreamer has already been started",));
+        }
+
+        let cancellation_token = tokio_util::sync::CancellationToken::new();
+
+        let monitor_handle = self.start_health_monitoring_task(redis.clone());
+
+        let stream_handle = self.start_block_stream_task(
+            start_block_height,
+            redis,
+            reciever_blocks_processor,
+            lake_s3_client,
+            cancellation_token.clone(),
+        );

         self.task = Some(Task {
-            handle,
+            stream_handle,
+            monitor_handle,
             cancellation_token,
         });
@@ -149,8 +287,9 @@ impl BlockStream {

     pub async fn cancel(&mut self) -> anyhow::Result<()> {
         if let Some(task) = self.task.take() {
+            task.monitor_handle.abort();
             task.cancellation_token.cancel();
-            let _ = task.handle.await?;
+            let _ = task.stream_handle.await?;

             // Fails if metric doesn't exist, i.e. task was never polled
             let _ = metrics::BLOCK_STREAM_UP
@@ -167,6 +306,7 @@ impl BlockStream {

 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(
+    name = "block_stream",
     skip_all,
     fields(
         account_id = indexer.account_id.as_str(),
diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs
index 959b5ddc9..1a84d34c6 100644
--- a/block-streamer/src/redis.rs
+++ b/block-streamer/src/redis.rs
@@ -28,6 +28,18 @@ impl RedisCommandsImpl {
         Ok(Self { connection })
     }

+    pub async fn get(&self, key: T) -> Result, RedisError>
+    where
+        T: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("GET: {:?}", key);
+
+        redis::cmd("GET")
+            .arg(key)
+            .query_async(&mut self.connection.clone())
+            .await
+    }
+
     pub async fn xadd(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
     where
         T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -133,6 +145,16 @@ impl RedisClientImpl {
             .context("Failed to set last processed block")
     }

+    pub async fn get_last_processed_block(
+        &self,
+        indexer_config: &IndexerConfig,
+    ) -> anyhow::Result> {
+        self.commands
+            .get(indexer_config.last_processed_block_key())
+            .await
+            .context("Failed to get last processed block")
+    }
+
     pub async fn get_stream_length(&self, stream: String) -> anyhow::Result> {
         self.commands.xlen(stream).await
     }
diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs
index 94ccb22aa..4dd3f01ae 100644
--- a/block-streamer/src/server/block_streamer_service.rs
+++ b/block-streamer/src/server/block_streamer_service.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::sync::Mutex;
+use std::time::SystemTime;

 use near_lake_framework::near_indexer_primitives;
 use tonic::{Request, Response, Status};
@@ -21,6 +22,30 @@ pub struct BlockStreamerService {
     block_streams: Mutex>,
 }

+impl From for blockstreamer::Health {
+    fn from(health: block_stream::BlockStreamHealth) -> Self {
+        blockstreamer::Health {
+            processing_state: match health.processing_state {
+                block_stream::ProcessingState::Running => {
+                    blockstreamer::ProcessingState::Running as i32
+                }
+                block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32,
+                block_stream::ProcessingState::Stalled => {
+                    blockstreamer::ProcessingState::Stalled as i32
+                }
+                block_stream::ProcessingState::Waiting => {
+                    blockstreamer::ProcessingState::Waiting as i32
+                }
+            },
+            updated_at_timestamp_secs: health
+                .last_updated
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+        }
+    }
+}
+
 impl BlockStreamerService {
     pub fn new(
         redis: std::sync::Arc,
@@ -59,6 +84,44 @@ impl
BlockStreamerService { #[tonic::async_trait] impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService { + #[tracing::instrument(skip(self))] + async fn get_stream( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + + let stream_entry = lock.iter().find(|(_, block_stream)| { + block_stream.indexer_config.account_id == request.account_id + && block_stream.indexer_config.function_name == request.function_name + }); + + if let Some((stream_id, stream)) = stream_entry { + let stream_health = stream.health().map_err(|err| { + tracing::error!(?err, "Failed to get health of block stream"); + Status::internal("Failed to get health of block stream") + })?; + + Ok(Response::new(StreamInfo { + stream_id: stream_id.to_string(), + account_id: stream.indexer_config.account_id.to_string(), + function_name: stream.indexer_config.function_name.to_string(), + version: stream.version, + health: Some(stream_health.into()), + })) + } else { + Err(Status::not_found(format!( + "Block Stream for account {} and name {} does not exist", + request.account_id, request.function_name + ))) + } + } + #[tracing::instrument(skip(self))] async fn start_stream( &self, @@ -171,14 +234,29 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic &self, _request: Request, ) -> Result, Status> { - let lock = self.block_streams.lock().unwrap(); + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + let block_streams: Vec = lock .values() - .map(|block_stream| StreamInfo { - stream_id: block_stream.indexer_config.get_hash_id(), - account_id: block_stream.indexer_config.account_id.to_string(), - function_name: block_stream.indexer_config.function_name.clone(), - version: block_stream.version, + .map(|block_stream| { + let stream_health = block_stream + .health() + .map_err(|err| { + tracing::error!(?err, "Failed to get health of block stream"); + Status::internal("Failed to get health of block stream") + }) + .ok(); + + StreamInfo { + stream_id: block_stream.indexer_config.get_hash_id(), + account_id: block_stream.indexer_config.account_id.to_string(), + function_name: block_stream.indexer_config.function_name.to_string(), + version: block_stream.version, + health: stream_health.map(|health| health.into()), + } }) .collect(); @@ -234,6 +312,63 @@ mod tests { ) } + #[tokio::test] + async fn get_existing_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + block_streamer_service + .start_stream(Request::new(StartStreamRequest { + start_block_height: 0, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 0, + redis_stream: "stream".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: 1, + })), + })) + .await + .unwrap(); + + let stream = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await + .unwrap(); + + assert_eq!( + 
stream.into_inner().stream_id, + "16210176318434468568".to_string() + ); + } + + #[tokio::test] + async fn get_non_existant_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + let stream_response = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await; + + assert_eq!(stream_response.err().unwrap().code(), tonic::Code::NotFound); + } + #[tokio::test] async fn starts_a_block_stream() { let block_streamer_service = create_block_streamer_service(); diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index f88a412d5..9f71149b1 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -1,38 +1,42 @@ #![cfg_attr(test, allow(dead_code))] +use std::time::{Duration, SystemTime}; + pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ - start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest, - StartStreamRequest, Status, StopStreamRequest, + start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, + ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest, }; +use near_primitives::types::AccountId; +use registry_types::StartBlock; use tonic::transport::channel::Channel; use tonic::Request; use crate::indexer_config::IndexerConfig; -use crate::redis::KeyProvider; +use crate::redis::{KeyProvider, RedisClient}; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use BlockStreamsHandlerImpl as BlockStreamsHandler; -#[cfg(test)] -pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler; - -pub struct BlockStreamsHandlerImpl { +#[derive(Clone)] +pub struct BlockStreamsHandler { client: BlockStreamerClient, + redis_client: RedisClient, } #[cfg_attr(test, mockall::automock)] -impl BlockStreamsHandlerImpl { - pub fn connect(block_streamer_url: &str) -> anyhow::Result { +impl BlockStreamsHandler { + pub fn connect(block_streamer_url: &str, redis_client: RedisClient) -> anyhow::Result { let channel = Channel::from_shared(block_streamer_url.to_string()) .context("Block Streamer URL is invalid")? .connect_lazy(); let client = BlockStreamerClient::new(channel); - Ok(Self { client }) + Ok(Self { + client, + redis_client, + }) } pub async fn list(&self) -> anyhow::Result> { @@ -79,6 +83,26 @@ impl BlockStreamsHandlerImpl { .into() } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetStreamRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self.client.clone().get_stream(Request::new(request)).await { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get stream for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start( &self, start_block_height: u64, @@ -139,4 +163,167 @@ impl BlockStreamsHandlerImpl { Ok(()) } + + async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if matches!( + config.start_block, + StartBlock::Latest | StartBlock::Height(..) 
+ ) { + self.redis_client.clear_block_stream(config).await?; + } + + let height = match config.start_block { + StartBlock::Latest => config.get_registry_version(), + StartBlock::Height(height) => height, + StartBlock::Continue => self.get_continuation_block_height(config).await?, + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await?; + + Ok(()) + } + + async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = match config.start_block { + StartBlock::Height(height) => height, + StartBlock::Latest => config.get_registry_version(), + StartBlock::Continue => { + tracing::warn!( + "Attempted to start new Block Stream with CONTINUE, using LATEST instead" + ); + config.get_registry_version() + } + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await + } + + async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result { + let height = self + .redis_client + .get_last_published_block(config) + .await? + .map(|height| height + 1) + .unwrap_or_else(|| { + tracing::warn!( + "Failed to get continuation block height, using registry version instead" + ); + + config.get_registry_version() + }); + + Ok(height) + } + + async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = self.get_continuation_block_height(config).await?; + + tracing::info!(height, "Resuming block stream"); + + self.start(height, config).await?; + + Ok(()) + } + + async fn ensure_healthy( + &self, + config: &IndexerConfig, + block_stream: &StreamInfo, + ) -> anyhow::Result<()> { + if let Some(health) = block_stream.health.as_ref() { + let updated_at = + SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); + + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60); + let stalled = matches!( + health.processing_state.try_into(), + Ok(ProcessingState::Stalled) + ); + + if !stale && !stalled { + return Ok(()); + } else { + tracing::info!(stale, stalled, "Restarting stalled block stream"); + } + } else { + tracing::info!("Restarting stalled block stream"); + } + + self.stop(block_stream.stream_id.clone()).await?; + + let height = self.get_continuation_block_height(config).await?; + self.start(height, config).await?; + + Ok(()) + } + + pub async fn synchronise_block_stream( + &self, + config: &IndexerConfig, + previous_sync_version: Option, + ) -> anyhow::Result<()> { + let block_stream = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(block_stream) = block_stream { + if block_stream.version == config.get_registry_version() { + self.ensure_healthy(config, &block_stream).await?; + return Ok(()); + } + + tracing::info!( + previous_version = block_stream.version, + "Stopping outdated block stream" + ); + + self.stop(block_stream.stream_id.clone()).await?; + + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.is_none() { + self.start_new_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.unwrap() != config.get_registry_version() { + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + self.resume_block_stream(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let 
Some(block_stream) = self.get(account_id, function_name).await? { + tracing::info!("Stopping block stream"); + + self.stop(block_stream.stream_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 68537b3c7..3e2df54dc 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -8,23 +8,18 @@ use anyhow::Context; use runner::data_layer::data_layer_client::DataLayerClient; use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest}; use tonic::transport::channel::Channel; -use tonic::Request; +use tonic::{Request, Status}; use crate::indexer_config::IndexerConfig; -#[cfg(not(test))] -pub use DataLayerHandlerImpl as DataLayerHandler; -#[cfg(test)] -pub use MockDataLayerHandlerImpl as DataLayerHandler; - type TaskId = String; -pub struct DataLayerHandlerImpl { +#[derive(Clone)] +pub struct DataLayerHandler { client: DataLayerClient, } -#[cfg_attr(test, mockall::automock)] -impl DataLayerHandlerImpl { +impl DataLayerHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -37,7 +32,7 @@ impl DataLayerHandlerImpl { pub async fn start_provisioning_task( &self, indexer_config: &IndexerConfig, - ) -> anyhow::Result { + ) -> Result { let request = ProvisionRequest { account_id: indexer_config.account_id.to_string(), function_name: indexer_config.function_name.clone(), @@ -98,4 +93,81 @@ impl DataLayerHandlerImpl { Ok(status) } + + pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer"); + + let start_task_result = self.start_provisioning_task(indexer_config).await; + + if let Err(error) = start_task_result { + // Already provisioned + if error.code() == tonic::Code::FailedPrecondition { + return Ok(()); + } + + return Err(error.into()); + } + + let task_id = start_task_result.unwrap(); + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?indexer_config.account_id, + ?indexer_config.function_name, + "Still waiting for provisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } + + pub async fn ensure_deprovisioned( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + tracing::info!(?account_id, ?function_name, "Deprovisioning data layer"); + + let task_id = self + .start_deprovisioning_task(account_id.clone(), function_name.clone()) + .await?; + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? 
== TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?account_id, + ?function_name, + "Still waiting for deprovisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 686164048..4e12ef26f 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -1,10 +1,14 @@ #![cfg_attr(test, allow(dead_code))] +use near_primitives::types::AccountId; pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ + ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, + StopExecutorRequest, +}; use tonic::transport::channel::Channel; use tonic::Request; @@ -12,17 +16,12 @@ use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use ExecutorsHandlerImpl as ExecutorsHandler; -#[cfg(test)] -pub use MockExecutorsHandlerImpl as ExecutorsHandler; - -pub struct ExecutorsHandlerImpl { +#[derive(Clone)] +pub struct ExecutorsHandler { client: RunnerClient, } -#[cfg_attr(test, mockall::automock)] -impl ExecutorsHandlerImpl { +impl ExecutorsHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -50,6 +49,31 @@ impl ExecutorsHandlerImpl { .await } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetExecutorRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self + .client + .clone() + .get_executor(Request::new(request)) + .await + { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get executor for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let request = StartExecutorRequest { code: indexer_config.code.clone(), @@ -97,4 +121,72 @@ impl ExecutorsHandlerImpl { Ok(()) } + + async fn ensure_healthy( + &self, + config: &IndexerConfig, + executor: ExecutorInfo, + ) -> anyhow::Result<()> { + if let Some(health) = executor.health { + if !matches!( + health.execution_state.try_into(), + Ok(ExecutionState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled executor"); + + self.stop(executor.executor_id).await?; + self.start(config).await?; + + Ok(()) + } + + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let executor = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(executor) = executor { + if executor.version == config.get_registry_version() { + self.ensure_healthy(config, executor).await?; + return Ok(()); + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = executor.version, + "Stopping outdated executor" + ); + + self.stop(executor.executor_id).await?; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = 
config.function_name, + version = config.get_registry_version(), + "Starting executor" + ); + + self.start(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(executor) = self.get(account_id, function_name).await? { + tracing::info!("Stopping executor"); + self.stop(executor.executor_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 2539e06a2..1200648f0 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -4,6 +4,7 @@ use anyhow::Context; use near_primitives::types::AccountId; use crate::indexer_config::IndexerConfig; +use crate::lifecycle::LifecycleState; use crate::redis::{KeyProvider, RedisClient}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -16,7 +17,7 @@ pub enum ProvisionedState { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct IndexerState { +pub struct OldIndexerState { pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, @@ -24,6 +25,15 @@ pub struct IndexerState { pub provisioned_state: ProvisionedState, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct IndexerState { + pub account_id: AccountId, + pub function_name: String, + pub block_stream_synced_at: Option, + pub enabled: bool, + pub lifecycle_state: LifecycleState, +} + impl KeyProvider for IndexerState { fn account_id(&self) -> String { self.account_id.to_string() @@ -49,13 +59,46 @@ impl IndexerStateManagerImpl { Self { redis_client } } + pub async fn migrate(&self) -> anyhow::Result<()> { + let raw_states = self.redis_client.list_indexer_states().await?; + + for raw_state in raw_states { + if let Ok(state) = serde_json::from_str::(&raw_state) { + tracing::info!( + "{}/{} already migrated, skipping", + state.account_id, + state.function_name + ); + continue; + } + + tracing::info!("Migrating {}", raw_state); + + let old_state: OldIndexerState = serde_json::from_str(&raw_state)?; + + let state = IndexerState { + account_id: old_state.account_id, + function_name: old_state.function_name, + block_stream_synced_at: old_state.block_stream_synced_at, + enabled: old_state.enabled, + lifecycle_state: LifecycleState::Running, + }; + + self.redis_client + .set(state.get_state_key(), serde_json::to_string(&state)?) 
+ .await?; + } + + Ok(()) + } + fn get_default_state(&self, indexer_config: &IndexerConfig) -> IndexerState { IndexerState { account_id: indexer_config.account_id.clone(), function_name: indexer_config.function_name.clone(), block_stream_synced_at: None, enabled: true, - provisioned_state: ProvisionedState::Unprovisioned, + lifecycle_state: LifecycleState::default(), } } @@ -76,10 +119,12 @@ impl IndexerStateManagerImpl { } pub async fn delete_state(&self, indexer_state: &IndexerState) -> anyhow::Result<()> { + tracing::info!("Deleting state"); + self.redis_client.delete_indexer_state(indexer_state).await } - async fn set_state( + pub async fn set_state( &self, indexer_config: &IndexerConfig, state: IndexerState, @@ -101,58 +146,6 @@ impl IndexerStateManagerImpl { Ok(()) } - pub async fn set_deprovisioning( - &self, - indexer_state: &IndexerState, - task_id: String, - ) -> anyhow::Result<()> { - let mut state = indexer_state.clone(); - - state.provisioned_state = ProvisionedState::Deprovisioning { task_id }; - - self.redis_client - .set(state.get_state_key(), serde_json::to_string(&state)?) - .await?; - - Ok(()) - } - - pub async fn set_provisioning( - &self, - indexer_config: &IndexerConfig, - task_id: String, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id }; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - - pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioned; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_provisioning_failure( - &self, - indexer_config: &IndexerConfig, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Failed; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_enabled( &self, indexer_config: &IndexerConfig, @@ -194,7 +187,7 @@ mod tests { let mut mock_redis_client = RedisClient::default(); mock_redis_client .expect_list_indexer_states() - .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()])) + .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string()])) .once(); mock_redis_client .expect_list_indexer_states() @@ -229,7 +222,7 @@ mod tests { .with(predicate::eq(indexer_config.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" }) + serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "lifecycle_state": "Initializing" }) .to_string(), )) }); @@ -237,7 +230,7 @@ mod tests { .expect_set_indexer_state::() .with( predicate::always(), - predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()), + 
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"lifecycle_state\":\"Initializing\"}".to_string()), ) .returning(|_, _| Ok(())) .once(); diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs new file mode 100644 index 000000000..70c9f7b38 --- /dev/null +++ b/coordinator/src/lifecycle.rs @@ -0,0 +1,343 @@ +use tracing::{info, warn}; + +use crate::handlers::block_streams::BlockStreamsHandler; +use crate::handlers::data_layer::DataLayerHandler; +use crate::handlers::executors::ExecutorsHandler; +use crate::indexer_config::IndexerConfig; +use crate::indexer_state::{IndexerState, IndexerStateManager}; +use crate::redis::{KeyProvider, RedisClient}; +use crate::registry::Registry; + +const LOOP_THROTTLE_MS: u64 = 1000; + +/// Represents the different lifecycle states of an Indexer +#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum LifecycleState { + /// Pre-requisite resources, i.e. Data Layer are being created. + /// + /// Transitions: + /// - `Running` on success + /// - `Repairing` on Data Layer provisioning failure + #[default] + Initializing, + /// Indexer is functional, Block Stream and Executors are continouously monitored to ensure + /// they are running the latest version of the Indexer. + /// + /// Transitions: + /// - `Stopping` if suspended + /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a + /// retry + /// - `Running` on success + Running, + /// Indexer is being stopped, Block Stream and Executors are being stopped. + /// + /// Transitions: + /// - `Stopping` on failure, triggering a retry + /// - `Stopped` on success + Stopping, + /// Indexer is stopped, Block Stream and Executors are not running. + /// + /// Transitions: + /// - `Running` if unsuspended + Stopped, + /// Indexer is in a bad state, currently requires manual intervention, but should eventually + /// self heal. 
This is a dead-end state + /// + /// Transitions: + /// - `Repairing` continuously + Repairing, // TODO Add `error` to enable reparation + /// Indexer is being deleted, all resources are being cleaned up + /// + /// Transitions: + /// - `Deleting` on failure, triggering a retry + /// - `Deleted` on success + Deleting, + /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit + Deleted, +} + +pub struct LifecycleManager<'a> { + initial_config: IndexerConfig, + block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, +} + +impl<'a> LifecycleManager<'a> { + pub fn new( + initial_config: IndexerConfig, + block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, + ) -> Self { + Self { + initial_config, + block_streams_handler, + executors_handler, + data_layer_handler, + registry, + state_manager, + redis_client, + } + } + + #[tracing::instrument(name = "initializing", skip_all)] + async fn handle_initializing( + &self, + config: Option<&IndexerConfig>, + _state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if self + .data_layer_handler + .ensure_provisioned(config) + .await + .is_err() + { + return LifecycleState::Repairing; + } + + LifecycleState::Running + } + + #[tracing::instrument(name = "running", skip_all)] + async fn handle_running( + &self, + config: Option<&IndexerConfig>, + state: &mut IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if !state.enabled { + return LifecycleState::Stopping; + } + + if let Err(error) = self + .block_streams_handler + .synchronise_block_stream(config, state.block_stream_synced_at) + .await + { + warn!(?error, "Failed to synchronise block stream, retrying..."); + + return LifecycleState::Running; + } + + state.block_stream_synced_at = Some(config.get_registry_version()); + + if let Err(error) = self.executors_handler.synchronise_executor(config).await { + warn!(?error, "Failed to synchronise executor, retrying..."); + + return LifecycleState::Running; + } + + LifecycleState::Running + } + + #[tracing::instrument(name = "stopping", skip_all)] + async fn handle_stopping(&self, config: Option<&IndexerConfig>) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if let Err(error) = self + .block_streams_handler + .stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream, retrying..."); + return LifecycleState::Stopping; + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor, retrying..."); + return LifecycleState::Stopping; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "stopped", skip_all)] + async fn handle_stopped( + &self, + config: Option<&IndexerConfig>, + state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Transistion to `Running` on config 
update + + if state.enabled { + return LifecycleState::Running; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "repairing", skip_all)] + async fn handle_repairing( + &self, + config: Option<&IndexerConfig>, + _state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Add more robust error handling, for now just stop + LifecycleState::Repairing + } + + #[tracing::instrument(name = "deleting", skip_all)] + async fn handle_deleting(&self, state: &IndexerState) -> LifecycleState { + if let Err(error) = self + .block_streams_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream"); + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor"); + } + + if self.state_manager.delete_state(state).await.is_err() { + // Retry + return LifecycleState::Deleting; + } + + info!("Clearing block stream"); + + if self + .redis_client + .del(state.get_redis_stream_key()) + .await + .is_err() + { + // Retry + return LifecycleState::Deleting; + } + + if self + .data_layer_handler + .ensure_deprovisioned(state.account_id.clone(), state.function_name.clone()) + .await + .is_err() + { + return LifecycleState::Deleted; + } + + LifecycleState::Deleted + } + + #[tracing::instrument( + name = "lifecycle_manager", + skip(self), + fields( + account_id = self.initial_config.account_id.as_str(), + function_name = self.initial_config.function_name.as_str() + ) + )] + pub async fn run(&self) { + let mut first_iteration = true; + + loop { + tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await; + + let config = match self + .registry + .fetch_indexer( + &self.initial_config.account_id, + &self.initial_config.function_name, + ) + .await + { + Ok(config) => config, + Err(error) => { + warn!(?error, "Failed to fetch config"); + continue; + } + }; + + let mut state = match self.state_manager.get_state(&self.initial_config).await { + Ok(state) => state, + Err(error) => { + warn!(?error, "Failed to get state"); + continue; + } + }; + + if first_iteration { + info!("Initial lifecycle state: {:?}", state.lifecycle_state,); + first_iteration = false; + } + + let desired_lifecycle_state = match state.lifecycle_state { + LifecycleState::Initializing => { + self.handle_initializing(config.as_ref(), &state).await + } + LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await, + LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await, + LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await, + LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await, + LifecycleState::Deleting => self.handle_deleting(&state).await, + LifecycleState::Deleted => LifecycleState::Deleted, + }; + + if desired_lifecycle_state != state.lifecycle_state { + info!( + "Transitioning lifecycle state: {:?} -> {:?}", + state.lifecycle_state, desired_lifecycle_state, + ); + } + + if desired_lifecycle_state == LifecycleState::Deleted { + break; + } + + state.lifecycle_state = desired_lifecycle_state; + + loop { + match self + .state_manager + .set_state(&self.initial_config, state.clone()) + .await + { + Ok(_) => break, + Err(e) => { + warn!("Failed to set state: {:?}. 
Retrying...", e); + + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } + } + } + } + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 6d55aa780..fef71dd32 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,27 +1,29 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use near_primitives::types::AccountId; +use tokio::task::JoinHandle; use tracing_subscriber::prelude::*; use crate::handlers::block_streams::BlockStreamsHandler; use crate::handlers::data_layer::DataLayerHandler; use crate::handlers::executors::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; +use crate::lifecycle::LifecycleManager; use crate::redis::RedisClient; use crate::registry::Registry; -use crate::synchroniser::Synchroniser; mod handlers; mod indexer_config; mod indexer_state; +mod lifecycle; mod redis; mod registry; mod server; -mod synchroniser; mod utils; -const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); +const LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); async fn sleep(duration: Duration) -> anyhow::Result<()> { tokio::time::sleep(duration).await; @@ -58,18 +60,11 @@ async fn main() -> anyhow::Result<()> { let registry = Arc::new(Registry::connect(registry_contract_id.clone(), &rpc_url)); let redis_client = RedisClient::connect(&redis_url).await?; - let block_streams_handler = BlockStreamsHandler::connect(&block_streamer_url)?; + let block_streams_handler = + BlockStreamsHandler::connect(&block_streamer_url, redis_client.clone())?; let executors_handler = ExecutorsHandler::connect(&runner_url)?; let data_layer_handler = DataLayerHandler::connect(&runner_url)?; let indexer_state_manager = Arc::new(IndexerStateManager::new(redis_client.clone())); - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &indexer_state_manager, - &redis_client, - ); tokio::spawn({ let indexer_state_manager = indexer_state_manager.clone(); @@ -77,7 +72,62 @@ async fn main() -> anyhow::Result<()> { async move { server::init(grpc_port, indexer_state_manager, registry).await } }); + indexer_state_manager.migrate().await?; + + let mut lifecycle_tasks = HashMap::>::new(); + loop { - tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?; + let indexer_registry = registry.fetch().await?; + + for config in indexer_registry.iter() { + if lifecycle_tasks.contains_key(&config.get_full_name()) { + continue; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name.as_str(), + "Starting lifecycle manager" + ); + + let handle = tokio::spawn({ + let indexer_state_manager = indexer_state_manager.clone(); + let config = config.clone(); + let registry = registry.clone(); + let redis_client = redis_client.clone(); + let block_streams_handler = block_streams_handler.clone(); + let data_layer_handler = data_layer_handler.clone(); + let executors_handler = executors_handler.clone(); + + async move { + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &indexer_state_manager, + &redis_client, + ); + + lifecycle_manager.run().await + } + }); + + lifecycle_tasks.insert(config.get_full_name(), handle); + } + + let finished_tasks: Vec = lifecycle_tasks + .iter() + .filter_map(|(name, task)| task.is_finished().then_some(name.clone())) + .collect(); + + for indexer_name in finished_tasks { + 
tracing::info!(indexer_name, "Lifecycle has finished, removing..."); + + lifecycle_tasks.remove(&indexer_name); + } + + sleep(LOOP_THROTTLE_SECONDS).await?; } } diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index d3cb87397..de891ffa3 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -169,9 +169,9 @@ impl RegistryImpl { pub async fn fetch_indexer( &self, - account_id: AccountId, - function_name: String, - ) -> anyhow::Result { + account_id: &AccountId, + function_name: &str, + ) -> anyhow::Result> { let response = self .json_rpc_client .call(RpcQueryRequest { @@ -194,10 +194,10 @@ impl RegistryImpl { .context("Failed to fetch indexer")?; if let QueryResponseKind::CallResult(call_result) = response.kind { - let indexer: registry_types::IndexerConfig = - serde_json::from_slice(&call_result.result)?; - - return Ok(IndexerConfig { + let indexer = serde_json::from_slice::>( + &call_result.result, + )? + .map(|indexer| IndexerConfig { account_id: account_id.clone(), function_name: function_name.to_string(), code: indexer.code, @@ -207,6 +207,8 @@ impl RegistryImpl { updated_at_block_height: indexer.updated_at_block_height, created_at_block_height: indexer.created_at_block_height, }); + + return Ok(indexer); } anyhow::bail!("Invalid registry response") diff --git a/coordinator/src/server/indexer_manager_service.rs b/coordinator/src/server/indexer_manager_service.rs index 36a91ca31..809fd8970 100644 --- a/coordinator/src/server/indexer_manager_service.rs +++ b/coordinator/src/server/indexer_manager_service.rs @@ -42,9 +42,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? + .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, true) @@ -78,9 +79,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? 
+ .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, false) diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index bf362bc5d..64a5ce073 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -427,956 +427,3 @@ impl<'a> Synchroniser<'a> { Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - - use mockall::predicate::*; - use std::collections::HashMap; - - use crate::registry::IndexerRegistry; - - #[tokio::test] - async fn generates_sync_states() { - let existing_account_ids = vec![ - "account1.near".to_string(), - "account2.near".to_string(), - "account3.near".to_string(), - "account4.near".to_string(), - ]; - let new_account_ids = vec![ - "new_account1.near".to_string(), - "new_account2.near".to_string(), - ]; - let deleted_account_ids = vec![ - "deleted_account1.near".to_string(), - "deleted_account2.near".to_string(), - ]; - - let mut existing_indexer_configs: Vec = Vec::new(); - for (i, account_id) in existing_account_ids.iter().enumerate() { - for j in 1..=5 { - existing_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("existing_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut new_indexer_configs: Vec = Vec::new(); - for (i, account_id) in new_account_ids.iter().enumerate() { - for j in 1..=3 { - new_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("new_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut deleted_indexer_configs: Vec = Vec::new(); - for (i, account_id) in deleted_account_ids.iter().enumerate() { - for j in 1..=2 { - deleted_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("deleted_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut indexer_registry = IndexerRegistry::new(); - for indexer in existing_indexer_configs - .iter() - .chain(new_indexer_configs.iter()) - { - indexer_registry - .entry(indexer.account_id.clone()) - .or_default() - .insert(indexer.function_name.clone(), indexer.clone()); - } - - let mut block_streams_handler = BlockStreamsHandler::default(); - let block_streams: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| StreamInfo { - stream_id: format!("stream_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - }) - .collect(); - block_streams_handler - .expect_list() - .returning(move || Ok(block_streams.clone())); - - let mut executors_handler = ExecutorsHandler::default(); - let executors: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| ExecutorInfo { - executor_id: format!("executor_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - status: "running".to_string(), - }) - .collect(); - - executors_handler - .expect_list() - .returning(move || Ok(executors.clone())); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - let states: Vec = existing_indexer_configs - .iter() - .map(|indexer| IndexerState { - account_id: 
indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }) - .chain(deleted_indexer_configs.iter().map(|indexer| IndexerState { - account_id: indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - })) - .collect(); - state_manager - .expect_list() - .returning(move || Ok(states.clone())); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - let synchronisation_states = synchroniser - .generate_synchronisation_states() - .await - .unwrap(); - - let mut new_count = 0; - let mut existing_count = 0; - let mut deleted_count = 0; - - for state in &synchronisation_states { - match state { - SynchronisationState::New(_) => new_count += 1, - SynchronisationState::Existing(_, _, executor, block_stream) => { - assert!(executor.is_some(), "Executor should exist for the indexer"); - assert!( - block_stream.is_some(), - "Block stream should exist for the indexer" - ); - existing_count += 1; - } - SynchronisationState::Deleted(_, _, _) => { - deleted_count += 1; - } - } - } - - assert_eq!(new_count, 6); - assert_eq!(existing_count, 20); - assert_eq!(deleted_count, 4); - } - - mod new { - use super::*; - - #[tokio::test] - async fn triggers_data_layer_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_list().returning(|| Ok(vec![])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_list().returning(|| Ok(vec![])); - state_manager - .expect_set_provisioning() - .with(eq(config.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_provisioning_task() - .with(eq(config)) - .returning(|_| Ok("task_id".to_string())) - .once(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - } - - mod existing { - use super::*; - - #[tokio::test] - async fn waits_for_provisioning_to_complete() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let task_id = "task_id".to_string(); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: 
ProvisionedState::Provisioning { - task_id: task_id.clone().to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioned() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq(task_id)) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_failed_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioning_failure() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Failed)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_synced() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: config_clone.get_redis_stream_key(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - }]) - }); - block_streams_handler.expect_stop().never(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - 
Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - status: "running".to_string(), - }]) - }); - executors_handler.expect_stop().never(); - executors_handler.expect_start().never(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn restarts_outdated() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - }]) - }); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - status: "running".to_string(), - }]) - }); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - executors_handler - .expect_start() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let data_layer_handler = 
DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn treats_unsynced_blocks_streams_as_new() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: None, - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let redis_client = RedisClient::default(); - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_stopped_and_outdated_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version() - 1), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn resumes_stopped_and_synced_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let last_published_block = 1; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream::() - .never(); - redis_client - .expect_get_last_published_block() - .with(eq(config.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(last_published_block + 1), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = 
Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn reconfigures_block_stream() { - let config_with_latest = IndexerConfig { - start_block: StartBlock::Latest, - ..IndexerConfig::default() - }; - let height = 5; - let config_with_height = IndexerConfig { - start_block: StartBlock::Height(height), - ..IndexerConfig::default() - }; - let last_published_block = 1; - let config_with_continue = IndexerConfig { - start_block: StartBlock::Continue, - ..IndexerConfig::default() - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with( - eq(last_published_block + 1), - eq(config_with_continue.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with( - eq(config_with_latest.get_registry_version()), - eq(config_with_latest.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(height), eq(config_with_height.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config_with_latest.clone())) - .returning(|_| Ok(())) - .once(); - redis_client - .expect_clear_block_stream() - .with(eq(config_with_height.clone())) - .returning(|_| Ok(())) - .once(); - redis_client - .expect_get_last_published_block() - .with(eq(config_with_continue.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .reconfigure_block_stream(&config_with_latest) - .await - .unwrap(); - synchroniser - .reconfigure_block_stream(&config_with_height) - .await - .unwrap(); - synchroniser - .reconfigure_block_stream(&config_with_continue) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_disabled_indexers() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Provisioned, - }; - let executor = ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - status: "running".to_string(), - }; - let block_stream = StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager 
- .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .never(); - - let registry = Registry::default(); - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, Some(&executor), Some(&block_stream)) - .await - .unwrap(); - // Simulate second run, start/stop etc should not be called - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - } - - mod deleted { - use super::*; - - #[tokio::test] - async fn stops_block_stream_and_executor() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Deprovisioning { - task_id: "task_id".to_string(), - }, - }; - let executor = ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - status: "running".to_string(), - }; - let block_stream = StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_delete_state().never(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Pending)); - - let registry = Registry::default(); - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream)) - .await - .unwrap(); - } - - #[tokio::test] - async fn cleans_indexer_resources() { - let config = IndexerConfig::default(); - let provisioned_state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Provisioned, - }; - let deprovisioning_state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Deprovisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_deprovisioning() - .with(eq(provisioned_state.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())); - state_manager - .expect_delete_state() - .with(eq(deprovisioning_state.clone())) - 
.returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_deprovisioning_task() - .with( - eq(config.clone().account_id), - eq(config.clone().function_name), - ) - .returning(|_, _| Ok("task_id".to_string())); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_del::() - .with(eq(config.get_redis_stream_key())) - .returning(|_| Ok(())) - .once(); - - let registry = Registry::default(); - let block_streams_handler = BlockStreamsHandler::default(); - let executors_handler = ExecutorsHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_deleted_indexer(&provisioned_state, None, None) - .await - .unwrap(); - synchroniser - .sync_deleted_indexer(&deprovisioning_state, None, None) - .await - .unwrap(); - } - } -} diff --git a/frontend/src/utils/pgSchemaTypeGen.js b/frontend/src/utils/pgSchemaTypeGen.js index 767a85be8..a53df0005 100644 --- a/frontend/src/utils/pgSchemaTypeGen.js +++ b/frontend/src/utils/pgSchemaTypeGen.js @@ -104,7 +104,6 @@ export class PgSchemaTypeGen { ) { this.addColumn(columnSpec, columns); } else if ( - Object.prototype.hasOwnProperty.call(columnSpec, 'constraint') && columnSpec.constraint_type === 'primary key' ) { for (const foreignKeyDef of columnSpec.definition) { diff --git a/frontend/widgets/src/QueryApi.IndexerExplorer.jsx b/frontend/widgets/src/QueryApi.IndexerExplorer.jsx index 1af68c70d..550604178 100644 --- a/frontend/widgets/src/QueryApi.IndexerExplorer.jsx +++ b/frontend/widgets/src/QueryApi.IndexerExplorer.jsx @@ -5,8 +5,10 @@ const [myIndexers, setMyIndexers] = useState([]); const [allIndexers, setAllIndexers] = useState([]); const [error, setError] = useState(null); const [indexerMetadata, setIndexerMetaData] = useState(new Map()); +const [loading, setLoading] = useState(false); const fetchIndexerData = () => { + setLoading(true); Near.asyncView(`${REPL_REGISTRY_CONTRACT_ID}`, "list_all").then((data) => { const allIndexers = []; const myIndexers = []; @@ -22,6 +24,7 @@ const fetchIndexerData = () => { }); setMyIndexers(myIndexers); setAllIndexers(allIndexers); + setLoading(false); }); } @@ -61,6 +64,14 @@ useEffect(() => { storeIndexerMetaData(); }, []); +const Container = styled.div` + display: flex; + justify-content: center; + align-items: center; + height: 100%; + width: 100%; +`; + const Wrapper = styled.div` display: flex; flex-direction: column; @@ -232,6 +243,39 @@ const TextLink = styled.a` } `; +const LoadingSpinner = () => { + const spinnerStyle = { + width: '40px', + height: '40px', + border: '4px solid rgba(0, 0, 0, 0.1)', + borderLeftColor: 'black', + borderRadius: '50%', + animation: 'spin 1s linear infinite', + textAlign: 'center', + display: 'flex', + justifyContent: 'center', + alignCenter: 'center', + }; + + const LoadingContainer = styled.div` + text-align: center; + width: 100%; + height: 100%; + `; + + const LoadingSpinnerContainer = styled.div` + display: flex; + justify-content: center; + font-size: 14px; + ` + return + +
+ + <>{selectedTab === "my-indexers" ? "Loading Your Indexers" : "Loading All Indexers"} + ; +}; + return ( @@ -249,47 +293,64 @@ return ( {error && {error}} + {selectedTab === "all" && ( <> - - {allIndexers.map((indexer, i) => ( - - - - ))} - + {loading ? ( + + + + ) : ( + + <> + {allIndexers.map((indexer, i) => ( + + + + ))} + + + )} )} - {selectedTab === "my-indexers" && myIndexers.length === 0 && ( -
-            You don't have any indexers yet.
-            QueryAPI streamlines the process of querying specific data from the Near Blockchain. Explore new Indexers and fork them to try it out!
-            To learn more about QueryAPI, visit
-              QueryAPI Docs
+ + {selectedTab === "my-indexers" && ( + <> + {loading ? ( + + + + ) : myIndexers.length === 0 ? ( +
+

You don't have any indexers yet.

+

+ QueryAPI streamlines the process of querying specific data from the Near Blockchain. Explore new Indexers and fork them to try it out! +

+

+ To learn more about QueryAPI, visit + + QueryAPI Docs + +

+
+ ) : ( + + <> + {myIndexers.map((indexer, i) => ( + + + + ))} + + + )} + )} - - {selectedTab === "my-indexers" && ( - <> - {myIndexers.map((indexer, i) => ( - - - - ))} - - )} - -
+ ); diff --git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto index 82c457f7d..39801e657 100644 --- a/runner-client/proto/runner.proto +++ b/runner-client/proto/runner.proto @@ -10,6 +10,15 @@ service Runner { // Lists all Runner executor rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse); + + // Get Executor info + rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo); +} + +// Get Executor request +message GetExecutorRequest { + string account_id = 1; + string function_name = 2; } // Start Executor Request @@ -53,7 +62,26 @@ message ExecutorInfo { string executor_id = 1; string account_id = 2; string function_name = 3; - string status = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + Health health = 6; +} + +// Contains health information for the Executor +message Health { + ExecutionState execution_state = 1; +} + +enum ExecutionState { + UNSPECIFIED = 0; + // Running as expected + RUNNING = 1; + // Executor is running, but the execution is erroring + FAILING = 2; + // Waiting for some internal condition to be met before proceeding + WAITING = 3; + // Intentionally stopped + STOPPED = 4; + // Unintentionally stopped + STALLED = 5; } diff --git a/runner/examples/list-executors.ts b/runner/examples/list-executors.ts index b5200b7e2..396208405 100644 --- a/runner/examples/list-executors.ts +++ b/runner/examples/list-executors.ts @@ -1,13 +1,13 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; void (async function main () { runnerClient.ListExecutors({}, (err, response) => { if (err) { console.error('List request error: ', err); } else { - console.log('Successful ListExecutors request: ', response); + console.log('list response: ', JSON.stringify({ response }, null, 2)); } }); })(); diff --git a/runner/examples/start-executor.ts b/runner/examples/start-executor.ts index d9466d1e9..9b573daef 100644 --- a/runner/examples/start-executor.ts +++ b/runner/examples/start-executor.ts @@ -1,6 +1,6 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; const schema = ` CREATE TABLE @@ -13,7 +13,7 @@ CREATE TABLE `; const code = ` -console.log("hello"); +// do nothing `; const indexer = { @@ -35,7 +35,7 @@ void (async function main () { if (err) { console.error('error: ', err); } else { - console.log('start request: ', response); + console.log('start response: ', JSON.stringify({ response }, null, 2)); } }); })(); diff --git a/runner/examples/stop-executor.ts b/runner/examples/stop-executor.ts index 03466c997..ff7eef1bf 100644 --- a/runner/examples/stop-executor.ts +++ b/runner/examples/stop-executor.ts @@ -1,13 +1,13 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; runnerClient.StopExecutor({ - executorId: 'SOME_EXECUTOR_ID' + executorId: '0293a6b1dcd2259a8be6b59a8cd3e7b4285e540a64a7cbe99639947f7b7e2f9a' }, (err, response) => { if (err) { console.error('error: ', err); } else { - console.log('stop request: ', response); + console.log('stop request: ', JSON.stringify({ response }, null, 2)); } }); diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto index 82c457f7d..39801e657 100644 --- 
a/runner/protos/runner.proto +++ b/runner/protos/runner.proto @@ -10,6 +10,15 @@ service Runner { // Lists all Runner executor rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse); + + // Get Executor info + rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo); +} + +// Get Executor request +message GetExecutorRequest { + string account_id = 1; + string function_name = 2; } // Start Executor Request @@ -53,7 +62,26 @@ message ExecutorInfo { string executor_id = 1; string account_id = 2; string function_name = 3; - string status = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + Health health = 6; +} + +// Contains health information for the Executor +message Health { + ExecutionState execution_state = 1; +} + +enum ExecutionState { + UNSPECIFIED = 0; + // Running as expected + RUNNING = 1; + // Executor is running, but the execution is erroring + FAILING = 2; + // Waiting for some internal condition to be met before proceeding + WAITING = 3; + // Intentionally stopped + STOPPED = 4; + // Unintentionally stopped + STALLED = 5; } diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 2cc52842a..4b7c2ef19 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -27,6 +27,7 @@ describe('DML Handler tests', () => { format: pgFormat } as unknown as PgClient; TABLE_DEFINITION_NAMES = { + tableName: 'test_table', originalTableName: '"test_table"', originalColumnNames: new Map([ ['account_id', 'account_id'], diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index a76163a27..7a481f3e9 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -6,10 +6,19 @@ import type IndexerConfig from '../indexer-config/indexer-config'; import { type Tracer, trace, type Span } from '@opentelemetry/api'; import { type QueryResult } from 'pg'; -type WhereClauseMulti = Record)>; -type WhereClauseSingle = Record; - -export default class DmlHandler { +export type PostgresRowValue = string | number | any; +export type PostgresRow = Record; +export type WhereClauseMulti = Record; +export type WhereClauseSingle = Record; + +export interface IDmlHandler { + insert: (tableDefinitionNames: TableDefinitionNames, rowsToInsert: PostgresRow[]) => Promise + select: (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null) => Promise + update: (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any) => Promise + upsert: (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]) => Promise + delete: (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti) => Promise +} +export default class DmlHandler implements IDmlHandler { validTableNameRegex = /^[a-zA-Z_][a-zA-Z0-9_]*$/; pgClient: PgClient; tracer: Tracer; @@ -53,7 +62,7 @@ export default class DmlHandler { return { queryVars, whereClause }; } - async insert (tableDefinitionNames: TableDefinitionNames, rowsToInsert: any[]): Promise { + async insert (tableDefinitionNames: TableDefinitionNames, rowsToInsert: PostgresRow[]): Promise { if (!rowsToInsert?.length) { return []; } @@ -67,7 +76,7 @@ export default class DmlHandler { return result.rows; } - async select (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null = 
null): Promise { + async select (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null = null): Promise { const { queryVars, whereClause } = this.getWhereClause(whereObject, tableDefinitionNames.originalColumnNames); let query = `SELECT * FROM ${this.indexerConfig.schemaName()}.${tableDefinitionNames.originalTableName} WHERE ${whereClause}`; if (limit !== null) { @@ -78,7 +87,7 @@ export default class DmlHandler { return result.rows; } - async update (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any): Promise { + async update (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any): Promise { const updateKeys = Object.keys(updateObject).map((col) => tableDefinitionNames.originalColumnNames.get(col) ?? col); const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', '); const whereKeys = Object.keys(whereObject).map((col) => tableDefinitionNames.originalColumnNames.get(col) ?? col); @@ -91,7 +100,7 @@ export default class DmlHandler { return result.rows; } - async upsert (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: any[], conflictColumns: string[], updateColumns: string[]): Promise { + async upsert (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): Promise { if (!rowsToUpsert?.length) { return []; } @@ -108,7 +117,7 @@ export default class DmlHandler { return result.rows; } - async delete (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti): Promise { + async delete (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti): Promise { const { queryVars, whereClause } = this.getWhereClause(whereObject, tableDefinitionNames.originalColumnNames); const query = `DELETE FROM ${this.indexerConfig.schemaName()}.${tableDefinitionNames.originalTableName} WHERE ${whereClause} RETURNING *`; diff --git a/runner/src/dml-handler/in-memory-dml-handler.test.ts b/runner/src/dml-handler/in-memory-dml-handler.test.ts new file mode 100644 index 000000000..6fbc20c79 --- /dev/null +++ b/runner/src/dml-handler/in-memory-dml-handler.test.ts @@ -0,0 +1,205 @@ +import { type TableDefinitionNames } from '../indexer'; +import InMemoryDmlHandler from './in-memory-dml-handler'; + +const DEFAULT_ITEM_1_WITHOUT_ID = { + account_id: 'TEST_NEAR', + block_height: 1, + content: 'CONTENT', + accounts_liked: [], +}; + +const DEFAULT_ITEM_1_WITH_ID = { + id: 1, + account_id: 'TEST_NEAR', + block_height: 1, + content: 'CONTENT', + accounts_liked: [], +}; + +const DEFAULT_ITEM_2_WITHOUT_ID = { + account_id: 'TEST_NEAR', + block_height: 2, + content: 'CONTENT', + accounts_liked: [], +}; + +const DEFAULT_ITEM_2_WITH_ID = { + id: 2, + account_id: 'TEST_NEAR', + block_height: 2, + content: 'CONTENT', + accounts_liked: [], +}; + +describe('DML Handler Fixture Tests', () => { + const SIMPLE_SCHEMA = `CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "content" TEXT NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + CONSTRAINT "posts_pkey" PRIMARY KEY ("id", "account_id") + );`; + const TABLE_DEFINITION_NAMES: TableDefinitionNames = { + tableName: 'posts', + originalTableName: '"posts"', + originalColumnNames: new Map([]) + }; + + let dmlHandler: InMemoryDmlHandler; + + beforeEach(() => { + dmlHandler = new InMemoryDmlHandler(SIMPLE_SCHEMA); 
+ }); + + test('select rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const selectSingleValue = await dmlHandler.select(TABLE_DEFINITION_NAMES, { id: 1 }); + expect(selectSingleValue[0].id).toEqual(1); + + const selectMultipleValues = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR', block_height: [1, 2] }); + expect(selectMultipleValues[0].account_id).toEqual('TEST_NEAR'); + expect(selectMultipleValues[1].account_id).toEqual('TEST_NEAR'); + expect(selectMultipleValues[0].block_height).toEqual(1); + expect(selectMultipleValues[1].block_height).toEqual(2); + + expect(await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'unknown_near' })).toEqual([]); + }); + + test('insert two rows with serial column', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + const correctResult = [DEFAULT_ITEM_1_WITH_ID, DEFAULT_ITEM_2_WITH_ID]; + + const result = await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + expect(result).toEqual(correctResult); + }); + + test('reject insert after specifying serial column value', async () => { + const inputObjWithSerial = [DEFAULT_ITEM_1_WITH_ID]; + const inputObj = [DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObjWithSerial); + await expect(dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj)).rejects.toThrow('Cannot insert row twice into the same table'); + }); + + test('reject insert after not specifying primary key value', async () => { + const inputObj = [{ + block_height: 1, + content: 'CONTENT', + accounts_liked: [], + }]; + + await expect(dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj)).rejects.toThrow('Inserted row must specify value for primary key columns'); + }); + + test('update rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const updateOne = await dmlHandler.update(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR', block_height: 2 }, { content: 'UPDATED_CONTENT' }); + const selectOneUpdate = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR', block_height: 2 }); + expect(updateOne).toEqual(selectOneUpdate); + + const updateAll = await dmlHandler.update(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }, { content: 'final content' }); + const selectAllUpdated = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }); + expect(updateAll).toEqual(selectAllUpdated); + }); + + test('update criteria matches nothing', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const updateNone = await dmlHandler.update(TABLE_DEFINITION_NAMES, { account_id: 'none_near' }, { content: 'UPDATED_CONTENT' }); + const selectUpdated = await dmlHandler.select(TABLE_DEFINITION_NAMES, { content: 'UPDATED_CONTENT' }); + expect(updateNone).toEqual([]); + expect(selectUpdated).toEqual([]); + }); + + test('upsert rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const upsertObj = [{ + account_id: 'TEST_NEAR', + block_height: 1, + content: 'UPSERT', + accounts_liked: [], + }, + { + account_id: 'TEST_NEAR', + block_height: 2, + content: 'UPSERT', + accounts_liked: [], + }]; + + const upserts = await 
dmlHandler.upsert(TABLE_DEFINITION_NAMES, upsertObj, ['account_id', 'block_height'], ['content']); + + const selectAll = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }); + expect(upserts).toEqual(selectAll); + }); + + test('upsert rows with non unique conflcit columns', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const upsertObj = [{ + account_id: 'TEST_NEAR', + block_height: 1, + content: 'UPSERT', + accounts_liked: [], + }, + { + account_id: 'TEST_NEAR', + block_height: 2, + content: 'UPSERT', + accounts_liked: [], + }]; + + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, upsertObj, ['account_id'], ['content'])).rejects.toThrow('Conflict update criteria cannot affect row twice'); + }); + + test('reject upsert due to duplicate row', async () => { + const inputObj = [DEFAULT_ITEM_1_WITH_ID, DEFAULT_ITEM_1_WITH_ID]; + + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Conflict update criteria cannot affect row twice'); + }); + + test('reject upsert after specifying serial column value', async () => { + const inputObjWithSerial = [DEFAULT_ITEM_1_WITH_ID]; + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID]; + + await dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObjWithSerial, ['id', 'account_id'], ['content']); + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Cannot insert row twice into the same table'); + }); + + test('reject insert after not specifying primary key value', async () => { + const inputObj = [{ + block_height: 1, + content: 'CONTENT', + accounts_liked: [], + }]; + + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Inserted row must specify value for primary key columns'); + }); + + test('delete rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + const correctResponse = [DEFAULT_ITEM_1_WITH_ID, DEFAULT_ITEM_2_WITH_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const deletedRows = await dmlHandler.delete(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }); + + expect(deletedRows).toEqual(correctResponse); + }); +}); diff --git a/runner/src/dml-handler/in-memory-dml-handler.ts b/runner/src/dml-handler/in-memory-dml-handler.ts new file mode 100644 index 000000000..7a8783ef4 --- /dev/null +++ b/runner/src/dml-handler/in-memory-dml-handler.ts @@ -0,0 +1,358 @@ +import { type AST, Parser } from 'node-sql-parser'; +import { type TableDefinitionNames } from '../indexer'; +import { type PostgresRow, type WhereClauseMulti, type WhereClauseSingle, type IDmlHandler } from './dml-handler'; + +// TODO: Define class to represent specification +interface TableSpecification { + tableName: string + columnNames: string[] + primaryKeyColumns: string[] + serialColumns: string[] +} + +class PostgresRowEntity { + data: PostgresRow; + private readonly primaryKeys: string[]; + + constructor (data: PostgresRow, primaryKeys: string[]) { + this.data = data; + this.primaryKeys = primaryKeys.sort(); + + // TODO: Verify value of primary key as well (if primary key is NOT NULL) + if (!primaryKeys.every(primaryKey => { + return primaryKey in data; + })) { + throw new Error('Inserted row must specify value for primary key columns'); + } + } + + public primaryKey (): string { + return 
JSON.stringify( + this.primaryKeys.reduce>((acc, key) => { + acc[key] = this.data[key]; + return acc; + }, {}) + ); + } + + public isEqualRow (row: PostgresRow): boolean { + return this.primaryKeys.every(primaryKey => { + return row[primaryKey] === this.data[primaryKey]; + }); + } + + public isEqualEntity (entity: PostgresRowEntity): boolean { + return this.primaryKey() === entity.primaryKey(); + } + + public isEqualCriteria (criteria: WhereClauseMulti): boolean { + return Object.keys(criteria).every(attribute => { + const toMatchValue = criteria[attribute]; + if (Array.isArray(toMatchValue)) { + return toMatchValue.includes(this.data[attribute]); + } + return toMatchValue === this.data[attribute]; + }); + } + + public update (updateObject: PostgresRow): void { + Object.keys(updateObject).forEach(updateKey => { + this.data[updateKey] = updateObject[updateKey]; + }); + } +} + +class TableData { + specification: TableSpecification; + data: PostgresRowEntity[]; + serialCounter: Map; + + constructor (tableSpec: TableSpecification) { + this.specification = tableSpec; + this.data = []; + this.serialCounter = new Map(); + } + + public getEntitiesByCriteria (criteria: WhereClauseMulti, limit: number | null): PostgresRowEntity[] { + const matchedRows: PostgresRowEntity[] = []; + this.data.forEach(row => { + if (row.isEqualCriteria(criteria)) { + if (!limit || (limit && matchedRows.length < limit)) { + matchedRows.push(row); + } + } + }); + return matchedRows; + } + + private getSerialValue (columnName: string): number { + const serialCounterKey = `${this.specification.tableName}-${columnName}`; + const counterValue: number = this.serialCounter.get(serialCounterKey) ?? 1; + this.serialCounter.set(serialCounterKey, counterValue + 1); + return counterValue; + } + + private fillSerialValues (row: PostgresRow): void { + for (const serialColumnName of this.specification.serialColumns) { + if (row[serialColumnName] === undefined) { + row[serialColumnName] = this.getSerialValue(serialColumnName); + } + } + } + + private createEntityFromRow (row: PostgresRow): PostgresRowEntity { + // TODO: Fill default values + // TODO: Assert non null values + this.fillSerialValues(row); + return new PostgresRowEntity(row, this.specification.primaryKeyColumns); + } + + public rowIsUnique (otherRow: PostgresRow): boolean { + return this.data.every(entity => { + return !entity.isEqualRow(otherRow); + }); + } + + private entityIsUnique (otherEntity: PostgresRowEntity): boolean { + return this.data.every(entity => { + return !entity.isEqualEntity(otherEntity); + }); + } + + public insertRow (row: PostgresRow): PostgresRowEntity { + const entity: PostgresRowEntity = this.createEntityFromRow(row); + if (!this.entityIsUnique(entity)) { + throw new Error(`Cannot insert row twice into the same table: ${JSON.stringify(entity.data)}`); + } + + this.data.push(entity); + return entity; + } + + public insertEntity (entity: PostgresRowEntity): PostgresRowEntity { + if (!this.entityIsUnique(entity)) { + throw new Error(`Cannot insert row twice into the same table: ${JSON.stringify(entity.data)}`); + } + + this.data.push(entity); + return entity; + } + + public removeEntitiesByCriteria (criteria: WhereClauseMulti): PostgresRowEntity[] { + const remainingRows: PostgresRowEntity[] = []; + const matchedRows: PostgresRowEntity[] = []; + this.data.forEach(entity => { + if (entity.isEqualCriteria(criteria)) { + matchedRows.push(entity); + } else { + remainingRows.push(entity); + } + }); + this.data = remainingRows; + return matchedRows; + } +} 
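As an aside, here is a minimal sketch of how the serial-column filling and primary-key uniqueness handled by TableData surface through the public InMemoryDmlHandler wrapper defined at the end of this file. The schema, table names, and row values are illustrative placeholders borrowed from the accompanying test file, not part of the diff itself:

import InMemoryDmlHandler from './in-memory-dml-handler';

async function sketch (): Promise<void> {
  // Same shape as the SIMPLE_SCHEMA used in in-memory-dml-handler.test.ts
  const handler = new InMemoryDmlHandler(`CREATE TABLE
    "posts" (
      "id" SERIAL NOT NULL,
      "account_id" VARCHAR NOT NULL,
      "block_height" DECIMAL(58, 0) NOT NULL,
      "content" TEXT NOT NULL,
      CONSTRAINT "posts_pkey" PRIMARY KEY ("id", "account_id")
    );`);

  const names = { tableName: 'posts', originalTableName: '"posts"', originalColumnNames: new Map<string, string>() };

  // "id" is omitted, so fillSerialValues assigns the next serial value (1) before the entity is created
  const [inserted] = await handler.insert(names, [{ account_id: 'example.near', block_height: 1, content: 'hello' }]);

  // Re-inserting a row with the same primary key ("id" + "account_id") is rejected by insertRow
  await handler.insert(names, [inserted])
    .catch(err => { console.log(err.message); }); // "Cannot insert row twice into the same table: ..."
}

void sketch();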
+ +class IndexerData { + private readonly tables: Map; + + constructor (schema: AST[]) { + this.tables = this.initializeTables(schema); + } + + private initializeTables (schemaAST: AST[]): Map { + const tables = new Map(); + for (const statement of schemaAST) { + if (statement.type === 'create' && statement.keyword === 'table') { + const tableSpec = this.createTableSpecification(statement); + tables.set(tableSpec.tableName, new TableData(tableSpec)); + } + } + + return tables; + } + + private createTableSpecification (createTableStatement: any): TableSpecification { + // TODO: Track foreign key columns and manage them during inserts/updates + const tableName = createTableStatement.table[0].table; + const columnNames: string[] = []; + const primaryKeyColumns: string[] = []; + const serialColumns: string[] = []; + + for (const columnDefinition of createTableStatement.create_definitions ?? []) { + if (columnDefinition.column) { + const columnName = this.getColumnName(columnDefinition); + columnNames.push(columnName); + + const dataType = columnDefinition.definition.dataType as string; + if (dataType.toUpperCase().includes('SERIAL')) { + serialColumns + .push(columnName); + } + + if (columnDefinition.primary_key) { + primaryKeyColumns.push(columnName); + } + } else if (columnDefinition.constraint_type === 'primary key') { + for (const primaryKey of columnDefinition.definition) { + primaryKeyColumns.push(primaryKey.column.expr.value); + } + } + } + const tableSpec: TableSpecification = { + tableName, + columnNames, + primaryKeyColumns, + serialColumns, + }; + + return tableSpec; + } + + private getColumnName (columnDefinition: any): string { + if (columnDefinition.column?.type === 'column_ref') { + return columnDefinition.column.column.expr.value; + } + return ''; + } + + private selectColumnsFromRow (row: PostgresRow, columnsToSelect: string[]): PostgresRow { + return columnsToSelect.reduce((newRow, columnName) => { + newRow[columnName] = columnName in row ? 
row[columnName] : undefined; + return newRow; + }, {}); + } + + private getTableData (tableName: string): TableData { + const tableData = this.tables.get(tableName); + if (!tableData) { + throw new Error(`Invalid table name provided: ${tableName}`); + } + + return tableData; + } + + private copyRow (row: PostgresRow): PostgresRow { + return JSON.parse(JSON.stringify(row)); + } + + private copyDataFromEntities (entities: PostgresRowEntity[]): PostgresRow[] { + const copiedRowData: PostgresRow[] = []; + for (const entity of entities) { + const copiedRow = this.copyRow(entity.data); + copiedRowData.push(copiedRow); + } + + return copiedRowData; + } + + public select (tableName: string, criteria: WhereClauseMulti, limit: number | null): PostgresRow[] { + const tableData = this.getTableData(tableName); + const matchedRows = tableData.getEntitiesByCriteria(criteria, limit); + + return this.copyDataFromEntities(matchedRows); + } + + public insert (tableName: string, rowsToInsert: PostgresRow[]): PostgresRow[] { + // TODO: Check types of columns + // TODO: Verify columns are correctly named, and have any required values + // TODO: Verify inserts are unique before actual insertion + const tableData = this.getTableData(tableName); + const insertedRows: PostgresRowEntity[] = []; + + for (const row of rowsToInsert) { + const rowCopy = this.copyRow(row); + if (!tableData.rowIsUnique(rowCopy)) { + throw new Error(`Cannot insert row twice into the same table: ${JSON.stringify(rowCopy)}`); + } + insertedRows.push(tableData.insertRow(rowCopy)); + } + + return this.copyDataFromEntities(insertedRows); + } + + public update (tableName: string, criteria: WhereClauseSingle, updateObject: PostgresRow): PostgresRow[] { + // TODO: Validate criteria passed in has valid column names + const tableData = this.getTableData(tableName); + const updatedRows: PostgresRowEntity[] = []; + + const matchedRows = tableData.removeEntitiesByCriteria(criteria); + for (const rowEntity of matchedRows) { + rowEntity.update(updateObject); + updatedRows.push(tableData.insertEntity(rowEntity)); + } + + return this.copyDataFromEntities(updatedRows); + } + + public upsert (tableName: string, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): PostgresRow[] { + // TODO: Verify conflictColumns is a superset of primary key set (For uniqueness constraint) + const tableData = this.getTableData(tableName); + const upsertedRows: PostgresRowEntity[] = []; + + for (const row of rowsToUpsert) { + const rowCopy = this.copyRow(row); + const updateCriteriaObject = this.selectColumnsFromRow(rowCopy, conflictColumns); + const rowsMatchingUpdate = tableData.removeEntitiesByCriteria(updateCriteriaObject); + + if (rowsMatchingUpdate.length > 1) { + throw new Error('Conflict update criteria cannot affect row twice'); + } else if (rowsMatchingUpdate.length === 1) { + const matchedEntity = rowsMatchingUpdate[0]; + if (upsertedRows.some(upsertedEntity => upsertedEntity.isEqualEntity(matchedEntity))) { + throw new Error('Conflict update criteria cannot affect row twice'); + } + + const updateObject = this.selectColumnsFromRow(rowCopy, updateColumns); + matchedEntity.update(updateObject); + upsertedRows.push(tableData.insertEntity(matchedEntity)); + } else { + upsertedRows.push(tableData.insertRow(rowCopy)); + } + } + + return this.copyDataFromEntities(upsertedRows); + } + + public delete (tableName: string, deleteCriteria: WhereClauseMulti): PostgresRow[] { + const tableData = this.getTableData(tableName); + const deletedRows = 
tableData.removeEntitiesByCriteria(deleteCriteria); + + return this.copyDataFromEntities(deletedRows); + } +} + +export default class InMemoryDmlHandler implements IDmlHandler { + private readonly indexerData: IndexerData; + + constructor (schema: string) { + const parser = new Parser(); + let schemaAST = parser.astify(schema, { database: 'Postgresql' }); + schemaAST = Array.isArray(schemaAST) ? schemaAST : [schemaAST]; // Ensure iterable + this.indexerData = new IndexerData(schemaAST); + } + + public async insert (tableDefinitionNames: TableDefinitionNames, rowsToInsert: PostgresRow[]): Promise { + if (!rowsToInsert?.length) { + return []; + } + + return this.indexerData.insert(tableDefinitionNames.tableName, rowsToInsert); + } + + public async select (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null = null): Promise { + return this.indexerData.select(tableDefinitionNames.tableName, whereObject, limit); + } + + public async update (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any): Promise { + return this.indexerData.update(tableDefinitionNames.tableName, whereObject, updateObject); + } + + public async upsert (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): Promise { + return this.indexerData.upsert(tableDefinitionNames.tableName, rowsToUpsert, conflictColumns, updateColumns); + } + + public async delete (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti): Promise { + return this.indexerData.delete(tableDefinitionNames.tableName, whereObject); + } +} diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 3d6ad3a9a..1fd154ef5 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -34,6 +34,7 @@ interface Context { } export interface TableDefinitionNames { + tableName: string originalTableName: string originalColumnNames: Map } @@ -234,6 +235,7 @@ export default class Indexer { for (const columnDef of createDefs) { if (columnDef.column?.type === 'column_ref') { const tableDefinitionNames: TableDefinitionNames = { + tableName, originalTableName: this.retainOriginalQuoting(schema, tableName), originalColumnNames: this.getColumnDefinitionNames(createDefs) }; @@ -401,7 +403,7 @@ export default class Indexer { if (logError) { const message: string = errors ? 
errors.map((e: any) => e.message).join(', ') : `HTTP ${response.status} error writing with graphql to indexer storage`; const mutation: string = - `mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){ + `mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){ insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) { id } diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts index bfbf9ca8d..57ff6de65 100644 --- a/runner/src/server/services/runner/runner-service.test.ts +++ b/runner/src/server/services/runner/runner-service.test.ts @@ -1,10 +1,10 @@ import * as grpc from '@grpc/grpc-js'; import type StreamHandler from '../../../stream-handler/stream-handler'; -import { IndexerStatus } from '../../../indexer-meta/indexer-meta'; import { LogLevel } from '../../../indexer-meta/log-entry'; import getRunnerService from './runner-service'; import IndexerConfig from '../../../indexer-config/indexer-config'; +import { ExecutionState } from '../../../generated/runner/ExecutionState'; const BASIC_REDIS_STREAM = 'test-redis-stream'; const BASIC_ACCOUNT_ID = 'test-account-id'; @@ -15,7 +15,7 @@ const BASIC_CODE = 'test-code'; const BASIC_SCHEMA = 'test-schema'; const BASIC_VERSION = 1; const BASIC_EXECUTOR_CONTEXT = { - status: IndexerStatus.RUNNING, + executionState: ExecutionState.RUNNING, }; describe('Runner gRPC Service', () => { @@ -32,6 +32,61 @@ describe('Runner gRPC Service', () => { genericIndexerConfig = new IndexerConfig(BASIC_REDIS_STREAM, BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_VERSION, BASIC_CODE, BASIC_SCHEMA, LogLevel.INFO); }); + it('get non existant executor', async () => { + const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => { + return { + indexerConfig, + executorContext: BASIC_EXECUTOR_CONTEXT + }; + }); + const service = getRunnerService(new Map(), streamHandlerType); + + await new Promise((resolve) => { + service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err) => { + expect(err).toEqual({ + code: grpc.status.NOT_FOUND, + message: `Executor for account ${BASIC_ACCOUNT_ID} and name ${BASIC_FUNCTION_NAME} does not exist` + }); + resolve(null); + }); + }); + }); + + it('gets an existing executor', async () => { + const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => { + return { + indexerConfig, + executorContext: BASIC_EXECUTOR_CONTEXT + }; + }); + const service = getRunnerService(new Map(), streamHandlerType); + const request = generateRequest(BASIC_REDIS_STREAM + '-A', BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_CODE, BASIC_SCHEMA, BASIC_VERSION); + + await new Promise((resolve, reject) => { + service.StartExecutor(request, (err) => { + if (err) reject(err); + resolve(null); + }); + }); + + await new Promise((resolve, reject) => { + service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err, response) => { + if (err) reject(err); + + expect(response).toEqual({ + executorId: BASIC_EXECUTOR_ID, + accountId: genericIndexerConfig.accountId, + functionName: genericIndexerConfig.functionName, + version: '1', + health: { + executionState: 'RUNNING' + } + }); + resolve(null); + }); + }); + }); + it('starts a executor with correct settings', () => { const service = getRunnerService(new Map(), genericStreamHandlerType); const 
mockCallback = jest.fn() as unknown as any; @@ -235,8 +290,10 @@ describe('Runner gRPC Service', () => { executorId: BASIC_EXECUTOR_ID, accountId: genericIndexerConfig.accountId, functionName: genericIndexerConfig.functionName, - status: IndexerStatus.RUNNING, - version: '1' + version: '1', + health: { + executionState: 'RUNNING' + } }] }); resolve(null); diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 8584d185b..3b45856e9 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -8,6 +8,7 @@ import parentLogger from '../../../logger'; import { type RunnerHandlers } from '../../../generated/runner/Runner'; import { type StartExecutorResponse__Output, type StartExecutorResponse } from '../../../generated/runner/StartExecutorResponse'; import { type StartExecutorRequest__Output } from '../../../generated/runner/StartExecutorRequest'; +import { type GetExecutorRequest__Output } from '../../../generated/runner/GetExecutorRequest'; import { type StopExecutorRequest__Output } from '../../../generated/runner/StopExecutorRequest'; import { type StopExecutorResponse__Output, type StopExecutorResponse } from '../../../generated/runner/StopExecutorResponse'; import { type ListExecutorsRequest__Output } from '../../../generated/runner/ListExecutorsRequest'; @@ -19,6 +20,30 @@ export function getRunnerService ( StreamHandlerType: typeof StreamHandler = StreamHandler ): RunnerHandlers { const RunnerService: RunnerHandlers = { + GetExecutor (call: ServerUnaryCall, callback: sendUnaryData): void { + const { accountId, functionName } = call.request; + + const executorEntry = Array.from(executors.entries()).find(([_id, executor]) => executor.indexerConfig.accountId === accountId && executor.indexerConfig.functionName === functionName); + + if (executorEntry) { + const [executorId, executor] = executorEntry; + callback(null, { + executorId, + accountId: executor.indexerConfig.accountId, + functionName: executor.indexerConfig.functionName, + version: executor.indexerConfig.version.toString(), + health: { + executionState: executor.executorContext.executionState, + } + }); + } else { + const notFoundError = { + code: grpc.status.NOT_FOUND, + message: `Executor for account ${accountId} and name ${functionName} does not exist` + }; + callback(notFoundError, null); + } + }, StartExecutor (call: ServerUnaryCall, callback: sendUnaryData): void { // Validate request const validationResult = validateStartExecutorRequest(call.request); @@ -127,7 +152,9 @@ export function getRunnerService ( accountId: indexerConfig.accountId, functionName: indexerConfig.functionName, version: indexerConfig.version.toString(), - status: indexerContext.status + health: { + executionState: indexerContext.executionState, + } }); }); callback(null, { diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index c6a07ce6c..f1b24b4fb 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -3,7 +3,6 @@ import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; -import { IndexerStatus } from '../indexer-meta/indexer-meta'; import LogEntry from '../indexer-meta/log-entry'; import logger from '../logger'; @@ -12,7 +11,7 @@ import type IndexerConfig from '../indexer-config'; export enum 
WorkerMessageType { METRICS = 'METRICS', BLOCK_HEIGHT = 'BLOCK_HEIGHT', - STATUS = 'STATUS', + EXECUTION_STATE = 'STATUS', } export interface WorkerMessage { @@ -20,8 +19,16 @@ export interface WorkerMessage { data: any } +export enum ExecutionState { + RUNNING = 'RUNNING', + FAILING = 'FAILING', + WAITING = 'WAITING', + STOPPED = 'STOPPED', + STALLED = 'STALLED', +} + interface ExecutorContext { - status: IndexerStatus + executionState: ExecutionState block_height: number } @@ -42,7 +49,7 @@ export default class StreamHandler { }, }); this.executorContext = { - status: IndexerStatus.RUNNING, + executionState: ExecutionState.RUNNING, block_height: indexerConfig.version, }; @@ -56,12 +63,14 @@ export default class StreamHandler { async stop (): Promise { deregisterWorkerMetrics(this.worker.threadId); + this.executorContext.executionState = ExecutionState.STOPPED; + await this.worker.terminate(); } private handleError (error: Error): void { this.logger.error('Terminating thread', error); - this.executorContext.status = IndexerStatus.STOPPED; + this.executorContext.executionState = ExecutionState.STALLED; const indexer = new Indexer(this.indexerConfig); indexer.setStoppedStatus().catch((e) => { @@ -82,8 +91,8 @@ export default class StreamHandler { private handleMessage (message: WorkerMessage): void { switch (message.type) { - case WorkerMessageType.STATUS: - this.executorContext.status = message.data.status; + case WorkerMessageType.EXECUTION_STATE: + this.executorContext.executionState = message.data.state; break; case WorkerMessageType.BLOCK_HEIGHT: this.executorContext.block_height = message.data; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index d9e77184d..fb1a897ce 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -7,9 +7,8 @@ import Indexer from '../indexer'; import RedisClient from '../redis-client'; import { METRICS } from '../metrics'; import LakeClient from '../lake-client'; -import { WorkerMessageType, type WorkerMessage } from './stream-handler'; +import { WorkerMessageType, type WorkerMessage, ExecutionState } from './stream-handler'; import setUpTracerExport from '../instrumentation'; -import { IndexerStatus } from '../indexer-meta/indexer-meta'; import IndexerConfig from '../indexer-config'; import parentLogger from '../logger'; import { wrapSpan } from '../utility'; @@ -119,6 +118,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise metricsSpan.end(); if (workerContext.queue.length === 0) { + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.WAITING } }); await sleep(100); continue; } @@ -162,7 +162,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise }); const postRunSpan = tracer.startSpan('Delete redis message and shift queue', {}, context.active()); - parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.RUNNING } }); + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.RUNNING } }); await workerContext.redisClient.deleteStreamMessage(indexerConfig.redisStreamKey, streamMessageId); await workerContext.queue.shift(); @@ -174,7 +174,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise } catch (err) { METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc(); parentSpan.setAttribute('status', 'failed'); - parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: 
{ status: IndexerStatus.FAILING } }); + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.FAILING } }); const error = err as Error; if (previousError !== error.message) { previousError = error.message;
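To round out the executor health changes, a hypothetical usage example in the style of the existing runner/examples scripts; the file name and the account/function values are placeholders, while the GetExecutor request and response shapes follow the proto and runner-service additions above:

// Hypothetical runner/examples/get-executor.ts
import runnerClient from '../src/server/services/runner/runner-client';

runnerClient.GetExecutor({
  accountId: 'someAccount.near',   // placeholder account
  functionName: 'example_indexer', // placeholder indexer name
}, (err, response) => {
  if (err) {
    // A NOT_FOUND status is returned when no executor matches the account/function pair
    console.error('GetExecutor error: ', err);
  } else {
    // health.executionState is one of RUNNING, FAILING, WAITING, STOPPED, STALLED,
    // as reported by the StreamHandler's executorContext
    console.log('get response: ', JSON.stringify({ response }, null, 2));
  }
});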