Skip to content

Commit

Permalink
Support pd task observer to compute store stats for TiFlash (#136)
Browse files Browse the repository at this point in the history
* raftstore: Implement observer on_compute_engine_size (tikv#12948)

ref tikv#12849

Implement observer on_compute_engine_size

Signed-off-by: CalvinNeo <[email protected]>

Co-authored-by: Xinye Tao <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>

* inc gc time

Signed-off-by: CalvinNeo <[email protected]>

Co-authored-by: Xinye Tao <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored Aug 9, 2022
1 parent ec6e7ab commit 56761b7
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 69 deletions.
27 changes: 27 additions & 0 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl_box_observer_g!(
SplitCheckObserver,
WrappedSplitCheckObserver
);
impl_box_observer!(BoxPdTaskObserver, PdTaskObserver, WrappedPdTaskObserver);
impl_box_observer!(BoxRoleObserver, RoleObserver, WrappedRoleObserver);
impl_box_observer!(
BoxRegionChangeObserver,
Expand Down Expand Up @@ -176,6 +177,7 @@ where
region_change_observers: Vec<Entry<BoxRegionChangeObserver>>,
cmd_observers: Vec<Entry<BoxCmdObserver<E>>>,
read_index_observers: Vec<Entry<BoxReadIndexObserver>>,
pd_task_observers: Vec<Entry<BoxPdTaskObserver>>,
// TODO: add endpoint
}

Expand All @@ -191,6 +193,7 @@ impl<E: KvEngine> Default for Registry<E> {
region_change_observers: Default::default(),
cmd_observers: Default::default(),
read_index_observers: Default::default(),
pd_task_observers: Default::default(),
}
}
}
Expand Down Expand Up @@ -237,6 +240,10 @@ impl<E: KvEngine> Registry<E> {
push!(priority, cco, self.consistency_check_observers);
}

pub fn register_pd_task_observer(&mut self, priority: u32, ro: BoxPdTaskObserver) {
push!(priority, ro, self.pd_task_observers);
}

pub fn register_role_observer(&mut self, priority: u32, ro: BoxRoleObserver) {
push!(priority, ro, self.role_observers);
}
Expand Down Expand Up @@ -515,6 +522,15 @@ impl<E: KvEngine> CoprocessorHost<E> {
Ok(hashes)
}

pub fn on_compute_engine_size(&self) -> Option<StoreSizeInfo> {
let mut store_size = None;
for observer in &self.registry.pd_task_observers {
let observer = observer.observer.inner();
observer.on_compute_engine_size(&mut store_size);
}
store_size
}

