Skip to content

Commit

Permalink
feat(sources/file): fall back to global data_dir option (#644) (#673)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Andrejew <[email protected]>
  • Loading branch information
seeekr authored and binarylogic committed Aug 6, 2019
1 parent ce3bc8d commit e190e96
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 50 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ sample.log
miniodat
.DS_Store
.idea/
checkpoints/*
checkpoints/*
*.iml
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- aws_cloudwatch_logs: `stream_name` now accepts `{{key}}` syntax for extracting values from events.
- aws_cloudwatch_logs: retry support added and more stability improvements
- coercer: New transform to convert fields into specified types.
- file source: `data_dir` now falls back to global `data_dir` option if not specified
- aws_kinesis_streams: Added configurable partition keys

### Changed
Expand Down
1 change: 1 addition & 0 deletions config/vector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ data_dir = "/var/lib/vector"

# Output data
[sinks.print]
inputs = ["apache_parser"]
type = "console"
4 changes: 2 additions & 2 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::field;
/// event notification is used by `FileServer`.
///
/// `FileServer` is configured on a path to watch. The files do _not_ need to
/// exist at cernan startup. `FileServer` will discover new files which match
/// exist at startup. `FileServer` will discover new files which match
/// its path in at most 60 seconds.
pub struct FileServer {
pub include: Vec<PathBuf>,
Expand All @@ -40,7 +40,7 @@ pub struct FileServer {
/// versa but there's no one perfect approach. Very fast files _will_ be lost if
/// your system aggressively rolls log files. `FileServer` will keep a file
/// handler open but should your system move so quickly that a file disappears
/// before cernan is able to open it the contents will be lost. This should be a
/// before `FileServer` is able to open it the contents will be lost. This should be a
/// rare occurrence.
///
/// Specific operating systems support evented interfaces that correct this
Expand Down
2 changes: 1 addition & 1 deletion lib/file-source/src/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl FileWatcher {
///
/// The input path will be used by `FileWatcher` to prime its state
/// machine. A `FileWatcher` tracks _only one_ file. This function returns
/// None if the path does not exist or is not readable by cernan.
/// None if the path does not exist or is not readable by the current process.
pub fn new(
path: PathBuf,
file_position: FilePosition,
Expand Down
122 changes: 98 additions & 24 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{
event::{self, Event},
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes::Bytes;
use file_source::FileServer;
use futures::{future, sync::mpsc, Future, Sink};
use serde::{Deserialize, Serialize};
use std::fs::DirBuilder;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, SystemTime};
Expand All @@ -24,7 +25,7 @@ pub struct FileConfig {
pub fingerprint_bytes: usize,
pub ignored_header_bytes: usize,
pub host_key: Option<String>,
pub data_dir: PathBuf,
pub data_dir: Option<PathBuf>,
pub glob_minimum_cooldown: u64, // millis
}

Expand All @@ -44,25 +45,72 @@ impl Default for FileConfig {
fingerprint_bytes: 256,
ignored_header_bytes: 0,
host_key: None,
data_dir: PathBuf::new(),
data_dir: None,
glob_minimum_cooldown: 1000, // millis
}
}
}

#[typetag::serde(name = "file")]
impl SourceConfig for FileConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
// TODO: validate paths
Ok(file_source(self, out))
/// Builds the file source for the component called `name`.
///
/// The base directory is taken from this source's own `data_dir` option,
/// falling back to the global `data_dir`. A subdirectory named after the
/// source is then created inside it and handed to `file_source`.
///
/// # Errors
///
/// Returns a message if no usable `data_dir` could be resolved, or if the
/// per-source subdirectory could not be created.
fn build(
    &self,
    name: &str,
    globals: &GlobalOptions,
    out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
    let base_dir = resolve_and_validate_data_dir(self, globals)?;
    // Each source works in its own subdirectory so that several sources can
    // share one data_dir (e.g. the global one) without their file servers'
    // checkpointers interfering with each other.
    let data_dir = base_dir.join(name);
    // `recursive(true)` makes creation idempotent: without it `create` fails
    // when the subdirectory is already there, i.e. on every start after the
    // first one.
    DirBuilder::new()
        .recursive(true)
        .create(&data_dir)
        .map_err(|e| {
            format!(
                "could not create subdirectory '{}' inside of data_dir '{}': {}",
                name,
                base_dir.display(),
                e
            )
        })?;
    Ok(file_source(self, data_dir, out))
}

/// Reports the kind of data this source emits: always `DataType::Log`.
fn output_type(&self) -> DataType {
DataType::Log
}
}

pub fn file_source(config: &FileConfig, out: mpsc::Sender<Event>) -> super::Source {
/// Picks the data directory for a file source and checks that it is usable.
///
/// The source-level `data_dir` wins when present; otherwise the global
/// `data_dir` is used.
///
/// # Errors
///
/// Returns a message when neither option is set, when the path does not
/// exist, when it is not a directory, or when its permissions are read-only.
fn resolve_and_validate_data_dir(
    config: &FileConfig,
    globals: &GlobalOptions,
) -> Result<PathBuf, String> {
    let data_dir = config
        .data_dir
        .as_ref()
        .or(globals.data_dir.as_ref())
        .cloned()
        .ok_or_else(|| "data_dir option required, but not given here or globally".to_string())?;

    // A single metadata lookup answers every question below. A failed lookup
    // is reported as "does not exist", which is what `Path::exists` would
    // have said for the same failure.
    let metadata = std::fs::metadata(&data_dir)
        .map_err(|_| format!("data_dir '{}' does not exist", data_dir.to_string_lossy()))?;

    // A regular file would pass the existence and permission checks but can
    // never hold the per-source subdirectory, so reject it here with a clear
    // message.
    if !metadata.is_dir() {
        return Err(format!(
            "data_dir '{}' is not a directory",
            data_dir.to_string_lossy()
        ));
    }

    if metadata.permissions().readonly() {
        return Err(format!(
            "data_dir '{}' is not writable",
            data_dir.to_string_lossy()
        ));
    }

    Ok(data_dir)
}

pub fn file_source(
config: &FileConfig,
data_dir: PathBuf,
out: mpsc::Sender<Event>,
) -> super::Source {
let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();

let ignore_before = config
Expand All @@ -79,7 +127,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender<Event>) -> super::Sour
max_line_bytes: config.max_line_bytes,
fingerprint_bytes: config.fingerprint_bytes,
ignored_header_bytes: config.ignored_header_bytes,
data_dir: config.data_dir.clone(),
data_dir,
glob_minimum_cooldown: glob_minimum_cooldown,
};

Expand All @@ -90,7 +138,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender<Event>) -> super::Sour
let out = out
.sink_map_err(|_| ())
.with(move |(line, file): (Bytes, String)| {
trace!(message = "Recieved one event.", file = file.as_str());
trace!(message = "Received one event.", file = file.as_str());

let event = create_event(line, file, &host_key, &hostname, &file_key);

Expand Down Expand Up @@ -149,6 +197,7 @@ mod tests {
use crate::event;
use crate::sources::file;
use crate::test_util::{block_on, shutdown_on_idle};
use crate::topology::Config;
use futures::{Future, Stream};
use std::collections::HashSet;
use std::fs::{self, File};
Expand All @@ -160,7 +209,7 @@ mod tests {
fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
file::FileConfig {
fingerprint_bytes: 8,
data_dir: dir.path().to_path_buf(),
data_dir: Some(dir.path().to_path_buf()),
glob_minimum_cooldown: 0, // millis
..Default::default()
}
Expand All @@ -180,6 +229,31 @@ mod tests {
result.unwrap()
}

/// Checks the precedence rules of `resolve_and_validate_data_dir`: a
/// source-level `data_dir` wins, and the global one is the fallback.
#[test]
fn resolve_data_dir() {
    let global_dir = tempdir().unwrap();
    let local_dir = tempdir().unwrap();

    // Copy the path instead of calling `into_path()`: that would persist the
    // temporary directory and leave it behind after every test run. Both
    // guards stay alive until the end of the test and clean up on drop.
    let mut config = Config::empty();
    config.data_dir = Some(global_dir.path().to_path_buf());

    // local path given -- local should win
    let res = super::resolve_and_validate_data_dir(
        &test_default_file_config(&local_dir),
        &GlobalOptions::from(&config),
    )
    .unwrap();
    assert_eq!(res, local_dir.path());

    // no local path given -- global fallback should be in effect
    let res = super::resolve_and_validate_data_dir(
        &Default::default(),
        &GlobalOptions::from(&config),
    )
    .unwrap();
    assert_eq!(res, global_dir.path());
}

#[test]
fn file_create_event() {
let line = Bytes::from("hello world");
Expand Down Expand Up @@ -208,7 +282,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -269,7 +343,7 @@ mod tests {
include: vec![dir.path().join("*")],
..test_default_file_config(&dir)
};
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -338,7 +412,7 @@ mod tests {
include: vec![dir.path().join("*")],
..test_default_file_config(&dir)
};
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -410,7 +484,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -472,7 +546,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -502,7 +576,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -532,7 +606,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -578,7 +652,7 @@ mod tests {
// First time server runs it picks up existing lines.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand All @@ -600,7 +674,7 @@ mod tests {
// Restart server, read file from checkpoint.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand All @@ -627,7 +701,7 @@ mod tests {
..test_default_file_config(&dir)
};
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand Down Expand Up @@ -663,7 +737,7 @@ mod tests {
// Run server first time, collect some lines.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand All @@ -689,7 +763,7 @@ mod tests {
// even though it has a new name.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand Down Expand Up @@ -727,7 +801,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -812,7 +886,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down
9 changes: 7 additions & 2 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::Event;
use crate::{topology::config::GlobalOptions, Event};
use futures::{future, sync::mpsc, Future, Sink, Stream};
use parser::parse;
use serde::{Deserialize, Serialize};
Expand All @@ -19,7 +19,12 @@ struct StatsdConfig {

#[typetag::serde(name = "statsd")]
impl crate::topology::config::SourceConfig for StatsdConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
/// Builds the statsd source by passing a clone of `self.address` and the
/// `out` sender to `statsd`.
///
/// `_name` and `_globals` are not read in this body; the leading underscore
/// marks them as intentionally unused.
fn build(
&self,
_name: &str,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
Ok(statsd(self.address.clone(), out))
}

Expand Down
9 changes: 7 additions & 2 deletions src/sources/stdin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
event::{self, Event},
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes::Bytes;
use codec::BytesDelimitedCodec;
Expand Down Expand Up @@ -34,7 +34,12 @@ fn default_max_length() -> usize {

#[typetag::serde(name = "stdin")]
impl SourceConfig for StdinConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
/// Builds the stdin source by passing `stdin()`, a clone of this config and
/// the `out` sender to `stdin_source`.
///
/// `_name` and `_globals` are not read in this body; the leading underscore
/// marks them as intentionally unused.
fn build(
&self,
_name: &str,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
Ok(stdin_source(stdin(), self.clone(), out))
}

Expand Down
Loading

0 comments on commit e190e96

Please sign in to comment.