1use std::{
2 collections::HashMap,
3 convert::TryFrom,
4 future::ready,
5 pin::Pin,
6 sync::{Arc, LazyLock},
7 time::Duration,
8};
9
10use bollard::{
11 Docker,
12 container::LogOutput,
13 errors::Error as DockerError,
14 query_parameters::{
15 EventsOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
16 LogsOptionsBuilder,
17 },
18 service::{ContainerInspectResponse, EventMessage},
19};
20use bytes::{Buf, Bytes};
21use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
22use futures::{Stream, StreamExt};
23use serde_with::serde_as;
24use tokio::sync::mpsc;
25use tracing_futures::Instrument;
26use vector_lib::{
27 codecs::{BytesDeserializer, BytesDeserializerConfig},
28 config::{LegacyKey, LogNamespace},
29 configurable::configurable_component,
30 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered},
31 lookup::{
32 OwnedValuePath, PathPrefix, lookup_v2::OptionalValuePath, metadata_path, owned_value_path,
33 path,
34 },
35};
36use vrl::{
37 event_path,
38 value::{Kind, kind::Collection},
39};
40
41use super::util::MultilineConfig;
42use crate::{
43 SourceSender,
44 common::backoff::ExponentialBackoff,
45 config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
46 docker::{DockerTlsConfig, docker},
47 event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
48 internal_events::{
49 DockerLogsCommunicationError, DockerLogsContainerEventReceived,
50 DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
51 DockerLogsContainerWatch, DockerLogsEventsReceived,
52 DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
53 },
54 line_agg::{self, LineAgg},
55 shutdown::ShutdownSignal,
56};
57
58#[cfg(test)]
59mod tests;
60
/// Metadata key under which the container's image is stored on each event.
const IMAGE: &str = "image";
/// Metadata key under which the container's creation timestamp is stored on each event.
const CREATED_AT: &str = "container_created_at";
/// Metadata key under which the container's name is stored on each event.
const NAME: &str = "container_name";
/// Metadata key under which the originating output stream name is stored on each event.
const STREAM: &str = "stream";
/// Metadata key under which the container's id is stored on each event.
const CONTAINER: &str = "container_id";
/// Minimum hostname length required before `exclude_self` treats the hostname as a
/// container-id prefix.
const MIN_HOSTNAME_LENGTH: usize = 6;

/// Stream name attached to events read from a container's standard error.
static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
/// Stream name attached to events read from a container's standard output.
static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
/// Stream name attached to events reported by Docker as console output.
static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());
72
#[serde_as]
/// Configuration for the `docker_logs` source.
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the path under which the local hostname is added to each event.
    ///
    /// When unset, the host key of the global log schema is used.
    host_key: Option<OptionalValuePath>,

    /// Address of the Docker daemon to connect to.
    ///
    /// Passed, together with `tls`, to the Docker client constructor.
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// Container IDs or names to exclude from log collection.
    ///
    /// Each entry is matched as a prefix of the container id or of any container name.
    /// An exclusion wins over a match in `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>,

    /// Container IDs or names to include in log collection.
    ///
    /// Each entry is matched as a prefix of the container id or of any container name.
    /// When unset, every container is considered included.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>,

    /// Labels a container must carry to be collected.
    ///
    /// Forwarded as the `label` filter when listing containers and when subscribing to
    /// Docker events.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// Images whose containers should be collected.
    ///
    /// Forwarded as the `ancestor` filter when listing containers and as the `image` filter
    /// when subscribing to Docker events.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Name of the field set to `true` on partial events when `auto_partial_merge` is disabled.
    ///
    /// An empty string is treated the same as leaving the marker unset.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Whether partial events (messages not terminated by a newline) are merged into one event.
    auto_partial_merge: bool,

    /// Delay, in seconds, before a container's log stream is restarted after a transient error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// When unset, no line aggregation is applied.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    /// TLS options handed to the Docker client constructor.
    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// Per-source override of the log namespace; merged with the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