pub fn on_role_change(&self, region: &Region, role_change: RoleChange) {
loop_ob!(
region,
Expand Down Expand Up @@ -688,6 +704,12 @@ mod tests {
}
}

impl PdTaskObserver for TestCoprocessor {
fn on_compute_engine_size(&self, _: &mut Option<StoreSizeInfo>) {
self.called.fetch_add(19, Ordering::SeqCst);
}
}

impl RoleObserver for TestCoprocessor {
fn on_role_change(&self, ctx: &mut ObserverContext<'_>, _: &RoleChange) {
self.called.fetch_add(7, Ordering::SeqCst);
Expand Down Expand Up @@ -762,6 +784,8 @@ mod tests {
.register_query_observer(1, BoxQueryObserver::new(ob.clone()));
host.registry
.register_apply_snapshot_observer(1, BoxApplySnapshotObserver::new(ob.clone()));
host.registry
.register_pd_task_observer(1, BoxPdTaskObserver::new(ob.clone()));
host.registry
.register_role_observer(1, BoxRoleObserver::new(ob.clone()));
host.registry
Expand Down Expand Up @@ -826,6 +850,9 @@ mod tests {
admin_req.set_admin_request(AdminRequest::default());
host.pre_exec(&region, &admin_req, 0, 0);
assert_all!([&ob.called], &[119]); // 16

host.on_compute_engine_size();
assert_all!([&ob.called], &[138]); // 19
}

#[test]
Expand Down
22 changes: 20 additions & 2 deletions components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pub use self::{
consistency_check::{ConsistencyCheckObserver, Raw as RawConsistencyCheckObserver},
dispatcher::{
BoxAdminObserver, BoxApplySnapshotObserver, BoxCmdObserver, BoxConsistencyCheckObserver,
BoxQueryObserver, BoxRegionChangeObserver, BoxRoleObserver, BoxSplitCheckObserver,
CoprocessorHost, Registry,
BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver, BoxRoleObserver,
BoxSplitCheckObserver, CoprocessorHost, Registry,
},
error::{Error, Result},
region_info_accessor::{
Expand Down Expand Up @@ -169,6 +169,24 @@ pub trait SplitCheckObserver<E>: Coprocessor {
);
}

/// Describes size information about all stores.
/// There is guarantee that capacity >= used + avail.
/// since some space can be reserved.
#[derive(Debug, Default)]
pub struct StoreSizeInfo {
/// The capacity of the store.
pub capacity: u64,
/// Size of actual data.
pub used: u64,
/// Available space that can be written with actual data.
pub avail: u64,
}

pub trait PdTaskObserver: Coprocessor {
/// Compute capacity/used/available size of this store.
fn on_compute_engine_size(&self, _: &mut Option<StoreSizeInfo>) {}
}

pub struct RoleChange {
pub state: StateRole,
pub leader_id: u64,
Expand Down
24 changes: 18 additions & 6 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use yatp::{
use crate::{
coprocessor::{
AdminObserver, ApplySnapshotObserver, BoxAdminObserver, BoxApplySnapshotObserver,
BoxQueryObserver, BoxRegionChangeObserver, Cmd, Coprocessor, CoprocessorHost,
ObserverContext, QueryObserver, RegionChangeEvent, RegionChangeObserver,
BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver, Cmd, Coprocessor,
CoprocessorHost, ObserverContext, PdTaskObserver, QueryObserver, RegionChangeEvent,
RegionChangeObserver, StoreSizeInfo,
},
engine_store_ffi::{
gen_engine_store_server_helper,
Expand Down Expand Up @@ -149,10 +150,10 @@ impl TiFlashObserver {
TIFLASH_OBSERVER_PRIORITY,
BoxRegionChangeObserver::new(self.clone()),
);
// coprocessor_host.registry.register_pd_task_observer(
// TIFLASH_OBSERVER_PRIORITY,
// BoxPdTaskObserver::new(self.clone()),
// );
coprocessor_host.registry.register_pd_task_observer(
TIFLASH_OBSERVER_PRIORITY,
BoxPdTaskObserver::new(self.clone()),
);
}
}

Expand Down Expand Up @@ -234,3 +235,14 @@ impl RegionChangeObserver for TiFlashObserver {
}
}
}

impl PdTaskObserver for TiFlashObserver {
fn on_compute_engine_size(&self, store_size: &mut Option<StoreSizeInfo>) {
let stats = self.engine_store_server_helper.handle_compute_store_stats();
store_size.insert(StoreSizeInfo {
capacity: stats.fs_stats.capacity_size,
used: stats.fs_stats.used_size,
avail: stats.fs_stats.avail_size,
});
}
}
2 changes: 2 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
let (raft_builder, apply_builder) = (builder.clone(), apply_poller_builder.clone());

let tag = format!("raftstore-{}", store.get_id());
let coprocessor_host = builder.coprocessor_host.clone();
self.system.spawn(tag, builder);

let mut mailboxes = Vec::with_capacity(region_peers.len());
Expand Down Expand Up @@ -1650,6 +1651,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
collector_reg_handle,
region_read_progress,
health_service,
coprocessor_host,
);
assert!(workers.pd_worker.start_with_timer(pd_runner));

Expand Down
Loading

0 comments on commit 56761b7

Please sign in to comment.