diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f48aa92c1cc19..45a7017c492d88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - aws_s3: Add `filename_extension` options. - aws_cloudwatch_logs: `stream_name` now accepts `{{key}}` synatx for extracting values from events. +- file source: `data_dir` now falls back to global `data_dir` option if not specified ### Changed - [configuration] Empty inputs are treated as errors instead of warnings [#506] diff --git a/src/sources/file.rs b/src/sources/file.rs index 6bbd99e0eba032..ba599269a7026e 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1,6 +1,6 @@ use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use file_source::FileServer; @@ -24,7 +24,7 @@ pub struct FileConfig { pub fingerprint_bytes: usize, pub ignored_header_bytes: usize, pub host_key: Option, - pub data_dir: PathBuf, + pub data_dir: Option, pub glob_minimum_cooldown: u64, // millis } @@ -44,7 +44,7 @@ impl Default for FileConfig { fingerprint_bytes: 256, ignored_header_bytes: 0, host_key: None, - data_dir: PathBuf::new(), + data_dir: None, glob_minimum_cooldown: 1000, // millis } } @@ -52,9 +52,31 @@ impl Default for FileConfig { #[typetag::serde(name = "file")] impl SourceConfig for FileConfig { - fn build(&self, out: mpsc::Sender) -> Result { - // TODO: validate paths - Ok(file_source(self, out)) + fn build( + &self, + globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { + let data_dir = match &self.data_dir { + Some(v) => v.clone(), + None => match &globals.data_dir { + Some(v) => v.clone(), + None => { + return Err("data_dir option required, but not given here or globally".into()) + } + }, + }; + if !data_dir.exists() { + return Err("data_dir '{}' does not exist".into()); + } + let readonly = std::fs::metadata(&data_dir) + .map(|meta| meta.permissions().readonly()) + .unwrap_or(true); + if readonly { + return Err("data_dir '{}' is not writable".into()); + } + + Ok(file_source(self, data_dir, out)) } fn output_type(&self) -> DataType { @@ -62,7 +84,11 @@ impl SourceConfig for FileConfig { } } -pub fn file_source(config: &FileConfig, out: mpsc::Sender) -> super::Source { +pub fn file_source( + config: &FileConfig, + data_dir: PathBuf, + out: mpsc::Sender, +) -> super::Source { let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); let ignore_before = config @@ -79,7 +105,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender) -> super::Sour max_line_bytes: config.max_line_bytes, fingerprint_bytes: config.fingerprint_bytes, ignored_header_bytes: config.ignored_header_bytes, - data_dir: config.data_dir.clone(), + data_dir, glob_minimum_cooldown: glob_minimum_cooldown, }; @@ -90,7 +116,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender) -> super::Sour let out = out .sink_map_err(|_| ()) .with(move |(line, file): (Bytes, String)| { - trace!(message = "Recieved one event.", file = file.as_str()); + trace!(message = "Received one event.", file = file.as_str()); let event = create_event(line, file, &host_key, &hostname, &file_key); @@ -164,7 +190,7 @@ mod tests { fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig { file::FileConfig { fingerprint_bytes: 8, - data_dir: dir.path().to_path_buf(), + data_dir: Some(dir.path().to_path_buf()), glob_minimum_cooldown: 0, // millis ..Default::default() } @@ -212,7 +238,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -273,7 +299,7 @@ mod tests { include: vec![dir.path().join("*")], ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -342,7 +368,7 @@ mod tests { include: vec![dir.path().join("*")], ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -414,7 +440,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -476,7 +502,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ())); @@ -506,7 +532,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ())); @@ -536,7 +562,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ())); @@ -582,7 +608,7 @@ mod tests { // First time server runs it picks up existing lines. { let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -604,7 +630,7 @@ mod tests { // Restart server, read file from checkpoint. { let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -631,7 +657,7 @@ mod tests { ..test_default_file_config(&dir) }; let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -667,7 +693,7 @@ mod tests { // Run server first time, collect some lines. { let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -693,7 +719,7 @@ mod tests { // even though it has a new name. { let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -731,7 +757,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -816,7 +842,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index e1bf17b55f990d..aed962325326a6 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -1,4 +1,4 @@ -use crate::Event; +use crate::{topology::config::GlobalOptions, Event}; use futures::{future, sync::mpsc, Future, Sink, Stream}; use parser::parse; use serde::{Deserialize, Serialize}; @@ -19,7 +19,11 @@ struct StatsdConfig { #[typetag::serde(name = "statsd")] impl crate::topology::config::SourceConfig for StatsdConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { Ok(statsd(self.address.clone(), out)) } diff --git a/src/sources/stdin.rs b/src/sources/stdin.rs index c4953a0bfa9e05..506e51124314e0 100644 --- a/src/sources/stdin.rs +++ b/src/sources/stdin.rs @@ -1,6 +1,6 @@ use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use codec::BytesDelimitedCodec; @@ -34,7 +34,11 @@ fn default_max_length() -> usize { #[typetag::serde(name = "stdin")] impl SourceConfig for StdinConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { Ok(stdin_source(stdin(), self.clone(), out)) } diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 556ac8929fb414..76b0cb0369135f 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -1,7 +1,7 @@ use super::util::TcpSource; use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use chrono::{TimeZone, Utc}; @@ -53,7 +53,11 @@ impl SyslogConfig { #[typetag::serde(name = "syslog")] impl SourceConfig for SyslogConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { let host_key = self.host_key.clone().unwrap_or(event::HOST.to_string()); match self.mode.clone() { diff --git a/src/sources/tcp.rs b/src/sources/tcp.rs index fffa7f84751a41..1e2d0b864802dd 100644 --- a/src/sources/tcp.rs +++ b/src/sources/tcp.rs @@ -1,7 +1,7 @@ use super::util::TcpSource; use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use codec::{self, BytesDelimitedCodec}; @@ -43,7 +43,11 @@ impl TcpConfig { #[typetag::serde(name = "tcp")] impl SourceConfig for TcpConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { let tcp = RawTcpSource { config: self.clone(), }; @@ -95,7 +99,7 @@ mod test { use super::TcpConfig; use crate::event; use crate::test_util::{block_on, next_addr, send_lines, wait_for_tcp}; - use crate::topology::config::SourceConfig; + use crate::topology::config::{GlobalOptions, SourceConfig}; use futures::sync::mpsc; use futures::Stream; @@ -105,7 +109,9 @@ mod test { let addr = next_addr(); - let server = TcpConfig::new(addr).build(tx).unwrap(); + let server = TcpConfig::new(addr) + .build(&GlobalOptions::default(), tx) + .unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(server); wait_for_tcp(addr); @@ -147,7 +153,7 @@ mod test { let mut config = TcpConfig::new(addr); config.max_length = 10; - let server = config.build(tx).unwrap(); + let server = config.build(&GlobalOptions::default(), tx).unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(server); wait_for_tcp(addr); diff --git a/src/sources/vector.rs b/src/sources/vector.rs index 529769a5799a24..d2ed72641e94e5 100644 --- a/src/sources/vector.rs +++ b/src/sources/vector.rs @@ -1,7 +1,7 @@ use super::util::TcpSource; use crate::{ event::proto, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, Event, }; use bytes::{Bytes, BytesMut}; @@ -35,7 +35,11 @@ impl VectorConfig { #[typetag::serde(name = "vector")] impl SourceConfig for VectorConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { let vector = VectorSource; vector.run(self.address, self.shutdown_timeout_secs, out) } @@ -79,7 +83,7 @@ mod test { buffers::Acker, sinks::vector::vector, test_util::{next_addr, wait_for_tcp, CollectCurrent}, - topology::config::SourceConfig, + topology::config::{GlobalOptions, SourceConfig}, Event, }; use futures::{stream, sync::mpsc, Future, Sink}; @@ -89,7 +93,9 @@ mod test { let (tx, rx) = mpsc::channel(100); let addr = next_addr(); - let server = VectorConfig::new(addr.clone()).build(tx).unwrap(); + let server = VectorConfig::new(addr.clone()) + .build(&GlobalOptions::default(), tx) + .unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(server); wait_for_tcp(addr); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 6f983f107a697c..ae14490385a992 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1,5 +1,5 @@ use super::fanout::{self, Fanout}; -use crate::buffers; +use crate::{buffers, topology::config::GlobalOptions}; use futures::{sync::mpsc, Future, Stream}; use std::{collections::HashMap, time::Duration}; use stream_cancel::{Trigger, Tripwire}; @@ -28,13 +28,15 @@ pub fn build_pieces(config: &super::Config) -> Result<(Pieces, Vec), Vec let mut errors = vec![]; let mut warnings = vec![]; + let globals = GlobalOptions::from(config); + // Build sources for (name, source) in &config.sources { let (tx, rx) = mpsc::channel(1000); - let server = match source.build(tx) { + let server = match source.build(&globals, tx) { Err(error) => { - errors.push(format!("Transform \"{}\": {}", name, error)); + errors.push(format!("Source \"{}\": {}", name, error)); continue; } Ok(server) => server, diff --git a/src/topology/config/mod.rs b/src/topology/config/mod.rs index 2cc3f8209ed9ad..6f8cb3bd210a7d 100644 --- a/src/topology/config/mod.rs +++ b/src/topology/config/mod.rs @@ -17,6 +17,19 @@ pub struct Config { pub transforms: IndexMap, } +#[derive(Default)] +pub struct GlobalOptions { + pub data_dir: Option, +} + +impl GlobalOptions { + pub fn from(config: &Config) -> Self { + Self { + data_dir: config.data_dir.clone(), + } + } +} + #[derive(Debug, Clone, PartialEq)] pub enum DataType { Any, @@ -26,7 +39,11 @@ pub enum DataType { #[typetag::serde(tag = "type")] pub trait SourceConfig: core::fmt::Debug { - fn build(&self, out: mpsc::Sender) -> Result; + fn build( + &self, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result; fn output_type(&self) -> DataType; } diff --git a/tests/crash.rs b/tests/crash.rs index 076aa909c0e677..3f5c8690993b3a 100644 --- a/tests/crash.rs +++ b/tests/crash.rs @@ -6,7 +6,10 @@ use vector::{ block_on, next_addr, random_lines, receive, runtime, send_lines, shutdown_on_idle, wait_for_tcp, }, - topology::{self, config}, + topology::{ + self, + config::{self, GlobalOptions}, + }, Event, {sinks, sources}, }; @@ -152,7 +155,11 @@ struct ErrorSourceConfig; #[typetag::serde(name = "tcp")] impl config::SourceConfig for ErrorSourceConfig { - fn build(&self, _out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + _out: mpsc::Sender, + ) -> Result { Ok(Box::new(future::err(()))) } @@ -204,7 +211,11 @@ struct PanicSourceConfig; #[typetag::serde(name = "tcp")] impl config::SourceConfig for PanicSourceConfig { - fn build(&self, _out: mpsc::Sender) -> Result { + fn build( + &self, + _globals: &GlobalOptions, + _out: mpsc::Sender, + ) -> Result { Ok(Box::new(future::lazy::<_, future::FutureResult<(), ()>>( || panic!(), )))