vector/sources/
journald.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::SeekFrom,
4    path::PathBuf,
5    process::Stdio,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15    sys::signal::{Signal, kill},
16    unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21    fs::{File, OpenOptions},
22    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23    process::{Child, Command},
24    sync::{Mutex, MutexGuard},
25    time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29    EstimatedJsonEncodedSizeOf,
30    codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31    config::{LegacyKey, LogNamespace},
32    configurable::configurable_component,
33    finalizer::OrderedFinalizer,
34    internal_event::{
35        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36    },
37    lookup::{metadata_path, owned_value_path, path},
38    schema::Definition,
39};
40use vrl::{
41    event_path,
42    value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46    SourceSender,
47    config::{
48        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49        log_schema,
50    },
51    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52    internal_events::{
53        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55        StreamClosedError,
56    },
57    serde::bool_or_struct,
58    shutdown::ShutdownSignal,
59};
60
/// Maximum time spent collecting further records into a batch once the batch
/// holds its first event.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// Name of the checkpoint file created inside the source's data directory.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
/// Journal field holding the cursor of an entry; used for checkpointing.
const CURSOR: &str = "__CURSOR";
/// Journal field used to populate the host of an event.
const HOSTNAME: &str = "_HOSTNAME";
/// Journal field holding the log message.
const MESSAGE: &str = "MESSAGE";
/// Journal field that `include_units` / `exclude_units` are matched against.
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
/// Preferred journal field for the event timestamp (parsed as microseconds).
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
/// Fallback timestamp field used when `SOURCE_TIMESTAMP` is absent.
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Delay before `journalctl` is started again after it stopped or failed to start.
const BACKOFF_DURATION: Duration = Duration::from_secs(1);

