1use std::{
2 collections::{HashMap, HashSet},
3 io::SeekFrom,
4 path::PathBuf,
5 process::Stdio,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15 sys::signal::{Signal, kill},
16 unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21 fs::{File, OpenOptions},
22 io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23 process::{Child, Command},
24 sync::{Mutex, MutexGuard},
25 time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29 EstimatedJsonEncodedSizeOf,
30 codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31 config::{LegacyKey, LogNamespace},
32 configurable::configurable_component,
33 finalizer::OrderedFinalizer,
34 internal_event::{
35 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36 },
37 lookup::{metadata_path, owned_value_path, path},
38 schema::Definition,
39};
40use vrl::{
41 event_path,
42 value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46 SourceSender,
47 config::{
48 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49 log_schema,
50 },
51 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52 internal_events::{
53 EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54 JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55 StreamClosedError,
56 },
57 serde::bool_or_struct,
58 shutdown::ShutdownSignal,
59};
60
/// How long a batch keeps collecting events after its first event before being flushed.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// Delay before `journalctl` is started again after it stopped or failed to start.
const BACKOFF_DURATION: Duration = Duration::from_millis(1_000);

/// File name, inside the source's data directory, holding the last checkpointed cursor.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";

// Journal field names read by this source.
/// Opaque position of an entry in the journal, used for checkpointing.
const CURSOR: &str = "__CURSOR";
/// Host that produced the entry.
const HOSTNAME: &str = "_HOSTNAME";
/// The log message itself.
const MESSAGE: &str = "MESSAGE";
/// systemd unit that produced the entry; used for unit filtering.
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
/// Preferred timestamp source (microseconds since the Unix epoch).
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
/// Fallback timestamp when `SOURCE_TIMESTAMP` is absent.
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Default `journalctl` executable; a bare name, so it is looked up on `PATH` when spawned.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| PathBuf::from("journalctl"));
74
/// Errors raised while building the journald source (`build`) or spawning
/// `journalctl` (`StartJournalctl::start`).
#[derive(Debug, Snafu)]
enum BuildError {
    /// The `journalctl` process could not be spawned.
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    /// The same unit name is listed in both `include_units` and `exclude_units`
    /// (compared as written, before `.service` normalization).
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    /// The same field/value pair appears in both the include and the exclude match
    /// sets, after units have been merged in as `_SYSTEMD_UNIT` matches.
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}
91
/// Journal field name -> set of values for that field, used for include/exclude filtering.
type Matches = HashMap<String, HashSet<String>>;
93
#[configurable_component(source("journald", "Collect logs from JournalD."))]
/// Configuration for the `journald` source.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    /// Only include entries that are appended to the journal after the source starts.
    ///
    /// This only applies when no checkpoint exists yet: `journalctl` is then started with
    /// `--since=now` instead of `--since=2000-01-01`. When a checkpoint exists, reading
    /// resumes right after the checkpointed cursor.
    #[serde(default)]
    pub since_now: bool,

    /// Only include entries from the current boot (passes `--boot` to `journalctl`).
    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,

    /// A list of unit names to include.
    ///
    /// If empty or not present, entries are not filtered by unit. Unit names that do not
    /// contain a `.` have `.service` appended. These units are merged into
    /// `include_matches` as `_SYSTEMD_UNIT` values.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    /// A list of unit names to exclude.
    ///
    /// Unit names that do not contain a `.` have `.service` appended. These units are
    /// merged into `exclude_matches` as `_SYSTEMD_UNIT` values. A unit must not appear in
    /// both `include_units` and `exclude_units`.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    /// Journal field/value pairs to include.
    ///
    /// If non-empty, an entry is kept only if at least one of its fields has one of the
    /// values listed for that field. `include_units` is merged into this map.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    /// Journal field/value pairs to exclude.
    ///
    /// An entry is dropped if any of its fields has one of the values listed for that
    /// field. `exclude_units` is merged into this map. The same field/value pair must not
    /// appear in both `include_matches` and `exclude_matches`.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    /// The data directory in which the journal checkpoint position is persisted.
    ///
    /// The checkpoint file is stored in a subdirectory named after the component ID.
    // NOTE(review): the fallback when unset is decided by `resolve_and_make_data_subdir`;
    // presumably the global `data_dir` option — confirm before documenting it.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Extra command-line arguments, appended as-is after all other `journalctl` arguments.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    /// The maximum number of events sent downstream in one batch.
    ///
    /// The journal is read in batches. The cursor of the last entry read is checkpointed
    /// after each batch is sent or, with acknowledgements enabled, delivered.
    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    /// The path of the `journalctl` executable.
    ///
    /// If not set, `journalctl` is looked up on `PATH`.
    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,

    /// The journal directory to read from, passed to `journalctl` as `--directory`.
    #[serde(default)]
    pub journal_directory: Option<PathBuf>,

    /// The journal namespace to read from, passed to `journalctl` as `--namespace`.
    #[serde(default)]
    pub journal_namespace: Option<String>,

    // End-to-end acknowledgement settings; the documentation is derived from the type.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Rewrites the numeric `PRIORITY` field to its symbolic name (for example, `6` becomes `INFO`).
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Keep the `__CURSOR` field on emitted events. By default it is removed.
    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}
