use std::{
    collections::{HashMap, HashSet},
    io::SeekFrom,
    path::PathBuf,
    process::Stdio,
    str::FromStr,
    sync::{Arc, LazyLock},
    time::Duration,
};

use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
use nix::{
    sys::signal::{Signal, kill},
    unistd::Pid,
};
use serde_json::{Error as JsonError, Value as JsonValue};
use snafu::{ResultExt, Snafu};
use tokio::{
    fs::{File, OpenOptions},
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
    process::{Child, Command},
    sync::{Mutex, MutexGuard},
    time::sleep,
};
use tokio_util::codec::FramedRead;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    finalizer::OrderedFinalizer,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{metadata_path, owned_value_path, path},
    schema::Definition,
};
use vrl::{
    event_path,
    value::{Kind, Value, kind::Collection},
};

use crate::{
    SourceSender,
    config::{
        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
        log_schema,
    },
    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
    internal_events::{
        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
        StreamClosedError,
    },
    serde::bool_or_struct,
    shutdown::ShutdownSignal,
};

const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

const BACKOFF_DURATION: Duration = Duration::from_secs(1);

static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    #[snafu(display("failed to parse output of `journalctl --version`: {:?}", output))]
    JournalctlParseVersion { output: String },
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
    #[snafu(display(
        "`current_boot_only: false` not supported for systemd versions 250 through 257 (got {}).",
        systemd_version
    ))]
    AllBootsNotSupported { systemd_version: u32 },
}

type Matches = HashMap<String, HashSet<String>>;

#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    #[serde(default)]
    pub since_now: bool,

    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,

    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,

    #[serde(default)]
    pub journal_directory: Option<PathBuf>,

    #[serde(default)]
    pub journal_namespace: Option<String>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}

const fn default_batch_size() -> usize {
    16
}

fn matches_examples() -> HashMap<String, Vec<String>> {
    HashMap::<_, _>::from_iter([
        (
            "_SYSTEMD_UNIT".to_owned(),
            vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
        ),
        ("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]),
    ])
}

impl JournaldConfig {
    fn merged_include_matches(&self) -> Matches {
        Self::merge_units(&self.include_matches, &self.include_units)
    }

    fn merged_exclude_matches(&self) -> Matches {
        Self::merge_units(&self.exclude_matches, &self.exclude_units)
    }

    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
        let mut matches = matches.clone();
        for unit in units {
            let entry = matches.entry(String::from(SYSTEMD_UNIT));
            entry.or_default().insert(fixup_unit(unit));
        }
        matches
    }

    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let schema_definition = match log_namespace {
            LogNamespace::Vector => Definition::new_with_default_metadata(
                Kind::bytes().or_null(),
                [LogNamespace::Vector],
            ),
            LogNamespace::Legacy => Definition::new_with_default_metadata(
                Kind::object(Collection::empty()),
                [LogNamespace::Legacy],
            ),
        };

        let mut schema_definition = schema_definition
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            );

        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}

impl Default for JournaldConfig {
    fn default() -> Self {
        Self {
            since_now: false,
            current_boot_only: true,
            include_units: vec![],
            exclude_units: vec![],
            include_matches: Default::default(),
            exclude_matches: Default::default(),
            data_dir: None,
            batch_size: default_batch_size(),
            journalctl_path: None,
            journal_directory: None,
            journal_namespace: None,
            extra_args: vec![],
            acknowledgements: Default::default(),
            remap_priority: false,
            log_namespace: None,
            emit_cursor: false,
        }
    }
}

impl_generate_config_from_default!(JournaldConfig);

type Record = HashMap<String, String>;

#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SourceConfig for JournaldConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        if self.remap_priority {
            warn!(
                "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
            );
        }

        let data_dir = cx
            .globals
            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;

        if let Some(unit) = self
            .include_units
            .iter()
            .find(|unit| self.exclude_units.contains(unit))
        {
            let unit = unit.into();
            return Err(BuildError::DuplicatedUnit { unit }.into());
        }

        let include_matches = self.merged_include_matches();
        let exclude_matches = self.merged_exclude_matches();

        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
            return Err(BuildError::DuplicatedMatches { field, value }.into());
        }

        let mut checkpoint_path = data_dir;
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let journalctl_path = self
            .journalctl_path
            .clone()
            .unwrap_or_else(|| JOURNALCTL.clone());

        let systemd_version = get_systemd_version_from_journalctl(&journalctl_path).await?;

        if !self.current_boot_only && (250..=257).contains(&systemd_version) {
            return Err(BuildError::AllBootsNotSupported { systemd_version }.into());
        }

        let starter = StartJournalctl::new(
            journalctl_path,
            systemd_version,
            self.journal_directory.clone(),
            self.journal_namespace.clone(),
            self.current_boot_only,
            self.since_now,
            self.extra_args.clone(),
        );

        let batch_size = self.batch_size;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        let log_namespace = cx.log_namespace(self.log_namespace);

        Ok(Box::pin(
            JournaldSource {
                include_matches,
                exclude_matches,
                checkpoint_path,
                batch_size,
                remap_priority: self.remap_priority,
                out: cx.out,
                acknowledgements,
                starter,
                log_namespace,
                emit_cursor: self.emit_cursor,
            }
            .run_shutdown(cx.shutdown),
        ))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition =
            self.schema_definition(global_log_namespace.merge(self.log_namespace));

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

struct JournaldSource {
    include_matches: Matches,
    exclude_matches: Matches,
    checkpoint_path: PathBuf,
    batch_size: usize,
    remap_priority: bool,
    out: SourceSender,
    acknowledgements: bool,
    starter: StartJournalctl,
    log_namespace: LogNamespace,
    emit_cursor: bool,
}

impl JournaldSource {
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stdout_stream, stderr_stream, running)) => {
                    if !self
                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
                        .await
                    {
                        return;
                    }
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

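    // Consume one journalctl invocation's stdout, batching records until the
    // stream ends or shutdown is signaled. Returns `true` when the caller
    // should restart journalctl (its output ended or a read error occurred)
    // and `false` when the source itself should stop.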
    async fn run_stream<'a>(
        &'a mut self,
        mut stdout_stream: JournalStream,
        stderr_stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));

        let batch_size = self.batch_size;
        let result = loop {
            let mut batch = Batch::new(self);

            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => {
                        stderr_handler.abort();
                        return false;
                    },
                    item = stdout_stream.next() => item,
                };
                if !batch.handle_next(item) {
                    stderr_handler.abort();
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stdout_stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        };

        stderr_handler.abort();
        result
    }

    async fn handle_stderr(mut stderr_stream: JournalStream) {
        while let Some(result) = stderr_stream.next().await {
            match result {
                Ok(line) => {
                    let line_str = String::from_utf8_lossy(&line);
                    let trimmed = line_str.trim();
                    if !trimmed.is_empty() {
                        warn!("journalctl wrote to stderr: {trimmed}");
                    }
                }
                Err(err) => {
                    error!("Error reading journalctl stderr: {err}");
                    break;
                }
            }
        }
    }
}

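// Accumulator for one batch of events read from journalctl, along with the
// optional acknowledgement notifier and the most recent cursor seen while
// building the batch.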
struct Batch<'a> {
    events: Vec<LogEvent>,
    record_size: usize,
    exiting: Option<bool>,
    batch: Option<BatchNotifier>,
    receiver: Option<BatchStatusReceiver>,
    source: &'a mut JournaldSource,
    cursor: Option<String>,
}

impl<'a> Batch<'a> {
    fn new(source: &'a mut JournaldSource) -> Self {
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
        Self {
            events: Vec::new(),
            record_size: 0,
            exiting: None,
            batch,
            receiver,
            source,
            cursor: None,
        }
    }

    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
        match result {
            None => {
                warn!("Journalctl process stopped.");
                self.exiting = Some(true);
                false
            }
            Some(Err(error)) => {
                emit!(JournaldReadError { error });
                false
            }
            Some(Ok(bytes)) => {
                match decode_record(&bytes, self.source.remap_priority) {
                    Ok(mut record) => {
                        if self.source.emit_cursor {
                            if let Some(tmp) = record.get(CURSOR) {
                                self.cursor = Some(tmp.clone());
                            }
                        } else if let Some(tmp) = record.remove(CURSOR) {
                            self.cursor = Some(tmp);
                        }

                        if !filter_matches(
                            &record,
                            &self.source.include_matches,
                            &self.source.exclude_matches,
                        ) {
                            self.record_size += bytes.len();

                            let mut event = create_log_event_from_record(
                                record,
                                &self.batch,
                                self.source.log_namespace,
                            );

                            enrich_log_event(&mut event, self.source.log_namespace);

                            self.events.push(event);
                        }
                    }
                    Err(error) => {
                        emit!(JournaldInvalidRecordError {
                            error,
                            text: String::from_utf8_lossy(&bytes).into_owned()
                        });
                    }
                }
                true
            }
        }
    }

    async fn finish(
        mut self,
        finalizer: &Finalizer,
        bytes_received: &'a Registered<BytesReceived>,
        events_received: &'a Registered<EventsReceived>,
    ) -> Option<bool> {
        drop(self.batch);

        if self.record_size > 0 {
            bytes_received.emit(ByteSize(self.record_size));
        }

        if !self.events.is_empty() {
            let count = self.events.len();
            let byte_size = self.events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(count, byte_size));

            match self.source.out.send_batch(self.events).await {
                Ok(_) => {
                    if let Some(cursor) = self.cursor {
                        finalizer.finalize(cursor, self.receiver).await;
                    }
                }
                Err(_) => {
                    emit!(StreamClosedError { count });
                    self.exiting = Some(false);
                }
            }
        }
        self.exiting
    }
}

type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;

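// Everything needed to (re)spawn the `journalctl` child process.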
struct StartJournalctl {
    path: PathBuf,
    systemd_version: u32,
    journal_dir: Option<PathBuf>,
    journal_namespace: Option<String>,
    current_boot_only: bool,
    since_now: bool,
    extra_args: Vec<String>,
}

impl StartJournalctl {
    const fn new(
        path: PathBuf,
        systemd_version: u32,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            path,
            systemd_version,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        }
    }

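    // Build the `journalctl` command line: always follow the journal and emit
    // one JSON object per line with cursors, then add the directory,
    // namespace, boot, and start-position arguments implied by the
    // configuration and any stored checkpoint.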
    fn make_command(&self, checkpoint: Option<&str>) -> Command {
        let mut command = Command::new(&self.path);
        command.stdout(Stdio::piped());
        command.stderr(Stdio::piped());
        command.arg("--follow");
        command.arg("--all");
        command.arg("--show-cursor");
        command.arg("--output=json");

        if let Some(dir) = &self.journal_dir {
            command.arg(format!("--directory={}", dir.display()));
        }

        if let Some(namespace) = &self.journal_namespace {
            command.arg(format!("--namespace={namespace}"));
        }

        if self.current_boot_only {
            if self.systemd_version < 250 {
                command.arg("--boot");
            }
        } else if self.systemd_version >= 258 {
            command.arg("--boot=all");
        }

        if let Some(cursor) = checkpoint {
            command.arg(format!("--after-cursor={cursor}"));
        } else if self.since_now {
            command.arg("--since=now");
        } else {
            command.arg("--since=2000-01-01");
        }

        if !self.extra_args.is_empty() {
            command.args(&self.extra_args);
        }

        command
    }

    fn start(
        &mut self,
        checkpoint: Option<&str>,
    ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
        let mut command = self.make_command(checkpoint);

        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;

        let stdout_stream = FramedRead::new(
            child.stdout.take().unwrap(),
            CharacterDelimitedDecoder::new(b'\n'),
        );

        let stderr = child.stderr.take().unwrap();
        let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));

        Ok((
            stdout_stream.boxed(),
            stderr_stream.boxed(),
            RunningJournalctl(child),
        ))
    }
}

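// Handle to the spawned `journalctl` child; sends SIGTERM on drop so the
// process does not outlive the source.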
struct RunningJournalctl(Child);

impl Drop for RunningJournalctl {
    fn drop(&mut self) {
        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
        }
    }
}

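// Run `journalctl --version` and parse the second whitespace-separated token
// of its output as the systemd version number.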
async fn get_systemd_version_from_journalctl(journalctl_path: &PathBuf) -> crate::Result<u32> {
    let stdout = Command::new(journalctl_path)
        .arg("--version")
        .output()
        .await
        .context(JournalctlSpawnSnafu)?
        .stdout;

    let stdout = String::from_utf8_lossy(&stdout);
    Ok(stdout
        .split_whitespace()
        .nth(1)
        .and_then(|s| s.parse::<u32>().ok())
        .ok_or_else(|| BuildError::JournalctlParseVersion {
            output: {
                let cutoff = 40;
                let length = stdout.chars().count();
                format!(
                    "{}{}",
                    stdout.chars().take(cutoff).collect::<String>(),
                    if length > cutoff {
                        format!(" ..{} more char(s)", length - cutoff)
                    } else {
                        "".to_string()
                    }
                )
            },
        })?)
}

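// Move the journald host field and the (microsecond) source or received
// timestamp into the standard event locations for the active log namespace,
// then stamp the event with this source's type.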
fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(host) = log
                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
                .and_then(|meta| meta.get(HOSTNAME))
            {
                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
            }
        }
        LogNamespace::Legacy => {
            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
                log_namespace.insert_source_metadata(
                    JournaldConfig::NAME,
                    log,
                    log_schema().host_key().map(LegacyKey::Overwrite),
                    path!("host"),
                    host,
                );
            }
        }
    }

    let timestamp_value = match log_namespace {
        LogNamespace::Vector => log
            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
            .and_then(|meta| {
                meta.get(SOURCE_TIMESTAMP)
                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
            }),
        LogNamespace::Legacy => log
            .get(event_path!(SOURCE_TIMESTAMP))
            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
    };

    let timestamp = timestamp_value
        .filter(|&ts| ts.is_bytes())
        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
        .map(|ts| {
            chrono::Utc
                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
                .single()
                .expect("invalid timestamp")
        });

    match log_namespace {
        LogNamespace::Vector => {
            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());

            if let Some(ts) = timestamp {
                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
            }
        }
        LogNamespace::Legacy => {
            if let Some(ts) = timestamp {
                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
            }
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        JournaldConfig::NAME,
    );
}

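// Convert a decoded journald record into a `LogEvent`. Under the Vector
// namespace the MESSAGE field becomes the event value and the remaining
// fields are stored as source metadata; under the Legacy namespace all fields
// are kept and MESSAGE is moved to the configured message key.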
fn create_log_event_from_record(
    mut record: Record,
    batch: &Option<BatchNotifier>,
    log_namespace: LogNamespace,
) -> LogEvent {
    match log_namespace {
        LogNamespace::Vector => {
            let message_value = record
                .remove(MESSAGE)
                .map(|msg| Value::Bytes(Bytes::from(msg)))
                .unwrap_or(Value::Null);

            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);

            record.iter().for_each(|(key, value)| {
                log.metadata_mut()
                    .value_mut()
                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
            });

            log
        }
        LogNamespace::Legacy => {
            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);

            if let Some(message) = log.remove(event_path!(MESSAGE)) {
                log.maybe_insert(log_schema().message_key_target_path(), message);
            }

            log
        }
    }
}

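// Unit names given without an explicit suffix are treated as services, so
// append `.service` when the name contains no `.`.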
946fn fixup_unit(unit: &str) -> String {
949 if unit.contains('.') {
950 unit.into()
951 } else {
952 format!("{unit}.service")
953 }
954}
955
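// Parse one line of `journalctl --output=json`. Journalctl emits non-UTF-8
// field values as arrays of byte values, so any array-valued fields are
// decoded back into strings (or re-serialized as JSON text when they are not
// byte arrays) before the object is converted into a `Record`.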
fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
    if let Some(record) = record.as_object_mut() {
        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
            *value = decode_array(value.as_array().expect("already validated"));
        }
    }
    if remap {
        if let Some(priority) = record.get_mut("PRIORITY") {
            remap_priority(priority);
        }
    }
    serde_json::from_value(record)
}

fn decode_array(array: &[JsonValue]) -> JsonValue {
    decode_array_as_bytes(array).unwrap_or_else(|| {
        let ser = serde_json::to_string(array).expect("already deserialized");
        JsonValue::String(ser)
    })
}

fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
    array
        .iter()
        .map(|item| {
            item.as_u64().and_then(|num| match num {
                num if num <= u8::MAX as u64 => Some(num as u8),
                _ => None,
            })
        })
        .collect::<Option<Vec<u8>>>()
        .map(|array| String::from_utf8_lossy(&array).into())
}

fn remap_priority(priority: &mut JsonValue) {
    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
        let text = match num {
            0 => "EMERG",
            1 => "ALERT",
            2 => "CRIT",
            3 => "ERR",
            4 => "WARNING",
            5 => "NOTICE",
            6 => "INFO",
            7 => "DEBUG",
            _ => "UNKNOWN",
        };
        *priority = JsonValue::String(text.into());
    }
}

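// Returns `true` if the record should be dropped: it misses the include
// matches (when any are configured) or hits the exclude matches.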
fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
    match (includes.is_empty(), excludes.is_empty()) {
        (true, true) => false,
        (false, true) => !contains_match(record, includes),
        (true, false) => contains_match(record, excludes),
        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
    }
}

fn contains_match(record: &Record, matches: &Matches) -> bool {
    let f = move |(field, value)| {
        matches
            .get(field)
            .map(|x| x.contains(value))
            .unwrap_or(false)
    };
    record.iter().any(f)
}

fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
    for (a_key, a_values) in a_matches {
        if let Some(b_values) = b_matches.get(a_key.as_str()) {
            for (a, b) in a_values
                .iter()
                .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
            {
                if a == b {
                    return Some((a_key.into(), b.into()));
                }
            }
        }
    }
    None
}

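// Cursor checkpointing strategy: without acknowledgements the checkpoint is
// written as soon as a batch is sent downstream; with acknowledgements it is
// written only after the batch is confirmed delivered, via an
// `OrderedFinalizer`.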
enum Finalizer {
    Sync(SharedCheckpointer),
    Async(OrderedFinalizer<String>),
}

impl Finalizer {
    fn new(
        acknowledgements: bool,
        checkpointer: SharedCheckpointer,
        shutdown: ShutdownSignal,
    ) -> Self {
        if acknowledgements {
            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
            tokio::spawn(async move {
                while let Some((status, cursor)) = ack_stream.next().await {
                    if status == BatchStatus::Delivered {
                        checkpointer.lock().await.set(cursor).await;
                    }
                }
            });
            Self::Async(finalizer)
        } else {
            Self::Sync(checkpointer)
        }
    }

    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
        match (self, receiver) {
            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
            _ => {
                unreachable!("Cannot have async finalization without a receiver in journald source")
            }
        }
    }
}

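// Persists the latest journald cursor as a single newline-terminated line at
// the start of the checkpoint file.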
struct Checkpointer {
    file: File,
    filename: PathBuf,
}

impl Checkpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&filename)
            .await?;
        Ok(Checkpointer { file, filename })
    }

    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(format!("{token}\n").as_bytes()).await
    }

    async fn get(&mut self) -> Result<Option<String>, io::Error> {
        let mut buf = Vec::<u8>::new();
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.read_to_end(&mut buf).await?;
        match buf.len() {
            0 => Ok(None),
            _ => {
                let text = String::from_utf8_lossy(&buf);
                match text.find('\n') {
                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
                    None => Ok(None),
                }
            }
        }
    }
}

struct StatefulCheckpointer {
    checkpointer: Checkpointer,
    cursor: Option<String>,
}

impl StatefulCheckpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let mut checkpointer = Checkpointer::new(filename).await?;
        let cursor = checkpointer.get().await?;
        Ok(Self {
            checkpointer,
            cursor,
        })
    }

    async fn set(&mut self, token: impl Into<String>) {
        let token = token.into();
        if let Err(error) = self.checkpointer.set(&token).await {
            emit!(JournaldCheckpointSetError {
                error,
                filename: self
                    .checkpointer
                    .filename
                    .to_str()
                    .unwrap_or("unknown")
                    .to_string(),
            });
        }
        self.cursor = Some(token);
    }
}

#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);

impl SharedCheckpointer {
    fn new(c: StatefulCheckpointer) -> Self {
        Self(Arc::new(Mutex::new(c)))
    }

    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
        self.0.lock().await
    }
}

#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let mut filename = tempdir.path().to_path_buf();
        filename.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        checkpointer
            .set("first test")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("first test\n"));

        checkpointer
            .set("second")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("second\n"));
    }
}

#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use tempfile::tempdir;
    use tokio::time::{Duration, Instant, sleep, timeout};
    use vrl::value::{Value, kind::Collection};

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, EventStatus},
        test_util::components::assert_source_compliance,
    };

    const TEST_COMPONENT: &str = "journald-test";
    const TEST_JOURNALCTL: &str = "tests/data/journalctl";

    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
        let include_matches = create_unit_matches(iunits.to_vec());
        let exclude_matches = create_unit_matches(xunits.to_vec());
        run_journal(include_matches, exclude_matches, cursor, false).await
    }

    async fn run_journal(
        include_matches: Matches,
        exclude_matches: Matches,
        checkpoint: Option<&str>,
        emit_cursor: bool,
    ) -> Vec<Event> {
        assert_source_compliance(&["protocol"], async move {
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let tempdir = tempdir().unwrap();
            let tempdir = tempdir.path().to_path_buf();

            if let Some(cursor) = checkpoint {
                let mut checkpoint_path = tempdir.clone();
                checkpoint_path.push(TEST_COMPONENT);
                fs::create_dir(&checkpoint_path).unwrap();
                checkpoint_path.push(CHECKPOINT_FILENAME);

                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
                    .await
                    .expect("Creating checkpointer failed!");

                checkpointer
                    .set(cursor)
                    .await
                    .expect("Could not set checkpoint");
            }

            let (cx, shutdown) =
                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
            let config = JournaldConfig {
                journalctl_path: Some(TEST_JOURNALCTL.into()),
                include_matches,
                exclude_matches,
                data_dir: Some(tempdir),
                remap_priority: true,
                acknowledgements: false.into(),
                emit_cursor,
                ..Default::default()
            };
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            sleep(Duration::from_secs(1)).await;
            shutdown
                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
                .await;

            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
        })
        .await
    }

    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
        let mut map = HashMap::new();
        if !units.is_empty() {
            map.insert(String::from(SYSTEMD_UNIT), units);
        }
        map
    }

    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
        let mut matches: Matches = HashMap::new();
        for (field, value) in conditions {
            matches
                .entry(field.into())
                .or_default()
                .insert(value.into());
        }
        matches
    }

    #[tokio::test]
    async fn reads_journal() {
        let received = run_with_units(&[], &[], None).await;
        assert_eq!(received.len(), 8);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
            "journald".into()
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
    }

    #[tokio::test]
    async fn includes_units() {
        let received = run_with_units(&["unit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
    }

    #[tokio::test]
    async fn excludes_units() {
        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
        assert_eq!(received.len(), 6);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Missing timestamp".into())
        );
        assert_eq!(
            message(&received[2]),
            Value::Bytes("Different timestamps".into())
        );
    }

    #[tokio::test]
    async fn emits_cursor() {
        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
    }

    #[tokio::test]
    async fn includes_matches() {
        let matches = create_matches(vec![("PRIORITY", "ERR")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 2);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("Different timestamps".into())
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Non-ASCII in other field".into())
        );
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    #[tokio::test]
    async fn includes_kernel() {
        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 1);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
    }

    #[tokio::test]
    async fn excludes_matches() {
        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
        let received = run_journal(HashMap::new(), matches, None, false).await;
        assert_eq!(received.len(), 5);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
    }

    #[tokio::test]
    async fn handles_checkpoint() {
        let received = run_with_units(&[], &[], Some("1")).await;
        assert_eq!(received.len(), 7);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
    }

    #[tokio::test]
    async fn parses_array_messages() {
        let received = run_with_units(&["badunit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
    }

    #[tokio::test]
    async fn parses_array_fields() {
        let received = run_with_units(&["syslog.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_RAW"],
            Value::Bytes("¿World?".into())
        );
    }

    #[tokio::test]
    async fn parses_string_sequences() {
        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_FACILITY"],
            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
        );
    }

    #[tokio::test]
    async fn handles_missing_timestamp() {
        let received = run_with_units(&["stdout"], &[], None).await;
        assert_eq!(received.len(), 2);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    #[tokio::test]
    async fn handles_acknowledgements() {
        let (tx, mut rx) = SourceSender::new_test();

        let tempdir = tempdir().unwrap();
        let tempdir = tempdir.path().to_path_buf();
        let mut checkpoint_path = tempdir.clone();
        checkpoint_path.push(TEST_COMPONENT);
        fs::create_dir(&checkpoint_path).unwrap();
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
            .await
            .expect("Creating checkpointer failed!");

        let config = JournaldConfig {
            journalctl_path: Some(TEST_JOURNALCTL.into()),
            data_dir: Some(tempdir),
            remap_priority: true,
            acknowledgements: true.into(),
            ..Default::default()
        };
        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
        let source = config.build(cx).await.unwrap();
        tokio::spawn(async move { source.await.unwrap() });

        assert_eq!(checkpointer.get().await.unwrap(), None);

        sleep(Duration::from_secs(1)).await;

        let mut count = 0;
        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
            assert_eq!(checkpointer.get().await.unwrap(), None);
            event.metadata().update_status(EventStatus::Delivered);
            count += 1;
        }
        assert_eq!(count, 8);

        sleep(Duration::from_millis(100)).await;
        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
    }

    #[test]
    fn filter_matches_works_correctly() {
        let empty: Matches = HashMap::new();
        let includes = create_unit_matches(vec!["one", "two"]);
        let excludes = create_unit_matches(vec!["foo", "bar"]);

        let zero = HashMap::new();
        assert!(!filter_matches(&zero, &empty, &empty));
        assert!(filter_matches(&zero, &includes, &empty));
        assert!(!filter_matches(&zero, &empty, &excludes));
        assert!(filter_matches(&zero, &includes, &excludes));
        let mut one = HashMap::new();
        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
        assert!(!filter_matches(&one, &empty, &empty));
        assert!(!filter_matches(&one, &includes, &empty));
        assert!(!filter_matches(&one, &empty, &excludes));
        assert!(!filter_matches(&one, &includes, &excludes));
        let mut two = HashMap::new();
        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
        assert!(!filter_matches(&two, &empty, &empty));
        assert!(filter_matches(&two, &includes, &empty));
        assert!(filter_matches(&two, &empty, &excludes));
        assert!(filter_matches(&two, &includes, &excludes));
    }

    #[test]
    fn merges_units_and_matches_option() {
        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
        let include_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]);

        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
        let exclude_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]);

        let journald_config = JournaldConfig {
            include_units,
            include_matches,
            exclude_units,
            exclude_matches,
            ..Default::default()
        };

        let hashset =
            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };

        let matches = journald_config.merged_include_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["one.service", "two.service", "three.service"])
        );
        let units = matches.get("_TRANSPORT").unwrap();
        assert_eq!(units, &hashset(&["kernel"]));

        let matches = journald_config.merged_exclude_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["foo.service", "bar.service", "baz.service"])
        );
        let units = matches.get("PRIORITY").unwrap();
        assert_eq!(units, &hashset(&["DEBUG"]));
    }

    #[test]
    fn find_duplicate_match_works_correctly() {
        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
        assert_eq!(field, "_TRANSPORT");
        assert_eq!(value, "kernel");

        let empty = HashMap::new();
        let actual = find_duplicate_match(&empty, &empty);
        assert!(actual.is_none());

        let actual = find_duplicate_match(&include_matches, &empty);
        assert!(actual.is_none());

        let actual = find_duplicate_match(&empty, &exclude_matches);
        assert!(actual.is_none());
    }

    #[test]
    fn command_options() {
        let path = PathBuf::from("journalctl");

        let systemd_version = 239;
        let journal_dir = None;
        let journal_namespace = None;
        let current_boot_only = false;
        let cursor = None;
        let since_now = false;
        let extra_args = vec![];

        let command = create_command(
            &path,
            systemd_version,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(!cmd_line.contains("--directory="));
        assert!(!cmd_line.contains("--namespace="));
        assert!(!cmd_line.contains("--boot=all"));
        assert!(cmd_line.contains("--since=2000-01-01"));

        let journal_dir = None;
        let journal_namespace = None;
        let since_now = true;
        let extra_args = vec![];

        let command = create_command(
            &path,
            systemd_version,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--since=now"));

        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
        let journal_namespace = Some(String::from("my_namespace"));
        let current_boot_only = true;
        let cursor = Some("2021-01-01");
        let extra_args = vec!["--merge".to_string()];

        let command = create_command(
            &path,
            systemd_version,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
        assert!(cmd_line.contains("--namespace=my_namespace"));
        assert!(cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--after-cursor="));
        assert!(cmd_line.contains("--merge"));

        let systemd_version = 258;
        let journal_dir = None;
        let journal_namespace = None;
        let current_boot_only = false;
        let extra_args = vec![];

        let command = create_command(
            &path,
            systemd_version,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--boot=all"));
    }

    #[allow(clippy::too_many_arguments)]
    fn create_command(
        path: &Path,
        systemd_version: u32,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        cursor: Option<&str>,
        extra_args: Vec<String>,
    ) -> Command {
        StartJournalctl::new(
            path.into(),
            systemd_version,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        )
        .make_command(cursor)
    }

    fn message(event: &Event) -> Value {
        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
    }

    fn timestamp(event: &Event) -> Value {
        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
    }

    fn cursor(event: &Event) -> Value {
        event.as_log()[CURSOR].clone()
    }

    fn value_ts(secs: i64, usecs: u32) -> Value {
        Value::Timestamp(
            chrono::Utc
                .timestamp_opt(secs, usecs)
                .single()
                .expect("invalid timestamp"),
        )
    }

    fn priority(event: &Event) -> Value {
        event.as_log()["PRIORITY"].clone()
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = JournaldConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = JournaldConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }

    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
        let record = r#"{
            "PRIORITY":"6",
            "SYSLOG_FACILITY":"3",
            "SYSLOG_IDENTIFIER":"ntpd",
            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
            "_CAP_EFFECTIVE":"3fffffffff",
            "_CMDLINE":"ntpd: [priv]",
            "_COMM":"ntpd",
            "_EXE":"/usr/sbin/ntpd",
            "_GID":"0",
            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
            "_PID":"2156",
            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
            "_SYSTEMD_SLICE":"system.slice",
            "_SYSTEMD_UNIT":"ntpd.service",
            "_TRANSPORT":"stdout",
            "_UID":"0",
            "__MONOTONIC_TIMESTAMP":"98694000446",
            "__REALTIME_TIMESTAMP":"1564173027000443",
            "host":"my-host.local",
            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
            "source_type":"journald"
        }"#;

        let json: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));

        event.as_mut_log().insert("timestamp", chrono::Utc::now());

        let definitions = config.outputs(namespace).remove(0).schema_definition(true);

        definitions.unwrap().assert_valid_for_event(&event);
    }

    #[test]
    fn matches_schema_legacy() {
        let config = JournaldConfig::default();

        matches_schema(&config, LogNamespace::Legacy)
    }
}