Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(journald source): Add emit_cursor option #18882

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ pub struct JournaldConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
///
/// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
/// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
#[serde(default = "crate::serde::default_false")]
emit_cursor: bool,
}

const fn default_batch_size() -> usize {
Expand Down Expand Up @@ -308,6 +315,7 @@ impl Default for JournaldConfig {
acknowledgements: Default::default(),
remap_priority: false,
log_namespace: None,
emit_cursor: false,
}
}
}
Expand Down Expand Up @@ -377,6 +385,7 @@ impl SourceConfig for JournaldConfig {
acknowledgements,
starter,
log_namespace,
emit_cursor: self.emit_cursor,
}
.run_shutdown(cx.shutdown),
))
Expand Down Expand Up @@ -404,6 +413,7 @@ struct JournaldSource {
acknowledgements: bool,
starter: StartJournalctl,
log_namespace: LogNamespace,
emit_cursor: bool,
}

impl JournaldSource {
Expand Down Expand Up @@ -554,7 +564,11 @@ impl<'a> Batch<'a> {
Some(Ok(bytes)) => {
match decode_record(&bytes, self.source.remap_priority) {
Ok(mut record) => {
if let Some(tmp) = record.remove(CURSOR) {
if self.source.emit_cursor {
if let Some(tmp) = record.get(CURSOR) {
self.cursor = Some(tmp.clone());
}
} else if let Some(tmp) = record.remove(CURSOR) {
self.cursor = Some(tmp);
}

Expand Down Expand Up @@ -1089,13 +1103,14 @@ mod tests {
async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
let include_matches = create_unit_matches(iunits.to_vec());
let exclude_matches = create_unit_matches(xunits.to_vec());
run_journal(include_matches, exclude_matches, cursor).await
run_journal(include_matches, exclude_matches, cursor, false).await
}

async fn run_journal(
include_matches: Matches,
exclude_matches: Matches,
checkpoint: Option<&str>,
emit_cursor: bool,
) -> Vec<Event> {
assert_source_compliance(&["protocol"], async move {
let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
Expand Down Expand Up @@ -1128,6 +1143,7 @@ mod tests {
data_dir: Some(tempdir),
remap_priority: true,
acknowledgements: false.into(),
emit_cursor,
..Default::default()
};
let source = config.build(cx).await.unwrap();
Expand Down Expand Up @@ -1207,10 +1223,18 @@ mod tests {
);
}

#[tokio::test]
async fn emits_cursor() {
let received = run_journal(Matches::new(), Matches::new(), None, true).await;
assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
}

#[tokio::test]
async fn includes_matches() {
let matches = create_matches(vec![("PRIORITY", "ERR")]);
let received = run_journal(matches, HashMap::new(), None).await;
let received = run_journal(matches, HashMap::new(), None, false).await;
assert_eq!(received.len(), 2);
assert_eq!(
message(&received[0]),
Expand All @@ -1227,7 +1251,7 @@ mod tests {
#[tokio::test]
async fn includes_kernel() {
let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
let received = run_journal(matches, HashMap::new(), None).await;
let received = run_journal(matches, HashMap::new(), None, false).await;
assert_eq!(received.len(), 1);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
Expand All @@ -1236,7 +1260,7 @@ mod tests {
#[tokio::test]
async fn excludes_matches() {
let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
let received = run_journal(HashMap::new(), matches, None).await;
let received = run_journal(HashMap::new(), matches, None, false).await;
assert_eq!(received.len(), 5);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
Expand Down Expand Up @@ -1515,6 +1539,10 @@ mod tests {
event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
}

fn cursor(event: &Event) -> Value {
event.as_log()[CURSOR].clone()
}

fn value_ts(secs: i64, usecs: u32) -> Value {
Value::Timestamp(
chrono::Utc
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/components/sources/base/journald.cue
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ base: components: sources: journald: configuration: {
required: false
type: string: examples: ["/var/lib/vector"]
}
emit_cursor: {
description: """
Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].

[cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
[get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
"""
required: false
type: bool: default: false
}
exclude_matches: {
description: """
A list of sets of field/value pairs that, if any are present in a journal entry,
Expand Down