1use std::{
2 collections::{HashMap, HashSet},
3 io::SeekFrom,
4 path::PathBuf,
5 process::Stdio,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use bytes::Bytes;
12use chrono::{TimeZone, Utc};
13use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
14use nix::{
15 sys::signal::{Signal, kill},
16 unistd::Pid,
17};
18use serde_json::{Error as JsonError, Value as JsonValue};
19use snafu::{ResultExt, Snafu};
20use tokio::{
21 fs::{File, OpenOptions},
22 io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
23 process::{Child, Command},
24 sync::{Mutex, MutexGuard},
25 time::sleep,
26};
27use tokio_util::codec::FramedRead;
28use vector_lib::{
29 EstimatedJsonEncodedSizeOf,
30 codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError},
31 config::{LegacyKey, LogNamespace},
32 configurable::configurable_component,
33 finalizer::OrderedFinalizer,
34 internal_event::{
35 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
36 },
37 lookup::{metadata_path, owned_value_path, path},
38 schema::Definition,
39};
40use vrl::{
41 event_path,
42 value::{Kind, Value, kind::Collection},
43};
44
45use crate::{
46 SourceSender,
47 config::{
48 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
49 log_schema,
50 },
51 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent},
52 internal_events::{
53 EventsReceived, JournaldCheckpointFileOpenError, JournaldCheckpointSetError,
54 JournaldInvalidRecordError, JournaldReadError, JournaldStartJournalctlError,
55 StreamClosedError,
56 },
57 serde::bool_or_struct,
58 shutdown::ShutdownSignal,
59};
60
/// How long to wait for more journal records before flushing a partial batch.
const BATCH_TIMEOUT: Duration = Duration::from_millis(10);

/// Name of the file, under this source's data directory, that persists the
/// last acknowledged journal cursor across restarts.
const CHECKPOINT_FILENAME: &str = "checkpoint.txt";
// Journal field names referenced when interpreting `journalctl` records.
const CURSOR: &str = "__CURSOR";
const HOSTNAME: &str = "_HOSTNAME";
const MESSAGE: &str = "MESSAGE";
const SYSTEMD_UNIT: &str = "_SYSTEMD_UNIT";
const SOURCE_TIMESTAMP: &str = "_SOURCE_REALTIME_TIMESTAMP";
const RECEIVED_TIMESTAMP: &str = "__REALTIME_TIMESTAMP";

/// Delay before restarting `journalctl` after it exits or fails to spawn.
const BACKOFF_DURATION: Duration = Duration::from_secs(1);

/// Default `journalctl` binary, resolved via `PATH` unless overridden by the
/// `journalctl_path` config option.
static JOURNALCTL: LazyLock<PathBuf> = LazyLock::new(|| "journalctl".into());
74
/// Errors that can occur while building the journald source from its config.
#[derive(Debug, Snafu)]
enum BuildError {
    /// Spawning the `journalctl` child process failed.
    #[snafu(display("journalctl failed to execute: {}", source))]
    JournalctlSpawn { source: io::Error },
    /// The same unit was listed in both `include_units` and `exclude_units`.
    #[snafu(display(
        "The unit {:?} is duplicated in both include_units and exclude_units",
        unit
    ))]
    DuplicatedUnit { unit: String },
    /// The same field/value pair appears in both `include_matches` and
    /// `exclude_matches`, making the filters contradictory.
    #[snafu(display(
        "The Journal field/value pair {:?}:{:?} is duplicated in both include_matches and exclude_matches.",
        field,
        value,
    ))]
    DuplicatedMatches { field: String, value: String },
}

/// Map from journal field name to the set of values to match against.
type Matches = HashMap<String, HashSet<String>>;
93
/// Configuration for the `journald` source.
#[configurable_component(source("journald", "Collect logs from JournalD."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldConfig {
    /// Only include entries that occur after this source starts reading.
    ///
    /// Passes `--since=now` to `journalctl` when no checkpoint cursor exists.
    #[serde(default)]
    pub since_now: bool,

    /// Only include entries from the current boot (passes `--boot`).
    #[serde(default = "crate::serde::default_true")]
    pub current_boot_only: bool,

    /// Unit names to monitor.
    ///
    /// Names containing no `.` are normalized with a `.service` suffix and
    /// merged into `include_matches` under the `_SYSTEMD_UNIT` field.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "ntpd", docs::examples = "sysinit.target"))]
    pub include_units: Vec<String>,

    /// Unit names to skip; normalized the same way as `include_units` and
    /// merged into `exclude_matches`.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "badservice", docs::examples = "sysinit.target"))]
    pub exclude_units: Vec<String>,

    /// Journal field/value pairs a record must match to be included.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub include_matches: Matches,

    /// Journal field/value pairs that cause a record to be excluded.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
    ))]
    #[configurable(metadata(docs::examples = "matches_examples()"))]
    pub exclude_matches: Matches,

    /// Directory used to persist the checkpoint file; falls back to the
    /// global data directory when unset.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Additional command-line arguments appended to the `journalctl` call.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "--merge"))]
    pub extra_args: Vec<String>,

    /// Maximum number of events to buffer before forwarding a batch.
    #[serde(default = "default_batch_size")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub batch_size: usize,

    /// Absolute path of the `journalctl` executable; defaults to searching
    /// `PATH` for `journalctl`.
    #[serde(default)]
    pub journalctl_path: Option<PathBuf>,

    /// Journal directory to read from (passes `--directory`).
    #[serde(default)]
    pub journal_directory: Option<PathBuf>,

    /// Journal namespace to read from (passes `--namespace`).
    #[serde(default)]
    pub journal_namespace: Option<String>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Deprecated: translate numeric `PRIORITY` values into syslog level
    /// names at decode time.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, use the `remap` transform and `to_syslog_level` function instead."
    )]
    remap_priority: bool,

    /// The namespace to use for logs; overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Keep the `__CURSOR` field on emitted events instead of stripping it.
    #[serde(default = "crate::serde::default_false")]
    emit_cursor: bool,
}
221
/// Default for `batch_size`: maximum events gathered into one batch.
const fn default_batch_size() -> usize {
    16
}
225
/// Example values rendered into generated documentation for the
/// `include_matches`/`exclude_matches` options.
fn matches_examples() -> HashMap<String, Vec<String>> {
    let mut examples = HashMap::new();
    examples.insert(
        "_SYSTEMD_UNIT".to_owned(),
        vec!["sshd.service".to_owned(), "ntpd.service".to_owned()],
    );
    examples.insert("_TRANSPORT".to_owned(), vec!["kernel".to_owned()]);
    examples
}
235
impl JournaldConfig {
    /// `include_matches` with `include_units` folded in as `_SYSTEMD_UNIT` values.
    fn merged_include_matches(&self) -> Matches {
        Self::merge_units(&self.include_matches, &self.include_units)
    }

    /// `exclude_matches` with `exclude_units` folded in as `_SYSTEMD_UNIT` values.
    fn merged_exclude_matches(&self) -> Matches {
        Self::merge_units(&self.exclude_matches, &self.exclude_units)
    }

