diff --git a/.gitignore b/.gitignore index 217e9a736a388..b4aeded449e46 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ sample.log miniodat .DS_Store .idea/ -checkpoints/* \ No newline at end of file +checkpoints/* +*.iml \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index df4eaa51b4144..53d80313466d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - aws_cloudwatch_logs: `stream_name` now accepts `{{key}}` syntax for extracting values from events. - aws_cloudwatch_logs: retry support added and more stability improvements - coercer: New transform to convert fields into specified types. +- file source: `data_dir` now falls back to global `data_dir` option if not specified - aws_kinesis_streams: Added configurable partition keys ### Changed diff --git a/config/vector.toml b/config/vector.toml index 219e269fd215b..40238bd94dbb6 100644 --- a/config/vector.toml +++ b/config/vector.toml @@ -32,4 +32,5 @@ data_dir = "/var/lib/vector" # Output data [sinks.print] + inputs = ["apache_parser"] type = "console" \ No newline at end of file diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 49ef362e65d9a..319b65e000a63 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -17,7 +17,7 @@ use tracing::field; /// event notification is used by `FileServer`. /// /// `FileServer` is configured on a path to watch. The files do _not_ need to -/// exist at cernan startup. `FileServer` will discover new files which match +/// exist at startup. `FileServer` will discover new files which match /// its path in at most 60 seconds. pub struct FileServer { pub include: Vec, @@ -40,7 +40,7 @@ pub struct FileServer { /// versa but there's no one perfect approach. Very fast files _will_ be lost if /// your system aggressively rolls log files. 
`FileServer` will keep a file /// handler open but should your system move so quickly that a file disappears -/// before cernan is able to open it the contents will be lost. This should be a +/// before `FileServer` is able to open it the contents will be lost. This should be a /// rare occurrence. /// /// Specific operating systems support evented interfaces that correct this diff --git a/lib/file-source/src/file_watcher.rs b/lib/file-source/src/file_watcher.rs index 72ae844002bdf..0b57f837afac8 100644 --- a/lib/file-source/src/file_watcher.rs +++ b/lib/file-source/src/file_watcher.rs @@ -27,7 +27,7 @@ impl FileWatcher { /// /// The input path will be used by `FileWatcher` to prime its state /// machine. A `FileWatcher` tracks _only one_ file. This function returns - /// None if the path does not exist or is not readable by cernan. + /// None if the path does not exist or is not readable by the current process. pub fn new( path: PathBuf, file_position: FilePosition, diff --git a/src/sources/file.rs b/src/sources/file.rs index a937bad32bdf9..74374625726ba 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1,11 +1,12 @@ use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use file_source::FileServer; use futures::{future, sync::mpsc, Future, Sink}; use serde::{Deserialize, Serialize}; +use std::fs::DirBuilder; use std::path::PathBuf; use std::thread; use std::time::{Duration, SystemTime}; @@ -24,7 +25,7 @@ pub struct FileConfig { pub fingerprint_bytes: usize, pub ignored_header_bytes: usize, pub host_key: Option, - pub data_dir: PathBuf, + pub data_dir: Option, pub glob_minimum_cooldown: u64, // millis } @@ -44,7 +45,7 @@ impl Default for FileConfig { fingerprint_bytes: 256, ignored_header_bytes: 0, host_key: None, - data_dir: PathBuf::new(), + data_dir: None, glob_minimum_cooldown: 1000, // millis } } @@ -52,9 +53,26 @@ impl Default for 
FileConfig { #[typetag::serde(name = "file")] impl SourceConfig for FileConfig { - fn build(&self, out: mpsc::Sender) -> Result { - // TODO: validate paths - Ok(file_source(self, out)) + fn build( + &self, + name: &str, + globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { + let mut data_dir = resolve_and_validate_data_dir(self, globals)?; + // Append the source name as a subdirectory so that multiple sources can share one data_dir + // (e.g. the global one) without their checkpointers interfering with each other. + // `recursive(true)` keeps creation from failing when the subdirectory already exists (restart). + data_dir.push(name); + if let Err(e) = DirBuilder::new().recursive(true).create(&data_dir) { + return Err(format!( + "could not create subdirectory '{}' inside of data_dir '{}': {}", + name, + data_dir.parent().unwrap().display(), + e + )); + }; + Ok(file_source(self, data_dir, out)) } fn output_type(&self) -> DataType { @@ -62,7 +80,37 @@ impl SourceConfig for FileConfig { } } -pub fn file_source(config: &FileConfig, out: mpsc::Sender) -> super::Source { +fn resolve_and_validate_data_dir( + config: &FileConfig, + globals: &GlobalOptions, +) -> Result { + let data_dir = match config.data_dir.as_ref().or(globals.data_dir.as_ref()) { + Some(v) => v.clone(), + None => return Err("data_dir option required, but not given here or globally".into()), + }; + if !data_dir.exists() { + return Err(format!( + "data_dir '{}' does not exist", + data_dir.to_string_lossy() + )); + } + let readonly = std::fs::metadata(&data_dir) + .map(|meta| meta.permissions().readonly()) + .unwrap_or(true); + if readonly { + return Err(format!( + "data_dir '{}' is not writable", + data_dir.to_string_lossy() + )); + } + Ok(data_dir) +} + +pub fn file_source( + config: &FileConfig, + data_dir: PathBuf, + out: mpsc::Sender, +) -> super::Source { let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); let ignore_before = config @@ -79,7 +127,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender) -> 
super::Sour max_line_bytes: config.max_line_bytes, fingerprint_bytes: config.fingerprint_bytes, ignored_header_bytes: config.ignored_header_bytes, - data_dir: config.data_dir.clone(), + data_dir, glob_minimum_cooldown: glob_minimum_cooldown, }; @@ -90,7 +138,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender) -> super::Sour let out = out .sink_map_err(|_| ()) .with(move |(line, file): (Bytes, String)| { - trace!(message = "Recieved one event.", file = file.as_str()); + trace!(message = "Received one event.", file = file.as_str()); let event = create_event(line, file, &host_key, &hostname, &file_key); @@ -149,6 +197,7 @@ mod tests { use crate::event; use crate::sources::file; use crate::test_util::{block_on, shutdown_on_idle}; + use crate::topology::Config; use futures::{Future, Stream}; use std::collections::HashSet; use std::fs::{self, File}; @@ -160,7 +209,7 @@ mod tests { fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig { file::FileConfig { fingerprint_bytes: 8, - data_dir: dir.path().to_path_buf(), + data_dir: Some(dir.path().to_path_buf()), glob_minimum_cooldown: 0, // millis ..Default::default() } @@ -180,6 +229,31 @@ mod tests { result.unwrap() } + #[test] + fn resolve_data_dir() { + let global_dir = tempdir().unwrap(); + let local_dir = tempdir().unwrap(); + + let mut config = Config::empty(); + config.data_dir = Some(global_dir.path().to_path_buf()); // borrow only: TempDir is still removed on drop + + // local path given -- local should win + let res = super::resolve_and_validate_data_dir( + &test_default_file_config(&local_dir), + &GlobalOptions::from(&config), + ) + .unwrap(); + assert_eq!(res, local_dir.path()); + + // no local path given -- global fallback should be in effect + let res = super::resolve_and_validate_data_dir( + &Default::default(), + &GlobalOptions::from(&config), + ) + .unwrap(); + assert_eq!(res, config.data_dir.unwrap()); + } + #[test] fn file_create_event() { let line = Bytes::from("hello world"); @@ -208,7 +282,7 @@ mod tests { 
..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -269,7 +343,7 @@ mod tests { include: vec![dir.path().join("*")], ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -338,7 +412,7 @@ mod tests { include: vec![dir.path().join("*")], ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -410,7 +484,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -472,7 +546,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ())); @@ -502,7 +576,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ())); @@ -532,7 +606,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ())); @@ -578,7 +652,7 @@ mod tests { // First time server runs it picks up existing lines. 
{ let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -600,7 +674,7 @@ mod tests { // Restart server, read file from checkpoint. { let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -627,7 +701,7 @@ mod tests { ..test_default_file_config(&dir) }; let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -663,7 +737,7 @@ mod tests { // Run server first time, collect some lines. { let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -689,7 +763,7 @@ mod tests { // even though it has a new name. 
{ let (tx, rx) = futures::sync::mpsc::channel(10); - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); let (trigger, tripwire) = Tripwire::new(); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -727,7 +801,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ())); @@ -812,7 +886,7 @@ mod tests { ..test_default_file_config(&dir) }; - let source = file::file_source(&config, tx); + let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx); let mut rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index e1bf17b55f990..fbb715fd4dfed 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -1,4 +1,4 @@ -use crate::Event; +use crate::{topology::config::GlobalOptions, Event}; use futures::{future, sync::mpsc, Future, Sink, Stream}; use parser::parse; use serde::{Deserialize, Serialize}; @@ -19,7 +19,12 @@ struct StatsdConfig { #[typetag::serde(name = "statsd")] impl crate::topology::config::SourceConfig for StatsdConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { Ok(statsd(self.address.clone(), out)) } diff --git a/src/sources/stdin.rs b/src/sources/stdin.rs index c4953a0bfa9e0..737566273926d 100644 --- a/src/sources/stdin.rs +++ b/src/sources/stdin.rs @@ -1,6 +1,6 @@ use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use codec::BytesDelimitedCodec; @@ -34,7 +34,12 @@ fn default_max_length() -> usize { #[typetag::serde(name = 
"stdin")] impl SourceConfig for StdinConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { Ok(stdin_source(stdin(), self.clone(), out)) } diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index e16def49dfadb..1d5ce6ff73cf7 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -1,7 +1,7 @@ use super::util::TcpSource; use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use chrono::{TimeZone, Utc}; @@ -53,7 +53,12 @@ impl SyslogConfig { #[typetag::serde(name = "syslog")] impl SourceConfig for SyslogConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { let host_key = self.host_key.clone().unwrap_or(event::HOST.to_string()); match self.mode.clone() { diff --git a/src/sources/tcp.rs b/src/sources/tcp.rs index fffa7f84751a4..2f5778371908b 100644 --- a/src/sources/tcp.rs +++ b/src/sources/tcp.rs @@ -1,7 +1,7 @@ use super::util::TcpSource; use crate::{ event::{self, Event}, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, }; use bytes::Bytes; use codec::{self, BytesDelimitedCodec}; @@ -43,7 +43,12 @@ impl TcpConfig { #[typetag::serde(name = "tcp")] impl SourceConfig for TcpConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { let tcp = RawTcpSource { config: self.clone(), }; @@ -95,7 +100,7 @@ mod test { use super::TcpConfig; use crate::event; use crate::test_util::{block_on, next_addr, send_lines, wait_for_tcp}; - use crate::topology::config::SourceConfig; + use crate::topology::config::{GlobalOptions, SourceConfig}; use futures::sync::mpsc; use futures::Stream; @@ -105,7 
+110,9 @@ mod test { let addr = next_addr(); - let server = TcpConfig::new(addr).build(tx).unwrap(); + let server = TcpConfig::new(addr) + .build("default", &GlobalOptions::default(), tx) + .unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(server); wait_for_tcp(addr); @@ -147,7 +154,9 @@ mod test { let mut config = TcpConfig::new(addr); config.max_length = 10; - let server = config.build(tx).unwrap(); + let server = config + .build("default", &GlobalOptions::default(), tx) + .unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(server); wait_for_tcp(addr); diff --git a/src/sources/vector.rs b/src/sources/vector.rs index 529769a5799a2..fd729ed9910bf 100644 --- a/src/sources/vector.rs +++ b/src/sources/vector.rs @@ -1,7 +1,7 @@ use super::util::TcpSource; use crate::{ event::proto, - topology::config::{DataType, SourceConfig}, + topology::config::{DataType, GlobalOptions, SourceConfig}, Event, }; use bytes::{Bytes, BytesMut}; @@ -35,7 +35,12 @@ impl VectorConfig { #[typetag::serde(name = "vector")] impl SourceConfig for VectorConfig { - fn build(&self, out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result { let vector = VectorSource; vector.run(self.address, self.shutdown_timeout_secs, out) } @@ -79,7 +84,7 @@ mod test { buffers::Acker, sinks::vector::vector, test_util::{next_addr, wait_for_tcp, CollectCurrent}, - topology::config::SourceConfig, + topology::config::{GlobalOptions, SourceConfig}, Event, }; use futures::{stream, sync::mpsc, Future, Sink}; @@ -89,7 +94,9 @@ mod test { let (tx, rx) = mpsc::channel(100); let addr = next_addr(); - let server = VectorConfig::new(addr.clone()).build(tx).unwrap(); + let server = VectorConfig::new(addr.clone()) + .build("default", &GlobalOptions::default(), tx) + .unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(server); wait_for_tcp(addr); diff --git a/src/topology/builder.rs 
b/src/topology/builder.rs index 6f983f107a697..8e80676728268 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1,5 +1,5 @@ use super::fanout::{self, Fanout}; -use crate::buffers; +use crate::{buffers, topology::config::GlobalOptions}; use futures::{sync::mpsc, Future, Stream}; use std::{collections::HashMap, time::Duration}; use stream_cancel::{Trigger, Tripwire}; @@ -28,13 +28,15 @@ pub fn build_pieces(config: &super::Config) -> Result<(Pieces, Vec), Vec let mut errors = vec![]; let mut warnings = vec![]; + let globals = GlobalOptions::from(config); + // Build sources for (name, source) in &config.sources { let (tx, rx) = mpsc::channel(1000); - let server = match source.build(tx) { + let server = match source.build(&name, &globals, tx) { Err(error) => { - errors.push(format!("Transform \"{}\": {}", name, error)); + errors.push(format!("Source \"{}\": {}", name, error)); continue; } Ok(server) => server, diff --git a/src/topology/config/mod.rs b/src/topology/config/mod.rs index 2cc3f8209ed9a..b9ce5b14c3ba3 100644 --- a/src/topology/config/mod.rs +++ b/src/topology/config/mod.rs @@ -17,6 +17,19 @@ pub struct Config { pub transforms: IndexMap, } +#[derive(Default)] +pub struct GlobalOptions { + pub data_dir: Option, +} + +impl GlobalOptions { + pub fn from(config: &Config) -> Self { + Self { + data_dir: config.data_dir.clone(), + } + } +} + #[derive(Debug, Clone, PartialEq)] pub enum DataType { Any, @@ -26,7 +39,12 @@ pub enum DataType { #[typetag::serde(tag = "type")] pub trait SourceConfig: core::fmt::Debug { - fn build(&self, out: mpsc::Sender) -> Result; + fn build( + &self, + name: &str, + globals: &GlobalOptions, + out: mpsc::Sender, + ) -> Result; fn output_type(&self) -> DataType; } diff --git a/tests/crash.rs b/tests/crash.rs index 076aa909c0e67..fc79ce3e54038 100644 --- a/tests/crash.rs +++ b/tests/crash.rs @@ -6,7 +6,10 @@ use vector::{ block_on, next_addr, random_lines, receive, runtime, send_lines, shutdown_on_idle, wait_for_tcp, }, 
- topology::{self, config}, + topology::{ + self, + config::{self, GlobalOptions}, + }, Event, {sinks, sources}, }; @@ -152,7 +155,12 @@ struct ErrorSourceConfig; #[typetag::serde(name = "tcp")] impl config::SourceConfig for ErrorSourceConfig { - fn build(&self, _out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + _out: mpsc::Sender, + ) -> Result { Ok(Box::new(future::err(()))) } @@ -204,7 +212,12 @@ struct PanicSourceConfig; #[typetag::serde(name = "tcp")] impl config::SourceConfig for PanicSourceConfig { - fn build(&self, _out: mpsc::Sender) -> Result { + fn build( + &self, + _name: &str, + _globals: &GlobalOptions, + _out: mpsc::Sender, + ) -> Result { Ok(Box::new(future::lazy::<_, future::FutureResult<(), ()>>( || panic!(), )))