// vector/sources/journald.rs

use std::{
    collections::{HashMap, HashSet},
    io::SeekFrom,
    path::PathBuf,
    process::Stdio,
    str::FromStr,
    sync::{Arc, LazyLock},
    time::Duration,
};

use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
use nix::{
    sys::signal::{Signal, kill},
    unistd::Pid,
};
use serde_json::{Error as JsonError, Value as JsonValue};
use snafu::{ResultExt, Snafu};
use tokio::{
    fs::{File, OpenOptions},
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
    process::{Child, Command},
    sync::{Mutex, MutexGuard},
    time::sleep,
};
use tokio_util::codec::FramedRead;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    finalizer::OrderedFinalizer,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{metadata_path, owned_value_path, path},
    schema::Definition,
};
use vrl::{
    event_path,
    value::{Kind, Value, kind::Collection},
};

use crate::{
    SourceSender,
    config::{
        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
        log_schema,
    },
    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
    internal_events::{
        EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
        JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
        StreamClosedError,
    },
    serde::bool_or_struct,
    shutdown::ShutdownSignal,
};

const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

const BACKOFF_DURATION: Duration = Duration::from_secs(1);

static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}

type Matches = HashMap<String, HashSet<String>>;

/// Configuration for the `journald` source.
#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    /// Only include entries appended to the journal after this source starts reading it.
    #[serde(default)]
    pub since_now: bool,

    /// Only include entries that occurred after the current boot of the system.
    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,

    /// A list of unit names to monitor.
    ///
    /// If empty or not present, all units are accepted.
    ///
    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit name.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    /// A list of unit names to exclude from monitoring.
    ///
    /// Unit names lacking a `.` have `.service` appended to make them a valid service unit
    /// name.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    /// A list of sets of field/value pairs to monitor.
    ///
    /// If empty or not present, all journal fields are accepted.
    ///
    /// If `include_units` is specified, it is merged into this list.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    /// A list of sets of field/value pairs that, if any are present in a journal entry,
    /// excludes the entry from this source.
    ///
    /// If `exclude_units` is specified, it is merged into this list.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    /// The directory used to persist file checkpoint positions.
    ///
    /// By default, the [global `data_dir` option][global_data_dir] is used.
    /// Make sure the running user has write permissions to this directory.
    ///
    /// If this directory is specified, then Vector will attempt to create it.
    ///
    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// A list of extra command line arguments to pass to `journalctl`.
    ///
    /// If specified, these arguments are appended to the `journalctl` command line as-is.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    /// The systemd journal is read in batches, and a checkpoint is set at the end of each batch.
    ///
    /// This option limits the size of the batch.
    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    /// The full path of the `journalctl` executable.
    ///
    /// If not set, a search is done for the `journalctl` path.
    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,

    /// The full path of the journal directory.
    ///
    /// If not set, `journalctl` uses the default system journal path.
    #[serde(default)]
    pub journal_directory: Option<PathBuf>,

    /// The [journal namespace][journal-namespace].
    ///
    /// This value is passed to `journalctl` through the [`--namespace` option][journalctl-namespace-option].
    /// If not set, `journalctl` uses the default namespace.
    ///
    /// [journal-namespace]: https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html#Journal%20Namespaces
    /// [journalctl-namespace-option]: https://www.freedesktop.org/software/systemd/man/journalctl.html#--namespace=NAMESPACE
    #[serde(default)]
    pub journal_namespace: Option<String>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Enables remapping the `PRIORITY` field from an integer to a string value.
    ///
    /// Has no effect unless the value of the field is already an integer.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor].
    ///
    /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields
    /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html
    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}

const fn default_batch_size() -> usize {
    16
}

fn matches_examples() -> HashMap<String, Vec<String>> {
    HashMap::<_, _>::from_iter([
        (
            "_SYSTEMD_UNIT".to_owned(),
            vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
        ),
        ("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]),
    ])
}

impl JournaldConfig {
    fn merged_include_matches(&self) -> Matches {
        Self::merge_units(&self.include_matches, &self.include_units)
    }

    fn merged_exclude_matches(&self) -> Matches {
        Self::merge_units(&self.exclude_matches, &self.exclude_units)
    }

    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
        let mut matches = matches.clone();
        for unit in units {
            let entry = matches.entry(String::from(SYSTEMD_UNIT));
            entry.or_default().insert(fixup_unit(unit));
        }
        matches
    }

    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let schema_definition = match log_namespace {
            LogNamespace::Vector => Definition::new_with_default_metadata(
                Kind::bytes().or_null(),
                [LogNamespace::Vector],
            ),
            LogNamespace::Legacy => Definition::new_with_default_metadata(
                Kind::object(Collection::empty()),
                [LogNamespace::Legacy],
            ),
        };

        let mut schema_definition = schema_definition
            .with_standard_vector_source_metadata()
            // for metadata that is added to the events dynamically through the Record
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            );

        // for metadata that is added to the events dynamically through the Record
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}

impl Default for JournaldConfig {
    fn default() -> Self {
        Self {
            since_now: false,
            current_boot_only: true,
            include_units: vec![],
            exclude_units: vec![],
            include_matches: Default::default(),
            exclude_matches: Default::default(),
            data_dir: None,
            batch_size: default_batch_size(),
            journalctl_path: None,
            journal_directory: None,
            journal_namespace: None,
            extra_args: vec![],
            acknowledgements: Default::default(),
            remap_priority: false,
            log_namespace: None,
            emit_cursor: false,
        }
    }
}

impl_generate_config_from_default!(JournaldConfig);

type Record = HashMap<String, String>;

#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SourceConfig for JournaldConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        if self.remap_priority {
            warn!(
                "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
            );
        }

        let data_dir = cx
            .globals
            // fall back to the global `data_dir` if unset; the component key names the subdirectory
            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;

        if let Some(unit) = self
            .include_units
            .iter()
            .find(|unit| self.exclude_units.contains(unit))
        {
            let unit = unit.into();
            return Err(BuildError::DuplicatedUnit { unit }.into());
        }

        let include_matches = self.merged_include_matches();
        let exclude_matches = self.merged_exclude_matches();

        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
            return Err(BuildError::DuplicatedMatches { field, value }.into());
        }

        let mut checkpoint_path = data_dir;
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let journalctl_path = self
            .journalctl_path
            .clone()
            .unwrap_or_else(|| JOURNALCTL.clone());

        let starter = StartJournalctl::new(
            journalctl_path,
            self.journal_directory.clone(),
            self.journal_namespace.clone(),
            self.current_boot_only,
            self.since_now,
            self.extra_args.clone(),
        );

        let batch_size = self.batch_size;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        let log_namespace = cx.log_namespace(self.log_namespace);

        Ok(Box::pin(
            JournaldSource {
                include_matches,
                exclude_matches,
                checkpoint_path,
                batch_size,
                remap_priority: self.remap_priority,
                out: cx.out,
                acknowledgements,
                starter,
                log_namespace,
                emit_cursor: self.emit_cursor,
            }
            .run_shutdown(cx.shutdown),
        ))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition =
            self.schema_definition(global_log_namespace.merge(self.log_namespace));

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

struct JournaldSource {
    include_matches: Matches,
    exclude_matches: Matches,
    checkpoint_path: PathBuf,
    batch_size: usize,
    remap_priority: bool,
    out: SourceSender,
    acknowledgements: bool,
    starter: StartJournalctl,
    log_namespace: LogNamespace,
    emit_cursor: bool,
}

impl JournaldSource {
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stdout_stream, stderr_stream, running)) => {
                    if !self
                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
                        .await
                    {
                        return;
                    }
                    // Explicit drop to ensure it isn't dropped earlier.
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // journalctl process should never stop,
            // so it is an error if we reach here.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Process `journalctl` output until some error occurs.
    /// Returns `true` if `journalctl` should be restarted.
    async fn run_stream<'a>(
        &'a mut self,
        mut stdout_stream: JournalStream,
        stderr_stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        // Spawn stderr handler task
        let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));

        let batch_size = self.batch_size;
        let result = loop {
            let mut batch = Batch::new(self);

            // Start the timeout counter only once we have received a
            // valid and non-filtered event.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => {
                        stderr_handler.abort();
                        return false;
                    },
                    item = stdout_stream.next() => item,
                };
                if !batch.handle_next(item) {
                    stderr_handler.abort();
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stdout_stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        };

        stderr_handler.abort();
        result
    }

    /// Handle the stderr stream from the `journalctl` process.
    async fn handle_stderr(mut stderr_stream: JournalStream) {
        while let Some(result) = stderr_stream.next().await {
            match result {
                Ok(line) => {
                    let line_str = String::from_utf8_lossy(&line);
                    let trimmed = line_str.trim();
                    if !trimmed.is_empty() {
                        warn!("journalctl wrote to stderr: {trimmed}");
                    }
                }
                Err(err) => {
                    error!("Error reading journalctl stderr: {err}");
                    break;
                }
            }
        }
    }
}

struct Batch<'a> {
    events: Vec<LogEvent>,
    record_size: usize,
    exiting: Option<bool>,
    batch: Option<BatchNotifier>,
    receiver: Option<BatchStatusReceiver>,
    source: &'a mut JournaldSource,
    cursor: Option<String>,
}

impl<'a> Batch<'a> {
    fn new(source: &'a mut JournaldSource) -> Self {
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
        Self {
            events: Vec::new(),
            record_size: 0,
            exiting: None,
            batch,
            receiver,
            source,
            cursor: None,
        }
    }

    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
        match result {
            None => {
                warn!("Journalctl process stopped.");
                self.exiting = Some(true);
                false
            }
            Some(Err(error)) => {
                emit!(JournaldReadError { error });
                false
            }
            Some(Ok(bytes)) => {
                match decode_record(&bytes, self.source.remap_priority) {
                    Ok(mut record) => {
                        if self.source.emit_cursor {
                            if let Some(tmp) = record.get(CURSOR) {
                                self.cursor = Some(tmp.clone());
                            }
                        } else if let Some(tmp) = record.remove(CURSOR) {
                            self.cursor = Some(tmp);
                        }

                        if !filter_matches(
                            &record,
                            &self.source.include_matches,
                            &self.source.exclude_matches,
                        ) {
                            self.record_size += bytes.len();

                            let mut event = create_log_event_from_record(
                                record,
                                &self.batch,
                                self.source.log_namespace,
                            );

                            enrich_log_event(&mut event, self.source.log_namespace);

                            self.events.push(event);
                        }
                    }
                    Err(error) => {
                        emit!(JournaldInvalidRecordError {
                            error,
                            text: String::from_utf8_lossy(&bytes).into_owned()
                        });
                    }
                }
                true
            }
        }
    }

    async fn finish(
        mut self,
        finalizer: &Finalizer,
        bytes_received: &'a Registered<BytesReceived>,
        events_received: &'a Registered<EventsReceived>,
    ) -> Option<bool> {
        drop(self.batch);

        if self.record_size > 0 {
            bytes_received.emit(ByteSize(self.record_size));
        }

        if !self.events.is_empty() {
            let count = self.events.len();
            let byte_size = self.events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(count, byte_size));

            match self.source.out.send_batch(self.events).await {
                Ok(_) => {
                    if let Some(cursor) = self.cursor {
                        finalizer.finalize(cursor, self.receiver).await;
                    }
                }
                Err(_) => {
                    emit!(StreamClosedError { count });
                    // `out` channel is closed, don't restart journalctl.
                    self.exiting = Some(false);
                }
            }
        }
        self.exiting
    }
}

type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;

struct StartJournalctl {
    path: PathBuf,
    journal_dir: Option<PathBuf>,
    journal_namespace: Option<String>,
    current_boot_only: bool,
    since_now: bool,
    extra_args: Vec<String>,
}

impl StartJournalctl {
    const fn new(
        path: PathBuf,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        }
    }

    fn make_command(&self, checkpoint: Option<&str>) -> Command {
        let mut command = Command::new(&self.path);
        command.stdout(Stdio::piped());
        command.stderr(Stdio::piped());
        command.arg("--follow");
        command.arg("--all");
        command.arg("--show-cursor");
        command.arg("--output=json");

        if let Some(dir) = &self.journal_dir {
            command.arg(format!("--directory={}", dir.display()));
        }

        if let Some(namespace) = &self.journal_namespace {
            command.arg(format!("--namespace={namespace}"));
        }

        if self.current_boot_only {
            command.arg("--boot");
        }

        if let Some(cursor) = checkpoint {
            command.arg(format!("--after-cursor={cursor}"));
        } else if self.since_now {
            command.arg("--since=now");
        } else {
            // journalctl --follow only outputs a few lines without a starting point
            command.arg("--since=2000-01-01");
        }

        if !self.extra_args.is_empty() {
            command.args(&self.extra_args);
        }

        command
    }
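
    // For illustration only (a sketch, not exercised by this code): with no checkpoint,
    // journal directory, or namespace configured and `current_boot_only` enabled, the
    // command assembled above is roughly:
    //
    //   journalctl --follow --all --show-cursor --output=json --boot --since=2000-01-01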

    fn start(
        &mut self,
        checkpoint: Option<&str>,
    ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
        let mut command = self.make_command(checkpoint);

        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;

        let stdout_stream = FramedRead::new(
            child.stdout.take().unwrap(),
            CharacterDelimitedDecoder::new(b'\n'),
        );

        let stderr = child.stderr.take().unwrap();
        let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));

        Ok((
            stdout_stream.boxed(),
            stderr_stream.boxed(),
            RunningJournalctl(child),
        ))
    }
}

struct RunningJournalctl(Child);

impl Drop for RunningJournalctl {
    fn drop(&mut self) {
        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
        }
    }
}

fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(host) = log
                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
                .and_then(|meta| meta.get(HOSTNAME))
            {
                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
            }
        }
        LogNamespace::Legacy => {
            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
                log_namespace.insert_source_metadata(
                    JournaldConfig::NAME,
                    log,
                    log_schema().host_key().map(LegacyKey::Overwrite),
                    path!("host"),
                    host,
                );
            }
        }
    }

    // Create a Utc timestamp from an existing log field if present.
    let timestamp_value = match log_namespace {
        LogNamespace::Vector => log
            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
            .and_then(|meta| {
                meta.get(SOURCE_TIMESTAMP)
                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
            }),
        LogNamespace::Legacy => log
            .get(event_path!(SOURCE_TIMESTAMP))
            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
    };

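    // Journald expresses these timestamps as microseconds since the Unix epoch in a
    // decimal string; for example (illustrative value only), "1578529839140001" is
    // split below into 1578529839 seconds and 140_001_000 nanoseconds.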
    let timestamp = timestamp_value
        .filter(|&ts| ts.is_bytes())
        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
        .map(|ts| {
            chrono::Utc
                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
                .single()
                .expect("invalid timestamp")
        });

    // Add timestamp.
    match log_namespace {
        LogNamespace::Vector => {
            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());

            if let Some(ts) = timestamp {
                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
            }
        }
        LogNamespace::Legacy => {
            if let Some(ts) = timestamp {
                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
            }
        }
    }

    // Add source type.
    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        JournaldConfig::NAME,
    );
}

fn create_log_event_from_record(
    mut record: Record,
    batch: &Option<BatchNotifier>,
    log_namespace: LogNamespace,
) -> LogEvent {
    match log_namespace {
        LogNamespace::Vector => {
            let message_value = record
                .remove(MESSAGE)
                .map(|msg| Value::Bytes(Bytes::from(msg)))
                .unwrap_or(Value::Null);

            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);

            // Add the remaining Record fields to the event metadata under a nested object to avoid collisions.
            record.iter().for_each(|(key, value)| {
                log.metadata_mut()
                    .value_mut()
                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
            });

            log
        }
        LogNamespace::Legacy => {
            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);

            if let Some(message) = log.remove(event_path!(MESSAGE)) {
                log.maybe_insert(log_schema().message_key_target_path(), message);
            }

            log
        }
    }
}

/// Map the given unit name into a valid systemd unit
/// by appending ".service" if no extension is present.
fn fixup_unit(unit: &str) -> String {
    if unit.contains('.') {
        unit.into()
    } else {
        format!("{unit}.service")
    }
}

fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
    let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
    // journalctl will output non-ASCII values using an array
    // of integers. Look for those values and re-parse them.
    if let Some(record) = record.as_object_mut() {
        for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
            *value = decode_array(value.as_array().expect("already validated"));
        }
    }
    if remap {
        record.get_mut("PRIORITY").map(remap_priority);
    }
    serde_json::from_value(record)
}

fn decode_array(array: &[JsonValue]) -> JsonValue {
    decode_array_as_bytes(array).unwrap_or_else(|| {
        let ser = serde_json::to_string(array).expect("already deserialized");
        JsonValue::String(ser)
    })
}

fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
    // From the array of values, turn all the numbers into bytes, and
    // then the bytes into a string, but return None if any value in the
    // array was not a valid byte.
    array
        .iter()
        .map(|item| {
            item.as_u64().and_then(|num| match num {
                num if num <= u8::MAX as u64 => Some(num as u8),
                _ => None,
            })
        })
        .collect::<Option<Vec<u8>>>()
        .map(|array| String::from_utf8_lossy(&array).into())
}

fn remap_priority(priority: &mut JsonValue) {
    if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
        let text = match num {
            0 => "EMERG",
            1 => "ALERT",
            2 => "CRIT",
            3 => "ERR",
            4 => "WARNING",
            5 => "NOTICE",
            6 => "INFO",
            7 => "DEBUG",
            _ => "UNKNOWN",
        };
        *priority = JsonValue::String(text.into());
    }
}

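/// Returns `true` when the record should be dropped: either `includes` is non-empty and
/// the record matches none of its entries, or the record matches one of the `excludes`.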
fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
    match (includes.is_empty(), excludes.is_empty()) {
        (true, true) => false,
        (false, true) => !contains_match(record, includes),
        (true, false) => contains_match(record, excludes),
        (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
    }
}

fn contains_match(record: &Record, matches: &Matches) -> bool {
    let f = move |(field, value)| {
        matches
            .get(field)
            .map(|x| x.contains(value))
            .unwrap_or(false)
    };
    record.iter().any(f)
}

fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
    for (a_key, a_values) in a_matches {
        if let Some(b_values) = b_matches.get(a_key.as_str()) {
            for (a, b) in a_values
                .iter()
                .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
            {
                if a == b {
                    return Some((a_key.into(), b.into()));
                }
            }
        }
    }
    None
}

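/// Wraps checkpoint updates. With end-to-end acknowledgements enabled (`Async`), the
/// cursor is persisted only once its batch is confirmed delivered; otherwise (`Sync`)
/// it is persisted as soon as the batch has been sent downstream.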
enum Finalizer {
    Sync(SharedCheckpointer),
    Async(OrderedFinalizer<String>),
}

impl Finalizer {
    fn new(
        acknowledgements: bool,
        checkpointer: SharedCheckpointer,
        shutdown: ShutdownSignal,
    ) -> Self {
        if acknowledgements {
            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
            tokio::spawn(async move {
                while let Some((status, cursor)) = ack_stream.next().await {
                    if status == BatchStatus::Delivered {
                        checkpointer.lock().await.set(cursor).await;
                    }
                }
            });
            Self::Async(finalizer)
        } else {
            Self::Sync(checkpointer)
        }
    }

    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
        match (self, receiver) {
            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
            _ => {
                unreachable!("Cannot have async finalization without a receiver in journald source")
            }
        }
    }
}

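/// Persists the most recent journal cursor to a file. The file holds a single cursor
/// terminated by a newline; `set` rewrites it in place from offset zero without
/// truncating, so `get` reads only up to the first newline.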
struct Checkpointer {
    file: File,
    filename: PathBuf,
}

impl Checkpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&filename)
            .await?;
        Ok(Checkpointer { file, filename })
    }

    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(format!("{token}\n").as_bytes()).await
    }

    async fn get(&mut self) -> Result<Option<String>, io::Error> {
        let mut buf = Vec::<u8>::new();
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.read_to_end(&mut buf).await?;
        match buf.len() {
            0 => Ok(None),
            _ => {
                let text = String::from_utf8_lossy(&buf);
                match text.find('\n') {
                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
                    None => Ok(None), // Maybe return an error?
                }
            }
        }
    }
}

struct StatefulCheckpointer {
    checkpointer: Checkpointer,
    cursor: Option<String>,
}

impl StatefulCheckpointer {
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let mut checkpointer = Checkpointer::new(filename).await?;
        let cursor = checkpointer.get().await?;
        Ok(Self {
            checkpointer,
            cursor,
        })
    }

    async fn set(&mut self, token: impl Into<String>) {
        let token = token.into();
        if let Err(error) = self.checkpointer.set(&token).await {
            emit!(JournaldCheckpointSetError {
                error,
                filename: self
                    .checkpointer
                    .filename
                    .to_str()
                    .unwrap_or("unknown")
                    .to_string(),
            });
        }
        self.cursor = Some(token);
    }
}

#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);

impl SharedCheckpointer {
    fn new(c: StatefulCheckpointer) -> Self {
        Self(Arc::new(Mutex::new(c)))
    }

    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
        self.0.lock().await
    }
}

#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let mut filename = tempdir.path().to_path_buf();
        filename.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        checkpointer
            .set("first test")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("first test\n"));

        checkpointer
            .set("second")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("second\n"));
    }
}

#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use tempfile::tempdir;
    use tokio::time::{Duration, Instant, sleep, timeout};
    use vrl::value::{Value, kind::Collection};

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, EventStatus},
        test_util::components::assert_source_compliance,
    };

    const TEST_COMPONENT: &str = "journald-test";
    const TEST_JOURNALCTL: &str = "tests/data/journalctl";

    async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
        let include_matches = create_unit_matches(iunits.to_vec());
        let exclude_matches = create_unit_matches(xunits.to_vec());
        run_journal(include_matches, exclude_matches, cursor, false).await
    }

    async fn run_journal(
        include_matches: Matches,
        exclude_matches: Matches,
        checkpoint: Option<&str>,
        emit_cursor: bool,
    ) -> Vec<Event> {
        assert_source_compliance(&["protocol"], async move {
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let tempdir = tempdir().unwrap();
            let tempdir = tempdir.path().to_path_buf();

            if let Some(cursor) = checkpoint {
                let mut checkpoint_path = tempdir.clone();
                checkpoint_path.push(TEST_COMPONENT);
                fs::create_dir(&checkpoint_path).unwrap();
                checkpoint_path.push(CHECKPOINT_FILENAME);

                let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
                    .await
                    .expect("Creating checkpointer failed!");

                checkpointer
                    .set(cursor)
                    .await
                    .expect("Could not set checkpoint");
            }

            let (cx, shutdown) =
                SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
            let config = JournaldConfig {
                journalctl_path: Some(TEST_JOURNALCTL.into()),
                include_matches,
                exclude_matches,
                data_dir: Some(tempdir),
                remap_priority: true,
                acknowledgements: false.into(),
                emit_cursor,
                ..Default::default()
            };
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            // Hack: Sleep to ensure journalctl process starts and emits events before shutdown.
            sleep(Duration::from_secs(1)).await;
            shutdown
                .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
                .await;

            timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
        })
        .await
    }

    fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
        let units: HashSet<String> = units.into_iter().map(Into::into).collect();
        let mut map = HashMap::new();
        if !units.is_empty() {
            map.insert(String::from(SYSTEMD_UNIT), units);
        }
        map
    }

    fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
        let mut matches: Matches = HashMap::new();
        for (field, value) in conditions {
            matches
                .entry(field.into())
                .or_default()
                .insert(value.into());
        }
        matches
    }

    #[tokio::test]
    async fn reads_journal() {
        let received = run_with_units(&[], &[], None).await;
        assert_eq!(received.len(), 8);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
            "journald".into()
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
        assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
        assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
        assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
    }

    #[tokio::test]
    async fn includes_units() {
        let received = run_with_units(&["unit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
    }

    #[tokio::test]
    async fn excludes_units() {
        let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
        assert_eq!(received.len(), 6);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("System Initialization".into())
        );
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Missing timestamp".into())
        );
        assert_eq!(
            message(&received[2]),
            Value::Bytes("Different timestamps".into())
        );
    }

    #[tokio::test]
    async fn emits_cursor() {
        let received = run_journal(Matches::new(), Matches::new(), None, true).await;
        assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
        assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
        assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
    }

    #[tokio::test]
    async fn includes_matches() {
        let matches = create_matches(vec![("PRIORITY", "ERR")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 2);
        assert_eq!(
            message(&received[0]),
            Value::Bytes("Different timestamps".into())
        );
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
        assert_eq!(
            message(&received[1]),
            Value::Bytes("Non-ASCII in other field".into())
        );
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    #[tokio::test]
    async fn includes_kernel() {
        let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
        let received = run_journal(matches, HashMap::new(), None, false).await;
        assert_eq!(received.len(), 1);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
        assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
    }

    #[tokio::test]
    async fn excludes_matches() {
        let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
        let received = run_journal(HashMap::new(), matches, None, false).await;
        assert_eq!(received.len(), 5);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
        assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
    }

    #[tokio::test]
    async fn handles_checkpoint() {
        let received = run_with_units(&[], &[], Some("1")).await;
        assert_eq!(received.len(), 7);
        assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
    }

    #[tokio::test]
    async fn parses_array_messages() {
        let received = run_with_units(&["badunit.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
    }

    #[tokio::test]
    async fn parses_array_fields() {
        let received = run_with_units(&["syslog.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_RAW"],
            Value::Bytes("¿World?".into())
        );
    }
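
    // Added for illustration (not part of the original suite): journalctl emits
    // non-UTF-8-safe field values as arrays of byte integers, which `decode_record`
    // reassembles into strings.
    #[test]
    fn decodes_integer_array_fields() {
        let line = br#"{"MESSAGE":[194,191,72,101,108,108,111,63]}"#;
        let record = decode_record(line, false).unwrap();
        assert_eq!(record["MESSAGE"], "¿Hello?");
    }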

    #[tokio::test]
    async fn parses_string_sequences() {
        let received = run_with_units(&["NetworkManager.service"], &[], None).await;
        assert_eq!(received.len(), 1);
        assert_eq!(
            received[0].as_log()["SYSLOG_FACILITY"],
            Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
        );
    }

    #[tokio::test]
    async fn handles_missing_timestamp() {
        let received = run_with_units(&["stdout"], &[], None).await;
        assert_eq!(received.len(), 2);
        assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
        assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
    }

    #[tokio::test]
    async fn handles_acknowledgements() {
        let (tx, mut rx) = SourceSender::new_test();

        let tempdir = tempdir().unwrap();
        let tempdir = tempdir.path().to_path_buf();
        let mut checkpoint_path = tempdir.clone();
        checkpoint_path.push(TEST_COMPONENT);
        fs::create_dir(&checkpoint_path).unwrap();
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
            .await
            .expect("Creating checkpointer failed!");

        let config = JournaldConfig {
            journalctl_path: Some(TEST_JOURNALCTL.into()),
            data_dir: Some(tempdir),
            remap_priority: true,
            acknowledgements: true.into(),
            ..Default::default()
        };
        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
        let source = config.build(cx).await.unwrap();
        tokio::spawn(async move { source.await.unwrap() });

        // Make sure the checkpointer cursor is empty
        assert_eq!(checkpointer.get().await.unwrap(), None);

        // Hack: Sleep to ensure journalctl process starts and emits events.
        sleep(Duration::from_secs(1)).await;

        // Acknowledge all the received events.
        let mut count = 0;
        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
            // The checkpointer shouldn't set the cursor until the end of the batch.
            assert_eq!(checkpointer.get().await.unwrap(), None);
            event.metadata().update_status(EventStatus::Delivered);
            count += 1;
        }
        assert_eq!(count, 8);

        sleep(Duration::from_millis(100)).await;
        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
    }

    #[test]
    fn filter_matches_works_correctly() {
        let empty: Matches = HashMap::new();
        let includes = create_unit_matches(vec!["one", "two"]);
        let excludes = create_unit_matches(vec!["foo", "bar"]);

        let zero = HashMap::new();
        assert!(!filter_matches(&zero, &empty, &empty));
        assert!(filter_matches(&zero, &includes, &empty));
        assert!(!filter_matches(&zero, &empty, &excludes));
        assert!(filter_matches(&zero, &includes, &excludes));
        let mut one = HashMap::new();
        one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
        assert!(!filter_matches(&one, &empty, &empty));
        assert!(!filter_matches(&one, &includes, &empty));
        assert!(!filter_matches(&one, &empty, &excludes));
        assert!(!filter_matches(&one, &includes, &excludes));
        let mut two = HashMap::new();
        two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
        assert!(!filter_matches(&two, &empty, &empty));
        assert!(filter_matches(&two, &includes, &empty));
        assert!(filter_matches(&two, &empty, &excludes));
        assert!(filter_matches(&two, &includes, &excludes));
    }
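
    // Added for illustration (not part of the original suite): the deprecated
    // `remap_priority` option maps numeric journald priorities to syslog level names.
    #[test]
    fn remaps_numeric_priority_to_level_name() {
        let mut priority = serde_json::Value::String("3".into());
        remap_priority(&mut priority);
        assert_eq!(priority, serde_json::Value::String("ERR".into()));
    }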

    #[test]
    fn merges_units_and_matches_option() {
        let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
        let include_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]);

        let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
        let exclude_matches = create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]);

        let journald_config = JournaldConfig {
            include_units,
            include_matches,
            exclude_units,
            exclude_matches,
            ..Default::default()
        };

        let hashset =
            |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };

        let matches = journald_config.merged_include_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["one.service", "two.service", "three.service"])
        );
        let units = matches.get("_TRANSPORT").unwrap();
        assert_eq!(units, &hashset(&["kernel"]));

        let matches = journald_config.merged_exclude_matches();
        let units = matches.get("_SYSTEMD_UNIT").unwrap();
        assert_eq!(
            units,
            &hashset(&["foo.service", "bar.service", "baz.service"])
        );
        let units = matches.get("PRIORITY").unwrap();
        assert_eq!(units, &hashset(&["DEBUG"]));
    }
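
    // Added for illustration (not part of the original suite): unit names without an
    // extension get `.service` appended; names that already contain a `.` are unchanged.
    #[test]
    fn fixup_unit_appends_service_suffix() {
        assert_eq!(fixup_unit("ntpd"), "ntpd.service");
        assert_eq!(fixup_unit("sysinit.target"), "sysinit.target");
    }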
1496
1497    #[test]
1498    fn find_duplicate_match_works_correctly() {
1499        let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1500        let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1501        let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
1502        assert_eq!(field, "_TRANSPORT");
1503        assert_eq!(value, "kernel");
1504
1505        let empty = HashMap::new();
1506        let actual = find_duplicate_match(&empty, &empty);
1507        assert!(actual.is_none());
1508
1509        let actual = find_duplicate_match(&include_matches, &empty);
1510        assert!(actual.is_none());
1511
1512        let actual = find_duplicate_match(&empty, &exclude_matches);
1513        assert!(actual.is_none());
1514    }
1515
    #[test]
    fn command_options() {
        let path = PathBuf::from("journalctl");

        let journal_dir = None;
        let journal_namespace = None;
        let current_boot_only = false;
        let cursor = None;
        let since_now = false;
        let extra_args = vec![];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(!cmd_line.contains("--directory="));
        assert!(!cmd_line.contains("--namespace="));
        assert!(!cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--since=2000-01-01"));

        let journal_dir = None;
        let journal_namespace = None;
        let since_now = true;
        let extra_args = vec![];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--since=now"));

        let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
        let journal_namespace = Some(String::from("my_namespace"));
        let current_boot_only = true;
        let cursor = Some("2021-01-01");
        let extra_args = vec!["--merge".to_string()];

        let command = create_command(
            &path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            cursor,
            extra_args,
        );
        let cmd_line = format!("{command:?}");
        assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
        assert!(cmd_line.contains("--namespace=my_namespace"));
        assert!(cmd_line.contains("--boot"));
        assert!(cmd_line.contains("--after-cursor="));
        assert!(cmd_line.contains("--merge"));
    }

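    // Thin wrapper that builds a `StartJournalctl` and returns the command it would
    // spawn for the given cursor, so the test above can inspect the argument list.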
    fn create_command(
        path: &Path,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        cursor: Option<&str>,
        extra_args: Vec<String>,
    ) -> Command {
        StartJournalctl::new(
            path.into(),
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        )
        .make_command(cursor)
    }

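    // Accessors used by the tests to pull individual fields out of generated events,
    // plus `value_ts`, which builds the expected timestamp `Value` (its second argument
    // is forwarded to chrono's `timestamp_opt`, which takes nanoseconds).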
    fn message(event: &Event) -> Value {
        event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
    }

    fn timestamp(event: &Event) -> Value {
        event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
    }

    fn cursor(event: &Event) -> Value {
        event.as_log()[CURSOR].clone()
    }

    fn value_ts(secs: i64, usecs: u32) -> Value {
        Value::Timestamp(
            chrono::Utc
                .timestamp_opt(secs, usecs)
                .single()
                .expect("invalid timestamp"),
        )
    }

    fn priority(event: &Event) -> Value {
        event.as_log()["PRIORITY"].clone()
    }

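    // Under the Vector log namespace the event value itself is expected to be bytes or
    // null, with the source and journald fields described as event metadata.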
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = JournaldConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(JournaldConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

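    // Under the Legacy namespace the same fields are expected directly on the event,
    // with any unknown journald fields typed as bytes.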
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = JournaldConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }

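    // Parses a representative journald record into an event and asserts that it is
    // valid against the schema definition produced for the given namespace.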
    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
        let record = r#"{
            "PRIORITY":"6",
            "SYSLOG_FACILITY":"3",
            "SYSLOG_IDENTIFIER":"ntpd",
            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
            "_CAP_EFFECTIVE":"3fffffffff",
            "_CMDLINE":"ntpd: [priv]",
            "_COMM":"ntpd",
            "_EXE":"/usr/sbin/ntpd",
            "_GID":"0",
            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
            "_PID":"2156",
            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
            "_SYSTEMD_SLICE":"system.slice",
            "_SYSTEMD_UNIT":"ntpd.service",
            "_TRANSPORT":"stdout",
            "_UID":"0",
            "__MONOTONIC_TIMESTAMP":"98694000446",
            "__REALTIME_TIMESTAMP":"1564173027000443",
            "host":"my-host.local",
            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
            "source_type":"journald"
        }"#;

        let json: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));

        event.as_mut_log().insert("timestamp", chrono::Utc::now());

        let definitions = config.outputs(namespace).remove(0).schema_definition(true);

        definitions.unwrap().assert_valid_for_event(&event);
    }

    #[test]
    fn matches_schema_legacy() {
        let config = JournaldConfig::default();

        matches_schema(&config, LogNamespace::Legacy)
    }
}