diff --git a/cmd/zfs_object_agent/.cargo/config.toml b/cmd/zfs_object_agent/.cargo/config.toml index b441aa8fa90d..d5955cf2b05a 100644 --- a/cmd/zfs_object_agent/.cargo/config.toml +++ b/cmd/zfs_object_agent/.cargo/config.toml @@ -9,3 +9,9 @@ rustflags = [ "-Dclippy::dbg_macro", ] +# include frame pointers so that we get a little bit of stack trace for +# flame graphs (without full dwarf unwinding) +[target.x86_64-unknown-linux-gnu] +rustflags = [ + "-C", "force-frame-pointers=yes", +] diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index 6e8fa5e001c2..7e3be71ceb20 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -89,6 +89,15 @@ dependencies = [ "synattra", ] +[[package]] +name = "async-lock" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -226,6 +235,34 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "azure_core" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca4393afee90ad13c987a2cbfeb5bbb0b9fb3c86585e42ed3ed151babaa93da1" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "dyn-clone", + "futures", + "getrandom", + "http", + "log", + "oauth2", + "pin-project", + "rand", + "reqwest", + "rustc_version", + "serde", + "serde_derive", + "serde_json", + "thiserror", + "url", + "uuid 1.0.0", +] + [[package]] name = "azure_identity" version = "0.1.1" @@ -234,7 +271,29 @@ checksum = "ebda98657980528a8f0f0f7cc85c88c7dabc160e026bf258d06e54b77b698b08" dependencies = [ "async-timer", "async-trait", - "azure_core", + "azure_core 0.1.1", + "chrono", + "futures", + "log", + "oauth2", + "reqwest", + "serde", + "serde_json", + "thiserror", + "url", +] + +[[package]] +name = "azure_identity" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dc06f2cd86f196a9d33258bc486d611720420e39cf3ff15d0fb6a81f75bd72" +dependencies = [ + "async-lock", + "async-timer", + "async-trait", + "azure_core 0.2.2", + "base64", "chrono", "futures", "log", @@ -244,6 +303,7 @@ dependencies = [ "serde_json", "thiserror", "url", + "uuid 1.0.0", ] [[package]] @@ -254,7 +314,7 @@ checksum = "22c413e8459badf86c9e6e0c84f5894609663bcc8fa5eb1e49bfb985273dac58" dependencies = [ "RustyXML", "async-trait", - "azure_core", + "azure_core 0.1.1", "base64", "bytes", "chrono", @@ -272,6 +332,33 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "azure_storage" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9f2aee687da9817f7b332e1e01dda51cd9f7a0a68a5abcfec7c4c494a65546" +dependencies = [ + "RustyXML", + "async-trait", + "azure_core 0.2.2", + "base64", + "bytes", + "chrono", + "futures", + "hmac 0.12.1", + "http", + "log", + "once_cell", + "serde", + "serde-xml-rs", + "serde_derive", + "serde_json", + "sha2 0.10.2", + "thiserror", + "url", + "uuid 1.0.0", +] + [[package]] name = "azure_storage_blobs" version = "0.1.0" @@ -279,8 +366,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a70ec6fab8a2cae5d774098267870c0f3fbef1cb63cac12afab38b8c17cc8d97" dependencies = [ "RustyXML", - "azure_core", - "azure_storage", + "azure_core 0.1.1", + "azure_storage 0.1.0", "base64", "bytes", "chrono", @@ -297,6 +384,31 @@ dependencies 
= [ "uuid 0.8.2", ] +[[package]] +name = "azure_storage_blobs" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d17982127c4a34736a60656ddbd05b1714420686b6e6304145ee3b4501395e75" +dependencies = [ + "RustyXML", + "azure_core 0.2.2", + "azure_storage 0.2.0", + "base64", + "bytes", + "chrono", + "futures", + "http", + "log", + "md5", + "serde", + "serde-xml-rs", + "serde_derive", + "serde_json", + "thiserror", + "url", + "uuid 1.0.0", +] + [[package]] name = "backtrace" version = "0.3.65" @@ -694,6 +806,7 @@ checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ "block-buffer 0.10.2", "crypto-common", + "subtle", ] [[package]] @@ -775,6 +888,12 @@ dependencies = [ "str-buf", ] +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + [[package]] name = "exitcode" version = "1.1.2" @@ -808,19 +927,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flume" -version = "0.10.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "pin-project", - "spin 0.9.3", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1129,6 +1235,15 @@ dependencies = [ "digest 0.9.0", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "hostname" version = "0.3.1" @@ -1530,15 +1645,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7843ec2de400bcbc6a6328c958dc38e5359da6e93e72e37bc5246bf1ae776389" -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom", -] - [[package]] name = "native-tls" version = "0.2.10" @@ -1865,26 +1971,6 @@ dependencies = [ "sha-1", ] -[[package]] -name = "pin-project" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.9" @@ -2106,7 +2192,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi 0.3.9", @@ -2191,7 +2277,7 @@ dependencies = [ "digest 0.9.0", "futures", "hex", - "hmac", + "hmac 0.11.0", "http", "hyper", "log", @@ -2541,15 +2627,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" -dependencies = [ - "lock_api", -] - [[package]] name = "str-buf" version = "1.0.5" @@ -3275,7 +3352,6 @@ dependencies = [ "conv", "derivative", "either", - "flume", "futures", "futures-core", "lazy_static", @@ -3306,10 +3382,10 @@ dependencies = [ "arr_macro", "async-stream", "async-trait", - "azure_core", - "azure_identity", - "azure_storage", - "azure_storage_blobs", + "azure_core 0.2.2", + "azure_identity 0.3.0", + "azure_storage 0.2.0", + "azure_storage_blobs 0.2.0", "base64", "bincode", "bytes", @@ -3347,6 +3423,7 @@ dependencies = [ "stream-reduce", "tokio", "tokio-stream", + "url", "util", "uuid 1.0.0", "zettacache", @@ -3409,10 +3486,10 @@ name = "zoa_test" version = "0.1.0" dependencies = [ "anyhow", - "azure_core", - "azure_identity", - "azure_storage", - "azure_storage_blobs", + "azure_core 0.1.1", + "azure_identity 0.1.1", + "azure_storage 0.1.0", + "azure_storage_blobs 0.1.0", "chrono", "clap", "futures", diff --git a/cmd/zfs_object_agent/client/src/main.rs b/cmd/zfs_object_agent/client/src/main.rs index 8418b2771ac3..8ed3b0444957 100644 --- a/cmd/zfs_object_agent/client/src/main.rs +++ b/cmd/zfs_object_agent/client/src/main.rs @@ -55,6 +55,13 @@ const BUCKET_NAME: &str = "cloudburst-data-2"; const POOL_NAME: &str = "testpool"; const POOL_GUID: u64 = 1234; +// The default URL including port number for Azurite. +const AZURITE_EMULATOR_URL: &str = "http://127.0.0.1:10000"; +// The well-known account and key used by Azurite. +const AZURITE_EMULATOR_ACCOUNT: &str = "devstoreaccount1"; +const AZURITE_EMULATOR_ACCOUNT_KEY: &str = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + static GIT_VERSION: &str = git_version!( fallback = match option_env!("CARGO_ZOA_GITREV") { Some(value) => value, @@ -544,6 +551,8 @@ struct Cli { azure_key: Option, #[clap(short = 'm', long)] managed_identity: bool, + #[clap(long, conflicts_with_all = &["managed-identity", "azure-account", "azure-key"])] + emulator: bool, #[clap(short, long, parse(from_occurrences))] verbose: usize, @@ -666,7 +675,17 @@ impl From for CliParams { } } Protocol::Blob => ObjectAccessProtocol::Blob { - credentials: if cli.managed_identity { + endpoint: if cli.emulator { + Some(AZURITE_EMULATOR_URL.to_string()) + } else { + None + }, + credentials: if cli.emulator { + BlobCredentials::Key { + azure_account: AZURITE_EMULATOR_ACCOUNT.to_string(), + azure_key: AZURITE_EMULATOR_ACCOUNT_KEY.to_string(), + } + } else if cli.managed_identity { BlobCredentials::ManagedCredentials { azure_account: cli.azure_account.unwrap(), } diff --git a/cmd/zfs_object_agent/server/src/main.rs b/cmd/zfs_object_agent/server/src/main.rs index 0bf0ef557ecb..86ebc7dd22f6 100644 --- a/cmd/zfs_object_agent/server/src/main.rs +++ b/cmd/zfs_object_agent/server/src/main.rs @@ -231,13 +231,14 @@ fn main() { }); } Some(Commands::TestConnectivityBlob { - endpoint: _, // XXX DLPX-80615 + endpoint, bucket, azure_account, azure_key, managed_identity, }) => { let protocol = ObjectAccessProtocol::Blob { + endpoint, credentials: if managed_identity { BlobCredentials::ManagedCredentials { azure_account } } else { diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs index 666035fbf5c9..7e67562b874e 100644 --- a/cmd/zfs_object_agent/util/src/zettacache_stats.rs +++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs @@ -38,7 +38,7 @@ use crate::nice_p2size; use crate::write_stdout; /// The zettacache 
disk I/O types that are collected and displayed for each disk. -#[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize, PartialEq)] pub enum DiskIoType { ReadDataForLookup, ReadIndexForLookup, @@ -349,10 +349,9 @@ impl IoStats { pub fn max_name_len(&self) -> usize { self.disk_stats .iter() - .max_by_key(|stats| stats.name.len()) - .unwrap() - .name - .len() + .map(|stats| stats.name.len()) + .max() + .unwrap_or_default() } } diff --git a/cmd/zfs_object_agent/zettacache/Cargo.toml b/cmd/zfs_object_agent/zettacache/Cargo.toml index bbd2f0577547..dcff5c3b492c 100644 --- a/cmd/zfs_object_agent/zettacache/Cargo.toml +++ b/cmd/zfs_object_agent/zettacache/Cargo.toml @@ -18,7 +18,6 @@ chrono = "0.4" conv = "0.3.3" derivative = "2.2.0" either = "1.6.1" -flume = "0.10.10" futures = "0.3.13" futures-core = "0.3.13" lazy_static = "1.4.0" diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index 3777bd9f1f8d..99fdcd2db459 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -9,6 +9,7 @@ use std::os::unix::prelude::OpenOptionsExt; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::sync::Mutex; use std::sync::RwLock; use std::thread::sleep; @@ -54,12 +55,14 @@ use crate::base_types::Extent; tunable! { static ref MIN_SECTOR_SIZE: usize = 512; - static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = 32; - // Stop aggregating if run would exceed DISK_WRITE_MAX_AGGREGATION_SIZE + // Stop aggregating if run would exceed DISK_READ/WRITE_MAX_AGGREGATION_SIZE + pub static ref DISK_READ_MAX_AGGREGATION_SIZE: ByteSize = ByteSize::kib(128); pub static ref DISK_WRITE_MAX_AGGREGATION_SIZE: ByteSize = ByteSize::kib(128); // CHUNK must be > MAX_AGG_SIZE, see Disk::write() - static ref DISK_WRITE_CHUNK: ByteSize = ByteSize::mib(1); + static ref DISK_AGG_CHUNK: ByteSize = ByteSize::mib(1); static ref DISK_WRITE_QUEUE_EMPTY_DELAY: Duration = Duration::from_millis(1); + static ref DISK_READ_QUEUE_EMPTY_DELAY: Duration = Duration::from_micros(10); + static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = 32; static ref DISK_METADATA_WRITE_MAX_QUEUE_DEPTH: usize = 16; pub static ref DISK_READ_MAX_QUEUE_DEPTH: usize = 64; } @@ -141,13 +144,7 @@ pub struct BlockAccess { #[derive(Derivative)] #[derivative(Debug)] pub struct Disk { - // We want all the reader/writer_threads to share the same file descriptor, but we don't have - // a mechanism to ensure that they stop using the fd when the DiskStruct is dropped and the - // fd is closed. To solve this we simply never close the fd. The fd is owned by the File, - // and we leave a reference to it here to indicate that it's related to this Disk, even - // though it's only used via the reader/writer_threads. 
- #[allow(dead_code)] - file: &'static File, + file: Arc<File>, path: PathBuf, canonical_path: PathBuf, @@ -156,7 +153,7 @@ pub struct Disk { #[derivative(Debug = "ignore")] io_stats: &'static DiskIoStats, #[derivative(Debug = "ignore")] - reader_tx: flume::Sender, + reader_txs: Vec>, #[derivative(Debug = "ignore")] writer_txs: Vec>, #[derivative(Debug = "ignore")] @@ -217,14 +214,12 @@ impl Disk { .open(path) .with_context(|| format!("opening disk {path:?}"))?; // see comment in `struct Disk` - let file = &*Box::leak(Box::new(file)); - let (sector_size, size) = disk_sizes(file)?; + let file = Arc::new(file); + let (sector_size, size) = disk_sizes(&file)?; let short_name = path.file_name().unwrap().to_string_lossy().to_string(); let canonical_path = Path::new(path).canonicalize()?; - let (reader_tx, reader_rx) = flume::unbounded(); - let mut writer_txs = Vec::new(); let mut writer_rxs = Vec::new(); for _ in 0..*DISK_WRITE_MAX_QUEUE_DEPTH { @@ -241,6 +236,14 @@ metadata_writer_rxs.push(rx); } + let mut reader_txs = Vec::new(); + let mut reader_rxs = Vec::new(); + for _ in 0..*DISK_READ_MAX_QUEUE_DEPTH { + let (tx, rx) = mpsc::unbounded_channel(); + reader_txs.push(tx); + reader_rxs.push(rx); + } + let io_stats = &*Box::leak(Box::new(DiskIoStats::new(short_name))); let this = Disk { @@ -250,22 +253,23 @@ size: Mutex::new(size), sector_size, io_stats, - reader_tx, + reader_txs, writer_txs, metadata_writer_txs, }; - for _ in 0..*DISK_READ_MAX_QUEUE_DEPTH { - let rx = reader_rx.clone(); - // note, we want to use a "std" thread here rather than - // tokio::task::spawn_blocking() because the latter has a limit of how many - // threads it will create (default 512) + // note, we use "std" threads rather than tokio::task::spawn_blocking() because the + // latter has a limit of how many threads it will create (default 512) + + for rx in reader_rxs { + let file = this.file.clone(); std::thread::spawn(move || { Self::reader_thread(file, io_stats, sector_size, rx); }); } if !readonly { for rx in writer_rxs { + let file = this.file.clone(); std::thread::spawn(move || { Self::writer_thread( file, @@ -276,6 +280,7 @@ }); } for rx in metadata_writer_rxs { + let file = this.file.clone(); std::thread::spawn(move || { Self::writer_thread( file, @@ -292,32 +297,94 @@ } fn reader_thread( - file: &'static File, + file: Arc<File>, io_stats: &'static DiskIoStats, sector_size: usize, - rx: flume::Receiver, + mut rx: mpsc::UnboundedReceiver, ) { - while let Ok(message) = rx.recv() { - let op = OpInProgress::new(&io_stats.stats[message.io_type]); - let vec = measure!() - .func(|| { - pread_aligned( - file, - message.offset.try_into().unwrap(), - message.size, - sector_size, - ) - }) + /// returns (offsets, total_bytes, io_type) + fn find_run<'a, I: Iterator>( + mut iter: I, + ) -> Option<(Vec<u64>, usize, DiskIoType)> { + let (mut run, mut len, io_type) = if let Some((&offset, message)) = iter.next() { + (vec![offset], message.size, message.io_type) + } else { + return None; + }; + for (&offset, message) in iter { + if len > 0 && len + message.size > DISK_READ_MAX_AGGREGATION_SIZE.as_usize() { + break; + } + if message.io_type != io_type { + break; + } + if offset == run[0] + len as u64 { + run.push(offset); + len += message.size; + } else { + break; + } + } + Some((run, len, io_type)) + } + + let read_impl = |offset: u64, size, io_type| { + let op = OpInProgress::new(&io_stats.stats[io_type]); + let vec = measure!("pread_aligned") + .func(|| pread_aligned(&file, offset.try_into().unwrap(), size, 
sector_size)) .unwrap(); assert_eq!( vec.len(), - message.size, + size, "fd={}, offset={}", file.as_raw_fd(), - message.offset + offset ); - op.end(message.size as u64); - message.tx.send(vec.into()).unwrap(); + op.end(size as u64); + AlignedBytes::from(vec) + }; + + let mut sorted: BTreeMap = BTreeMap::new(); + let mut prev_offset = 0; + + // When all the Senders of receiver `rx` are dropped (i.e. when the + // respective Disk of this thread is dropped), `blocking_recv()` will + // return an error and this thread will terminate without being leaked. + loop { + // Look for next run of messages in sorted queue + if let Some((run, len, io_type)) = find_run(iter_wrapping(&sorted, prev_offset)) { + // Issue one read for this run (which may have multiple messages) + let first_offset = run[0]; + let bytes = read_impl(first_offset, len, io_type); + for offset in run { + let message = sorted.remove(&offset).unwrap(); + assert_eq!(message.io_type, io_type); + let relative_offset = (message.offset - first_offset).as_usize(); + let slice = &bytes[relative_offset..relative_offset + message.size]; + message.tx.send(bytes.slice_ref(slice)).unwrap(); + } + prev_offset = first_offset; + } else { + // Nothing in `sorted`; wait for a message + let message = match rx.blocking_recv() { + Some(message) => message, + None => return, + }; + sorted.insert(message.offset, message); + // Delay a bit to allow for more messages to arrive, to improve our chances of + // aggregation. + sleep(*DISK_READ_QUEUE_EMPTY_DELAY); + } + + // Receive as many messages as we can without blocking + while let Ok(message) = rx.try_recv() { + if let Some(message) = sorted.insert(message.offset, message) { + // duplicate read inserted; issue the old one immediately + measure!("duplicate read").hit(); + let bytes = read_impl(message.offset, message.size, message.io_type); + message.tx.send(bytes).unwrap(); + } + } } } @@ -341,13 +408,15 @@ impl Disk { tx, }; - // note: reader_tx is unbounded, so .send() will not block - self.reader_tx.send(message).unwrap(); + let reader = usize::from64(offset / DISK_AGG_CHUNK.as_u64() % self.reader_txs.len() as u64); + self.reader_txs[reader] + .send(message) + .unwrap_or_else(|e| panic!("reader_txs[{}].send: {}", reader, e)); async move { measure!().fut(rx).await.unwrap() } } fn writer_thread( - file: &'static File, + file: Arc, stat_values: &'static IoStatValues, sector_size: usize, mut rx: mpsc::UnboundedReceiver, @@ -379,6 +448,9 @@ impl Disk { let mut sorted: BTreeMap = BTreeMap::new(); let mut prev_offset = 0; + // When all the Senders of receiver `rx` are dropped (i.e. when the + // respective Disk of this thread is dropped), `blocking_recv()` will + // return an error and this thread will terminate without being leaked. loop { // Look for next run of messages in sorted queue let (run, len) = find_run(iter_wrapping(&sorted, prev_offset)); @@ -485,14 +557,14 @@ impl Disk { _ => panic!("invalid {:?} for write", io_type), }; // Dispatch this write to a writer thread, determined based on its offset. The first - // DISK_WRITE_CHUNK (default 1MB) of the disk goes to the first thread, the second chunk + // DISK_AGG_CHUNK (default 1MB) of the disk goes to the first thread, the second chunk // to the second thread, and so on, wrapping back around to the first thread. Note that // each block allocator slab (32MB) is mapped to multiple threads, so the work is // distributed to multiple threads even when it's concentrated among a small number of // slabs. 
The CHUNK (1MB) is larger than the DISK_WRITE_MAX_AGGREGATION_SIZE (128KB) so // that we can find aggregations that cross MAX_AGG_SIZE boundaries (e.g. from offsets // 100KB to 228KB). - let writer = usize::from64(offset / DISK_WRITE_CHUNK.as_u64() % txs.len() as u64); + let writer = usize::from64(offset / DISK_AGG_CHUNK.as_u64() % txs.len() as u64); txs[writer] .send(message) .unwrap_or_else(|e| panic!("writer_txs[{}].send: {}", writer, e)); @@ -553,7 +625,7 @@ impl BlockAccess { pub fn expand_disk(&self, disk: DiskId) -> Result { let disks = self.disks.read().unwrap(); let disk = &disks[disk.index()]; - let (_, new_size) = disk_sizes(disk.file)?; + let (_, new_size) = disk_sizes(&disk.file)?; let mut size = disk.size.lock().unwrap(); let additional_bytes = new_size.checked_sub(*size).ok_or_else(|| { anyhow!( diff --git a/cmd/zfs_object_agent/zettaobject/Cargo.toml b/cmd/zfs_object_agent/zettaobject/Cargo.toml index 0029de5db102..61c0c253f661 100644 --- a/cmd/zfs_object_agent/zettaobject/Cargo.toml +++ b/cmd/zfs_object_agent/zettaobject/Cargo.toml @@ -10,10 +10,10 @@ publish = false [dependencies] anyhow = "1.0" arr_macro = "0.1.3" -azure_core = { version = "0.1.1"} # , default-features = false, features = ["enable_hyper"] -azure_identity = "0.1.1" -azure_storage = "0.1.0" -azure_storage_blobs = "0.1.0" +azure_core = { version = "0.2.2"} # , default-features = false, features = ["enable_hyper", "azurite_workaround"] +azure_identity = "0.3.0" +azure_storage = "0.2.0" +azure_storage_blobs = "0.2.0" async-stream = "0.3.0" async-trait = "0.1.52" base64 = "0.13.0" @@ -53,6 +53,7 @@ serde-xml-rs = "0.5.1" stream-reduce = "0.1.0" tokio = { version = "1.4", features = ["full"] } tokio-stream = "0.1.5" +url = "2.2" util = { path = "../util" } uuid = {version = "1.0.0", features = ["v4", "serde"]} zettacache = { path = "../zettacache" } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs index 2d240c67599a..04f12e11bfec 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs @@ -15,8 +15,8 @@ use anyhow::Result; use async_stream::try_stream; use async_trait::async_trait; use azure_core::HttpError; -use azure_identity::token_credentials::ImdsManagedIdentityCredential; -use azure_identity::token_credentials::TokenCredential; +use azure_identity::ImdsManagedIdentityCredential; +use azure_identity::TokenCredential; use azure_storage::clients::AsStorageClient; use azure_storage::clients::StorageAccountClient; use azure_storage::clients::StorageClient; @@ -40,6 +40,7 @@ use more_asserts::assert_le; use rusoto_core::ByteStream; use tokio::io::AsyncReadExt; use tokio::sync::RwLock; +use url::Url; use util::tunable; use super::retry; @@ -151,16 +152,12 @@ where HttpError::ExecuteRequest(err) => Self::InternalError(err.to_string()), HttpError::ReadBytes(err) => Self::InternalError(err.to_string()), HttpError::BuildResponse(err) => Self::InternalError(err.to_string()), - /* - * XXX Long term, we probably want to handle this, but for now this error - * probably only happens if we mess up our code for building requests. Panic - * to make it easier to debug and develop. 
- */ - HttpError::StreamReset(err) => panic!("{}", err), + HttpError::Url(err) => Self::InternalError(err.to_string()), _ => todo!(), } } } + impl<E> From<HttpError> for OAError<E> where E: MaybeFrom<HttpError> + Display, { @@ -176,8 +173,8 @@ struct BlobBucketClient { } impl BlobBucketClient { - async fn new(credentials: BlobCredentials) -> Result<Self> { - let (storage_client, expires_on) = get_azure_storage_client(credentials).await?; + async fn new(endpoint: Option<String>, credentials: BlobCredentials) -> Result<Self> { + let (storage_client, expires_on) = get_azure_storage_client(endpoint, credentials).await?; let blob_service = storage_client.as_blob_service_client(); Ok(Self { blob_service, @@ -198,14 +195,17 @@ pub struct BlobBucketAccess { blob_bucket_client: RwLock<BlobBucketClient>, + endpoint: Option<String>, credentials: BlobCredentials, } impl BlobBucketAccess { - pub async fn new(credentials: BlobCredentials) -> Result<Self> { - let blob_bucket_client = BlobBucketClient::new(credentials.clone()).await?; + pub async fn new(endpoint: Option<String>, credentials: BlobCredentials) -> Result<Self> { + let blob_bucket_client = + BlobBucketClient::new(endpoint.clone(), credentials.clone()).await?; Ok(Self { blob_bucket_client: RwLock::new(blob_bucket_client), + endpoint, credentials, }) } @@ -214,7 +214,7 @@ let mut blob_bucket_client = self.blob_bucket_client.write().await; // Expiry might have been checked earlier but we check again after taking the write lock. if blob_bucket_client.is_expired() { - match BlobBucketClient::new(self.credentials.clone()).await { + match BlobBucketClient::new(self.endpoint.clone(), self.credentials.clone()).await { Ok(new_blob_bucket_client) => { info!("BlobServiceClient refreshed after the credential tokens expired"); *blob_bucket_client = new_blob_bucket_client; @@ -290,9 +290,13 @@ struct BlobContainerClient { } impl BlobContainerClient { - async fn new(bucket: &str, credentials: BlobCredentials) -> Result<Self> { + async fn new( + endpoint: Option<String>, + bucket: &str, + credentials: BlobCredentials, + ) -> Result<Self> { let (storage_account_client, expires_on) = - get_azure_storage_client(credentials.clone()).await?; + get_azure_storage_client(endpoint, credentials.clone()).await?; let container_client = storage_account_client.as_container_client(bucket); Ok(Self { @@ -314,6 +318,7 @@ pub struct BlobObjectAccess { blob_container_client: RwLock<BlobContainerClient>, + endpoint: Option<String>, bucket: String, credentials: BlobCredentials, access_stats: ObjectAccessStats, @@ -325,7 +330,13 @@ let mut blob_container_client = self.blob_container_client.write().await; // Expiry might have been checked earlier but we check again after taking the write lock. 
if blob_container_client.is_expired() { - match BlobContainerClient::new(&self.bucket, self.credentials.clone()).await { + match BlobContainerClient::new( + self.endpoint.clone(), + &self.bucket, + self.credentials.clone(), + ) + .await + { Ok(new_container_client) => { info!("ContainerClient refreshed after the credential tokens expired"); *blob_container_client = new_container_client; @@ -358,13 +369,19 @@ impl BlobObjectAccess { self.update_container_client().await } - pub async fn new(bucket: &str, credentials: BlobCredentials) -> Result { - let blob_client = BlobContainerClient::new(bucket, credentials.clone()).await?; + pub async fn new( + endpoint: Option, + bucket: &str, + credentials: BlobCredentials, + ) -> Result { + let blob_client = + BlobContainerClient::new(endpoint.clone(), bucket, credentials.clone()).await?; Ok(Self { blob_container_client: RwLock::new(blob_client), access_stats: Default::default(), outstanding_ops: Default::default(), + endpoint, bucket: bucket.to_string(), credentials, }) @@ -374,6 +391,10 @@ impl BlobObjectAccess { self.bucket.clone() } + pub fn endpoint(&self) -> Option { + self.endpoint.clone() + } + pub fn credentials_profile(&self) -> Option { if let BlobCredentials::Profile(profile) = &self.credentials { Some(profile.clone()) @@ -386,8 +407,41 @@ impl BlobObjectAccess { where T: MaybeFrom + Display, { - let http_error: Box = e.downcast().unwrap(); - OAError::from(*http_error) + match e.downcast::() { + Ok(http_error) => OAError::from(*http_error), + Err(err) => { + match err.downcast::() { + Ok(az_err) => { + match *az_err { + azure_storage::Error::CoreError(azure_core::Error::Other( + other_error, + )) => { + // Azurite does not support soft-delete. But it also does not tell + // us that the delete is permanent by including + // "x-ms-delete-type-permanent" in the header. The azure-sdk + // looks for this header and not finding it, throws a CoreError. + // See: https://github.com/Azure/azure-sdk-for-rust/issues/780 + // This is propagated as RequestError::EmulatorBug. + if other_error + .to_string() + .contains("x-ms-delete-type-permanent") + { + OAError::RequestError(RequestError::EmulatorBug(format!( + "EmulatorBug: {other_error}" + ))) + } else { + OAError::Other(anyhow!( + "azure_storage::Error::CoreError: {other_error}" + )) + } + } + _ => OAError::Other(anyhow!("azure_storage::Error: {az_err}")), + } + } + Err(e) => OAError::Other(anyhow!(e)), + } + } + } } } @@ -496,14 +550,17 @@ impl ObjectAccessTrait for BlobObjectAccess { match blob_client.delete().execute().await { Err(e) => { debug!("error while deleting: {}", e); - let err = Self::convert_error::(e); - if let OAError::RequestError(RequestError::Service( - ObjectStoreError::NoSuchKey, - )) = err - { - Ok(None) - } else { - Err(err) + match Self::convert_error::(e) { + OAError::RequestError(RequestError::Service( + ObjectStoreError::NoSuchKey, + )) => Ok(None), + OAError::RequestError(RequestError::EmulatorBug(s)) => { + trace!( + "Hit emulator error which is expected; ignoring {s}" + ); + Ok(None) + } + err => Err(err), } } Ok(res) => { @@ -639,7 +696,7 @@ async fn get_azure_storage_client_with_managed_key( // There is a new AutoRefreshingTokenCredential wrapper in the repo that has not been released // yet. Once it is released, we should consider using it. 
// See: https://github.com/Azure/azure-sdk-for-rust/pull/673 - let creds = ImdsManagedIdentityCredential {}; + let creds = ImdsManagedIdentityCredential::default(); let bearer_token = creds.get_token("https://storage.azure.com/").await?; let expires_on = bearer_token.expires_on; @@ -653,39 +710,74 @@ Ok((client, Some(expires_on))) } -fn get_azure_storage_client_from_key( +fn get_azure_storage_emulator_client( + blob_storage: &str, azure_account: &str, azure_key: &str, ) -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)> { let http_client = azure_core::new_http_client(); + let blob_storage_url = Url::parse(blob_storage).unwrap(); + + // We care only about the blob service but the API expects URLs to the other ones as well. + // So, we just pass the default values. + let queue_storage_url = Url::parse("http://127.0.0.1:10001").unwrap(); + let table_storage_url = Url::parse("http://127.0.0.1:10002").unwrap(); + let filesystem_url = Url::parse("http://127.0.0.1:10004").unwrap(); + validate_azure_key(azure_key)?; Ok(( - StorageAccountClient::new_access_key(http_client, azure_account, azure_key) - .as_storage_client(), + StorageAccountClient::new_emulator_with_account( + http_client, + &blob_storage_url, + &table_storage_url, + &queue_storage_url, + &filesystem_url, + azure_account, + azure_key, + ) + .as_storage_client(), None, )) } -fn get_azure_storage_client_from_env() -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)> { +fn get_azure_storage_client_from_key( + endpoint: Option<String>, + azure_account: &str, + azure_key: &str, +) -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)> { let http_client = azure_core::new_http_client(); - let storage_client = match env::var("AZURE_CONNECTION_STRING") { - Ok(connection_string) => { + validate_azure_key(azure_key)?; + + match endpoint { + Some(endpoint) => get_azure_storage_emulator_client(&endpoint, azure_account, azure_key), + None => Ok(( + StorageAccountClient::new_access_key(http_client, azure_account, azure_key) + .as_storage_client(), + None, + )), + } +} + +fn get_azure_storage_client_from_env( + endpoint: Option<String>, +) -> Result<(Arc<StorageClient>, Option<DateTime<Utc>>)> { + let http_client = azure_core::new_http_client(); + match env::var("AZURE_CONNECTION_STRING") { + Ok(connection_string) => Ok(( StorageAccountClient::new_connection_string(http_client.clone(), &connection_string)? 
- .as_storage_client() - } + .as_storage_client(), + None, + )), Err(_) => { let azure_account = env::var("AZURE_ACCOUNT")?; let azure_key = env::var("AZURE_KEY")?; - validate_azure_key(&azure_key)?; - StorageAccountClient::new_access_key(http_client, azure_account, azure_key) - .as_storage_client() + get_azure_storage_client_from_key(endpoint, &azure_account, &azure_key) } - }; - - Ok((storage_client, None)) + } } + fn get_credentials_file() -> Result { let home_dir = dirs_next::home_dir(); if home_dir.is_none() { @@ -720,6 +812,7 @@ fn get_credentials_file() -> Result { } fn get_azure_storage_client_from_profile_key( + endpoint: Option, credentials_profile: String, ) -> Result<(Arc, Option>)> { let ini_file = get_credentials_file()?; @@ -744,14 +837,7 @@ fn get_azure_storage_client_from_profile_key( Some(azure_key) => azure_key, }; - validate_azure_key(azure_key)?; - - let http_client = azure_core::new_http_client(); - Ok(( - StorageAccountClient::new_access_key(http_client, azure_account, azure_key) - .as_storage_client(), - None, - )) + get_azure_storage_client_from_key(endpoint, azure_account, azure_key) } /// Create a StorageClient after getting credentials the following sources in order: @@ -760,10 +846,11 @@ fn get_azure_storage_client_from_profile_key( /// 3. managed identities. /// Once credentials have been successfully obtained from a source, we do not try the rest of the /// sources even if the credentials are invalid. -async fn get_azure_storage_client_automatic() -> Result<(Arc, Option>)> -{ - match get_azure_storage_client_from_env() - .or_else(|_| get_azure_storage_client_from_profile_key("default".to_string())) +async fn get_azure_storage_client_automatic( + endpoint: Option, +) -> Result<(Arc, Option>)> { + match get_azure_storage_client_from_env(endpoint.clone()) + .or_else(|_| get_azure_storage_client_from_profile_key(endpoint, "default".to_string())) { Ok(tuple) => Ok(tuple), Err(_) => get_azure_storage_client_with_managed_key_profile("default".to_string()).await, @@ -771,6 +858,7 @@ async fn get_azure_storage_client_automatic() -> Result<(Arc, Opt } async fn get_azure_storage_client( + endpoint: Option, credentials: BlobCredentials, ) -> Result<(Arc, Option>)> { match credentials { @@ -781,7 +869,7 @@ async fn get_azure_storage_client( // be fetched via Managed Identity Credential. This are similar to // BlobCredentials::Key and BlobCredentials::ManagedCredentials respectively, except for // the fact that it is passed via an ini file. We have to try both methods. - match get_azure_storage_client_from_profile_key(profile.clone()) { + match get_azure_storage_client_from_profile_key(endpoint.clone(), profile.clone()) { Ok(tuple) => Ok(tuple), Err(_) => get_azure_storage_client_with_managed_key_profile(profile).await, } @@ -790,12 +878,15 @@ async fn get_azure_storage_client( azure_account, azure_key, } => Ok(get_azure_storage_client_from_key( + endpoint, &azure_account, &azure_key, )?), BlobCredentials::ManagedCredentials { azure_account } => { Ok(get_azure_storage_client_with_managed_key(&azure_account).await?) } - BlobCredentials::Automatic => Ok(get_azure_storage_client_automatic().await?), + BlobCredentials::Automatic => { + Ok(get_azure_storage_client_automatic(endpoint.clone()).await?) 
+ } } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs index 4ee7fa21b842..4b27d99a7102 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs @@ -173,6 +173,7 @@ pub enum ObjectAccessProtocol { credentials: S3Credentials, }, Blob { + endpoint: Option, credentials: BlobCredentials, }, } @@ -200,6 +201,7 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { credentials: S3Credentials, }, Blob { + endpoint: Option, credentials: BlobCredentials, }, } @@ -212,6 +214,7 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { credentials_profile: Option, }, Blob { + endpoint: Option, credentials_profile: Option, }, } @@ -239,9 +242,13 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { region, credentials, }), - Upgrader::Serialized(Serialized::Blob { credentials }) => { - Ok(Self::Blob { credentials }) - } + Upgrader::Serialized(Serialized::Blob { + credentials, + endpoint, + }) => Ok(Self::Blob { + credentials, + endpoint, + }), Upgrader::S3Legacy { endpoint, region, @@ -261,8 +268,10 @@ impl<'de> Deserialize<'de> for ObjectAccessProtocol { credentials: credentials_profile.into(), }), Upgrader::Socket(Socket::Blob { + endpoint, credentials_profile, }) => Ok(Self::Blob { + endpoint, credentials: credentials_profile.into(), }), } @@ -295,8 +304,11 @@ impl ObjectAccess { S3ObjectAccess::new(&endpoint, ®ion, &bucket, credentials), readonly, )), - ObjectAccessProtocol::Blob { credentials } => { - let oa = BlobObjectAccess::new(&bucket, credentials).await?; + ObjectAccessProtocol::Blob { + credentials, + endpoint, + } => { + let oa = BlobObjectAccess::new(endpoint, &bucket, credentials).await?; Ok(ObjectAccess::from_blob(oa, readonly)) } } @@ -359,6 +371,7 @@ impl ObjectAccess { }, }, ObjectAccessEnum::Blob(oa) => ObjectAccessProtocol::Blob { + endpoint: oa.endpoint(), credentials: match oa.credentials_profile() { Some(profile) => BlobCredentials::Profile(profile), None => BlobCredentials::Automatic, @@ -545,8 +558,11 @@ impl BucketAccess { inner: BucketAccessEnum::S3(ba), })) } - ObjectAccessProtocol::Blob { credentials } => { - let ba = BlobBucketAccess::new(credentials).await?; + ObjectAccessProtocol::Blob { + credentials, + endpoint, + } => { + let ba = BlobBucketAccess::new(endpoint, credentials).await?; Ok(Arc::new(BucketAccess { inner: BucketAccessEnum::Azure(ba), })) @@ -624,6 +640,7 @@ pub enum RequestError { InvalidCredentials, /// The request time and the server time were too far out of sync TimeSkew, + EmulatorBug(String), } impl Display for RequestError { @@ -636,6 +653,7 @@ impl Display for RequestError { RequestError::ExpiredCredentials => f.write_str("Expired credentials"), RequestError::InvalidCredentials => f.write_str("Invalid credentials"), RequestError::TimeSkew => f.write_str("Request time too skewed"), + RequestError::EmulatorBug(s) => s.fmt(f), } } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs index 2ce1c9491c0d..a922a658ce7a 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs @@ -165,6 +165,7 @@ impl S3ObjectAccess { RequestError::ExpiredCredentials => RequestError::ExpiredCredentials, RequestError::InvalidCredentials => RequestError::InvalidCredentials, RequestError::TimeSkew => RequestError::TimeSkew, + RequestError::EmulatorBug(s) => 
RequestError::EmulatorBug(s), } } diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs index 190eee6b6318..ecb75186968d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs @@ -11,7 +11,6 @@ use anyhow::Result; use futures::stream::StreamExt; use lazy_static::lazy_static; use log::*; -use nvpair::NvList; use serde::Deserialize; use serde::Serialize; use tokio::fs; @@ -64,8 +63,8 @@ struct DestroyingCachePhys { pools: HashMap, } -#[derive(Serialize, Debug)] -struct DestroyingPool { +#[derive(Serialize, Debug, Clone)] +pub struct DestroyingPool { #[serde(flatten)] cache_phys: DestroyingCacheItemPhys, #[serde(flatten)] @@ -137,17 +136,6 @@ impl DestroyingPoolsMap { }); } } - - fn to_nvlist(&self) -> NvList { - let mut nvl = NvList::new_unique_names(); - - for (guid, destroying_pool) in self.pools.iter() { - let nvl_item = nvpair::to_nvlist(destroying_pool).unwrap(); - nvl.insert(format!("{}", guid), nvl_item.as_ref()).unwrap(); - } - - nvl - } } #[derive(Default, Debug)] @@ -438,7 +426,7 @@ pub async fn resume_destroy(object_access: Arc, guid: PoolGuid) -> /// Retrieve the PoolDestroyer's list of pools that are either being destroyed or have been /// destroyed. -pub async fn get_destroy_list() -> NvList { +pub async fn get_destroy_list() -> HashMap { maybe_die_with(|| "in get_destroy_list"); let maybe_pool_destroyer = POOL_DESTROYER.lock().await; @@ -446,7 +434,8 @@ pub async fn get_destroy_list() -> NvList { .as_ref() .unwrap() .destroying_pools_map - .to_nvlist() + .pools + .clone() } /// Remove pools that have been successfully destroyed from the PoolDestroyer's list of pools. diff --git a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs index 30a8da0d72b5..d8a591f1e004 100644 --- a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs @@ -1,8 +1,8 @@ +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; -use anyhow::Result; use futures::stream::StreamExt; use log::*; use nvpair::NvList; @@ -21,6 +21,7 @@ use crate::object_access::ObjectAccess; use crate::object_access::ObjectAccessProtocol; use crate::pool::*; use crate::pool_destroy; +use crate::pool_destroy::DestroyingPool; use crate::server::return_ok; use crate::server::ConnectionState; use crate::server::HandlerReturn; @@ -90,102 +91,57 @@ impl PublicConnectionState { fn get_pools(&mut self, nvl: NvList) -> HandlerReturn { info!("got request: {:?}", nvl); - Ok(Box::pin(async move { Self::get_pools_impl(nvl).await })) - } - - async fn get_pools_impl(nvl: NvList) -> Result> { - #[derive(Debug, Deserialize)] - struct GetPoolsRequest { - #[serde(flatten)] - protocol: ObjectAccessProtocol, - #[serde(default)] - readonly: bool, - bucket: Option, - guid: Option, - } - let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; - - let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + Ok(Box::pin(async move { + #[derive(Debug, Deserialize)] + struct GetPoolsRequest { + #[serde(flatten)] + protocol: ObjectAccessProtocol, + bucket: Option, + guid: Option, + } + let request: GetPoolsRequest = nvpair::from_nvlist(&nvl)?; + let bucket_access = BucketAccess::new(request.protocol.clone()).await?; + let buckets = if let Some(bucket) = request.bucket { + vec![bucket] + } else { + bucket_access.list_buckets().await + }; 
- let mut buckets = vec![]; - if let Some(bucket) = request.bucket { - buckets.push(bucket); - } else { - buckets.append(&mut bucket_access.list_buckets().await); - } + maybe_die_with(|| "in get_pools_impl"); - maybe_die_with(|| "in get_pools_impl"); - let response = Arc::new(Mutex::new(NvList::new_unique_names())); - for buck in buckets { - let object_access = - ObjectAccess::new(request.protocol.clone(), buck, request.readonly).await?; - if let Some(guid) = request.guid { - if !Pool::exists(&object_access, PoolGuid(guid)).await { - continue; - } - - match Pool::get_config(&object_access, PoolGuid(guid)).await { - Ok(pool_config) => { - let mut owned_response = - Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - owned_response - .insert(format!("{}", guid), pool_config.as_ref()) - .unwrap(); - debug!("sending response: {:?}", owned_response); - return Ok(Some(owned_response)); - } - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - continue; - } + let pools = Arc::new(Mutex::new(NvList::new_unique_names())); + for buck in buckets { + let object_access = ObjectAccess::new(request.protocol.clone(), buck, true).await?; + if let Some(guid) = request.guid { + find_pool(object_access, PoolGuid(guid), pools.clone()).await; + } else { + discover_pools(object_access, pools.clone()).await; } } - - object_access - .list_prefixes("zfs/".to_string()) - .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { - let my_object_access = object_access.clone(); - let my_response = response.clone(); - async move { - debug!("prefix: {}", prefix); - let split: Vec<&str> = prefix.rsplitn(3, '/').collect(); - let guid_str = split[1]; - if let Ok(guid64) = str::parse::(guid_str) { - let guid = PoolGuid(guid64); - match Pool::get_config(&my_object_access, guid).await { - Ok(pool_config) => my_response - .lock() - .unwrap() - .insert(guid_str, pool_config.as_ref()) - .unwrap(), - Err(e) => { - error!("skipping {:?}: {:?}", guid, e); - } - } - } - } - }) - .await; - } - let owned_response = Arc::try_unwrap(response).unwrap().into_inner().unwrap(); - info!("sending response: {:?}", owned_response); - Ok(Some(owned_response)) + let pools = Arc::try_unwrap(pools).unwrap().into_inner().unwrap(); + let mut response = NvList::new_unique_names(); + response.insert("pools", pools.as_ref())?; + info!("sending response: {response:?}"); + Ok(Some(response)) + })) } fn get_destroying_pools(&mut self, nvl: NvList) -> HandlerReturn { Ok(Box::pin(async move { - // XXX convert to use serde nvlist response debug!("got request: {:?}", nvl); - let pools = pool_destroy::get_destroy_list().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_GET_DESTROYING_POOLS) - .unwrap(); - response.insert("pools", pools.as_ref()).unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + #[derive(Debug, Serialize)] + struct DestroyingPoolsResponse { + pools: HashMap, + } + let response = DestroyingPoolsResponse { + pools: pool_destroy::get_destroy_list() + .await + .into_iter() + .map(|(guid, destroying)| (guid.to_string(), destroying)) + .collect(), + }; + return_ok(response, true) })) } @@ -195,13 +151,7 @@ impl PublicConnectionState { debug!("got request: {:?}", nvl); pool_destroy::remove_not_in_progress().await; - let mut response = NvList::new_unique_names(); - response - .insert(AGENT_RESPONSE_TYPE, TYPE_CLEAR_DESTROYED_POOLS) - .unwrap(); - - debug!("sending response: {:?}", response); - Ok(Some(response)) + Ok(Some(NvList::new_unique_names())) })) } @@ 
-225,7 +175,7 @@ impl PublicConnectionState { bucket_size: phys.bucket_size, combined_histogram, }; - return_ok(TYPE_REPORT_HITS, response, true) + return_ok(response, true) })) } @@ -240,7 +190,7 @@ let response = ListDevicesResponse { devices_json: serde_json::to_string(&cache.devices()).unwrap(), }; - return_ok(TYPE_LIST_DEVICES, response, true) + return_ok(response, true) })) } @@ -255,7 +205,7 @@ let response = ZcacheIostatResponse { iostats_json: serde_json::to_string(&cache.io_stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_IOSTAT, response, false) + return_ok(response, false) })) } @@ -270,7 +220,52 @@ let response = ZcacheStatsResponse { stats_json: serde_json::to_string(&cache.stats()).unwrap(), }; - return_ok(TYPE_ZCACHE_STATS, response, false) + return_ok(response, false) })) } } + +async fn discover_pools(object_access: Arc<ObjectAccess>, pools: Arc<Mutex<NvList>>) { + object_access + .list_prefixes("zfs/".to_string()) + .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { + let my_object_access = object_access.clone(); + let pools = pools.clone(); + async move { + debug!("prefix: {}", prefix); + let split = prefix.rsplitn(3, '/').collect::<Vec<&str>>(); + let guid_str = split[1]; + if let Ok(guid64) = str::parse::<u64>(guid_str) { + let guid = PoolGuid(guid64); + match Pool::get_config(&my_object_access, guid).await { + Ok(pool_config) => pools + .lock() + .unwrap() + .insert(guid_str, pool_config.as_ref()) + .unwrap(), + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } + } + }) + .await; +} + +async fn find_pool(object_access: Arc<ObjectAccess>, guid: PoolGuid, pools: Arc<Mutex<NvList>>) { + if Pool::exists(&object_access, guid).await { + match Pool::get_config(&object_access, guid).await { + Ok(pool_config) => { + pools + .lock() + .unwrap() + .insert(format!("{}", guid), pool_config.as_ref()) + .unwrap(); + } + Err(e) => { + error!("skipping {guid:?}: {e:?}"); + } + } + } +} diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index c5e6fec358b2..906d2bcb5721 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -150,7 +150,7 @@ impl RootConnectionState { .await .map_err(|e| FailureMessage::new(e.into())); - return_result(TYPE_CREATE_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -269,7 +269,7 @@ }) } }; - return_result(TYPE_OPEN_POOL, request.id, result, true) + return_result(request.id, result, true) }) } @@ -349,13 +349,11 @@ .await; #[derive(Debug, Serialize)] struct EndTxgResponse { - response_type: &'static str, #[serde(flatten)] stats: PoolStatsPhys, features: HashMap, } let response = EndTxgResponse { - response_type: TYPE_END_TXG, stats, features: features .into_iter() @@ -488,7 +486,6 @@ } let mut response = NvList::new_unique_names(); - response.insert("response_type", TYPE_GET_STATS).unwrap(); response.insert("token", &request.token).unwrap(); response.insert("stats", nvl.as_ref()).unwrap(); @@ -516,13 +513,8 @@ .await; } #[derive(Debug, Serialize)] - struct ClosePoolResponse { - response_type: &'static str, - } - let response = ClosePoolResponse { - response_type: TYPE_CLOSE_POOL, - }; - return_struct(response, true) + struct ClosePoolResponse {} + return_struct(ClosePoolResponse {}, true) })) } @@ -550,14 +542,12 @@ impl 
RootConnectionState { #[derive(Debug, Serialize)] struct EnableFeatureResponse { - response_type: &'static str, feature: String, } let response = EnableFeatureResponse { - response_type: TYPE_ENABLE_FEATURE, feature: request.feature, }; - handler_return_struct(response, true) + Ok(Box::pin(future::ready(return_struct(response, true)))) } fn resume_destroy_pool(&mut self, nvl: NvList) -> HandlerReturn { @@ -575,22 +565,16 @@ impl RootConnectionState { let result = pool_destroy::resume_destroy(object_access, request.guid) .await .map_err(FailureMessage::new); - return_result(TYPE_RESUME_DESTROY_POOL, (), result, true) + return_result((), result, true) })) } fn clear_hit_data(&mut self, _nvl: NvList) -> HandlerReturn { let cache = self.cache.clone(); Ok(Box::pin(async move { - #[derive(Debug, Serialize)] - struct ClearHitDataResponse { - response_type: &'static str, - result: &'static str, - } - debug!("got ClearHitDataRequest"); cache.clear_hit_data().await; - return_ok(TYPE_CLEAR_HIT_DATA, (), true) + return_ok((), true) })) } @@ -604,7 +588,7 @@ impl RootConnectionState { .add_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_ADD_DISK, (), result, true) + return_result((), result, true) })) } @@ -618,7 +602,7 @@ impl RootConnectionState { .expand_disk(&request.path) .await .map_err(FailureMessage::new); - return_result(TYPE_EXPAND_DISK, (), result, true) + return_result((), result, true) })) } @@ -628,7 +612,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.sync_checkpoint().await; - return_ok(TYPE_SYNC_CHECKPOINT, (), true) + return_ok((), true) })) } @@ -638,7 +622,7 @@ impl RootConnectionState { debug!("got {:?}", nvl); cache.initiate_merge().await; - return_ok(TYPE_INITIATE_MERGE, (), true) + return_ok((), true) })) } } @@ -661,10 +645,3 @@ where } Ok(Some(nvl)) } - -fn handler_return_struct(response: T, debug: bool) -> HandlerReturn -where - T: Debug + Serialize, -{ - Ok(Box::pin(future::ready(return_struct(response, debug)))) -} diff --git a/cmd/zfs_object_agent/zettaobject/src/server.rs b/cmd/zfs_object_agent/zettaobject/src/server.rs index a0d480afb5d7..ed484d7c2c28 100644 --- a/cmd/zfs_object_agent/zettaobject/src/server.rs +++ b/cmd/zfs_object_agent/zettaobject/src/server.rs @@ -317,11 +317,14 @@ where with_alloctag_hf("Server::start_connection() NvList::lookup_string()", || { nvl.lookup_string(AGENT_REQUEST_TYPE) })?; - let request_type = request_type_cstr.to_str()?; - match self.nvlist_handlers.get(request_type) { + let request_type = request_type_cstr.to_str()?.to_owned(); + match self.nvlist_handlers.get(&request_type) { Some(HandlerEnum::Serial(handler)) => { let response_opt = handler(&mut state, nvl).await?; - if let Some(response) = response_opt { + if let Some(mut response) = response_opt { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } } @@ -331,7 +334,10 @@ where let responder = responder.clone(); tokio::spawn(async move { match fut.await { - Ok(Some(response)) => { + Ok(Some(mut response)) => { + response + .insert(AGENT_RESPONSE_TYPE, request_type.as_str()) + .unwrap(); responder.respond_with_nvlist(response); } Ok(None) => {} @@ -513,7 +519,7 @@ pub enum FailureMessage { impl Debug for FailureMessage { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - FailureMessage::Other { message } => write!(f, "{}", message), + FailureMessage::Other { message } => write!(f, "{message}"), } } } @@ -535,14 +541,14 @@ impl FailureMessage { /// (not 
tuple-like). FailureMessage is an example. /// /// The response nvlist will have the following nvpairs: -/// * "response_type" -> response_type (string) /// * fields from R /// * if result.is_ok(), fields from O /// * if result.is_err(), "err" -> EnumVariantName (string) /// * if result.is_err(), "errstr" -> stringified error (string) /// * if result.is_err(), fields from the variant of E +/// +/// Note that the server infrastructure will add the "response_type" nvpair pub fn return_result<R, O, E>( - response_type: &str, request_id: R, result: Result<O, E>, debug: bool, @@ -553,8 +559,7 @@ where E: Debug + Serialize, { #[derive(Debug, Serialize)] - struct Response<'a, R, O, E> { - response_type: &'a str, + struct Response<R, O, E> { #[serde(flatten)] request: R, #[serde(flatten)] @@ -565,16 +570,15 @@ } if let Err(e) = &result { - error!("sending failure: {:?}", e); + error!("sending failure: {e:?}"); } let (ok, errstr, err) = match result { Ok(o) => (Some(o), None, None), - Err(e) => (None, Some(format!("{:?}", e)), Some(e)), + Err(e) => (None, Some(format!("{e:?}")), Some(e)), }; let response = Response { - response_type, request: request_id, ok, err, @@ -582,9 +586,9 @@ }; if debug { - trace!("sending response: {:?}", response); + trace!("sending response: {response:?}"); } else { - super_trace!("sending response: {:?}", response); + super_trace!("sending response: {response:?}"); } let nvl = nvpair::to_nvlist(&response)?; @@ -596,17 +600,17 @@ } if debug { - maybe_die_with(|| format!("before sending response: {:?}", nvl)); - debug!("sending response nvl: {:?}", nvl); + maybe_die_with(|| format!("before sending response: {nvl:?}")); + debug!("sending response nvl: {nvl:?}"); } else { - super_trace!("sending response nvl: {:?}", nvl); + super_trace!("sending response nvl: {nvl:?}"); } Ok(Some(nvl)) } -pub fn return_ok<O>(response_type: &str, response: O, debug: bool) -> Result<Option<NvList>> +pub fn return_ok<O>(response: O, debug: bool) -> Result<Option<NvList>> where O: Debug + Serialize, { - return_result(response_type, (), Ok::<_, ()>(response), debug) + return_result((), Ok::<_, ()>(response), debug) } diff --git a/cmd/zpool/zpool_vdev.c b/cmd/zpool/zpool_vdev.c index 28cd756b9796..2ed379229432 100644 --- a/cmd/zpool/zpool_vdev.c +++ b/cmd/zpool/zpool_vdev.c @@ -277,11 +277,6 @@ make_objstore_vdev(nvlist_t *props, const char *protocol, const char *arg) "vdev %s\n"), arg); fnvlist_free(vdev); return (NULL); - } else if (err == 0 && !s3) { - fprintf(stderr, gettext("Endpoint provided for objstore " - "vdev that doesn't support endpoints %s\n"), arg); - fnvlist_free(vdev); - return (NULL); } else if (err == 0) { fnvlist_add_string(vdev, zpool_prop_to_name(ZPOOL_PROP_OBJ_ENDPOINT), endpoint); diff --git a/include/sys/spa.h b/include/sys/spa.h index 74b6ecd40cbd..84cff7c22327 100644 --- a/include/sys/spa.h +++ b/include/sys/spa.h @@ -978,7 +978,6 @@ extern void spa_config_enter(spa_t *spa, int locks, const void *tag, krw_t rw); extern void spa_config_exit(spa_t *spa, int locks, const void *tag); extern int spa_config_held(spa_t *spa, int locks, krw_t rw); extern void spa_config_enter_read_priority(spa_t *, int); -extern int spa_config_write_wanted(spa_t *, int locks); /* Pool vdev add/remove lock */ extern uint64_t spa_vdev_enter(spa_t *spa); diff --git a/lib/libshare/libshare.c b/lib/libshare/libshare.c index 7294940a27f8..7bc31d7a6823 100644 --- a/lib/libshare/libshare.c +++ b/lib/libshare/libshare.c @@ -75,12 +75,6 @@ libshare_init(void) libshare_smb_init(); } -__attribute__((destructor)) static void -libshare_fini(void) -{ 
- libshare_nfs_fini(); -} - int sa_enable_share(const char *zfsname, const char *mountpoint, const char *shareopts, char *protocol) diff --git a/lib/libshare/nfs.h b/lib/libshare/nfs.h index 01183bd712f5..cfac274c3d26 100644 --- a/lib/libshare/nfs.h +++ b/lib/libshare/nfs.h @@ -29,7 +29,6 @@ #define FILE_HEADER "# !!! DO NOT EDIT THIS FILE MANUALLY !!!\n\n" void libshare_nfs_init(void); -void libshare_nfs_fini(void); boolean_t nfs_is_shared_impl(const char *exports, sa_share_impl_t impl_share); int nfs_toggle_share(const char *lockfile, const char *exports, diff --git a/lib/libshare/os/linux/nfs.c b/lib/libshare/os/linux/nfs.c index 15a762fa73f8..5acfa3fb8545 100644 --- a/lib/libshare/os/linux/nfs.c +++ b/lib/libshare/os/linux/nfs.c @@ -23,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2011 Gunnar Beutner * Copyright (c) 2012 Cyril Plisko. All rights reserved. - * Copyright (c) 2019, 2022 by Delphix. All rights reserved. + * Copyright (c) 2019, 2020 by Delphix. All rights reserved. */ #include @@ -31,13 +31,10 @@ #include #include #include -#include -#include #include #include #include #include -#include #include #include #include @@ -48,8 +45,6 @@ #define ZFS_EXPORTS_FILE ZFS_EXPORTS_DIR"/zfs.exports" #define ZFS_EXPORTS_LOCK ZFS_EXPORTS_FILE".lock" -#define EXPORTFS_PROG "/usr/sbin/exportfs" - static sa_fstype_t *nfs_fstype; typedef int (*nfs_shareopt_callback_t)(const char *opt, const char *value, @@ -58,16 +53,6 @@ typedef int (*nfs_shareopt_callback_t)(const char *opt, const char *value, typedef int (*nfs_host_callback_t)(FILE *tmpfile, const char *sharepath, const char *host, const char *security, const char *access, void *cookie); -static boolean_t exportfs_cache_control = B_FALSE; - -typedef struct unshared_dataset_entry { - list_node_t ude_node; - char ude_path[0]; -} unshared_dataset_entry_t; - -static pthread_mutex_t unshared_dataset_lock; -static list_t unshared_dataset_list; - /* * Invokes the specified callback function for each Solaris share option * listed in the specified string. @@ -154,6 +139,10 @@ foreach_nfs_host_cb(const char *opt, const char *value, void *pcookie) nfs_host_cookie_t *udata = (nfs_host_cookie_t *)pcookie; int cidr_len; +#ifdef DEBUG + fprintf(stderr, "foreach_nfs_host_cb: key=%s, value=%s\n", opt, value); +#endif + if (strcmp(opt, "sec") == 0) udata->security = value; @@ -444,37 +433,13 @@ nfs_enable_share(sa_share_impl_t impl_share) nfs_enable_share_impl)); } -/* - * Save the mountpoint so we can flush the rpc caches after - * the shares are commited. - */ -static int -save_unshared_dataset(const char *mountpoint) -{ - pthread_mutex_lock(&unshared_dataset_lock); - unshared_dataset_entry_t *ude = - malloc(sizeof (unshared_dataset_entry_t) + - strlen(mountpoint) + 1); - if (ude == NULL) - return (SA_NO_MEMORY); - strcpy(ude->ude_path, mountpoint); - list_insert_tail(&unshared_dataset_list, ude); - pthread_mutex_unlock(&unshared_dataset_lock); - - return (SA_OK); -} - /* * Disables NFS sharing for the specified share. 
diff --git a/lib/libzutil/zutil_import.c b/lib/libzutil/zutil_import.c
index ebb1ac2a55d8..6626691da586 100644
--- a/lib/libzutil/zutil_import.c
+++ b/lib/libzutil/zutil_import.c
@@ -1901,8 +1901,11 @@ zpool_find_import_agent(libpc_handle_t *hdl, importargs_t *iarg,
 	nvlist_t *resp = zoa_send_recv_msg(hdl, msg,
 	    AGENT_PUBLIC_PROTOCOL_VERSION, ZFS_PUBLIC_SOCKET, NULL);

+	nvlist_t *pools = NULL;
+	(void) nvlist_lookup_nvlist(resp, "pools", &pools);
+
 	nvpair_t *elem = NULL;
-	while ((elem = nvlist_next_nvpair(resp, elem)) != NULL) {
+	while ((elem = nvlist_next_nvpair(pools, elem)) != NULL) {
 		avl_index_t where;
 		rdsk_node_t *slice;
 		nvlist_t *config;
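[Editor's note, not part of the patch: the import path changes because the agent's response now nests discovered pool configs under a "pools" nvpair instead of placing them at the top level. A small sketch of the same traversal in isolation; walk_pools() is illustrative, and resp would come from zoa_send_recv_msg() as above.]

    #include <libnvpair.h>

    /* Walk each pool config nested under the "pools" nvpair. */
    static void
    walk_pools(nvlist_t *resp)
    {
            nvlist_t *pools = NULL;
            if (nvlist_lookup_nvlist(resp, "pools", &pools) != 0)
                    return;         /* no pools reported */

            nvpair_t *elem = NULL;
            while ((elem = nvlist_next_nvpair(pools, elem)) != NULL) {
                    nvlist_t *config;
                    if (nvpair_value_nvlist(elem, &config) == 0)
                            dump_nvlist(config, 4); /* libnvpair debug printer */
            }
    }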
diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c
index af08cc02bdb0..64aecf8bf0c5 100644
--- a/module/zfs/dmu_send.c
+++ b/module/zfs/dmu_send.c
@@ -69,13 +69,11 @@
 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
 static int zfs_send_corrupt_data = B_FALSE;
 /*
- * This tunable controls the amount of data (measured in bytes) that will be
- * prefetched by zfs send. If the main thread is blocking on reads that haven't
- * completed, this variable might need to be increased. If instead the main
- * thread is issuing new reads because the prefetches have fallen out of the
- * cache, this may need to be decreased.
+ * This tunable controls the amount of memory (measured in bytes) that will be
+ * used to buffer data read for zfs send. If the main thread is blocking on
+ * reads that haven't completed, this variable might need to be increased.
  */
-static int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
+static int zfs_send_queue_length = 64 * 1024 * 1024;
 /*
  * This tunable controls the length of the queues that zfs send worker threads
  * use to communicate. If the send_main_thread is blocking on these queues,
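[Editor's note, not part of the patch: the default changes from one maximum-sized block to a fixed 64 MiB budget. Assuming upstream's SPA_MAXBLOCKSHIFT of 24, the old default worked out to 16 MiB; a throwaway check of the arithmetic, with the macro redefined locally for illustration:]

    #include <stdio.h>

    /* Illustrative only: upstream defines SPA_MAXBLOCKSHIFT as 24. */
    #define SPA_MAXBLOCKSHIFT 24
    #define SPA_MAXBLOCKSIZE (1ULL << SPA_MAXBLOCKSHIFT)

    int
    main(void)
    {
            printf("old default: %llu bytes (%llu MiB)\n",
                (unsigned long long)SPA_MAXBLOCKSIZE,
                (unsigned long long)(SPA_MAXBLOCKSIZE >> 20));
            printf("new default: %d bytes (64 MiB)\n", 64 * 1024 * 1024);
            return (0);
    }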
diff --git a/module/zfs/spa_misc.c b/module/zfs/spa_misc.c
index 2853f87db3e4..ac4eaa2e4527 100644
--- a/module/zfs/spa_misc.c
+++ b/module/zfs/spa_misc.c
@@ -515,23 +515,6 @@ spa_config_tryenter(spa_t *spa, int locks, void *tag, krw_t rw)
 	return (1);
 }

-int
-spa_config_write_wanted(spa_t *spa, int locks)
-{
-	int locks_wanted = 0;
-	for (int i = 0; i < SCL_LOCKS; i++) {
-		spa_config_lock_t *scl = &spa->spa_config_lock[i];
-		if (!(locks & (1 << i)))
-			continue;
-		mutex_enter(&scl->scl_lock);
-		if (scl->scl_write_wanted) {
-			locks_wanted |= 1 << i;
-		}
-		mutex_exit(&scl->scl_lock);
-	}
-	return (locks_wanted);
-}
-
 /*
  * This function should only be called as an exception since it
  * will not check for any waiting writers and could lead to starvation.
diff --git a/module/zfs/zio.c b/module/zfs/zio.c
index 8a7199036b1e..d50590768307 100644
--- a/module/zfs/zio.c
+++ b/module/zfs/zio.c
@@ -3871,14 +3871,6 @@ zio_vdev_io_start(zio_t *zio)
 			    SCL_ZIO);

 			object_store_update_max_blockid(zio);
-
-			/*
-			 * If there is a spa_config_lock WRITER
-			 * waiting, then keep flushing out the max
-			 * I/O that has been issued.
-			 */
-			if (spa_config_write_wanted(spa, SCL_ZIO))
-				object_store_flush_locked_writes(spa);
 		} else {
 			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
 		}
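[Editor's note, not part of the patch: for reference, the deleted helper scanned the requested config locks and, under each lock's mutex, collected a bitmask of locks with a waiting writer. The same pattern in a self-contained userland sketch; the types are simplified stand-ins, not the kernel implementation, and SCL_LOCKS matching the number of spa config locks is an assumption.]

    #include <pthread.h>

    #define SCL_LOCKS 7     /* assumed count of spa config locks */

    typedef struct {
            pthread_mutex_t scl_lock;
            int scl_write_wanted;
    } config_lock_t;

    /* Return a bitmask of the requested locks that have a writer waiting. */
    static int
    write_wanted(config_lock_t *scl_array, int locks)
    {
            int wanted = 0;
            for (int i = 0; i < SCL_LOCKS; i++) {
                    if (!(locks & (1 << i)))
                            continue;
                    pthread_mutex_lock(&scl_array[i].scl_lock);
                    if (scl_array[i].scl_write_wanted)
                            wanted |= 1 << i;
                    pthread_mutex_unlock(&scl_array[i].scl_lock);
            }
            return (wanted);
    }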