    /// Copy `matches` and insert each unit name (normalized by `fixup_unit`)
    /// under the `_SYSTEMD_UNIT` field.
    fn merge_units(matches: &Matches, units: &[String]) -> Matches {
        let mut matches = matches.clone();
        for unit in units {
            // Bare names (no '.') gain a ".service" suffix inside fixup_unit.
            let entry = matches.entry(String::from(SYSTEMD_UNIT));
            entry.or_default().insert(fixup_unit(unit));
        }
        matches
    }

    /// Build the schema definition advertised for this source's output under
    /// the given log namespace.
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        // Event root: raw message bytes (or null) in the Vector namespace; an
        // object of journal fields in the Legacy namespace.
        let schema_definition = match log_namespace {
            LogNamespace::Vector => Definition::new_with_default_metadata(
                Kind::bytes().or_null(),
                [LogNamespace::Vector],
            ),
            LogNamespace::Legacy => Definition::new_with_default_metadata(
                Kind::object(Collection::empty()),
                [LogNamespace::Legacy],
            ),
        };

        // Metadata this source attaches: the raw journal fields, the record
        // timestamp, and the host name.
        let mut schema_definition = schema_definition
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                JournaldConfig::NAME,
                log_schema().host_key().cloned().map(LegacyKey::Overwrite),
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            );

        // In Legacy mode every remaining journal field lands on the event as
        // an arbitrary bytes-valued field.
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}
300
impl Default for JournaldConfig {
    /// Defaults mirror `journalctl`'s own behavior: read the full journal of
    /// the current boot, in batches of `default_batch_size()` events.
    fn default() -> Self {
        Self {
            since_now: false,
            current_boot_only: true,
            include_units: vec![],
            exclude_units: vec![],
            include_matches: Default::default(),
            exclude_matches: Default::default(),
            data_dir: None,
            batch_size: default_batch_size(),
            journalctl_path: None,
            journal_directory: None,
            journal_namespace: None,
            extra_args: vec![],
            acknowledgements: Default::default(),
            remap_priority: false,
            log_namespace: None,
            emit_cursor: false,
        }
    }
}
323
impl_generate_config_from_default!(JournaldConfig);

/// One decoded journal entry: field name to (stringified) field value.
type Record = HashMap<String, String>;
327
#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SourceConfig for JournaldConfig {
    /// Validate the configuration and assemble the running source future.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        if self.remap_priority {
            warn!(
                "DEPRECATION, option `remap_priority` has been deprecated. Please use the `remap` transform and function `to_syslog_level` instead."
            );
        }

        // Directory that will hold the checkpoint file for this component.
        let data_dir = cx
            .globals
            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;

        // Reject any unit listed as both included and excluded.
        if let Some(unit) = self
            .include_units
            .iter()
            .find(|unit| self.exclude_units.contains(unit))
        {
            let unit = unit.into();
            return Err(BuildError::DuplicatedUnit { unit }.into());
        }

        let include_matches = self.merged_include_matches();
        let exclude_matches = self.merged_exclude_matches();

        // Likewise reject field/value pairs present in both match sets.
        if let Some((field, value)) = find_duplicate_match(&include_matches, &exclude_matches) {
            return Err(BuildError::DuplicatedMatches { field, value }.into());
        }

        let mut checkpoint_path = data_dir;
        checkpoint_path.push(CHECKPOINT_FILENAME);

        let journalctl_path = self
            .journalctl_path
            .clone()
            .unwrap_or_else(|| JOURNALCTL.clone());

        let starter = StartJournalctl::new(
            journalctl_path,
            self.journal_directory.clone(),
            self.journal_namespace.clone(),
            self.current_boot_only,
            self.since_now,
            self.extra_args.clone(),
        );

        let batch_size = self.batch_size;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        let log_namespace = cx.log_namespace(self.log_namespace);

        Ok(Box::pin(
            JournaldSource {
                include_matches,
                exclude_matches,
                checkpoint_path,
                batch_size,
                remap_priority: self.remap_priority,
                out: cx.out,
                acknowledgements,
                starter,
                log_namespace,
                emit_cursor: self.emit_cursor,
            }
            .run_shutdown(cx.shutdown),
        ))
    }

    /// Advertise a single log output with the schema for the effective
    /// log namespace.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition =
            self.schema_definition(global_log_namespace.merge(self.log_namespace));

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    /// This source supports end-to-end acknowledgements (cursor checkpoints
    /// are only advanced once delivery is confirmed).
    fn can_acknowledge(&self) -> bool {
        true
    }
}
411
/// Runtime state of the journald source, built from `JournaldConfig`.
struct JournaldSource {
    include_matches: Matches,
    exclude_matches: Matches,
    /// Location of the persisted cursor checkpoint file.
    checkpoint_path: PathBuf,
    batch_size: usize,
    remap_priority: bool,
    out: SourceSender,
    /// Whether end-to-end acknowledgements are enabled for this source.
    acknowledgements: bool,
    /// Factory for (re)spawning the `journalctl` child process.
    starter: StartJournalctl,
    log_namespace: LogNamespace,
    emit_cursor: bool,
}
424
impl JournaldSource {
    /// Open (or create) the checkpoint file, then run the source until
    /// shutdown. Fails only when the checkpoint file cannot be opened, since
    /// the source cannot resume reliably without it.
    async fn run_shutdown(self, shutdown: ShutdownSignal) -> Result<(), ()> {
        let checkpointer = StatefulCheckpointer::new(self.checkpoint_path.clone())
            .await
            .map_err(|error| {
                emit!(JournaldCheckpointFileOpenError {
                    error,
                    path: self
                        .checkpoint_path
                        .to_str()
                        .unwrap_or("unknown")
                        .to_string(),
                });
            })?;

        let checkpointer = SharedCheckpointer::new(checkpointer);
        let finalizer = Finalizer::new(
            self.acknowledgements,
            checkpointer.clone(),
            shutdown.clone(),
        );

        self.run(checkpointer, finalizer, shutdown).await;

        Ok(())
    }

    /// Supervision loop: (re)start `journalctl` from the last saved cursor,
    /// consume its output, and back off `BACKOFF_DURATION` before restarting
    /// when it exits or fails to spawn. Ends on shutdown or when the output
    /// channel closes.
    async fn run(
        mut self,
        checkpointer: SharedCheckpointer,
        finalizer: Finalizer,
        mut shutdown: ShutdownSignal,
    ) {
        loop {
            // Bail out promptly if shutdown was signalled between restarts.
            if matches!(poll!(&mut shutdown), Poll::Ready(_)) {
                break;
            }

            info!("Starting journalctl.");
            let cursor = checkpointer.lock().await.cursor.clone();
            match self.starter.start(cursor.as_deref()) {
                Ok((stream, running)) => {
                    // `run_stream` returning false means the source should
                    // stop entirely (shutdown or closed output channel).
                    if !self.run_stream(stream, &finalizer, shutdown.clone()).await {
                        return;
                    }
                    // Dropping `running` SIGTERMs the journalctl child.
                    drop(running);
                }
                Err(error) => {
                    emit!(JournaldStartJournalctlError { error });
                }
            }

            // Wait before restarting, unless shutdown arrives first.
            tokio::select! {
                _ = &mut shutdown => break,
                _ = sleep(BACKOFF_DURATION) => (),
            }
        }
    }