225
/// Default for `batch_size`: at most 16 events are sent downstream per batch.
const fn default_batch_size() -> usize {
    const EVENTS_PER_BATCH: usize = 16;
    EVENTS_PER_BATCH
}
229
/// Example value shown in the generated docs for `include_matches` / `exclude_matches`.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples: HashMap<String, Vec<String>> = HashMap::with_capacity(2);
    examples.insert(
        "_SYSTEMD_UNIT".to_string(),
        vec!["sshd.service".to_string(), "ntpd.service".to_string()],
    );
    examples.insert("_TRANSPORT".to_string(), vec!["kernel".to_string()]);
    examples
}
239
240impl JournaldConfig {
241 fn merged_include_matches(&self) -> Matches {
242 Self::merge_units(&self.include_matches, &self.include_units)
243 }
244
245 fn merged_exclude_matches(&self) -> Matches {
246 Self::merge_units(&self.exclude_matches, &self.exclude_units)
247 }
248
249 fn merge_units(matches: &Matches, units: &[String]) -> Matches {
250 let mut matches = matches.clone();
251 for unit in units {
252 let entry = matches.entry(String::from(SYSTEMD_UNIT));
253 entry.or_default().insert(fixup_unit(unit));
254 }
255 matches
256 }
257
258 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
260 let schema_definition = match log_namespace {
261 LogNamespace::Vector => Definition::new_with_default_metadata(
262 Kind::bytes().or_null(),
263 [LogNamespace::Vector],
264 ),
265 LogNamespace::Legacy => Definition::new_with_default_metadata(
266 Kind::object(Collection::empty()),
267 [LogNamespace::Legacy],
268 ),
269 };
270
271 let mut schema_definition = schema_definition
272 .with_standard_vector_source_metadata()
273 .with_source_metadata(
275 JournaldConfig::NAME,
276 None,
277 &owned_value_path!("metadata"),
278 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
279 None,
280 )
281 .with_source_metadata(
282 JournaldConfig::NAME,
283 None,
284 &owned_value_path!("timestamp"),
285 Kind::timestamp().or_undefined(),
286 Some("timestamp"),
287 )
288 .with_source_metadata(
289 JournaldConfig::NAME,
290 log_schema().host_key().cloned().map(LegacyKey::Overwrite),
291 &owned_value_path!("host"),
292 Kind::bytes().or_undefined(),
293 Some("host"),
294 );
295
296 if log_namespace == LogNamespace::Legacy {
298 schema_definition = schema_definition.unknown_fields(Kind::bytes());
299 }
300
301 schema_definition
302 }
303}
304
305impl Default for JournaldConfig {
306 fn default() -> Self {
307 Self {
308 since_now: false,
309 current_boot_only: true,
310 include_units: vec![],
311 exclude_units: vec![],
312 include_matches: Default::default(),
313 exclude_matches: Default::default(),
314 data_dir: None,
315 batch_size: default_batch_size(),
316 journalctl_path: None,
317 journal_directory: None,
318 journal_namespace: None,
319 extra_args: vec![],
320 acknowledgements: Default::default(),
321 remap_priority: false,
322 log_namespace: None,
323 emit_cursor: false,
324 }
325 }
326}
327
// Generated-config support for `JournaldConfig`; judging by the macro name it is built
// from the `Default` impl above (exercised by the `generate_config` test).
impl_generate_config_from_default!(JournaldConfig);

/// A decoded journal entry: field name -> field value, as produced by `decode_record`.
type Record = HashMap<String, String>;
331
332#[async_trait::async_trait]
333#[typetag::serde(name = "journald")]
334impl SourceConfig for JournaldConfig {
335 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
336 if self.remap_priority {
337 warn!(
338 "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
339 );
340 }
341
342 let data_dir = cx
343 .globals
344 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
346
347 if let Some(unit) = self
348 .include_units
349 .iter()
350 .find(|unit| self.exclude_units.contains(unit))
351 {
352 let unit = unit.into();
353 return Err(BuildError::DuplicatedUnit { unit }.into());
354 }
355
356 let include_matches = self.merged_include_matches();
357 let exclude_matches = self.merged_exclude_matches();
358
359 if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
360 return Err(BuildError::DuplicatedMatches { field, value }.into());
361 }
362
363 let mut checkpoint_path = data_dir;
364 checkpoint_path.push(CHECKPOINT_FILENAME);
365
366 let journalctl_path = self
367 .journalctl_path
368 .clone()
369 .unwrap_or_else(|| JOURNALCTL.clone());
370
371 let starter = StartJournalctl::new(
372 journalctl_path,
373 self.journal_directory.clone(),
374 self.journal_namespace.clone(),
375 self.current_boot_only,
376 self.since_now,
377 self.extra_args.clone(),
378 );
379
380 let batch_size = self.batch_size;
381 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
382 let log_namespace = cx.log_namespace(self.log_namespace);
383
384 Ok(Box::pin(
385 JournaldSource {
386 include_matches,
387 exclude_matches,
388 checkpoint_path,
389 batch_size,
390 remap_priority: self.remap_priority,
391 out: cx.out,
392 acknowledgements,
393 starter,
394 log_namespace,
395 emit_cursor: self.emit_cursor,
396 }
397 .run_shutdown(cx.shutdown),
398 ))
399 }
400
401 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
402 let schema_definition =
403 self.schema_definition(global_log_namespace.merge(self.log_namespace));
404
405 vec![SourceOutput::new_maybe_logs(
406 DataType::Log,
407 schema_definition,
408 )]
409 }
410
411 fn can_acknowledge(&self) -> bool {
412 true
413 }
414}
415
/// Runtime state of the journald source, created by `JournaldConfig::build`.
struct JournaldSource {
    /// Include filters, with `include_units` already merged in as `_SYSTEMD_UNIT` values.
    include_matches: Matches,
    /// Exclude filters, with `exclude_units` already merged in as `_SYSTEMD_UNIT` values.
    exclude_matches: Matches,
    /// File holding the last checkpointed journal cursor.
    checkpoint_path: PathBuf,
    /// Maximum number of events per batch sent downstream.
    batch_size: usize,
    /// Whether `PRIORITY` values are rewritten to symbolic names (deprecated option).
    remap_priority: bool,
    /// Downstream channel for produced events.
    out: SourceSender,
    /// Whether checkpointing waits for delivery acknowledgements (see `Finalizer::new`).
    acknowledgements: bool,
    /// Builds and spawns the `journalctl` process.
    starter: StartJournalctl,
    /// Determines where journal fields and metadata are placed on events.
    log_namespace: LogNamespace,
    /// Whether the `__CURSOR` field is kept on emitted events.
    emit_cursor: bool,
}
428
impl JournaldSource {
    /// Runs the source until shutdown.
    ///
    /// Opens the checkpoint file and loads any stored cursor, sets up the finalizer that
    /// decides when cursors are checkpointed, then hands over to [`Self::run`].
    /// Returns `Err(())` only if the checkpoint file cannot be opened or read, after
    /// emitting `JournaldCheckpointFileOpenError`.
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    /// Main loop: (re)starts `journalctl` after the last checkpointed cursor and streams
    /// its output.
    ///
    /// After each `journalctl` run ends, or fails to spawn, waits `BACKOFF_DURATION`
    /// before trying again. Returns when shutdown is observed or when `run_stream`
    /// reports that the source must stop.
    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            // Non-blocking check so a pending shutdown wins over spawning journalctl again.
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stdout_stream, stderr_stream, running)) => {
                    if !self
                        .run_stream(stdout_stream, stderr_stream, &finalizer, shutdown.clone())
                        .await
                    {
                        return;
                    }
                    // Explicit drop keeps the process guard alive for the whole stream;
                    // dropping it sends SIGTERM to journalctl.
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // journalctl runs with `--follow`, so getting here means it stopped or never
            // started. Back off before retrying, unless shutdown arrives first.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Reads `journalctl` stdout in batches and forwards them downstream. A spawned task
    /// logs stderr in parallel.
    ///
    /// A batch starts with its first event that passes the filters. It then collects at
    /// most `batch_size` events, or stops when `BATCH_TIMEOUT` elapses, and is flushed
    /// through `Batch::finish`.
    ///
    /// Returns `true` when `journalctl` should be restarted: stdout ended, or a read
    /// error occurred while waiting for a batch's first event. Returns `false` when the
    /// source must stop: shutdown, or the downstream channel closed. The stderr task is
    /// aborted on every exit path.
    async fn run_stream<'a>(
        &'a mut self,
        mut stdout_stream: JournalStream,
        stderr_stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        // stderr is drained in its own task so it can never block stdout processing.
        let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream));

        let batch_size = self.batch_size;
        let result = loop {
            let mut batch = Batch::new(self);

            // Wait, without a timeout, for the first event, so the batch timer below
            // does not flush near-empty batches.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => {
                        stderr_handler.abort();
                        return false;
                    },
                    item = stdout_stream.next() => item,
                };
                if !batch.handle_next(item) {
                    stderr_handler.abort();
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            // The first event is already in the batch, hence `1..batch_size`.
            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stdout_stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        };

        stderr_handler.abort();
        result
    }

    /// Logs each non-blank line `journalctl` writes to stderr as a warning. Stops at the
    /// end of the stream or on the first read error, which is logged.
    async fn handle_stderr(mut stderr_stream: JournalStream) {
        while let Some(result) = stderr_stream.next().await {
            match result {
                Ok(line) => {
                    let line_str = String::from_utf8_lossy(&line);
                    let trimmed = line_str.trim();
                    if !trimmed.is_empty() {
                        warn!("Warning journalctl stderr: {trimmed}");
                    }
                }
                Err(err) => {
                    error!("Error reading journalctl stderr: {err}");
                    break;
                }
            }
        }
    }
}
571
/// One batch of events being accumulated from `journalctl` output.
struct Batch<'a> {
    /// Events that passed the include/exclude filters.
    events: Vec<LogEvent>,
    /// Total raw size, in bytes, of the records that became events.
    record_size: usize,
    /// `None`: keep reading. `Some(true)`: stdout ended, so restart journalctl.
    /// `Some(false)`: the downstream channel closed, so stop the source.
    exiting: Option<bool>,
    /// Notifier attached to each event; `None` when acknowledgements are disabled.
    batch: Option<BatchNotifier>,
    /// Receiver for the batch's delivery status, passed to the finalizer.
    receiver: Option<BatchStatusReceiver>,
    /// The owning source (filters, output, options).
    source: &'a mut JournaldSource,
    /// `__CURSOR` of the last decoded record, including records that were filtered out.
    cursor: Option<String>,
}
581
impl<'a> Batch<'a> {
    /// Starts an empty batch for `source`.
    ///
    /// The notifier/receiver pair comes from `BatchNotifier::maybe_new_with_receiver`.
    /// Presumably both are `None` when acknowledgements are disabled, which is what the
    /// `Finalizer::finalize` match expects.
    fn new(source: &'a mut JournaldSource) -> Self {
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
        Self {
            events: Vec::new(),
            record_size: 0,
            exiting: None,
            batch,
            receiver,
            source,
            cursor: None,
        }
    }

