vector/sources/journald.rs

use std::{
    collections::{HashMap, HashSet},
    io::SeekFrom,
    path::PathBuf,
    process::Stdio,
    str::FromStr,
    sync::{Arc, LazyLock},
    time::Duration,
};

use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{poll, stream::BoxStream, task::Poll, StreamExt};
use nix::{
    sys::signal::{kill, Signal},
    unistd::Pid,
};
use serde_json::{Error as JsonError, Value as JsonValue};
use snafu::{ResultExt, Snafu};
use tokio::{
    fs::{File, OpenOptions},
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
    process::{Child, Command},
    sync::{Mutex, MutexGuard},
    time::sleep,
};
use tokio_util::codec::FramedRead;
use vector_lib::codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{metadata_path, owned_value_path, path};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    schema::Definition,
    EstimatedJsonEncodedSizeOf,
};
use vector_lib::{
    finalizer::OrderedFinalizer,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
};
use vrl::event_path;
use vrl::value::{kind::Collection, Kind, Value};

use crate::{
    config::{
        log_schema, DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput,
    },
    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
    internal_events::{
        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
        StreamClosedError,
    },
    serde::bool_or_struct,
    shutdown::ShutdownSignal,
    SourceSender,
};

const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

const BACKOFF_DURATION: Duration = Duration::from_secs(1);

static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}

type Matches = HashMap<String, HashSet<String>>;

/// Configuration for the `journald` source.
#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    /// Only include entries appended to the journal after Vector starts reading it.
    #[serde(default)]
    pub since_now: bool,

    /// Only include entries that occurred after the current boot of the system.
    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,

    /// A list of unit names to monitor.
    ///
    /// If empty or not present, all units are accepted.
    ///
    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit name.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    /// A list of unit names to exclude from monitoring.
    ///
    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit
    /// name.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    /// A list of sets of field/value pairs to monitor.
    ///
    /// If empty or not present, all journal fields are accepted.
    ///
    /// If `include_units` is specified, it is merged into this list.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    /// A list of sets of field/value pairs that, if any are present in a journal entry,
    /// excludes the entry from this source.
    ///
    /// If `exclude_units` is specified, it is merged into this list.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    /// The directory used to persist file checkpoint positions.
    ///
    /// By default, the [global `data_dir` option][global_data_dir] is used.
    /// Make sure the running user has write permissions to this directory.
    ///
    /// If this directory is specified, then Vector will attempt to create it.
    ///
    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// A list of extra command line arguments to pass to `journalctl`.
    ///
    /// If specified, it is merged into the command line arguments as-is.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    /// The systemd journal is read in batches, and a checkpoint is set at the end of each batch.
    ///
    /// This option limits the size of the batch.
    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    /// The full path of the `journalctl` executable.
    ///
    /// If not set, a search is done for the `journalctl` path.
    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,

    /// The full path of the journal directory.
    ///
    /// If not set, `journalctl` uses the default system journal path.
    #[serde(default)]
    pub journal_directory: Option<PathBuf>,

    /// The [journal namespace][journal-namespace].
    ///
    /// This value is passed to `journalctl` through the [`--namespace` option][journalctl-namespace-option].
    /// If not set, `journalctl` uses the default namespace.
    ///
    /// [journal-namespace]: https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html#Journal%20Namespaces
    /// [journalctl-namespace-option]: https://www.freedesktop.org/software/systemd/man/journalctl.html#--namespace=NAMESPACE
    #[serde(default)]
    pub journal_namespace: Option<String>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Enables remapping the `PRIORITY` field from an integer to a string value.
    ///
    /// Has no effect unless the value of the field is already an integer.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
    ///
    /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
    /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}

const fn default_batch_size() -> usize {
    16
}

fn matches_examples() -> HashMap<String, Vec<String>> {
    HashMap::<_, _>::from_iter([
        (
            "_SYSTEMD_UNIT".to_owned(),
            vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
        ),
        ("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]),
    ])
}

impl JournaldConfig {
    fn merged_include_matches(&self) -> Matches {
        Self::merge_units(&self.include_matches, &self.include_units)
    }

    fn merged_exclude_matches(&self) -> Matches {
        Self::merge_units(&self.exclude_matches, &self.exclude_units)
    }

    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
        let mut matches = matches.clone();
        for unit in units {
            let entry = matches.entry(String::from(SYSTEMD_UNIT));
            entry.or_default().insert(fixup_unit(unit));
        }
        matches
    }

    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let schema_definition = match log_namespace {
            LogNamespace::Vector => Definition::new_with_default_metadata(
                Kind::bytes().or_null(),
                [LogNamespace::Vector],
            ),
            LogNamespace::Legacy => Definition::new_with_default_metadata(
                Kind::object(Collection::empty()),
                [LogNamespace::Legacy],
            ),
        };

        let mut schema_definition = schema_definition
            .with_standard_vector_source_metadata()
            // for metadata that is added to the events dynamically through the Record
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            );

        // for metadata that is added to the events dynamically through the Record
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}

impl Default for JournaldConfig {
    fn default() -> Self {
        Self {
            since_now: false,
            current_boot_only: true,
            include_units: vec![],
            exclude_units: vec![],
            include_matches: Default::default(),
            exclude_matches: Default::default(),
            data_dir: None,
            batch_size: default_batch_size(),
            journalctl_path: None,
            journal_directory: None,
            journal_namespace: None,
            extra_args: vec![],
            acknowledgements: Default::default(),
            remap_priority: false,
            log_namespace: None,
            emit_cursor: false,
        }
    }
}

impl_generate_config_from_default!(JournaldConfig);

type Record = HashMap<String, String>;

#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SourceConfig for JournaldConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        if self.remap_priority {
            warn!("DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead.");
        }

        let data_dir = cx
            .globals
            // data_dir is only global; the source name can be used for the subdirectory
            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;

        if let Some(unit) = self
            .include_units
            .iter()
            .find(|unit| self.exclude_units.contains(unit))
        {
            let unit = unit.into();
            return Err(BuildError::DuplicatedUnit { unit }.into());
        }

        let include_matches = self.merged_include_matches();
        let exclude_matches = self.merged_exclude_matches();

        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
            return Err(BuildError::DuplicatedMatches { field, value }.into());
        }

        let mut checkpoint_path = data_dir;
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let journalctl_path = self
            .journalctl_path
            .clone()
            .unwrap_or_else(|| JOURNALCTL.clone());

        let starter = StartJournalctl::new(
            journalctl_path,
            self.journal_directory.clone(),
            self.journal_namespace.clone(),
            self.current_boot_only,
            self.since_now,
            self.extra_args.clone(),
        );

        let batch_size = self.batch_size;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        let log_namespace = cx.log_namespace(self.log_namespace);

        Ok(Box::pin(
            JournaldSource {
                include_matches,
                exclude_matches,
                checkpoint_path,
                batch_size,
                remap_priority: self.remap_priority,
                out: cx.out,
                acknowledgements,
                starter,
                log_namespace,
                emit_cursor: self.emit_cursor,
            }
            .run_shutdown(cx.shutdown),
        ))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition =
            self.schema_definition(global_log_namespace.merge(self.log_namespace));

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

struct JournaldSource {
    include_matches: Matches,
    exclude_matches: Matches,
    checkpoint_path: PathBuf,
    batch_size: usize,
    remap_priority: bool,
    out: SourceSender,
    acknowledgements: bool,
    starter: StartJournalctl,
    log_namespace: LogNamespace,
    emit_cursor: bool,
}

impl JournaldSource {
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stream, running)) => {
                    if !self.run_stream(stream, &finalizer, shutdown.clone()).await {
                        return;
                    }
                    // Explicit drop to ensure it isn't dropped earlier.
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // journalctl process should never stop,
            // so it is an error if we reach here.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Processes `journalctl` output until some error occurs.
    /// Returns `true` if `journalctl` should be restarted.
    async fn run_stream<'a>(
        &'a mut self,
        mut stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        let batch_size = self.batch_size;
        loop {
            let mut batch = Batch::new(self);

            // Start the timeout counter only once we have received a
            // valid and non-filtered event.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => return false,
                    item = stream.next() => item,
                };
                if !batch.handle_next(item) {
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        }
    }
}

struct Batch<'a> {
    events: Vec<LogEvent>,
    record_size: usize,
    exiting: Option<bool>,
    batch: Option<BatchNotifier>,
    receiver: Option<BatchStatusReceiver>,
    source: &'a mut JournaldSource,
    cursor: Option<String>,
}

impl<'a> Batch<'a> {
    fn new(source: &'a mut JournaldSource) -> Self {
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
        Self {
            events: Vec::new(),
            record_size: 0,
            exiting: None,
            batch,
            receiver,
            source,
            cursor: None,
        }
    }

    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
        match result {
            None => {
                warn!("Journalctl process stopped.");
                self.exiting = Some(true);
                false
            }
            Some(Err(error)) => {
                emit!(JournaldReadError { error });
                false
            }
            Some(Ok(bytes)) => {
                match decode_record(&bytes, self.source.remap_priority) {
                    Ok(mut record) => {
                        if self.source.emit_cursor {
                            if let Some(tmp) = record.get(CURSOR) {
                                self.cursor = Some(tmp.clone());
                            }
                        } else if let Some(tmp) = record.remove(CURSOR) {
                            self.cursor = Some(tmp);
                        }

                        if !filter_matches(
                            &record,
                            &self.source.include_matches,
                            &self.source.exclude_matches,
                        ) {
                            self.record_size += bytes.len();

                            let mut event = create_log_event_from_record(
                                record,
                                &self.batch,
                                self.source.log_namespace,
                            );

                            enrich_log_event(&mut event, self.source.log_namespace);

                            self.events.push(event);
                        }
                    }
                    Err(error) => {
                        emit!(JournaldInvalidRecordError {
                            error,
                            text: String::from_utf8_lossy(&bytes).into_owned()
                        });
                    }
                }
                true
            }
        }
    }

    async fn finish(
        mut self,
        finalizer: &Finalizer,
        bytes_received: &'a Registered<BytesReceived>,
        events_received: &'a Registered<EventsReceived>,
    ) -> Option<bool> {
        drop(self.batch);

        if self.record_size > 0 {
            bytes_received.emit(ByteSize(self.record_size));
        }

        if !self.events.is_empty() {
            let count = self.events.len();
            let byte_size = self.events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(count, byte_size));

            match self.source.out.send_batch(self.events).await {
                Ok(_) => {
                    if let Some(cursor) = self.cursor {
                        finalizer.finalize(cursor, self.receiver).await;
                    }
                }
                Err(_) => {
                    emit!(StreamClosedError { count });
                    // `out` channel is closed, don't restart journalctl.
                    self.exiting = Some(false);
                }
            }
        }
        self.exiting
    }
}

type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;

struct StartJournalctl {
    path: PathBuf,
    journal_dir: Option<PathBuf>,
    journal_namespace: Option<String>,
    current_boot_only: bool,
    since_now: bool,
    extra_args: Vec<String>,
}

impl StartJournalctl {
    const fn new(
        path: PathBuf,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        }
    }

    fn make_command(&self, checkpoint: Option<&str>) -> Command {
        let mut command = Command::new(&self.path);
        command.stdout(Stdio::piped());
        command.arg("--follow");
        command.arg("--all");
        command.arg("--show-cursor");
        command.arg("--output=json");

        if let Some(dir) = &self.journal_dir {
            command.arg(format!("--directory={}", dir.display()));
        }

        if let Some(namespace) = &self.journal_namespace {
            command.arg(format!("--namespace={namespace}"));
        }

        if self.current_boot_only {
            command.arg("--boot");
        }

        if let Some(cursor) = checkpoint {
            command.arg(format!("--after-cursor={cursor}"));
        } else if self.since_now {
            command.arg("--since=now");
        } else {
            // journalctl --follow only outputs a few lines without a starting point
            command.arg("--since=2000-01-01");
        }

        if !self.extra_args.is_empty() {
            command.args(&self.extra_args);
        }

        command
    }

    fn start(
        &mut self,
        checkpoint: Option<&str>,
    ) -> crate::Result<(JournalStream, RunningJournalctl)> {
        let mut command = self.make_command(checkpoint);

        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;

        let stream = FramedRead::new(
            child.stdout.take().unwrap(),
            CharacterDelimitedDecoder::new(b'\n'),
        )
        .boxed();

        Ok((stream, RunningJournalctl(child)))
    }
}

struct RunningJournalctl(Child);

impl Drop for RunningJournalctl {
    fn drop(&mut self) {
        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
        }
    }
}

fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(host) = log
                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
                .and_then(|meta| meta.get(HOSTNAME))
            {
                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
            }
        }
        LogNamespace::Legacy => {
            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
                log_namespace.insert_source_metadata(
                    JournaldConfig::NAME,
                    log,
                    log_schema().host_key().map(LegacyKey::Overwrite),
                    path!("host"),
                    host,
                );
            }
        }
    }

    // Create a Utc timestamp from an existing log field if present.
    let timestamp_value = match log_namespace {
        LogNamespace::Vector => log
            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
            .and_then(|meta| {
                meta.get(SOURCE_TIMESTAMP)
                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
            }),
        LogNamespace::Legacy => log
            .get(event_path!(SOURCE_TIMESTAMP))
            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
    };

    let timestamp = timestamp_value
        .filter(|&ts| ts.is_bytes())
        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
        .map(|ts| {
            chrono::Utc
                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
                .single()
                .expect("invalid timestamp")
        });

    // Add timestamp.
    match log_namespace {
        LogNamespace::Vector => {
            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());

            if let Some(ts) = timestamp {
                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
            }
        }
        LogNamespace::Legacy => {
            if let Some(ts) = timestamp {
                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
            }
        }
    }

    // Add source type.
    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        JournaldConfig::NAME,
    );
}

fn create_log_event_from_record(
    mut record: Record,
    batch: &Option<BatchNotifier>,
    log_namespace: LogNamespace,
) -> LogEvent {
    match log_namespace {
        LogNamespace::Vector => {
            let message_value = record
                .remove(MESSAGE)
                .map(|msg| Value::Bytes(Bytes::from(msg)))
                .unwrap_or(Value::Null);

            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);

            // Add the remaining Record fields to the log event's metadata object to avoid collisions.
            record.iter().for_each(|(key, value)| {
                log.metadata_mut()
                    .value_mut()
                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
            });

            log
        }
        LogNamespace::Legacy => {
            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);

            if let Some(message) = log.remove(event_path!(MESSAGE)) {
                log.maybe_insert(log_schema().message_key_target_path(), message);
            }

            log
        }
    }
}

/// Map the given unit name into a valid systemd unit
/// by appending ".service" if no extension is present.
fn fixup_unit(unit: &str) -> String {
    if unit.contains('.') {
        unit.into()
    } else {
        format!("{unit}.service")
    }
}

fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
    // journalctl will output non-ASCII values using an array
    // of integers. Look for those values and re-parse them.
    if let Some(record) = record.as_object_mut() {
        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
            *value = decode_array(value.as_array().expect("already validated"));
        }
    }
    if remap {
        record.get_mut("PRIORITY").map(remap_priority);
    }
    serde_json::from_value(record)
}

fn decode_array(array: &[JsonValue]) -> JsonValue {
    decode_array_as_bytes(array).unwrap_or_else(|| {
        let ser = serde_json::to_string(array).expect("already deserialized");
        JsonValue::String(ser)
    })
}

fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
    // From the array of values, turn all the numbers into bytes, and
    // then the bytes into a string, but return None if any value in the
    // array was not a valid byte.
    array
        .iter()
        .map(|item| {
            item.as_u64().and_then(|num| match num {
                num if num <= u8::MAX as u64 => Some(num as u8),
                _ => None,
            })
        })
        .collect::<Option<Vec<u8>>>()
        .map(|array| String::from_utf8_lossy(&array).into())
}

fn remap_priority(priority: &mut JsonValue) {
    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
        let text = match num {
            0 => "EMERG",
            1 => "ALERT",
            2 => "CRIT",
            3 => "ERR",
            4 => "WARNING",
            5 => "NOTICE",
            6 => "INFO",
            7 => "DEBUG",
            _ => "UNKNOWN",
        };
        *priority = JsonValue::String(text.into());
    }
}

fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
    match (includes.is_empty(), excludes.is_empty()) {
        (true, true) => false,
        (false, true) => !contains_match(record, includes),
        (true, false) => contains_match(record, excludes),
        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
    }
}

fn contains_match(record: &Record, matches: &Matches) -> bool {
    let f = move |(field, value)| {
        matches
            .get(field)
            .map(|x| x.contains(value))
            .unwrap_or(false)
    };
    record.iter().any(f)
}

fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
    for (a_key, a_values) in a_matches {
        if let Some(b_values) = b_matches.get(a_key.as_str()) {
            for (a, b) in a_values
                .iter()
                .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
            {
                if a == b {
                    return Some((a_key.into(), b.into()));
                }
            }
        }
    }
    None
}

enum Finalizer {
    Sync(SharedCheckpointer),
    Async(OrderedFinalizer<String>),
}

impl Finalizer {
    fn new(
        acknowledgements: bool,
        checkpointer: SharedCheckpointer,
        shutdown: ShutdownSignal,
    ) -> Self {
        if acknowledgements {
            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
            tokio::spawn(async move {
                while let Some((status, cursor)) = ack_stream.next().await {
                    if status == BatchStatus::Delivered {
                        checkpointer.lock().await.set(cursor).await;
                    }
                }
            });
            Self::Async(finalizer)
        } else {
            Self::Sync(checkpointer)
        }
    }

    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
        match (self, receiver) {
            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
            _ => {
                unreachable!("Cannot have async finalization without a receiver in journald source")
            }
        }
    }
}

struct Checkpointer {
    file: File,
    filename: PathBuf,
}

impl Checkpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&filename)
            .await?;
        Ok(Checkpointer { file, filename })
    }

    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(format!("{token}\n").as_bytes()).await
    }

    async fn get(&mut self) -> Result<Option<String>, io::Error> {
        let mut buf = Vec::<u8>::new();
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.read_to_end(&mut buf).await?;
        match buf.len() {
            0 => Ok(None),
            _ => {
                let text = String::from_utf8_lossy(&buf);
                match text.find('\n') {
                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
                    None => Ok(None), // Maybe return an error?
                }
            }
        }
    }
}

struct StatefulCheckpointer {
    checkpointer: Checkpointer,
    cursor: Option<String>,
}

impl StatefulCheckpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let mut checkpointer = Checkpointer::new(filename).await?;
        let cursor = checkpointer.get().await?;
        Ok(Self {
            checkpointer,
            cursor,
        })
    }

    async fn set(&mut self, token: impl Into<String>) {
        let token = token.into();
        if let Err(error) = self.checkpointer.set(&token).await {
            emit!(JournaldCheckpointSetError {
                error,
                filename: self
                    .checkpointer
                    .filename
                    .to_str()
                    .unwrap_or("unknown")
                    .to_string(),
            });
        }
        self.cursor = Some(token);
    }
}

#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);

impl SharedCheckpointer {
    fn new(c: StatefulCheckpointer) -> Self {
        Self(Arc::new(Mutex::new(c)))
    }

    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
        self.0.lock().await
    }
}

#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let mut filename = tempdir.path().to_path_buf();
        filename.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        checkpointer
            .set("first test")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("first test\n"));

        checkpointer
            .set("second")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("second\n"));
    }
}

#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use tempfile::tempdir;
    use tokio::time::{sleep, timeout, Duration, Instant};
    use vrl::value::{kind::Collection, Value};

    use super::*;
    use crate::{
        config::ComponentKey, event::Event, event::EventStatus,
        test_util::components::assert_source_compliance,
    };

    const TEST_COMPONENT: &str = "journald-test";
    const TEST_JOURNALCTL: &str = "tests/data/journalctl";

    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
        let include_matches = create_unit_matches(iunits.to_vec());
        let exclude_matches = create_unit_matches(xunits.to_vec());
        run_journal(include_matches, exclude_matches, cursor, false).await
    }

    async fn run_journal(
        include_matches: Matches,
        exclude_matches: Matches,
        checkpoint: Option<&str>,
        emit_cursor: bool,
    ) -> Vec<Event> {
        assert_source_compliance(&["protocol"], async move {
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let tempdir = tempdir().unwrap();
            let tempdir = tempdir.path().to_path_buf();

            if let Some(cursor) = checkpoint {
                let mut checkpoint_path = tempdir.clone();
                checkpoint_path.push(TEST_COMPONENT);
                fs::create_dir(&checkpoint_path).unwrap();
                checkpoint_path.push(CHECKPOINT_FILENAME);

                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
                    .await
                    .expect("Creating checkpointer failed!");

                checkpointer
                    .set(cursor)
                    .await
                    .expect("Could not set checkpoint");
            }

            let (cx, shutdown) =
                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
            let config = JournaldConfig {
                journalctl_path: Some(TEST_JOURNALCTL.into()),
                include_matches,
                exclude_matches,
                data_dir: Some(tempdir),
                remap_priority: true,
                acknowledgements: false.into(),
                emit_cursor,
                ..Default::default()
            };
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            sleep(Duration::from_millis(100)).await;
            shutdown
                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
                .await;

            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
        })
        .await
    }

    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
        let mut map = HashMap::new();
        if !units.is_empty() {
            map.insert(String::from(SYSTEMD_UNIT), units);
        }
        map
    }

    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
        let mut matches: Matches = HashMap::new();
        for (field, value) in conditions {
            matches
                .entry(field.into())
                .or_default()
                .insert(value.into());
        }
        matches
    }

    #[tokio::test]
    async fn reads_journal() {
        let received = run_with_units(&[], &[], None).await;
        assert_eq!(received.len(), 8);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
            "journald".into()
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
    }

    #[tokio::test]
    async fn includes_units() {
        let received = run_with_units(&["unit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
    }

    #[tokio::test]
    async fn excludes_units() {
        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
        assert_eq!(received.len(), 6);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Missing timestamp".into())
        );
        assert_eq!(
            message(&received[2]),
            Value::Bytes("Different timestamps".into())
        );
    }

    #[tokio::test]
    async fn emits_cursor() {
        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
    }

    #[tokio::test]
    async fn includes_matches() {
        let matches = create_matches(vec![("PRIORITY", "ERR")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 2);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("Different timestamps".into())
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Non-ASCII in other field".into())
        );
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    #[tokio::test]
    async fn includes_kernel() {
        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 1);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
    }

    #[tokio::test]
    async fn excludes_matches() {
        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
        let received = run_journal(HashMap::new(), matches, None, false).await;
        assert_eq!(received.len(), 5);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
    }

    #[tokio::test]
    async fn handles_checkpoint() {
        let received = run_with_units(&[], &[], Some("1")).await;
        assert_eq!(received.len(), 7);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
    }

    #[tokio::test]
    async fn parses_array_messages() {
        let received = run_with_units(&["badunit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
    }

    #[tokio::test]
    async fn parses_array_fields() {
        let received = run_with_units(&["syslog.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_RAW"],
            Value::Bytes("¿World?".into())
        );
    }

    #[tokio::test]
    async fn parses_string_sequences() {
        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_FACILITY"],
            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
        );
    }

    #[tokio::test]
    async fn handles_missing_timestamp() {
        let received = run_with_units(&["stdout"], &[], None).await;
        assert_eq!(received.len(), 2);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    #[tokio::test]
    async fn handles_acknowledgements() {
        let (tx, mut rx) = SourceSender::new_test();

        let tempdir = tempdir().unwrap();
        let tempdir = tempdir.path().to_path_buf();
        let mut checkpoint_path = tempdir.clone();
        checkpoint_path.push(TEST_COMPONENT);
        fs::create_dir(&checkpoint_path).unwrap();
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
            .await
            .expect("Creating checkpointer failed!");

        let config = JournaldConfig {
            journalctl_path: Some(TEST_JOURNALCTL.into()),
            data_dir: Some(tempdir),
            remap_priority: true,
            acknowledgements: true.into(),
            ..Default::default()
        };
        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
        let source = config.build(cx).await.unwrap();
        tokio::spawn(async move { source.await.unwrap() });

        // Make sure the checkpointer cursor is empty
        assert_eq!(checkpointer.get().await.unwrap(), None);

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Acknowledge all the received events.
        let mut count = 0;
        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
            // The checkpointer shouldn't set the cursor until the end of the batch.
            assert_eq!(checkpointer.get().await.unwrap(), None);
            event.metadata().update_status(EventStatus::Delivered);
            count += 1;
        }
        assert_eq!(count, 8);

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
    }

    #[test]
    fn filter_matches_works_correctly() {
        let empty: Matches = HashMap::new();
        let includes = create_unit_matches(vec!["one", "two"]);
        let excludes = create_unit_matches(vec!["foo", "bar"]);

        let zero = HashMap::new();
        assert!(!filter_matches(&zero, &empty, &empty));
        assert!(filter_matches(&zero, &includes, &empty));
        assert!(!filter_matches(&zero, &empty, &excludes));
        assert!(filter_matches(&zero, &includes, &excludes));
        let mut one = HashMap::new();
        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
        assert!(!filter_matches(&one, &empty, &empty));
        assert!(!filter_matches(&one, &includes, &empty));
        assert!(!filter_matches(&one, &empty, &excludes));
        assert!(!filter_matches(&one, &includes, &excludes));
        let mut two = HashMap::new();
        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
        assert!(!filter_matches(&two, &empty, &empty));
        assert!(filter_matches(&two, &includes, &empty));
        assert!(filter_matches(&two, &empty, &excludes));
        assert!(filter_matches(&two, &includes, &excludes));
    }
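
    // Not part of the original suite: a small added check of the unit-name
    // normalization documented on `fixup_unit` (a name without a `.` gets
    // `.service` appended; a name that already has an extension is unchanged).
    #[test]
    fn fixup_unit_appends_service_extension() {
        assert_eq!(fixup_unit("ntpd"), "ntpd.service");
        assert_eq!(fixup_unit("sysinit.target"), "sysinit.target");
    }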

    #[test]
    fn merges_units_and_matches_option() {
        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
        let include_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]);

        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
        let exclude_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]);

        let journald_config = JournaldConfig {
            include_units,
            include_matches,
            exclude_units,
            exclude_matches,
            ..Default::default()
        };

        let hashset =
            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };

        let matches = journald_config.merged_include_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["one.service", "two.service", "three.service"])
        );
        let units = matches.get("_TRANSPORT").unwrap();
        assert_eq!(units, &hashset(&["kernel"]));

        let matches = journald_config.merged_exclude_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["foo.service", "bar.service", "baz.service"])
        );
        let units = matches.get("PRIORITY").unwrap();
        assert_eq!(units, &hashset(&["DEBUG"]));
    }
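
    // Not part of the original suite: a minimal sketch of `decode_record` on a
    // hand-written record, showing that integer-array fields are re-parsed into
    // strings and that `PRIORITY` is remapped when `remap` is enabled. The field
    // values here are made up for illustration.
    #[test]
    fn decode_record_reparses_arrays_and_remaps_priority() {
        let line = br#"{"PRIORITY":"3","MESSAGE":[72,105]}"#;
        let record = decode_record(line, true).expect("record should decode");
        assert_eq!(record["PRIORITY"], "ERR");
        assert_eq!(record["MESSAGE"], "Hi");
    }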

    #[test]
    fn find_duplicate_match_works_correctly() {
        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
        assert_eq!(field, "_TRANSPORT");
        assert_eq!(value, "kernel");

        let empty = HashMap::new();
        let actual = find_duplicate_match(&empty, &empty);
        assert!(actual.is_none());

        let actual = find_duplicate_match(&include_matches, &empty);
        assert!(actual.is_none());

        let actual = find_duplicate_match(&empty, &exclude_matches);
        assert!(actual.is_none());
    }
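
    // Not part of the original suite: a small added check that an array which is
    // not a list of valid bytes falls back to its serialized JSON form, mirroring
    // the behavior exercised end-to-end by `parses_string_sequences` above.
    #[test]
    fn decode_array_falls_back_to_json_string() {
        let array = vec![
            serde_json::Value::from("DHCP4"),
            serde_json::Value::from("DHCP6"),
        ];
        assert_eq!(
            decode_array(&array),
            serde_json::Value::String(r#"["DHCP4","DHCP6"]"#.into())
        );
    }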

    #[test]
    fn command_options() {
        let path = PathBuf::from("journalctl");

        let journal_dir = None;
        let journal_namespace = None;
        let current_boot_only = false;
        let cursor = None;
        let since_now = false;
        let extra_args = vec![];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(!cmd_line.contains("--directory="));
        assert!(!cmd_line.contains("--namespace="));
        assert!(!cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--since=2000-01-01"));

        let journal_dir = None;
        let journal_namespace = None;
        let since_now = true;
        let extra_args = vec![];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--since=now"));

        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
        let journal_namespace = Some(String::from("my_namespace"));
        let current_boot_only = true;
        let cursor = Some("2021-01-01");
        let extra_args = vec!["--merge".to_string()];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
        assert!(cmd_line.contains("--namespace=my_namespace"));
        assert!(cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--after-cursor="));
        assert!(cmd_line.contains("--merge"));
    }

    fn create_command(
        path: &Path,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        cursor: Option<&str>,
        extra_args: Vec<String>,
    ) -> Command {
        StartJournalctl::new(
            path.into(),
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        )
        .make_command(cursor)
    }

    fn message(event: &Event) -> Value {
        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
    }

    fn timestamp(event: &Event) -> Value {
        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
    }

    fn cursor(event: &Event) -> Value {
        event.as_log()[CURSOR].clone()
    }

    fn value_ts(secs: i64, usecs: u32) -> Value {
        Value::Timestamp(
            chrono::Utc
                .timestamp_opt(secs, usecs)
                .single()
                .expect("invalid timestamp"),
        )
    }

    fn priority(event: &Event) -> Value {
        event.as_log()["PRIORITY"].clone()
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = JournaldConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = JournaldConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }

    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
        let record = r#"{
            "PRIORITY":"6",
            "SYSLOG_FACILITY":"3",
            "SYSLOG_IDENTIFIER":"ntpd",
            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
            "_CAP_EFFECTIVE":"3fffffffff",
            "_CMDLINE":"ntpd: [priv]",
            "_COMM":"ntpd",
            "_EXE":"/usr/sbin/ntpd",
            "_GID":"0",
            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
            "_PID":"2156",
            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
            "_SYSTEMD_SLICE":"system.slice",
            "_SYSTEMD_UNIT":"ntpd.service",
            "_TRANSPORT":"stdout",
            "_UID":"0",
            "__MONOTONIC_TIMESTAMP":"98694000446",
            "__REALTIME_TIMESTAMP":"1564173027000443",
            "host":"my-host.local",
            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
            "source_type":"journald"
        }"#;

        let json: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));

        event.as_mut_log().insert("timestamp", chrono::Utc::now());

        let definitions = config.outputs(namespace).remove(0).schema_definition(true);

        definitions.unwrap().assert_valid_for_event(&event);
    }

    #[test]
    fn matches_schema_legacy() {
        let config = JournaldConfig::default();

        matches_schema(&config, LogNamespace::Legacy)
    }
}