    /// Consume one `journalctl` stream, grouping records into batches capped
    /// at `batch_size` events or `BATCH_TIMEOUT` after the first event.
    ///
    /// Returns `true` when `journalctl` should be restarted, `false` when
    /// the whole source should terminate.
    async fn run_stream<'a>(
        &'a mut self,
        mut stream: JournalStream,
        finalizer: &'a Finalizer,
        mut shutdown: ShutdownSignal,
    ) -> bool {
        let bytes_received = register!(BytesReceived::from(Protocol::from("journald")));
        let events_received = register!(EventsReceived);

        let batch_size = self.batch_size;
        loop {
            let mut batch = Batch::new(self);

            // Block until at least one event lands in the batch; the batch
            // timeout only starts counting after that first event.
            while batch.events.is_empty() {
                let item = tokio::select! {
                    _ = &mut shutdown => return false,
                    item = stream.next() => item,
                };
                if !batch.handle_next(item) {
                    return true;
                }
            }

            let timeout = tokio::time::sleep(BATCH_TIMEOUT);
            tokio::pin!(timeout);

            // Top up the batch until it is full or the timeout fires.
            for _ in 1..batch_size {
                tokio::select! {
                    _ = &mut timeout => break,
                    result = stream.next() => if !batch.handle_next(result) {
                        break;
                    }
                }
            }
            // `finish` yields Some(restart?) when this stream is done.
            if let Some(x) = batch
                .finish(finalizer, &bytes_received, &events_received)
                .await
            {
                break x;
            }
        }
    }
}
534
/// Accumulates decoded journal records into one outgoing batch of events.
struct Batch<'a> {
    events: Vec<LogEvent>,
    /// Total raw bytes of the records accepted into this batch.
    record_size: usize,
    /// Set when the stream is done: `Some(true)` restarts `journalctl`,
    /// `Some(false)` stops the source, `None` keeps batching.
    exiting: Option<bool>,
    batch: Option<BatchNotifier>,
    receiver: Option<BatchStatusReceiver>,
    source: &'a mut JournaldSource,
    /// Cursor of the most recent record seen, for checkpointing.
    cursor: Option<String>,
}
544
impl<'a> Batch<'a> {
    /// Start a new batch, wiring up an acknowledgement notifier when the
    /// source has acknowledgements enabled.
    fn new(source: &'a mut JournaldSource) -> Self {
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(source.acknowledgements);
        Self {
            events: Vec::new(),
            record_size: 0,
            exiting: None,
            batch,
            receiver,
            source,
            cursor: None,
        }
    }

    /// Process the next item from the journal stream.
    ///
    /// Returns `true` to keep accumulating into this batch, `false` when the
    /// stream ended or a framing/read error occurred.
    fn handle_next(&mut self, result: Option<Result<Bytes, BoxedFramingError>>) -> bool {
        match result {
            None => {
                // journalctl exited; finish this batch, then restart it.
                warn!("Journalctl process stopped.");
                self.exiting = Some(true);
                false
            }
            Some(Err(error)) => {
                emit!(JournaldReadError { error });
                false
            }
            Some(Ok(bytes)) => {
                match decode_record(&bytes, self.source.remap_priority) {
                    Ok(mut record) => {
                        // Remember the record's cursor for checkpointing; it
                        // stays on the event only when `emit_cursor` is set.
                        if self.source.emit_cursor {
                            if let Some(tmp) = record.get(CURSOR) {
                                self.cursor = Some(tmp.clone());
                            }
                        } else if let Some(tmp) = record.remove(CURSOR) {
                            self.cursor = Some(tmp);
                        }

                        // Skip records rejected by the include/exclude sets.
                        if !filter_matches(
                            &record,
                            &self.source.include_matches,
                            &self.source.exclude_matches,
                        ) {
                            self.record_size += bytes.len();

                            let mut event = create_log_event_from_record(
                                record,
                                &self.batch,
                                self.source.log_namespace,
                            );

                            enrich_log_event(&mut event, self.source.log_namespace);

                            self.events.push(event);
                        }
                    }
                    Err(error) => {
                        // Invalid JSON from journalctl: report and drop it.
                        emit!(JournaldInvalidRecordError {
                            error,
                            text: String::from_utf8_lossy(&bytes).into_owned()
                        });
                    }
                }
                true
            }
        }
    }

    /// Emit metrics, forward the batch downstream, and hand the final cursor
    /// to the finalizer on success.
    ///
    /// Returns `None` to continue with a fresh batch, `Some(true)` when
    /// `journalctl` must be restarted, `Some(false)` when the output channel
    /// closed and the source should stop.
    async fn finish(
        mut self,
        finalizer: &Finalizer,
        bytes_received: &'a Registered<BytesReceived>,
        events_received: &'a Registered<EventsReceived>,
    ) -> Option<bool> {
        // Dropping the notifier lets the batch status resolve once all
        // events in it have been finalized downstream.
        drop(self.batch);

        if self.record_size > 0 {
            bytes_received.emit(ByteSize(self.record_size));
        }

        if !self.events.is_empty() {
            let count = self.events.len();
            let byte_size = self.events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(count, byte_size));

            match self.source.out.send_batch(self.events).await {
                Ok(_) => {
                    if let Some(cursor) = self.cursor {
                        finalizer.finalize(cursor, self.receiver).await;
                    }
                }
                Err(_) => {
                    emit!(StreamClosedError { count });
                    // Nobody is reading any more; shut the source down.
                    self.exiting = Some(false);
                }
            }
        }
        self.exiting
    }
}
644
/// Stream of newline-delimited raw records read from `journalctl` stdout.
type JournalStream = BoxStream<'static, Result<Bytes, BoxedFramingError>>;

