1use std::{
2 collections::{HashMap, HashSet},
3 io::SeekFrom,
4 path::PathBuf,
5 process::Stdio,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15 sys::signal::{Signal, kill},
16 unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21 fs::{File, OpenOptions},
22 io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23 process::{Child, Command},
24 sync::{Mutex, MutexGuard},
25 time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29 EstimatedJsonEncodedSizeOf,
30 codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31 config::{LegacyKey, LogNamespace},
32 configurable::configurable_component,
33 finalizer::OrderedFinalizer,
34 internal_event::{
35 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36 },
37 lookup::{metadata_path, owned_value_path, path},
38 schema::Definition,
39};
40use vrl::{
41 event_path,
42 value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46 SourceSender,
47 config::{
48 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49 log_schema,
50 },
51 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52 internal_events::{
53 EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54 JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55 StreamClosedError,
56 },
57 serde::bool_or_struct,
58 shutdown::ShutdownSignal,
59};
60
/// How long a batch keeps collecting further entries once its first event has arrived.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// File name, inside the source's data directory, that holds the last persisted cursor.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";

// Journal field names read or rewritten by this source.
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Pause before (re)starting `journalctl` after it exits or fails to spawn.
const BACKOFF_DURATION: Duration = Duration::from_secs(1);

/// Executable used when `journalctl_path` is not configured.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| PathBuf::from("journalctl"));
74
/// Errors raised while building the `journald` source or spawning `journalctl`.
#[derive(Debug, Snafu)]
enum BuildError {
    /// Spawning the `journalctl` process failed.
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    /// A unit name appears verbatim in both `include_units` and `exclude_units`.
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    /// The same field/value pair appears in both merged include and exclude match sets.
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}
91
/// Journal field name -> set of accepted values for that field.
type Matches = HashMap<String, HashSet<String>>;
93
/// Configuration for the `journald` source.
#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    #[serde(default)]
    /// When no checkpoint cursor is stored, start reading at the current time.
    ///
    /// This passes `--since=now` to `journalctl` instead of `--since=2000-01-01`.
    pub since_now: bool,

    #[serde(default = "crate::serde::default_true")]
    /// Only include entries from the current boot (passes `--boot` to `journalctl`).
    pub current_boot_only: bool,

    #[serde(default)]
    /// A list of unit names to monitor.
    ///
    /// The names are merged into `include_matches` under the `_SYSTEMD_UNIT` field.
    ///
    /// Unit names lacking a `.` have `.service` appended.
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    #[serde(default)]
    /// A list of unit names to exclude from monitoring.
    ///
    /// The names are merged into `exclude_matches` under the `_SYSTEMD_UNIT` field.
    ///
    /// Unit names lacking a `.` have `.service` appended.
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    #[serde(default)]
    /// Sets of field/value pairs that select journal entries.
    ///
    /// When non-empty, an entry is kept only if at least one of its fields has one of the
    /// listed values. `include_units` is merged into this map.
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    #[serde(default)]
    /// Sets of field/value pairs that reject journal entries.
    ///
    /// An entry is dropped if any of its fields has one of the listed values.
    /// `exclude_units` is merged into this map.
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    #[serde(default)]
    /// The directory in which the journal cursor checkpoint file is stored.
    ///
    /// The value is resolved against the global options together with this component's ID.
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    #[serde(default)]
    /// Extra arguments appended verbatim to the `journalctl` command line.
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    #[serde(default = "default_batch_size")]
    /// The maximum number of events collected into one batch.
    ///
    /// A checkpoint is recorded for each batch once it has been sent.
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    #[serde(default)]
    /// The path of the `journalctl` executable. Defaults to `journalctl`.
    pub journalctl_path: Option<PathBuf>,

    #[serde(default)]
    /// The journal directory, passed to `journalctl` as `--directory`.
    pub journal_directory: Option<PathBuf>,

    #[serde(default)]
    /// The journal namespace, passed to `journalctl` as `--namespace`.
    pub journal_namespace: Option<String>,

    // Description comes from `SourceAcknowledgementsConfig` (`configurable(derived)`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[serde(default)]
    /// Remap the `PRIORITY` field from its numeric value to a syslog level name.
    ///
    /// Values `0`-`7` become `EMERG`...`DEBUG`; other integers become `UNKNOWN`.
    /// Values that are not integer strings are left untouched.
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    #[configurable(metadata(docs::hidden))]
    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    log_namespace: Option<bool>,

    #[serde(default = "crate::serde::default_false")]
    /// Whether to keep the journal `__CURSOR` field on emitted events.
    ///
    /// When disabled, the field is removed from each entry before the event is built.
    emit_cursor: bool,
}
221
/// Serde default for `JournaldConfig::batch_size`.
const fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 16;
    DEFAULT_BATCH_SIZE
}
225
/// Example value shown in the generated docs for `include_matches` / `exclude_matches`.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples = HashMap::new();
    examples.insert(
        "_SYSTEMD_UNIT".to_owned(),
        vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
    );
    examples.insert("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]);
    examples
}
235
236impl JournaldConfig {
237 fn merged_include_matches(&self) -> Matches {
238 Self::merge_units(&self.include_matches, &self.include_units)
239 }
240
241 fn merged_exclude_matches(&self) -> Matches {
242 Self::merge_units(&self.exclude_matches, &self.exclude_units)
243 }
244
245 fn merge_units(matches: &Matches, units: &[String]) -> Matches {
246 let mut matches = matches.clone();
247 for unit in units {
248 let entry = matches.entry(String::from(SYSTEMD_UNIT));
249 entry.or_default().insert(fixup_unit(unit));
250 }
251 matches
252 }
253
254 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
256 let schema_definition = match log_namespace {
257 LogNamespace::Vector => Definition::new_with_default_metadata(
258 Kind::bytes().or_null(),
259 [LogNamespace::Vector],
260 ),
261 LogNamespace::Legacy => Definition::new_with_default_metadata(
262 Kind::object(Collection::empty()),
263 [LogNamespace::Legacy],
264 ),
265 };
266
267 let mut schema_definition = schema_definition
268 .with_standard_vector_source_metadata()
269 .with_source_metadata(
271 JournaldConfig::NAME,
272 None,
273 &owned_value_path!("metadata"),
274 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
275 None,
276 )
277 .with_source_metadata(
278 JournaldConfig::NAME,
279 None,
280 &owned_value_path!("timestamp"),
281 Kind::timestamp().or_undefined(),
282 Some("timestamp"),
283 )
284 .with_source_metadata(
285 JournaldConfig::NAME,
286 log_schema().host_key().cloned().map(LegacyKey::Overwrite),
287 &owned_value_path!("host"),
288 Kind::bytes().or_undefined(),
289 Some("host"),
290 );
291
292 if log_namespace == LogNamespace::Legacy {
294 schema_definition = schema_definition.unknown_fields(Kind::bytes());
295 }
296
297 schema_definition
298 }
299}
300
301impl Default for JournaldConfig {
302 fn default() -> Self {
303 Self {
304 since_now: false,
305 current_boot_only: true,
306 include_units: vec![],
307 exclude_units: vec![],
308 include_matches: Default::default(),
309 exclude_matches: Default::default(),
310 data_dir: None,
311 batch_size: default_batch_size(),
312 journalctl_path: None,
313 journal_directory: None,
314 journal_namespace: None,
315 extra_args: vec![],
316 acknowledgements: Default::default(),
317 remap_priority: false,
318 log_namespace: None,
319 emit_cursor: false,
320 }
321 }
322}
323
// Example-config generation for this source is based on `JournaldConfig::default()`
// (per the macro's name).
impl_generate_config_from_default!(JournaldConfig);