180
181impl Default for DockerLogsConfig {
182 fn default() -> Self {
183 Self {
184 host_key: None,
185 docker_host: None,
186 tls: None,
187 exclude_containers: None,
188 include_containers: None,
189 include_labels: None,
190 include_images: None,
191 partial_event_marker_field: default_partial_event_marker_field(),
192 auto_partial_merge: true,
193 multiline: None,
194 retry_backoff_secs: default_retry_backoff_secs(),
195 log_namespace: None,
196 }
197 }
198}
199
200fn default_partial_event_marker_field() -> Option<String> {
201 Some(event::PARTIAL.to_string())
202}
203
/// Serde default for `retry_backoff_secs`: two seconds.
const fn default_retry_backoff_secs() -> Duration {
    Duration::from_millis(2_000)
}
207
208impl DockerLogsConfig {
209 fn container_name_or_id_included<'a>(
210 &self,
211 id: &str,
212 names: impl IntoIterator<Item = &'a str>,
213 ) -> bool {
214 let containers: Vec<String> = names.into_iter().map(Into::into).collect();
215
216 self.include_containers
217 .as_ref()
218 .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
219 .unwrap_or(true)
220 && !(self
221 .exclude_containers
222 .as_ref()
223 .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
224 .unwrap_or(false))
225 }
226
227 fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
228 items.iter().any(|flag| id.starts_with(flag))
229 || names
230 .iter()
231 .any(|name| items.iter().any(|item| name.starts_with(item)))
232 }
233
234 fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
235 if let Some(val) = &self.partial_event_marker_field
236 && val.is_empty()
237 {
238 self.partial_event_marker_field = None;
239 }
240 self
241 }
242}
243
// NOTE(review): presumably derives the source's example-config generation from
// `DockerLogsConfig::default()`; the macro is defined elsewhere, so confirm there.
impl_generate_config_from_default!(DockerLogsConfig);
245
246#[async_trait::async_trait]
247#[typetag::serde(name = "docker_logs")]
248impl SourceConfig for DockerLogsConfig {
249 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
250 let log_namespace = cx.log_namespace(self.log_namespace);
251 let source = DockerLogsSource::new(
252 self.clone().with_empty_partial_event_marker_field_as_none(),
253 cx.out,
254 cx.shutdown.clone(),
255 log_namespace,
256 )?;
257
258 let fut = async move {
260 match source.handle_running_containers().await {
261 Ok(source) => source.run().await,
262 Err(error) => {
263 error!(
264 message = "Listing currently running containers failed.",
265 ?error
266 );
267 }
268 }
269 };
270
271 let shutdown = cx.shutdown;
272 Ok(Box::pin(async move {
274 Ok(tokio::select! {
275 _ = fut => {}
276 _ = shutdown => {}
277 })
278 }))
279 }
280
281 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
282 let host_key = self
283 .host_key
284 .clone()
285 .unwrap_or(log_schema().host_key().cloned().into())
286 .path
287 .map(LegacyKey::Overwrite);
288
289 let schema_definition = BytesDeserializerConfig
290 .schema_definition(global_log_namespace.merge(self.log_namespace))
291 .with_source_metadata(
292 Self::NAME,
293 host_key,
294 &owned_value_path!("host"),
295 Kind::bytes().or_undefined(),
296 Some("host"),
297 )
298 .with_source_metadata(
299 Self::NAME,
300 Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
301 &owned_value_path!(CONTAINER),
302 Kind::bytes(),
303 None,
304 )
305 .with_source_metadata(
306 Self::NAME,
307 Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
308 &owned_value_path!(IMAGE),
309 Kind::bytes(),
310 None,
311 )
312 .with_source_metadata(
313 Self::NAME,
314 Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
315 &owned_value_path!(NAME),
316 Kind::bytes(),
317 None,
318 )
319 .with_source_metadata(
320 Self::NAME,
321 Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
322 &owned_value_path!(CREATED_AT),
323 Kind::timestamp(),
324 None,
325 )
326 .with_source_metadata(
327 Self::NAME,
328 Some(LegacyKey::Overwrite(owned_value_path!("label"))),
329 &owned_value_path!("labels"),
330 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
331 None,
332 )
333 .with_source_metadata(
334 Self::NAME,
335 Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
336 &owned_value_path!(STREAM),
337 Kind::bytes(),
338 None,
339 )
340 .with_source_metadata(
341 Self::NAME,
342 log_schema()
343 .timestamp_key()
344 .cloned()
345 .map(LegacyKey::Overwrite),
346 &owned_value_path!("timestamp"),
347 Kind::timestamp(),
348 Some("timestamp"),
349 )
350 .with_vector_metadata(
351 log_schema().source_type_key(),
352 &owned_value_path!("source_type"),
353 Kind::bytes(),
354 None,
355 )
356 .with_vector_metadata(
357 None,
358 &owned_value_path!("ingest_timestamp"),
359 Kind::timestamp(),
360 None,
361 );
362
363 vec![SourceOutput::new_maybe_logs(
364 DataType::Log,
365 schema_definition,
366 )]
367 }
368
369 fn can_acknowledge(&self) -> bool {
370 false
371 }
372}
373
/// State shared by every per-container log stream: configuration, Docker client, and the
/// moment the source started.
struct DockerLogsSourceCore {
    /// The source configuration.
    config: DockerLogsConfig,
    /// Line aggregation settings derived from `config.multiline`, if configured.
    line_agg_config: Option<line_agg::Config>,
    /// Client used for all Docker API calls.
    docker: Docker,
    /// Time the core was created; used as the `since` bound for events and initial logs.
    now_timestamp: DateTime<Utc>,
}
381
382impl DockerLogsSourceCore {
383 fn new(config: DockerLogsConfig) -> crate::Result<Self> {
384 let docker = docker(config.docker_host.clone(), config.tls.clone())?;
387
388 let now = Local::now();
390 info!(
391 message = "Capturing logs from now on.",
392 now = %now.to_rfc3339()
393 );
394
395 let line_agg_config = if let Some(ref multiline_config) = config.multiline {
396 Some(line_agg::Config::try_from(multiline_config)?)
397 } else {
398 None
399 };
400
401 Ok(DockerLogsSourceCore {
402 config,
403 line_agg_config,
404 docker,
405 now_timestamp: now.into(),
406 })
407 }
408
409 fn docker_logs_event_stream(
411 &self,
412 ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
413 let mut filters = HashMap::new();
414
415 filters.insert(
422 "event".to_owned(),
423 vec![
424 "start".to_owned(),
425 "unpause".to_owned(),
426 "die".to_owned(),
427 "pause".to_owned(),
428 ],
429 );
430 filters.insert("type".to_owned(), vec!["container".to_owned()]);
431
432 if let Some(include_labels) = &self.config.include_labels {
434 filters.insert("label".to_owned(), include_labels.clone());
435 }
436
437 if let Some(include_images) = &self.config.include_images {
438 filters.insert("image".to_owned(), include_images.clone());
439 }
440
441 self.docker.events(Some(
442 EventsOptionsBuilder::new()
443 .since(&self.now_timestamp.timestamp().to_string())
444 .filters(&filters)
445 .build(),
446 ))
447 }
448}
449
/// The main task of the source: tracks watched containers, reacts to Docker events, and
/// receives results from the per-container log stream tasks.
struct DockerLogsSource {
    /// Builder used to spawn and restart per-container log stream tasks.
    esb: EventStreamBuilder,
    /// Stream of Docker container events.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// State of every container currently being tracked, keyed by container id.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives the outcome of each finished log stream task.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Local hostname, used to avoid collecting the logs of the source's own container.
    hostname: Option<String>,
    /// Delay applied before restarting a container's log stream after a transient error.
    backoff_duration: Duration,
    /// Backoff schedule for re-establishing the Docker events stream.
    events_backoff: ExponentialBackoff,
}
475
476impl DockerLogsSource {
477 fn new(
478 config: DockerLogsConfig,
479 out: SourceSender,
480 shutdown: ShutdownSignal,
481 log_namespace: LogNamespace,
482 ) -> crate::Result<DockerLogsSource> {
483 let backoff_secs = config.retry_backoff_secs;
484
485 let host_key = config
486 .host_key
487 .clone()
488 .unwrap_or(log_schema().host_key().cloned().into());
489 let hostname = crate::get_hostname().ok();
490
491 let core = DockerLogsSourceCore::new(config)?;
493
494 let events = core.docker_logs_event_stream();
496 info!(message = "Listening to docker log events.");
497
498 let (main_send, main_recv) =
500 mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();
501
502 let esb = EventStreamBuilder {
511 host_key,
512 hostname: hostname.clone(),
513 core: Arc::new(core),
514 out,
515 main_send,
516 shutdown,
517 log_namespace,
518 };
519
520 Ok(DockerLogsSource {
521 esb,
522 events: Box::pin(events),
523 containers: HashMap::new(),
524 main_recv,
525 hostname,
526 backoff_duration: backoff_secs,
527 events_backoff: ExponentialBackoff::default(),
528 })
529 }
530
531 async fn handle_running_containers(mut self) -> crate::Result<Self> {
533 let mut filters = HashMap::new();
534
535 if let Some(include_labels) = &self.esb.core.config.include_labels {
537 filters.insert("label".to_owned(), include_labels.clone());
538 }
539
540 if let Some(include_images) = &self.esb.core.config.include_images {
541 filters.insert("ancestor".to_owned(), include_images.clone());
542 }
543
544 self.esb
545 .core
546 .docker
547 .list_containers(Some(
548 ListContainersOptionsBuilder::new()
549 .all(false)
550 .filters(&filters)
551 .build(),
552 ))
553 .await?
554 .into_iter()
555 .for_each(|container| {
556 let id = container.id.unwrap();
557 let names = container.names.unwrap();
558
559 trace!(message = "Found already running container.", id = %id, names = ?names);
560
561 if self.exclude_self(id.as_str()) {
562 info!(message = "Excluded self container.", id = %id);
563 return;
564 }
565
566 if !self.esb.core.config.container_name_or_id_included(
567 id.as_str(),
568 names.iter().map(|s| {
569 let s = s.as_str();
571 if s.starts_with('/') {
572 s.split_at('/'.len_utf8()).1
573 } else {
574 s
575 }
576 }),
577 ) {
578 info!(message = "Excluded container.", id = %id);
579 return;
580 }
581
582 let id = ContainerId::new(id);
583 self.containers.insert(id.clone(), self.esb.start(id, None));
584 });
585
586 Ok(self)
587 }
588
589 async fn run(mut self) {
590 loop {
591 tokio::select! {
592 value = self.main_recv.recv() => {
593 match value {
594 Some(Ok(info)) => {
595 let state = self
596 .containers
597 .get_mut(&info.id)
598 .expect("Every ContainerLogInfo has it's ContainerState");
599 if state.return_info(info) {
600 self.esb.restart(state);
601 }
602 },
603 Some(Err((id,persistence))) => {
604 let state = self
605 .containers
606 .remove(&id)
607 .expect("Every started ContainerId has it's ContainerState");
608 match persistence{
609 ErrorPersistence::Transient => if state.is_running() {
610 let backoff= Some(self.backoff_duration);
611 self.containers.insert(id.clone(), self.esb.start(id, backoff));
612 }
613 ErrorPersistence::Permanent => (),
615 }
616 }
617 None => {
618 error!(message = "The docker_logs source main stream has ended unexpectedly.", internal_log_rate_limit = false);
619 info!(message = "Shutting down docker_logs source.");
620 return;
621 }
622 };
623 }
624 value = self.events.next() => {
625 match value {
626 Some(Ok(mut event)) => {
627 self.events_backoff.reset();
629
630 let action = event.action.unwrap();
631 let actor = event.actor.take().unwrap();
632 let id = actor.id.unwrap();
633 let attributes = actor.attributes.unwrap();
634
635 emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });
636
637 let id = ContainerId::new(id.to_owned());
638
639 match action.as_str() {
641 "die" | "pause" => {
642 if let Some(state) = self.containers.get_mut(&id) {
643 state.stopped();
644 }
645 }
646 "start" | "unpause" => {
647 if let Some(state) = self.containers.get_mut(&id) {
648 state.running();
649 self.esb.restart(state);
650 } else {
651 let include_name =
652 self.esb.core.config.container_name_or_id_included(
653 id.as_str(),
654 attributes.get("name").map(|s| s.as_str()),
655 );
656
657 let exclude_self = self.exclude_self(id.as_str());
658
659 if include_name && !exclude_self {
660 self.containers.insert(id.clone(), self.esb.start(id, None));
661 }
662 }
663 }
664 _ => {},
665 };
666 }
667 Some(Err(error)) => {
668 emit!(DockerLogsCommunicationError {
669 error,
670 container_id: None,
671 });
672 if !self.retry_events_stream_with_backoff("Docker events stream failed").await {
674 error!("Docker events stream failed and retry exhausted, shutting down.");
675 return;
676 }
677 },
678 None => {
679 if !self.retry_events_stream_with_backoff("Docker events stream ended").await {
681 error!("Docker events stream ended and retry exhausted, shutting down.");
682 return;
683 }
684 }
685 };
686 }
687 };
688 }
689 }
690
691 async fn retry_events_stream_with_backoff(&mut self, reason: &str) -> bool {
694 if let Some(delay) = self.events_backoff.next() {
695 warn!(
696 message = reason,
697 action = "retrying with backoff",
698 delay_ms = delay.as_millis()
699 );
700 tokio::select! {
701 _ = tokio::time::sleep(delay) => {
702 self.events = Box::pin(self.esb.core.docker_logs_event_stream());
703 true
704 }
705 _ = self.esb.shutdown.clone() => {
706 info!("Shutdown signal received during retry backoff.");
707 false
708 }
709 }
710 } else {
711 error!(message = "Events stream retry exhausted.", reason = reason);
712 false
713 }
714 }
715
716 fn exclude_self(&self, id: &str) -> bool {
717 self.hostname
718 .as_ref()
719 .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
720 .unwrap_or(false)
721 }
722}
723
/// Cloneable bundle of everything a per-container log stream task needs.
#[derive(Clone)]
struct EventStreamBuilder {
    /// Path under which the hostname is added to events in the legacy layout.
    host_key: OptionalValuePath,
    /// Local hostname added to every event, when it could be determined.
    hostname: Option<String>,
    /// Shared configuration and Docker client.
    core: Arc<DockerLogsSourceCore>,
    /// Sink that receives the produced events.
    out: SourceSender,
    /// Channel used to report a finished log stream back to the main task.
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Signal that ends log streams on shutdown.
    shutdown: ShutdownSignal,
    /// Log namespace that decides where metadata is written on events.
    log_namespace: LogNamespace,
}
738
739impl EventStreamBuilder {
740 fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
742 let this = self.clone();
743 tokio::spawn(
744 async move {
745 if let Some(duration) = backoff {
746 tokio::time::sleep(duration).await;
747 }
748
749 match this
750 .core
751 .docker
752 .inspect_container(id.as_str(), None::<InspectContainerOptions>)
753 .await
754 {
755 Ok(details) => match ContainerMetadata::from_details(details) {
756 Ok(metadata) => {
757 let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
758 this.run_event_stream(info).await;
759 return;
760 }
761 Err(error) => emit!(DockerLogsTimestampParseError {
762 error,
763 container_id: id.as_str()
764 }),
765 },
766 Err(error) => emit!(DockerLogsContainerMetadataFetchError {
767 error,
768 container_id: id.as_str()
769 }),
770 }
771
772 this.finish(Err((id, ErrorPersistence::Transient)));
773 }
774 .in_current_span(),
775 );
776
777 ContainerState::new_running()
778 }
779
780 fn restart(&self, container: &mut ContainerState) {
782 if let Some(info) = container.take_info() {
783 let this = self.clone();
784 tokio::spawn(this.run_event_stream(info).in_current_span());
785 }
786 }
787
788 async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
789 let options = Some(
791 LogsOptionsBuilder::new()
792 .follow(true)
793 .stdout(true)
794 .stderr(true)
795 .since(info.log_since() as i32) .timestamps(true)
797 .build(),
798 );
799
800 let stream = self.core.docker.logs(info.id.as_str(), options);
801 emit!(DockerLogsContainerWatch {
802 container_id: info.id.as_str()
803 });
804
805 let mut partial_event_merge_state = None;
807
808 let core = Arc::clone(&self.core);
809
810 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
811
812 let mut error = None;
813 let events_stream = stream
814 .map(|value| {
815 match value {
816 Ok(message) => Ok(info.new_event(
817 message,
818 core.config.partial_event_marker_field.clone(),
819 core.config.auto_partial_merge,
820 &mut partial_event_merge_state,
821 &bytes_received,
822 self.log_namespace,
823 )),
824 Err(error) => {
825 match &error {
827 DockerError::DockerResponseServerError { status_code, .. }
828 if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
829 {
830 emit!(DockerLogsLoggingDriverUnsupportedError {
831 error,
832 container_id: info.id.as_str(),
833 });
834 Err(ErrorPersistence::Permanent)
835 }
836 _ => {
837 emit!(DockerLogsCommunicationError {
838 error,
839 container_id: Some(info.id.as_str())
840 });
841 Err(ErrorPersistence::Transient)
842 }
843 }
844 }
845 }
846 })
847 .take_while(|v| {
848 error = v.as_ref().err().cloned();
849 ready(v.is_ok())
850 })
851 .filter_map(|v| ready(v.ok().flatten()))
852 .take_until(self.shutdown.clone());
853
854 let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
855 if let Some(ref line_agg_config) = core.line_agg_config {
856 Box::new(line_agg_adapter(
857 events_stream,
858 line_agg::Logic::new(line_agg_config.clone()),
859 self.log_namespace,
860 ))
861 } else {
862 Box::new(events_stream)
863 };
864
865 let host_key = self.host_key.clone().path;
866 let hostname = self.hostname.clone();
867 let result = {
868 let mut stream = events_stream
869 .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
870 self.out.send_event_stream(&mut stream).await.map_err(|_| {
871 let (count, _) = stream.size_hint();
872 emit!(StreamClosedError { count });
873 })
874 };
875
876 emit!(DockerLogsContainerUnwatch {
878 container_id: info.id.as_str()
879 });
880
881 let result = match (result, error) {
882 (Ok(()), None) => Ok(info),
883 (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
884 (_, Some(occurrence)) => Err((info.id, occurrence)),
885 };
886
887 self.finish(result);
888 }
889
890 fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
891 _ = self.main_send.send(result);
894 }
895}
896
897fn add_hostname(
898 mut log: LogEvent,
899 host_key: &Option<OwnedValuePath>,
900 hostname: &Option<String>,
901 log_namespace: LogNamespace,
902) -> LogEvent {
903 if let Some(hostname) = hostname {
904 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
905
906 log_namespace.insert_source_metadata(
907 DockerLogsConfig::NAME,
908 &mut log,
909 legacy_host_key,
910 path!("host"),
911 hostname.clone(),
912 );
913 }
914
915 log
916}
917
/// Classifies why a container's log stream ended with an error.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    /// The stream may succeed if retried; the main loop restarts it after a backoff
    /// while the container is still running.
    Transient,
    /// Retrying is pointless; the main loop forgets the container.
    Permanent,
}
923
/// Container id, stored as `Bytes` built from the id string supplied to `ContainerId::new`.
#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct ContainerId(Bytes);
928
929impl ContainerId {
930 fn new(id: String) -> Self {
931 ContainerId(id.into())
932 }
933
934 fn as_str(&self) -> &str {
935 std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
936 }
937}
938
/// Per-container bookkeeping kept by the main task.
struct ContainerState {
    /// The container's log info; `None` while a log stream task holds it.
    info: Option<ContainerLogInfo>,
    /// Whether the container is currently considered running.
    running: bool,
    /// Incremented every time the container is marked as running.
    generation: u64,
}
948
949impl ContainerState {
950 const fn new_running() -> Self {
952 ContainerState {
953 info: None,
954 running: true,
955 generation: 0,
956 }
957 }
958
959 const fn running(&mut self) {
960 self.running = true;
961 self.generation += 1;
962 }
963
964 const fn stopped(&mut self) {
965 self.running = false;
966 }
967
968 const fn is_running(&self) -> bool {
969 self.running
970 }
971
972 #[must_use]
974 fn return_info(&mut self, info: ContainerLogInfo) -> bool {
975 debug_assert!(self.info.is_none());
976 let restart = self.running || info.generation < self.generation;
979 self.info = Some(info);
980 restart
981 }
982
983 fn take_info(&mut self) -> Option<ContainerLogInfo> {
984 self.info.take().map(|mut info| {
985 info.generation = self.generation;
987 info
988 })
989 }
990}
991
/// Everything needed to stream, de-duplicate, and annotate the logs of one container.
struct ContainerLogInfo {
    /// Id of the container the logs belong to.
    id: ContainerId,
    /// Lower time bound for logs: entries not strictly newer than this are dropped until
    /// a first log has been accepted.
    created: DateTime<Utc>,
    /// Timestamp of the last accepted log and the generation in which it was accepted.
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// Generation of the container state this info was last taken under.
    generation: u64,
    /// Container metadata attached to every event.
    metadata: ContainerMetadata,
}
1004
1005impl ContainerLogInfo {
1006 const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
1009 ContainerLogInfo {
1010 id,
1011 created,
1012 last_log: None,
1013 generation: 0,
1014 metadata,
1015 }
1016 }
1017
1018 fn log_since(&self) -> i64 {
1020 self.last_log
1021 .as_ref()
1022 .map(|(d, _)| d.timestamp())
1023 .unwrap_or_else(|| self.created.timestamp())
1024 - 1
1025 }
1026
1027 fn new_event(
1030 &mut self,
1031 log_output: LogOutput,
1032 partial_event_marker_field: Option<String>,
1033 auto_partial_merge: bool,
1034 partial_event_merge_state: &mut Option<LogEventMergeState>,
1035 bytes_received: &Registered<BytesReceived>,
1036 log_namespace: LogNamespace,
1037 ) -> Option<LogEvent> {
1038 let (stream, mut bytes_message) = match log_output {
1039 LogOutput::StdErr { message } => (STDERR.clone(), message),
1040 LogOutput::StdOut { message } => (STDOUT.clone(), message),
1041 LogOutput::Console { message } => (CONSOLE.clone(), message),
1042 LogOutput::StdIn { message: _ } => return None,
1043 };
1044
1045 bytes_received.emit(ByteSize(bytes_message.len()));
1046
1047 let message = String::from_utf8_lossy(&bytes_message);
1048 let mut splitter = message.splitn(2, char::is_whitespace);
1049 let timestamp_str = splitter.next()?;
1050 let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
1051 Ok(timestamp) => {
1052 match self.last_log.as_ref() {
1056 Some(&(last, generation)) => {
1057 if last < timestamp || (last == timestamp && generation == self.generation)
1058 {
1059 } else {
1061 trace!(
1065 message = "Received log from previous container generation.",
1066 log_timestamp = %timestamp_str,
1067 last_log_timestamp = %last,
1068 );
1069 return None;
1070 }
1071 }
1072 None => {
1073 if self.created < timestamp.with_timezone(&Utc) {
1074 } else {
1076 trace!(
1079 message = "Received log from before created timestamp.",
1080 log_timestamp = %timestamp_str,
1081 created_timestamp = %self.created
1082 );
1083 return None;
1084 }
1085 }
1086 }
1087
1088 self.last_log = Some((timestamp, self.generation));
1089
1090 let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
1091 let remove_len = message.len() - log_len;
1092 bytes_message.advance(remove_len);
1093
1094 Some(timestamp.with_timezone(&Utc))
1096 }
1097 Err(error) => {
1098 emit!(DockerLogsTimestampParseError {
1100 error,
1101 container_id: self.id.as_str()
1102 });
1103 None
1105 }
1106 };
1107
1108 let is_partial = if bytes_message
1114 .last()
1115 .map(|&b| b as char == '\n')
1116 .unwrap_or(false)
1117 {
1118 bytes_message.truncate(bytes_message.len() - 1);
1119 if bytes_message
1120 .last()
1121 .map(|&b| b as char == '\r')
1122 .unwrap_or(false)
1123 {
1124 bytes_message.truncate(bytes_message.len() - 1);
1125 }
1126 false
1127 } else {
1128 true
1129 };
1130
1131 let deserializer = BytesDeserializer;
1133 let mut log = deserializer.parse_single(bytes_message, log_namespace);
1134
1135 log_namespace.insert_source_metadata(
1137 DockerLogsConfig::NAME,
1138 &mut log,
1139 Some(LegacyKey::Overwrite(path!(CONTAINER))),
1140 path!(CONTAINER),
1141 self.id.0.clone(),
1142 );
1143 log_namespace.insert_source_metadata(
1145 DockerLogsConfig::NAME,
1146 &mut log,
1147 Some(LegacyKey::Overwrite(path!(IMAGE))),
1148 path!(IMAGE),
1149 self.metadata.image.clone(),
1150 );
1151 log_namespace.insert_source_metadata(
1153 DockerLogsConfig::NAME,
1154 &mut log,
1155 Some(LegacyKey::Overwrite(path!(NAME))),
1156 path!(NAME),
1157 self.metadata.name.clone(),
1158 );
1159 log_namespace.insert_source_metadata(
1161 DockerLogsConfig::NAME,
1162 &mut log,
1163 Some(LegacyKey::Overwrite(path!(CREATED_AT))),
1164 path!(CREATED_AT),
1165 self.metadata.created_at,
1166 );
1167 if !self.metadata.labels.is_empty() {
1169 for (key, value) in self.metadata.labels.iter() {
1170 log_namespace.insert_source_metadata(
1171 DockerLogsConfig::NAME,
1172 &mut log,
1173 Some(LegacyKey::Overwrite(path!("label", key))),
1174 path!("labels", key),
1175 value.clone(),
1176 )
1177 }
1178 }
1179 log_namespace.insert_source_metadata(
1180 DockerLogsConfig::NAME,
1181 &mut log,
1182 Some(LegacyKey::Overwrite(path!(STREAM))),
1183 path!(STREAM),
1184 stream,
1185 );
1186
1187 log_namespace.insert_vector_metadata(
1188 &mut log,
1189 log_schema().source_type_key(),
1190 path!("source_type"),
1191 Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
1192 );
1193
1194 match log_namespace {
1197 LogNamespace::Vector => {
1198 if let Some(timestamp) = timestamp {
1199 log.insert(
1200 metadata_path!(DockerLogsConfig::NAME, "timestamp"),
1201 timestamp,
1202 );
1203 }
1204
1205 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
1206 }
1207 LogNamespace::Legacy => {
1208 if let Some(timestamp) = timestamp
1209 && let Some(timestamp_key) = log_schema().timestamp_key()
1210 {
1211 log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
1212 }
1213 }
1214 };
1215
1216 let log = if auto_partial_merge {
1221 if is_partial {
1225 if let Some(partial_event_merge_state) = partial_event_merge_state {
1230 match log_namespace {
1233 LogNamespace::Vector => {
1234 partial_event_merge_state.merge_in_next_event(log, &["."]);
1235 }
1236 LogNamespace::Legacy => {
1237 partial_event_merge_state.merge_in_next_event(
1238 log,
1239 &[log_schema()
1240 .message_key()
1241 .expect("global log_schema.message_key to be valid path")
1242 .to_string()],
1243 );
1244 }
1245 }
1246 } else {
1247 *partial_event_merge_state = Some(LogEventMergeState::new(log));
1248 };
1249 return None;
1250 };
1251
1252 match partial_event_merge_state.take() {
1257 Some(partial_event_merge_state) => match log_namespace {
1260 LogNamespace::Vector => {
1261 partial_event_merge_state.merge_in_final_event(log, &["."])
1262 }
1263 LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
1264 log,
1265 &[log_schema()
1266 .message_key()
1267 .expect("global log_schema.message_key to be valid path")
1268 .to_string()],
1269 ),
1270 },
1271 None => log,
1272 }
1273 } else {
1274 if is_partial {
1276 if let Some(partial_event_marker_field) = partial_event_marker_field {
1278 log_namespace.insert_source_metadata(
1279 DockerLogsConfig::NAME,
1280 &mut log,
1281 Some(LegacyKey::Overwrite(path!(
1282 partial_event_marker_field.as_str()
1283 ))),
1284 path!(event::PARTIAL),
1285 true,
1286 );
1287 }
1288 }
1289 log
1291 };
1292
1293 emit!(DockerLogsEventsReceived {
1296 byte_size: log.estimated_json_encoded_size_of(),
1297 container_id: self.id.as_str(),
1298 container_name: &self.metadata.name_str
1299 });
1300
1301 Some(log)
1302 }
1303}
1304
/// Container metadata captured from the inspect response and attached to every event.
struct ContainerMetadata {
    /// Container labels; empty when the inspect response carries none.
    labels: HashMap<String, String>,
    /// Container name with leading `/` characters removed, as an event value.
    name: Value,
    /// Container name exactly as returned by the inspect response.
    name_str: String,
    /// Image the container was configured with, as an event value.
    image: Value,
    /// Container creation time, in UTC.
    created_at: DateTime<Utc>,
}
1317
1318impl ContainerMetadata {
1319 fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
1320 let config = details.config.unwrap();
1321 let name = details.name.unwrap();
1322 let created = details.created.unwrap();
1323
1324 let labels = config.labels.unwrap_or_default();
1325
1326 Ok(ContainerMetadata {
1327 labels,
1328 name: name.as_str().trim_start_matches('/').to_owned().into(),
1329 name_str: name,
1330 image: config.image.unwrap().into(),
1331 created_at: created.with_timezone(&Utc),
1332 })
1333 }
1334}
1335
1336fn line_agg_adapter(
1337 inner: impl Stream<Item = LogEvent> + Unpin,
1338 logic: line_agg::Logic<Bytes, LogEvent>,
1339 log_namespace: LogNamespace,
1340) -> impl Stream<Item = LogEvent> {
1341 let line_agg_in = inner.map(move |mut log| {
1342 let message_value = match log_namespace {
1343 LogNamespace::Vector => log
1344 .remove(event_path!())
1345 .expect("`.` must exist in the event"),
1346 LogNamespace::Legacy => log
1347 .remove(
1348 log_schema()
1349 .message_key_target_path()
1350 .expect("global log_schema.message_key to be valid path"),
1351 )
1352 .expect("`message` must exist in the event"),
1353 };
1354 let stream_value = match log_namespace {
1355 LogNamespace::Vector => log
1356 .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
1357 .expect("`docker_logs.stream` must exist in the metadata"),
1358 LogNamespace::Legacy => log
1359 .get(event_path!(STREAM))
1360 .expect("stream must exist in the event"),
1361 };
1362
1363 let stream = stream_value.coerce_to_bytes();
1364 let message = message_value.coerce_to_bytes();
1365 (stream, message, log)
1366 });
1367 let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
1368 line_agg_out.map(move |(_, message, mut log, _)| {
1369 match log_namespace {
1370 LogNamespace::Vector => log.insert(event_path!(), message),
1371 LogNamespace::Legacy => log.insert(
1372 log_schema()
1373 .message_key_target_path()
1374 .expect("global log_schema.message_key to be valid path"),
1375 message,
1376 ),
1377 };
1378 log
1379 })
1380}