/// Everything needed to (re)spawn a `journalctl` child process.
struct StartJournalctl {
    path: PathBuf,
    journal_dir: Option<PathBuf>,
    journal_namespace: Option<String>,
    current_boot_only: bool,
    since_now: bool,
    extra_args: Vec<String>,
}
655
impl StartJournalctl {
    const fn new(
        path: PathBuf,
        journal_dir: Option<PathBuf>,
        journal_namespace: Option<String>,
        current_boot_only: bool,
        since_now: bool,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            path,
            journal_dir,
            journal_namespace,
            current_boot_only,
            since_now,
            extra_args,
        }
    }

    /// Assemble the `journalctl` invocation for the given checkpoint cursor.
    fn make_command(&self, checkpoint: Option<&str>) -> Command {
        let mut command = Command::new(&self.path);
        command.stdout(Stdio::piped());
        command.arg("--follow");
        command.arg("--all");
        command.arg("--show-cursor");
        command.arg("--output=json");

        if let Some(dir) = &self.journal_dir {
            command.arg(format!("--directory={}", dir.display()));
        }

        if let Some(namespace) = &self.journal_namespace {
            command.arg(format!("--namespace={namespace}"));
        }

        if self.current_boot_only {
            command.arg("--boot");
        }

        if let Some(cursor) = checkpoint {
            // Resume immediately after the last acknowledged record.
            command.arg(format!("--after-cursor={cursor}"));
        } else if self.since_now {
            command.arg("--since=now");
        } else {
            // No checkpoint: start from a date far enough back to cover the
            // entire journal.
            command.arg("--since=2000-01-01");
        }

        if !self.extra_args.is_empty() {
            command.args(&self.extra_args);
        }

        command
    }

    /// Spawn `journalctl` and return its stdout as a newline-framed stream,
    /// plus a guard that terminates the child when dropped.
    fn start(
        &mut self,
        checkpoint: Option<&str>,
    ) -> crate::Result<(JournalStream, RunningJournalctl)> {
        let mut command = self.make_command(checkpoint);

        let mut child = command.spawn().context(JournalctlSpawnSnafu)?;

        let stream = FramedRead::new(
            child.stdout.take().unwrap(),
            CharacterDelimitedDecoder::new(b'\n'),
        )
        .boxed();

        Ok((stream, RunningJournalctl(child)))
    }
}
728
/// Handle to a spawned `journalctl`; terminates the child when dropped.
struct RunningJournalctl(Child);

impl Drop for RunningJournalctl {
    fn drop(&mut self) {
        // Best-effort SIGTERM so journalctl exits promptly; errors (process
        // already gone, pid conversion failure) are deliberately ignored.
        if let Some(pid) = self.0.id().and_then(|pid| pid.try_into().ok()) {
            _ = kill(Pid::from_raw(pid), Signal::SIGTERM);
        }
    }
}
738
/// Attach host, timestamp, and source_type information to a freshly created
/// event, according to the active log namespace.
fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
    // Host: copy from the stored journal metadata (Vector namespace) or move
    // the `_HOSTNAME` field into the configured host key (Legacy).
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(host) = log
                .get(metadata_path!(JournaldConfig::NAME, "metadata"))
                .and_then(|meta| meta.get(HOSTNAME))
            {
                log.insert(metadata_path!(JournaldConfig::NAME, "host"), host.clone());
            }
        }
        LogNamespace::Legacy => {
            if let Some(host) = log.remove(event_path!(HOSTNAME)) {
                log_namespace.insert_source_metadata(
                    JournaldConfig::NAME,
                    log,
                    log_schema().host_key().map(LegacyKey::Overwrite),
                    path!("host"),
                    host,
                );
            }
        }
    }

    // Prefer the record's source timestamp, falling back to the time the
    // journal received the record.
    let timestamp_value = match log_namespace {
        LogNamespace::Vector => log
            .get(metadata_path!(JournaldConfig::NAME, "metadata"))
            .and_then(|meta| {
                meta.get(SOURCE_TIMESTAMP)
                    .or_else(|| meta.get(RECEIVED_TIMESTAMP))
            }),
        LogNamespace::Legacy => log
            .get(event_path!(SOURCE_TIMESTAMP))
            .or_else(|| log.get(event_path!(RECEIVED_TIMESTAMP))),
    };

    // The value is a decimal string of microseconds since the Unix epoch;
    // split it into seconds and nanoseconds for chrono.
    let timestamp = timestamp_value
        .filter(|&ts| ts.is_bytes())
        .and_then(|ts| ts.as_str().unwrap().parse::<u64>().ok())
        .map(|ts| {
            chrono::Utc
                .timestamp_opt((ts / 1_000_000) as i64, (ts % 1_000_000) as u32 * 1_000)
                .single()
                .expect("invalid timestamp")
        });

    // Store the parsed timestamp where the namespace expects it.
    match log_namespace {
        LogNamespace::Vector => {
            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());

            if let Some(ts) = timestamp {
                log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts);
            }
        }
        LogNamespace::Legacy => {
            if let Some(ts) = timestamp {
                log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
            }
        }
    }

    // Finally tag the event with this source's type.
    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        JournaldConfig::NAME,
    );
}
809
/// Convert a decoded journal record into a `LogEvent` for the given
/// namespace.
///
/// Vector namespace: the `MESSAGE` field becomes the event root (null when
/// absent) and every other field is stored under this source's metadata.
/// Legacy namespace: all fields become event fields, with `MESSAGE` moved to
/// the schema's configured message key.
fn create_log_event_from_record(
    mut record: Record,
    batch: &Option<BatchNotifier>,
    log_namespace: LogNamespace,
) -> LogEvent {
    match log_namespace {
        LogNamespace::Vector => {
            let message_value = record
                .remove(MESSAGE)
                .map(|msg| Value::Bytes(Bytes::from(msg)))
                .unwrap_or(Value::Null);

            let mut log = LogEvent::from(message_value).with_batch_notifier_option(batch);

            // Remaining journal fields go under `%journald.metadata.<key>`.
            record.iter().for_each(|(key, value)| {
                log.metadata_mut()
                    .value_mut()
                    .insert(path!(JournaldConfig::NAME, "metadata", key), value.as_str());
            });

            log
        }
        LogNamespace::Legacy => {
            let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);

            if let Some(message) = log.remove(event_path!(MESSAGE)) {
                log.maybe_insert(log_schema().message_key_target_path(), message);
            }

            log
        }
    }
}
844
/// Normalize a systemd unit name: bare names (no `.`) gain a `.service`
/// suffix; names that already carry a type suffix pass through unchanged.
fn fixup_unit(unit: &str) -> String {
    match unit.contains('.') {
        true => unit.to_owned(),
        false => format!("{unit}.service"),
    }
}
854
855fn decode_record(line: &[u8], remap: bool) -> Result<Record, JsonError> {
856 let mut record = serde_json::from_str::<JsonValue>(&String::from_utf8_lossy(line))?;
857 if let Some(record) = record.as_object_mut() {
860 for (_, value) in record.iter_mut().filter(|(_, v)| v.is_array()) {
861 *value = decode_array(value.as_array().expect("already validated"));
862 }
863 }
864 if remap {
865 record.get_mut("PRIORITY").map(remap_priority);
866 }
867 serde_json::from_value(record)
868}
869
870fn decode_array(array: &[JsonValue]) -> JsonValue {
871 decode_array_as_bytes(array).unwrap_or_else(|| {
872 let ser = serde_json::to_string(array).expect("already deserialized");
873 JsonValue::String(ser)
874 })
875}
876
877fn decode_array_as_bytes(array: &[JsonValue]) -> Option<JsonValue> {
878 array
882 .iter()
883 .map(|item| {
884 item.as_u64().and_then(|num| match num {
885 num if num <= u8::MAX as u64 => Some(num as u8),
886 _ => None,
887 })
888 })
889 .collect::<Option<Vec<u8>>>()
890 .map(|array| String::from_utf8_lossy(&array).into())
891}
892
893fn remap_priority(priority: &mut JsonValue) {
894 if let Some(num) = priority.as_str().and_then(|s| usize::from_str(s).ok()) {
895 let text = match num {
896 0 => "EMERG",
897 1 => "ALERT",
898 2 => "CRIT",
899 3 => "ERR",
900 4 => "WARNING",
901 5 => "NOTICE",
902 6 => "INFO",
903 7 => "DEBUG",
904 _ => "UNKNOWN",
905 };
906 *priority = JsonValue::String(text.into());
907 }
908}
909
910fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
911 match (includes.is_empty(), excludes.is_empty()) {
912 (true, true) => false,
913 (false, true) => !contains_match(record, includes),
914 (true, false) => contains_match(record, excludes),
915 (false, false) => !contains_match(record, includes) || contains_match(record, excludes),
916 }
917}
918
919fn contains_match(record: &Record, matches: &Matches) -> bool {
920 let f = move |(field, value)| {
921 matches
922 .get(field)
923 .map(|x| x.contains(value))
924 .unwrap_or(false)
925 };
926 record.iter().any(f)
927}
928
929fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
930 for (a_key, a_values) in a_matches {
931 if let Some(b_values) = b_matches.get(a_key.as_str()) {
932 for (a, b) in a_values
933 .iter()
934 .flat_map(|x| std::iter::repeat(x).zip(b_values.iter()))
935 {
936 if a == b {
937 return Some((a_key.into(), b.into()));
938 }
939 }
940 }
941 }
942 None
943}
944
/// Strategy for recording cursors once events have been handed off.
enum Finalizer {
    /// Acknowledgements disabled: checkpoint as soon as a batch is sent.
    Sync(SharedCheckpointer),
    /// Acknowledgements enabled: checkpoint in order, only after a batch's
    /// delivery status arrives.
    Async(OrderedFinalizer<String>),
}

