Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: indexer db simple configuration #3735

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion db/src/db_with_ttl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rocksdb::{
use std::path::Path;

const PROPERTY_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
const DB_LOG_KEEP_NUM: usize = 10;

/// DB with ttl support wrapper
///
Expand Down Expand Up @@ -35,7 +36,7 @@ impl DBWithTTL {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_keep_log_file_num(10);
opts.set_keep_log_file_num(DB_LOG_KEEP_NUM);

let cf_descriptors: Vec<_> = cf_names
.into_iter()
Expand Down
3 changes: 1 addition & 2 deletions resource/default.db-options
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

[DBOptions]
bytes_per_sync=1048576
max_background_compactions=4
max_background_flushes=2
max_background_jobs=6
max_total_wal_size=134217728
keep_log_file_num=32

Expand Down
9 changes: 9 additions & 0 deletions util/app-config/src/configs/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};

/// Indexer config options.
Expand All @@ -21,6 +22,12 @@ pub struct IndexerConfig {
/// Customize cell filter
#[serde(default)]
pub cell_filter: Option<String>,
/// Maximum number of concurrent db background jobs (compactions and flushes)
#[serde(default)]
pub db_background_jobs: Option<NonZeroUsize>,
/// Maximum number of db info log files to keep.
#[serde(default)]
pub db_keep_log_file_num: Option<NonZeroUsize>,
}

const fn default_poll_interval() -> u64 {
Expand All @@ -36,6 +43,8 @@ impl Default for IndexerConfig {
secondary_path: PathBuf::new(),
block_filter: None,
cell_filter: None,
db_background_jobs: None,
db_keep_log_file_num: None,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions util/indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ mod tests {

/// Test helper: build an `Indexer` backed by a fresh store in a
/// throwaway temporary directory.
fn new_indexer<S: Store>(prefix: &str) -> Indexer<S> {
    let tmp_dir = tempfile::Builder::new().prefix(prefix).tempdir().unwrap();
    // Open the store with its default options; the old one-arg `S::new(path)`
    // call was removed when `Store::new` grew an `opts` parameter.
    let store = S::new(&S::default_options(), tmp_dir.path().to_str().unwrap());
    Indexer::new(store, KEEP_NUM, 1, None, CustomFilters::new(None, None))
}

Expand Down Expand Up @@ -2005,7 +2005,7 @@ mod tests {
cell_filter_str: Option<&str>,
) -> Indexer<S> {
let tmp_dir = tempfile::Builder::new().prefix(prefix).tempdir().unwrap();
let store = S::new(tmp_dir.path().to_str().unwrap());
let store = S::new(&S::default_options(), tmp_dir.path().to_str().unwrap());
Indexer::new(
store,
10,
Expand Down
45 changes: 43 additions & 2 deletions util/indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ use ckb_store::ChainStore;
use ckb_types::{core, packed, prelude::*, H256};
use rocksdb::{prelude::*, Direction, IteratorMode};
use std::convert::TryInto;
use std::num::NonZeroUsize;
use std::sync::{Arc, RwLock};
use std::time::Duration;

const SUBSCRIBER_NAME: &str = "Indexer";
const DEFAULT_LOG_KEEP_NUM: usize = 1;
const DEFAULT_MAX_BACKGROUND_JOBS: usize = 6;

/// Indexer service
#[derive(Clone)]
Expand All @@ -51,7 +54,9 @@ impl IndexerService {
None,
"indexer".to_string(),
);
let store = RocksdbStore::new(&config.store);

let store_opts = Self::indexer_store_options(config);
let store = RocksdbStore::new(&store_opts, &config.store);
let pool = if config.index_tx_pool {
Some(Arc::new(RwLock::new(Pool::default())))
} else {
Expand All @@ -64,7 +69,9 @@ impl IndexerService {
COLUMN_BLOCK_HEADER,
COLUMN_BLOCK_BODY,
];
let secondary_opts = Self::indexer_secondary_options(config);
let secondary_db = SecondaryDB::open_cf(
&secondary_opts,
&ckb_db_config.path,
cf_names,
config.secondary_path.to_string_lossy().to_string(),
Expand Down Expand Up @@ -214,6 +221,37 @@ impl IndexerService {
let block_hash = self.secondary_db.get_block_hash(block_number)?;
self.secondary_db.get_block(&block_hash)
}

/// Build the RocksDB options for the indexer's primary store.
///
/// Applies the user-configured log-file retention and background-job
/// limits from `config`, falling back to the module defaults
/// (`DEFAULT_LOG_KEEP_NUM`, `DEFAULT_MAX_BACKGROUND_JOBS`) when unset.
fn indexer_store_options(config: &IndexerConfig) -> Options {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    // `map_or` replaces the `map(..).unwrap_or(..)` chain (clippy: map_unwrap_or).
    opts.set_keep_log_file_num(
        config
            .db_keep_log_file_num
            .map_or(DEFAULT_LOG_KEEP_NUM, NonZeroUsize::get),
    );
    // RocksDB takes an i32 here; saturate instead of letting `as i32`
    // silently truncate an absurdly large configured value.
    let background_jobs = config
        .db_background_jobs
        .map_or(DEFAULT_MAX_BACKGROUND_JOBS, NonZeroUsize::get);
    opts.set_max_background_jobs(i32::try_from(background_jobs).unwrap_or(i32::MAX));
    opts
}

/// Build the RocksDB options used to open the chain store as a
/// secondary DB, honoring the configured log-file retention.
fn indexer_secondary_options(config: &IndexerConfig) -> Options {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    // `map_or` replaces the `map(..).unwrap_or(..)` chain (clippy: map_unwrap_or),
    // matching the style of `indexer_store_options`.
    opts.set_keep_log_file_num(
        config
            .db_keep_log_file_num
            .map_or(DEFAULT_LOG_KEEP_NUM, NonZeroUsize::get),
    );
    opts
}
}

/// Handle to the indexer.
Expand Down Expand Up @@ -903,7 +941,10 @@ mod tests {

/// Test helper: open a `RocksdbStore` with default options in a fresh
/// temporary directory.
fn new_store(prefix: &str) -> RocksdbStore {
    let tmp_dir = tempfile::Builder::new().prefix(prefix).tempdir().unwrap();
    // The old one-arg `RocksdbStore::new(path)` call was removed when the
    // constructor grew an `opts` parameter.
    RocksdbStore::new(
        &RocksdbStore::default_options(),
        tmp_dir.path().to_str().unwrap(),
    )
}

Expand Down
5 changes: 4 additions & 1 deletion util/indexer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ pub(crate) enum IteratorDirection {

pub(crate) trait Store {
type Batch: Batch;
type Opts;

fn new<P>(path: P) -> Self
fn new<P>(opts: &Self::Opts, path: P) -> Self
where
P: AsRef<Path>;

fn default_options() -> Self::Opts;

fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error>;

fn exists<K: AsRef<[u8]>>(&self, key: K) -> Result<bool, Error>;
Expand Down
31 changes: 25 additions & 6 deletions util/indexer/src/store/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,22 @@ pub(crate) struct RocksdbStore {

impl Store for RocksdbStore {
type Batch = RocksdbBatch;
type Opts = Options;

fn new<P>(path: P) -> Self
/// Open (or create, per `opts`) a RocksDB instance at `path`.
///
/// # Panics
///
/// Panics if the database cannot be opened (e.g. the path is locked by
/// another process) — unrecoverable for the store, so fail loudly.
fn new<P>(opts: &Options, path: P) -> Self
where
    P: AsRef<Path>,
{
    // The stale `DB::open_default(path)` binding from before the `opts`
    // parameter existed is removed: opening the same path twice would
    // fail on RocksDB's directory lock.
    let db = Arc::new(DB::open(opts, path.as_ref()).expect("Failed to open rocksdb"));
    Self { db }
}

/// Baseline options for a standalone store: create the database on
/// first open, leaving everything else at RocksDB's defaults.
fn default_options() -> Self::Opts {
    let mut options = Options::default();
    options.create_if_missing(true);
    options
}

fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
self.db
.get(key.as_ref())
Expand Down Expand Up @@ -95,7 +102,10 @@ mod tests {
.prefix("put_and_get")
.tempdir()
.unwrap();
let store = RocksdbStore::new(tmp_dir.path().to_str().unwrap());
let store = RocksdbStore::new(
&RocksdbStore::default_options(),
tmp_dir.path().to_str().unwrap(),
);
let mut batch = store.batch().unwrap();
batch.put(&[0, 0], &[0, 0, 0]).unwrap();
batch.put(&[1, 1], &[1, 1, 1]).unwrap();
Expand All @@ -109,7 +119,10 @@ mod tests {
#[test]
fn exists() {
let tmp_dir = tempfile::Builder::new().prefix("exists").tempdir().unwrap();
let store = RocksdbStore::new(tmp_dir.path().to_str().unwrap());
let store = RocksdbStore::new(
&RocksdbStore::default_options(),
tmp_dir.path().to_str().unwrap(),
);
assert!(!store.exists(&[0, 0]).unwrap());

let mut batch = store.batch().unwrap();
Expand All @@ -122,7 +135,10 @@ mod tests {
#[test]
fn delete() {
let tmp_dir = tempfile::Builder::new().prefix("delete").tempdir().unwrap();
let store = RocksdbStore::new(tmp_dir.path().to_str().unwrap());
let store = RocksdbStore::new(
&RocksdbStore::default_options(),
tmp_dir.path().to_str().unwrap(),
);
let mut batch = store.batch().unwrap();
batch.put(&[0, 0], &[0, 0, 0]).unwrap();
batch.commit().unwrap();
Expand All @@ -137,7 +153,10 @@ mod tests {
#[test]
fn iter() {
let tmp_dir = tempfile::Builder::new().prefix("iter").tempdir().unwrap();
let store = RocksdbStore::new(tmp_dir.path().to_str().unwrap());
let store = RocksdbStore::new(
&RocksdbStore::default_options(),
tmp_dir.path().to_str().unwrap(),
);
let mut batch = store.batch().unwrap();
batch.put(&[0, 0, 0], &[0, 0, 0]).unwrap();
batch.put(&[0, 0, 1], &[0, 0, 1]).unwrap();
Expand Down
8 changes: 2 additions & 6 deletions util/indexer/src/store/secondary_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,20 @@ pub(crate) struct SecondaryDB {

impl SecondaryDB {
/// Open a SecondaryDB
pub fn open_cf<P, I, N>(path: P, cf_names: I, secondary_path: String) -> Self
pub fn open_cf<P, I, N>(opts: &Options, path: P, cf_names: I, secondary_path: String) -> Self
where
P: AsRef<Path>,
I: IntoIterator<Item = N>,
N: Into<String>,
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);

let cf_descriptors: Vec<_> = cf_names
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
.collect();

let descriptor = SecondaryOpenDescriptor::new(secondary_path);
let inner = SecondaryRocksDB::open_cf_descriptors_with_descriptor(
&opts,
opts,
path,
cf_descriptors,
descriptor,
Expand Down