    /// Processes one item read from `journalctl` stdout.
    ///
    /// Returns `true` to keep reading and `false` to stop:
    /// * `None` (stdout closed): logs a warning, sets `exiting = Some(true)` so that
    ///   journalctl is restarted, and returns `false`.
    /// * `Some(Err(_))`: emits `JournaldReadError` and returns `false`; `exiting` is untouched.
    /// * `Some(Ok(line))`: decodes the record and remembers its `__CURSOR`. The cursor is
    ///   removed from the record unless `emit_cursor` is set. If the record passes the
    ///   filters, it becomes an event in this batch. An undecodable line emits
    ///   `JournaldInvalidRecordError`. Always returns `true`.
    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
        match result {
            None => {
                warn!("Journalctl process stopped.");
                self.exiting = Some(true);
                false
            }
            Some(Err(error)) => {
                emit!(JournaldReadError { error });
                false
            }
            Some(Ok(bytes)) => {
                match decode_record(&bytes, self.source.remap_priority) {
                    Ok(mut record) => {
                        // Track the cursor even for filtered-out records so the checkpoint
                        // can advance past them.
                        if self.source.emit_cursor {
                            if let Some(tmp) = record.get(CURSOR) {
                                self.cursor = Some(tmp.clone());
                            }
                        } else if let Some(tmp) = record.remove(CURSOR) {
                            self.cursor = Some(tmp);
                        }

                        // `filter_matches` returns true for records that must be dropped.
                        if !filter_matches(
                            &record,
                            &self.source.include_matches,
                            &self.source.exclude_matches,
                        ) {
                            self.record_size += bytes.len();

                            let mut event = create_log_event_from_record(
                                record,
                                &self.batch,
                                self.source.log_namespace,
                            );

                            enrich_log_event(&mut event, self.source.log_namespace);

                            self.events.push(event);
                        }
                    }
                    Err(error) => {
                        emit!(JournaldInvalidRecordError {
                            error,
                            text: String::from_utf8_lossy(&bytes).into_owned()
                        });
                    }
                }
                true
            }
        }
    }

    /// Flushes the batch downstream and reports whether the read loop should end.
    ///
    /// The batch's own notifier handle is dropped first. Presumably this lets the batch
    /// status resolve once the events themselves are finalized. Then byte and event
    /// metrics are emitted and the events are sent. After a successful send, the last
    /// cursor seen is handed to the finalizer. If the output channel is closed,
    /// `StreamClosedError` is emitted and `exiting` becomes `Some(false)`, so the source
    /// stops instead of restarting journalctl. A batch with no events, for example when
    /// every record was filtered out, is not sent and its cursor is not checkpointed.
    ///
    /// Returns `exiting`: `None` to continue, `Some(true)` to restart journalctl,
    /// `Some(false)` to stop.
    async fn finish(
        mut self,
        finalizer: &Finalizer,
        bytes_received: &'a Registered<BytesReceived>,
        events_received: &'a Registered<EventsReceived>,
    ) -> Option<bool> {
        drop(self.batch);

        if self.record_size > 0 {
            bytes_received.emit(ByteSize(self.record_size));
        }

        if !self.events.is_empty() {
            let count = self.events.len();
            let byte_size = self.events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(count, byte_size));

            match self.source.out.send_batch(self.events).await {
                Ok(_) => {
                    if let Some(cursor) = self.cursor {
                        finalizer.finalize(cursor, self.receiver).await;
                    }
                }
                Err(_) => {
                    emit!(StreamClosedError { count });
                    // Output is closed: stop instead of restarting journalctl.
                    self.exiting = Some(false);
                }
            }
        }
        self.exiting
    }
}
681
/// Newline-delimited output of one `journalctl` pipe (stdout or stderr), one frame per line.
type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
683
/// Settings used to build and spawn the `journalctl` command.
struct StartJournalctl {
    /// Executable to spawn.
    path: PathBuf,
    /// Passed as `--directory=` when set.
    journal_dir: Option<PathBuf>,
    /// Passed as `--namespace=` when set.
    journal_namespace: Option<String>,
    /// Adds `--boot` when true.
    current_boot_only: bool,
    /// Without a checkpoint, start at `--since=now` instead of `--since=2000-01-01`.
    since_now: bool,
    /// Appended verbatim after all other arguments.
    extra_args: Vec<String>,
}
692
693impl StartJournalctl {
694 const fn new(
695 path: PathBuf,
696 journal_dir: Option<PathBuf>,
697 journal_namespace: Option<String>,
698 current_boot_only: bool,
699 since_now: bool,
700 extra_args: Vec<String>,
701 ) -> Self {
702 Self {
703 path,
704 journal_dir,
705 journal_namespace,
706 current_boot_only,
707 since_now,
708 extra_args,
709 }
710 }
711
712 fn make_command(&self, checkpoint: Option<&str>) -> Command {
713 let mut command = Command::new(&self.path);
714 command.stdout(Stdio::piped());
715 command.stderr(Stdio::piped());
716 command.arg("--follow");
717 command.arg("--all");
718 command.arg("--show-cursor");
719 command.arg("--output=json");
720
721 if let Some(dir) = &self.journal_dir {
722 command.arg(format!("--directory={}", dir.display()));
723 }
724
725 if let Some(namespace) = &self.journal_namespace {
726 command.arg(format!("--namespace={namespace}"));
727 }
728
729 if self.current_boot_only {
730 command.arg("--boot");
731 }
732
733 if let Some(cursor) = checkpoint {
734 command.arg(format!("--after-cursor={cursor}"));
735 } else if self.since_now {
736 command.arg("--since=now");
737 } else {
738 command.arg("--since=2000-01-01");
740 }
741
742 if !self.extra_args.is_empty() {
743 command.args(&self.extra_args);
744 }
745
746 command
747 }
748
749 fn start(
750 &mut self,
751 checkpoint: Option<&str>,
752 ) -> crate::Result<(JournalStream, JournalStream, RunningJournalctl)> {
753 let mut command = self.make_command(checkpoint);
754
755 let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
756
757 let stdout_stream = FramedRead::new(
758 child.stdout.take().unwrap(),
759 CharacterDelimitedDecoder::new(b'\n'),
760 );
761
762 let stderr = child.stderr.take().unwrap();
763 let stderr_stream = FramedRead::new(stderr, CharacterDelimitedDecoder::new(b'\n'));
764
765 Ok((
766 stdout_stream.boxed(),
767 stderr_stream.boxed(),
768 RunningJournalctl(child),
769 ))
770 }
771}
772
/// Guard for a spawned `journalctl` process; dropping it sends the process SIGTERM.
struct RunningJournalctl(Child);
774
775impl Drop for RunningJournalctl {
776 fn drop(&mut self) {
777 if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
778 _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
779 }
780 }
781}
782
783fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
784 match log_namespace {
785 LogNamespace::Vector => {
786 if let Some(host) = log
787 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
788 .and_then(|meta| meta.get(HOSTNAME))
789 {
790 log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
791 }
792 }
793 LogNamespace::Legacy => {
794 if let Some(host) = log.remove(event_path!(HOSTNAME)) {
795 log_namespace.insert_source_metadata(
796 JournaldConfig::NAME,
797 log,
798 log_schema().host_key().map(LegacyKey::Overwrite),
799 path!("host"),
800 host,
801 );
802 }
803 }
804 }
805
806 let timestamp_value = match log_namespace {
808 LogNamespace::Vector => log
809 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
810 .and_then(|meta| {
811 meta.get(SOURCE_TIMESTAMP)
812 .or_else(|| meta.get(RECEIVED_TIMESTAMP))
813 }),
814 LogNamespace::Legacy => log
815 .get(event_path!(SOURCE_TIMESTAMP))
816 .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
817 };
818
819 let timestamp = timestamp_value
820 .filter(|&ts| ts.is_bytes())
821 .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
822 .map(|ts| {
823 chrono::Utc
824 .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
825 .single()
826 .expect("invalid timestamp")
827 });
828
829 match log_namespace {
831 LogNamespace::Vector => {
832 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
833
834 if let Some(ts) = timestamp {
835 log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
836 }
837 }
838 LogNamespace::Legacy => {
839 if let Some(ts) = timestamp {
840 log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
841 }
842 }
843 }
844
845 log_namespace.insert_vector_metadata(
847 log,
848 log_schema().source_type_key(),
849 path!("source_type"),
850 JournaldConfig::NAME,
851 );
852}
853
854fn create_log_event_from_record(
855 mut record: Record,
856 batch: &Option<BatchNotifier>,
857 log_namespace: LogNamespace,
858) -> LogEvent {
859 match log_namespace {
860 LogNamespace::Vector => {
861 let message_value = record
862 .remove(MESSAGE)
863 .map(|msg| Value::Bytes(Bytes::from(msg)))
864 .unwrap_or(Value::Null);
865
866 let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
867
868 record.iter().for_each(|(key, value)| {
870 log.metadata_mut()
871 .value_mut()
872 .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
873 });
874
875 log
876 }
877 LogNamespace::Legacy => {
878 let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
879
880 if let Some(message) = log.remove(event_path!(MESSAGE)) {
881 log.maybe_insert(log_schema().message_key_target_path(), message);
882 }
883
884 log
885 }
886 }
887}
888
/// Normalizes a unit name: a name without any `.` is treated as a service, so
/// `ntpd` becomes `ntpd.service`. Names that already contain a `.` are returned unchanged.
fn fixup_unit(unit: &str) -> String {
    match unit.find('.') {
        Some(_) => unit.to_owned(),
        None => format!("{unit}.service"),
    }
}
898
899fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
900 let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
901 if let Some(record) = record.as_object_mut() {
904 for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
905 *value = decode_array(value.as_array().expect("already validated"));
906 }
907 }
908 if remap {
909 record.get_mut("PRIORITY").map(remap_priority);
910 }
911 serde_json::from_value(record)
912}
913
914fn decode_array(array: &[JsonValue]) -> JsonValue {
915 decode_array_as_bytes(array).unwrap_or_else(|| {
916 let ser = serde_json::to_string(array).expect("already deserialized");
917 JsonValue::String(ser)
918 })
919}
920
921fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
922 array
926 .iter()
927 .map(|item| {
928 item.as_u64().and_then(|num| match num {
929 num if num <= u8::MAX as u64 => Some(num as u8),
930 _ => None,
931 })
932 })
933 .collect::<Option<Vec<u8>>>()
934 .map(|array| String::from_utf8_lossy(&array).into())
935}
936
937fn remap_priority(priority: &mut JsonValue) {
938 if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
939 let text = match num {
940 0 => "EMERG",
941 1 => "ALERT",
942 2 => "CRIT",
943 3 => "ERR",
944 4 => "WARNING",
945 5 => "NOTICE",
946 6 => "INFO",
947 7 => "DEBUG",
948 _ => "UNKNOWN",
949 };
950 *priority = JsonValue::String(text.into());
951 }
952}
953
954fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
955 match (includes.is_empty(), excludes.is_empty()) {
956 (true, true) => false,
957 (false, true) => !contains_match(record, includes),
958 (true, false) => contains_match(record, excludes),
959 (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
960 }
961}
962
963fn contains_match(record: &Record, matches: &Matches) -> bool {
964 let f = move |(field, value)| {
965 matches
966 .get(field)
967 .map(|x| x.contains(value))
968 .unwrap_or(false)
969 };
970 record.iter().any(f)
971}
972
973fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
974 for (a_key, a_values) in a_matches {
975 if let Some(b_values) = b_matches.get(a_key.as_str()) {
976 for (a, b) in a_values
977 .iter()
978 .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
979 {
980 if a == b {
981 return Some((a_key.into(), b.into()));
982 }
983 }
984 }
985 }
986 None
987}
988
/// Decides when a sent batch's last cursor is written to the checkpoint file.
enum Finalizer {
    /// Acknowledgements disabled: the cursor is checkpointed as soon as the batch is sent.
    Sync(SharedCheckpointer),
    /// Acknowledgements enabled: the cursor is queued in an `OrderedFinalizer` and
    /// checkpointed when its batch status is `Delivered`.
    Async(OrderedFinalizer<String>),
}
993
994impl Finalizer {
995 fn new(
996 acknowledgements: bool,
997 checkpointer: SharedCheckpointer,
998 shutdown: ShutdownSignal,
999 ) -> Self {
1000 if acknowledgements {
1001 let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
1002 tokio::spawn(async move {
1003 while let Some((status, cursor)) = ack_stream.next().await {
1004 if status == BatchStatus::Delivered {
1005 checkpointer.lock().await.set(cursor).await;
1006 }
1007 }
1008 });
1009 Self::Async(finalizer)
1010 } else {
1011 Self::Sync(checkpointer)
1012 }
1013 }
1014
1015 async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
1016 match (self, receiver) {
1017 (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
1018 (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
1019 _ => {
1020 unreachable!("Cannot have async finalization without a receiver in journald source")
1021 }
1022 }
1023 }
1024}
1025
/// Persists the journal cursor in a single file.
struct Checkpointer {
    /// Open handle to the checkpoint file, rewritten in place on every `set`.
    file: File,
    /// Path of `file`, kept for error reporting.
    filename: PathBuf,
}
1030
1031impl Checkpointer {
1032 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1033 let file = OpenOptions::new()
1034 .read(true)
1035 .write(true)
1036 .create(true)
1037 .truncate(false)
1038 .open(&filename)
1039 .await?;
1040 Ok(Checkpointer { file, filename })
1041 }
1042
1043 async fn set(&mut self, token: &str) -> Result<(), io::Error> {
1044 self.file.seek(SeekFrom::Start(0)).await?;
1045 self.file.write_all(format!("{token}\n").as_bytes()).await
1046 }
1047
1048 async fn get(&mut self) -> Result<Option<String>, io::Error> {
1049 let mut buf = Vec::<u8>::new();
1050 self.file.seek(SeekFrom::Start(0)).await?;
1051 self.file.read_to_end(&mut buf).await?;
1052 match buf.len() {
1053 0 => Ok(None),
1054 _ => {
1055 let text = String::from_utf8_lossy(&buf);
1056 match text.find('\n') {
1057 Some(nl) => Ok(Some(String::from(&text[..nl]))),
1058 None => Ok(None), }
1060 }
1061 }
1062 }
1063}
1064
/// A `Checkpointer` together with an in-memory copy of the last cursor.
struct StatefulCheckpointer {
    /// File-backed storage.
    checkpointer: Checkpointer,
    /// Cursor loaded at startup or last passed to `set`. It is updated even if writing
    /// the file failed, and is used as the `--after-cursor` when journalctl (re)starts.
    cursor: Option<String>,
}
1069
1070impl StatefulCheckpointer {
1071 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1072 let mut checkpointer = Checkpointer::new(filename).await?;
1073 let cursor = checkpointer.get().await?;
1074 Ok(Self {
1075 checkpointer,
1076 cursor,
1077 })
1078 }
1079
1080 async fn set(&mut self, token: impl Into<String>) {
1081 let token = token.into();
1082 if let Err(error) = self.checkpointer.set(&token).await {
1083 emit!(JournaldCheckpointSetError {
1084 error,
1085 filename: self
1086 .checkpointer
1087 .filename
1088 .to_str()
1089 .unwrap_or("unknown")
1090 .to_string(),
1091 });
1092 }
1093 self.cursor = Some(token);
1094 }
1095}
1096
/// Cloneable handle to a `StatefulCheckpointer` behind an async mutex. It is shared by
/// the read loop, which reads the cursor, and by the finalizer, which writes cursors.
#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1099
1100impl SharedCheckpointer {
1101 fn new(c: StatefulCheckpointer) -> Self {
1102 Self(Arc::new(Mutex::new(c)))
1103 }
1104
1105 async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1106 self.0.lock().await
1107 }
1108}
1109
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    /// A fresh file has no checkpoint. Each `set` is then readable through `get` and
    /// is written at the start of the file, newline-terminated, including when a
    /// shorter token overwrites a longer one.
    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let filename = tempdir.path().join(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        assert!(checkpointer.get().await.unwrap().is_none());

        for token in ["first test", "second"] {
            checkpointer
                .set(token)
                .await
                .expect("Setting checkpoint failed");
            assert_eq!(checkpointer.get().await.unwrap().unwrap(), token);

            let contents = read_to_string(&filename)
                .await
                .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
            assert!(contents.starts_with(&format!("{token}\n")));
        }
    }
}
1154
1155#[cfg(test)]
1156mod tests {
1157 use std::{fs, path::Path};
1158
1159 use tempfile::tempdir;
1160 use tokio::time::{Duration, Instant, sleep, timeout};
1161 use vrl::value::{Value, kind::Collection};
1162
1163 use super::*;
1164 use crate::{
1165 config::ComponentKey,
1166 event::{Event, EventStatus},
1167 test_util::components::assert_source_compliance,
1168 };
1169
1170 const TEST_COMPONENT: &str = "journald-test";
1171 const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1172
1173 async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1174 let include_matches = create_unit_matches(iunits.to_vec());
1175 let exclude_matches = create_unit_matches(xunits.to_vec());
1176 run_journal(include_matches, exclude_matches, cursor, false).await
1177 }
1178
1179 async fn run_journal(
1180 include_matches: Matches,
1181 exclude_matches: Matches,
1182 checkpoint: Option<&str>,
1183 emit_cursor: bool,
1184 ) -> Vec<Event> {
1185 assert_source_compliance(&["protocol"], async move {
1186 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1187
1188 let tempdir = tempdir().unwrap();
1189 let tempdir = tempdir.path().to_path_buf();
1190
1191 if let Some(cursor) = checkpoint {
1192 let mut checkpoint_path = tempdir.clone();
1193 checkpoint_path.push(TEST_COMPONENT);
1194 fs::create_dir(&checkpoint_path).unwrap();
1195 checkpoint_path.push(CHECKPOINT_FILENAME);
1196
1197 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1198 .await
1199 .expect("Creating checkpointer failed!");
1200
1201 checkpointer
1202 .set(cursor)
1203 .await
1204 .expect("Could not set checkpoint");
1205 }
1206
1207 let (cx, shutdown) =
1208 SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1209 let config = JournaldConfig {
1210 journalctl_path: Some(TEST_JOURNALCTL.into()),
1211 include_matches,
1212 exclude_matches,
1213 data_dir: Some(tempdir),
1214 remap_priority: true,
1215 acknowledgements: false.into(),
1216 emit_cursor,
1217 ..Default::default()
1218 };
1219 let source = config.build(cx).await.unwrap();
1220 tokio::spawn(async move { source.await.unwrap() });
1221
1222 sleep(Duration::from_secs(1)).await;
1224 shutdown
1225 .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1226 .await;
1227
1228 timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1229 })
1230 .await
1231 }
1232
1233 fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1234 let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1235 let mut map = HashMap::new();
1236 if !units.is_empty() {
1237 map.insert(String::from(SYSTEMD_UNIT), units);
1238 }
1239 map
1240 }
1241
1242 fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1243 let mut matches: Matches = HashMap::new();
1244 for (field, value) in conditions {
1245 matches
1246 .entry(field.into())
1247 .or_default()
1248 .insert(value.into());
1249 }
1250 matches
1251 }
1252
1253 #[tokio::test]
1254 async fn reads_journal() {
1255 let received = run_with_units(&[], &[], None).await;
1256 assert_eq!(received.len(), 8);
1257 assert_eq!(
1258 message(&received[0]),
1259 Value::Bytes("System Initialization".into())
1260 );
1261 assert_eq!(
1262 received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1263 "journald".into()
1264 );
1265 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1266 assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1267 assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1268 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1269 assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1270 }
1271
1272 #[tokio::test]
1273 async fn includes_units() {
1274 let received = run_with_units(&["unit.service"], &[], None).await;
1275 assert_eq!(received.len(), 1);
1276 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1277 }
1278
1279 #[tokio::test]
1280 async fn excludes_units() {
1281 let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1282 assert_eq!(received.len(), 6);
1283 assert_eq!(
1284 message(&received[0]),
1285 Value::Bytes("System Initialization".into())
1286 );
1287 assert_eq!(
1288 message(&received[1]),
1289 Value::Bytes("Missing timestamp".into())
1290 );
1291 assert_eq!(
1292 message(&received[2]),
1293 Value::Bytes("Different timestamps".into())
1294 );
1295 }
1296
1297 #[tokio::test]
1298 async fn emits_cursor() {
1299 let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1300 assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1301 assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1302 assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1303 }
1304
1305 #[tokio::test]
1306 async fn includes_matches() {
1307 let matches = create_matches(vec![("PRIORITY", "ERR")]);
1308 let received = run_journal(matches, HashMap::new(), None, false).await;
1309 assert_eq!(received.len(), 2);
1310 assert_eq!(
1311 message(&received[0]),
1312 Value::Bytes("Different timestamps".into())
1313 );
1314 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1315 assert_eq!(
1316 message(&received[1]),
1317 Value::Bytes("Non-ASCII in other field".into())
1318 );
1319 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1320 }
1321
1322 #[tokio::test]
1323 async fn includes_kernel() {
1324 let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1325 let received = run_journal(matches, HashMap::new(), None, false).await;
1326 assert_eq!(received.len(), 1);
1327 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1328 assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1329 }
1330
1331 #[tokio::test]
1332 async fn excludes_matches() {
1333 let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1334 let received = run_journal(HashMap::new(), matches, None, false).await;
1335 assert_eq!(received.len(), 5);
1336 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1337 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1338 assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1339 assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1340 assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1341 }
1342
1343 #[tokio::test]
1344 async fn handles_checkpoint() {
1345 let received = run_with_units(&[], &[], Some("1")).await;
1346 assert_eq!(received.len(), 7);
1347 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1348 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1349 }
1350
1351 #[tokio::test]
1352 async fn parses_array_messages() {
1353 let received = run_with_units(&["badunit.service"], &[], None).await;
1354 assert_eq!(received.len(), 1);
1355 assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1356 }
1357
1358 #[tokio::test]
1359 async fn parses_array_fields() {
1360 let received = run_with_units(&["syslog.service"], &[], None).await;
1361 assert_eq!(received.len(), 1);
1362 assert_eq!(
1363 received[0].as_log()["SYSLOG_RAW"],
1364 Value::Bytes("¿World?".into())
1365 );
1366 }
1367
1368 #[tokio::test]
1369 async fn parses_string_sequences() {
1370 let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1371 assert_eq!(received.len(), 1);
1372 assert_eq!(
1373 received[0].as_log()["SYSLOG_FACILITY"],
1374 Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1375 );
1376 }
1377
1378 #[tokio::test]
1379 async fn handles_missing_timestamp() {
1380 let received = run_with_units(&["stdout"], &[], None).await;
1381 assert_eq!(received.len(), 2);
1382 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1383 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1384 }
1385
    /// With end-to-end acknowledgements enabled, the checkpoint must not move
    /// while events are still undelivered. Once all 8 fixture events are marked
    /// `Delivered`, it must be updated to the last record's cursor ("8").
    #[tokio::test]
    async fn handles_acknowledgements() {
        let (tx, mut rx) = SourceSender::new_test();

        // The `TempDir` guard from the first binding is shadowed but not dropped:
        // it lives until the end of this function, so the directory persists.
        let tempdir = tempdir().unwrap();
        let tempdir = tempdir.path().to_path_buf();
        // Build `<data_dir>/<component>/checkpoint.txt`, creating the component dir.
        let mut checkpoint_path = tempdir.clone();
        checkpoint_path.push(TEST_COMPONENT);
        fs::create_dir(&checkpoint_path).unwrap();
        checkpoint_path.push(CHECKPOINT_FILENAME);

        // Independent reader on the same checkpoint file used by the source,
        // so the test can observe what the source has persisted.
        let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
            .await
            .expect("Creating checkpointer failed!");

        let config = JournaldConfig {
            journalctl_path: Some(TEST_JOURNALCTL.into()),
            data_dir: Some(tempdir),
            remap_priority: true,
            acknowledgements: true.into(),
            ..Default::default()
        };
        // Bound to `_shutdown` (not `_`) so the handle is kept until the end of
        // the test rather than dropped immediately.
        let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
        let source = config.build(cx).await.unwrap();
        tokio::spawn(async move { source.await.unwrap() });

        // Nothing has been acknowledged yet, so no checkpoint exists.
        assert_eq!(checkpointer.get().await.unwrap(), None);

        // Give the source time to read the fixture journal.
        sleep(Duration::from_secs(1)).await;

        // Drain whatever is ready without blocking. Before acknowledging each
        // event, verify the checkpoint is still unset, then mark it delivered.
        let mut count = 0;
        while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
            assert_eq!(checkpointer.get().await.unwrap(), None);
            event.metadata().update_status(EventStatus::Delivered);
            count += 1;
        }
        assert_eq!(count, 8);

        // Allow the finalizer to process the acknowledgements and write the checkpoint.
        sleep(Duration::from_millis(100)).await;
        assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
    }