impl Finalizer {
    fn new(
        acknowledgements: bool,
        checkpointer: SharedCheckpointer,
        shutdown: ShutdownSignal,
    ) -> Self {
        if acknowledgements {
            let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown));
            // Background task: persist the cursor of every batch confirmed
            // delivered; other statuses leave the checkpoint untouched so the
            // records are re-read after a restart.
            tokio::spawn(async move {
                while let Some((status, cursor)) = ack_stream.next().await {
                    if status == BatchStatus::Delivered {
                        checkpointer.lock().await.set(cursor).await;
                    }
                }
            });
            Self::Async(finalizer)
        } else {
            Self::Sync(checkpointer)
        }
    }

    /// Record `cursor` now (sync mode) or once its batch is acknowledged
    /// (async mode, via `receiver`).
    async fn finalize(&self, cursor: String, receiver: Option<BatchStatusReceiver>) {
        match (self, receiver) {
            (Self::Sync(checkpointer), None) => checkpointer.lock().await.set(cursor).await,
            (Self::Async(finalizer), Some(receiver)) => finalizer.add(cursor, receiver),
            _ => {
                unreachable!("Cannot have async finalization without a receiver in journald source")
            }
        }
    }
}
981
/// Persists the most recent journal cursor to a file on disk.
struct Checkpointer {
    file: File,
    filename: PathBuf,
}

impl Checkpointer {
    /// Open (or create) the checkpoint file for reading and writing.
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&filename)
            .await?;
        Ok(Checkpointer { file, filename })
    }

    /// Overwrite the start of the file with `token` plus a newline.
    ///
    /// The file is deliberately not truncated: `get` only reads up to the
    /// first newline, so stale bytes from a longer earlier token are
    /// harmless.
    async fn set(&mut self, token: &str) -> Result<(), io::Error> {
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(format!("{token}\n").as_bytes()).await
    }

    /// Read back the stored cursor. Returns `None` when the file is empty or
    /// contains no newline-terminated token.
    async fn get(&mut self) -> Result<Option<String>, io::Error> {
        let mut buf = Vec::<u8>::new();
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.read_to_end(&mut buf).await?;
        match buf.len() {
            0 => Ok(None),
            _ => {
                let text = String::from_utf8_lossy(&buf);
                // Only the text before the first newline is the live token.
                match text.find('\n') {
                    Some(nl) => Ok(Some(String::from(&text[..nl]))),
                    None => Ok(None),
                }
            }
        }
    }
}
1020
/// A `Checkpointer` bundled with an in-memory copy of the current cursor.
struct StatefulCheckpointer {
    checkpointer: Checkpointer,
    cursor: Option<String>,
}

impl StatefulCheckpointer {
    /// Open the checkpoint file and load any previously saved cursor.
    async fn new(filename: PathBuf) -> Result<Self, io::Error> {
        let mut checkpointer = Checkpointer::new(filename).await?;
        let cursor = checkpointer.get().await?;
        Ok(Self {
            checkpointer,
            cursor,
        })
    }

    /// Persist a new cursor, emitting an error event on write failure.
    ///
    /// The in-memory cursor is updated even when the write fails, so
    /// `journalctl` restarts within this process still resume correctly.
    async fn set(&mut self, token: impl Into<String>) {
        let token = token.into();
        if let Err(error) = self.checkpointer.set(&token).await {
            emit!(JournaldCheckpointSetError {
                error,
                filename: self
                    .checkpointer
                    .filename
                    .to_str()
                    .unwrap_or("unknown")
                    .to_string(),
            });
        }
        self.cursor = Some(token);
    }
}
1052
/// Clonable, mutex-guarded handle to the shared `StatefulCheckpointer`, used
/// by both the source loop and the acknowledgement task.
#[derive(Clone)]
struct SharedCheckpointer(Arc<Mutex<StatefulCheckpointer>>);

impl SharedCheckpointer {
    fn new(c: StatefulCheckpointer) -> Self {
        Self(Arc::new(Mutex::new(c)))
    }

    /// Acquire exclusive access to the underlying checkpointer.
    async fn lock(&self) -> MutexGuard<'_, StatefulCheckpointer> {
        self.0.lock().await
    }
}
1065
#[cfg(test)]
mod checkpointer_tests {
    use tempfile::tempdir;
    use tokio::fs::read_to_string;

    use super::*;

