// vector/sources/docker_logs/mod.rs

use std::{
    collections::HashMap,
    convert::TryFrom,
    future::ready,
    pin::Pin,
    sync::{Arc, LazyLock},
    time::Duration,
};

use bollard::{
    Docker,
    container::LogOutput,
    errors::Error as DockerError,
    query_parameters::{
        EventsOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
        LogsOptionsBuilder,
    },
    service::{ContainerInspectResponse, EventMessage},
};
use bytes::{Buf, Bytes};
use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
use futures::{Stream, StreamExt};
use serde_with::serde_as;
use tokio::sync::mpsc;
use tracing_futures::Instrument;
use vector_lib::{
    codecs::{BytesDeserializer, BytesDeserializerConfig},
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered},
    lookup::{
        OwnedValuePath, PathPrefix, lookup_v2::OptionalValuePath, metadata_path, owned_value_path,
        path,
    },
};
use vrl::{
    event_path,
    value::{Kind, kind::Collection},
};

use super::util::MultilineConfig;
use crate::{
    SourceSender,
    config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
    docker::{DockerTlsConfig, docker},
    event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
    internal_events::{
        DockerLogsCommunicationError, DockerLogsContainerEventReceived,
        DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
        DockerLogsContainerWatch, DockerLogsEventsReceived,
        DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
};

#[cfg(test)]
mod tests;

const IMAGE: &str = "image";
const CREATED_AT: &str = "container_created_at";
const NAME: &str = "container_name";
const STREAM: &str = "stream";
const CONTAINER: &str = "container_id";
// Prevent short hostname from being wrongly recognized as a container's short ID.
const MIN_HOSTNAME_LENGTH: usize = 6;

static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());

/// Configuration for the `docker_logs` source.
#[serde_as]
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// Use an HTTPS URL to enable TLS encryption.
    ///
    /// If absent, the `DOCKER_HOST` environment variable is used. If `DOCKER_HOST` is also absent,
    /// the default Docker local socket (`/var/run/docker.sock` on Unix platforms,
    /// `//./pipe/docker_engine` on Windows) is used.
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name starts with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `exclude_containers` is configured, any
    /// container that matches a configured exclusion is excluded even if it is also included with
    /// `include_containers`. Take care when using prefix matches, since they cannot be overridden by a
    /// corresponding entry in `include_containers`; for example, excluding `foo` cannot be overridden by including `foo-specific-id`.
    ///
    /// This can be used in conjunction with `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>, // Matched as a prefix ("starts with"), not as an exact name.

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name starts with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `include_containers` is configured, only
    /// containers that match a configured inclusion and are not excluded are collected.
    ///
    /// This can be used in conjunction with `exclude_containers`.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>, // Matched as a prefix ("starts with"), not as an exact name.

    /// A list of container object labels to match against when filtering running containers.
    ///
    /// Labels should follow the syntax described in the [Docker object labels](https://docs.docker.com/config/labels-custom-metadata/) documentation.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to match against.
    ///
    /// If not provided, all images are included.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Overrides the name of the log field used to mark an event as partial.
    ///
    /// If `auto_partial_merge` is disabled, partial events are emitted with a log field, set by this
    /// configuration value, indicating that the event is not complete.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    auto_partial_merge: bool,

    /// The amount of time to wait before retrying after an error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}

impl Default for DockerLogsConfig {
    fn default() -> Self {
        Self {
            host_key: None,
            docker_host: None,
            tls: None,
            exclude_containers: None,
            include_containers: None,
            include_labels: None,
            include_images: None,
            partial_event_marker_field: default_partial_event_marker_field(),
            auto_partial_merge: true,
            multiline: None,
            retry_backoff_secs: default_retry_backoff_secs(),
            log_namespace: None,
        }
    }
}

fn default_partial_event_marker_field() -> Option<String> {
    Some(event::PARTIAL.to_string())
}

const fn default_retry_backoff_secs() -> Duration {
    Duration::from_secs(2)
}

impl DockerLogsConfig {
    fn container_name_or_id_included<'a>(
        &self,
        id: &str,
        names: impl IntoIterator<Item = &'a str>,
    ) -> bool {
        let containers: Vec<String> = names.into_iter().map(Into::into).collect();

        self.include_containers
            .as_ref()
            .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
            .unwrap_or(true)
            && !(self
                .exclude_containers
                .as_ref()
                .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
                .unwrap_or(false))
    }

    fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
        items.iter().any(|flag| id.starts_with(flag))
            || names
                .iter()
                .any(|name| items.iter().any(|item| name.starts_with(item)))
    }

    fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
        if let Some(val) = &self.partial_event_marker_field
            && val.is_empty()
        {
            self.partial_event_marker_field = None;
        }
        self
    }
}
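
// Illustrative sketch, not part of the original test suite in `tests.rs`: how the
// prefix-based include/exclude matching above behaves for a hypothetical configuration.
#[cfg(test)]
mod filtering_examples {
    use super::DockerLogsConfig;

    #[test]
    fn include_exclude_prefix_matching() {
        let config = DockerLogsConfig {
            include_containers: Some(vec!["app".to_owned()]),
            exclude_containers: Some(vec!["app-debug".to_owned()]),
            ..Default::default()
        };

        // Included: the name starts with the `app` prefix and matches no exclusion.
        assert!(config.container_name_or_id_included("ad08cc418cf9", ["app-web"]));
        // Excluded: the `app-debug` exclusion prefix wins even though `app` also matches.
        assert!(!config.container_name_or_id_included("ad08cc418cf9", ["app-debug-1"]));
        // Not included: neither the ID nor the name starts with `app`.
        assert!(!config.container_name_or_id_included("deadbeef0000", ["db"]));
    }
}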

impl_generate_config_from_default!(DockerLogsConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "docker_logs")]
impl SourceConfig for DockerLogsConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let source = DockerLogsSource::new(
            self.clone().with_empty_partial_event_marker_field_as_none(),
            cx.out,
            cx.shutdown.clone(),
            log_namespace,
        )?;

        // Capture currently running containers, then run the main future.
        let fut = async move {
            match source.handle_running_containers().await {
                Ok(source) => source.run().await,
                Err(error) => {
                    error!(
                        message = "Listing currently running containers failed.",
                        %error
                    );
                }
            }
        };

        let shutdown = cx.shutdown;
        // Once this ShutdownSignal resolves, it drops DockerLogsSource and, by extension, its ShutdownSignal.
        Ok(Box::pin(async move {
            Ok(tokio::select! {
                _ = fut => {}
                _ = shutdown => {}
            })
        }))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let host_key = self
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path
            .map(LegacyKey::Overwrite);

        let schema_definition = BytesDeserializerConfig
            .schema_definition(global_log_namespace.merge(self.log_namespace))
            .with_source_metadata(
                Self::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
                &owned_value_path!(CONTAINER),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
                &owned_value_path!(IMAGE),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
                &owned_value_path!(NAME),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
                &owned_value_path!(CREATED_AT),
                Kind::timestamp(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("label"))),
                &owned_value_path!("labels"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
                &owned_value_path!(STREAM),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                log_schema()
                    .timestamp_key()
                    .cloned()
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_vector_metadata(
                log_schema().source_type_key(),
                &owned_value_path!("source_type"),
                Kind::bytes(),
                None,
            )
            .with_vector_metadata(
                None,
                &owned_value_path!("ingest_timestamp"),
                Kind::timestamp(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

struct DockerLogsSourceCore {
    config: DockerLogsConfig,
    line_agg_config: Option<line_agg::Config>,
    docker: Docker,
    /// Only logs created at or after this moment are collected.
    now_timestamp: DateTime<Utc>,
}

impl DockerLogsSourceCore {
    fn new(config: DockerLogsConfig) -> crate::Result<Self> {
        // NOTE: Constructs a new Docker instance for a Docker host listening at the URL specified
        //       by the DOCKER_HOST env var. Otherwise it connects to the Unix socket, which
        //       requires sudo privileges or docker group membership.
        let docker = docker(config.docker_host.clone(), config.tls.clone())?;

        // Only log events created at or after this moment are collected.
        let now = Local::now();
        info!(
            message = "Capturing logs from now on.",
            now = %now.to_rfc3339()
        );

        let line_agg_config = if let Some(ref multiline_config) = config.multiline {
            Some(line_agg::Config::try_from(multiline_config)?)
        } else {
            None
        };

        Ok(DockerLogsSourceCore {
            config,
            line_agg_config,
            docker,
            now_timestamp: now.into(),
        })
    }

    /// Returns event stream coming from docker.
    fn docker_logs_event_stream(
        &self,
    ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
        let mut filters = HashMap::new();

        // event   | emitted on commands
        // --------+-------------------
        // start   | docker start, docker run, restart policy, docker restart
        // unpause | docker unpause
        // die     | docker restart, docker stop, docker kill, process exited, oom
        // pause   | docker pause
        filters.insert(
            "event".to_owned(),
            vec![
                "start".to_owned(),
                "unpause".to_owned(),
                "die".to_owned(),
                "pause".to_owned(),
            ],
        );
        filters.insert("type".to_owned(), vec!["container".to_owned()]);

        // Apply include filters.
        if let Some(include_labels) = &self.config.include_labels {
            filters.insert("label".to_owned(), include_labels.clone());
        }

        if let Some(include_images) = &self.config.include_images {
            filters.insert("image".to_owned(), include_images.clone());
        }

        self.docker.events(Some(
            EventsOptionsBuilder::new()
                .since(&self.now_timestamp.timestamp().to_string())
                .filters(&filters)
                .build(),
        ))
    }
}

/// Main future, which listens for events coming from Docker and maintains
/// a fan of event_stream futures, where each event_stream corresponds to a
/// running container marked with ContainerLogInfo.
/// While running, each event_stream streams Events to the out channel.
/// Once a log stream has ended, it sends its ContainerLogInfo back to main.
///
/// Future  channel     Future      channel
///           |<---- event_stream ---->out
/// main <----|<---- event_stream ---->out
///           | ...                 ...out
///
struct DockerLogsSource {
    esb: EventStreamBuilder,
    /// Event stream from Docker.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// Mapping of seen container IDs to their data.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives ContainerLogInfo coming from event stream futures.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// May contain a shortened container ID.
    hostname: Option<String>,
    backoff_duration: Duration,
}

impl DockerLogsSource {
    fn new(
        config: DockerLogsConfig,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
    ) -> crate::Result<DockerLogsSource> {
        let backoff_secs = config.retry_backoff_secs;

        let host_key = config
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into());
        let hostname = crate::get_hostname().ok();

        // Only logs created at or after this moment are collected.
        let core = DockerLogsSourceCore::new(config)?;

        // Main event stream, through which only newly started/restarted containers are picked up.
        let events = core.docker_logs_event_stream();
        info!(message = "Listening to docker log events.");

        // Channel of communication between the main future and event_stream futures.
        let (main_send, main_recv) =
            mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();

        // Starting with logs from now.
        // TODO: Is this exception acceptable?
        // The only partial exception to this is the case where:
        // t0 -- outside: container running
        // t1 -- now_timestamp
        // t2 -- outside: container stopped
        // t3 -- list_containers
        // In that case, logs in [t1, t2] are pulled into Vector only on the next start/unpause of that container.
        let esb = EventStreamBuilder {
            host_key,
            hostname: hostname.clone(),
            core: Arc::new(core),
            out,
            main_send,
            shutdown,
            log_namespace,
        };

        Ok(DockerLogsSource {
            esb,
            events: Box::pin(events),
            containers: HashMap::new(),
            main_recv,
            hostname,
            backoff_duration: backoff_secs,
        })
    }

    /// Future that captures currently running containers and starts event streams for them.
    async fn handle_running_containers(mut self) -> crate::Result<Self> {
        let mut filters = HashMap::new();

        // Apply include filters.
        if let Some(include_labels) = &self.esb.core.config.include_labels {
            filters.insert("label".to_owned(), include_labels.clone());
        }

        if let Some(include_images) = &self.esb.core.config.include_images {
            filters.insert("ancestor".to_owned(), include_images.clone());
        }

        self.esb
            .core
            .docker
            .list_containers(Some(
                ListContainersOptionsBuilder::new()
                    .all(false)
                    .filters(&filters)
                    .build(),
            ))
            .await?
            .into_iter()
            .for_each(|container| {
                let id = container.id.unwrap();
                let names = container.names.unwrap();

                trace!(message = "Found already running container.", id = %id, names = ?names);

                if self.exclude_self(id.as_str()) {
                    info!(message = "Excluded self container.", id = %id);
                    return;
                }

                if !self.esb.core.config.container_name_or_id_included(
                    id.as_str(),
                    names.iter().map(|s| {
                        // bollard / shiplift returns names with a leading '/', so it needs to be removed.
                        let s = s.as_str();
                        if s.starts_with('/') {
                            s.split_at('/'.len_utf8()).1
                        } else {
                            s
                        }
                    }),
                ) {
                    info!(message = "Excluded container.", id = %id);
                    return;
                }

                let id = ContainerId::new(id);
                self.containers.insert(id.clone(), self.esb.start(id, None));
            });

        Ok(self)
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                value = self.main_recv.recv() => {
                    match value {
                        Some(Ok(info)) => {
                            let state = self
                                .containers
                                .get_mut(&info.id)
                                .expect("Every ContainerLogInfo has its ContainerState");
                            if state.return_info(info) {
                                self.esb.restart(state);
                            }
                        },
                        Some(Err((id, persistence))) => {
                            let state = self
                                .containers
                                .remove(&id)
                                .expect("Every started ContainerId has its ContainerState");
                            match persistence {
                                ErrorPersistence::Transient => if state.is_running() {
                                    let backoff = Some(self.backoff_duration);
                                    self.containers.insert(id.clone(), self.esb.start(id, backoff));
                                }
                                // Forget the container since the error is permanent.
                                ErrorPersistence::Permanent => (),
                            }
                        }
                        None => {
                            error!(message = "The docker_logs source main stream has ended unexpectedly.");
                            info!(message = "Shutting down docker_logs source.");
                            return;
                        }
                    };
                }
                value = self.events.next() => {
                    match value {
                        Some(Ok(mut event)) => {
                            let action = event.action.unwrap();
                            let actor = event.actor.take().unwrap();
                            let id = actor.id.unwrap();
                            let attributes = actor.attributes.unwrap();

                            emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });

                            let id = ContainerId::new(id.to_owned());

                            // Update container status.
                            match action.as_str() {
                                "die" | "pause" => {
                                    if let Some(state) = self.containers.get_mut(&id) {
                                        state.stopped();
                                    }
                                }
                                "start" | "unpause" => {
                                    if let Some(state) = self.containers.get_mut(&id) {
                                        state.running();
                                        self.esb.restart(state);
                                    } else {
                                        let include_name =
                                            self.esb.core.config.container_name_or_id_included(
                                                id.as_str(),
                                                attributes.get("name").map(|s| s.as_str()),
                                            );

                                        let exclude_self = self.exclude_self(id.as_str());

                                        if include_name && !exclude_self {
                                            self.containers.insert(id.clone(), self.esb.start(id, None));
                                        }
                                    }
                                }
                                _ => {},
                            };
                        }
                        Some(Err(error)) => {
                            emit!(DockerLogsCommunicationError {
                                error,
                                container_id: None,
                            });
                            return;
                        },
                        None => {
                            // TODO: This could be fixed, but it should be retried with some delay and exponential backoff.
                            error!(message = "Docker log event stream has ended unexpectedly.");
                            info!(message = "Shutting down docker_logs source.");
                            return;
                        }
                    };
                }
            };
        }
    }

    fn exclude_self(&self, id: &str) -> bool {
        self.hostname
            .as_ref()
            .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
            .unwrap_or(false)
    }
}

/// Used to construct and start event stream futures.
#[derive(Clone)]
struct EventStreamBuilder {
    host_key: OptionalValuePath,
    hostname: Option<String>,
    core: Arc<DockerLogsSourceCore>,
    /// Event stream futures send events through this.
    out: SourceSender,
    /// End through which event stream futures send ContainerLogInfo to the main future.
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Self and event streams will end on this.
    shutdown: ShutdownSignal,
    log_namespace: LogNamespace,
}

impl EventStreamBuilder {
    /// Spawns a task that runs the event stream until shutdown.
    fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
        let this = self.clone();
        tokio::spawn(
            async move {
                if let Some(duration) = backoff {
                    tokio::time::sleep(duration).await;
                }

                match this
                    .core
                    .docker
                    .inspect_container(id.as_str(), None::<InspectContainerOptions>)
                    .await
                {
                    Ok(details) => match ContainerMetadata::from_details(details) {
                        Ok(metadata) => {
                            let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
                            this.run_event_stream(info).await;
                            return;
                        }
                        Err(error) => emit!(DockerLogsTimestampParseError {
                            error,
                            container_id: id.as_str()
                        }),
                    },
                    Err(error) => emit!(DockerLogsContainerMetadataFetchError {
                        error,
                        container_id: id.as_str()
                    }),
                }

                this.finish(Err((id, ErrorPersistence::Transient)));
            }
            .in_current_span(),
        );

        ContainerState::new_running()
    }

    /// If info is present, restarts the event stream, which runs until shutdown.
    fn restart(&self, container: &mut ContainerState) {
        if let Some(info) = container.take_info() {
            let this = self.clone();
            tokio::spawn(this.run_event_stream(info).in_current_span());
        }
    }

    async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
        // Establish connection
        let options = Some(
            LogsOptionsBuilder::new()
                .follow(true)
                .stdout(true)
                .stderr(true)
                .since(info.log_since() as i32) // Note: the i32 cast is subject to the year-2038 problem.
                .timestamps(true)
                .build(),
        );

        let stream = self.core.docker.logs(info.id.as_str(), options);
        emit!(DockerLogsContainerWatch {
            container_id: info.id.as_str()
        });

        // Create event streamer
        let mut partial_event_merge_state = None;

        let core = Arc::clone(&self.core);

        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));

        let mut error = None;
        let events_stream = stream
            .map(|value| {
                match value {
                    Ok(message) => Ok(info.new_event(
                        message,
                        core.config.partial_event_marker_field.clone(),
                        core.config.auto_partial_merge,
                        &mut partial_event_merge_state,
                        &bytes_received,
                        self.log_namespace,
                    )),
                    Err(error) => {
                        // On any error, restart the connection.
                        match &error {
                            DockerError::DockerResponseServerError { status_code, .. }
                                if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
                            {
                                emit!(DockerLogsLoggingDriverUnsupportedError {
                                    error,
                                    container_id: info.id.as_str(),
                                });
                                Err(ErrorPersistence::Permanent)
                            }
                            _ => {
                                emit!(DockerLogsCommunicationError {
                                    error,
                                    container_id: Some(info.id.as_str())
                                });
                                Err(ErrorPersistence::Transient)
                            }
                        }
                    }
                }
            })
            .take_while(|v| {
                error = v.as_ref().err().cloned();
                ready(v.is_ok())
            })
            .filter_map(|v| ready(v.ok().flatten()))
            .take_until(self.shutdown.clone());

        let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
            if let Some(ref line_agg_config) = core.line_agg_config {
                Box::new(line_agg_adapter(
                    events_stream,
                    line_agg::Logic::new(line_agg_config.clone()),
                    self.log_namespace,
                ))
            } else {
                Box::new(events_stream)
            };

        let host_key = self.host_key.clone().path;
        let hostname = self.hostname.clone();
        let result = {
            let mut stream = events_stream
                .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
            self.out.send_event_stream(&mut stream).await.map_err(|_| {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
            })
        };

        // End of stream
        emit!(DockerLogsContainerUnwatch {
            container_id: info.id.as_str()
        });

        let result = match (result, error) {
            (Ok(()), None) => Ok(info),
            (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
            (_, Some(occurrence)) => Err((info.id, occurrence)),
        };

        self.finish(result);
    }

    fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
        // This can legally fail when shutting down, and any other
        // reason should have been logged in the main future.
        _ = self.main_send.send(result);
    }
}

fn add_hostname(
    mut log: LogEvent,
    host_key: &Option<OwnedValuePath>,
    hostname: &Option<String>,
    log_namespace: LogNamespace,
) -> LogEvent {
    if let Some(hostname) = hostname {
        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);

        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            legacy_host_key,
            path!("host"),
            hostname.clone(),
        );
    }

    log
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    Transient,
    Permanent,
}

/// Container ID as assigned by Docker.
/// Is actually a string.
#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct ContainerId(Bytes);

impl ContainerId {
    fn new(id: String) -> Self {
        ContainerId(id.into())
    }

    fn as_str(&self) -> &str {
        std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
    }
}

/// Kept by main to keep track of container state.
struct ContainerState {
    /// None if there is an event_stream for this container.
    info: Option<ContainerLogInfo>,
    /// True if the container is currently running.
    running: bool,
    /// Generation of the running state; incremented on each start/unpause.
    generation: u64,
}

impl ContainerState {
    /// Its ContainerLogInfo pair must be created exactly once.
    const fn new_running() -> Self {
        ContainerState {
            info: None,
            running: true,
            generation: 0,
        }
    }

    const fn running(&mut self) {
        self.running = true;
        self.generation += 1;
    }

    const fn stopped(&mut self) {
        self.running = false;
    }

    const fn is_running(&self) -> bool {
        self.running
    }

    /// True if it needs to be restarted.
    #[must_use]
    fn return_info(&mut self, info: ContainerLogInfo) -> bool {
        debug_assert!(self.info.is_none());
        // The generation check is the only one strictly necessary,
        // but also checking self.running restarts the event_stream automatically.
        let restart = self.running || info.generation < self.generation;
        self.info = Some(info);
        restart
    }

    fn take_info(&mut self) -> Option<ContainerLogInfo> {
        self.info.take().map(|mut info| {
            // Update info
            info.generation = self.generation;
            info
        })
    }
}

/// Exchanged between the main future and event_stream futures.
struct ContainerLogInfo {
    /// Container Docker ID.
    id: ContainerId,
    /// Timestamp of the event which created this struct.
    created: DateTime<Utc>,
    /// Timestamp of the last log message, with its generation.
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// Generation of ContainerState at event_stream creation.
    generation: u64,
    metadata: ContainerMetadata,
}

impl ContainerLogInfo {
    /// Creates a new ContainerLogInfo for the given container Docker ID and metadata,
    /// where `created` is the timestamp of the event which created this struct.
    const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
        ContainerLogInfo {
            id,
            created,
            last_log: None,
            generation: 0,
            metadata,
        }
    }

    /// Only logs after or equal to this point need to be fetched.
    fn log_since(&self) -> i64 {
        // Subtract one second so logs in the boundary second are not missed when resuming
        // (the `since` parameter has second granularity); older entries re-read because of
        // this overlap are dropped by the timestamp checks in `new_event`.
        self.last_log
            .as_ref()
            .map(|(d, _)| d.timestamp())
            .unwrap_or_else(|| self.created.timestamp())
            - 1
    }

    /// Expects a timestamp at the beginning of the message.
    /// Expects messages to be ordered by timestamp.
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        bytes_received.emit(ByteSize(bytes_message.len()));

        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Timestamp check. This is included to avoid processing the same log multiple times, which can
                // occur when a container changes generations, and to avoid processing logs with timestamps before
                // the created timestamp.
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Noop - log received in order.
                        } else {
                            // Docker returns logs in order.
                            // If we reach this state, this log is from a previous generation of the container.
                            // It was already processed, so we can safely skip it.
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // Noop - first log to process.
                        } else {
                            // Received a log with a timestamp before that provided to the Docker API.
                            // This should not happen, but if it does, we can just ignore these logs.
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                // Provide the timestamp.
                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // Received bad timestamp, if any at all.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                // So continue normally but without a timestamp.
                None
            }
        };

        // Message is actually one line from stderr or stdout, and they are
        // delimited with newline, so that newline needs to be removed.
        // If there's no newline, the event is considered partial, and will
        // either be merged within the docker source, or marked accordingly
        // before sending out, depending on the configuration.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        // Build the log.
        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container ID
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        // Container image
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        // Container name
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        // Created at timestamp
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        // Labels
        if !self.metadata.labels.is_empty() {
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // This handles the transition from the original timestamp logic. Originally the
        // `timestamp_key` was only populated when a timestamp was parsed from the event.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp
                    && let Some(timestamp_key) = log_schema().timestamp_key()
                {
                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                }
            }
        };

        // If automatic partial event merging is requested, perform the merging.
        // Otherwise, mark partial events and return all the events with no merging.
        let log = if auto_partial_merge {
            // Partial event merging logic.

            // If the event is partial, stash it and return `None`.
            if is_partial {
                // If we already have a partial event merge state, the current
                // message has to be merged into that existing state.
                // Otherwise, create a new partial event merge state with the
                // current message being the initial one.
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    // Depending on the log namespace the actual contents of the log "message" will be
                    // found in either the root of the event ("."), or at the globally configured "message_key".
                    match log_namespace {
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // This is not a partial event. If we have a partial event merge
            // state from before, the current event must be a final event, that
            // would give us a merged event we can return.
            // Otherwise it's just a regular event that we return as-is.
            match partial_event_merge_state.take() {
                // Depending on the log namespace the actual contents of the log "message" will be
                // found in either the root of the event ("."), or at the globally configured "message_key".
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            // If the event is partial, just set the partial event marker field.
            if is_partial {
                // Only add the partial event marker field if it's requested.
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            // Return the log event as is, partial or not. No merging here.
            log
        };

        // Partial or not partial - we return the event we got here, because all
        // other cases were handled earlier.
        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
}

struct ContainerMetadata {
    /// label.key -> String
    labels: HashMap<String, String>,
    /// name -> String
    name: Value,
    /// name
    name_str: String,
    /// image -> String
    image: Value,
    /// created_at
    created_at: DateTime<Utc>,
}

impl ContainerMetadata {
    fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
        let config = details.config.unwrap();
        let name = details.name.unwrap();
        let created = details.created.unwrap();

        let labels = config.labels.unwrap_or_default();

        Ok(ContainerMetadata {
            labels,
            name: name.as_str().trim_start_matches('/').to_owned().into(),
            name_str: name,
            image: config.image.unwrap().into(),
            created_at: created.with_timezone(&Utc),
        })
    }
}

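/// Adapts the per-line `LogEvent` stream to the shape expected by `LineAgg`: each event is
/// mapped to a `(stream, message, event)` tuple keyed by its Docker stream (`stdout`/`stderr`),
/// lines are aggregated according to `logic`, and the combined message is written back into
/// the event before it is emitted.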
fn line_agg_adapter(
    inner: impl Stream<Item = LogEvent> + Unpin,
    logic: line_agg::Logic<Bytes, LogEvent>,
    log_namespace: LogNamespace,
) -> impl Stream<Item = LogEvent> {
    let line_agg_in = inner.map(move |mut log| {
        let message_value = match log_namespace {
            LogNamespace::Vector => log
                .remove(event_path!())
                .expect("`.` must exist in the event"),
            LogNamespace::Legacy => log
                .remove(
                    log_schema()
                        .message_key_target_path()
                        .expect("global log_schema.message_key to be valid path"),
                )
                .expect("`message` must exist in the event"),
        };
        let stream_value = match log_namespace {
            LogNamespace::Vector => log
                .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
                .expect("`docker_logs.stream` must exist in the metadata"),
            LogNamespace::Legacy => log
                .get(event_path!(STREAM))
                .expect("stream must exist in the event"),
        };

        let stream = stream_value.coerce_to_bytes();
        let message = message_value.coerce_to_bytes();
        (stream, message, log)
    });
    let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
    line_agg_out.map(move |(_, message, mut log, _)| {
        match log_namespace {
            LogNamespace::Vector => log.insert(event_path!(), message),
            LogNamespace::Legacy => log.insert(
                log_schema()
                    .message_key_target_path()
                    .expect("global log_schema.message_key to be valid path"),
                message,
            ),
        };
        log
    })
}