Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prod Release 24/09/24 #1021

Merged
merged 17 commits into from
Sep 24, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: Move synchronisation logic to LifecycleManger (#1005)
This PR moves the synchronisation logic within
`ExecutorsHandler`/`BlockStreamsHandler` in to `LifecycleManager`. The
motive behind this is to provide greater awareness/control over the
individual sync tasks (managing unhealthy, reconfiguring, etc.) to
`LifecycleManger`.

I plan to build on top of this in my next PR, and have
`LifecycleManager` count how many times an unhealthy stream/executor was
restarted, and move the Indexer to `Suspended` if it goes above a
configured threshold. This refactor makes "counting" actually possible.

Wanted to land the refactor PR first to make review easier.
morgsmccauley authored Aug 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 4b46f54fb820b0545e0c09898d042346190db9bb
342 changes: 148 additions & 194 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,19 @@ use tonic::transport::channel::Channel;
use crate::indexer_config::IndexerConfig;
use crate::redis::{KeyProvider, RedisClient};

const RESTART_TIMEOUT_SECONDS: u64 = 600;
#[derive(Debug, PartialEq)]
pub enum BlockStreamStatus {
/// Block Stream is running as expected
Active,
/// Existing Block Stream is in an unhealthy state
Unhealthy,
/// Existing Block Stream is not running
Inactive,
/// Block Stream is not synchronized with the latest config
Outdated,
/// Block Stream has not been encountered before
NotStarted,
}

#[cfg(not(test))]
use BlockStreamsClientWrapperImpl as BlockStreamsClientWrapper;
@@ -191,7 +203,10 @@ impl BlockStreamsHandlerImpl {
Ok(())
}

async fn reconfigure(&self, config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn reconfigure(&self, config: &IndexerConfig) -> anyhow::Result<()> {
self.stop_if_needed(config.account_id.clone(), config.function_name.clone())
.await?;

if matches!(
config.start_block,
StartBlock::Latest | StartBlock::Height(..)
@@ -216,7 +231,7 @@ impl BlockStreamsHandlerImpl {
Ok(())
}

async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
let height = match config.start_block {
StartBlock::Height(height) => height,
StartBlock::Latest => config.get_registry_version(),
@@ -254,7 +269,7 @@ impl BlockStreamsHandlerImpl {
Ok(height)
}

async fn resume(&self, config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn resume(&self, config: &IndexerConfig) -> anyhow::Result<()> {
let height = self.get_continuation_block_height(config).await?;

tracing::info!(height, "Resuming block stream");
@@ -264,11 +279,7 @@ impl BlockStreamsHandlerImpl {
Ok(())
}

async fn ensure_healthy(
&self,
config: &IndexerConfig,
block_stream: &StreamInfo,
) -> anyhow::Result<()> {
fn is_healthy(&self, block_stream: &StreamInfo) -> bool {
if let Some(health) = block_stream.health.as_ref() {
let updated_at =
SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs);
@@ -280,84 +291,67 @@ impl BlockStreamsHandlerImpl {
);

if !stale && !stalled {
return Ok(());
} else {
tracing::info!(
stale,
stalled,
"Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds"
);
return true;
}
} else {
tracing::info!(
"Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds"
);
}

self.stop(block_stream.stream_id.clone()).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
let height = self.get_continuation_block_height(config).await?;
self.start(height, config).await?;
false
}

pub async fn stop_if_needed(
&self,
account_id: AccountId,
function_name: String,
) -> anyhow::Result<()> {
if let Some(block_stream) = self.get(account_id, function_name).await? {
tracing::info!("Stopping block stream");

self.stop(block_stream.stream_id).await?;
}

Ok(())
}

pub async fn synchronise(
pub async fn get_status(
&self,
config: &IndexerConfig,
previous_sync_version: Option<u64>,
) -> anyhow::Result<()> {
let block_stream = self
) -> anyhow::Result<BlockStreamStatus> {
if let Some(block_stream) = self
.get(config.account_id.clone(), config.function_name.clone())
.await?;

if let Some(block_stream) = block_stream {
if block_stream.version == config.get_registry_version() {
self.ensure_healthy(config, &block_stream).await?;
return Ok(());
.await?
{
if block_stream.version != config.get_registry_version() {
return Ok(BlockStreamStatus::Outdated);
}

tracing::info!(
previous_version = block_stream.version,
"Stopping outdated block stream"
);

self.stop(block_stream.stream_id.clone()).await?;

self.reconfigure(config).await?;
if !self.is_healthy(&block_stream) {
return Ok(BlockStreamStatus::Unhealthy);
}

return Ok(());
return Ok(BlockStreamStatus::Active);
}

if previous_sync_version.is_none() {
self.start_new_block_stream(config).await?;

return Ok(());
return Ok(BlockStreamStatus::NotStarted);
}

if previous_sync_version.unwrap() != config.get_registry_version() {
self.reconfigure(config).await?;

return Ok(());
return Ok(BlockStreamStatus::Outdated);
}

self.resume(config).await?;

Ok(())
Ok(BlockStreamStatus::Inactive)
}

pub async fn stop_if_needed(
&self,
account_id: AccountId,
function_name: String,
) -> anyhow::Result<()> {
if let Some(block_stream) = self.get(account_id, function_name).await? {
tracing::info!("Stopping block stream");

self.stop(block_stream.stream_id).await?;
pub async fn restart(&self, config: &IndexerConfig) -> anyhow::Result<()> {
if let Some(block_stream) = self
.get(config.account_id.clone(), config.function_name.clone())
.await?
{
self.stop(block_stream.stream_id.clone()).await?;
}

Ok(())
self.resume(config).await
}
}

@@ -381,18 +375,84 @@ mod tests {
}

#[tokio::test]
async fn resumes_stopped_streams() {
async fn returns_stream_status() {
let config = IndexerConfig::default();
let test_cases = [
(
Some(StreamInfo {
version: config.get_registry_version(),
health: Some(block_streamer::Health {
updated_at_timestamp_secs: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
processing_state: ProcessingState::Running.into(),
}),
..Default::default()
}),
Some(config.get_registry_version()),
BlockStreamStatus::Active,
),
(
None,
Some(config.get_registry_version()),
BlockStreamStatus::Inactive,
),
(
Some(StreamInfo {
version: config.get_registry_version() - 1,
..Default::default()
}),
Some(config.get_registry_version()),
BlockStreamStatus::Outdated,
),
(
Some(StreamInfo {
version: config.get_registry_version(),
health: None,
..Default::default()
}),
Some(config.get_registry_version()),
BlockStreamStatus::Unhealthy,
),
(None, None, BlockStreamStatus::NotStarted),
];

for (stream, previous_sync_version, expected) in test_cases {
let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_get_stream::<GetStreamRequest>()
.returning(move |_| {
if let Some(stream) = stream.clone() {
Ok(Response::new(stream))
} else {
Err(tonic::Status::not_found("not found"))
}
});

let mock_redis = RedisClient::default();

let handler = BlockStreamsHandlerImpl {
client: mock_client,
redis_client: mock_redis,
};

assert_eq!(
expected,
handler
.get_status(&config, previous_sync_version)
.await
.unwrap()
);
}
}

#[tokio::test]
async fn resumes_streams() {
let config = IndexerConfig::default();
let last_published_block = 10;

let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_get_stream::<GetStreamRequest>()
.with(eq(GetStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| Err(tonic::Status::not_found("not found")));
mock_client
.expect_start_stream::<StartStreamRequest>()
.with(eq(StartStreamRequest {
@@ -406,26 +466,25 @@ mod tests {
start_block_height: last_published_block + 1,
version: config.get_registry_version(),
}))
.returning(|_| Ok(Response::new(StartStreamResponse::default())));
.returning(|_| Ok(Response::new(StartStreamResponse::default())))
.once();

let mut mock_redis = RedisClient::default();
mock_redis
.expect_get_last_published_block::<IndexerConfig>()
.returning(move |_| Ok(Some(last_published_block)));
.returning(move |_| Ok(Some(last_published_block)))
.once();

let handler = BlockStreamsHandlerImpl {
client: mock_client,
redis_client: mock_redis,
};

handler
.synchronise(&config, Some(config.get_registry_version()))
.await
.unwrap();
handler.resume(&config).await.unwrap();
}

#[tokio::test]
async fn reconfigures_outdated_streams() {
async fn reconfigures_streams() {
let config = IndexerConfig::default();

let existing_stream = StreamInfo {
@@ -480,24 +539,14 @@ mod tests {
redis_client: mock_redis,
};

handler
.synchronise(&config, Some(config.get_registry_version()))
.await
.unwrap();
handler.reconfigure(&config).await.unwrap();
}

#[tokio::test]
async fn starts_new_streams() {
let config = IndexerConfig::default();

let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_get_stream::<GetStreamRequest>()
.with(eq(GetStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| Err(tonic::Status::not_found("not found")));
mock_client
.expect_start_stream::<StartStreamRequest>()
.with(eq(StartStreamRequest {
@@ -524,62 +573,14 @@ mod tests {
redis_client: mock_redis,
};

handler.synchronise(&config, None).await.unwrap();
}

#[tokio::test]
async fn reconfigures_outdated_and_stopped_streams() {
let config = IndexerConfig {
start_block: StartBlock::Latest,
..Default::default()
};

let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_get_stream::<GetStreamRequest>()
.with(eq(GetStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| Err(tonic::Status::not_found("not found")));
mock_client
.expect_start_stream::<StartStreamRequest>()
.with(eq(StartStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
redis_stream: config.get_redis_stream_key(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any.into(),
})),
start_block_height: config.get_registry_version(),
version: config.get_registry_version(),
}))
.returning(|_| Ok(Response::new(StartStreamResponse::default())));

let mut mock_redis = RedisClient::default();
mock_redis
.expect_clear_block_stream::<IndexerConfig>()
.returning(|_| Ok(()))
.once();

let handler = BlockStreamsHandlerImpl {
client: mock_client,
redis_client: mock_redis,
};

handler
.synchronise(&config, Some(config.get_registry_version() - 1))
.await
.unwrap();
handler.start_new_block_stream(&config).await.unwrap();
}

#[tokio::test]
async fn restarts_unhealthy_streams() {
async fn unhealthy_stream() {
tokio::time::pause();

let config = IndexerConfig::default();
let last_published_block = 10;

let existing_stream = StreamInfo {
account_id: config.account_id.to_string(),
@@ -595,53 +596,19 @@ mod tests {
}),
};

let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_stop_stream::<StopStreamRequest>()
.with(eq(StopStreamRequest {
stream_id: existing_stream.stream_id.clone(),
}))
.returning(|_| Ok(Response::new(StopStreamResponse::default())));
mock_client
.expect_get_stream::<GetStreamRequest>()
.with(eq(GetStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(move |_| Ok(Response::new(existing_stream.clone())));
mock_client
.expect_start_stream::<StartStreamRequest>()
.with(eq(StartStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
redis_stream: config.get_redis_stream_key(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any.into(),
})),
start_block_height: last_published_block + 1,
version: config.get_registry_version(),
}))
.returning(|_| Ok(Response::new(StartStreamResponse::default())));

let mut mock_redis = RedisClient::default();
mock_redis
.expect_get_last_published_block::<IndexerConfig>()
.returning(move |_| Ok(Some(last_published_block)));
let mock_client = BlockStreamsClientWrapper::default();
let mock_redis = RedisClient::default();

let handler = BlockStreamsHandlerImpl {
client: mock_client,
redis_client: mock_redis,
};

handler
.synchronise(&config, Some(config.get_registry_version() - 1))
.await
.unwrap();
assert!(!handler.is_healthy(&existing_stream));
}

#[tokio::test]
async fn ignores_healthy_streams() {
async fn healthy_streams() {
tokio::time::pause();

let config = IndexerConfig::default();
@@ -667,32 +634,15 @@ mod tests {
}),
};

let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_get_stream::<GetStreamRequest>()
.with(eq(GetStreamRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(move |_| Ok(Response::new(existing_stream.clone())));
mock_client
.expect_stop_stream::<StopStreamRequest>()
.never();
mock_client
.expect_start_stream::<StartStreamRequest>()
.never();

let mock_client = BlockStreamsClientWrapper::default();
let mock_redis = RedisClient::default();

let handler = BlockStreamsHandlerImpl {
client: mock_client,
redis_client: mock_redis,
};

handler
.synchronise(&config, Some(config.get_registry_version()))
.await
.unwrap();
assert!(handler.is_healthy(&existing_stream));
}
}

@@ -709,6 +659,10 @@ mod tests {
};

let mut mock_client = BlockStreamsClientWrapper::default();
mock_client
.expect_get_stream::<GetStreamRequest>()
.returning(|_| Err(tonic::Status::not_found("not found")))
.times(3);
mock_client
.expect_start_stream::<StartStreamRequest>()
.with(always())
185 changes: 91 additions & 94 deletions coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,17 @@ use tonic::transport::channel::Channel;
use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;

const RESTART_TIMEOUT_SECONDS: u64 = 600;
#[derive(Debug, PartialEq)]
pub enum ExecutorStatus {
/// Executor is running as expected
Active,
/// Executor is in an unhealthy state
Unhealthy,
/// Executor is not running
Inactive,
/// Executor is not synchronized with the latest config
Outdated,
}

#[cfg(not(test))]
use ExecutorsClientWrapperImpl as ExecutorsClientWrapper;
@@ -148,56 +158,40 @@ impl ExecutorsHandlerImpl {
Ok(())
}

async fn ensure_healthy(
&self,
config: &IndexerConfig,
executor: ExecutorInfo,
) -> anyhow::Result<()> {
fn is_healthy(&self, executor: ExecutorInfo) -> bool {
if let Some(health) = executor.health {
if !matches!(
return !matches!(
health.execution_state.try_into(),
Ok(ExecutionState::Stalled)
) {
return Ok(());
}
);
}

tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds");

self.stop(executor.executor_id).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.start(config).await?;

Ok(())
false
}

pub async fn synchronise(&self, config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn get_status(&self, config: &IndexerConfig) -> anyhow::Result<ExecutorStatus> {
let executor = self
.get(config.account_id.clone(), config.function_name.clone())
.await?;

if let Some(executor) = executor {
if executor.version == config.get_registry_version() {
self.ensure_healthy(config, executor).await?;
return Ok(());
if executor.version != config.get_registry_version() {
return Ok(ExecutorStatus::Outdated);
}

tracing::info!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
version = executor.version,
"Stopping outdated executor"
);
if !self.is_healthy(executor) {
return Ok(ExecutorStatus::Unhealthy);
}

self.stop(executor.executor_id).await?;
return Ok(ExecutorStatus::Active);
}

tracing::info!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
version = config.get_registry_version(),
"Starting executor"
);
Ok(ExecutorStatus::Inactive)
}

pub async fn restart(&self, config: &IndexerConfig) -> anyhow::Result<()> {
self.stop_if_needed(config.account_id.clone(), config.function_name.clone())
.await?;

self.start(config).await?;

@@ -238,18 +232,63 @@ mod tests {
}

#[tokio::test]
async fn resumes_stopped_executors() {
async fn returns_executor_status() {
let config = IndexerConfig::default();
let test_cases = [
(
Some(ExecutorInfo {
version: config.get_registry_version(),
health: None,
..Default::default()
}),
ExecutorStatus::Unhealthy,
),
(None, ExecutorStatus::Inactive),
(
Some(ExecutorInfo {
version: config.get_registry_version() - 1,
..Default::default()
}),
ExecutorStatus::Outdated,
),
(
Some(ExecutorInfo {
version: config.get_registry_version(),
health: Some(runner::Health {
execution_state: runner::ExecutionState::Running.into(),
}),
..Default::default()
}),
ExecutorStatus::Active,
),
];

for (executor, expected_status) in test_cases {
let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(always())
.returning(move |_| {
if let Some(executor) = executor.clone() {
Ok(Response::new(executor))
} else {
Err(tonic::Status::not_found("not found"))
}
});

let handler = ExecutorsHandlerImpl {
client: mock_client,
};

assert_eq!(handler.get_status(&config).await.unwrap(), expected_status);
}
}

#[tokio::test]
async fn starts_executors() {
let config = IndexerConfig::default();

let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(eq(GetExecutorRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| Err(tonic::Status::not_found("not found")))
.once();
mock_client
.expect_start_executor::<StartExecutorRequest>()
.with(eq(StartExecutorRequest {
@@ -271,11 +310,11 @@ mod tests {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
handler.start(&config).await.unwrap()
}

#[tokio::test]
async fn reconfigures_outdated_executors() {
async fn restarts_executors() {
let config = IndexerConfig::default();

let executor = ExecutorInfo {
@@ -324,11 +363,11 @@ mod tests {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
handler.restart(&config).await.unwrap()
}

#[tokio::test]
async fn restarts_unhealthy_executors() {
async fn unhealthy_executor() {
tokio::time::pause();

let config = IndexerConfig::default();
@@ -343,49 +382,17 @@ mod tests {
}),
};

let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_stop_executor::<StopExecutorRequest>()
.with(eq(StopExecutorRequest {
executor_id: executor.executor_id.clone(),
}))
.returning(|_| {
Ok(Response::new(StopExecutorResponse {
executor_id: "executor_id".to_string(),
}))
})
.once();
mock_client
.expect_start_executor::<StartExecutorRequest>()
.with(eq(StartExecutorRequest {
code: config.code.clone(),
schema: config.schema.clone(),
redis_stream: config.get_redis_stream_key(),
version: config.get_registry_version(),
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| {
Ok(tonic::Response::new(StartExecutorResponse {
executor_id: "executor_id".to_string(),
}))
})
.once();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(always())
.returning(move |_| Ok(Response::new(executor.clone())))
.once();
let mock_client = ExecutorsClientWrapper::default();

let handler = ExecutorsHandlerImpl {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
assert!(!handler.is_healthy(executor));
}

#[tokio::test]
async fn ignores_healthy_executors() {
async fn healthy_executors() {
tokio::time::pause();

let config = IndexerConfig::default();
@@ -408,23 +415,13 @@ mod tests {
}),
};

let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_stop_executor::<StopExecutorRequest>()
.never();
mock_client
.expect_start_executor::<StartExecutorRequest>()
.never();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(always())
.returning(move |_| Ok(Response::new(executor.clone())));
let mock_client = ExecutorsClientWrapper::default();

let handler = ExecutorsHandlerImpl {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
assert!(handler.is_healthy(executor));
}
}
}
428 changes: 393 additions & 35 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use tracing::{info, warn};

use crate::handlers::block_streams::BlockStreamsHandler;
use crate::handlers::block_streams::{BlockStreamStatus, BlockStreamsHandler};
use crate::handlers::data_layer::DataLayerHandler;
use crate::handlers::executors::ExecutorsHandler;
use crate::handlers::executors::{ExecutorStatus, ExecutorsHandler};
use crate::indexer_config::IndexerConfig;
use crate::indexer_state::{IndexerState, IndexerStateManager};
use crate::redis::{KeyProvider, RedisClient};
use crate::registry::Registry;

const LOOP_THROTTLE_MS: u64 = 1000;
const RESTART_TIMEOUT_SECONDS: u64 = 600;

/// Represents the different lifecycle states of an Indexer
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
@@ -124,18 +125,55 @@ impl<'a> LifecycleManager<'a> {
return LifecycleState::Suspending;
}

if let Err(error) = self
let stream_status = match self
.block_streams_handler
.synchronise(config, state.block_stream_synced_at)
.get_status(config, state.block_stream_synced_at)
.await
{
Ok(status) => status,
Err(error) => {
warn!(?error, "Failed to get block stream status");
return LifecycleState::Running;
}
};

if let Err(error) = match stream_status {
BlockStreamStatus::Active => Ok(()),
BlockStreamStatus::Inactive => self.block_streams_handler.resume(config).await,
BlockStreamStatus::Outdated => self.block_streams_handler.reconfigure(config).await,
BlockStreamStatus::Unhealthy => {
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.block_streams_handler.restart(config).await
}
BlockStreamStatus::NotStarted => {
self.block_streams_handler
.start_new_block_stream(config)
.await
}
} {
warn!(?error, "Failed to synchronise block stream, retrying...");
return LifecycleState::Running;
}

state.block_stream_synced_at = Some(config.get_registry_version());

if let Err(error) = self.executors_handler.synchronise(config).await {
let executor_status = match self.executors_handler.get_status(config).await {
Ok(status) => status,
Err(error) => {
warn!(?error, "Failed to synchronise executor");
return LifecycleState::Running;
}
};

if let Err(error) = match executor_status {
ExecutorStatus::Active => Ok(()),
ExecutorStatus::Inactive => self.executors_handler.start(config).await,
ExecutorStatus::Outdated => self.executors_handler.restart(config).await,
ExecutorStatus::Unhealthy => {
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.executors_handler.restart(config).await
}
} {
warn!(?error, "Failed to synchronise executor, retrying...");
return LifecycleState::Running;
}
@@ -621,53 +659,162 @@ mod tests {
}

#[tokio::test]
async fn synchronises_streams_and_executors() {
async fn ignores_active_stream() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_synchronise()
.returning(|_, _| Ok(()))
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Active));

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Active));

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn restarts_unhealthy_stream() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Unhealthy));
block_streams_handler
.expect_restart()
.returning(|_| Ok(()))
.once();

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_synchronise()
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Active));

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn resumes_inactive_streams() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Inactive));
block_streams_handler
.expect_resume()
.returning(|_| Ok(()))
.once();

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Active));

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let mut registry = Registry::default();
registry
.expect_fetch_indexer()
.returning(move |_, _| Ok(Some(IndexerConfig::default())));
let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

let mut state_manager = IndexerStateManager::default();
state_manager.expect_get_state().returning(|_| {
Ok(IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: "near".parse().unwrap(),
function_name: "function_name".to_string(),
enabled: true,
block_stream_synced_at: None,
})
});
state_manager
.expect_set_state()
.with(
always(),
function(|state: &IndexerState| {
state.lifecycle_state == LifecycleState::Running
&& state.block_stream_synced_at == Some(2)
}),
)
.returning(|_, _| Ok(()));
lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn reconfigures_outdated_streams() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Outdated));
block_streams_handler
.expect_reconfigure()
.returning(|_| Ok(()))
.once();

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Active));

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config,
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
@@ -676,7 +823,218 @@ mod tests {
&redis_client,
);

lifecycle_manager.handle_transitions(true).await;
lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn starts_new_streams() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::NotStarted));
block_streams_handler
.expect_start_new_block_stream()
.returning(|_| Ok(()))
.once();

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Active));

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn ignores_active_executors() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Active));

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Active));

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn starts_inactive_executors() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Active));

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Inactive));
executors_handler
.expect_start()
.returning(|_| Ok(()))
.once();

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn restarts_unhealthy_executor() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Active));

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Unhealthy));
executors_handler
.expect_restart()
.returning(|_| Ok(()))
.once();

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}

#[tokio::test]
async fn restarts_outdated_executor() {
let config = IndexerConfig::default();
let mut state = IndexerState {
lifecycle_state: LifecycleState::Running,
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
enabled: true,
block_stream_synced_at: None,
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_get_status()
.returning(|_, _| Ok(BlockStreamStatus::Active));

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_get_status()
.returning(|_| Ok(ExecutorStatus::Outdated));
executors_handler
.expect_restart()
.returning(|_| Ok(()))
.once();

let data_layer_handler = DataLayerHandler::default();
let state_manager = IndexerStateManager::default();
let registry = Registry::default();
let redis_client = RedisClient::default();

let lifecycle_manager = LifecycleManager::new(
config.clone(),
&block_streams_handler,
&executors_handler,
&data_layer_handler,
&registry,
&state_manager,
&redis_client,
);

lifecycle_manager.handle_running(&config, &mut state).await;
}
}