Skip to content

Commit

Permalink
refactor: set prepare_for_bulk_load option for migration
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Mar 8, 2021
1 parent b112895 commit 220464f
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 62 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ckb-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ckb-chain-iter = { path = "../util/chain-iter", version = "= 0.41.0-pre" }
ckb-verification = { path = "../verification", version = "= 0.41.0-pre" }
ckb-verification-traits = { path = "../verification/traits", version = "= 0.41.0-pre" }
ckb-async-runtime = { path = "../util/runtime", version = "= 0.41.0-pre" }
ckb-db = { path = "../db", version = "= 0.41.0-pre" }
base64 = "0.13.0"
tempfile = "3.0"
rayon = "1.0"
Expand Down
1 change: 1 addition & 0 deletions ckb-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> {
(cli::CMD_STATS, Some(matches)) => subcommand::stats(setup.stats(&matches)?, handle),
(cli::CMD_RESET_DATA, Some(matches)) => subcommand::reset_data(setup.reset_data(&matches)?),
(cli::CMD_MIGRATE, Some(matches)) => subcommand::migrate(setup.migrate(&matches)?, handle),
(cli::CMD_DB_REPAIR, Some(matches)) => subcommand::db_repair(setup.db_repair(&matches)?),
_ => unreachable!(),
}
}
11 changes: 11 additions & 0 deletions ckb-bin/src/subcommand/db_repair.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use ckb_app_config::{ExitCode, RepairArgs};
use ckb_db::RocksDB;

pub fn db_repair(args: RepairArgs) -> Result<(), ExitCode> {
RocksDB::repair(&args.config.db.path).map_err(|err| {
eprintln!("repair error: {:?}", err);
ExitCode::Failure
})?;

Ok(())
}
2 changes: 2 additions & 0 deletions ckb-bin/src/subcommand/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod db_repair;
mod export;
mod import;
mod init;
Expand All @@ -10,6 +11,7 @@ mod reset_data;
mod run;
mod stats;

pub use self::db_repair::db_repair;
pub use self::export::export;
pub use self::import::import;
pub use self::init::init;
Expand Down
4 changes: 2 additions & 2 deletions db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repository = "https://github.com/nervosnetwork/ckb"
ckb-app-config = { path = "../util/app-config", version = "= 0.41.0-pre" }
ckb-logger = { path = "../util/logger", version = "= 0.41.0-pre" }
ckb-error = { path = "../error", version = "= 0.41.0-pre" }
tempfile = "3.0"
libc = "0.2"
rocksdb = { package = "ckb-rocksdb", version = "=0.14.1", features = ["snappy"] }
rocksdb = { package = "ckb-rocksdb", version = "=0.15.1", features = ["snappy"] }
ckb-db-schema = { path = "../db-schema", version = "= 0.41.0-pre" }
tempfile = "3.0"
108 changes: 61 additions & 47 deletions db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::write_batch::RocksDBWriteBatch;
use crate::{internal_error, Result};
use ckb_app_config::DBConfig;
use ckb_db_schema::Col;
use ckb_logger::{info, warn};
use ckb_logger::info;
use rocksdb::ops::{
CompactRangeCF, CreateCF, DropCF, GetColumnFamilys, GetPinned, GetPinnedCF, IterateCF, OpenCF,
Put, SetOptions, WriteOps,
Expand All @@ -14,6 +14,7 @@ use rocksdb::{
ffi, ColumnFamily, ColumnFamilyDescriptor, DBPinnableSlice, FullOptions, IteratorMode,
OptimisticTransactionDB, OptimisticTransactionOptions, Options, WriteBatch, WriteOptions,
};
use std::path::Path;
use std::sync::Arc;

/// RocksDB wrapper base on OptimisticTransactionDB
Expand Down Expand Up @@ -61,54 +62,11 @@ impl RocksDB {
(opts, cf_descriptors)
};

opts.create_if_missing(false);
opts.create_if_missing(true);
opts.create_missing_column_families(true);

let db = OptimisticTransactionDB::open_cf_descriptors(
&opts,
&config.path,
cf_descriptors.clone(),
)
.or_else(|err| {
let err_str = err.as_ref();
if err_str.starts_with("Invalid argument:")
&& err_str.ends_with("does not exist (create_if_missing is false)")
{
info!("Initialize a new database");
opts.create_if_missing(true);
let db = OptimisticTransactionDB::open_cf_descriptors(
&opts,
&config.path,
cf_descriptors.clone(),
)
.map_err(|err| {
internal_error(format!("failed to open a new created database: {}", err))
})?;
Ok(db)
} else if err.as_ref().starts_with("Corruption:") {
warn!("Repairing the rocksdb since {} ...", err);
let mut repair_opts = Options::default();
repair_opts.create_if_missing(false);
repair_opts.create_missing_column_families(false);
OptimisticTransactionDB::repair(repair_opts, &config.path).map_err(|err| {
internal_error(format!("failed to repair the database: {}", err))
})?;
warn!("Opening the repaired rocksdb ...");
OptimisticTransactionDB::open_cf_descriptors(
&opts,
&config.path,
cf_descriptors.clone(),
)
.map_err(|err| {
internal_error(format!("failed to open the repaired database: {}", err))
})
} else {
Err(internal_error(format!(
"failed to open the database: {}",
err
)))
}
})?;
let db = OptimisticTransactionDB::open_cf_descriptors(&opts, &config.path, cf_descriptors)
.map_err(|err| internal_error(format!("failed to open database: {}", err)))?;

if !config.options.is_empty() {
let rocksdb_options: Vec<(&str, &str)> = config
Expand All @@ -125,6 +83,17 @@ impl RocksDB {
})
}