/// One decoded journal entry: field name -> string value (see `decode_record`).
type Record = HashMap<String, String>;
327
328#[async_trait::async_trait]
329#[typetag::serde(name = "journald")]
330impl SourceConfig for JournaldConfig {
331 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
332 if self.remap_priority {
333 warn!(
334 "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
335 );
336 }
337
338 let data_dir = cx
339 .globals
340 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
342
343 if let Some(unit) = self
344 .include_units
345 .iter()
346 .find(|unit| self.exclude_units.contains(unit))
347 {
348 let unit = unit.into();
349 return Err(BuildError::DuplicatedUnit { unit }.into());
350 }
351
352 let include_matches = self.merged_include_matches();
353 let exclude_matches = self.merged_exclude_matches();
354
355 if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
356 return Err(BuildError::DuplicatedMatches { field, value }.into());
357 }
358
359 let mut checkpoint_path = data_dir;
360 checkpoint_path.push(CHECKPOINT_FILENAME);
361
362 let journalctl_path = self
363 .journalctl_path
364 .clone()
365 .unwrap_or_else(|| JOURNALCTL.clone());
366
367 let starter = StartJournalctl::new(
368 journalctl_path,
369 self.journal_directory.clone(),
370 self.journal_namespace.clone(),
371 self.current_boot_only,
372 self.since_now,
373 self.extra_args.clone(),
374 );
375
376 let batch_size = self.batch_size;
377 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
378 let log_namespace = cx.log_namespace(self.log_namespace);
379
380 Ok(Box::pin(
381 JournaldSource {
382 include_matches,
383 exclude_matches,
384 checkpoint_path,
385 batch_size,
386 remap_priority: self.remap_priority,
387 out: cx.out,
388 acknowledgements,
389 starter,
390 log_namespace,
391 emit_cursor: self.emit_cursor,
392 }
393 .run_shutdown(cx.shutdown),
394 ))
395 }
396
397 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
398 let schema_definition =
399 self.schema_definition(global_log_namespace.merge(self.log_namespace));
400
401 vec![SourceOutput::new_maybe_logs(
402 DataType::Log,
403 schema_definition,
404 )]
405 }
406
407 fn can_acknowledge(&self) -> bool {
408 true
409 }
410}
411
/// Runtime state of a `journald` source, constructed by `JournaldConfig::build`.
struct JournaldSource {
    /// Entries must match one of these (when non-empty) to be kept; includes `include_units`.
    include_matches: Matches,
    /// Entries matching any of these are dropped; includes `exclude_units`.
    exclude_matches: Matches,
    /// Location of the cursor checkpoint file.
    checkpoint_path: PathBuf,
    /// Maximum number of events per batch.
    batch_size: usize,
    /// Whether `PRIORITY` is rewritten to a level name while decoding.
    remap_priority: bool,
    /// Downstream sender for produced events.
    out: SourceSender,
    /// Whether end-to-end acknowledgements gate checkpointing.
    acknowledgements: bool,
    /// Builds and spawns `journalctl` processes.
    starter: StartJournalctl,
    /// Namespace used to shape emitted events.
    log_namespace: LogNamespace,
    /// Whether `__CURSOR` is kept on emitted events.
    emit_cursor: bool,
}
424
impl JournaldSource {
    /// Entry point of the source future.
    ///
    /// Opens the checkpoint file (reporting `JournaldCheckpointFileOpenError` and returning
    /// `Err(())` if that fails), wires up the finalizer, then runs until shutdown.
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    /// Supervisor loop: (re)starts `journalctl` from the last stored cursor, streams its
    /// output, and waits `BACKOFF_DURATION` between attempts until shutdown is signalled.
    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            // Non-blocking check so an already-triggered shutdown doesn't spawn a new process.
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            // Resume right after the most recently checkpointed cursor, if any.
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stdout_stream, stderr_stream, running)) => {
                    // `false` means stop for good (shutdown or closed downstream); `running`
                    // is dropped on return as well, which SIGTERMs the process.
                    if !self
                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
                        .await
                    {
                        return;
                    }
                    // Terminate this journalctl (SIGTERM via `RunningJournalctl`'s `Drop`)
                    // before backing off and starting a new one.
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // Back off before restarting, but react to shutdown immediately.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Reads batches from one `journalctl` process until it ends or the source must stop.
    ///
    /// Returns `true` when `journalctl` should be restarted (its output ended or produced a
    /// read error while waiting for a batch's first event), and `false` when the source
    /// should stop (shutdown while waiting, or the downstream sender is closed).
    async fn run_stream<'a>(
        &'a mut self,
        mut stdout_stream: JournalStream,
        stderr_stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        // Drain stderr concurrently; the task is aborted on every exit path below.
        let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));

        let batch_size = self.batch_size;
        let result = loop {
            let mut batch = Batch::new(self);

            // Wait (without a timeout) until at least one entry survives filtering.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => {
                        stderr_handler.abort();
                        return false;
                    },
                    item = stdout_stream.next() => item,
                };
                if !batch.handle_next(item) {
                    stderr_handler.abort();
                    return true;
                }
            }

            // With the first event in hand, keep reading for at most `BATCH_TIMEOUT` and at
            // most `batch_size - 1` further items.
            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stdout_stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            // `Some(restart)` ends this stream; `None` starts the next batch.
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        };

        stderr_handler.abort();
        result
    }

    /// Logs each non-blank stderr line from `journalctl` as a warning; stops on a read error.
    async fn handle_stderr(mut stderr_stream: JournalStream) {
        while let Some(result) = stderr_stream.next().await {
            match result {
                Ok(line) => {
                    let line_str = String::from_utf8_lossy(&line);
                    let trimmed = line_str.trim();
                    if !trimmed.is_empty() {
                        warn!("Warning journalctl stderr: {trimmed}");
                    }
                }
                Err(err) => {
                    error!("Error reading journalctl stderr: {err}");
                    break;
                }
            }
        }
    }
}
567
/// Events accumulated from `journalctl` output before being sent downstream together.
struct Batch<'a> {
    /// Events that passed filtering.
    events: Vec<LogEvent>,
    /// Total raw line length of the kept entries, reported as bytes received.
    record_size: usize,
    /// `Some(true)`: journalctl output ended; `Some(false)`: downstream closed; `None`: continue.
    exiting: Option<bool>,
    /// Notifier attached to this batch's events (when acknowledgements are enabled).
    batch: Option<BatchNotifier>,
    /// Receiver paired with `batch`, handed to the finalizer.
    receiver: Option<BatchStatusReceiver>,
    /// The owning source (filters, options, output sender).
    source: &'a mut JournaldSource,
    /// Newest `__CURSOR` seen in this batch, including entries that were filtered out.
    cursor: Option<String>,
}
577
578impl<'a> Batch<'a> {
579 fn new(source: &'a mut JournaldSource) -> Self {
580 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
581 Self {
582 events: Vec::new(),
583 record_size: 0,
584 exiting: None,
585 batch,
586 receiver,
587 source,
588 cursor: None,
589 }
590 }
591
592 fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
593 match result {
594 None => {
595 warn!("Journalctl process stopped.");
596 self.exiting = Some(true);
597 false
598 }
599 Some(Err(error)) => {
600 emit!(JournaldReadError { error });
601 false
602 }
603 Some(Ok(bytes)) => {
604 match decode_record(&bytes, self.source.remap_priority) {
605 Ok(mut record) => {
606 if self.source.emit_cursor {
607 if let Some(tmp) = record.get(CURSOR) {
608 self.cursor = Some(tmp.clone());
609 }
610 } else if let Some(tmp) = record.remove(CURSOR) {
611 self.cursor = Some(tmp);
612 }
613
614 if !filter_matches(
615 &record,
616 &self.source.include_matches,
617 &self.source.exclude_matches,
618 ) {
619 self.record_size += bytes.len();
620
621 let mut event = create_log_event_from_record(
622 record,
623 &self.batch,
624 self.source.log_namespace,
625 );
626
627 enrich_log_event(&mut event, self.source.log_namespace);
628
629 self.events.push(event);
630 }
631 }
632 Err(error) => {
633 emit!(JournaldInvalidRecordError {
634 error,
635 text: String::from_utf8_lossy(&bytes).into_owned()
636 });
637 }
638 }
639 true
640 }
641 }
642 }
643
644 async fn finish(
645 mut self,
646 finalizer: &Finalizer,
647 bytes_received: &'a Registered<BytesReceived>,
648 events_received: &'a Registered<EventsReceived>,
649 ) -> Option<bool> {
650 drop(self.batch);
651
652 if self.record_size > 0 {
653 bytes_received.emit(ByteSize(self.record_size));
654 }
655
656 if !self.events.is_empty() {
657 let count = self.events.len();
658 let byte_size = self.events.estimated_json_encoded_size_of();
659 events_received.emit(CountByteSize(count, byte_size));
660
661 match self.source.out.send_batch(self.events).await {
662 Ok(_) => {
663 if let Some(cursor) = self.cursor {
664 finalizer.finalize(cursor, self.receiver).await;
665 }
666 }
667 Err(_) => {
668 emit!(StreamClosedError { count });
669 self.exiting = Some(false);
671 }
672 }
673 }
674 self.exiting
675 }
676}
677
/// Newline-delimited frames read from one of `journalctl`'s pipes (stdout or stderr).
type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
679
/// Everything needed to (re)spawn `journalctl` with this source's options.
struct StartJournalctl {
    /// Executable to run.
    path: PathBuf,
    /// Value for `--directory`, if configured.
    journal_dir: Option<PathBuf>,
    /// Value for `--namespace`, if configured.
    journal_namespace: Option<String>,
    /// Adds `--boot` when set.
    current_boot_only: bool,
    /// Uses `--since=now` when there is no checkpoint.
    since_now: bool,
    /// User arguments appended verbatim.
    extra_args: Vec<String>,
}
688
689impl StartJournalctl {
690 const fn new(
691 path: PathBuf,
692 journal_dir: Option<PathBuf>,
693 journal_namespace: Option<String>,
694 current_boot_only: bool,
695 since_now: bool,
696 extra_args: Vec<String>,
697 ) -> Self {
698 Self {
699 path,
700 journal_dir,
701 journal_namespace,
702 current_boot_only,
703 since_now,
704 extra_args,
705 }
706 }
707
708 fn make_command(&self, checkpoint: Option<&str>) -> Command {
709 let mut command = Command::new(&self.path);
710 command.stdout(Stdio::piped());
711 command.stderr(Stdio::piped());
712 command.arg("--follow");
713 command.arg("--all");
714 command.arg("--show-cursor");
715 command.arg("--output=json");
716
717 if let Some(dir) = &self.journal_dir {
718 command.arg(format!("--directory={}", dir.display()));
719 }
720
721 if let Some(namespace) = &self.journal_namespace {
722 command.arg(format!("--namespace={namespace}"));
723 }
724
725 if self.current_boot_only {
726 command.arg("--boot");
727 }
728
729 if let Some(cursor) = checkpoint {
730 command.arg(format!("--after-cursor={cursor}"));
731 } else if self.since_now {
732 command.arg("--since=now");
733 } else {
734 command.arg("--since=2000-01-01");
736 }
737
738 if !self.extra_args.is_empty() {
739 command.args(&self.extra_args);
740 }
741
742 command
743 }
744
745 fn start(
746 &mut self,
747 checkpoint: Option<&str>,
748 ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
749 let mut command = self.make_command(checkpoint);
750
751 let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
752
753 let stdout_stream = FramedRead::new(
754 child.stdout.take().unwrap(),
755 CharacterDelimitedDecoder::new(b'\n'),
756 );
757
758 let stderr = child.stderr.take().unwrap();
759 let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));
760
761 Ok((
762 stdout_stream.boxed(),
763 stderr_stream.boxed(),
764 RunningJournalctl(child),
765 ))
766 }
767}
768
769struct RunningJournalctl(Child);
770
771impl Drop for RunningJournalctl {
772 fn drop(&mut self) {
773 if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
774 _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
775 }
776 }
777}
778
779fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
780 match log_namespace {
781 LogNamespace::Vector => {
782 if let Some(host) = log
783 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
784 .and_then(|meta| meta.get(HOSTNAME))
785 {
786 log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
787 }
788 }
789 LogNamespace::Legacy => {
790 if let Some(host) = log.remove(event_path!(HOSTNAME)) {
791 log_namespace.insert_source_metadata(
792 JournaldConfig::NAME,
793 log,
794 log_schema().host_key().map(LegacyKey::Overwrite),
795 path!("host"),
796 host,
797 );
798 }
799 }
800 }
801
802 let timestamp_value = match log_namespace {
804 LogNamespace::Vector => log
805 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
806 .and_then(|meta| {
807 meta.get(SOURCE_TIMESTAMP)
808 .or_else(|| meta.get(RECEIVED_TIMESTAMP))
809 }),
810 LogNamespace::Legacy => log
811 .get(event_path!(SOURCE_TIMESTAMP))
812 .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
813 };
814
815 let timestamp = timestamp_value
816 .filter(|&ts| ts.is_bytes())
817 .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
818 .map(|ts| {
819 chrono::Utc
820 .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
821 .single()
822 .expect("invalid timestamp")
823 });
824
825 match log_namespace {
827 LogNamespace::Vector => {
828 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
829
830 if let Some(ts) = timestamp {
831 log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
832 }
833 }
834 LogNamespace::Legacy => {
835 if let Some(ts) = timestamp {
836 log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
837 }
838 }
839 }
840
841 log_namespace.insert_vector_metadata(
843 log,
844 log_schema().source_type_key(),
845 path!("source_type"),
846 JournaldConfig::NAME,
847 );
848}
849
850fn create_log_event_from_record(
851 mut record: Record,
852 batch: &Option<BatchNotifier>,
853 log_namespace: LogNamespace,
854) -> LogEvent {
855 match log_namespace {
856 LogNamespace::Vector => {
857 let message_value = record
858 .remove(MESSAGE)
859 .map(|msg| Value::Bytes(Bytes::from(msg)))
860 .unwrap_or(Value::Null);
861
862 let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
863
864 record.iter().for_each(|(key, value)| {
866 log.metadata_mut()
867 .value_mut()
868 .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
869 });
870
871 log
872 }
873 LogNamespace::Legacy => {
874 let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
875
876 if let Some(message) = log.remove(event_path!(MESSAGE)) {
877 log.maybe_insert(log_schema().message_key_target_path(), message);
878 }
879
880 log
881 }
882 }
883}
884
/// Normalizes a configured unit name: names without a `.` get `.service` appended, and
/// names that already contain one are returned unchanged.
fn fixup_unit(unit: &str) -> String {
    match unit.find('.') {
        Some(_) => unit.to_owned(),
        None => format!("{unit}.service"),
    }
}
894
895fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
896 let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
897 if let Some(record) = record.as_object_mut() {
900 for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
901 *value = decode_array(value.as_array().expect("already validated"));
902 }
903 }
904 if remap {
905 record.get_mut("PRIORITY").map(remap_priority);
906 }
907 serde_json::from_value(record)
908}
909
910fn decode_array(array: &[JsonValue]) -> JsonValue {
911 decode_array_as_bytes(array).unwrap_or_else(|| {
912 let ser = serde_json::to_string(array).expect("already deserialized");
913 JsonValue::String(ser)
914 })
915}
916
917fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
918 array
922 .iter()
923 .map(|item| {
924 item.as_u64().and_then(|num| match num {
925 num if num <= u8::MAX as u64 => Some(num as u8),
926 _ => None,
927 })
928 })
929 .collect::<Option<Vec<u8>>>()
930 .map(|array| String::from_utf8_lossy(&array).into())
931}
932
933fn remap_priority(priority: &mut JsonValue) {
934 if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
935 let text = match num {
936 0 => "EMERG",
937 1 => "ALERT",
938 2 => "CRIT",
939 3 => "ERR",
940 4 => "WARNING",
941 5 => "NOTICE",
942 6 => "INFO",
943 7 => "DEBUG",
944 _ => "UNKNOWN",
945 };
946 *priority = JsonValue::String(text.into());
947 }
948}
949
950fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
951 match (includes.is_empty(), excludes.is_empty()) {
952 (true, true) => false,
953 (false, true) => !contains_match(record, includes),
954 (true, false) => contains_match(record, excludes),
955 (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
956 }
957}
958
959fn contains_match(record: &Record, matches: &Matches) -> bool {
960 let f = move |(field, value)| {
961 matches
962 .get(field)
963 .map(|x| x.contains(value))
964 .unwrap_or(false)
965 };
966 record.iter().any(f)
967}
968
969fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
970 for (a_key, a_values) in a_matches {
971 if let Some(b_values) = b_matches.get(a_key.as_str()) {
972 for (a, b) in a_values
973 .iter()
974 .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
975 {
976 if a == b {
977 return Some((a_key.into(), b.into()));
978 }
979 }
980 }
981 }
982 None
983}
984
985enum Finalizer {
986 Sync(SharedCheckpointer),
987 Async(OrderedFinalizer<String>),
988}
989
990impl Finalizer {
991 fn new(
992 acknowledgements: bool,
993 checkpointer: SharedCheckpointer,
994 shutdown: ShutdownSignal,
995 ) -> Self {
996 if acknowledgements {
997 let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
998 tokio::spawn(async move {
999 while let Some((status, cursor)) = ack_stream.next().await {
1000 if status == BatchStatus::Delivered {
1001 checkpointer.lock().await.set(cursor).await;
1002 }
1003 }
1004 });
1005 Self::Async(finalizer)
1006 } else {
1007 Self::Sync(checkpointer)
1008 }
1009 }
1010
1011 async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
1012 match (self, receiver) {
1013 (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
1014 (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
1015 _ => {
1016 unreachable!("Cannot have async finalization without a receiver in journald source")
1017 }
1018 }
1019 }
1020}
1021
1022struct Checkpointer {
1023 file: File,
1024 filename: PathBuf,
1025}
1026
1027impl Checkpointer {
1028 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1029 let file = OpenOptions::new()
1030 .read(true)
1031 .write(true)
1032 .create(true)
1033 .truncate(false)
1034 .open(&filename)
1035 .await?;
1036 Ok(Checkpointer { file, filename })
1037 }
1038
1039 async fn set(&mut self, token: &str) -> Result<(), io::Error> {
1040 self.file.seek(SeekFrom::Start(0)).await?;
1041 self.file.write_all(format!("{token}\n").as_bytes()).await
1042 }
1043
1044 async fn get(&mut self) -> Result<Option<String>, io::Error> {
1045 let mut buf = Vec::<u8>::new();
1046 self.file.seek(SeekFrom::Start(0)).await?;
1047 self.file.read_to_end(&mut buf).await?;
1048 match buf.len() {
1049 0 => Ok(None),
1050 _ => {
1051 let text = String::from_utf8_lossy(&buf);
1052 match text.find('\n') {
1053 Some(nl) => Ok(Some(String::from(&text[..nl]))),
1054 None => Ok(None), }
1056 }
1057 }
1058 }
1059}
1060
1061struct StatefulCheckpointer {
1062 checkpointer: Checkpointer,
1063 cursor: Option<String>,
1064}
1065
1066impl StatefulCheckpointer {
1067 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1068 let mut checkpointer = Checkpointer::new(filename).await?;
1069 let cursor = checkpointer.get().await?;
1070 Ok(Self {
1071 checkpointer,
1072 cursor,
1073 })
1074 }
1075
1076 async fn set(&mut self, token: impl Into<String>) {
1077 let token = token.into();
1078 if let Err(error) = self.checkpointer.set(&token).await {
1079 emit!(JournaldCheckpointSetError {
1080 error,
1081 filename: self
1082 .checkpointer
1083 .filename
1084 .to_str()
1085 .unwrap_or("unknown")
1086 .to_string(),
1087 });
1088 }
1089 self.cursor = Some(token);
1090 }
1091}
1092
1093#[derive(Clone)]
1094struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1095
1096impl SharedCheckpointer {
1097 fn new(c: StatefulCheckpointer) -> Self {
1098 Self(Arc::new(Mutex::new(c)))
1099 }
1100
1101 async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1102 self.0.lock().await
1103 }
1104}
1105
1106#[cfg(test)]
1107mod checkpointer_tests {
1108 use tempfile::tempdir;
1109 use tokio::fs::read_to_string;
1110
1111 use super::*;
1112
1113 #[test]
1114 fn generate_config() {
1115 crate::test_util::test_generate_config::<JournaldConfig>();
1116 }
1117
1118 #[tokio::test]
1119 async fn journald_checkpointer_works() {
1120 let tempdir = tempdir().unwrap();
1121 let mut filename = tempdir.path().to_path_buf();
1122 filename.push(CHECKPOINT_FILENAME);
1123 let mut checkpointer = Checkpointer::new(filename.clone())
1124 .await
1125 .expect("Creating checkpointer failed!");
1126
1127 assert!(checkpointer.get().await.unwrap().is_none());
1128
1129 checkpointer
1130 .set("first test")
1131 .await
1132 .expect("Setting checkpoint failed");
1133 assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
1134 let contents = read_to_string(filename.clone())
1135 .await
1136 .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
1137 assert!(contents.starts_with("first test\n"));
1138
1139 checkpointer
1140 .set("second")
1141 .await
1142 .expect("Setting checkpoint failed");
1143 assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
1144 let contents = read_to_string(filename.clone())
1145 .await
1146 .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
1147 assert!(contents.starts_with("second\n"));
1148 }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153 use std::{fs, path::Path};
1154
1155 use tempfile::tempdir;
1156 use tokio::time::{Duration, Instant, sleep, timeout};
1157 use vrl::value::{Value, kind::Collection};
1158
1159 use super::*;
1160 use crate::{
1161 config::ComponentKey,
1162 event::{Event, EventStatus},
1163 test_util::components::assert_source_compliance,
1164 };
1165
1166 const TEST_COMPONENT: &str = "journald-test";
1167 const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1168
1169 async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1170 let include_matches = create_unit_matches(iunits.to_vec());
1171 let exclude_matches = create_unit_matches(xunits.to_vec());
1172 run_journal(include_matches, exclude_matches, cursor, false).await
1173 }
1174
1175 async fn run_journal(
1176 include_matches: Matches,
1177 exclude_matches: Matches,
1178 checkpoint: Option<&str>,
1179 emit_cursor: bool,
1180 ) -> Vec<Event> {
1181 assert_source_compliance(&["protocol"], async move {
1182 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1183
1184 let tempdir = tempdir().unwrap();
1185 let tempdir = tempdir.path().to_path_buf();
1186
1187 if let Some(cursor) = checkpoint {
1188 let mut checkpoint_path = tempdir.clone();
1189 checkpoint_path.push(TEST_COMPONENT);
1190 fs::create_dir(&checkpoint_path).unwrap();
1191 checkpoint_path.push(CHECKPOINT_FILENAME);
1192
1193 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1194 .await
1195 .expect("Creating checkpointer failed!");
1196
1197 checkpointer
1198 .set(cursor)
1199 .await
1200 .expect("Could not set checkpoint");
1201 }
1202
1203 let (cx, shutdown) =
1204 SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1205 let config = JournaldConfig {
1206 journalctl_path: Some(TEST_JOURNALCTL.into()),
1207 include_matches,
1208 exclude_matches,
1209 data_dir: Some(tempdir),
1210 remap_priority: true,
1211 acknowledgements: false.into(),
1212 emit_cursor,
1213 ..Default::default()
1214 };
1215 let source = config.build(cx).await.unwrap();
1216 tokio::spawn(async move { source.await.unwrap() });
1217
1218 sleep(Duration::from_secs(1)).await;
1220 shutdown
1221 .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1222 .await;
1223
1224 timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1225 })
1226 .await
1227 }
1228
1229 fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1230 let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1231 let mut map = HashMap::new();
1232 if !units.is_empty() {
1233 map.insert(String::from(SYSTEMD_UNIT), units);
1234 }
1235 map
1236 }
1237
1238 fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1239 let mut matches: Matches = HashMap::new();
1240 for (field, value) in conditions {
1241 matches
1242 .entry(field.into())
1243 .or_default()
1244 .insert(value.into());
1245 }
1246 matches
1247 }
1248
1249 #[tokio::test]
1250 async fn reads_journal() {
1251 let received = run_with_units(&[], &[], None).await;
1252 assert_eq!(received.len(), 8);
1253 assert_eq!(
1254 message(&received[0]),
1255 Value::Bytes("System Initialization".into())
1256 );
1257 assert_eq!(
1258 received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1259 "journald".into()
1260 );
1261 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1262 assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1263 assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1264 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1265 assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1266 }
1267
1268 #[tokio::test]
1269 async fn includes_units() {
1270 let received = run_with_units(&["unit.service"], &[], None).await;
1271 assert_eq!(received.len(), 1);
1272 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1273 }
1274
1275 #[tokio::test]
1276 async fn excludes_units() {
1277 let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1278 assert_eq!(received.len(), 6);
1279 assert_eq!(
1280 message(&received[0]),
1281 Value::Bytes("System Initialization".into())
1282 );
1283 assert_eq!(
1284 message(&received[1]),
1285 Value::Bytes("Missing timestamp".into())
1286 );
1287 assert_eq!(
1288 message(&received[2]),
1289 Value::Bytes("Different timestamps".into())
1290 );
1291 }
1292
1293 #[tokio::test]
1294 async fn emits_cursor() {
1295 let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1296 assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1297 assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1298 assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1299 }
1300
1301 #[tokio::test]
1302 async fn includes_matches() {
1303 let matches = create_matches(vec![("PRIORITY", "ERR")]);
1304 let received = run_journal(matches, HashMap::new(), None, false).await;
1305 assert_eq!(received.len(), 2);
1306 assert_eq!(
1307 message(&received[0]),
1308 Value::Bytes("Different timestamps".into())
1309 );
1310 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1311 assert_eq!(
1312 message(&received[1]),
1313 Value::Bytes("Non-ASCII in other field".into())
1314 );
1315 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1316 }
1317
1318 #[tokio::test]
1319 async fn includes_kernel() {
1320 let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1321 let received = run_journal(matches, HashMap::new(), None, false).await;
1322 assert_eq!(received.len(), 1);
1323 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1324 assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1325 }
1326
1327 #[tokio::test]
1328 async fn excludes_matches() {
1329 let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1330 let received = run_journal(HashMap::new(), matches, None, false).await;
1331 assert_eq!(received.len(), 5);
1332 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1333 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1334 assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1335 assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1336 assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1337 }
1338
1339 #[tokio::test]
1340 async fn handles_checkpoint() {
1341 let received = run_with_units(&[], &[], Some("1")).await;
1342 assert_eq!(received.len(), 7);
1343 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1344 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1345 }
1346
1347 #[tokio::test]
1348 async fn parses_array_messages() {
1349 let received = run_with_units(&["badunit.service"], &[], None).await;
1350 assert_eq!(received.len(), 1);
1351 assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1352 }
1353
1354 #[tokio::test]
1355 async fn parses_array_fields() {
1356 let received = run_with_units(&["syslog.service"], &[], None).await;
1357 assert_eq!(received.len(), 1);
1358 assert_eq!(
1359 received[0].as_log()["SYSLOG_RAW"],
1360 Value::Bytes("¿World?".into())
1361 );
1362 }
1363
1364 #[tokio::test]
1365 async fn parses_string_sequences() {
1366 let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1367 assert_eq!(received.len(), 1);
1368 assert_eq!(
1369 received[0].as_log()["SYSLOG_FACILITY"],
1370 Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1371 );
1372 }
1373
1374 #[tokio::test]
1375 async fn handles_missing_timestamp() {
1376 let received = run_with_units(&["stdout"], &[], None).await;
1377 assert_eq!(received.len(), 2);
1378 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1379 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1380 }
1381
1382 #[tokio::test]
1383 async fn handles_acknowledgements() {
1384 let (tx, mut rx) = SourceSender::new_test();
1385
1386 let tempdir = tempdir().unwrap();
1387 let tempdir = tempdir.path().to_path_buf();
1388 let mut checkpoint_path = tempdir.clone();
1389 checkpoint_path.push(TEST_COMPONENT);
1390 fs::create_dir(&checkpoint_path).unwrap();
1391 checkpoint_path.push(CHECKPOINT_FILENAME);
1392
1393 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1394 .await
1395 .expect("Creating checkpointer failed!");
1396
1397 let config = JournaldConfig {
1398 journalctl_path: Some(TEST_JOURNALCTL.into()),
1399 data_dir: Some(tempdir),
1400 remap_priority: true,
1401 acknowledgements: true.into(),
1402 ..Default::default()
1403 };
1404 let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1405 let source = config.build(cx).await.unwrap();
1406 tokio::spawn(async move { source.await.unwrap() });
1407
1408 assert_eq!(checkpointer.get().await.unwrap(), None);
1410
1411 sleep(Duration::from_secs(1)).await;
1413
1414 let mut count = 0;
1416 while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
1417 assert_eq!(checkpointer.get().await.unwrap(), None);
1419 event.metadata().update_status(EventStatus::Delivered);
1420 count += 1;
1421 }
1422 assert_eq!(count, 8);
1423
1424 sleep(Duration::from_millis(100)).await;
1425 assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
1426 }
1427
1428 #[test]
1429 fn filter_matches_works_correctly() {
1430 let empty: Matches = HashMap::new();
1431 let includes = create_unit_matches(vec!["one", "two"]);
1432 let excludes = create_unit_matches(vec!["foo", "bar"]);
1433
1434 let zero = HashMap::new();
1435 assert!(!filter_matches(&zero, &empty, &empty));
1436 assert!(filter_matches(&zero, &includes, &empty));
1437 assert!(!filter_matches(&zero, &empty, &excludes));
1438 assert!(filter_matches(&zero, &includes, &excludes));
1439 let mut one = HashMap::new();
1440 one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
1441 assert!(!filter_matches(&one, &empty, &empty));
1442 assert!(!filter_matches(&one, &includes, &empty));
1443 assert!(!filter_matches(&one, &empty, &excludes));
1444 assert!(!filter_matches(&one, &includes, &excludes));
1445 let mut two = HashMap::new();
1446 two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
1447 assert!(!filter_matches(&two, &empty, &empty));
1448 assert!(filter_matches(&two, &includes, &empty));
1449 assert!(filter_matches(&two, &empty, &excludes));
1450 assert!(filter_matches(&two, &includes, &excludes));
1451 }
1452
1453 #[test]
1454 fn merges_units_and_matches_option() {
1455 let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
1456 let include_matches = create_matches(vec![
1457 ("_SYSTEMD_UNIT", "three.service"),
1458 ("_TRANSPORT", "kernel"),
1459 ]);
1460
1461 let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
1462 let exclude_matches = create_matches(vec![
1463 ("_SYSTEMD_UNIT", "baz.service"),
1464 ("PRIORITY", "DEBUG"),
1465 ]);
1466
1467 let journald_config = JournaldConfig {
1468 include_units,
1469 include_matches,
1470 exclude_units,
1471 exclude_matches,
1472 ..Default::default()
1473 };
1474
1475 let hashset =
1476 |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };
1477
1478 let matches = journald_config.merged_include_matches();
1479 let units = matches.get("_SYSTEMD_UNIT").unwrap();
1480 assert_eq!(
1481 units,
1482 &hashset(&["one.service", "two.service", "three.service"])
1483 );
1484 let units = matches.get("_TRANSPORT").unwrap();
1485 assert_eq!(units, &hashset(&["kernel"]));
1486
1487 let matches = journald_config.merged_exclude_matches();
1488 let units = matches.get("_SYSTEMD_UNIT").unwrap();
1489 assert_eq!(
1490 units,
1491 &hashset(&["foo.service", "bar.service", "baz.service"])
1492 );
1493 let units = matches.get("PRIORITY").unwrap();
1494 assert_eq!(units, &hashset(&["DEBUG"]));
1495 }
1496
1497 #[test]
1498 fn find_duplicate_match_works_correctly() {
1499 let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1500 let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1501 let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
1502 assert_eq!(field, "_TRANSPORT");
1503 assert_eq!(value, "kernel");
1504
1505 let empty = HashMap::new();
1506 let actual = find_duplicate_match(&empty, &empty);
1507 assert!(actual.is_none());
1508
1509 let actual = find_duplicate_match(&include_matches, &empty);
1510 assert!(actual.is_none());
1511
1512 let actual = find_duplicate_match(&empty, &exclude_matches);
1513 assert!(actual.is_none());
1514 }
1515
1516 #[test]
1517 fn command_options() {
1518 let path = PathBuf::from("journalctl");
1519
1520 let journal_dir = None;
1521 let journal_namespace = None;
1522 let current_boot_only = false;
1523 let cursor = None;
1524 let since_now = false;
1525 let extra_args = vec![];
1526
1527 let command = create_command(
1528 &path,
1529 journal_dir,
1530 journal_namespace,
1531 current_boot_only,
1532 since_now,
1533 cursor,
1534 extra_args,
1535 );
1536 let cmd_line = format!("{command:?}");
1537 assert!(!cmd_line.contains("--directory="));
1538 assert!(!cmd_line.contains("--namespace="));
1539 assert!(!cmd_line.contains("--boot"));
1540 assert!(cmd_line.contains("--since=2000-01-01"));
1541
1542 let journal_dir = None;
1543 let journal_namespace = None;
1544 let since_now = true;
1545 let extra_args = vec![];
1546
1547 let command = create_command(
1548 &path,
1549 journal_dir,
1550 journal_namespace,
1551 current_boot_only,
1552 since_now,
1553 cursor,
1554 extra_args,
1555 );
1556 let cmd_line = format!("{command:?}");
1557 assert!(cmd_line.contains("--since=now"));
1558
1559 let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
1560 let journal_namespace = Some(String::from("my_namespace"));
1561 let current_boot_only = true;
1562 let cursor = Some("2021-01-01");
1563 let extra_args = vec!["--merge".to_string()];
1564
1565 let command = create_command(
1566 &path,
1567 journal_dir,
1568 journal_namespace,
1569 current_boot_only,
1570 since_now,
1571 cursor,
1572 extra_args,
1573 );
1574 let cmd_line = format!("{command:?}");
1575 assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
1576 assert!(cmd_line.contains("--namespace=my_namespace"));
1577 assert!(cmd_line.contains("--boot"));
1578 assert!(cmd_line.contains("--after-cursor="));
1579 assert!(cmd_line.contains("--merge"));
1580 }
1581
1582 fn create_command(
1583 path: &Path,
1584 journal_dir: Option<PathBuf>,
1585 journal_namespace: Option<String>,
1586 current_boot_only: bool,
1587 since_now: bool,
1588 cursor: Option<&str>,
1589 extra_args: Vec<String>,
1590 ) -> Command {
1591 StartJournalctl::new(
1592 path.into(),
1593 journal_dir,
1594 journal_namespace,
1595 current_boot_only,
1596 since_now,
1597 extra_args,
1598 )
1599 .make_command(cursor)
1600 }
1601
1602 fn message(event: &Event) -> Value {
1603 event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1604 }
1605
1606 fn timestamp(event: &Event) -> Value {
1607 event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1608 }
1609
1610 fn cursor(event: &Event) -> Value {
1611 event.as_log()[CURSOR].clone()
1612 }
1613
1614 fn value_ts(secs: i64, usecs: u32) -> Value {
1615 Value::Timestamp(
1616 chrono::Utc
1617 .timestamp_opt(secs, usecs)
1618 .single()
1619 .expect("invalid timestamp"),
1620 )
1621 }
1622
1623 fn priority(event: &Event) -> Value {
1624 event.as_log()["PRIORITY"].clone()
1625 }
1626
1627 #[test]
1628 fn output_schema_definition_vector_namespace() {
1629 let config = JournaldConfig {
1630 log_namespace: Some(true),
1631 ..Default::default()
1632 };
1633
1634 let definitions = config
1635 .outputs(LogNamespace::Vector)
1636 .remove(0)
1637 .schema_definition(true);
1638
1639 let expected_definition =
1640 Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
1641 .with_metadata_field(
1642 &owned_value_path!("vector", "source_type"),
1643 Kind::bytes(),
1644 None,
1645 )
1646 .with_metadata_field(
1647 &owned_value_path!("vector", "ingest_timestamp"),
1648 Kind::timestamp(),
1649 None,
1650 )
1651 .with_metadata_field(
1652 &owned_value_path!(JournaldConfig::NAME, "metadata"),
1653 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1654 None,
1655 )
1656 .with_metadata_field(
1657 &owned_value_path!(JournaldConfig::NAME, "timestamp"),
1658 Kind::timestamp().or_undefined(),
1659 Some("timestamp"),
1660 )
1661 .with_metadata_field(
1662 &owned_value_path!(JournaldConfig::NAME, "host"),
1663 Kind::bytes().or_undefined(),
1664 Some("host"),
1665 );
1666
1667 assert_eq!(definitions, Some(expected_definition))
1668 }
1669
1670 #[test]
1671 fn output_schema_definition_legacy_namespace() {
1672 let config = JournaldConfig::default();
1673
1674 let definitions = config
1675 .outputs(LogNamespace::Legacy)
1676 .remove(0)
1677 .schema_definition(true);
1678
1679 let expected_definition = Definition::new_with_default_metadata(
1680 Kind::object(Collection::empty()),
1681 [LogNamespace::Legacy],
1682 )
1683 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1684 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1685 .with_event_field(
1686 &owned_value_path!("host"),
1687 Kind::bytes().or_undefined(),
1688 Some("host"),
1689 )
1690 .unknown_fields(Kind::bytes());
1691
1692 assert_eq!(definitions, Some(expected_definition))
1693 }
1694
1695 fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
1696 let record = r#"{
1697 "PRIORITY":"6",
1698 "SYSLOG_FACILITY":"3",
1699 "SYSLOG_IDENTIFIER":"ntpd",
1700 "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
1701 "_CAP_EFFECTIVE":"3fffffffff",
1702 "_CMDLINE":"ntpd: [priv]",
1703 "_COMM":"ntpd",
1704 "_EXE":"/usr/sbin/ntpd",
1705 "_GID":"0",
1706 "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
1707 "_PID":"2156",
1708 "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
1709 "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
1710 "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
1711 "_SYSTEMD_SLICE":"system.slice",
1712 "_SYSTEMD_UNIT":"ntpd.service",
1713 "_TRANSPORT":"stdout",
1714 "_UID":"0",
1715 "__MONOTONIC_TIMESTAMP":"98694000446",
1716 "__REALTIME_TIMESTAMP":"1564173027000443",
1717 "host":"my-host.local",
1718 "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
1719 "source_type":"journald"
1720 }"#;
1721
1722 let json: serde_json::Value = serde_json::from_str(record).unwrap();
1723 let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
1724
1725 event.as_mut_log().insert("timestamp", chrono::Utc::now());
1726
1727 let definitions = config.outputs(namespace).remove(0).schema_definition(true);
1728
1729 definitions.unwrap().assert_valid_for_event(&event);
1730 }
1731
1732 #[test]
1733 fn matches_schema_legacy() {
1734 let config = JournaldConfig::default();
1735
1736 matches_schema(&config, LogNamespace::Legacy)
1737 }
1738}