refactor(vm-runner): Allow switching between VMs for latest protocol version – follow ups #2567

Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci-core-reusable.yml
@@ -147,7 +147,7 @@ jobs:
base_token: ["Eth", "Custom"]
deployment_mode: ["Rollup", "Validium"]
env:
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}"
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,vm_playground,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}"

runs-on: [matterlabs-ci-runner]
steps:
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

core/node/node_framework/src/implementations/layers/vm_runner/playground.rs
@@ -1,15 +1,17 @@
use async_trait::async_trait;
use zksync_config::configs::ExperimentalVmPlaygroundConfig;
use zksync_node_framework_derive::{FromContext, IntoContext};
use zksync_state_keeper::MainBatchExecutor;
use zksync_types::L2ChainId;
use zksync_vm_runner::{
impls::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask},
ConcurrentOutputHandlerFactoryTask,
};

use crate::{
implementations::resources::pools::{MasterPool, PoolResource},
implementations::resources::{
healthcheck::AppHealthCheckResource,
pools::{MasterPool, PoolResource},
},
StopReceiver, Task, TaskId, WiringError, WiringLayer,
};

@@ -32,6 +34,8 @@ impl VmPlaygroundLayer {
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
@@ -55,7 +59,10 @@ impl WiringLayer for VmPlaygroundLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let Input { master_pool } = input;
let Input {
master_pool,
app_health,
} = input;

// - 1 connection for `StorageSyncTask` which can hold a long-term connection in case it needs to
// catch up cache.
@@ -64,19 +71,21 @@
// - 1 connection for the only running VM instance.
let connection_pool = master_pool.get_custom(3).await?;

let mut batch_executor = Box::new(MainBatchExecutor::new(false, false));
batch_executor.set_fast_vm_mode(self.config.fast_vm_mode);

let (playground, tasks) = VmPlayground::new(
connection_pool,
batch_executor,
self.config.fast_vm_mode,
self.config.db_path,
self.zksync_network_id,
self.config.first_processed_batch,
self.config.reset,
)
.await?;

app_health
.0
.insert_component(playground.health_check())
.map_err(WiringError::internal)?;

Ok(Output {
output_handler_factory_task: tasks.output_handler_factory_task,
loader_task: tasks.loader_task,
2 changes: 2 additions & 0 deletions core/node/vm_runner/Cargo.toml
@@ -22,7 +22,9 @@ zksync_utils.workspace = true
zksync_prover_interface.workspace = true
zksync_object_store.workspace = true
zksync_vm_utils.workspace = true
zksync_health_check.workspace = true

serde.workspace = true
tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
async-trait.workspace = true
51 changes: 45 additions & 6 deletions core/node/vm_runner/src/impls/playground.rs
@@ -6,26 +6,40 @@ use std::{

use anyhow::Context as _;
use async_trait::async_trait;
use serde::Serialize;
use tokio::{
fs,
sync::{oneshot, watch},
};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_state::RocksdbStorage;
use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, UpdatesManager};
use zksync_types::{L1BatchNumber, L2ChainId};
use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager};
use zksync_types::{vm::FastVmMode, L1BatchNumber, L2ChainId};

use crate::{
ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory,
StorageSyncTask, VmRunner, VmRunnerIo, VmRunnerStorage,
};

#[derive(Debug, Serialize)]
struct VmPlaygroundHealth {
vm_mode: FastVmMode,
last_processed_batch: L1BatchNumber,
}

impl From<VmPlaygroundHealth> for Health {
fn from(health: VmPlaygroundHealth) -> Self {
Health::from(HealthStatus::Ready).with_details(health)
}
}
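
As a sanity check on the payload shape, here is a minimal sketch (not part of the diff) of how these details serialize. It assumes `FastVmMode` variants serialize as lowercase strings, consistent with the `vm_mode == "shadow"` assertion in the integration test further down:

use zksync_types::{vm::FastVmMode, L1BatchNumber};

// Hypothetical test; assumes the lowercase serde representation of `FastVmMode`.
#[test]
fn health_details_serialization() {
    let health = VmPlaygroundHealth {
        vm_mode: FastVmMode::Shadow,
        last_processed_batch: L1BatchNumber(42),
    };
    let json = serde_json::to_value(&health).unwrap();
    assert_eq!(json["vm_mode"], "shadow");
    assert_eq!(json["last_processed_batch"], 42_u64);
}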

/// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory
/// (so that the playground doesn't repeatedly process the same batches after a restart).
#[derive(Debug)]
pub struct VmPlayground {
pool: ConnectionPool<Core>,
batch_executor: Box<dyn BatchExecutor>,
batch_executor: MainBatchExecutor,
rocksdb_path: String,
chain_id: L2ChainId,
io: VmPlaygroundIo,
@@ -39,14 +53,14 @@ impl VmPlayground {
/// Creates a new playground.
pub async fn new(
pool: ConnectionPool<Core>,
batch_executor: Box<dyn BatchExecutor>,
vm_mode: FastVmMode,
rocksdb_path: String,
chain_id: L2ChainId,
first_processed_batch: L1BatchNumber,
reset_state: bool,
) -> anyhow::Result<(Self, VmPlaygroundTasks)> {
tracing::info!(
"Starting VM playground with executor {batch_executor:?}, first processed batch is #{first_processed_batch} \
"Starting VM playground with mode {vm_mode:?}, first processed batch is #{first_processed_batch} \
(reset processing: {reset_state:?})"
);

@@ -59,9 +73,14 @@
latest_processed_batch.unwrap_or(first_processed_batch)
};

let mut batch_executor = MainBatchExecutor::new(false, false);
batch_executor.set_fast_vm_mode(vm_mode);

let io = VmPlaygroundIo {
cursor_file_path,
vm_mode,
latest_processed_batch: Arc::new(watch::channel(latest_processed_batch).0),
health_updater: Arc::new(ReactiveHealthCheck::new("vm_playground").1),
};
let (output_handler_factory, output_handler_factory_task) =
ConcurrentOutputHandlerFactory::new(
@@ -92,6 +111,11 @@
))
}

/// Returns a health check for this component.
pub fn health_check(&self) -> ReactiveHealthCheck {
self.io.health_updater.subscribe()
}

#[cfg(test)]
pub(crate) fn io(&self) -> &VmPlaygroundIo {
&self.io
@@ -123,6 +147,8 @@
.with_context(|| format!("cannot create dir `{}`", self.rocksdb_path))?;

if let Some(reset_to_batch) = self.reset_to_batch {
self.io.health_updater.update(HealthStatus::Affected.into());

self.reset_rocksdb_cache(reset_to_batch).await?;
self.io
.write_cursor(reset_to_batch)
Expand All @@ -131,6 +157,8 @@ impl VmPlayground {
tracing::info!("Finished resetting playground state");
}

self.io.update_health();

let (loader, loader_task) = VmRunnerStorage::new(
self.pool.clone(),
self.rocksdb_path,
@@ -144,7 +172,7 @@
Box::new(self.io),
Arc::new(loader),
Box::new(self.output_handler_factory),
self.batch_executor,
Box::new(self.batch_executor),
);
vm_runner.run(stop_receiver).await
}
@@ -184,9 +212,11 @@ pub struct VmPlaygroundTasks {
#[derive(Debug, Clone)]
pub struct VmPlaygroundIo {
cursor_file_path: PathBuf,
vm_mode: FastVmMode,
// We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes
// aren't guaranteed to be atomic.
latest_processed_batch: Arc<watch::Sender<L1BatchNumber>>,
health_updater: Arc<HealthUpdater>,
}

impl VmPlaygroundIo {
@@ -218,6 +248,14 @@ impl VmPlaygroundIo {
})
}

fn update_health(&self) {
let health = VmPlaygroundHealth {
vm_mode: self.vm_mode,
last_processed_batch: *self.latest_processed_batch.borrow(),
};
self.health_updater.update(health.into());
}
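
For orientation, here is a minimal sketch of the `zksync_health_check` flow that `health_check()` and `update_health()` build on. It uses only calls that appear elsewhere in this diff; exact signatures live in the `zksync_health_check` crate:

use zksync_health_check::{HealthStatus, ReactiveHealthCheck};

// Sketch only. `ReactiveHealthCheck::new` yields a (check, updater) pair;
// the component owns the updater, consumers subscribe to checks.
async fn health_flow_sketch() {
    let (_check, updater) = ReactiveHealthCheck::new("vm_playground");
    // Component side: push a status change, as `update_health()` does above.
    updater.update(HealthStatus::Ready.into());
    // Consumer side: obtain a fresh subscriber, as `health_check()` does...
    let mut check = updater.subscribe();
    // ...and await a condition, as the playground test below does.
    check
        .wait_for(|health| matches!(health.status(), HealthStatus::Ready))
        .await;
}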

#[cfg(test)]
pub(crate) fn subscribe_to_completed_batches(&self) -> watch::Receiver<L1BatchNumber> {
self.latest_processed_batch.subscribe()
@@ -268,6 +306,7 @@ impl VmRunnerIo for VmPlaygroundIo {
self.write_cursor(l1_batch_number).await?;
// We should only update the in-memory value after the write to the cursor file succeeded.
self.latest_processed_batch.send_replace(l1_batch_number);
self.update_health();
Ok(())
}
}
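
The plain-text cursor file mentioned in the `VmPlayground` doc comment boils down to logic like the following hypothetical sketch. `write_cursor` does exist in `VmPlaygroundIo` (it is called in the diff above), but the names, parsing, and error contexts here are illustrative only:

use std::path::Path;

use tokio::fs;
use zksync_types::L1BatchNumber;

// Writes the cursor as a plain-text batch number, e.g. "42".
async fn write_cursor(path: &Path, batch: L1BatchNumber) -> anyhow::Result<()> {
    fs::write(path, batch.0.to_string()).await?;
    Ok(())
}

// Reads the cursor back; a missing file means no batches were processed yet.
async fn read_cursor(path: &Path) -> anyhow::Result<Option<L1BatchNumber>> {
    match fs::read_to_string(path).await {
        Ok(contents) => Ok(Some(L1BatchNumber(contents.trim().parse()?))),
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err.into()),
    }
}
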
17 changes: 13 additions & 4 deletions core/node/vm_runner/src/tests/playground.rs
@@ -1,8 +1,8 @@
use test_casing::test_casing;
use tokio::sync::watch;
use zksync_health_check::HealthStatus;
use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
use zksync_state::RocksdbStorage;
use zksync_state_keeper::MainBatchExecutor;
use zksync_types::vm::FastVmMode;

use super::*;
@@ -33,11 +33,9 @@ async fn run_playground(
.unwrap();
}

let mut batch_executor = MainBatchExecutor::new(false, false);
batch_executor.set_fast_vm_mode(FastVmMode::Shadow);
let (playground, playground_tasks) = VmPlayground::new(
pool.clone(),
Box::new(batch_executor),
FastVmMode::Shadow,
rocksdb_dir.path().to_str().unwrap().to_owned(),
genesis_params.config().l2_chain_id,
L1BatchNumber(0),
@@ -62,6 +60,7 @@
.unwrap(),
L1BatchNumber(1)
);
let mut health_check = playground.health_check();

let mut completed_batches = playground_io.subscribe_to_completed_batches();
let task_handles = [
@@ -78,6 +77,16 @@
.wait_for(|&number| number == L1BatchNumber(1))
.await
.unwrap();
health_check
.wait_for(|health| {
if !matches!(health.status(), HealthStatus::Ready) {
return false;
}
let health_details = health.details().unwrap();
assert_eq!(health_details["vm_mode"], "shadow");
health_details["last_processed_batch"] == 1_u64
})
.await;

// Check that playground I/O works correctly.
assert_eq!(
59 changes: 56 additions & 3 deletions core/tests/ts-integration/src/context-owner.ts
@@ -2,7 +2,7 @@ import * as zksync from 'zksync-ethers';
import * as ethers from 'ethers';
import { BigNumberish } from 'ethers';

import { TestContext, TestEnvironment, TestWallets } from './types';
import { NodeMode, TestContext, TestEnvironment, TestWallets } from './types';
import { lookupPrerequisites } from './prerequisites';
import { Reporter } from './reporter';
import { scaledGasPrice } from './helpers';
@@ -541,17 +541,70 @@ export class TestContextOwner {
this.reporter.finishAction();
}

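/**
 * Waits for the VM playground component to process all L1 batches sealed on the node.
 * The playground re-executes batches with the new VM (in 'shadow' mode, comparing results
 * against the old VM), so letting it catch up before teardown ensures the integration run
 * has checked every batch for divergence. Returns immediately, with a warning, if the node
 * does not run the playground.
 */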
private async waitForVmPlayground() {
Review comment (Member): May I ask you to add somewhat verbose comments on what these functions do and why we need them? There is a good chance that in the future someone will have a hard time understanding what's going on in here.

while (true) {
const lastProcessedBatch = await this.lastPlaygroundBatch();
if (lastProcessedBatch === undefined) {
this.reporter.warn('The node does not run VM playground; run it to check old / new VM divergence');
break;
}
const lastNodeBatch = await this.l2Provider.getL1BatchNumber();

this.reporter.debug(`VM playground progress: L1 batch #${lastProcessedBatch} / ${lastNodeBatch}`);
if (lastProcessedBatch >= lastNodeBatch) {
break;
}
await zksync.utils.sleep(500);
}
}

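/**
 * Returns the latest L1 batch number processed by the VM playground, or `undefined` if the
 * node does not expose a `vm_playground` health component (or runs it in a non-'shadow' mode,
 * in which case divergence cannot be checked). The data comes from the node healthcheck
 * endpoint; judging by the interfaces below, a healthy response looks roughly like
 * `{"components":{"vm_playground":{"status":"ready","details":{"vm_mode":"shadow","last_processed_batch":42}}}}`.
 */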
private async lastPlaygroundBatch() {
interface VmPlaygroundHealth {
readonly status: string;
readonly details?: {
vm_mode?: string;
last_processed_batch?: number;
};
}

interface NodeHealth {
readonly components: {
vm_playground?: VmPlaygroundHealth;
};
}

const healthcheckPort = process.env.API_HEALTHCHECK_PORT ?? '3071';
const nodeHealth = (await (await fetch(`http://127.0.0.1:${healthcheckPort}/health`)).json()) as NodeHealth;
const playgroundHealth = nodeHealth.components.vm_playground;
if (playgroundHealth === undefined) {
return undefined;
}
if (playgroundHealth.status !== 'ready') {
throw new Error(`Unexpected VM playground health status: ${playgroundHealth.status}`);
}
if (playgroundHealth.details?.vm_mode !== 'shadow') {
this.reporter.warn(
`VM playground mode is '${playgroundHealth.details?.vm_mode}'; should be set to 'shadow' to check VM divergence`
);
return undefined;
}
return playgroundHealth.details?.last_processed_batch ?? 0;
}

/**
* Performs context deinitialization.
*/
async teardownContext() {
// Reset the reporter context.
this.reporter = new Reporter();
try {
if (this.env.nodeMode == NodeMode.Main && this.env.network === 'localhost') {
this.reporter.startAction('Waiting for VM playground to catch up');
await this.waitForVmPlayground();
this.reporter.finishAction();
}
this.reporter.startAction(`Tearing down the context`);

await this.collectFunds();

this.reporter.finishAction();
} catch (error: any) {
// Report the issue to the console and mark the last action as failed.
4 changes: 2 additions & 2 deletions core/tests/ts-integration/src/reporter.ts
@@ -88,8 +88,8 @@ export class Reporter {
/**
* Prints an error message to the console.
*/
error(message: string) {
console.log(this.indent(`${errorPrefix('Error:')}: ${fail(message)}`));
error(message: string, ...args: any[]) {
console.log(this.indent(`${errorPrefix('Error:')}: ${fail(message)}`), ...args);
}
