Skip to content

Commit

Permalink
feat(journald source): Add emit_cursor option (#18882)
Browse files Browse the repository at this point in the history
* Add emit_cursor option to journald source

* Update documentation

* Fix typo
  • Loading branch information
sproberts92 authored Nov 1, 2023
1 parent 051de5a commit 74051dc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
38 changes: 33 additions & 5 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ pub struct JournaldConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
///
/// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
/// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
#[serde(default = "crate::serde::default_false")]
emit_cursor: bool,
}

const fn default_batch_size() -> usize {
Expand Down Expand Up @@ -308,6 +315,7 @@ impl Default for JournaldConfig {
acknowledgements: Default::default(),
remap_priority: false,
log_namespace: None,
emit_cursor: false,
}
}
}
Expand Down Expand Up @@ -377,6 +385,7 @@ impl SourceConfig for JournaldConfig {
acknowledgements,
starter,
log_namespace,
emit_cursor: self.emit_cursor,
}
.run_shutdown(cx.shutdown),
))
Expand Down Expand Up @@ -404,6 +413,7 @@ struct JournaldSource {
acknowledgements: bool,
starter: StartJournalctl,
log_namespace: LogNamespace,
emit_cursor: bool,
}

impl JournaldSource {
Expand Down Expand Up @@ -554,7 +564,11 @@ impl<'a> Batch<'a> {
Some(Ok(bytes)) => {
match decode_record(&bytes, self.source.remap_priority) {
Ok(mut record) => {
if let Some(tmp) = record.remove(CURSOR) {
if self.source.emit_cursor {
if let Some(tmp) = record.get(CURSOR) {
self.cursor = Some(tmp.clone());
}
} else if let Some(tmp) = record.remove(CURSOR) {
self.cursor = Some(tmp);
}

Expand Down Expand Up @@ -1089,13 +1103,14 @@ mod tests {
async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
let include_matches = create_unit_matches(iunits.to_vec());
let exclude_matches = create_unit_matches(xunits.to_vec());
run_journal(include_matches, exclude_matches, cursor).await
run_journal(include_matches, exclude_matches, cursor, false).await
}

async fn run_journal(
include_matches: Matches,
exclude_matches: Matches,
checkpoint: Option<&str>,
emit_cursor: bool,
) -> Vec<Event> {
assert_source_compliance(&["protocol"], async move {
let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
Expand Down Expand Up @@ -1128,6 +1143,7 @@ mod tests {
data_dir: Some(tempdir),
remap_priority: true,
acknowledgements: false.into(),
emit_cursor,
..Default::default()
};
let source = config.build(cx).await.unwrap();
Expand Down Expand Up @@ -1207,10 +1223,18 @@ mod tests {
);
}

#[tokio::test]
async fn emits_cursor() {
let received = run_journal(Matches::new(), Matches::new(), None, true).await;
assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
}

#[tokio::test]
async fn includes_matches() {
let matches = create_matches(vec![("PRIORITY", "ERR")]);
let received = run_journal(matches, HashMap::new(), None).await;
let received = run_journal(matches, HashMap::new(), None, false).await;
assert_eq!(received.len(), 2);
assert_eq!(
message(&received[0]),
Expand All @@ -1227,7 +1251,7 @@ mod tests {
#[tokio::test]
async fn includes_kernel() {
let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
let received = run_journal(matches, HashMap::new(), None).await;
let received = run_journal(matches, HashMap::new(), None, false).await;
assert_eq!(received.len(), 1);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
Expand All @@ -1236,7 +1260,7 @@ mod tests {
#[tokio::test]
async fn excludes_matches() {
let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
let received = run_journal(HashMap::new(), matches, None).await;
let received = run_journal(HashMap::new(), matches, None, false).await;
assert_eq!(received.len(), 5);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
Expand Down Expand Up @@ -1515,6 +1539,10 @@ mod tests {
event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
}

fn cursor(event: &Event) -> Value {
event.as_log()[CURSOR].clone()
}

fn value_ts(secs: i64, usecs: u32) -> Value {
Value::Timestamp(
chrono::Utc
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/components/sources/base/journald.cue
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ base: components: sources: journald: configuration: {
required: false
type: string: examples: ["/var/lib/vector"]
}
emit_cursor: {
description: """
Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
[cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
[get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
"""
required: false
type: bool: default: false
}
exclude_matches: {
description: """
A list of sets of field/value pairs that, if any are present in a journal entry,
Expand Down

0 comments on commit 74051dc

Please sign in to comment.