    // The example config generated from `Default` must round-trip.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<JournaldConfig>();
    }

    // Round-trip cursors through the checkpoint file, verifying both the
    // parsed value and the raw on-disk contents after each `set`.
    #[tokio::test]
    async fn journald_checkpointer_works() {
        let tempdir = tempdir().unwrap();
        let mut filename = tempdir.path().to_path_buf();
        filename.push(CHECKPOINT_FILENAME);
        let mut checkpointer = Checkpointer::new(filename.clone())
            .await
            .expect("Creating checkpointer failed!");

        // A fresh file holds no cursor.
        assert!(checkpointer.get().await.unwrap().is_none());

        checkpointer
            .set("first test")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "first test");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("first test\n"));

        // Overwriting with a shorter token still reads back correctly, since
        // `get` stops at the first newline.
        checkpointer
            .set("second")
            .await
            .expect("Setting checkpoint failed");
        assert_eq!(checkpointer.get().await.unwrap().unwrap(), "second");
        let contents = read_to_string(filename.clone())
            .await
            .unwrap_or_else(|_| panic!("Failed to read: {filename:?}"));
        assert!(contents.starts_with("second\n"));
    }
}
1110
1111#[cfg(test)]
1112mod tests {
1113 use std::{fs, path::Path};
1114
1115 use tempfile::tempdir;
1116 use tokio::time::{Duration, Instant, sleep, timeout};
1117 use vrl::value::{Value, kind::Collection};
1118
1119 use super::*;
1120 use crate::{
1121 config::ComponentKey,
1122 event::{Event, EventStatus},
1123 test_util::components::assert_source_compliance,
1124 };
1125
1126 const TEST_COMPONENT: &str = "journald-test";
1127 const TEST_JOURNALCTL: &str = "tests/data/journalctl";
1128
1129 async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
1130 let include_matches = create_unit_matches(iunits.to_vec());
1131 let exclude_matches = create_unit_matches(xunits.to_vec());
1132 run_journal(include_matches, exclude_matches, cursor, false).await
1133 }
1134
1135 async fn run_journal(
1136 include_matches: Matches,
1137 exclude_matches: Matches,
1138 checkpoint: Option<&str>,
1139 emit_cursor: bool,
1140 ) -> Vec<Event> {
1141 assert_source_compliance(&["protocol"], async move {
1142 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
1143
1144 let tempdir = tempdir().unwrap();
1145 let tempdir = tempdir.path().to_path_buf();
1146
1147 if let Some(cursor) = checkpoint {
1148 let mut checkpoint_path = tempdir.clone();
1149 checkpoint_path.push(TEST_COMPONENT);
1150 fs::create_dir(&checkpoint_path).unwrap();
1151 checkpoint_path.push(CHECKPOINT_FILENAME);
1152
1153 let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
1154 .await
1155 .expect("Creating checkpointer failed!");
1156
1157 checkpointer
1158 .set(cursor)
1159 .await
1160 .expect("Could not set checkpoint");
1161 }
1162
1163 let (cx, shutdown) =
1164 SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
1165 let config = JournaldConfig {
1166 journalctl_path: Some(TEST_JOURNALCTL.into()),
1167 include_matches,
1168 exclude_matches,
1169 data_dir: Some(tempdir),
1170 remap_priority: true,
1171 acknowledgements: false.into(),
1172 emit_cursor,
1173 ..Default::default()
1174 };
1175 let source = config.build(cx).await.unwrap();
1176 tokio::spawn(async move { source.await.unwrap() });
1177
1178 sleep(Duration::from_millis(100)).await;
1179 shutdown
1180 .shutdown_all(Some(Instant::now() + Duration::from_secs(1)))
1181 .await;
1182
1183 timeout(Duration::from_secs(1), rx.collect()).await.unwrap()
1184 })
1185 .await
1186 }
1187
1188 fn create_unit_matches<S: Into<String>>(units: Vec<S>) -> Matches {
1189 let units: HashSet<String> = units.into_iter().map(Into::into).collect();
1190 let mut map = HashMap::new();
1191 if !units.is_empty() {
1192 map.insert(String::from(SYSTEMD_UNIT), units);
1193 }
1194 map
1195 }
1196
1197 fn create_matches<S: Into<String>>(conditions: Vec<(S, S)>) -> Matches {
1198 let mut matches: Matches = HashMap::new();
1199 for (field, value) in conditions {
1200 matches
1201 .entry(field.into())
1202 .or_default()
1203 .insert(value.into());
1204 }
1205 matches
1206 }
1207
1208 #[tokio::test]
1209 async fn reads_journal() {
1210 let received = run_with_units(&[], &[], None).await;
1211 assert_eq!(received.len(), 8);
1212 assert_eq!(
1213 message(&received[0]),
1214 Value::Bytes("System Initialization".into())
1215 );
1216 assert_eq!(
1217 received[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1218 "journald".into()
1219 );
1220 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
1221 assert_eq!(priority(&received[0]), Value::Bytes("INFO".into()));
1222 assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
1223 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
1224 assert_eq!(priority(&received[1]), Value::Bytes("DEBUG".into()));
1225 }
1226
1227 #[tokio::test]
1228 async fn includes_units() {
1229 let received = run_with_units(&["unit.service"], &[], None).await;
1230 assert_eq!(received.len(), 1);
1231 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1232 }
1233
1234 #[tokio::test]
1235 async fn excludes_units() {
1236 let received = run_with_units(&[], &["unit.service", "badunit.service"], None).await;
1237 assert_eq!(received.len(), 6);
1238 assert_eq!(
1239 message(&received[0]),
1240 Value::Bytes("System Initialization".into())
1241 );
1242 assert_eq!(
1243 message(&received[1]),
1244 Value::Bytes("Missing timestamp".into())
1245 );
1246 assert_eq!(
1247 message(&received[2]),
1248 Value::Bytes("Different timestamps".into())
1249 );
1250 }
1251
1252 #[tokio::test]
1253 async fn emits_cursor() {
1254 let received = run_journal(Matches::new(), Matches::new(), None, true).await;
1255 assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
1256 assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
1257 assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
1258 }
1259
1260 #[tokio::test]
1261 async fn includes_matches() {
1262 let matches = create_matches(vec![("PRIORITY", "ERR")]);
1263 let received = run_journal(matches, HashMap::new(), None, false).await;
1264 assert_eq!(received.len(), 2);
1265 assert_eq!(
1266 message(&received[0]),
1267 Value::Bytes("Different timestamps".into())
1268 );
1269 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140005000));
1270 assert_eq!(
1271 message(&received[1]),
1272 Value::Bytes("Non-ASCII in other field".into())
1273 );
1274 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1275 }
1276
1277 #[tokio::test]
1278 async fn includes_kernel() {
1279 let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
1280 let received = run_journal(matches, HashMap::new(), None, false).await;
1281 assert_eq!(received.len(), 1);
1282 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
1283 assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
1284 }
1285
1286 #[tokio::test]
1287 async fn excludes_matches() {
1288 let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
1289 let received = run_journal(HashMap::new(), matches, None, false).await;
1290 assert_eq!(received.len(), 5);
1291 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
1292 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
1293 assert_eq!(timestamp(&received[2]), value_ts(1578529839, 140005000));
1294 assert_eq!(timestamp(&received[3]), value_ts(1578529839, 140005000));
1295 assert_eq!(timestamp(&received[4]), value_ts(1578529839, 140006000));
1296 }
1297
1298 #[tokio::test]
1299 async fn handles_checkpoint() {
1300 let received = run_with_units(&[], &[], Some("1")).await;
1301 assert_eq!(received.len(), 7);
1302 assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
1303 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
1304 }
1305
1306 #[tokio::test]
1307 async fn parses_array_messages() {
1308 let received = run_with_units(&["badunit.service"], &[], None).await;
1309 assert_eq!(received.len(), 1);
1310 assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
1311 }
1312
1313 #[tokio::test]
1314 async fn parses_array_fields() {
1315 let received = run_with_units(&["syslog.service"], &[], None).await;
1316 assert_eq!(received.len(), 1);
1317 assert_eq!(
1318 received[0].as_log()["SYSLOG_RAW"],
1319 Value::Bytes("¿World?".into())
1320 );
1321 }
1322
1323 #[tokio::test]
1324 async fn parses_string_sequences() {
1325 let received = run_with_units(&["NetworkManager.service"], &[], None).await;
1326 assert_eq!(received.len(), 1);
1327 assert_eq!(
1328 received[0].as_log()["SYSLOG_FACILITY"],
1329 Value::Bytes(r#"["DHCP4","DHCP6"]"#.into())
1330 );
1331 }
1332
1333 #[tokio::test]
1334 async fn handles_missing_timestamp() {
1335 let received = run_with_units(&["stdout"], &[], None).await;
1336 assert_eq!(received.len(), 2);
1337 assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
1338 assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
1339 }
1340
#[tokio::test]
async fn handles_acknowledgements() {
    // End-to-end check that the checkpoint only advances after events are
    // acknowledged as delivered.
    let (tx, mut rx) = SourceSender::new_test();

    // Set up an isolated data directory holding the source's checkpoint file.
    let tempdir = tempdir().unwrap();
    let tempdir = tempdir.path().to_path_buf();
    let mut checkpoint_path = tempdir.clone();
    checkpoint_path.push(TEST_COMPONENT);
    fs::create_dir(&checkpoint_path).unwrap();
    checkpoint_path.push(CHECKPOINT_FILENAME);

    // Separate checkpointer handle used only to observe what the source persists.
    let mut checkpointer = Checkpointer::new(checkpoint_path.clone())
        .await
        .expect("Creating checkpointer failed!");

    let config = JournaldConfig {
        journalctl_path: Some(TEST_JOURNALCTL.into()),
        data_dir: Some(tempdir),
        remap_priority: true,
        acknowledgements: true.into(),
        ..Default::default()
    };
    let (cx, _shutdown) = SourceContext::new_shutdown(&ComponentKey::from(TEST_COMPONENT), tx);
    let source = config.build(cx).await.unwrap();
    tokio::spawn(async move { source.await.unwrap() });

    // Nothing has been acknowledged yet, so no checkpoint must exist.
    assert_eq!(checkpointer.get().await.unwrap(), None);

    // Give the spawned source time to emit all fixture events.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut count = 0;
    while let Poll::Ready(Some(event)) = futures::poll!(rx.next()) {
        // The checkpoint must not advance before this event is acknowledged.
        assert_eq!(checkpointer.get().await.unwrap(), None);
        event.metadata().update_status(EventStatus::Delivered);
        count += 1;
    }
    assert_eq!(count, 8);

    // After delivery acknowledgements settle, the checkpoint records the
    // cursor of the last event ("8").
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert_eq!(checkpointer.get().await.unwrap().as_deref(), Some("8"));
}
1385
#[test]
fn filter_matches_works_correctly() {
    // `filter_matches` returns true when a record should be dropped.
    let empty: Matches = HashMap::new();
    let includes = create_unit_matches(vec!["one", "two"]);
    let excludes = create_unit_matches(vec!["foo", "bar"]);

    // Record with no fields: dropped whenever an include list is present.
    let no_fields = HashMap::new();
    assert!(!filter_matches(&no_fields, &empty, &empty));
    assert!(filter_matches(&no_fields, &includes, &empty));
    assert!(!filter_matches(&no_fields, &empty, &excludes));
    assert!(filter_matches(&no_fields, &includes, &excludes));

    // Record whose unit is in the include list: kept in every combination.
    let mut included_unit = HashMap::new();
    included_unit.insert(String::from(SYSTEMD_UNIT), String::from("one"));
    assert!(!filter_matches(&included_unit, &empty, &empty));
    assert!(!filter_matches(&included_unit, &includes, &empty));
    assert!(!filter_matches(&included_unit, &empty, &excludes));
    assert!(!filter_matches(&included_unit, &includes, &excludes));

    // Record whose unit is in the exclude list: dropped whenever any
    // include or exclude list is present.
    let mut excluded_unit = HashMap::new();
    excluded_unit.insert(String::from(SYSTEMD_UNIT), String::from("bar"));
    assert!(!filter_matches(&excluded_unit, &empty, &empty));
    assert!(filter_matches(&excluded_unit, &includes, &empty));
    assert!(filter_matches(&excluded_unit, &empty, &excludes));
    assert!(filter_matches(&excluded_unit, &includes, &excludes));
}
1410
#[test]
fn merges_units_and_matches_option() {
    // Build a config that sets both the unit lists and the raw match maps.
    let journald_config = JournaldConfig {
        include_units: vec!["one", "two"].into_iter().map(String::from).collect(),
        include_matches: create_matches(vec![
            ("_SYSTEMD_UNIT", "three.service"),
            ("_TRANSPORT", "kernel"),
        ]),
        exclude_units: vec!["foo", "bar"].into_iter().map(String::from).collect(),
        exclude_matches: create_matches(vec![
            ("_SYSTEMD_UNIT", "baz.service"),
            ("PRIORITY", "DEBUG"),
        ]),
        ..Default::default()
    };

    let to_set = |values: &[&str]| -> HashSet<String> {
        values.iter().map(|s| s.to_string()).collect()
    };

    // Include units gain the ".service" suffix and merge into include_matches.
    let include = journald_config.merged_include_matches();
    assert_eq!(
        include.get("_SYSTEMD_UNIT").unwrap(),
        &to_set(&["one.service", "two.service", "three.service"])
    );
    assert_eq!(include.get("_TRANSPORT").unwrap(), &to_set(&["kernel"]));

    // Exclude units likewise merge into exclude_matches.
    let exclude = journald_config.merged_exclude_matches();
    assert_eq!(
        exclude.get("_SYSTEMD_UNIT").unwrap(),
        &to_set(&["foo.service", "bar.service", "baz.service"])
    );
    assert_eq!(exclude.get("PRIORITY").unwrap(), &to_set(&["DEBUG"]));
}
1454
#[test]
fn find_duplicate_match_works_correctly() {
    let include_matches = create_matches(vec![("_TRANSPORT", "kernel")]);
    let exclude_matches = create_matches(vec![("_TRANSPORT", "kernel")]);

    // The same (field, value) pair on both sides is reported as a duplicate.
    let (field, value) = find_duplicate_match(&include_matches, &exclude_matches).unwrap();
    assert_eq!(field, "_TRANSPORT");
    assert_eq!(value, "kernel");

    // No overlap — including either side being empty — yields no duplicate.
    let empty = HashMap::new();
    assert!(find_duplicate_match(&empty, &empty).is_none());
    assert!(find_duplicate_match(&include_matches, &empty).is_none());
    assert!(find_duplicate_match(&empty, &exclude_matches).is_none());
}
1473
#[test]
fn command_options() {
    let path = PathBuf::from("journalctl");

    // Default options: no directory/namespace/boot flags; a fixed historical
    // start time is used instead of "now".
    let command = create_command(&path, None, None, false, false, None, vec![]);
    let rendered = format!("{command:?}");
    assert!(!rendered.contains("--directory="));
    assert!(!rendered.contains("--namespace="));
    assert!(!rendered.contains("--boot"));
    assert!(rendered.contains("--since=2000-01-01"));

    // since_now switches the start time to "now".
    let command = create_command(&path, None, None, false, true, None, vec![]);
    let rendered = format!("{command:?}");
    assert!(rendered.contains("--since=now"));

    // Everything set: each option must surface as its journalctl flag.
    let command = create_command(
        &path,
        Some(PathBuf::from("/tmp/journal-dir")),
        Some(String::from("my_namespace")),
        true,
        true,
        Some("2021-01-01"),
        vec!["--merge".to_string()],
    );
    let rendered = format!("{command:?}");
    assert!(rendered.contains("--directory=/tmp/journal-dir"));
    assert!(rendered.contains("--namespace=my_namespace"));
    assert!(rendered.contains("--boot"));
    assert!(rendered.contains("--after-cursor="));
    assert!(rendered.contains("--merge"));
}
1539
1540 fn create_command(
1541 path: &Path,
1542 journal_dir: Option<PathBuf>,
1543 journal_namespace: Option<String>,
1544 current_boot_only: bool,
1545 since_now: bool,
1546 cursor: Option<&str>,
1547 extra_args: Vec<String>,
1548 ) -> Command {
1549 StartJournalctl::new(
1550 path.into(),
1551 journal_dir,
1552 journal_namespace,
1553 current_boot_only,
1554 since_now,
1555 extra_args,
1556 )
1557 .make_command(cursor)
1558 }
1559
1560 fn message(event: &Event) -> Value {
1561 event.as_log()[log_schema().message_key().unwrap().to_string()].clone()
1562 }
1563
1564 fn timestamp(event: &Event) -> Value {
1565 event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
1566 }
1567
1568 fn cursor(event: &Event) -> Value {
1569 event.as_log()[CURSOR].clone()
1570 }
1571
1572 fn value_ts(secs: i64, usecs: u32) -> Value {
1573 Value::Timestamp(
1574 chrono::Utc
1575 .timestamp_opt(secs, usecs)
1576 .single()
1577 .expect("invalid timestamp"),
1578 )
1579 }
1580
1581 fn priority(event: &Event) -> Value {
1582 event.as_log()["PRIORITY"].clone()
1583 }
1584
#[test]
fn output_schema_definition_vector_namespace() {
    // With the Vector log namespace, the source's output schema should put
    // the raw message at the event root and everything else in metadata.
    let config = JournaldConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let definitions = config
        .outputs(LogNamespace::Vector)
        .remove(0)
        .schema_definition(true);

    // Expected: root is bytes-or-null, plus Vector's standard metadata
    // (source_type, ingest_timestamp) and source-scoped metadata fields
    // (journald record fields, timestamp, host).
    let expected_definition =
        Definition::new_with_default_metadata(Kind::bytes().or_null(), [LogNamespace::Vector])
            .with_metadata_field(
                &owned_value_path!("vector", "source_type"),
                Kind::bytes(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!("vector", "ingest_timestamp"),
                Kind::timestamp(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(JournaldConfig::NAME, "metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(JournaldConfig::NAME, "timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_metadata_field(
                &owned_value_path!(JournaldConfig::NAME, "host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            );

    assert_eq!(definitions, Some(expected_definition))
}
1627
#[test]
fn output_schema_definition_legacy_namespace() {
    // Default config uses the legacy namespace: the event root is an object
    // carrying source_type/timestamp/host plus arbitrary journald fields.
    let config = JournaldConfig::default();

    let definitions = config
        .outputs(LogNamespace::Legacy)
        .remove(0)
        .schema_definition(true);

    // Unknown fields are allowed as bytes because journald records carry
    // arbitrary string-valued fields.
    let expected_definition = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
    .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
    .with_event_field(
        &owned_value_path!("host"),
        Kind::bytes().or_undefined(),
        Some("host"),
    )
    .unknown_fields(Kind::bytes());

    assert_eq!(definitions, Some(expected_definition))
}
1652
/// Asserts that a representative journald event is valid against the schema
/// definition produced by `config` for the given log namespace.
fn matches_schema(config: &JournaldConfig, namespace: LogNamespace) {
    // A realistic journald record, including internal double-underscore
    // fields and the host/source_type fields this source adds.
    let record = r#"{
        "PRIORITY":"6",
        "SYSLOG_FACILITY":"3",
        "SYSLOG_IDENTIFIER":"ntpd",
        "_BOOT_ID":"124c781146e841ae8d9b4590df8b9231",
        "_CAP_EFFECTIVE":"3fffffffff",
        "_CMDLINE":"ntpd: [priv]",
        "_COMM":"ntpd",
        "_EXE":"/usr/sbin/ntpd",
        "_GID":"0",
        "_MACHINE_ID":"c36e9ea52800a19d214cb71b53263a28",
        "_PID":"2156",
        "_STREAM_ID":"92c79f4b45c4457490ebdefece29995e",
        "_SYSTEMD_CGROUP":"/system.slice/ntpd.service",
        "_SYSTEMD_INVOCATION_ID":"496ad5cd046d48e29f37f559a6d176f8",
        "_SYSTEMD_SLICE":"system.slice",
        "_SYSTEMD_UNIT":"ntpd.service",
        "_TRANSPORT":"stdout",
        "_UID":"0",
        "__MONOTONIC_TIMESTAMP":"98694000446",
        "__REALTIME_TIMESTAMP":"1564173027000443",
        "host":"my-host.local",
        "message":"reply from 192.168.1.2: offset -0.001791 delay 0.000176, next query 1500s",
        "source_type":"journald"
    }"#;

    let json: serde_json::Value = serde_json::from_str(record).unwrap();
    let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));

    // The JSON fixture only has string timestamps, so insert a real one to
    // satisfy the schema's timestamp field.
    event.as_mut_log().insert("timestamp", chrono::Utc::now());

    let definitions = config.outputs(namespace).remove(0).schema_definition(true);

    definitions.unwrap().assert_valid_for_event(&event);
}
1689
#[test]
fn matches_schema_legacy() {
    // The default config's legacy-namespace schema must accept a realistic
    // journald event.
    matches_schema(&JournaldConfig::default(), LogNamespace::Legacy)
}
1696}