1use std::{
2 collections::{HashMap, HashSet},
3 io::SeekFrom,
4 path::PathBuf,
5 process::Stdio,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{poll, stream::BoxStream, task::Poll, StreamExt};
14use nix::{
15 sys::signal::{kill, Signal},
16 unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21 fs::{File, OpenOptions},
22 io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23 process::{Child, Command},
24 sync::{Mutex, MutexGuard},
25 time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder};
29use vector_lib::configurable::configurable_component;
30use vector_lib::lookup::{metadata_path, owned_value_path, path};
31use vector_lib::{
32 config::{LegacyKey, LogNamespace},
33 schema::Definition,
34 EstimatedJsonEncodedSizeOf,
35};
36use vector_lib::{
37 finalizer::OrderedFinalizer,
38 internal_event::{
39 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
40 },
41};
42use vrl::event_path;
43use vrl::value::{kind::Collection, Kind, Value};
44
45use crate::{
46 config::{
47 log_schema, DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
48 SourceOutput,
49 },
50 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
51 internal_events::{
52 EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
53 JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
54 StreamClosedError,
55 },
56 serde::bool_or_struct,
57 shutdown::ShutdownSignal,
58 SourceSender,
59};
60
/// How long `run_stream` keeps reading additional records into a batch after
/// the first event has been collected, before the batch is sent.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// Name of the checkpoint file created inside the source's data directory.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
/// Journal field holding the entry cursor; its value is what gets checkpointed.
const CURSOR: &str = "__CURSOR";
/// Journal field used to populate the event's host metadata.
const HOSTNAME: &str = "_HOSTNAME";
/// Journal field holding the log message.
const MESSAGE: &str = "MESSAGE";
/// Journal field under which `include_units` / `exclude_units` are matched.
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
/// Journal field preferred as the event timestamp (parsed as microseconds).
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
/// Journal field used as the event timestamp when `SOURCE_TIMESTAMP` is absent.
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Pause between one `journalctl` run ending (or failing to start) and the next attempt.
const BACKOFF_DURATION: Duration = Duration::from_secs(1);

/// Executable used when `journalctl_path` is not configured.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());
74
/// Errors produced while validating the configuration or spawning `journalctl`.
#[derive(Debug, Snafu)]
enum BuildError {
    /// Spawning the `journalctl` child process failed.
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    /// The same unit name is listed in both `include_units` and `exclude_units`.
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    /// The same field/value pair is present in both the merged include and
    /// exclude matches.
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}
91
/// Maps a journal field name to the set of values that count as a match for it.
type Matches = HashMap<String, HashSet<String>>;
93
/// Configuration for the `journald` source.
#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    /// Only read entries added from now on.
    ///
    /// Applies only when no checkpoint cursor is stored: `journalctl` is then
    /// started with `--since=now` instead of `--since=2000-01-01`.
    #[serde(default)]
    pub since_now: bool,

    /// Only read entries from the current boot (passes `--boot` to `journalctl`).
    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,

    /// A list of unit names to include.
    ///
    /// Each name is added to the include matches under `_SYSTEMD_UNIT`. Names
    /// that do not contain a `.` have `.service` appended. A unit must not be
    /// listed in `exclude_units` as well.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    /// A list of unit names to exclude.
    ///
    /// Each name is added to the exclude matches under `_SYSTEMD_UNIT`. Names
    /// that do not contain a `.` have `.service` appended.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    /// Sets of field/value pairs to include.
    ///
    /// When non-empty, a journal entry is kept only if at least one of its
    /// field/value pairs is listed here.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    /// Sets of field/value pairs to exclude.
    ///
    /// A journal entry is dropped if any of its field/value pairs is listed here.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    /// The directory used to persist the checkpoint file.
    ///
    /// The checkpoint (`checkpoint.txt`) is stored in a per-component
    /// subdirectory resolved from this value; when unset, resolution is left
    /// to the global configuration.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Additional command line arguments passed to `journalctl`.
    ///
    /// They are appended after the arguments generated by this source.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    /// The maximum number of events sent downstream in a single batch.
    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    /// The path of the `journalctl` executable.
    ///
    /// Defaults to `journalctl` when unset.
    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,

    /// The journal directory to read, passed to `journalctl` as `--directory=<path>`.
    #[serde(default)]
    pub journal_directory: Option<PathBuf>,

    /// The journal namespace to read, passed to `journalctl` as `--namespace=<name>`.
    #[serde(default)]
    pub journal_namespace: Option<String>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Replace the numeric `PRIORITY` field with its textual level name
    /// (`0` becomes `EMERG`, ..., `7` becomes `DEBUG`).
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    /// Per-source override of the log namespace; merged with the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Keep the `__CURSOR` field in emitted events.
    ///
    /// When `false`, the cursor is removed from the record after it has been
    /// captured for checkpointing.
    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}