/// Repairer does best effort recovery to recover as much data as possible
/// after a disaster without compromising consistency.
/// It does not guarantee bringing the database to a time consistent state.
/// Note: Currently there is a limitation that un-flushed column families will be lost after repair.
/// This would happen even if the DB is in healthy state.
pub fn repair<P: AsRef<Path>>(path: P) -> Result<()> {
let repair_opts = Options::default();
OptimisticTransactionDB::repair(repair_opts, path)
.map_err(|err| internal_error(format!("failed to repair database: {}", err)))
}

/// Open a database with the given configuration and columns count.
pub fn open(config: &DBConfig, columns: u32) -> Self {
Self::open_with_check(config, columns).unwrap_or_else(|err| panic!("{}", err))
Expand All @@ -140,6 +109,51 @@ impl RocksDB {
Self::open_with_check(&config, columns).unwrap_or_else(|err| panic!("{}", err))
}

/// Set appropriate parameters for bulk loading.
pub fn prepare_for_bulk_load_open<P: AsRef<Path>>(
path: P,
columns: u32,
) -> Result<Option<Self>> {
let mut opts = Options::default();

opts.create_missing_column_families(true);
opts.set_prepare_for_bulk_load();

let cfnames: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
let cf_options: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();

OptimisticTransactionDB::open_cf(&opts, path, &cf_options).map_or_else(
|err| {
let err_str = err.as_ref();
if err_str.starts_with("Invalid argument:")
&& err_str.ends_with("does not exist (create_if_missing is false)")
{
Ok(None)
} else if err_str.starts_with("Corruption:") {
info!(
"DB corrupted: {}.\n\
Try ckb db-repair command to repair DB.\n\
Note: Currently there is a limitation that un-flushed column families will be lost after repair.\
This would happen even if the DB is in healthy state.\n\
See https://github.com/facebook/rocksdb/wiki/RocksDB-Repairer for detail",
err_str
);
Err(internal_error("DB corrupted"))
} else {
Err(internal_error(format!(
"failed to open the database: {}",
err
)))
}
},
|db| {
Ok(Some(RocksDB {
inner: Arc::new(db),
}))
},
)
}

/// Return the value associated with a key using RocksDB's PinnableSlice from the given column
/// so as to avoid unnecessary memory copy.
pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
Expand Down
1 change: 1 addition & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ ckb-migration-template = { path = "migration-template", version = "= 0.41.0-pre"
ckb-constant = { path = "../util/constant", version = "= 0.41.0-pre" }
num_cpus = "1.10"
faketime = "0.2.0"
tempfile = "3.0"
33 changes: 25 additions & 8 deletions shared/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ impl Shared {

/// TODO(doc): @quake
pub struct SharedBuilder {
db: RocksDB,
db_config: DBConfig,
ancient_path: Option<PathBuf>,
consensus: Option<Consensus>,
tx_pool_config: Option<TxPoolConfig>,
Expand All @@ -520,16 +520,15 @@ const INIT_DB_VERSION: &str = "20191127135521";

impl SharedBuilder {
/// Generates the base SharedBuilder with ancient path and async_handle
pub fn new(config: &DBConfig, ancient: Option<PathBuf>, async_handle: Handle) -> Self {
let db = RocksDB::open(config, COLUMNS);
pub fn new(db_config: &DBConfig, ancient: Option<PathBuf>, async_handle: Handle) -> Self {
let mut migrations = Migrations::default();
migrations.add_migration(Box::new(DefaultMigration::new(INIT_DB_VERSION)));
migrations.add_migration(Box::new(migrations::ChangeMoleculeTableToStruct));
migrations.add_migration(Box::new(migrations::CellMigration));
migrations.add_migration(Box::new(migrations::AddNumberHashMapping));

SharedBuilder {
db,
db_config: db_config.clone(),
ancient_path: ancient,
consensus: None,
tx_pool_config: None,
Expand All @@ -545,8 +544,13 @@ impl SharedBuilder {
/// Generates the SharedBuilder with temp db
pub fn with_temp_db() -> Self {
let (handle, stop) = new_global_runtime();
let tmp_dir = tempfile::Builder::new().tempdir().unwrap();
let db_config = DBConfig {
path: tmp_dir.path().to_path_buf(),
..Default::default()
};
SharedBuilder {
db: RocksDB::open_tmp(COLUMNS),
db_config,
ancient_path: None,
consensus: None,
tx_pool_config: None,
Expand All @@ -565,12 +569,18 @@ impl SharedBuilder {
///
/// Return true if migration is required
pub fn migration_check(&self) -> bool {
self.migrations.check(&self.db)
RocksDB::prepare_for_bulk_load_open(&self.db_config.path, COLUMNS)
.unwrap_or_else(|err| panic!("{}", err))
.map(|db| self.migrations.check(&db))
.unwrap_or(false)
}

/// Check whether database requires expensive migrations.
pub fn require_expensive_migrations(&self) -> bool {
self.migrations.expensive(&self.db)
RocksDB::prepare_for_bulk_load_open(&self.db_config.path, COLUMNS)
.unwrap_or_else(|err| panic!("{}", err))
.map(|db| self.migrations.expensive(&db))
.unwrap_or(false)
}

/// TODO(doc): @quake
Expand Down Expand Up @@ -615,7 +625,14 @@ impl SharedBuilder {
let tx_pool_config = self.tx_pool_config.unwrap_or_else(Default::default);
let notify_config = self.notify_config.unwrap_or_else(Default::default);
let store_config = self.store_config.unwrap_or_else(Default::default);
let db = self.migrations.migrate(self.db)?;

if let Some(migration_db) =
RocksDB::prepare_for_bulk_load_open(&self.db_config.path, COLUMNS)?
{
self.migrations.migrate(migration_db)?;
}

let db = RocksDB::open(&self.db_config, COLUMNS);
let store = if store_config.freezer_enable && self.ancient_path.is_some() {
let freezer = Freezer::open(self.ancient_path.expect("exist checked"))?;
ChainDB::new_with_freezer(db, freezer, store_config)
Expand Down
6 changes: 6 additions & 0 deletions util/app-config/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ pub struct MigrateArgs {
pub force: bool,
}

/// Parsed command line arguments for `ckb db-repair`.
pub struct RepairArgs {
/// Parsed `ckb.toml`.
pub config: Box<CKBAppConfig>,
}

impl CustomizeSpec {
/// No specified parameters for chain spec.
pub fn is_unset(&self) -> bool {
Expand Down
7 changes: 7 additions & 0 deletions util/app-config/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub const CMD_GEN_SECRET: &str = "gen";
pub const CMD_FROM_SECRET: &str = "from-secret";
/// Subcommand `migrate`.
pub const CMD_MIGRATE: &str = "migrate";
/// Subcommand `db-repair`.
pub const CMD_DB_REPAIR: &str = "db-repair";

/// Command line argument `--config-dir`.
pub const ARG_CONFIG_DIR: &str = "config-dir";
Expand Down Expand Up @@ -136,6 +138,7 @@ fn basic_app<'b>() -> App<'static, 'b> {
.subcommand(reset_data())
.subcommand(peer_id())
.subcommand(migrate())
.subcommand(db_repair())
}

/// Parse the command line arguments by supplying the version information.
Expand Down Expand Up @@ -345,6 +348,10 @@ fn migrate() -> App<'static, 'static> {
)
}

fn db_repair() -> App<'static, 'static> {
SubCommand::with_name(CMD_DB_REPAIR).about("Try repair ckb database")
}

fn list_hashes() -> App<'static, 'static> {
SubCommand::with_name(CMD_LIST_HASHES)
.about("Lists well known hashes")
Expand Down
9 changes: 8 additions & 1 deletion util/app-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod sentry_config;

pub use app_config::{AppConfig, CKBAppConfig, MinerAppConfig};
pub use args::{
ExportArgs, ImportArgs, InitArgs, MigrateArgs, MinerArgs, PeerIDArgs, ReplayArgs,
ExportArgs, ImportArgs, InitArgs, MigrateArgs, MinerArgs, PeerIDArgs, RepairArgs, ReplayArgs,
ResetDataArgs, RunArgs, StatsArgs,
};
pub use configs::*;
Expand Down Expand Up @@ -108,6 +108,13 @@ impl Setup {
})
}

/// `db-repair` subcommand
pub fn db_repair<'m>(self, _matches: &ArgMatches<'m>) -> Result<RepairArgs, ExitCode> {
let config = self.config.into_ckb()?;

Ok(RepairArgs { config })
}

/// Executes `ckb miner`.
pub fn miner<'m>(self, matches: &ArgMatches<'m>) -> Result<MinerArgs, ExitCode> {
let spec = self.chain_spec()?;
Expand Down

0 comments on commit 220464f

Please sign in to comment.