vector/sources/journald.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::SeekFrom,
4    path::PathBuf,
5    process::Stdio,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15    sys::signal::{Signal, kill},
16    unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21    fs::{File, OpenOptions},
22    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23    process::{Child, Command},
24    sync::{Mutex, MutexGuard},
25    time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29    EstimatedJsonEncodedSizeOf,
30    codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31    config::{LegacyKey, LogNamespace},
32    configurable::configurable_component,
33    finalizer::OrderedFinalizer,
34    internal_event::{
35        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36    },
37    lookup::{metadata_path, owned_value_path, path},
38    schema::Definition,
39};
40use vrl::{
41    event_path,
42    value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46    SourceSender,
47    config::{
48        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49        log_schema,
50    },
51    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52    internal_events::{
53        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55        StreamClosedError,
56    },
57    serde::bool_or_struct,
58    shutdown::ShutdownSignal,
59};
60
/// How long a batch keeps collecting entries once it holds its first event.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// File name, inside the data directory, holding the persisted journal cursor.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";

// Journal field names looked up in the JSON records emitted by `journalctl`.
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Pause before `journalctl` is started again after it stopped or failed to start.
const BACKOFF_DURATION: Duration = Duration::from_millis(1_000);

/// Executable used when no `journalctl_path` is configured (looked up by name).
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| PathBuf::from("journalctl"));
74
75#[derive(Debug, Snafu)]
76enum BuildError {
77    #[snafu(display("journalctl failed to execute: {}", source))]
78    JournalctlSpawn { source: io::Error },
79    #[snafu(display("failed to parse output of `journalctl --version`: {:?}", output))]
80    JournalctlParseVersion { output: String },
81    #[snafu(display(
82        "The unit {:?} is duplicated in both include_units and exclude_units",
83        unit
84    ))]
85    DuplicatedUnit { unit: String },
86    #[snafu(display(
87        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
88        field,
89        value,
90    ))]
91    DuplicatedMatches { field: String, value: String },
92    #[snafu(display(
93        "`current_boot_only: false` not supported for systemd versions 250 through 257 (got {}).",
94        systemd_version
95    ))]
96    AllBootsNotSupported { systemd_version: u32 },
97}
98
/// Journal field name mapped to the set of values to match for that field.
type Matches = HashMap<String, HashSet<String>>;
100
101/// Configuration for the `journald` source.
102#[configurable_component(source("journald", "Collect logs from JournalD."))]
103#[derive(Clone, Debug)]
104#[serde(deny_unknown_fields)]
105pub struct JournaldConfig {
106    /// Only include entries that appended to the journal after the entries have been read.
107    #[serde(default)]
108    pub since_now: bool,
109
110    /// Only include entries that occurred after the current boot of the system.
111    #[serde(default = "crate::serde::default_true")]
112    pub current_boot_only: bool,
113
114    /// A list of unit names to monitor.
115    ///
116    /// If empty or not present, all units are accepted.
117    ///
118    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit name.
119    ///
120    /// **Note:** This option matches only the `_SYSTEMD_UNIT` field, which is narrower than `journalctl --unit`.
121    /// Messages from systemd about unit lifecycle (start/stop) have `_SYSTEMD_UNIT=init.scope` and will not match.
122    /// To capture these, explicitly include `init.scope` or use `include_matches` for finer control.
123    #[serde(default)]
124    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
125    pub include_units: Vec<String>,
126
127    /// A list of unit names to exclude from monitoring.
128    ///
129    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit
130    /// name.
131    #[serde(default)]
132    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
133    pub exclude_units: Vec<String>,
134
135    /// A list of sets of field/value pairs to monitor.
136    ///
137    /// If empty or not present, all journal fields are accepted.
138    ///
139    /// If `include_units` is specified, it is merged into this list.
140    #[serde(default)]
141    #[configurable(metadata(
142        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
143    ))]
144    #[configurable(metadata(docs::examples = "matches_examples()"))]
145    pub include_matches: Matches,
146
147    /// A list of sets of field/value pairs that, if any are present in a journal entry,
148    /// excludes the entry from this source.
149    ///
150    /// If `exclude_units` is specified, it is merged into this list.
151    #[serde(default)]
152    #[configurable(metadata(
153        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
154    ))]
155    #[configurable(metadata(docs::examples = "matches_examples()"))]
156    pub exclude_matches: Matches,
157
158    /// The directory used to persist file checkpoint positions.
159    ///
160    /// By default, the [global `data_dir` option][global_data_dir] is used.
161    /// Make sure the running user has write permissions to this directory.
162    ///
163    /// If this directory is specified, then Vector will attempt to create it.
164    ///
165    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
166    #[serde(default)]
167    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
168    #[configurable(metadata(docs::human_name = "Data Directory"))]
169    pub data_dir: Option<PathBuf>,
170
171    /// A list of extra command line arguments to pass to `journalctl`.
172    ///
173    /// If specified, it is merged to the command line arguments as-is.
174    #[serde(default)]
175    #[configurable(metadata(docs::examples = "--merge"))]
176    pub extra_args: Vec<String>,
177
178    /// The systemd journal is read in batches, and a checkpoint is set at the end of each batch.
179    ///
180    /// This option limits the size of the batch.
181    #[serde(default = "default_batch_size")]
182    #[configurable(metadata(docs::type_unit = "events"))]
183    pub batch_size: usize,
184
185    /// The full path of the `journalctl` executable.
186    ///
187    /// If not set, a search is done for the `journalctl` path.
188    #[serde(default)]
189    pub journalctl_path: Option<PathBuf>,
190
191    /// The full path of the journal directory.
192    ///
193    /// If not set, `journalctl` uses the default system journal path.
194    #[serde(default)]
195    pub journal_directory: Option<PathBuf>,
196
197    /// The [journal namespace][journal-namespace].
198    ///
199    /// This value is passed to `journalctl` through the [`--namespace` option][journalctl-namespace-option].
200    /// If not set, `journalctl` uses the default namespace.
201    ///
202    /// [journal-namespace]: https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html#Journal%20Namespaces
203    /// [journalctl-namespace-option]: https://www.freedesktop.org/software/systemd/man/journalctl.html#--namespace=NAMESPACE
204    #[serde(default)]
205    pub journal_namespace: Option<String>,
206
207    #[configurable(derived)]
208    #[serde(default, deserialize_with = "bool_or_struct")]
209    acknowledgements: SourceAcknowledgementsConfig,
210
211    /// Enables remapping the `PRIORITY` field from an integer to string value.
212    ///
213    /// Has no effect unless the value of the field is already an integer.
214    #[serde(default)]
215    #[configurable(
216        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
217    )]
218    remap_priority: bool,
219
220    /// The namespace to use for logs. This overrides the global setting.
221    #[configurable(metadata(docs::hidden))]
222    #[serde(default)]
223    log_namespace: Option<bool>,
224
225    /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
226    ///
227    /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
228    /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
229    #[serde(default = "crate::serde::default_false")]
230    emit_cursor: bool,
231}
232
/// Default for `JournaldConfig::batch_size`: the number of events per batch.
const fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 16;
    DEFAULT_BATCH_SIZE
}
236
/// Example value rendered in the docs for `include_matches` / `exclude_matches`.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let units = Vec::from(["sshd.service", "ntpd.service"].map(String::from));
    let transports = vec![String::from("kernel")];
    HashMap::from([
        (String::from("_SYSTEMD_UNIT"), units),
        (String::from("_TRANSPORT"), transports),
    ])
}
246
247impl JournaldConfig {
248    fn merged_include_matches(&self) -> Matches {
249        Self::merge_units(&self.include_matches, &self.include_units)
250    }
251
252    fn merged_exclude_matches(&self) -> Matches {
253        Self::merge_units(&self.exclude_matches, &self.exclude_units)
254    }
255
256    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
257        let mut matches = matches.clone();
258        for unit in units {
259            let entry = matches.entry(String::from(SYSTEMD_UNIT));
260            entry.or_default().insert(fixup_unit(unit));
261        }
262        matches
263    }
264
265    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
266    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
267        let schema_definition = match log_namespace {
268            LogNamespace::Vector => Definition::new_with_default_metadata(
269                Kind::bytes().or_null(),
270                [LogNamespace::Vector],
271            ),
272            LogNamespace::Legacy => Definition::new_with_default_metadata(
273                Kind::object(Collection::empty()),
274                [LogNamespace::Legacy],
275            ),
276        };
277
278        let mut schema_definition = schema_definition
279            .with_standard_vector_source_metadata()
280            // for metadata that is added to the events dynamically through the Record
281            .with_source_metadata(
282                JournaldConfig::NAME,
283                None,
284                &owned_value_path!("metadata"),
285                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
286                None,
287            )
288            .with_source_metadata(
289                JournaldConfig::NAME,
290                None,
291                &owned_value_path!("timestamp"),
292                Kind::timestamp().or_undefined(),
293                Some("timestamp"),
294            )
295            .with_source_metadata(
296                JournaldConfig::NAME,
297                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
298                &owned_value_path!("host"),
299                Kind::bytes().or_undefined(),
300                Some("host"),
301            );
302
303        // for metadata that is added to the events dynamically through the Record
304        if log_namespace == LogNamespace::Legacy {
305            schema_definition = schema_definition.unknown_fields(Kind::bytes());
306        }
307
308        schema_definition
309    }
310}
311
312impl Default for JournaldConfig {
313    fn default() -> Self {
314        Self {
315            since_now: false,
316            current_boot_only: true,
317            include_units: vec![],
318            exclude_units: vec![],
319            include_matches: Default::default(),
320            exclude_matches: Default::default(),
321            data_dir: None,
322            batch_size: default_batch_size(),
323            journalctl_path: None,
324            journal_directory: None,
325            journal_namespace: None,
326            extra_args: vec![],
327            acknowledgements: Default::default(),
328            remap_priority: false,
329            log_namespace: None,
330            emit_cursor: false,
331        }
332    }
333}
334
335impl_generate_config_from_default!(JournaldConfig);
336
337type Record = HashMap<String, String>;
338
339#[async_trait::async_trait]
340#[typetag::serde(name = "journald")]
341impl SourceConfig for JournaldConfig {
342    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
343        if self.remap_priority {
344            warn!(
345                "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
346            );
347        }
348
349        let data_dir = cx
350            .globals
351            // source are only global, name can be used for subdir
352            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
353
354        if let Some(unit) = self
355            .include_units
356            .iter()
357            .find(|unit| self.exclude_units.contains(unit))
358        {
359            let unit = unit.into();
360            return Err(BuildError::DuplicatedUnit { unit }.into());
361        }
362
363        let include_matches = self.merged_include_matches();
364        let exclude_matches = self.merged_exclude_matches();
365
366        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
367            return Err(BuildError::DuplicatedMatches { field, value }.into());
368        }
369
370        let mut checkpoint_path = data_dir;
371        checkpoint_path.push(CHECKPOINT_FILENAME);
372
373        let journalctl_path = self
374            .journalctl_path
375            .clone()
376            .unwrap_or_else(|| JOURNALCTL.clone());
377
378        let systemd_version = get_systemd_version_from_journalctl(&journalctl_path).await?;
379
380        if !self.current_boot_only && (250..=257).contains(&systemd_version) {
381            // https://github.com/vectordotdev/vector/issues/18068
382            return Err(BuildError::AllBootsNotSupported { systemd_version }.into());
383        }
384
385        let starter = StartJournalctl::new(
386            journalctl_path,
387            systemd_version,
388            self.journal_directory.clone(),
389            self.journal_namespace.clone(),
390            self.current_boot_only,
391            self.since_now,
392            self.extra_args.clone(),
393        );
394
395        let batch_size = self.batch_size;
396        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
397        let log_namespace = cx.log_namespace(self.log_namespace);
398
399        Ok(Box::pin(
400            JournaldSource {
401                include_matches,
402                exclude_matches,
403                checkpoint_path,
404                batch_size,
405                remap_priority: self.remap_priority,
406                out: cx.out,
407                acknowledgements,
408                starter,
409                log_namespace,
410                emit_cursor: self.emit_cursor,
411            }
412            .run_shutdown(cx.shutdown),
413        ))
414    }
415
416    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
417        let schema_definition =
418            self.schema_definition(global_log_namespace.merge(self.log_namespace));
419
420        vec![SourceOutput::new_maybe_logs(
421            DataType::Log,
422            schema_definition,
423        )]
424    }
425
426    fn can_acknowledge(&self) -> bool {
427        true
428    }
429}
430
431struct JournaldSource {
432    include_matches: Matches,
433    exclude_matches: Matches,
434    checkpoint_path: PathBuf,
435    batch_size: usize,
436    remap_priority: bool,
437    out: SourceSender,
438    acknowledgements: bool,
439    starter: StartJournalctl,
440    log_namespace: LogNamespace,
441    emit_cursor: bool,
442}
443
444impl JournaldSource {
445    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
446        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
447            .await
448            .map_err(|error| {
449                emit!(JournaldCheckpointFileOpenError {
450                    error,
451                    path: self
452                        .checkpoint_path
453                        .to_str()
454                        .unwrap_or("unknown")
455                        .to_string(),
456                });
457            })?;
458
459        let checkpointer = SharedCheckpointer::new(checkpointer);
460        let finalizer = Finalizer::new(
461            self.acknowledgements,
462            checkpointer.clone(),
463            shutdown.clone(),
464        );
465
466        self.run(checkpointer, finalizer, shutdown).await;
467
468        Ok(())
469    }
470
471    async fn run(
472        mut self,
473        checkpointer: SharedCheckpointer,
474        finalizer: Finalizer,
475        mut shutdown: ShutdownSignal,
476    ) {
477        loop {
478            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
479                break;
480            }
481
482            info!("Starting journalctl.");
483            let cursor = checkpointer.lock().await.cursor.clone();
484            match self.starter.start(cursor.as_deref()) {
485                Ok((stdout_stream, stderr_stream, running)) => {
486                    if !self
487                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
488                        .await
489                    {
490                        return;
491                    }
492                    // Explicit drop to ensure it isn't dropped earlier.
493                    drop(running);
494                }
495                Err(error) => {
496                    emit!(JournaldStartJournalctlError { error });
497                }
498            }
499
500            // journalctl process should never stop,
501            // so it is an error if we reach here.
502            tokio::select! {
503                _ = &mut shutdown => break,
504                _ = sleep(BACKOFF_DURATION) => (),
505            }
506        }
507    }
508
509    /// Process `journalctl` output until some error occurs.
510    /// Return `true` if should restart `journalctl`.
511    async fn run_stream<'a>(
512        &'a mut self,
513        mut stdout_stream: JournalStream,
514        stderr_stream: JournalStream,
515        finalizer: &'a Finalizer,
516        mut shutdown: ShutdownSignal,
517    ) -> bool {
518        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
519        let events_received = register!(EventsReceived);
520
521        // Spawn stderr handler task
522        let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));
523
524        let batch_size = self.batch_size;
525        let result = loop {
526            let mut batch = Batch::new(self);
527
528            // Start the timeout counter only once we have received a
529            // valid and non-filtered event.
530            while batch.events.is_empty() {
531                let item = tokio::select! {
532                    _ = &mut shutdown => {
533                        stderr_handler.abort();
534                        return false;
535                    },
536                    item = stdout_stream.next() => item,
537                };
538                if !batch.handle_next(item) {
539                    stderr_handler.abort();
540                    return true;
541                }
542            }
543
544            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
545            tokio::pin!(timeout);
546
547            for _ in 1..batch_size {
548                tokio::select! {
549                    _ = &mut timeout => break,
550                    result = stdout_stream.next() => if !batch.handle_next(result) {
551                        break;
552                    }
553                }
554            }
555            if let Some(x) = batch
556                .finish(finalizer, &bytes_received, &events_received)
557                .await
558            {
559                break x;
560            }
561        };
562
563        stderr_handler.abort();
564        result
565    }
566
567    /// Handle stderr stream from journalctl process
568    async fn handle_stderr(mut stderr_stream: JournalStream) {
569        while let Some(result) = stderr_stream.next().await {
570            match result {
571                Ok(line) => {
572                    let line_str = String::from_utf8_lossy(&line);
573                    let trimmed = line_str.trim();
574                    if !trimmed.is_empty() {
575                        warn!("Warning journalctl stderr: {trimmed}");
576                    }
577                }
578                Err(err) => {
579                    error!("Error reading journalctl stderr: {err}");
580                    break;
581                }
582            }
583        }
584    }
585}
586
587struct Batch<'a> {
588    events: Vec<LogEvent>,
589    record_size: usize,
590    exiting: Option<bool>,
591    batch: Option<BatchNotifier>,
592    receiver: Option<BatchStatusReceiver>,
593    source: &'a mut JournaldSource,
594    cursor: Option<String>,
595}
596
597impl<'a> Batch<'a> {
598    fn new(source: &'a mut JournaldSource) -> Self {
599        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
600        Self {
601            events: Vec::new(),
602            record_size: 0,
603            exiting: None,
604            batch,
605            receiver,
606            source,
607            cursor: None,
608        }
609    }
610
611    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
612        match result {
613            None => {
614                warn!("Journalctl process stopped.");
615                self.exiting = Some(true);
616                false
617            }
618            Some(Err(error)) => {
619                emit!(JournaldReadError { error });
620                false
621            }
622            Some(Ok(bytes)) => {
623                match decode_record(&bytes, self.source.remap_priority) {
624                    Ok(mut record) => {
625                        if self.source.emit_cursor {
626                            if let Some(tmp) = record.get(CURSOR) {
627                                self.cursor = Some(tmp.clone());
628                            }
629                        } else if let Some(tmp) = record.remove(CURSOR) {
630                            self.cursor = Some(tmp);
631                        }
632
633                        if !filter_matches(
634                            &record,
635                            &self.source.include_matches,
636                            &self.source.exclude_matches,
637                        ) {
638                            self.record_size += bytes.len();
639
640                            let mut event = create_log_event_from_record(
641                                record,
642                                &self.batch,
643                                self.source.log_namespace,
644                            );
645
646                            enrich_log_event(&mut event, self.source.log_namespace);
647
648                            self.events.push(event);
649                        }
650                    }
651                    Err(error) => {
652                        emit!(JournaldInvalidRecordError {
653                            error,
654                            text: String::from_utf8_lossy(&bytes).into_owned()
655                        });
656                    }
657                }
658                true
659            }
660        }
661    }
662
663    async fn finish(
664        mut self,
665        finalizer: &Finalizer,
666        bytes_received: &'a Registered<BytesReceived>,
667        events_received: &'a Registered<EventsReceived>,
668    ) -> Option<bool> {
669        drop(self.batch);
670
671        if self.record_size > 0 {
672            bytes_received.emit(ByteSize(self.record_size));
673        }
674
675        if !self.events.is_empty() {
676            let count = self.events.len();
677            let byte_size = self.events.estimated_json_encoded_size_of();
678            events_received.emit(CountByteSize(count, byte_size));
679
680            match self.source.out.send_batch(self.events).await {
681                Ok(_) => {
682                    if let Some(cursor) = self.cursor {
683                        finalizer.finalize(cursor, self.receiver).await;
684                    }
685                }
686                Err(_) => {
687                    emit!(StreamClosedError { count });
688                    // `out` channel is closed, don't restart journalctl.
689                    self.exiting = Some(false);
690                }
691            }
692        }
693        self.exiting
694    }
695}
696
697type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
698
/// Everything needed to (re)spawn the `journalctl --follow` process.
struct StartJournalctl {
    /// Executable to run.
    path: PathBuf,
    /// systemd version reported by `journalctl --version`; selects the `--boot` flags.
    systemd_version: u32,
    /// Optional `--directory` value.
    journal_dir: Option<PathBuf>,
    /// Optional `--namespace` value.
    journal_namespace: Option<String>,
    /// Restrict output to the current boot.
    current_boot_only: bool,
    /// Start at "now" when no checkpoint cursor exists.
    since_now: bool,
    /// User arguments appended last.
    extra_args: Vec<String>,
}
708
709impl StartJournalctl {
710    const fn new(
711        path: PathBuf,
712        systemd_version: u32,
713        journal_dir: Option<PathBuf>,
714        journal_namespace: Option<String>,
715        current_boot_only: bool,
716        since_now: bool,
717        extra_args: Vec<String>,
718    ) -> Self {
719        Self {
720            path,
721            systemd_version,
722            journal_dir,
723            journal_namespace,
724            current_boot_only,
725            since_now,
726            extra_args,
727        }
728    }
729
730    fn make_command(&self, checkpoint: Option<&str>) -> Command {
731        let mut command = Command::new(&self.path);
732        command.stdout(Stdio::piped());
733        command.stderr(Stdio::piped());
734        command.arg("--follow");
735        command.arg("--all");
736        command.arg("--show-cursor");
737        command.arg("--output=json");
738
739        if let Some(dir) = &self.journal_dir {
740            command.arg(format!("--directory={}", dir.display()));
741        }
742
743        if let Some(namespace) = &self.journal_namespace {
744            command.arg(format!("--namespace={namespace}"));
745        }
746
747        // By default entries from all boots are included
748        // systemd 242 introduces support for --boot=all
749        // systemd 250 lets --follow imply --boot (with no facility to override)
750        // systemd 258 allows to override --boot as implied by --follow
751        if self.current_boot_only {
752            if self.systemd_version < 250 {
753                command.arg("--boot");
754            }
755        } else if self.systemd_version >= 258 {
756            command.arg("--boot=all");
757        }
758
759        if let Some(cursor) = checkpoint {
760            command.arg(format!("--after-cursor={cursor}"));
761        } else if self.since_now {
762            command.arg("--since=now");
763        } else {
764            // journalctl --follow only outputs a few lines without a starting point
765            command.arg("--since=2000-01-01");
766        }
767
768        if !self.extra_args.is_empty() {
769            command.args(&self.extra_args);
770        }
771
772        command
773    }
774
775    fn start(
776        &mut self,
777        checkpoint: Option<&str>,
778    ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
779        let mut command = self.make_command(checkpoint);
780
781        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
782
783        let stdout_stream = FramedRead::new(
784            child.stdout.take().unwrap(),
785            CharacterDelimitedDecoder::new(b'\n'),
786        );
787
788        let stderr = child.stderr.take().unwrap();
789        let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));
790
791        Ok((
792            stdout_stream.boxed(),
793            stderr_stream.boxed(),
794            RunningJournalctl(child),
795        ))
796    }
797}
798
799struct RunningJournalctl(Child);
800
801impl Drop for RunningJournalctl {
802    fn drop(&mut self) {
803        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
804            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
805        }
806    }
807}
808
809async fn get_systemd_version_from_journalctl(journalctl_path: &PathBuf) -> crate::Result<u32> {
810    let stdout = Command::new(journalctl_path)
811        .arg("--version")
812        .output()
813        .await
814        .context(JournalctlSpawnSnafu)?
815        .stdout;
816
817    // output format: `systemd {version_number} ({full_version}){newline}{config ...}`
818    let stdout = String::from_utf8_lossy(&stdout);
819    Ok(stdout
820        .split_whitespace()
821        .nth(1)
822        .and_then(|s| s.parse::<u32>().ok())
823        .ok_or_else(|| BuildError::JournalctlParseVersion {
824            output: {
825                let cutoff = 40;
826                let length = stdout.chars().count();
827                format!(
828                    "{}{}",
829                    stdout.chars().take(cutoff).collect::<String>(),
830                    if length > cutoff {
831                        format!(" ..{} more char(s)", length - cutoff)
832                    } else {
833                        "".to_string()
834                    }
835                )
836            },
837        })?)
838}
839
840fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
841    match log_namespace {
842        LogNamespace::Vector => {
843            if let Some(host) = log
844                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
845                .and_then(|meta| meta.get(HOSTNAME))
846            {
847                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
848            }
849        }
850        LogNamespace::Legacy => {
851            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
852                log_namespace.insert_source_metadata(
853                    JournaldConfig::NAME,
854                    log,
855                    log_schema().host_key().map(LegacyKey::Overwrite),
856                    path!("host"),
857                    host,
858                );
859            }
860        }
861    }
862
863    // Create a Utc timestamp from an existing log field if present.
864    let timestamp_value = match log_namespace {
865        LogNamespace::Vector => log
866            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
867            .and_then(|meta| {
868                meta.get(SOURCE_TIMESTAMP)
869                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
870            }),
871        LogNamespace::Legacy => log
872            .get(event_path!(SOURCE_TIMESTAMP))
873            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
874    };
875
876    let timestamp = timestamp_value
877        .filter(|&ts| ts.is_bytes())
878        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
879        .map(|ts| {
880            chrono::Utc
881                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
882                .single()
883                .expect("invalid timestamp")
884        });
885
886    // Add timestamp.
887    match log_namespace {
888        LogNamespace::Vector => {
889            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
890
891            if let Some(ts) = timestamp {
892                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
893            }
894        }
895        LogNamespace::Legacy => {
896            if let Some(ts) = timestamp {
897                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
898            }
899        }
900    }
901
902    // Add source type.
903    log_namespace.insert_vector_metadata(
904        log,
905        log_schema().source_type_key(),
906        path!("source_type"),
907        JournaldConfig::NAME,
908    );
909}
910
911fn create_log_event_from_record(
912    mut record: Record,
913    batch: &Option<BatchNotifier>,
914    log_namespace: LogNamespace,
915) -> LogEvent {
916    match log_namespace {
917        LogNamespace::Vector => {
918            let message_value = record
919                .remove(MESSAGE)
920                .map(|msg| Value::Bytes(Bytes::from(msg)))
921                .unwrap_or(Value::Null);
922
923            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
924
925            // Add the remaining fields from the Record to the log event into an object to avoid collisions.
926            record.iter().for_each(|(key, value)| {
927                log.metadata_mut()
928                    .value_mut()
929                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
930            });
931
932            log
933        }
934        LogNamespace::Legacy => {
935            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
936
937            if let Some(message) = log.remove(event_path!(MESSAGE)) {
938                log.maybe_insert(log_schema().message_key_target_path(), message);
939            }
940
941            log
942        }
943    }
944}
945
/// Normalizes a systemd unit name.
///
/// A name without any `.` (no unit-type suffix) is assumed to be a service and
/// gets `.service` appended. Names that already contain a `.` are returned
/// unchanged.
fn fixup_unit(unit: &str) -> String {
    if !unit.contains('.') {
        return format!("{unit}.service");
    }
    unit.to_owned()
}
955
956fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
957    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
958    // journalctl will output non-ASCII values using an array
959    // of integers. Look for those values and re-parse them.
960    if let Some(record) = record.as_object_mut() {
961        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
962            *value = decode_array(value.as_array().expect("already validated"));
963        }
964    }
965    if remap {
966        record.get_mut("PRIORITY").map(remap_priority);
967    }
968    serde_json::from_value(record)
969}
970
971fn decode_array(array: &[JsonValue]) -> JsonValue {
972    decode_array_as_bytes(array).unwrap_or_else(|| {
973        let ser = serde_json::to_string(array).expect("already deserialized");
974        JsonValue::String(ser)
975    })
976}
977
978fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
979    // From the array of values, turn all the numbers into bytes, and
980    // then the bytes into a string, but return None if any value in the
981    // array was not a valid byte.
982    array
983        .iter()
984        .map(|item| {
985            item.as_u64().and_then(|num| match num {
986                num if num <= u8::MAX as u64 => Some(num as u8),
987                _ => None,
988            })
989        })
990        .collect::<Option<Vec<u8>>>()
991        .map(|array| String::from_utf8_lossy(&array).into())
992}
993
994fn remap_priority(priority: &mut JsonValue) {
995    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
996        let text = match num {
997            0 => "EMERG",
998            1 => "ALERT",
999            2 => "CRIT",
1000            3 => "ERR",
1001            4 => "WARNING",
1002            5 => "NOTICE",
1003            6 => "INFO",
1004            7 => "DEBUG",
1005            _ => "UNKNOWN",
1006        };
1007        *priority = JsonValue::String(text.into());
1008    }
1009}
1010
1011fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
1012    match (includes.is_empty(), excludes.is_empty()) {
1013        (true, true) => false,
1014        (false, true) => !contains_match(record, includes),
1015        (true, false) => contains_match(record, excludes),
1016        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
1017    }
1018}
1019
1020fn contains_match(record: &Record, matches: &Matches) -> bool {
1021    let f = move |(field, value)| {
1022        matches
1023            .get(field)
1024            .map(|x| x.contains(value))
1025            .unwrap_or(false)
1026    };
1027    record.iter().any(f)
1028}
1029
1030fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
1031    for (a_key, a_values) in a_matches {
1032        if let Some(b_values) = b_matches.get(a_key.as_str()) {
1033            for (a, b) in a_values
1034                .iter()
1035                .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
1036            {
1037                if a == b {
1038                    return Some((a_key.into(), b.into()));
1039                }
1040            }
1041        }
1042    }
1043    None
1044}
1045
1046enum Finalizer {
1047    Sync(SharedCheckpointer),
1048    Async(OrderedFinalizer<String>),
1049}
1050
1051impl Finalizer {
1052    fn new(
1053        acknowledgements: bool,
1054        checkpointer: SharedCheckpointer,
1055        shutdown: ShutdownSignal,
1056    ) -> Self {
1057        if acknowledgements {
1058            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
1059            tokio::spawn(async move {
1060                while let Some((status, cursor)) = ack_stream.next().await {
1061                    if status == BatchStatus::Delivered {
1062                        checkpointer.lock().await.set(cursor).await;
1063                    }
1064                }
1065            });
1066            Self::Async(finalizer)
1067        } else {
1068            Self::Sync(checkpointer)
1069        }
1070    }
1071
1072    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
1073        match (self, receiver) {
1074            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
1075            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
1076            _ => {
1077                unreachable!("Cannot have async finalization without a receiver in journald source")
1078            }
1079        }
1080    }
1081}
1082
1083struct Checkpointer {
1084    file: File,
1085    filename: PathBuf,
1086}
1087
1088impl Checkpointer {
1089    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1090        let file = OpenOptions::new()
1091            .read(true)
1092            .write(true)
1093            .create(true)
1094            .truncate(false)
1095            .open(&filename)
1096            .await?;
1097        Ok(Checkpointer { file, filename })
1098    }
1099
1100    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
1101        self.file.seek(SeekFrom::Start(0)).await?;
1102        self.file.write_all(format!("{token}\n").as_bytes()).await
1103    }
1104
1105    async fn get(&mut self) -> Result<Option<String>, io::Error> {
1106        let mut buf = Vec::<u8>::new();
1107        self.file.seek(SeekFrom::Start(0)).await?;
1108        self.file.read_to_end(&mut buf).await?;
1109        match buf.len() {
1110            0 => Ok(None),
1111            _ => {
1112                let text = String::from_utf8_lossy(&buf);
1113                match text.find('\n') {
1114                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
1115                    None => Ok(None), // Maybe return an error?
1116                }
1117            }
1118        }
1119    }
1120}
1121
1122struct StatefulCheckpointer {
1123    checkpointer: Checkpointer,
1124    cursor: Option<String>,
1125}
1126
1127impl StatefulCheckpointer {
1128    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1129        let mut checkpointer = Checkpointer::new(filename).await?;
1130        let cursor = checkpointer.get().await?;
1131        Ok(Self {
1132            checkpointer,
1133            cursor,
1134        })
1135    }
1136
1137    async fn set(&mut self, token: impl Into<String>) {
1138        let token = token.into();
1139        if let Err(error) = self.checkpointer.set(&token).await {
1140            emit!(JournaldCheckpointSetError {
1141                error,
1142                filename: self
1143                    .checkpointer
1144                    .filename
1145                    .to_str()
1146                    .unwrap_or("unknown")
1147                    .to_string(),
1148            });
1149        }
1150        self.cursor = Some(token);
1151    }
1152}
1153
1154#[derive(Clone)]
1155struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1156
1157impl SharedCheckpointer {
1158    fn new(c: StatefulCheckpointer) -> Self {
1159        Self(Arc::new(Mutex::new(c)))
1160    }
1161
1162    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1163        self.0.lock().await
1164    }
1165}
1166
#[cfg(test)]
mod checkpointer_tests {
    use std::path::Path;

    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    /// Stores `token`, then checks that `get` returns it and that the raw file
    /// starts with `token` followed by a newline.
    async fn set_and_check(checkpointer: &mut Checkpointer, path: &Path, token: &str) {
        checkpointer
            .set(token)
            .await
            .expect("Setting checkpoint failed");

        let stored = checkpointer.get().await.unwrap();
        assert_eq!(stored.as_deref(), Some(token));

        let contents = read_to_string(path)
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {path:?}"));
        assert!(contents.starts_with(&format!("{token}\n")));
    }

    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let filename = tempdir.path().join(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        set_and_check(&mut checkpointer, &filename, "first test").await;
        set_and_check(&mut checkpointer, &filename, "second").await;
    }
}
1211
1212#[cfg(test)]
1213mod tests {
1214    use std::{fs, path::Path};
1215
1216    use tempfile::tempdir;
1217    use tokio::time::{Duration, Instant, sleep, timeout};
1218    use vrl::value::{Value, kind::Collection};
1219
1220    use super::*;
1221    use crate::{
1222        config::ComponentKey,
1223        event::{Event, EventStatus},
1224        test_util::components::assert_source_compliance,
1225    };
1226
1227    const TEST_COMPONENT: &str = "journald-test";
1228    const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1229
1230    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1231        let include_matches = create_unit_matches(iunits.to_vec());
1232        let exclude_matches = create_unit_matches(xunits.to_vec());
1233        run_journal(include_matches, exclude_matches, cursor, false).await
1234    }
1235
1236    async fn run_journal(
1237        include_matches: Matches,
1238        exclude_matches: Matches,
1239        checkpoint: Option<&str>,
1240        emit_cursor: bool,
1241    ) -> Vec<Event> {
1242        assert_source_compliance(&["protocol"], async move {
1243            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1244
1245            let tempdir = tempdir().unwrap();
1246            let tempdir = tempdir.path().to_path_buf();
1247
1248            if let Some(cursor) = checkpoint {
1249                let mut checkpoint_path = tempdir.clone();
1250                checkpoint_path.push(TEST_COMPONENT);
1251                fs::create_dir(&checkpoint_path).unwrap();
1252                checkpoint_path.push(CHECKPOINT_FILENAME);
1253
1254                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1255                    .await
1256                    .expect("Creating checkpointer failed!");
1257
1258                checkpointer
1259                    .set(cursor)
1260                    .await
1261                    .expect("Could not set checkpoint");
1262            }
1263
1264            let (cx, shutdown) =
1265                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1266            let config = JournaldConfig {
1267                journalctl_path: Some(TEST_JOURNALCTL.into()),
1268                include_matches,
1269                exclude_matches,
1270                data_dir: Some(tempdir),
1271                remap_priority: true,
1272                acknowledgements: false.into(),
1273                emit_cursor,
1274                ..Default::default()
1275            };
1276            let source = config.build(cx).await.unwrap();
1277            tokio::spawn(async move { source.await.unwrap() });
1278
1279            // Hack: Sleep to ensure journalctl process starts and emits events before shutdown.
1280            sleep(Duration::from_secs(1)).await;
1281            shutdown
1282                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1283                .await;
1284
1285            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1286        })
1287        .await
1288    }
1289
1290    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1291        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1292        let mut map = HashMap::new();
1293        if !units.is_empty() {
1294            map.insert(String::from(SYSTEMD_UNIT), units);
1295        }
1296        map
1297    }
1298
1299    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1300        let mut matches: Matches = HashMap::new();
1301        for (field, value) in conditions {
1302            matches
1303                .entry(field.into())
1304                .or_default()
1305                .insert(value.into());
1306        }
1307        matches
1308    }
1309
1310    #[tokio::test]
1311    async fn reads_journal() {
1312        let received = run_with_units(&[], &[], None).await;
1313        assert_eq!(received.len(), 8);
1314        assert_eq!(
1315            message(&received[0]),
1316            Value::Bytes("System Initialization".into())
1317        );
1318        assert_eq!(
1319            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1320            "journald".into()
1321        );
1322        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1323        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1324        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1325        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1326        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1327    }
1328
1329    #[tokio::test]
1330    async fn includes_units() {
1331        let received = run_with_units(&["unit.service"], &[], None).await;
1332        assert_eq!(received.len(), 1);
1333        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1334    }
1335
1336    #[tokio::test]
1337    async fn excludes_units() {
1338        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1339        assert_eq!(received.len(), 6);
1340        assert_eq!(
1341            message(&received[0]),
1342            Value::Bytes("System Initialization".into())
1343        );
1344        assert_eq!(
1345            message(&received[1]),
1346            Value::Bytes("Missing timestamp".into())
1347        );
1348        assert_eq!(
1349            message(&received[2]),
1350            Value::Bytes("Different timestamps".into())
1351        );
1352    }
1353
1354    #[tokio::test]
1355    async fn emits_cursor() {
1356        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1357        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1358        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1359        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1360    }
1361
1362    #[tokio::test]
1363    async fn includes_matches() {
1364        let matches = create_matches(vec![("PRIORITY", "ERR")]);
1365        let received = run_journal(matches, HashMap::new(), None, false).await;
1366        assert_eq!(received.len(), 2);
1367        assert_eq!(
1368            message(&received[0]),
1369            Value::Bytes("Different timestamps".into())
1370        );
1371        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1372        assert_eq!(
1373            message(&received[1]),
1374            Value::Bytes("Non-ASCII in other field".into())
1375        );
1376        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1377    }
1378
1379    #[tokio::test]
1380    async fn includes_kernel() {
1381        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1382        let received = run_journal(matches, HashMap::new(), None, false).await;
1383        assert_eq!(received.len(), 1);
1384        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1385        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1386    }
1387
1388    #[tokio::test]
1389    async fn excludes_matches() {
1390        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1391        let received = run_journal(HashMap::new(), matches, None, false).await;
1392        assert_eq!(received.len(), 5);
1393        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1394        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1395        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1396        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1397        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1398    }
1399
1400    #[tokio::test]
1401    async fn handles_checkpoint() {
1402        let received = run_with_units(&[], &[], Some("1")).await;
1403        assert_eq!(received.len(), 7);
1404        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1405        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1406    }
1407
1408    #[tokio::test]
1409    async fn parses_array_messages() {
1410        let received = run_with_units(&["badunit.service"], &[], None).await;
1411        assert_eq!(received.len(), 1);
1412        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1413    }
1414
1415    #[tokio::test]
1416    async fn parses_array_fields() {
1417        let received = run_with_units(&["syslog.service"], &[], None).await;
1418        assert_eq!(received.len(), 1);
1419        assert_eq!(
1420            received[0].as_log()["SYSLOG_RAW"],
1421            Value::Bytes("¿World?".into())
1422        );
1423    }
1424
1425    #[tokio::test]
1426    async fn parses_string_sequences() {
1427        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1428        assert_eq!(received.len(), 1);
1429        assert_eq!(
1430            received[0].as_log()["SYSLOG_FACILITY"],
1431            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1432        );
1433    }
1434
1435    #[tokio::test]
1436    async fn handles_missing_timestamp() {
1437        let received = run_with_units(&["stdout"], &[], None).await;
1438        assert_eq!(received.len(), 2);
1439        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1440        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1441    }
1442
1443    #[tokio::test]
1444    async fn handles_acknowledgements() {
1445        let (tx, mut rx) = SourceSender::new_test();
1446
1447        let tempdir = tempdir().unwrap();
1448        let tempdir = tempdir.path().to_path_buf();
1449        let mut checkpoint_path = tempdir.clone();
1450        checkpoint_path.push(TEST_COMPONENT);
1451        fs::create_dir(&checkpoint_path).unwrap();
1452        checkpoint_path.push(CHECKPOINT_FILENAME);
1453
1454        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1455            .await
1456            .expect("Creating checkpointer failed!");
1457
1458        let config = JournaldConfig {
1459            journalctl_path: Some(TEST_JOURNALCTL.into()),
1460            data_dir: Some(tempdir),
1461            remap_priority: true,
1462            acknowledgements: true.into(),
1463            ..Default::default()
1464        };
1465        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1466        let source = config.build(cx).await.unwrap();
1467        tokio::spawn(async move { source.await.unwrap() });
1468
1469        // Make sure the checkpointer cursor is empty
1470        assert_eq!(checkpointer.get().await.unwrap(), None);
1471
1472        // Hack: Sleep to ensure journalctl process starts and emits events.
1473        sleep(Duration::from_secs(1)).await;
1474
1475        // Acknowledge all the received events.
1476        let mut count = 0;
1477        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
1478            // The checkpointer shouldn't set the cursor until the end of the batch.
1479            assert_eq!(checkpointer.get().await.unwrap(), None);
1480            event.metadata().update_status(EventStatus::Delivered);
1481            count += 1;
1482        }
1483        assert_eq!(count, 8);
1484
1485        sleep(Duration::from_millis(100)).await;
1486        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
1487    }
1488
1489    #[test]
1490    fn filter_matches_works_correctly() {
1491        let empty: Matches = HashMap::new();
1492        let includes = create_unit_matches(vec!["one", "two"]);
1493        let excludes = create_unit_matches(vec!["foo", "bar"]);
1494
1495        let zero = HashMap::new();
1496        assert!(!filter_matches(&zero, &empty, &empty));
1497        assert!(filter_matches(&zero, &includes, &empty));
1498        assert!(!filter_matches(&zero, &empty, &excludes));
1499        assert!(filter_matches(&zero, &includes, &excludes));
1500        let mut one = HashMap::new();
1501        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
1502        assert!(!filter_matches(&one, &empty, &empty));
1503        assert!(!filter_matches(&one, &includes, &empty));
1504        assert!(!filter_matches(&one, &empty, &excludes));
1505        assert!(!filter_matches(&one, &includes, &excludes));
1506        let mut two = HashMap::new();
1507        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
1508        assert!(!filter_matches(&two, &empty, &empty));
1509        assert!(filter_matches(&two, &includes, &empty));
1510        assert!(filter_matches(&two, &empty, &excludes));
1511        assert!(filter_matches(&two, &includes, &excludes));
1512    }
1513
1514    #[test]
1515    fn merges_units_and_matches_option() {
1516        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
1517        let include_matches = create_matches(vec![
1518            ("_SYSTEMD_UNIT", "three.service"),
1519            ("_TRANSPORT", "kernel"),
1520        ]);
1521
1522        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
1523        let exclude_matches = create_matches(vec![
1524            ("_SYSTEMD_UNIT", "baz.service"),
1525            ("PRIORITY", "DEBUG"),
1526        ]);
1527
1528        let journald_config = JournaldConfig {
1529            include_units,
1530            include_matches,
1531            exclude_units,
1532            exclude_matches,
1533            ..Default::default()
1534        };
1535
1536        let hashset =
1537            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };
1538
1539        let matches = journald_config.merged_include_matches();
1540        let units = matches.get("_SYSTEMD_UNIT").unwrap();
1541        assert_eq!(
1542            units,
1543            &hashset(&["one.service", "two.service", "three.service"])
1544        );
1545        let units = matches.get("_TRANSPORT").unwrap();
1546        assert_eq!(units, &hashset(&["kernel"]));
1547
1548        let matches = journald_config.merged_exclude_matches();
1549        let units = matches.get("_SYSTEMD_UNIT").unwrap();
1550        assert_eq!(
1551            units,
1552            &hashset(&["foo.service", "bar.service", "baz.service"])
1553        );
1554        let units = matches.get("PRIORITY").unwrap();
1555        assert_eq!(units, &hashset(&["DEBUG"]));
1556    }
1557
1558    #[test]
1559    fn find_duplicate_match_works_correctly() {
1560        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1561        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1562        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
1563        assert_eq!(field, "_TRANSPORT");
1564        assert_eq!(value, "kernel");
1565
1566        let empty = HashMap::new();
1567        let actual = find_duplicate_match(&empty, &empty);
1568        assert!(actual.is_none());
1569
1570        let actual = find_duplicate_match(&include_matches, &empty);
1571        assert!(actual.is_none());
1572
1573        let actual = find_duplicate_match(&empty, &exclude_matches);
1574        assert!(actual.is_none());
1575    }
1576
1577    #[test]
1578    fn command_options() {
1579        let path = PathBuf::from("journalctl");
1580
1581        let systemd_version = 239;
1582        let journal_dir = None;
1583        let journal_namespace = None;
1584        let current_boot_only = false;
1585        let cursor = None;
1586        let since_now = false;
1587        let extra_args = vec![];
1588
1589        let command = create_command(
1590            &path,
1591            systemd_version,
1592            journal_dir,
1593            journal_namespace,
1594            current_boot_only,
1595            since_now,
1596            cursor,
1597            extra_args,
1598        );
1599        let cmd_line = format!("{command:?}");
1600        assert!(!cmd_line.contains("--directory="));
1601        assert!(!cmd_line.contains("--namespace="));
1602        assert!(!cmd_line.contains("--boot=all"));
1603        assert!(cmd_line.contains("--since=2000-01-01"));
1604
1605        let journal_dir = None;
1606        let journal_namespace = None;
1607        let since_now = true;
1608        let extra_args = vec![];
1609
1610        let command = create_command(
1611            &path,
1612            systemd_version,
1613            journal_dir,
1614            journal_namespace,
1615            current_boot_only,
1616            since_now,
1617            cursor,
1618            extra_args,
1619        );
1620        let cmd_line = format!("{command:?}");
1621        assert!(cmd_line.contains("--since=now"));
1622
1623        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
1624        let journal_namespace = Some(String::from("my_namespace"));
1625        let current_boot_only = true;
1626        let cursor = Some("2021-01-01");
1627        let extra_args = vec!["--merge".to_string()];
1628
1629        let command = create_command(
1630            &path,
1631            systemd_version,
1632            journal_dir,
1633            journal_namespace,
1634            current_boot_only,
1635            since_now,
1636            cursor,
1637            extra_args,
1638        );
1639        let cmd_line = format!("{command:?}");
1640        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
1641        assert!(cmd_line.contains("--namespace=my_namespace"));
1642        assert!(cmd_line.contains("--boot"));
1643        assert!(cmd_line.contains("--after-cursor="));
1644        assert!(cmd_line.contains("--merge"));
1645
1646        let systemd_version = 258;
1647        let journal_dir = None;
1648        let journal_namespace = None;
1649        let current_boot_only = false;
1650        let extra_args = vec![];
1651
1652        let command = create_command(
1653            &path,
1654            systemd_version,
1655            journal_dir,
1656            journal_namespace,
1657            current_boot_only,
1658            since_now,
1659            cursor,
1660            extra_args,
1661        );
1662        let cmd_line = format!("{command:?}");
1663        assert!(cmd_line.contains("--boot=all"));
1664    }
1665
1666    #[allow(clippy::too_many_arguments)]
1667    fn create_command(
1668        path: &Path,
1669        systemd_version: u32,
1670        journal_dir: Option<PathBuf>,
1671        journal_namespace: Option<String>,
1672        current_boot_only: bool,
1673        since_now: bool,
1674        cursor: Option<&str>,
1675        extra_args: Vec<String>,
1676    ) -> Command {
1677        StartJournalctl::new(
1678            path.into(),
1679            systemd_version,
1680            journal_dir,
1681            journal_namespace,
1682            current_boot_only,
1683            since_now,
1684            extra_args,
1685        )
1686        .make_command(cursor)
1687    }
1688
1689    fn message(event: &Event) -> Value {
1690        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1691    }
1692
1693    fn timestamp(event: &Event) -> Value {
1694        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1695    }
1696
1697    fn cursor(event: &Event) -> Value {
1698        event.as_log()[CURSOR].clone()
1699    }
1700
1701    fn value_ts(secs: i64, usecs: u32) -> Value {
1702        Value::Timestamp(
1703            chrono::Utc
1704                .timestamp_opt(secs, usecs)
1705                .single()
1706                .expect("invalid timestamp"),
1707        )
1708    }
1709
1710    fn priority(event: &Event) -> Value {
1711        event.as_log()["PRIORITY"].clone()
1712    }
1713
1714    #[test]
1715    fn output_schema_definition_vector_namespace() {
1716        let config = JournaldConfig {
1717            log_namespace: Some(true),
1718            ..Default::default()
1719        };
1720
1721        let definitions = config
1722            .outputs(LogNamespace::Vector)
1723            .remove(0)
1724            .schema_definition(true);
1725
1726        let expected_definition =
1727            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
1728                .with_metadata_field(
1729                    &owned_value_path!("vector", "source_type"),
1730                    Kind::bytes(),
1731                    None,
1732                )
1733                .with_metadata_field(
1734                    &owned_value_path!("vector", "ingest_timestamp"),
1735                    Kind::timestamp(),
1736                    None,
1737                )
1738                .with_metadata_field(
1739                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
1740                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1741                    None,
1742                )
1743                .with_metadata_field(
1744                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
1745                    Kind::timestamp().or_undefined(),
1746                    Some("timestamp"),
1747                )
1748                .with_metadata_field(
1749                    &owned_value_path!(JournaldConfig::NAME, "host"),
1750                    Kind::bytes().or_undefined(),
1751                    Some("host"),
1752                );
1753
1754        assert_eq!(definitions, Some(expected_definition))
1755    }
1756
1757    #[test]
1758    fn output_schema_definition_legacy_namespace() {
1759        let config = JournaldConfig::default();
1760
1761        let definitions = config
1762            .outputs(LogNamespace::Legacy)
1763            .remove(0)
1764            .schema_definition(true);
1765
1766        let expected_definition = Definition::new_with_default_metadata(
1767            Kind::object(Collection::empty()),
1768            [LogNamespace::Legacy],
1769        )
1770        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1771        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1772        .with_event_field(
1773            &owned_value_path!("host"),
1774            Kind::bytes().or_undefined(),
1775            Some("host"),
1776        )
1777        .unknown_fields(Kind::bytes());
1778
1779        assert_eq!(definitions, Some(expected_definition))
1780    }
1781
1782    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
1783        let record = r#"{
1784            "PRIORITY":"6",
1785            "SYSLOG_FACILITY":"3",
1786            "SYSLOG_IDENTIFIER":"ntpd",
1787            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
1788            "_CAP_EFFECTIVE":"3fffffffff",
1789            "_CMDLINE":"ntpd: [priv]",
1790            "_COMM":"ntpd",
1791            "_EXE":"/usr/sbin/ntpd",
1792            "_GID":"0",
1793            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
1794            "_PID":"2156",
1795            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
1796            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
1797            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
1798            "_SYSTEMD_SLICE":"system.slice",
1799            "_SYSTEMD_UNIT":"ntpd.service",
1800            "_TRANSPORT":"stdout",
1801            "_UID":"0",
1802            "__MONOTONIC_TIMESTAMP":"98694000446",
1803            "__REALTIME_TIMESTAMP":"1564173027000443",
1804            "host":"my-host.local",
1805            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
1806            "source_type":"journald"
1807        }"#;
1808
1809        let json: serde_json::Value = serde_json::from_str(record).unwrap();
1810        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
1811
1812        event.as_mut_log().insert("timestamp", chrono::Utc::now());
1813
1814        let definitions = config.outputs(namespace).remove(0).schema_definition(true);
1815
1816        definitions.unwrap().assert_valid_for_event(&event);
1817    }
1818
1819    #[test]
1820    fn matches_schema_legacy() {
1821        let config = JournaldConfig::default();
1822
1823        matches_schema(&config, LogNamespace::Legacy)
1824    }
1825}