/// Executable used when `journalctl_path` is not configured.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());
74
// Errors raised while validating the configuration or spawning `journalctl`.
#[derive(Debug, Snafu)]
enum BuildError {
    // The `journalctl` child process could not be spawned.
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    // The same unit name is listed in both `include_units` and `exclude_units`.
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    // The same field/value pair is present in both the merged include and
    // exclude matches.
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}
91
/// Journal field name mapped to the set of values to match for that field.
type Matches = HashMap<String, HashSet<String>>;
93
94/// Configuration for the `journald` source.
95#[configurable_component(source("journald", "Collect logs from JournalD."))]
96#[derive(Clone, Debug)]
97#[serde(deny_unknown_fields)]
98pub struct JournaldConfig {
99    /// Only include entries that appended to the journal after the entries have been read.
100    #[serde(default)]
101    pub since_now: bool,
102
103    /// Only include entries that occurred after the current boot of the system.
104    #[serde(default = "crate::serde::default_true")]
105    pub current_boot_only: bool,
106
107    /// A list of unit names to monitor.
108    ///
109    /// If empty or not present, all units are accepted.
110    ///
111    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit name.
112    ///
113    /// **Note:** This option matches only the `_SYSTEMD_UNIT` field, which is narrower than `journalctl --unit`.
114    /// Messages from systemd about unit lifecycle (start/stop) have `_SYSTEMD_UNIT=init.scope` and will not match.
115    /// To capture these, explicitly include `init.scope` or use `include_matches` for finer control.
116    #[serde(default)]
117    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
118    pub include_units: Vec<String>,
119
120    /// A list of unit names to exclude from monitoring.
121    ///
122    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit
123    /// name.
124    #[serde(default)]
125    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
126    pub exclude_units: Vec<String>,
127
128    /// A list of sets of field/value pairs to monitor.
129    ///
130    /// If empty or not present, all journal fields are accepted.
131    ///
132    /// If `include_units` is specified, it is merged into this list.
133    #[serde(default)]
134    #[configurable(metadata(
135        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
136    ))]
137    #[configurable(metadata(docs::examples = "matches_examples()"))]
138    pub include_matches: Matches,
139
140    /// A list of sets of field/value pairs that, if any are present in a journal entry,
141    /// excludes the entry from this source.
142    ///
143    /// If `exclude_units` is specified, it is merged into this list.
144    #[serde(default)]
145    #[configurable(metadata(
146        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
147    ))]
148    #[configurable(metadata(docs::examples = "matches_examples()"))]
149    pub exclude_matches: Matches,
150
151    /// The directory used to persist file checkpoint positions.
152    ///
153    /// By default, the [global `data_dir` option][global_data_dir] is used.
154    /// Make sure the running user has write permissions to this directory.
155    ///
156    /// If this directory is specified, then Vector will attempt to create it.
157    ///
158    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
159    #[serde(default)]
160    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
161    #[configurable(metadata(docs::human_name = "Data Directory"))]
162    pub data_dir: Option<PathBuf>,
163
164    /// A list of extra command line arguments to pass to `journalctl`.
165    ///
166    /// If specified, it is merged to the command line arguments as-is.
167    #[serde(default)]
168    #[configurable(metadata(docs::examples = "--merge"))]
169    pub extra_args: Vec<String>,
170
171    /// The systemd journal is read in batches, and a checkpoint is set at the end of each batch.
172    ///
173    /// This option limits the size of the batch.
174    #[serde(default = "default_batch_size")]
175    #[configurable(metadata(docs::type_unit = "events"))]
176    pub batch_size: usize,
177
178    /// The full path of the `journalctl` executable.
179    ///
180    /// If not set, a search is done for the `journalctl` path.
181    #[serde(default)]
182    pub journalctl_path: Option<PathBuf>,
183
184    /// The full path of the journal directory.
185    ///
186    /// If not set, `journalctl` uses the default system journal path.
187    #[serde(default)]
188    pub journal_directory: Option<PathBuf>,
189
190    /// The [journal namespace][journal-namespace].
191    ///
192    /// This value is passed to `journalctl` through the [`--namespace` option][journalctl-namespace-option].
193    /// If not set, `journalctl` uses the default namespace.
194    ///
195    /// [journal-namespace]: https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html#Journal%20Namespaces
196    /// [journalctl-namespace-option]: https://www.freedesktop.org/software/systemd/man/journalctl.html#--namespace=NAMESPACE
197    #[serde(default)]
198    pub journal_namespace: Option<String>,
199
200    #[configurable(derived)]
201    #[serde(default, deserialize_with = "bool_or_struct")]
202    acknowledgements: SourceAcknowledgementsConfig,
203
204    /// Enables remapping the `PRIORITY` field from an integer to string value.
205    ///
206    /// Has no effect unless the value of the field is already an integer.
207    #[serde(default)]
208    #[configurable(
209        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
210    )]
211    remap_priority: bool,
212
213    /// The namespace to use for logs. This overrides the global setting.
214    #[configurable(metadata(docs::hidden))]
215    #[serde(default)]
216    log_namespace: Option<bool>,
217
218    /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
219    ///
220    /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
221    /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
222    #[serde(default = "crate::serde::default_false")]
223    emit_cursor: bool,
224}
225
/// Serde default for `batch_size`: the number of events per batch when the
/// option is not configured.
const fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 16;
    DEFAULT_BATCH_SIZE
}
229
/// Example value shown in the documentation of `include_matches` and
/// `exclude_matches`.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples = HashMap::with_capacity(2);
    examples.insert(
        "_SYSTEMD_UNIT".to_owned(),
        vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
    );
    examples.insert("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]);
    examples
}
239
240impl JournaldConfig {
241    fn merged_include_matches(&self) -> Matches {
242        Self::merge_units(&self.include_matches, &self.include_units)
243    }
244
245    fn merged_exclude_matches(&self) -> Matches {
246        Self::merge_units(&self.exclude_matches, &self.exclude_units)
247    }
248
249    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
250        let mut matches = matches.clone();
251        for unit in units {
252            let entry = matches.entry(String::from(SYSTEMD_UNIT));
253            entry.or_default().insert(fixup_unit(unit));
254        }
255        matches
256    }
257
258    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
259    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
260        let schema_definition = match log_namespace {
261            LogNamespace::Vector => Definition::new_with_default_metadata(
262                Kind::bytes().or_null(),
263                [LogNamespace::Vector],
264            ),
265            LogNamespace::Legacy => Definition::new_with_default_metadata(
266                Kind::object(Collection::empty()),
267                [LogNamespace::Legacy],
268            ),
269        };
270
271        let mut schema_definition = schema_definition
272            .with_standard_vector_source_metadata()
273            // for metadata that is added to the events dynamically through the Record
274            .with_source_metadata(
275                JournaldConfig::NAME,
276                None,
277                &owned_value_path!("metadata"),
278                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
279                None,
280            )
281            .with_source_metadata(
282                JournaldConfig::NAME,
283                None,
284                &owned_value_path!("timestamp"),
285                Kind::timestamp().or_undefined(),
286                Some("timestamp"),
287            )
288            .with_source_metadata(
289                JournaldConfig::NAME,
290                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
291                &owned_value_path!("host"),
292                Kind::bytes().or_undefined(),
293                Some("host"),
294            );
295
296        // for metadata that is added to the events dynamically through the Record
297        if log_namespace == LogNamespace::Legacy {
298            schema_definition = schema_definition.unknown_fields(Kind::bytes());
299        }
300
301        schema_definition
302    }
303}
304
305impl Default for JournaldConfig {
306    fn default() -> Self {
307        Self {
308            since_now: false,
309            current_boot_only: true,
310            include_units: vec![],
311            exclude_units: vec![],
312            include_matches: Default::default(),
313            exclude_matches: Default::default(),
314            data_dir: None,
315            batch_size: default_batch_size(),
316            journalctl_path: None,
317            journal_directory: None,
318            journal_namespace: None,
319            extra_args: vec![],
320            acknowledgements: Default::default(),
321            remap_priority: false,
322            log_namespace: None,
323            emit_cursor: false,
324        }
325    }
326}
327
328impl_generate_config_from_default!(JournaldConfig);
329
/// A decoded journal entry: field name mapped to its string value.
type Record = HashMap<String, String>;
331
332#[async_trait::async_trait]
333#[typetag::serde(name = "journald")]
334impl SourceConfig for JournaldConfig {
335    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
336        if self.remap_priority {
337            warn!(
338                "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
339            );
340        }
341
342        let data_dir = cx
343            .globals
344            // source are only global, name can be used for subdir
345            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
346
347        if let Some(unit) = self
348            .include_units
349            .iter()
350            .find(|unit| self.exclude_units.contains(unit))
351        {
352            let unit = unit.into();
353            return Err(BuildError::DuplicatedUnit { unit }.into());
354        }
355
356        let include_matches = self.merged_include_matches();
357        let exclude_matches = self.merged_exclude_matches();
358
359        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
360            return Err(BuildError::DuplicatedMatches { field, value }.into());
361        }
362
363        let mut checkpoint_path = data_dir;
364        checkpoint_path.push(CHECKPOINT_FILENAME);
365
366        let journalctl_path = self
367            .journalctl_path
368            .clone()
369            .unwrap_or_else(|| JOURNALCTL.clone());
370
371        let starter = StartJournalctl::new(
372            journalctl_path,
373            self.journal_directory.clone(),
374            self.journal_namespace.clone(),
375            self.current_boot_only,
376            self.since_now,
377            self.extra_args.clone(),
378        );
379
380        let batch_size = self.batch_size;
381        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
382        let log_namespace = cx.log_namespace(self.log_namespace);
383
384        Ok(Box::pin(
385            JournaldSource {
386                include_matches,
387                exclude_matches,
388                checkpoint_path,
389                batch_size,
390                remap_priority: self.remap_priority,
391                out: cx.out,
392                acknowledgements,
393                starter,
394                log_namespace,
395                emit_cursor: self.emit_cursor,
396            }
397            .run_shutdown(cx.shutdown),
398        ))
399    }
400
401    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
402        let schema_definition =
403            self.schema_definition(global_log_namespace.merge(self.log_namespace));
404
405        vec![SourceOutput::new_maybe_logs(
406            DataType::Log,
407            schema_definition,
408        )]
409    }
410
411    fn can_acknowledge(&self) -> bool {
412        true
413    }
414}
415
/// Runtime state of the `journald` source, built from `JournaldConfig`.
struct JournaldSource {
    /// Include matches with `include_units` already merged in.
    include_matches: Matches,
    /// Exclude matches with `exclude_units` already merged in.
    exclude_matches: Matches,
    /// Full path of the checkpoint file.
    checkpoint_path: PathBuf,
    /// Upper bound on the number of records read for one batch.
    batch_size: usize,
    /// Passed to `decode_record` to request remapping of the `PRIORITY` field.
    remap_priority: bool,
    /// Channel the event batches are sent to.
    out: SourceSender,
    /// Whether batches are created with an acknowledgement notifier.
    acknowledgements: bool,
    /// Spawns the `journalctl` child process.
    starter: StartJournalctl,
    /// Namespace used when building and enriching events.
    log_namespace: LogNamespace,
    /// Whether the `__CURSOR` field is kept in emitted events.
    emit_cursor: bool,
}
428
429impl JournaldSource {
430    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
431        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
432            .await
433            .map_err(|error| {
434                emit!(JournaldCheckpointFileOpenError {
435                    error,
436                    path: self
437                        .checkpoint_path
438                        .to_str()
439                        .unwrap_or("unknown")
440                        .to_string(),
441                });
442            })?;
443
444        let checkpointer = SharedCheckpointer::new(checkpointer);
445        let finalizer = Finalizer::new(
446            self.acknowledgements,
447            checkpointer.clone(),
448            shutdown.clone(),
449        );
450
451        self.run(checkpointer, finalizer, shutdown).await;
452
453        Ok(())
454    }
455
456    async fn run(
457        mut self,
458        checkpointer: SharedCheckpointer,
459        finalizer: Finalizer,
460        mut shutdown: ShutdownSignal,
461    ) {
462        loop {
463            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
464                break;
465            }
466
467            info!("Starting journalctl.");
468            let cursor = checkpointer.lock().await.cursor.clone();
469            match self.starter.start(cursor.as_deref()) {
470                Ok((stdout_stream, stderr_stream, running)) => {
471                    if !self
472                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
473                        .await
474                    {
475                        return;
476                    }
477                    // Explicit drop to ensure it isn't dropped earlier.
478                    drop(running);
479                }
480                Err(error) => {
481                    emit!(JournaldStartJournalctlError { error });
482                }
483            }
484
485            // journalctl process should never stop,
486            // so it is an error if we reach here.
487            tokio::select! {
488                _ = &mut shutdown => break,
489                _ = sleep(BACKOFF_DURATION) => (),
490            }
491        }
492    }
493
494    /// Process `journalctl` output until some error occurs.
495    /// Return `true` if should restart `journalctl`.
496    async fn run_stream<'a>(
497        &'a mut self,
498        mut stdout_stream: JournalStream,
499        stderr_stream: JournalStream,
500        finalizer: &'a Finalizer,
501        mut shutdown: ShutdownSignal,
502    ) -> bool {
503        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
504        let events_received = register!(EventsReceived);
505
506        // Spawn stderr handler task
507        let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));
508
509        let batch_size = self.batch_size;
510        let result = loop {
511            let mut batch = Batch::new(self);
512
513            // Start the timeout counter only once we have received a
514            // valid and non-filtered event.
515            while batch.events.is_empty() {
516                let item = tokio::select! {
517                    _ = &mut shutdown => {
518                        stderr_handler.abort();
519                        return false;
520                    },
521                    item = stdout_stream.next() => item,
522                };
523                if !batch.handle_next(item) {
524                    stderr_handler.abort();
525                    return true;
526                }
527            }
528
529            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
530            tokio::pin!(timeout);
531
532            for _ in 1..batch_size {
533                tokio::select! {
534                    _ = &mut timeout => break,
535                    result = stdout_stream.next() => if !batch.handle_next(result) {
536                        break;
537                    }
538                }
539            }
540            if let Some(x) = batch
541                .finish(finalizer, &bytes_received, &events_received)
542                .await
543            {
544                break x;
545            }
546        };
547
548        stderr_handler.abort();
549        result
550    }
551
552    /// Handle stderr stream from journalctl process
553    async fn handle_stderr(mut stderr_stream: JournalStream) {
554        while let Some(result) = stderr_stream.next().await {
555            match result {
556                Ok(line) => {
557                    let line_str = String::from_utf8_lossy(&line);
558                    let trimmed = line_str.trim();
559                    if !trimmed.is_empty() {
560                        warn!("Warning journalctl stderr: {trimmed}");
561                    }
562                }
563                Err(err) => {
564                    error!("Error reading journalctl stderr: {err}");
565                    break;
566                }
567            }
568        }
569    }
570}
571
/// Accumulates the events read from `journalctl` until they are sent downstream.
struct Batch<'a> {
    /// Events accepted into this batch so far.
    events: Vec<LogEvent>,
    /// Sum of the raw byte lengths of the records accepted into this batch.
    record_size: usize,
    /// Result reported by `finish`: `Some(true)` to restart `journalctl`,
    /// `Some(false)` to stop without restarting, `None` to keep reading.
    exiting: Option<bool>,
    /// Acknowledgement notifier attached to the events, if acknowledgements are enabled.
    batch: Option<BatchNotifier>,
    /// Receiving side of `batch`, handed to the finalizer together with the cursor.
    receiver: Option<BatchStatusReceiver>,
    /// The source this batch reads its settings from and sends its events through.
    source: &'a mut JournaldSource,
    /// Cursor of the most recent record seen, including filtered records.
    cursor: Option<String>,
}
581
582impl<'a> Batch<'a> {
583    fn new(source: &'a mut JournaldSource) -> Self {
584        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
585        Self {
586            events: Vec::new(),
587            record_size: 0,
588            exiting: None,
589            batch,
590            receiver,
591            source,
592            cursor: None,
593        }
594    }
595
596    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
597        match result {
598            None => {
599                warn!("Journalctl process stopped.");
600                self.exiting = Some(true);
601                false
602            }
603            Some(Err(error)) => {
604                emit!(JournaldReadError { error });
605                false
606            }
607            Some(Ok(bytes)) => {
608                match decode_record(&bytes, self.source.remap_priority) {
609                    Ok(mut record) => {
610                        if self.source.emit_cursor {
611                            if let Some(tmp) = record.get(CURSOR) {
612                                self.cursor = Some(tmp.clone());
613                            }
614                        } else if let Some(tmp) = record.remove(CURSOR) {
615                            self.cursor = Some(tmp);
616                        }
617
618                        if !filter_matches(
619                            &record,
620                            &self.source.include_matches,
621                            &self.source.exclude_matches,
622                        ) {
623                            self.record_size += bytes.len();
624
625                            let mut event = create_log_event_from_record(
626                                record,
627                                &self.batch,
628                                self.source.log_namespace,
629                            );
630
631                            enrich_log_event(&mut event, self.source.log_namespace);
632
633                            self.events.push(event);
634                        }
635                    }
636                    Err(error) => {
637                        emit!(JournaldInvalidRecordError {
638                            error,
639                            text: String::from_utf8_lossy(&bytes).into_owned()
640                        });
641                    }
642                }
643                true
644            }
645        }
646    }
647
648    async fn finish(
649        mut self,
650        finalizer: &Finalizer,
651        bytes_received: &'a Registered<BytesReceived>,
652        events_received: &'a Registered<EventsReceived>,
653    ) -> Option<bool> {
654        drop(self.batch);
655
656        if self.record_size > 0 {
657            bytes_received.emit(ByteSize(self.record_size));
658        }
659
660        if !self.events.is_empty() {
661            let count = self.events.len();
662            let byte_size = self.events.estimated_json_encoded_size_of();
663            events_received.emit(CountByteSize(count, byte_size));
664
665            match self.source.out.send_batch(self.events).await {
666                Ok(_) => {
667                    if let Some(cursor) = self.cursor {
668                        finalizer.finalize(cursor, self.receiver).await;
669                    }
670                }
671                Err(_) => {
672                    emit!(StreamClosedError { count });
673                    // `out` channel is closed, don't restart journalctl.
674                    self.exiting = Some(false);
675                }
676            }
677        }
678        self.exiting
679    }
680}
681
/// Boxed stream of newline-delimited frames read from a `journalctl` pipe.
type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
683
/// Settings needed to build the `journalctl` command line and spawn the process.
struct StartJournalctl {
    /// Path of the `journalctl` executable.
    path: PathBuf,
    /// Passed as `--directory=` when set.
    journal_dir: Option<PathBuf>,
    /// Passed as `--namespace=` when set.
    journal_namespace: Option<String>,
    /// Adds `--boot` when true.
    current_boot_only: bool,
    /// Uses `--since=now` when true and no checkpoint cursor is available.
    since_now: bool,
    /// Extra arguments appended at the end of the command line.
    extra_args: Vec<String>,
}
692
693impl StartJournalctl {
694    const fn new(
695        path: PathBuf,
696        journal_dir: Option<PathBuf>,
697        journal_namespace: Option<String>,
698        current_boot_only: bool,
699        since_now: bool,
700        extra_args: Vec<String>,
701    ) -> Self {
702        Self {
703            path,
704            journal_dir,
705            journal_namespace,
706            current_boot_only,
707            since_now,
708            extra_args,
709        }
710    }
711
712    fn make_command(&self, checkpoint: Option<&str>) -> Command {
713        let mut command = Command::new(&self.path);
714        command.stdout(Stdio::piped());
715        command.stderr(Stdio::piped());
716        command.arg("--follow");
717        command.arg("--all");
718        command.arg("--show-cursor");
719        command.arg("--output=json");
720
721        if let Some(dir) = &self.journal_dir {
722            command.arg(format!("--directory={}", dir.display()));
723        }
724
725        if let Some(namespace) = &self.journal_namespace {
726            command.arg(format!("--namespace={namespace}"));
727        }
728
729        if self.current_boot_only {
730            command.arg("--boot");
731        }
732
733        if let Some(cursor) = checkpoint {
734            command.arg(format!("--after-cursor={cursor}"));
735        } else if self.since_now {
736            command.arg("--since=now");
737        } else {
738            // journalctl --follow only outputs a few lines without a starting point
739            command.arg("--since=2000-01-01");
740        }
741
742        if !self.extra_args.is_empty() {
743            command.args(&self.extra_args);
744        }
745
746        command
747    }
748
749    fn start(
750        &mut self,
751        checkpoint: Option<&str>,
752    ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
753        let mut command = self.make_command(checkpoint);
754
755        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
756
757        let stdout_stream = FramedRead::new(
758            child.stdout.take().unwrap(),
759            CharacterDelimitedDecoder::new(b'\n'),
760        );
761
762        let stderr = child.stderr.take().unwrap();
763        let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));
764
765        Ok((
766            stdout_stream.boxed(),
767            stderr_stream.boxed(),
768            RunningJournalctl(child),
769        ))
770    }
771}
772
773struct RunningJournalctl(Child);
774
775impl Drop for RunningJournalctl {
776    fn drop(&mut self) {
777        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
778            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
779        }
780    }
781}
782
783fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
784    match log_namespace {
785        LogNamespace::Vector => {
786            if let Some(host) = log
787                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
788                .and_then(|meta| meta.get(HOSTNAME))
789            {
790                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
791            }
792        }
793        LogNamespace::Legacy => {
794            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
795                log_namespace.insert_source_metadata(
796                    JournaldConfig::NAME,
797                    log,
798                    log_schema().host_key().map(LegacyKey::Overwrite),
799                    path!("host"),
800                    host,
801                );
802            }
803        }
804    }
805
806    // Create a Utc timestamp from an existing log field if present.
807    let timestamp_value = match log_namespace {
808        LogNamespace::Vector => log
809            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
810            .and_then(|meta| {
811                meta.get(SOURCE_TIMESTAMP)
812                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
813            }),
814        LogNamespace::Legacy => log
815            .get(event_path!(SOURCE_TIMESTAMP))
816            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
817    };
818
819    let timestamp = timestamp_value
820        .filter(|&ts| ts.is_bytes())
821        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
822        .map(|ts| {
823            chrono::Utc
824                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
825                .single()
826                .expect("invalid timestamp")
827        });
828
829    // Add timestamp.
830    match log_namespace {
831        LogNamespace::Vector => {
832            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
833
834            if let Some(ts) = timestamp {
835                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
836            }
837        }
838        LogNamespace::Legacy => {
839            if let Some(ts) = timestamp {
840                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
841            }
842        }
843    }
844
845    // Add source type.
846    log_namespace.insert_vector_metadata(
847        log,
848        log_schema().source_type_key(),
849        path!("source_type"),
850        JournaldConfig::NAME,
851    );
852}
853
854fn create_log_event_from_record(
855    mut record: Record,
856    batch: &Option<BatchNotifier>,
857    log_namespace: LogNamespace,
858) -> LogEvent {
859    match log_namespace {
860        LogNamespace::Vector => {
861            let message_value = record
862                .remove(MESSAGE)
863                .map(|msg| Value::Bytes(Bytes::from(msg)))
864                .unwrap_or(Value::Null);
865
866            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
867
868            // Add the remaining fields from the Record to the log event into an object to avoid collisions.
869            record.iter().for_each(|(key, value)| {
870                log.metadata_mut()
871                    .value_mut()
872                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
873            });
874
875            log
876        }
877        LogNamespace::Legacy => {
878            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
879
880            if let Some(message) = log.remove(event_path!(MESSAGE)) {
881                log.maybe_insert(log_schema().message_key_target_path(), message);
882            }
883
884            log
885        }
886    }
887}
888
/// Normalizes a unit name into a full systemd unit name.
///
/// Names that already contain a `.` are returned unchanged; any other name gets
/// `.service` appended.
fn fixup_unit(unit: &str) -> String {
    match unit.find('.') {
        Some(_) => unit.to_owned(),
        None => format!("{unit}.service"),
    }
}
898
899fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
900    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
901    // journalctl will output non-ASCII values using an array
902    // of integers. Look for those values and re-parse them.
903    if let Some(record) = record.as_object_mut() {
904        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
905            *value = decode_array(value.as_array().expect("already validated"));
906        }
907    }
908    if remap {
909        record.get_mut("PRIORITY").map(remap_priority);
910    }
911    serde_json::from_value(record)
912}
913
914fn decode_array(array: &[JsonValue]) -> JsonValue {
915    decode_array_as_bytes(array).unwrap_or_else(|| {
916        let ser = serde_json::to_string(array).expect("already deserialized");
917        JsonValue::String(ser)
918    })
919}
920
921fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
922    // From the array of values, turn all the numbers into bytes, and
923    // then the bytes into a string, but return None if any value in the
924    // array was not a valid byte.
925    array
926        .iter()
927        .map(|item| {
928            item.as_u64().and_then(|num| match num {
929                num if num <= u8::MAX as u64 => Some(num as u8),
930                _ => None,
931            })
932        })
933        .collect::<Option<Vec<u8>>>()
934        .map(|array| String::from_utf8_lossy(&array).into())
935}
936
937fn remap_priority(priority: &mut JsonValue) {
938    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
939        let text = match num {
940            0 => "EMERG",
941            1 => "ALERT",
942            2 => "CRIT",
943            3 => "ERR",
944            4 => "WARNING",
945            5 => "NOTICE",
946            6 => "INFO",
947            7 => "DEBUG",
948            _ => "UNKNOWN",
949        };
950        *priority = JsonValue::String(text.into());
951    }
952}
953
954fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
955    match (includes.is_empty(), excludes.is_empty()) {
956        (true, true) => false,
957        (false, true) => !contains_match(record, includes),
958        (true, false) => contains_match(record, excludes),
959        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
960    }
961}
962
963fn contains_match(record: &Record, matches: &Matches) -> bool {
964    let f = move |(field, value)| {
965        matches
966            .get(field)
967            .map(|x| x.contains(value))
968            .unwrap_or(false)
969    };
970    record.iter().any(f)
971}
972
973fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
974    for (a_key, a_values) in a_matches {
975        if let Some(b_values) = b_matches.get(a_key.as_str()) {
976            for (a, b) in a_values
977                .iter()
978                .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
979            {
980                if a == b {
981                    return Some((a_key.into(), b.into()));
982                }
983            }
984        }
985    }
986    None
987}
988
/// How the cursor of a finished batch gets recorded in the checkpoint.
enum Finalizer {
    /// The cursor is written through the shared checkpointer directly when
    /// `finalize` is called (used when acknowledgements are disabled).
    Sync(SharedCheckpointer),
    /// The cursor is queued on an ordered finalizer together with the batch status
    /// receiver (used when acknowledgements are enabled).
    Async(OrderedFinalizer<String>),
}
993
994impl Finalizer {
995    fn new(
996        acknowledgements: bool,
997        checkpointer: SharedCheckpointer,
998        shutdown: ShutdownSignal,
999    ) -> Self {
1000        if acknowledgements {
1001            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
1002            tokio::spawn(async move {
1003                while let Some((status, cursor)) = ack_stream.next().await {
1004                    if status == BatchStatus::Delivered {
1005                        checkpointer.lock().await.set(cursor).await;
1006                    }
1007                }
1008            });
1009            Self::Async(finalizer)
1010        } else {
1011            Self::Sync(checkpointer)
1012        }
1013    }
1014
1015    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
1016        match (self, receiver) {
1017            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
1018            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
1019            _ => {
1020                unreachable!("Cannot have async finalization without a receiver in journald source")
1021            }
1022        }
1023    }
1024}
1025
/// File-backed storage for a single checkpoint token.
struct Checkpointer {
    /// Open handle to the checkpoint file, used for both reading and writing.
    file: File,
    /// Path the file was opened from; kept for error reporting.
    filename: PathBuf,
}
1030
1031impl Checkpointer {
1032    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1033        let file = OpenOptions::new()
1034            .read(true)
1035            .write(true)
1036            .create(true)
1037            .truncate(false)
1038            .open(&filename)
1039            .await?;
1040        Ok(Checkpointer { file, filename })
1041    }
1042
1043    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
1044        self.file.seek(SeekFrom::Start(0)).await?;
1045        self.file.write_all(format!("{token}\n").as_bytes()).await
1046    }
1047
1048    async fn get(&mut self) -> Result<Option<String>, io::Error> {
1049        let mut buf = Vec::<u8>::new();
1050        self.file.seek(SeekFrom::Start(0)).await?;
1051        self.file.read_to_end(&mut buf).await?;
1052        match buf.len() {
1053            0 => Ok(None),
1054            _ => {
1055                let text = String::from_utf8_lossy(&buf);
1056                match text.find('\n') {
1057                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
1058                    None => Ok(None), // Maybe return an error?
1059                }
1060            }
1061        }
1062    }
1063}
1064
/// A [`Checkpointer`] paired with an in-memory copy of the cursor.
struct StatefulCheckpointer {
    /// The file-backed checkpoint storage.
    checkpointer: Checkpointer,
    /// The cursor read at construction time or the last one passed to `set`;
    /// `None` if neither produced a value.
    cursor: Option<String>,
}
1069
1070impl StatefulCheckpointer {
1071    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1072        let mut checkpointer = Checkpointer::new(filename).await?;
1073        let cursor = checkpointer.get().await?;
1074        Ok(Self {
1075            checkpointer,
1076            cursor,
1077        })
1078    }
1079
1080    async fn set(&mut self, token: impl Into<String>) {
1081        let token = token.into();
1082        if let Err(error) = self.checkpointer.set(&token).await {
1083            emit!(JournaldCheckpointSetError {
1084                error,
1085                filename: self
1086                    .checkpointer
1087                    .filename
1088                    .to_str()
1089                    .unwrap_or("unknown")
1090                    .to_string(),
1091            });
1092        }
1093        self.cursor = Some(token);
1094    }
1095}
1096
/// Cloneable handle to a [`StatefulCheckpointer`] behind an `Arc<Mutex<_>>`, so that
/// clones refer to the same checkpointer.
#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1099
1100impl SharedCheckpointer {
1101    fn new(c: StatefulCheckpointer) -> Self {
1102        Self(Arc::new(Mutex::new(c)))
1103    }
1104
1105    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1106        self.0.lock().await
1107    }
1108}
1109
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    /// Stores `token`, then checks that it is returned by `get` and that the file on
    /// disk starts with the token followed by a newline.
    async fn assert_round_trip(
        checkpointer: &mut Checkpointer,
        filename: &std::path::Path,
        token: &str,
    ) {
        checkpointer
            .set(token)
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), token);

        let contents = read_to_string(filename)
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with(&format!("{token}\n")));
    }

    /// A fresh checkpoint file holds no token, and each `set` replaces the previous
    /// one, including when the new token is shorter.
    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let filename = tempdir.path().join(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        assert_round_trip(&mut checkpointer, &filename, "first test").await;
        assert_round_trip(&mut checkpointer, &filename, "second").await;
    }
}
1154
// Tests for the journald source: end-to-end runs against the executable configured in
// `TEST_JOURNALCTL`, plus unit tests for match handling, command construction and the
// output schema.
#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use tempfile::tempdir;
    use tokio::time::{Duration, Instant, sleep, timeout};
    use vrl::value::{Value, kind::Collection};

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, EventStatus},
        test_util::components::assert_source_compliance,
    };

    // Component key used for the source under test; also the name of the directory
    // created under the data dir to hold the checkpoint file.
    const TEST_COMPONENT: &str = "journald-test";
    // Passed as `journalctl_path`, so the source runs this instead of the real journalctl.
    const TEST_JOURNALCTL: &str = "tests/data/journalctl";

    /// Runs the source with the given units as include/exclude matches and an optional
    /// pre-seeded checkpoint cursor; cursor emission is disabled.
    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
        let include_matches = create_unit_matches(iunits.to_vec());
        let exclude_matches = create_unit_matches(xunits.to_vec());
        run_journal(include_matches, exclude_matches, cursor, false).await
    }

    /// Runs the source against `TEST_JOURNALCTL` inside a temporary data dir and returns
    /// every event it sent before being shut down.
    ///
    /// When `checkpoint` is given, it is written to the checkpoint file before the
    /// source is built.
    async fn run_journal(
        include_matches: Matches,
        exclude_matches: Matches,
        checkpoint: Option<&str>,
        emit_cursor: bool,
    ) -> Vec<Event> {
        assert_source_compliance(&["protocol"], async move {
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let tempdir = tempdir().unwrap();
            let tempdir = tempdir.path().to_path_buf();

            if let Some(cursor) = checkpoint {
                let mut checkpoint_path = tempdir.clone();
                checkpoint_path.push(TEST_COMPONENT);
                fs::create_dir(&checkpoint_path).unwrap();
                checkpoint_path.push(CHECKPOINT_FILENAME);

                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
                    .await
                    .expect("Creating checkpointer failed!");

                checkpointer
                    .set(cursor)
                    .await
                    .expect("Could not set checkpoint");
            }

            let (cx, shutdown) =
                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
            let config = JournaldConfig {
                journalctl_path: Some(TEST_JOURNALCTL.into()),
                include_matches,
                exclude_matches,
                data_dir: Some(tempdir),
                remap_priority: true,
                acknowledgements: false.into(),
                emit_cursor,
                ..Default::default()
            };
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            // Hack: Sleep to ensure journalctl process starts and emits events before shutdown.
            sleep(Duration::from_secs(1)).await;
            shutdown
                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
                .await;

            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
        })
        .await
    }

    /// Builds a match set mapping `SYSTEMD_UNIT` to the given units, or an empty map
    /// when no units are given.
    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
        let mut map = HashMap::new();
        if !units.is_empty() {
            map.insert(String::from(SYSTEMD_UNIT), units);
        }
        map
    }

    /// Builds a match set from `(field, value)` pairs, grouping values by field.
    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
        let mut matches: Matches = HashMap::new();
        for (field, value) in conditions {
            matches
                .entry(field.into())
                .or_default()
                .insert(value.into());
        }
        matches
    }

    /// Without filters every fixture record is received, with message, source type,
    /// timestamp and remapped priority set.
    #[tokio::test]
    async fn reads_journal() {
        let received = run_with_units(&[], &[], None).await;
        assert_eq!(received.len(), 8);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
            "journald".into()
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
    }

    /// Including a unit yields only that unit's records.
    #[tokio::test]
    async fn includes_units() {
        let received = run_with_units(&["unit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
    }

    /// Excluding units removes their records from the output.
    #[tokio::test]
    async fn excludes_units() {
        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
        assert_eq!(received.len(), 6);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Missing timestamp".into())
        );
        assert_eq!(
            message(&received[2]),
            Value::Bytes("Different timestamps".into())
        );
    }

    /// With `emit_cursor` enabled each event carries its cursor field.
    #[tokio::test]
    async fn emits_cursor() {
        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
    }

    /// Include matches are compared against the remapped priority name.
    #[tokio::test]
    async fn includes_matches() {
        let matches = create_matches(vec![("PRIORITY", "ERR")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 2);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("Different timestamps".into())
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Non-ASCII in other field".into())
        );
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    /// Include matches work on fields other than the unit or priority.
    #[tokio::test]
    async fn includes_kernel() {
        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 1);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
    }

    /// Several exclude values for the same field are all applied.
    #[tokio::test]
    async fn excludes_matches() {
        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
        let received = run_journal(HashMap::new(), matches, None, false).await;
        assert_eq!(received.len(), 5);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
    }

    /// A pre-seeded checkpoint makes the source resume after that cursor.
    #[tokio::test]
    async fn handles_checkpoint() {
        let received = run_with_units(&[], &[], Some("1")).await;
        assert_eq!(received.len(), 7);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
    }

    /// A message delivered as an array of byte values is decoded into text.
    #[tokio::test]
    async fn parses_array_messages() {
        let received = run_with_units(&["badunit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
    }

    /// Byte arrays in fields other than the message are decoded as well.
    #[tokio::test]
    async fn parses_array_fields() {
        let received = run_with_units(&["syslog.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_RAW"],
            Value::Bytes("¿World?".into())
        );
    }

    /// An array that is not made of byte values is kept as its JSON text.
    #[tokio::test]
    async fn parses_string_sequences() {
        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_FACILITY"],
            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
        );
    }

    /// Both selected records end up with the expected timestamp values.
    #[tokio::test]
    async fn handles_missing_timestamp() {
        let received = run_with_units(&["stdout"], &[], None).await;
        assert_eq!(received.len(), 2);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    /// With acknowledgements enabled the checkpoint stays empty while events are being
    /// acknowledged and holds the last cursor shortly afterwards.
    #[tokio::test]
    async fn handles_acknowledgements() {
        let (tx, mut rx) = SourceSender::new_test();

        let tempdir = tempdir().unwrap();
        let tempdir = tempdir.path().to_path_buf();
        let mut checkpoint_path = tempdir.clone();
        checkpoint_path.push(TEST_COMPONENT);
        fs::create_dir(&checkpoint_path).unwrap();
        checkpoint_path.push(CHECKPOINT_FILENAME);

        // Separate handle on the checkpoint file, used to observe what gets written.
        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
            .await
            .expect("Creating checkpointer failed!");

        let config = JournaldConfig {
            journalctl_path: Some(TEST_JOURNALCTL.into()),
            data_dir: Some(tempdir),
            remap_priority: true,
            acknowledgements: true.into(),
            ..Default::default()
        };
        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
        let source = config.build(cx).await.unwrap();
        tokio::spawn(async move { source.await.unwrap() });

        // Make sure the checkpointer cursor is empty
        assert_eq!(checkpointer.get().await.unwrap(), None);

        // Hack: Sleep to ensure journalctl process starts and emits events.
        sleep(Duration::from_secs(1)).await;

        // Acknowledge all the received events.
        let mut count = 0;
        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
            // The checkpointer shouldn't set the cursor until the end of the batch.
            assert_eq!(checkpointer.get().await.unwrap(), None);
            event.metadata().update_status(EventStatus::Delivered);
            count += 1;
        }
        assert_eq!(count, 8);

        // Give the spawned finalizer task time to write the checkpoint.
        sleep(Duration::from_millis(100)).await;
        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
    }

    /// Exercises `filter_matches` with every combination of empty/non-empty include
    /// and exclude sets against records with no unit, an included unit and an
    /// excluded unit.
    #[test]
    fn filter_matches_works_correctly() {
        let empty: Matches = HashMap::new();
        let includes = create_unit_matches(vec!["one", "two"]);
        let excludes = create_unit_matches(vec!["foo", "bar"]);

        // Record without any fields.
        let zero = HashMap::new();
        assert!(!filter_matches(&zero, &empty, &empty));
        assert!(filter_matches(&zero, &includes, &empty));
        assert!(!filter_matches(&zero, &empty, &excludes));
        assert!(filter_matches(&zero, &includes, &excludes));
        // Record whose unit is in the include set.
        let mut one = HashMap::new();
        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
        assert!(!filter_matches(&one, &empty, &empty));
        assert!(!filter_matches(&one, &includes, &empty));
        assert!(!filter_matches(&one, &empty, &excludes));
        assert!(!filter_matches(&one, &includes, &excludes));
        // Record whose unit is in the exclude set.
        let mut two = HashMap::new();
        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
        assert!(!filter_matches(&two, &empty, &empty));
        assert!(filter_matches(&two, &includes, &empty));
        assert!(filter_matches(&two, &empty, &excludes));
        assert!(filter_matches(&two, &includes, &excludes));
    }

    /// `include_units`/`exclude_units` are merged into the corresponding match sets,
    /// with `.service` appended to bare unit names.
    #[test]
    fn merges_units_and_matches_option() {
        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
        let include_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]);

        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
        let exclude_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]);

        let journald_config = JournaldConfig {
            include_units,
            include_matches,
            exclude_units,
            exclude_matches,
            ..Default::default()
        };

        let hashset =
            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };

        let matches = journald_config.merged_include_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["one.service", "two.service", "three.service"])
        );
        let units = matches.get("_TRANSPORT").unwrap();
        assert_eq!(units, &hashset(&["kernel"]));

        let matches = journald_config.merged_exclude_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["foo.service", "bar.service", "baz.service"])
        );
        let units = matches.get("PRIORITY").unwrap();
        assert_eq!(units, &hashset(&["DEBUG"]));
    }

    /// A pair present in both sets is reported; an empty set on either side yields `None`.
    #[test]
    fn find_duplicate_match_works_correctly() {
        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
        assert_eq!(field, "_TRANSPORT");
        assert_eq!(value, "kernel");

        let empty = HashMap::new();
        let actual = find_duplicate_match(&empty, &empty);
        assert!(actual.is_none());

        let actual = find_duplicate_match(&include_matches, &empty);
        assert!(actual.is_none());

        let actual = find_duplicate_match(&empty, &exclude_matches);
        assert!(actual.is_none());
    }

    /// Checks the command line built for three option combinations: defaults,
    /// `since_now`, and all options set together with a cursor.
    #[test]
    fn command_options() {
        let path = PathBuf::from("journalctl");

        let journal_dir = None;
        let journal_namespace = None;
        let current_boot_only = false;
        let cursor = None;
        let since_now = false;
        let extra_args = vec![];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(!cmd_line.contains("--directory="));
        assert!(!cmd_line.contains("--namespace="));
        assert!(!cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--since=2000-01-01"));

        let journal_dir = None;
        let journal_namespace = None;
        let since_now = true;
        let extra_args = vec![];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--since=now"));

        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
        let journal_namespace = Some(String::from("my_namespace"));
        let current_boot_only = true;
        let cursor = Some("2021-01-01");
        let extra_args = vec!["--merge".to_string()];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
        assert!(cmd_line.contains("--namespace=my_namespace"));
        assert!(cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--after-cursor="));
        assert!(cmd_line.contains("--merge"));
    }

    /// Builds the journalctl `Command` for the given options via `StartJournalctl`.
    fn create_command(
        path: &Path,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        cursor: Option<&str>,
        extra_args: Vec<String>,
    ) -> Command {
        StartJournalctl::new(
            path.into(),
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        )
        .make_command(cursor)
    }

    /// Returns the value stored under the schema's message key.
    fn message(event: &Event) -> Value {
        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
    }

    /// Returns the value stored under the schema's timestamp key.
    fn timestamp(event: &Event) -> Value {
        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
    }

    /// Returns the value of the event's `CURSOR` field.
    fn cursor(event: &Event) -> Value {
        event.as_log()[CURSOR].clone()
    }

    /// Builds a UTC timestamp value for comparisons.
    ///
    /// NOTE(review): despite its name, `usecs` is forwarded as the second argument of
    /// `timestamp_opt`, which chrono documents as nanoseconds; callers pass values
    /// such as `140001000` accordingly.
    fn value_ts(secs: i64, usecs: u32) -> Value {
        Value::Timestamp(
            chrono::Utc
                .timestamp_opt(secs, usecs)
                .single()
                .expect("invalid timestamp"),
        )
    }

    /// Returns the value of the event's `PRIORITY` field.
    fn priority(event: &Event) -> Value {
        event.as_log()["PRIORITY"].clone()
    }

    /// The schema definition produced for the Vector namespace matches the expected
    /// set of metadata fields.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = JournaldConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    /// The schema definition produced for the Legacy namespace matches the expected
    /// set of event fields.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = JournaldConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }

    /// Asserts that a sample journald event (with a `timestamp` field added) is valid
    /// for the schema definition the config produces for `namespace`.
    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
        let record = r#"{
            "PRIORITY":"6",
            "SYSLOG_FACILITY":"3",
            "SYSLOG_IDENTIFIER":"ntpd",
            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
            "_CAP_EFFECTIVE":"3fffffffff",
            "_CMDLINE":"ntpd: [priv]",
            "_COMM":"ntpd",
            "_EXE":"/usr/sbin/ntpd",
            "_GID":"0",
            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
            "_PID":"2156",
            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
            "_SYSTEMD_SLICE":"system.slice",
            "_SYSTEMD_UNIT":"ntpd.service",
            "_TRANSPORT":"stdout",
            "_UID":"0",
            "__MONOTONIC_TIMESTAMP":"98694000446",
            "__REALTIME_TIMESTAMP":"1564173027000443",
            "host":"my-host.local",
            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
            "source_type":"journald"
        }"#;

        let json: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));

        event.as_mut_log().insert("timestamp", chrono::Utc::now());

        let definitions = config.outputs(namespace).remove(0).schema_definition(true);

        definitions.unwrap().assert_valid_for_event(&event);
    }

    /// The sample event is valid for the default (Legacy) schema definition.
    #[test]
    fn matches_schema_legacy() {
        let config = JournaldConfig::default();

        matches_schema(&config, LogNamespace::Legacy)
    }
}