221
/// Serde default for `JournaldConfig::batch_size`.
const fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 16;
    DEFAULT_BATCH_SIZE
}
225
/// Example value shown in the generated documentation for `include_matches`
/// and `exclude_matches`.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples = HashMap::new();
    examples.insert(
        String::from("_SYSTEMD_UNIT"),
        vec![String::from("sshd.service"), String::from("ntpd.service")],
    );
    examples.insert(String::from("_TRANSPORT"), vec![String::from("kernel")]);
    examples
}
235
236impl JournaldConfig {
237 fn merged_include_matches(&self) -> Matches {
238 Self::merge_units(&self.include_matches, &self.include_units)
239 }
240
241 fn merged_exclude_matches(&self) -> Matches {
242 Self::merge_units(&self.exclude_matches, &self.exclude_units)
243 }
244
245 fn merge_units(matches: &Matches, units: &[String]) -> Matches {
246 let mut matches = matches.clone();
247 for unit in units {
248 let entry = matches.entry(String::from(SYSTEMD_UNIT));
249 entry.or_default().insert(fixup_unit(unit));
250 }
251 matches
252 }
253
254 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
256 let schema_definition = match log_namespace {
257 LogNamespace::Vector => Definition::new_with_default_metadata(
258 Kind::bytes().or_null(),
259 [LogNamespace::Vector],
260 ),
261 LogNamespace::Legacy => Definition::new_with_default_metadata(
262 Kind::object(Collection::empty()),
263 [LogNamespace::Legacy],
264 ),
265 };
266
267 let mut schema_definition = schema_definition
268 .with_standard_vector_source_metadata()
269 .with_source_metadata(
271 JournaldConfig::NAME,
272 None,
273 &owned_value_path!("metadata"),
274 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
275 None,
276 )
277 .with_source_metadata(
278 JournaldConfig::NAME,
279 None,
280 &owned_value_path!("timestamp"),
281 Kind::timestamp().or_undefined(),
282 Some("timestamp"),
283 )
284 .with_source_metadata(
285 JournaldConfig::NAME,
286 log_schema().host_key().cloned().map(LegacyKey::Overwrite),
287 &owned_value_path!("host"),
288 Kind::bytes().or_undefined(),
289 Some("host"),
290 );
291
292 if log_namespace == LogNamespace::Legacy {
294 schema_definition = schema_definition.unknown_fields(Kind::bytes());
295 }
296
297 schema_definition
298 }
299}
300
301impl Default for JournaldConfig {
302 fn default() -> Self {
303 Self {
304 since_now: false,
305 current_boot_only: true,
306 include_units: vec![],
307 exclude_units: vec![],
308 include_matches: Default::default(),
309 exclude_matches: Default::default(),
310 data_dir: None,
311 batch_size: default_batch_size(),
312 journalctl_path: None,
313 journal_directory: None,
314 journal_namespace: None,
315 extra_args: vec![],
316 acknowledgements: Default::default(),
317 remap_priority: false,
318 log_namespace: None,
319 emit_cursor: false,
320 }
321 }
322}
323
// NOTE(review): presumably derives the example-config generation for this
// source from `JournaldConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(JournaldConfig);