1431
1432 #[test]
1433 fn filter_matches_works_correctly() {
1434 let empty: Matches = HashMap::new();
1435 let includes = create_unit_matches(vec!["one", "two"]);
1436 let excludes = create_unit_matches(vec!["foo", "bar"]);
1437
1438 let zero = HashMap::new();
1439 assert!(!filter_matches(&zero, &empty, &empty));
1440 assert!(filter_matches(&zero, &includes, &empty));
1441 assert!(!filter_matches(&zero, &empty, &excludes));
1442 assert!(filter_matches(&zero, &includes, &excludes));
1443 let mut one = HashMap::new();
1444 one.insert(String::from(SYSTEMD_UNIT), String::from("one"));
1445 assert!(!filter_matches(&one, &empty, &empty));
1446 assert!(!filter_matches(&one, &includes, &empty));
1447 assert!(!filter_matches(&one, &empty, &excludes));
1448 assert!(!filter_matches(&one, &includes, &excludes));
1449 let mut two = HashMap::new();
1450 two.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
1451 assert!(!filter_matches(&two, &empty, &empty));
1452 assert!(filter_matches(&two, &includes, &empty));
1453 assert!(filter_matches(&two, &empty, &excludes));
1454 assert!(filter_matches(&two, &includes, &excludes));
1455 }
1456
1457 #[test]
1458 fn merges_units_and_matches_option() {
1459 let include_units = vec!["one", "two"].into_iter().map(String::from).collect();
1460 let include_matches = create_matches(vec![
1461 ("_SYSTEMD_UNIT", "three.service"),
1462 ("_TRANSPORT", "kernel"),
1463 ]);
1464
1465 let exclude_units = vec!["foo", "bar"].into_iter().map(String::from).collect();
1466 let exclude_matches = create_matches(vec![
1467 ("_SYSTEMD_UNIT", "baz.service"),
1468 ("PRIORITY", "DEBUG"),
1469 ]);
1470
1471 let journald_config = JournaldConfig {
1472 include_units,
1473 include_matches,
1474 exclude_units,
1475 exclude_matches,
1476 ..Default::default()
1477 };
1478
1479 let hashset =
1480 |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };
1481
1482 let matches = journald_config.merged_include_matches();
1483 let units = matches.get("_SYSTEMD_UNIT").unwrap();
1484 assert_eq!(
1485 units,
1486 &hashset(&["one.service", "two.service", "three.service"])
1487 );
1488 let units = matches.get("_TRANSPORT").unwrap();
1489 assert_eq!(units, &hashset(&["kernel"]));
1490
1491 let matches = journald_config.merged_exclude_matches();
1492 let units = matches.get("_SYSTEMD_UNIT").unwrap();
1493 assert_eq!(
1494 units,
1495 &hashset(&["foo.service", "bar.service", "baz.service"])
1496 );
1497 let units = matches.get("PRIORITY").unwrap();
1498 assert_eq!(units, &hashset(&["DEBUG"]));
1499 }
1500
1501 #[test]
1502 fn find_duplicate_match_works_correctly() {
1503 let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1504 let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1505 let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
1506 assert_eq!(field, "_TRANSPORT");
1507 assert_eq!(value, "kernel");
1508
1509 let empty = HashMap::new();
1510 let actual = find_duplicate_match(&empty, &empty);
1511 assert!(actual.is_none());
1512
1513 let actual = find_duplicate_match(&include_matches, &empty);
1514 assert!(actual.is_none());
1515
1516 let actual = find_duplicate_match(&empty, &exclude_matches);
1517 assert!(actual.is_none());
1518 }
1519
1520 #[test]
1521 fn command_options() {
1522 let path = PathBuf::from("journalctl");
1523
1524 let journal_dir = None;
1525 let journal_namespace = None;
1526 let current_boot_only = false;
1527 let cursor = None;
1528 let since_now = false;
1529 let extra_args = vec![];
1530
1531 let command = create_command(
1532 &path,
1533 journal_dir,
1534 journal_namespace,
1535 current_boot_only,
1536 since_now,
1537 cursor,
1538 extra_args,
1539 );
1540 let cmd_line = format!("{command:?}");
1541 assert!(!cmd_line.contains("--directory="));
1542 assert!(!cmd_line.contains("--namespace="));
1543 assert!(!cmd_line.contains("--boot"));
1544 assert!(cmd_line.contains("--since=2000-01-01"));
1545
1546 let journal_dir = None;
1547 let journal_namespace = None;
1548 let since_now = true;
1549 let extra_args = vec![];
1550
1551 let command = create_command(
1552 &path,
1553 journal_dir,
1554 journal_namespace,
1555 current_boot_only,
1556 since_now,
1557 cursor,
1558 extra_args,
1559 );
1560 let cmd_line = format!("{command:?}");
1561 assert!(cmd_line.contains("--since=now"));
1562
1563 let journal_dir = Some(PathBuf::from("/tmp/journal-dir"));
1564 let journal_namespace = Some(String::from("my_namespace"));
1565 let current_boot_only = true;
1566 let cursor = Some("2021-01-01");
1567 let extra_args = vec!["--merge".to_string()];
1568
1569 let command = create_command(
1570 &path,
1571 journal_dir,
1572 journal_namespace,
1573 current_boot_only,
1574 since_now,
1575 cursor,
1576 extra_args,
1577 );
1578 let cmd_line = format!("{command:?}");
1579 assert!(cmd_line.contains("--directory=/tmp/journal-dir"));
1580 assert!(cmd_line.contains("--namespace=my_namespace"));
1581 assert!(cmd_line.contains("--boot"));
1582 assert!(cmd_line.contains("--after-cursor="));
1583 assert!(cmd_line.contains("--merge"));
1584 }
1585
1586 fn create_command(
1587 path: &Path,
1588 journal_dir: Option<PathBuf>,
1589 journal_namespace: Option<String>,
1590 current_boot_only: bool,
1591 since_now: bool,
1592 cursor: Option<&str>,
1593 extra_args: Vec<String>,
1594 ) -> Command {
1595 StartJournalctl::new(
1596 path.into(),
1597 journal_dir,
1598 journal_namespace,
1599 current_boot_only,
1600 since_now,
1601 extra_args,
1602 )
1603 .make_command(cursor)
1604 }
1605
1606 fn message(event: &Event) -> Value {
1607 event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1608 }
1609
1610 fn timestamp(event: &Event) -> Value {
1611 event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1612 }
1613
1614 fn cursor(event: &Event) -> Value {
1615 event.as_log()[CURSOR].clone()
1616 }
1617
1618 fn value_ts(secs: i64, usecs: u32) -> Value {
1619 Value::Timestamp(
1620 chrono::Utc
1621 .timestamp_opt(secs, usecs)
1622 .single()
1623 .expect("invalid timestamp"),
1624 )
1625 }
1626
1627 fn priority(event: &Event) -> Value {
1628 event.as_log()["PRIORITY"].clone()
1629 }
1630
1631 #[test]
1632 fn output_schema_definition_vector_namespace() {
1633 let config = JournaldConfig {
1634 log_namespace: Some(true),
1635 ..Default::default()
1636 };
1637
1638 let definitions = config
1639 .outputs(LogNamespace::Vector)
1640 .remove(0)
1641 .schema_definition(true);
1642
1643 let expected_definition =
1644 Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
1645 .with_metadata_field(
1646 &owned_value_path!("vector", "source_type"),
1647 Kind::bytes(),
1648 None,
1649 )
1650 .with_metadata_field(
1651 &owned_value_path!("vector", "ingest_timestamp"),
1652 Kind::timestamp(),
1653 None,
1654 )
1655 .with_metadata_field(
1656 &owned_value_path!(JournaldConfig::NAME, "metadata"),
1657 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1658 None,
1659 )
1660 .with_metadata_field(
1661 &owned_value_path!(JournaldConfig::NAME, "timestamp"),
1662 Kind::timestamp().or_undefined(),
1663 Some("timestamp"),
1664 )
1665 .with_metadata_field(
1666 &owned_value_path!(JournaldConfig::NAME, "host"),
1667 Kind::bytes().or_undefined(),
1668 Some("host"),
1669 );
1670
1671 assert_eq!(definitions, Some(expected_definition))
1672 }
1673
1674 #[test]
1675 fn output_schema_definition_legacy_namespace() {
1676 let config = JournaldConfig::default();
1677
1678 let definitions = config
1679 .outputs(LogNamespace::Legacy)
1680 .remove(0)
1681 .schema_definition(true);
1682
1683 let expected_definition = Definition::new_with_default_metadata(
1684 Kind::object(Collection::empty()),
1685 [LogNamespace::Legacy],
1686 )
1687 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1688 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1689 .with_event_field(
1690 &owned_value_path!("host"),
1691 Kind::bytes().or_undefined(),
1692 Some("host"),
1693 )
1694 .unknown_fields(Kind::bytes());
1695
1696 assert_eq!(definitions, Some(expected_definition))
1697 }
1698
    /// Asserts that a representative journald record, shaped like the legacy
    /// event this source emits, validates against the schema definition that
    /// `config` produces for `namespace`.
    ///
    /// # Panics
    /// Panics if the fixture fails to parse, the config has no outputs, the
    /// output has no schema definition, or the event does not satisfy it.
    fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
        // Sample ntpd record: journald fields plus the `host`, `message` and
        // `source_type` keys that appear in legacy-namespace events.
        let record = r#"{
            "PRIORITY":"6",
            "SYSLOG_FACILITY":"3",
            "SYSLOG_IDENTIFIER":"ntpd",
            "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
            "_CAP_EFFECTIVE":"3fffffffff",
            "_CMDLINE":"ntpd: [priv]",
            "_COMM":"ntpd",
            "_EXE":"/usr/sbin/ntpd",
            "_GID":"0",
            "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
            "_PID":"2156",
            "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
            "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
            "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
            "_SYSTEMD_SLICE":"system.slice",
            "_SYSTEMD_UNIT":"ntpd.service",
            "_TRANSPORT":"stdout",
            "_UID":"0",
            "__MONOTONIC_TIMESTAMP":"98694000446",
            "__REALTIME_TIMESTAMP":"1564173027000443",
            "host":"my-host.local",
            "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
            "source_type":"journald"
        }"#;

        let json: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));

        // The schema expects `timestamp` to be a real timestamp value, not a
        // string, so insert one directly instead of putting it in the JSON fixture.
        event.as_mut_log().insert("timestamp", chrono::Utc::now());

        let definitions = config.outputs(namespace).remove(0).schema_definition(true);

        definitions.unwrap().assert_valid_for_event(&event);
    }
1735
1736 #[test]
1737 fn matches_schema_legacy() {
1738 let config = JournaldConfig::default();
1739
1740 matches_schema(&config, LogNamespace::Legacy)
1741 }
1742}