fix: notify flush receiver after write buffer is released #4476

Merged: 2 commits, merged on Aug 1, 2024
30 changes: 29 additions & 1 deletion src/mito2/src/flush.rs
@@ -23,7 +23,7 @@ use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
@@ -88,6 +88,9 @@ pub struct WriteBufferManagerImpl {
memory_used: AtomicUsize,
/// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
memory_active: AtomicUsize,
/// Optional notifier.
/// The manager can wake up the worker once we free the write buffer.
notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
@@ -98,9 +101,16 @@ impl WriteBufferManagerImpl {
mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
memory_used: AtomicUsize::new(0),
memory_active: AtomicUsize::new(0),
notifier: None,
}
}

/// Attaches a notifier to the manager.
pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
self.notifier = Some(notifier);
self
}

/// Returns memory usage of mutable memtables.
pub fn mutable_usage(&self) -> usize {
self.memory_active.load(Ordering::Relaxed)
@@ -159,6 +169,12 @@ impl WriteBufferManager for WriteBufferManagerImpl {

fn free_mem(&self, mem: usize) {
self.memory_used.fetch_sub(mem, Ordering::Relaxed);
if let Some(notifier) = &self.notifier {
// Notifies the worker after the memory usage is decreased. When we drop the memtable
// outside of the worker, the worker may still stall requests because the memory usage
// is not updated. So we need to notify the worker to handle stalled requests again.
let _ = notifier.send(());
}
}

fn memory_usage(&self) -> usize {
@@ -786,6 +802,18 @@ mod tests {
assert!(manager.should_flush_engine());
}

#[test]
fn test_manager_notify() {
let (sender, receiver) = watch::channel(());
let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
manager.reserve_mem(500);
assert!(!receiver.has_changed().unwrap());
manager.schedule_free_mem(500);
assert!(!receiver.has_changed().unwrap());
manager.free_mem(500);
assert!(receiver.has_changed().unwrap());
}

#[tokio::test]
async fn test_schedule_empty() {
let env = SchedulerEnv::new().await;
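The receiver side of this notification lives in the region worker loop. As a rough sketch (not the actual RegionWorkerLoop code; `handle_stalled_requests` is a hypothetical placeholder), a worker holding the `watch::Receiver<()>` can wait for the signal and then retry stalled writes:

use tokio::sync::watch;

async fn worker_loop(mut flush_receiver: watch::Receiver<()>) {
    loop {
        // `changed()` resolves after `free_mem` calls `notifier.send(())`.
        // Multiple sends before this await coalesce into a single wake-up,
        // which is all a "retry stalled requests" signal needs.
        if flush_receiver.changed().await.is_err() {
            // All senders were dropped; the write buffer manager is gone.
            break;
        }
        handle_stalled_requests().await;
    }
}

async fn handle_stalled_requests() {
    // Hypothetical placeholder: re-check write buffer usage and resume any
    // write requests that were stalled while memory was over the limit.
}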
10 changes: 7 additions & 3 deletions src/mito2/src/metrics.rs
@@ -34,9 +34,13 @@ lazy_static! {
/// Global memtable dictionary size in bytes.
pub static ref MEMTABLE_DICT_BYTES: IntGauge =
register_int_gauge!("greptime_mito_memtable_dict_bytes", "mito memtable dictionary size in bytes").unwrap();
/// Gauge for open regions
pub static ref REGION_COUNT: IntGauge =
register_int_gauge!("greptime_mito_region_count", "mito region count").unwrap();
/// Gauge for open regions in each worker.
pub static ref REGION_COUNT: IntGaugeVec =
register_int_gauge_vec!(
"greptime_mito_region_count",
"mito region count in each worker",
&[WORKER_LABEL],
).unwrap();
/// Elapsed time to handle requests.
pub static ref HANDLE_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_handle_request_elapsed",
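For reference, the per-worker gauge pattern above can be reproduced with the `prometheus` crate directly. The sketch below assumes the label name "worker"; the actual `WORKER_LABEL` constant is defined elsewhere in mito2's metrics module:

use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

lazy_static! {
    /// Per-worker region count, keyed by a "worker" label.
    static ref REGION_COUNT: IntGaugeVec = register_int_gauge_vec!(
        "greptime_mito_region_count",
        "mito region count in each worker",
        &["worker"]
    )
    .unwrap();
}

fn update_region_count() {
    // Each worker resolves its own child gauge once, then increments or
    // decrements it as regions are created, opened, closed, or dropped.
    let worker_0_regions = REGION_COUNT.with_label_values(&["0"]);
    worker_0_regions.inc();
    assert_eq!(worker_0_regions.get(), 1);
}

Summing the series over the worker label (e.g. `sum(greptime_mito_region_count)` in PromQL) recovers the same total the old single gauge reported.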
26 changes: 16 additions & 10 deletions src/mito2/src/worker.rs
@@ -51,7 +51,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_STALL_TOTAL;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -130,9 +130,11 @@ impl WorkerGroup {
object_store_manager: ObjectStoreManagerRef,
plugins: Plugins,
) -> Result<WorkerGroup> {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
let (flush_sender, flush_receiver) = watch::channel(());
let write_buffer_manager = Arc::new(
WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
.with_notifier(flush_sender.clone()),
);
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -165,7 +167,6 @@ impl WorkerGroup {
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let (flush_sender, flush_receiver) = watch::channel(());

let workers = (0..config.num_workers)
.map(|id| {
@@ -265,10 +266,12 @@
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: TimeProviderRef,
) -> Result<WorkerGroup> {
let (flush_sender, flush_receiver) = watch::channel(());
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
))
Arc::new(
WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
.with_notifier(flush_sender.clone()),
)
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
@@ -297,7 +300,6 @@
.write_cache(write_cache)
.build(),
);
let (flush_sender, flush_receiver) = watch::channel(());
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
@@ -401,6 +403,7 @@ impl<S: LogStore> WorkerStarter<S> {

let running = Arc::new(AtomicBool::new(true));
let now = self.time_provider.current_time_millis();
let id_string = self.id.to_string();
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config.clone(),
@@ -436,7 +439,8 @@ impl<S: LogStore> WorkerStarter<S> {
last_periodical_check_millis: now,
flush_sender: self.flush_sender,
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -623,6 +627,8 @@ struct RegionWorkerLoop<S> {
flush_receiver: watch::Receiver<()>,
/// Gauge of stalled request count.
stalled_count: IntGauge,
/// Gauge of regions in the worker.
region_count: IntGauge,
}

impl<S: LogStore> RegionWorkerLoop<S> {
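The important detail in this file is ordering: the `watch` channel is now created before the shared `WriteBufferManagerImpl` so the sender can be attached via `with_notifier`; previously the channel was only created after the manager, so the manager had no way to wake stalled workers. A condensed sketch of the wiring, written as if inside the mito2 crate with the surrounding builder code omitted:

use std::sync::Arc;
use tokio::sync::watch;

use crate::flush::WriteBufferManagerImpl;

fn build_write_buffer_manager(
    global_write_buffer_size: usize,
) -> (Arc<WriteBufferManagerImpl>, watch::Sender<()>, watch::Receiver<()>) {
    // Create the channel first so the sender can be handed to the manager.
    let (flush_sender, flush_receiver) = watch::channel(());
    let write_buffer_manager = Arc::new(
        WriteBufferManagerImpl::new(global_write_buffer_size)
            .with_notifier(flush_sender.clone()),
    );
    // Each worker later receives clones of both ends (see the `flush_sender`
    // and `flush_receiver` fields of RegionWorkerLoop above).
    (write_buffer_manager, flush_sender, flush_receiver)
}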
7 changes: 3 additions & 4 deletions src/mito2/src/worker/handle_close.rs
@@ -19,7 +19,6 @@ use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::REGION_COUNT;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
@@ -31,7 +30,7 @@ impl<S> RegionWorkerLoop<S> {
return Ok(0);
};

info!("Try to close region {}", region_id);
info!("Try to close region {}, worker: {}", region_id, self.id);

region.stop().await;
self.regions.remove_region(region_id);
@@ -40,9 +39,9 @@
// Clean compaction status.
self.compaction_scheduler.on_region_closed(region_id);

info!("Region {} closed", region_id);
info!("Region {} closed, worker: {}", region_id, self.id);

REGION_COUNT.dec();
self.region_count.dec();

Ok(0)
}
9 changes: 6 additions & 3 deletions src/mito2/src/worker/handle_create.rs
@@ -24,7 +24,6 @@ use store_api::region_request::{AffectedRows, RegionCreateRequest};
use store_api::storage::RegionId;

use crate::error::{InvalidMetadataSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::opener::{check_recovered_region, RegionOpener};
use crate::worker::RegionWorkerLoop;

@@ -70,9 +69,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.create_or_open(&self.config, &self.wal)
.await?;

info!("A new region created, region: {:?}", region.metadata());
info!(
"A new region created, worker: {}, region: {:?}",
self.id,
region.metadata()
);

REGION_COUNT.inc();
self.region_count.inc();

// Insert the MitoRegion into the RegionMap.
self.regions.insert_region(Arc::new(region));
5 changes: 2 additions & 3 deletions src/mito2/src/worker/handle_drop.rs
@@ -28,7 +28,6 @@ use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::{RegionMapRef, RegionState};
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

@@ -45,7 +44,7 @@
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;

info!("Try to drop region: {}", region_id);
info!("Try to drop region: {}, worker: {}", region_id, self.id);

// Marks the region as dropping.
region.set_dropping()?;
@@ -93,7 +92,7 @@
region_id
);

REGION_COUNT.dec();
self.region_count.dec();

// Detaches a background task to delete the region dir
let region_dir = region.access_layer.region_dir().to_owned();
14 changes: 9 additions & 5 deletions src/mito2/src/worker/handle_open.rs
@@ -26,7 +26,6 @@ use store_api::storage::RegionId;
use crate::error::{
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
};
use crate::metrics::REGION_COUNT;
use crate::region::opener::RegionOpener;
use crate::request::OptionOutputTx;
use crate::wal::entry_distributor::WalEntryReceiver;
@@ -56,7 +55,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.context(OpenDalSnafu)?
{
let result = remove_region_dir_once(&request.region_dir, object_store).await;
info!("Region {} is dropped, result: {:?}", region_id, result);
info!(
"Region {} is dropped, worker: {}, result: {:?}",
region_id, self.id, result
);
return RegionNotFoundSnafu { region_id }.fail();
}

@@ -84,7 +86,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender.send(Err(err));
return;
}
info!("Try to open region {}", region_id);
info!("Try to open region {}, worker: {}", region_id, self.id);

// Open region from specific region dir.
let opener = match RegionOpener::new(
@@ -112,12 +114,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let wal = self.wal.clone();
let config = self.config.clone();
let opening_regions = self.opening_regions.clone();
let region_count = self.region_count.clone();
let worker_id = self.id;
opening_regions.insert_sender(region_id, sender);
common_runtime::spawn_global(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
info!("Region {} is opened", region_id);
REGION_COUNT.inc();
info!("Region {} is opened, worker: {}", region_id, worker_id);
region_count.inc();

// Insert the Region into the RegionMap.
regions.insert_region(Arc::new(region));
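One detail worth noting above: the gauge handle is cloned into the spawned open task (`region_count.clone()`). This works because prometheus gauge handles share the same underlying value, so increments made by the background task are visible through the worker's handle. A small self-contained illustration (the metric name here is invented to avoid clashing with the real one):

use prometheus::{register_int_gauge_vec, IntGaugeVec};

#[tokio::main]
async fn main() {
    let gauge_vec: IntGaugeVec = register_int_gauge_vec!(
        "example_region_count",
        "illustrative per-worker region count",
        &["worker"]
    )
    .unwrap();
    let region_count = gauge_vec.with_label_values(&["0"]);

    // Clone the handle into a background task, mirroring how handle_open
    // moves `region_count` into the spawned region-open future.
    let cloned = region_count.clone();
    tokio::spawn(async move {
        cloned.inc();
    })
    .await
    .unwrap();

    // The increment made by the task is visible through the original handle.
    assert_eq!(region_count.get(), 1);
}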