/// A decoded journal entry: field name mapped to its value as a string.
type Record = HashMap<String, String>;
327
328#[async_trait::async_trait]
329#[typetag::serde(name = "journald")]
330impl SourceConfig for JournaldConfig {
331 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
332 if self.remap_priority {
333 warn!("DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead.");
334 }
335
336 let data_dir = cx
337 .globals
338 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
340
341 if let Some(unit) = self
342 .include_units
343 .iter()
344 .find(|unit| self.exclude_units.contains(unit))
345 {
346 let unit = unit.into();
347 return Err(BuildError::DuplicatedUnit { unit }.into());
348 }
349
350 let include_matches = self.merged_include_matches();
351 let exclude_matches = self.merged_exclude_matches();
352
353 if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
354 return Err(BuildError::DuplicatedMatches { field, value }.into());
355 }
356
357 let mut checkpoint_path = data_dir;
358 checkpoint_path.push(CHECKPOINT_FILENAME);
359
360 let journalctl_path = self
361 .journalctl_path
362 .clone()
363 .unwrap_or_else(|| JOURNALCTL.clone());
364
365 let starter = StartJournalctl::new(
366 journalctl_path,
367 self.journal_directory.clone(),
368 self.journal_namespace.clone(),
369 self.current_boot_only,
370 self.since_now,
371 self.extra_args.clone(),
372 );
373
374 let batch_size = self.batch_size;
375 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
376 let log_namespace = cx.log_namespace(self.log_namespace);
377
378 Ok(Box::pin(
379 JournaldSource {
380 include_matches,
381 exclude_matches,
382 checkpoint_path,
383 batch_size,
384 remap_priority: self.remap_priority,
385 out: cx.out,
386 acknowledgements,
387 starter,
388 log_namespace,
389 emit_cursor: self.emit_cursor,
390 }
391 .run_shutdown(cx.shutdown),
392 ))
393 }
394
395 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
396 let schema_definition =
397 self.schema_definition(global_log_namespace.merge(self.log_namespace));
398
399 vec![SourceOutput::new_maybe_logs(
400 DataType::Log,
401 schema_definition,
402 )]
403 }
404
405 fn can_acknowledge(&self) -> bool {
406 true
407 }
408}
409
/// Runtime state of the journald source, assembled by `JournaldConfig::build`.
struct JournaldSource {
    /// Field/value pairs a record must match to be kept (ignored when empty).
    include_matches: Matches,
    /// Field/value pairs that cause a record to be dropped.
    exclude_matches: Matches,
    /// Location of the checkpoint file holding the last cursor.
    checkpoint_path: PathBuf,
    /// Maximum number of events per batch.
    batch_size: usize,
    /// Whether the numeric `PRIORITY` field is rewritten to its level name.
    remap_priority: bool,
    /// Channel used to send event batches downstream.
    out: SourceSender,
    /// Whether checkpoints wait for delivery acknowledgements.
    acknowledgements: bool,
    /// Spawns the `journalctl` child process.
    starter: StartJournalctl,
    /// Namespace that decides where fields are placed on emitted events.
    log_namespace: LogNamespace,
    /// Whether the `__CURSOR` field is kept on emitted events.
    emit_cursor: bool,
}
422
impl JournaldSource {
    /// Entry point of the source: opens the checkpoint file, wires up the
    /// finalizer and then runs until shutdown.
    ///
    /// Returns `Err(())` only when the checkpoint file cannot be opened (the
    /// failure is reported through `JournaldCheckpointFileOpenError`).
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        // The checkpointer is shared between this task and, when
        // acknowledgements are enabled, the task spawned by `Finalizer::new`.
        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    /// Supervises `journalctl`: starts it from the last checkpointed cursor,
    /// consumes its output, and restarts it after `BACKOFF_DURATION` whenever
    /// it stops or fails to start. Returns on shutdown or when `run_stream`
    /// reports that the source must stop.
    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            // Non-blocking check so a pending shutdown prevents a new spawn.
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stream, running)) => {
                    // `false` means shutdown or a closed downstream: stop for good.
                    if !self.run_stream(stream, &finalizer, shutdown.clone()).await {
                        return;
                    }
                    // Dropping the handle sends SIGTERM to the child (see the
                    // `Drop` impl of `RunningJournalctl`).
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // Back off before restarting, but wake up immediately on shutdown.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Reads records from `stream`, groups them into batches and sends them.
    ///
    /// Returns `true` when `journalctl` should be restarted (the stream ended
    /// or a read failed) and `false` when the source must stop (shutdown was
    /// signalled or the downstream channel is closed).
    async fn run_stream<'a>(
        &'a mut self,
        mut stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        let batch_size = self.batch_size;
        loop {
            let mut batch = Batch::new(self);

            // Wait, without a timeout, until at least one record survives
            // filtering; shutdown is only observed in this phase.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => return false,
                    item = stream.next() => item,
                };
                if !batch.handle_next(item) {
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            // Top the batch up with at most `batch_size - 1` further records,
            // giving up once `BATCH_TIMEOUT` has elapsed.
            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            // `finish` yields `Some(_)` once the loop has to end; the value is
            // this function's return value.
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        }
    }
}
532
/// Accumulates the events of one batch before they are sent downstream.
struct Batch<'a> {
    /// Events collected so far.
    events: Vec<LogEvent>,
    /// Sum of the raw line lengths of the records that produced `events`.
    record_size: usize,
    /// Set once the stream loop must end; the value is what `run_stream` returns.
    exiting: Option<bool>,
    /// Notifier attached to every event; present only with acknowledgements.
    batch: Option<BatchNotifier>,
    /// Receiving side of `batch`, handed to the finalizer.
    receiver: Option<BatchStatusReceiver>,
    /// The source this batch belongs to.
    source: &'a mut JournaldSource,
    /// Cursor of the most recent successfully decoded record.
    cursor: Option<String>,
}
542
543impl<'a> Batch<'a> {
544 fn new(source: &'a mut JournaldSource) -> Self {
545 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
546 Self {
547 events: Vec::new(),
548 record_size: 0,
549 exiting: None,
550 batch,
551 receiver,
552 source,
553 cursor: None,
554 }
555 }
556
557 fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
558 match result {
559 None => {
560 warn!("Journalctl process stopped.");
561 self.exiting = Some(true);
562 false
563 }
564 Some(Err(error)) => {
565 emit!(JournaldReadError { error });
566 false
567 }
568 Some(Ok(bytes)) => {
569 match decode_record(&bytes, self.source.remap_priority) {
570 Ok(mut record) => {
571 if self.source.emit_cursor {
572 if let Some(tmp) = record.get(CURSOR) {
573 self.cursor = Some(tmp.clone());
574 }
575 } else if let Some(tmp) = record.remove(CURSOR) {
576 self.cursor = Some(tmp);
577 }
578
579 if !filter_matches(
580 &record,
581 &self.source.include_matches,
582 &self.source.exclude_matches,
583 ) {
584 self.record_size += bytes.len();
585
586 let mut event = create_log_event_from_record(
587 record,
588 &self.batch,
589 self.source.log_namespace,
590 );
591
592 enrich_log_event(&mut event, self.source.log_namespace);
593
594 self.events.push(event);
595 }
596 }
597 Err(error) => {
598 emit!(JournaldInvalidRecordError {
599 error,
600 text: String::from_utf8_lossy(&bytes).into_owned()
601 });
602 }
603 }
604 true
605 }
606 }
607 }
608
609 async fn finish(
610 mut self,
611 finalizer: &Finalizer,
612 bytes_received: &'a Registered<BytesReceived>,
613 events_received: &'a Registered<EventsReceived>,
614 ) -> Option<bool> {
615 drop(self.batch);
616
617 if self.record_size > 0 {
618 bytes_received.emit(ByteSize(self.record_size));
619 }
620
621 if !self.events.is_empty() {
622 let count = self.events.len();
623 let byte_size = self.events.estimated_json_encoded_size_of();
624 events_received.emit(CountByteSize(count, byte_size));
625
626 match self.source.out.send_batch(self.events).await {
627 Ok(_) => {
628 if let Some(cursor) = self.cursor {
629 finalizer.finalize(cursor, self.receiver).await;
630 }
631 }
632 Err(_) => {
633 emit!(StreamClosedError { count });
634 self.exiting = Some(false);
636 }
637 }
638 }
639 self.exiting
640 }
641}
642
/// Stream of newline-delimited raw records read from the `journalctl` stdout.
type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;
644
/// Everything needed to build the `journalctl` command line and spawn it.
struct StartJournalctl {
    /// Executable to run.
    path: PathBuf,
    /// Optional value for `--directory=`.
    journal_dir: Option<PathBuf>,
    /// Optional value for `--namespace=`.
    journal_namespace: Option<String>,
    /// Adds `--boot` when set.
    current_boot_only: bool,
    /// Uses `--since=now` when set and no checkpoint cursor is available.
    since_now: bool,
    /// Extra arguments appended at the end of the command line.
    extra_args: Vec<String>,
}
653
654impl StartJournalctl {
655 const fn new(
656 path: PathBuf,
657 journal_dir: Option<PathBuf>,
658 journal_namespace: Option<String>,
659 current_boot_only: bool,
660 since_now: bool,
661 extra_args: Vec<String>,
662 ) -> Self {
663 Self {
664 path,
665 journal_dir,
666 journal_namespace,
667 current_boot_only,
668 since_now,
669 extra_args,
670 }
671 }
672
673 fn make_command(&self, checkpoint: Option<&str>) -> Command {
674 let mut command = Command::new(&self.path);
675 command.stdout(Stdio::piped());
676 command.arg("--follow");
677 command.arg("--all");
678 command.arg("--show-cursor");
679 command.arg("--output=json");
680
681 if let Some(dir) = &self.journal_dir {
682 command.arg(format!("--directory={}", dir.display()));
683 }
684
685 if let Some(namespace) = &self.journal_namespace {
686 command.arg(format!("--namespace={namespace}"));
687 }
688
689 if self.current_boot_only {
690 command.arg("--boot");
691 }
692
693 if let Some(cursor) = checkpoint {
694 command.arg(format!("--after-cursor={cursor}"));
695 } else if self.since_now {
696 command.arg("--since=now");
697 } else {
698 command.arg("--since=2000-01-01");
700 }
701
702 if !self.extra_args.is_empty() {
703 command.args(&self.extra_args);
704 }
705
706 command
707 }
708
709 fn start(
710 &mut self,
711 checkpoint: Option<&str>,
712 ) -> crate::Result<(JournalStream, RunningJournalctl)> {
713 let mut command = self.make_command(checkpoint);
714
715 let mut child = command.spawn().context(JournalctlSpawnSnafu)?;
716
717 let stream = FramedRead::new(
718 child.stdout.take().unwrap(),
719 CharacterDelimitedDecoder::new(b'\n'),
720 )
721 .boxed();
722
723 Ok((stream, RunningJournalctl(child)))
724 }
725}
726
/// Guard around the spawned `journalctl` child; dropping it sends the child `SIGTERM`.
struct RunningJournalctl(Child);
728
729impl Drop for RunningJournalctl {
730 fn drop(&mut self) {
731 if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
732 _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
733 }
734 }
735}
736
737fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
738 match log_namespace {
739 LogNamespace::Vector => {
740 if let Some(host) = log
741 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
742 .and_then(|meta| meta.get(HOSTNAME))
743 {
744 log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
745 }
746 }
747 LogNamespace::Legacy => {
748 if let Some(host) = log.remove(event_path!(HOSTNAME)) {
749 log_namespace.insert_source_metadata(
750 JournaldConfig::NAME,
751 log,
752 log_schema().host_key().map(LegacyKey::Overwrite),
753 path!("host"),
754 host,
755 );
756 }
757 }
758 }
759
760 let timestamp_value = match log_namespace {
762 LogNamespace::Vector => log
763 .get(metadata_path!(JournaldConfig::NAME, "metadata"))
764 .and_then(|meta| {
765 meta.get(SOURCE_TIMESTAMP)
766 .or_else(|| meta.get(RECEIVED_TIMESTAMP))
767 }),
768 LogNamespace::Legacy => log
769 .get(event_path!(SOURCE_TIMESTAMP))
770 .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
771 };
772
773 let timestamp = timestamp_value
774 .filter(|&ts| ts.is_bytes())
775 .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
776 .map(|ts| {
777 chrono::Utc
778 .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
779 .single()
780 .expect("invalid timestamp")
781 });
782
783 match log_namespace {
785 LogNamespace::Vector => {
786 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
787
788 if let Some(ts) = timestamp {
789 log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
790 }
791 }
792 LogNamespace::Legacy => {
793 if let Some(ts) = timestamp {
794 log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
795 }
796 }
797 }
798
799 log_namespace.insert_vector_metadata(
801 log,
802 log_schema().source_type_key(),
803 path!("source_type"),
804 JournaldConfig::NAME,
805 );
806}
807
808fn create_log_event_from_record(
809 mut record: Record,
810 batch: &Option<BatchNotifier>,
811 log_namespace: LogNamespace,
812) -> LogEvent {
813 match log_namespace {
814 LogNamespace::Vector => {
815 let message_value = record
816 .remove(MESSAGE)
817 .map(|msg| Value::Bytes(Bytes::from(msg)))
818 .unwrap_or(Value::Null);
819
820 let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);
821
822 record.iter().for_each(|(key, value)| {
824 log.metadata_mut()
825 .value_mut()
826 .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
827 });
828
829 log
830 }
831 LogNamespace::Legacy => {
832 let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
833
834 if let Some(message) = log.remove(event_path!(MESSAGE)) {
835 log.maybe_insert(log_schema().message_key_target_path(), message);
836 }
837
838 log
839 }
840 }
841}
842
/// Normalizes a unit name: names without a `.` get `.service` appended, all
/// other names are returned unchanged.
fn fixup_unit(unit: &str) -> String {
    let mut name = unit.to_owned();
    if !name.contains('.') {
        name.push_str(".service");
    }
    name
}
852
853fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
854 let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
855 if let Some(record) = record.as_object_mut() {
858 for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
859 *value = decode_array(value.as_array().expect("already validated"));
860 }
861 }
862 if remap {
863 record.get_mut("PRIORITY").map(remap_priority);
864 }
865 serde_json::from_value(record)
866}
867
868fn decode_array(array: &[JsonValue]) -> JsonValue {
869 decode_array_as_bytes(array).unwrap_or_else(|| {
870 let ser = serde_json::to_string(array).expect("already deserialized");
871 JsonValue::String(ser)
872 })
873}
874
875fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
876 array
880 .iter()
881 .map(|item| {
882 item.as_u64().and_then(|num| match num {
883 num if num <= u8::MAX as u64 => Some(num as u8),
884 _ => None,
885 })
886 })
887 .collect::<Option<Vec<u8>>>()
888 .map(|array| String::from_utf8_lossy(&array).into())
889}
890
891fn remap_priority(priority: &mut JsonValue) {
892 if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
893 let text = match num {
894 0 => "EMERG",
895 1 => "ALERT",
896 2 => "CRIT",
897 3 => "ERR",
898 4 => "WARNING",
899 5 => "NOTICE",
900 6 => "INFO",
901 7 => "DEBUG",
902 _ => "UNKNOWN",
903 };
904 *priority = JsonValue::String(text.into());
905 }
906}
907
908fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
909 match (includes.is_empty(), excludes.is_empty()) {
910 (true, true) => false,
911 (false, true) => !contains_match(record, includes),
912 (true, false) => contains_match(record, excludes),
913 (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
914 }
915}
916
917fn contains_match(record: &Record, matches: &Matches) -> bool {
918 let f = move |(field, value)| {
919 matches
920 .get(field)
921 .map(|x| x.contains(value))
922 .unwrap_or(false)
923 };
924 record.iter().any(f)
925}
926
927fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
928 for (a_key, a_values) in a_matches {
929 if let Some(b_values) = b_matches.get(a_key.as_str()) {
930 for (a, b) in a_values
931 .iter()
932 .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
933 {
934 if a == b {
935 return Some((a_key.into(), b.into()));
936 }
937 }
938 }
939 }
940 None
941}
942
/// Decides when a batch's cursor is written to the checkpoint.
enum Finalizer {
    /// Acknowledgements disabled: the cursor is stored as soon as the batch
    /// has been sent.
    Sync(SharedCheckpointer),
    /// Acknowledgements enabled: the cursor is queued and stored by a
    /// background task once the batch is reported as delivered.
    Async(OrderedFinalizer<String>),
}
947
948impl Finalizer {
949 fn new(
950 acknowledgements: bool,
951 checkpointer: SharedCheckpointer,
952 shutdown: ShutdownSignal,
953 ) -> Self {
954 if acknowledgements {
955 let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
956 tokio::spawn(async move {
957 while let Some((status, cursor)) = ack_stream.next().await {
958 if status == BatchStatus::Delivered {
959 checkpointer.lock().await.set(cursor).await;
960 }
961 }
962 });
963 Self::Async(finalizer)
964 } else {
965 Self::Sync(checkpointer)
966 }
967 }
968
969 async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
970 match (self, receiver) {
971 (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
972 (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
973 _ => {
974 unreachable!("Cannot have async finalization without a receiver in journald source")
975 }
976 }
977 }
978}
979
/// Reads and writes the cursor stored in the checkpoint file.
struct Checkpointer {
    /// Open handle to the checkpoint file (read + write).
    file: File,
    /// Path of the checkpoint file, kept for error reporting.
    filename: PathBuf,
}
984
985impl Checkpointer {
986 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
987 let file = OpenOptions::new()
988 .read(true)
989 .write(true)
990 .create(true)
991 .truncate(false)
992 .open(&filename)
993 .await?;
994 Ok(Checkpointer { file, filename })
995 }
996
997 async fn set(&mut self, token: &str) -> Result<(), io::Error> {
998 self.file.seek(SeekFrom::Start(0)).await?;
999 self.file.write_all(format!("{token}\n").as_bytes()).await
1000 }
1001
1002 async fn get(&mut self) -> Result<Option<String>, io::Error> {
1003 let mut buf = Vec::<u8>::new();
1004 self.file.seek(SeekFrom::Start(0)).await?;
1005 self.file.read_to_end(&mut buf).await?;
1006 match buf.len() {
1007 0 => Ok(None),
1008 _ => {
1009 let text = String::from_utf8_lossy(&buf);
1010 match text.find('\n') {
1011 Some(nl) => Ok(Some(String::from(&text[..nl]))),
1012 None => Ok(None), }
1014 }
1015 }
1016 }
1017}
1018
/// A `Checkpointer` plus an in-memory copy of the most recent cursor.
struct StatefulCheckpointer {
    /// File-backed storage of the cursor.
    checkpointer: Checkpointer,
    /// Cursor loaded at start-up or passed to the latest `set` call.
    cursor: Option<String>,
}
1023
1024impl StatefulCheckpointer {
1025 async fn new(filename: PathBuf) -> Result<Self, io::Error> {
1026 let mut checkpointer = Checkpointer::new(filename).await?;
1027 let cursor = checkpointer.get().await?;
1028 Ok(Self {
1029 checkpointer,
1030 cursor,
1031 })
1032 }
1033
1034 async fn set(&mut self, token: impl Into<String>) {
1035 let token = token.into();
1036 if let Err(error) = self.checkpointer.set(&token).await {
1037 emit!(JournaldCheckpointSetError {
1038 error,
1039 filename: self
1040 .checkpointer
1041 .filename
1042 .to_str()
1043 .unwrap_or("unknown")
1044 .to_string(),
1045 });
1046 }
1047 self.cursor = Some(token);
1048 }
1049}
1050
/// Cloneable handle to a `StatefulCheckpointer` guarded by an async mutex.
#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);
1053
1054impl SharedCheckpointer {
1055 fn new(c: StatefulCheckpointer) -> Self {
1056 Self(Arc::new(Mutex::new(c)))
1057 }
1058
1059 async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
1060 self.0.lock().await
1061 }
1062}
1063
/// Tests for the file-backed `Checkpointer` (plus the config-generation check).
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    /// A fresh checkpoint file holds no token; each `set` is visible both
    /// through `get` and in the file content, including when a shorter token
    /// overwrites a longer one.
    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let mut filename = tempdir.path().to_path_buf();
        filename.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        // Newly created file: nothing stored yet.
        assert!(checkpointer.get().await.unwrap().is_none());

        checkpointer
            .set("first test")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("first test\n"));

        // The file is not truncated on `set`, hence `starts_with` rather than
        // an exact comparison.
        checkpointer
            .set("second")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("second\n"));
    }
}
1108
1109#[cfg(test)]
1110mod tests {
1111 use std::{fs, path::Path};
1112
1113 use tempfile::tempdir;
1114 use tokio::time::{sleep, timeout, Duration, Instant};
1115 use vrl::value::{kind::Collection, Value};
1116
1117 use super::*;
1118 use crate::{
1119 config::ComponentKey, event::Event, event::EventStatus,
1120 test_util::components::assert_source_compliance,
1121 };
1122
1123 const TEST_COMPONENT: &str = "journald-test";
1124 const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1125
1126 async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1127 let include_matches = create_unit_matches(iunits.to_vec());
1128 let exclude_matches = create_unit_matches(xunits.to_vec());
1129 run_journal(include_matches, exclude_matches, cursor, false).await
1130 }
1131
1132 async fn run_journal(
1133 include_matches: Matches,
1134 exclude_matches: Matches,
1135 checkpoint: Option<&str>,
1136 emit_cursor: bool,
1137 ) -> Vec<Event> {
1138 assert_source_compliance(&["protocol"], async move {
1139 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1140
1141 let tempdir = tempdir().unwrap();
1142 let tempdir = tempdir.path().to_path_buf();
1143
1144 if let Some(cursor) = checkpoint {
1145 let mut checkpoint_path = tempdir.clone();
1146 checkpoint_path.push(TEST_COMPONENT);
1147 fs::create_dir(&checkpoint_path).unwrap();
1148 checkpoint_path.push(CHECKPOINT_FILENAME);
1149
1150 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1151 .await
1152 .expect("Creating checkpointer failed!");
1153
1154 checkpointer
1155 .set(cursor)
1156 .await
1157 .expect("Could not set checkpoint");
1158 }
1159
1160 let (cx, shutdown) =
1161 SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1162 let config = JournaldConfig {
1163 journalctl_path: Some(TEST_JOURNALCTL.into()),
1164 include_matches,
1165 exclude_matches,
1166 data_dir: Some(tempdir),
1167 remap_priority: true,
1168 acknowledgements: false.into(),
1169 emit_cursor,
1170 ..Default::default()
1171 };
1172 let source = config.build(cx).await.unwrap();
1173 tokio::spawn(async move { source.await.unwrap() });
1174
1175 sleep(Duration::from_millis(100)).await;
1176 shutdown
1177 .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1178 .await;
1179
1180 timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1181 })
1182 .await
1183 }
1184
1185 fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1186 let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1187 let mut map = HashMap::new();
1188 if !units.is_empty() {
1189 map.insert(String::from(SYSTEMD_UNIT), units);
1190 }
1191 map
1192 }
1193
1194 fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1195 let mut matches: Matches = HashMap::new();
1196 for (field, value) in conditions {
1197 matches
1198 .entry(field.into())
1199 .or_default()
1200 .insert(value.into());
1201 }
1202 matches
1203 }
1204
1205 #[tokio::test]
1206 async fn reads_journal() {
1207 let received = run_with_units(&[], &[], None).await;
1208 assert_eq!(received.len(), 8);
1209 assert_eq!(
1210 message(&received[0]),
1211 Value::Bytes("System Initialization".into())
1212 );
1213 assert_eq!(
1214 received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1215 "journald".into()
1216 );
1217 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1218 assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1219 assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1220 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1221 assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1222 }
1223
1224 #[tokio::test]
1225 async fn includes_units() {
1226 let received = run_with_units(&["unit.service"], &[], None).await;
1227 assert_eq!(received.len(), 1);
1228 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1229 }
1230
1231 #[tokio::test]
1232 async fn excludes_units() {
1233 let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1234 assert_eq!(received.len(), 6);
1235 assert_eq!(
1236 message(&received[0]),
1237 Value::Bytes("System Initialization".into())
1238 );
1239 assert_eq!(
1240 message(&received[1]),
1241 Value::Bytes("Missing timestamp".into())
1242 );
1243 assert_eq!(
1244 message(&received[2]),
1245 Value::Bytes("Different timestamps".into())
1246 );
1247 }
1248
1249 #[tokio::test]
1250 async fn emits_cursor() {
1251 let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1252 assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1253 assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1254 assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1255 }
1256
1257 #[tokio::test]
1258 async fn includes_matches() {
1259 let matches = create_matches(vec![("PRIORITY", "ERR")]);
1260 let received = run_journal(matches, HashMap::new(), None, false).await;
1261 assert_eq!(received.len(), 2);
1262 assert_eq!(
1263 message(&received[0]),
1264 Value::Bytes("Different timestamps".into())
1265 );
1266 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1267 assert_eq!(
1268 message(&received[1]),
1269 Value::Bytes("Non-ASCII in other field".into())
1270 );
1271 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1272 }
1273
1274 #[tokio::test]
1275 async fn includes_kernel() {
1276 let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1277 let received = run_journal(matches, HashMap::new(), None, false).await;
1278 assert_eq!(received.len(), 1);
1279 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1280 assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1281 }
1282
1283 #[tokio::test]
1284 async fn excludes_matches() {
1285 let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1286 let received = run_journal(HashMap::new(), matches, None, false).await;
1287 assert_eq!(received.len(), 5);
1288 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1289 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1290 assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1291 assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1292 assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1293 }
1294
1295 #[tokio::test]
1296 async fn handles_checkpoint() {
1297 let received = run_with_units(&[], &[], Some("1")).await;
1298 assert_eq!(received.len(), 7);
1299 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1300 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1301 }
1302
1303 #[tokio::test]
1304 async fn parses_array_messages() {
1305 let received = run_with_units(&["badunit.service"], &[], None).await;
1306 assert_eq!(received.len(), 1);
1307 assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1308 }
1309
1310 #[tokio::test]
1311 async fn parses_array_fields() {
1312 let received = run_with_units(&["syslog.service"], &[], None).await;
1313 assert_eq!(received.len(), 1);
1314 assert_eq!(
1315 received[0].as_log()["SYSLOG_RAW"],
1316 Value::Bytes("¿World?".into())
1317 );
1318 }
1319
1320 #[tokio::test]
1321 async fn parses_string_sequences() {
1322 let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1323 assert_eq!(received.len(), 1);
1324 assert_eq!(
1325 received[0].as_log()["SYSLOG_FACILITY"],
1326 Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1327 );
1328 }
1329
1330 #[tokio::test]
1331 async fn handles_missing_timestamp() {
1332 let received = run_with_units(&["stdout"], &[], None).await;
1333 assert_eq!(received.len(), 2);
1334 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1335 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1336 }
1337
1338 #[tokio::test]
1339 async fn handles_acknowledgements() {
1340 let (tx, mut rx) = SourceSender::new_test();
1341
1342 let tempdir = tempdir().unwrap();
1343 let tempdir = tempdir.path().to_path_buf();
1344 let mut checkpoint_path = tempdir.clone();
1345 checkpoint_path.push(TEST_COMPONENT);
1346 fs::create_dir(&checkpoint_path).unwrap();
1347 checkpoint_path.push(CHECKPOINT_FILENAME);
1348
1349 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1350 .await
1351 .expect("Creating checkpointer failed!");
1352
1353 let config = JournaldConfig {
1354 journalctl_path: Some(TEST_JOURNALCTL.into()),
1355 data_dir: Some(tempdir),
1356 remap_priority: true,
1357 acknowledgements: true.into(),
1358 ..Default::default()
1359 };
1360 let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1361 let source = config.build(cx).await.unwrap();
1362 tokio::spawn(async move { source.await.unwrap() });
1363
1364 assert_eq!(checkpointer.get().await.unwrap(), None);
1366
1367 tokio::time::sleep(Duration::from_millis(100)).await;
1368
1369 let mut count = 0;
1371 while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
1372 assert_eq!(checkpointer.get().await.unwrap(), None);
1374 event.metadata().update_status(EventStatus::Delivered);
1375 count += 1;
1376 }
1377 assert_eq!(count, 8);
1378
1379 tokio::time::sleep(Duration::from_millis(100)).await;
1380 assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
1381 }
1382
/// Pins the result of `filter_matches` for three records (no fields, unit
/// `one`, unit `bar`) against every combination of an empty/non-empty
/// include and exclude set.
#[test]
fn filter_matches_works_correctly() {
    let empty: Matches = HashMap::new();
    let includes = create_unit_matches(vec!["one", "two"]);
    let excludes = create_unit_matches(vec!["foo", "bar"]);

    let record_with_unit =
        |unit: &str| HashMap::from([(String::from(SYSTEMD_UNIT), String::from(unit))]);
    let zero = HashMap::new();
    let one = record_with_unit("one");
    let two = record_with_unit("bar");

    // (label, record, include set, exclude set, expected result)
    let cases = [
        ("zero/none/none", &zero, &empty, &empty, false),
        ("zero/includes/none", &zero, &includes, &empty, true),
        ("zero/none/excludes", &zero, &empty, &excludes, false),
        ("zero/includes/excludes", &zero, &includes, &excludes, true),
        ("one/none/none", &one, &empty, &empty, false),
        ("one/includes/none", &one, &includes, &empty, false),
        ("one/none/excludes", &one, &empty, &excludes, false),
        ("one/includes/excludes", &one, &includes, &excludes, false),
        ("two/none/none", &two, &empty, &empty, false),
        ("two/includes/none", &two, &includes, &empty, true),
        ("two/none/excludes", &two, &empty, &excludes, true),
        ("two/includes/excludes", &two, &includes, &excludes, true),
    ];
    for (label, record, include_set, exclude_set, expected) in cases {
        assert_eq!(
            filter_matches(record, include_set, exclude_set),
            expected,
            "case {label}"
        );
    }
}
1407
/// Checks that `merged_include_matches` / `merged_exclude_matches` combine
/// the unit lists (which show up with a `.service` suffix) with the
/// explicitly configured matches.
#[test]
fn merges_units_and_matches_option() {
    let journald_config = JournaldConfig {
        include_units: ["one", "two"]
            .iter()
            .copied()
            .map(String::from)
            .collect(),
        include_matches: create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]),
        exclude_units: ["foo", "bar"]
            .iter()
            .copied()
            .map(String::from)
            .collect(),
        exclude_matches: create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]),
        ..Default::default()
    };

    let hashset =
        |v: &[&str]| -> HashSet<String> { v.iter().copied().map(String::from).collect() };

    // (field, values expected in the merged include matches)
    let expected_includes: [(&str, &[&str]); 2] = [
        (
            "_SYSTEMD_UNIT",
            &["one.service", "two.service", "three.service"],
        ),
        ("_TRANSPORT", &["kernel"]),
    ];
    let merged_includes = journald_config.merged_include_matches();
    for (field, values) in expected_includes {
        assert_eq!(merged_includes.get(field).unwrap(), &hashset(values));
    }

    // (field, values expected in the merged exclude matches)
    let expected_excludes: [(&str, &[&str]); 2] = [
        (
            "_SYSTEMD_UNIT",
            &["foo.service", "bar.service", "baz.service"],
        ),
        ("PRIORITY", &["DEBUG"]),
    ];
    let merged_excludes = journald_config.merged_exclude_matches();
    for (field, values) in expected_excludes {
        assert_eq!(merged_excludes.get(field).unwrap(), &hashset(values));
    }
}
1451
/// `find_duplicate_match` must report a field/value pair present in both
/// maps and return `None` whenever either map is empty.
#[test]
fn find_duplicate_match_works_correctly() {
    let kernel_includes = create_matches(vec![("_TRANSPORT", "kernel")]);
    let kernel_excludes = create_matches(vec![("_TRANSPORT", "kernel")]);

    let duplicate = find_duplicate_match(&kernel_includes, &kernel_excludes).unwrap();
    assert_eq!(duplicate.0, "_TRANSPORT");
    assert_eq!(duplicate.1, "kernel");

    let empty = HashMap::new();
    // (include matches, exclude matches) pairs that share nothing.
    let disjoint_pairs = [
        (&empty, &empty),
        (&kernel_includes, &empty),
        (&empty, &kernel_excludes),
    ];
    for (include_set, exclude_set) in disjoint_pairs {
        assert!(find_duplicate_match(include_set, exclude_set).is_none());
    }
}
1470
/// Checks which flags appear in the debug rendering of the command built
/// by `create_command` for three different option sets.
#[test]
fn command_options() {
    let path = PathBuf::from("journalctl");

    // Scenario 1: nothing optional is set.
    let cmd_line = format!(
        "{:?}",
        create_command(
            &path,
            None,   // journal_dir
            None,   // journal_namespace
            false,  // current_boot_only
            false,  // since_now
            None,   // cursor
            vec![], // extra_args
        )
    );
    for absent in ["--directory=", "--namespace=", "--boot"] {
        assert!(!cmd_line.contains(absent));
    }
    assert!(cmd_line.contains("--since=2000-01-01"));

    // Scenario 2: identical, except that `since_now` is enabled.
    let cmd_line = format!(
        "{:?}",
        create_command(
            &path,
            None,   // journal_dir
            None,   // journal_namespace
            false,  // current_boot_only
            true,   // since_now
            None,   // cursor
            vec![], // extra_args
        )
    );
    assert!(cmd_line.contains("--since=now"));

    // Scenario 3: every option is set; `since_now` stays enabled.
    let cmd_line = format!(
        "{:?}",
        create_command(
            &path,
            Some(PathBuf::from("/tmp/journal-dir")),
            Some(String::from("my_namespace")),
            true, // current_boot_only
            true, // since_now
            Some("2021-01-01"),
            vec!["--merge".to_string()],
        )
    );
    for present in [
        "--directory=/tmp/journal-dir",
        "--namespace=my_namespace",
        "--boot",
        "--after-cursor=",
        "--merge",
    ] {
        assert!(cmd_line.contains(present));
    }
}
1536
1537 fn create_command(
1538 path: &Path,
1539 journal_dir: Option<PathBuf>,
1540 journal_namespace: Option<String>,
1541 current_boot_only: bool,
1542 since_now: bool,
1543 cursor: Option<&str>,
1544 extra_args: Vec<String>,
1545 ) -> Command {
1546 StartJournalctl::new(
1547 path.into(),
1548 journal_dir,
1549 journal_namespace,
1550 current_boot_only,
1551 since_now,
1552 extra_args,
1553 )
1554 .make_command(cursor)
1555 }
1556
1557 fn message(event: &Event) -> Value {
1558 event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1559 }
1560
1561 fn timestamp(event: &Event) -> Value {
1562 event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1563 }
1564
1565 fn cursor(event: &Event) -> Value {
1566 event.as_log()[CURSOR].clone()
1567 }
1568
1569 fn value_ts(secs: i64, usecs: u32) -> Value {
1570 Value::Timestamp(
1571 chrono::Utc
1572 .timestamp_opt(secs, usecs)
1573 .single()
1574 .expect("invalid timestamp"),
1575 )
1576 }
1577
1578 fn priority(event: &Event) -> Value {
1579 event.as_log()["PRIORITY"].clone()
1580 }
1581
/// Verifies the schema definition of the first output when the `Vector`
/// log namespace is used.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = JournaldConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let actual = outputs.remove(0).schema_definition(true);

    let base =
        Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector]);

    // Metadata fields under the `vector` prefix.
    let with_vector_metadata = base
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        );

    // Metadata fields under the source's own name.
    let expected = with_vector_metadata
        .with_metadata_field(
            &owned_value_path!(JournaldConfig::NAME, "metadata"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!(JournaldConfig::NAME, "timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        )
        .with_metadata_field(
            &owned_value_path!(JournaldConfig::NAME, "host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

    assert_eq!(actual, Some(expected));
}
1624
/// Verifies the schema definition of the first output when the `Legacy`
/// log namespace is used with the default configuration.
#[test]
fn output_schema_definition_legacy_namespace() {
    let config = JournaldConfig::default();

    let mut outputs = config.outputs(LogNamespace::Legacy);
    let actual = outputs.remove(0).schema_definition(true);

    let base = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let host_kind = Kind::bytes().or_undefined();
    let expected = base
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), host_kind, Some("host"))
        .unknown_fields(Kind::bytes());

    assert_eq!(actual, Some(expected));
}
1649
1650 fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
1651 let record = r#"{
1652 "PRIORITY":"6",
1653 "SYSLOG_FACILITY":"3",
1654 "SYSLOG_IDENTIFIER":"ntpd",
1655 "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
1656 "_CAP_EFFECTIVE":"3fffffffff",
1657 "_CMDLINE":"ntpd: [priv]",
1658 "_COMM":"ntpd",
1659 "_EXE":"/usr/sbin/ntpd",
1660 "_GID":"0",
1661 "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
1662 "_PID":"2156",
1663 "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
1664 "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
1665 "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
1666 "_SYSTEMD_SLICE":"system.slice",
1667 "_SYSTEMD_UNIT":"ntpd.service",
1668 "_TRANSPORT":"stdout",
1669 "_UID":"0",
1670 "__MONOTONIC_TIMESTAMP":"98694000446",
1671 "__REALTIME_TIMESTAMP":"1564173027000443",
1672 "host":"my-host.local",
1673 "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
1674 "source_type":"journald"
1675 }"#;
1676
1677 let json: serde_json::Value = serde_json::from_str(record).unwrap();
1678 let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
1679
1680 event.as_mut_log().insert("timestamp", chrono::Utc::now());
1681
1682 let definitions = config.outputs(namespace).remove(0).schema_definition(true);
1683
1684 definitions.unwrap().assert_valid_for_event(&event);
1685 }
1686
/// Runs the `matches_schema` check for the default configuration under the
/// `Legacy` log namespace.
#[test]
fn matches_schema_legacy() {
    matches_schema(&JournaldConfig::default(), LogNamespace::Legacy);
}
1693}