Skip to content

Commit

Permalink
feat(sources/file): fall back to global data_dir option (vectordotdev…
Browse files Browse the repository at this point in the history
…#644)

also makes global options (currently only data_dir) available to all
sources via SourceConfig::build()

Signed-off-by: Denis Andrejew <[email protected]>
  • Loading branch information
seeekr committed Jul 22, 2019
1 parent efe6541 commit 333e857
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- aws_s3: Add `filename_extension` options.
- aws_cloudwatch_logs: `stream_name` now accepts `{{key}}` synatx for extracting values from events.
- file source: `data_dir` now falls back to global `data_dir` option if not specified

### Changed
- [configuration] Empty inputs are treated as errors instead of warnings [#506]
Expand Down
74 changes: 50 additions & 24 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
event::{self, Event},
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes::Bytes;
use file_source::FileServer;
Expand All @@ -24,7 +24,7 @@ pub struct FileConfig {
pub fingerprint_bytes: usize,
pub ignored_header_bytes: usize,
pub host_key: Option<String>,
pub data_dir: PathBuf,
pub data_dir: Option<PathBuf>,
pub glob_minimum_cooldown: u64, // millis
}

Expand All @@ -44,25 +44,51 @@ impl Default for FileConfig {
fingerprint_bytes: 256,
ignored_header_bytes: 0,
host_key: None,
data_dir: PathBuf::new(),
data_dir: None,
glob_minimum_cooldown: 1000, // millis
}
}
}

#[typetag::serde(name = "file")]
impl SourceConfig for FileConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
// TODO: validate paths
Ok(file_source(self, out))
fn build(
&self,
globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
let data_dir = match &self.data_dir {
Some(v) => v.clone(),
None => match &globals.data_dir {
Some(v) => v.clone(),
None => {
return Err("data_dir option required, but not given here or globally".into())
}
},
};
if !data_dir.exists() {
return Err("data_dir '{}' does not exist".into());
}
let readonly = std::fs::metadata(&data_dir)
.map(|meta| meta.permissions().readonly())
.unwrap_or(true);
if readonly {
return Err("data_dir '{}' is not writable".into());
}

Ok(file_source(self, data_dir, out))
}

fn output_type(&self) -> DataType {
DataType::Log
}
}

pub fn file_source(config: &FileConfig, out: mpsc::Sender<Event>) -> super::Source {
pub fn file_source(
config: &FileConfig,
data_dir: PathBuf,
out: mpsc::Sender<Event>,
) -> super::Source {
let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();

let ignore_before = config
Expand All @@ -79,7 +105,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender<Event>) -> super::Sour
max_line_bytes: config.max_line_bytes,
fingerprint_bytes: config.fingerprint_bytes,
ignored_header_bytes: config.ignored_header_bytes,
data_dir: config.data_dir.clone(),
data_dir,
glob_minimum_cooldown: glob_minimum_cooldown,
};

Expand All @@ -90,7 +116,7 @@ pub fn file_source(config: &FileConfig, out: mpsc::Sender<Event>) -> super::Sour
let out = out
.sink_map_err(|_| ())
.with(move |(line, file): (Bytes, String)| {
trace!(message = "Recieved one event.", file = file.as_str());
trace!(message = "Received one event.", file = file.as_str());

let event = create_event(line, file, &host_key, &hostname, &file_key);

Expand Down Expand Up @@ -164,7 +190,7 @@ mod tests {
fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
file::FileConfig {
fingerprint_bytes: 8,
data_dir: dir.path().to_path_buf(),
data_dir: Some(dir.path().to_path_buf()),
glob_minimum_cooldown: 0, // millis
..Default::default()
}
Expand Down Expand Up @@ -212,7 +238,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -273,7 +299,7 @@ mod tests {
include: vec![dir.path().join("*")],
..test_default_file_config(&dir)
};
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -342,7 +368,7 @@ mod tests {
include: vec![dir.path().join("*")],
..test_default_file_config(&dir)
};
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -414,7 +440,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down Expand Up @@ -476,7 +502,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -506,7 +532,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -536,7 +562,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire.clone()).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -582,7 +608,7 @@ mod tests {
// First time server runs it picks up existing lines.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand All @@ -604,7 +630,7 @@ mod tests {
// Restart server, read file from checkpoint.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand All @@ -631,7 +657,7 @@ mod tests {
..test_default_file_config(&dir)
};
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand Down Expand Up @@ -667,7 +693,7 @@ mod tests {
// Run server first time, collect some lines.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand All @@ -693,7 +719,7 @@ mod tests {
// even though it has a new name.
{
let (tx, rx) = futures::sync::mpsc::channel(10);
let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (trigger, tripwire) = Tripwire::new();
rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));
Expand Down Expand Up @@ -731,7 +757,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

rt.spawn(source.select(tripwire).map(|_| ()).map_err(|_| ()));

Expand Down Expand Up @@ -816,7 +842,7 @@ mod tests {
..test_default_file_config(&dir)
};

let source = file::file_source(&config, tx);
let source = file::file_source(&config, config.data_dir.clone().unwrap(), tx);

let mut rt = tokio::runtime::Runtime::new().unwrap();

Expand Down
8 changes: 6 additions & 2 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::Event;
use crate::{topology::config::GlobalOptions, Event};
use futures::{future, sync::mpsc, Future, Sink, Stream};
use parser::parse;
use serde::{Deserialize, Serialize};
Expand All @@ -19,7 +19,11 @@ struct StatsdConfig {

#[typetag::serde(name = "statsd")]
impl crate::topology::config::SourceConfig for StatsdConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
fn build(
&self,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
Ok(statsd(self.address.clone(), out))
}

Expand Down
8 changes: 6 additions & 2 deletions src/sources/stdin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
event::{self, Event},
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes::Bytes;
use codec::BytesDelimitedCodec;
Expand Down Expand Up @@ -34,7 +34,11 @@ fn default_max_length() -> usize {

#[typetag::serde(name = "stdin")]
impl SourceConfig for StdinConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
fn build(
&self,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
Ok(stdin_source(stdin(), self.clone(), out))
}

Expand Down
8 changes: 6 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::util::TcpSource;
use crate::{
event::{self, Event},
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes::Bytes;
use chrono::{TimeZone, Utc};
Expand Down Expand Up @@ -53,7 +53,11 @@ impl SyslogConfig {

#[typetag::serde(name = "syslog")]
impl SourceConfig for SyslogConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
fn build(
&self,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
let host_key = self.host_key.clone().unwrap_or(event::HOST.to_string());

match self.mode.clone() {
Expand Down
16 changes: 11 additions & 5 deletions src/sources/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::util::TcpSource;
use crate::{
event::{self, Event},
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
};
use bytes::Bytes;
use codec::{self, BytesDelimitedCodec};
Expand Down Expand Up @@ -43,7 +43,11 @@ impl TcpConfig {

#[typetag::serde(name = "tcp")]
impl SourceConfig for TcpConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
fn build(
&self,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
let tcp = RawTcpSource {
config: self.clone(),
};
Expand Down Expand Up @@ -95,7 +99,7 @@ mod test {
use super::TcpConfig;
use crate::event;
use crate::test_util::{block_on, next_addr, send_lines, wait_for_tcp};
use crate::topology::config::SourceConfig;
use crate::topology::config::{GlobalOptions, SourceConfig};
use futures::sync::mpsc;
use futures::Stream;

Expand All @@ -105,7 +109,9 @@ mod test {

let addr = next_addr();

let server = TcpConfig::new(addr).build(tx).unwrap();
let server = TcpConfig::new(addr)
.build(&GlobalOptions::default(), tx)
.unwrap();
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(server);
wait_for_tcp(addr);
Expand Down Expand Up @@ -147,7 +153,7 @@ mod test {
let mut config = TcpConfig::new(addr);
config.max_length = 10;

let server = config.build(tx).unwrap();
let server = config.build(&GlobalOptions::default(), tx).unwrap();
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(server);
wait_for_tcp(addr);
Expand Down
14 changes: 10 additions & 4 deletions src/sources/vector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::util::TcpSource;
use crate::{
event::proto,
topology::config::{DataType, SourceConfig},
topology::config::{DataType, GlobalOptions, SourceConfig},
Event,
};
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -35,7 +35,11 @@ impl VectorConfig {

#[typetag::serde(name = "vector")]
impl SourceConfig for VectorConfig {
fn build(&self, out: mpsc::Sender<Event>) -> Result<super::Source, String> {
fn build(
&self,
_globals: &GlobalOptions,
out: mpsc::Sender<Event>,
) -> Result<super::Source, String> {
let vector = VectorSource;
vector.run(self.address, self.shutdown_timeout_secs, out)
}
Expand Down Expand Up @@ -79,7 +83,7 @@ mod test {
buffers::Acker,
sinks::vector::vector,
test_util::{next_addr, wait_for_tcp, CollectCurrent},
topology::config::SourceConfig,
topology::config::{GlobalOptions, SourceConfig},
Event,
};
use futures::{stream, sync::mpsc, Future, Sink};
Expand All @@ -89,7 +93,9 @@ mod test {
let (tx, rx) = mpsc::channel(100);

let addr = next_addr();
let server = VectorConfig::new(addr.clone()).build(tx).unwrap();
let server = VectorConfig::new(addr.clone())
.build(&GlobalOptions::default(), tx)
.unwrap();
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(server);
wait_for_tcp(addr);
Expand Down
Loading

0 comments on commit 333e857

Please sign in to comment.