vector/sources/docker_logs/mod.rs

use std::sync::{Arc, LazyLock};
use std::{collections::HashMap, convert::TryFrom, future::ready, pin::Pin, time::Duration};

use bollard::query_parameters::{
    EventsOptionsBuilder, ListContainersOptionsBuilder, LogsOptionsBuilder,
};
use bollard::{
    container::LogOutput,
    errors::Error as DockerError,
    query_parameters::InspectContainerOptions,
    service::{ContainerInspectResponse, EventMessage},
    Docker,
};
use bytes::{Buf, Bytes};
use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
use futures::{Stream, StreamExt};
use serde_with::serde_as;
use tokio::sync::mpsc;
use tracing_futures::Instrument;
use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
    ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered,
};
use vector_lib::lookup::{
    lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix,
};
use vrl::event_path;
use vrl::value::{kind::Collection, Kind};

use super::util::MultilineConfig;
use crate::{
    config::{log_schema, DataType, SourceConfig, SourceContext, SourceOutput},
    docker::{docker, DockerTlsConfig},
    event::{self, merge_state::LogEventMergeState, EstimatedJsonEncodedSizeOf, LogEvent, Value},
    internal_events::{
        DockerLogsCommunicationError, DockerLogsContainerEventReceived,
        DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
        DockerLogsContainerWatch, DockerLogsEventsReceived,
        DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    SourceSender,
};

#[cfg(test)]
mod tests;

const IMAGE: &str = "image";
const CREATED_AT: &str = "container_created_at";
const NAME: &str = "container_name";
const STREAM: &str = "stream";
const CONTAINER: &str = "container_id";
// Prevent short hostname from being wrongly recognized as a container's short ID.
const MIN_HOSTNAME_LENGTH: usize = 6;

static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());

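// An illustrative configuration for this source (field names follow the struct below;
// the socket path and container name prefixes are example values only):
//
//   sources:
//     docker:
//       type: docker_logs
//       docker_host: "unix:///var/run/docker.sock"
//       include_containers: ["app-"]
//       exclude_containers: ["app-debug"]
//       auto_partial_merge: true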
/// Configuration for the `docker_logs` source.
#[serde_as]
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// Use an HTTPS URL to enable TLS encryption.
    ///
    /// If absent, the `DOCKER_HOST` environment variable is used. If `DOCKER_HOST` is also absent,
    /// the default Docker local socket (`/var/run/docker.sock` on Unix platforms,
    /// `//./pipe/docker_engine` on Windows) is used.
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name started with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `exclude_containers` is configured, any
    /// container that matches a configured exclusion is excluded even if it is also included with
    /// `include_containers`, so care should be taken when using prefix matches as they cannot be overridden by a
    /// corresponding entry in `include_containers`, for example, excluding `foo` by attempting to include `foo-specific-id`.
    ///
    /// This can be used in conjunction with `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>, // Matching is prefix-based ("starts with"), not exact.

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name started with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `include_containers` is configured, only
    /// containers that match a configured inclusion and are also not excluded get matched.
    ///
    /// This can be used in conjunction with `exclude_containers`.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>, // Matching is prefix-based ("starts with"), not exact.

    /// A list of container object labels to match against when filtering running containers.
    ///
    /// Labels should follow the syntax described in the [Docker object labels](https://docs.docker.com/config/labels-custom-metadata/) documentation.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to match against.
    ///
    /// If not provided, all images are included.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Overrides the name of the log field used to mark an event as partial.
    ///
    /// If `auto_partial_merge` is disabled, partial events are emitted with a log field, set by this
    /// configuration value, indicating that the event is not complete.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    auto_partial_merge: bool,

    /// The amount of time to wait before retrying after an error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}

impl Default for DockerLogsConfig {
    fn default() -> Self {
        Self {
            host_key: None,
            docker_host: None,
            tls: None,
            exclude_containers: None,
            include_containers: None,
            include_labels: None,
            include_images: None,
            partial_event_marker_field: default_partial_event_marker_field(),
            auto_partial_merge: true,
            multiline: None,
            retry_backoff_secs: default_retry_backoff_secs(),
            log_namespace: None,
        }
    }
}

fn default_partial_event_marker_field() -> Option<String> {
    Some(event::PARTIAL.to_string())
}

const fn default_retry_backoff_secs() -> Duration {
    Duration::from_secs(2)
}

impl DockerLogsConfig {
    fn container_name_or_id_included<'a>(
        &self,
        id: &str,
        names: impl IntoIterator<Item = &'a str>,
    ) -> bool {
        let containers: Vec<String> = names.into_iter().map(Into::into).collect();

        self.include_containers
            .as_ref()
            .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
            .unwrap_or(true)
            && !(self
                .exclude_containers
                .as_ref()
                .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
                .unwrap_or(false))
    }

    fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
        items.iter().any(|flag| id.starts_with(flag))
            || names
                .iter()
                .any(|name| items.iter().any(|item| name.starts_with(item)))
    }

    fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
        if let Some(val) = &self.partial_event_marker_field {
            if val.is_empty() {
                self.partial_event_marker_field = None;
            }
        }
        self
    }
}

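// An illustrative sketch of the prefix matching performed by `name_or_id_matches`
// above; the container ID and names used here are made up for the example.
#[cfg(test)]
mod prefix_match_examples {
    use super::DockerLogsConfig;

    #[test]
    fn matches_by_prefix_of_id_or_name() {
        let names = vec!["foo-worker".to_owned()];
        let patterns = vec!["foo".to_owned()];

        // A container name starting with "foo" matches the pattern "foo"...
        assert!(DockerLogsConfig::name_or_id_matches(
            "ad08cc418cf9",
            &names,
            &patterns
        ));

        // ...but a name that merely contains "foo" does not.
        assert!(!DockerLogsConfig::name_or_id_matches(
            "ad08cc418cf9",
            &["myfoo".to_owned()],
            &patterns
        ));
    }
}
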
impl_generate_config_from_default!(DockerLogsConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "docker_logs")]
impl SourceConfig for DockerLogsConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let source = DockerLogsSource::new(
            self.clone().with_empty_partial_event_marker_field_as_none(),
            cx.out,
            cx.shutdown.clone(),
            log_namespace,
        )?;

        // Capture the currently running containers, then run the main future.
        let fut = async move {
            match source.handle_running_containers().await {
                Ok(source) => source.run().await,
                Err(error) => {
                    error!(
                        message = "Listing currently running containers failed.",
                        %error
                    );
                }
            }
        };

        let shutdown = cx.shutdown;
        // Once this ShutdownSignal resolves, it will drop DockerLogsSource and, by extension, its ShutdownSignal.
        Ok(Box::pin(async move {
            Ok(tokio::select! {
                _ = fut => {}
                _ = shutdown => {}
            })
        }))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let host_key = self
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path
            .map(LegacyKey::Overwrite);

        let schema_definition = BytesDeserializerConfig
            .schema_definition(global_log_namespace.merge(self.log_namespace))
            .with_source_metadata(
                Self::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
                &owned_value_path!(CONTAINER),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
                &owned_value_path!(IMAGE),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
                &owned_value_path!(NAME),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
                &owned_value_path!(CREATED_AT),
                Kind::timestamp(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("label"))),
                &owned_value_path!("labels"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
                &owned_value_path!(STREAM),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                log_schema()
                    .timestamp_key()
                    .cloned()
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_vector_metadata(
                log_schema().source_type_key(),
                &owned_value_path!("source_type"),
                Kind::bytes(),
                None,
            )
            .with_vector_metadata(
                None,
                &owned_value_path!("ingest_timestamp"),
                Kind::timestamp(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

struct DockerLogsSourceCore {
    config: DockerLogsConfig,
    line_agg_config: Option<line_agg::Config>,
    docker: Docker,
    /// Only logs created at or after this moment are logged.
    now_timestamp: DateTime<Utc>,
}

impl DockerLogsSourceCore {
    fn new(config: DockerLogsConfig) -> crate::Result<Self> {
        // NOTE: Constructs a new Docker instance for a Docker host listening at the URL specified
        //       by the DOCKER_HOST env var. Otherwise it connects to the unix socket, which
        //       requires sudo privileges or docker group membership.
        let docker = docker(config.docker_host.clone(), config.tls.clone())?;

        // Only log events created at or after this moment are logged.
        let now = Local::now();
        info!(
            message = "Capturing logs from now on.",
            now = %now.to_rfc3339()
        );

        let line_agg_config = if let Some(ref multiline_config) = config.multiline {
            Some(line_agg::Config::try_from(multiline_config)?)
        } else {
            None
        };

        Ok(DockerLogsSourceCore {
            config,
            line_agg_config,
            docker,
            now_timestamp: now.into(),
        })
    }

    /// Returns the event stream coming from Docker.
    fn docker_logs_event_stream(
        &self,
    ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
        let mut filters = HashMap::new();

        // event   | emitted on commands
        // --------+-----------------------------------------------------------------
        // start   | docker start, docker run, restart policy, docker restart
        // unpause | docker unpause
        // die     | docker restart, docker stop, docker kill, process exited, oom
        // pause   | docker pause
        filters.insert(
            "event".to_owned(),
            vec![
                "start".to_owned(),
                "unpause".to_owned(),
                "die".to_owned(),
                "pause".to_owned(),
            ],
        );
        filters.insert("type".to_owned(), vec!["container".to_owned()]);

        // Apply include filters.
        if let Some(include_labels) = &self.config.include_labels {
            filters.insert("label".to_owned(), include_labels.clone());
        }

        if let Some(include_images) = &self.config.include_images {
            filters.insert("image".to_owned(), include_images.clone());
        }

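        // Built this way, the map serializes into the Docker events API `filters` query
        // parameter, roughly (illustrative; `label`/`image` entries are only present when
        // configured):
        //   {"type": ["container"], "event": ["start", "unpause", "die", "pause"]}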
        self.docker.events(Some(
            EventsOptionsBuilder::new()
                .since(&self.now_timestamp.timestamp().to_string())
                .filters(&filters)
                .build(),
        ))
    }
}

/// Main future which listens for events coming from Docker and maintains
/// a fan of event_stream futures, where each event_stream corresponds to a
/// running container marked with ContainerLogInfo.
/// While running, each event_stream streams events to the out channel.
/// Once a log stream has ended, it sends its ContainerLogInfo back to main.
///
/// Future  channel     Future      channel
///           |<---- event_stream ---->out
/// main <----|<---- event_stream ---->out
///           | ...                 ...out
///
struct DockerLogsSource {
    esb: EventStreamBuilder,
    /// Event stream from Docker.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// Mapping of seen container_id to its data.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives ContainerLogInfo coming from event stream futures.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// May contain a shortened container id.
    hostname: Option<String>,
    backoff_duration: Duration,
}

impl DockerLogsSource {
    fn new(
        config: DockerLogsConfig,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
    ) -> crate::Result<DockerLogsSource> {
        let backoff_secs = config.retry_backoff_secs;

        let host_key = config
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into());
        let hostname = crate::get_hostname().ok();

        // Only logs created at or after this moment are logged.
        let core = DockerLogsSourceCore::new(config)?;

        // Main event stream, through which only newly started/restarted containers are logged.
        let events = core.docker_logs_event_stream();
        info!(message = "Listening to docker log events.");

        // Channel of communication between the main future and event_stream futures.
        let (main_send, main_recv) =
            mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();

        // Starting with logs from now.
        // TODO: Is this exception acceptable?
        // The only partial exception to this is the case where:
        // t0 -- outside: container running
        // t1 -- now_timestamp
        // t2 -- outside: container stopped
        // t3 -- list_containers
        // In that case, logs between [t1,t2] will be pulled into Vector only on the next start/unpause of that container.
        let esb = EventStreamBuilder {
            host_key,
            hostname: hostname.clone(),
            core: Arc::new(core),
            out,
            main_send,
            shutdown,
            log_namespace,
        };

        Ok(DockerLogsSource {
            esb,
            events: Box::pin(events),
            containers: HashMap::new(),
            main_recv,
            hostname,
            backoff_duration: backoff_secs,
        })
    }

    /// Future that captures currently running containers, and starts event streams for them.
    async fn handle_running_containers(mut self) -> crate::Result<Self> {
        let mut filters = HashMap::new();

        // Apply include filters.
        if let Some(include_labels) = &self.esb.core.config.include_labels {
            filters.insert("label".to_owned(), include_labels.clone());
        }

        if let Some(include_images) = &self.esb.core.config.include_images {
            filters.insert("ancestor".to_owned(), include_images.clone());
        }

        self.esb
            .core
            .docker
            .list_containers(Some(
                ListContainersOptionsBuilder::new()
                    .all(false)
                    .filters(&filters)
                    .build(),
            ))
            .await?
            .into_iter()
            .for_each(|container| {
                let id = container.id.unwrap();
                let names = container.names.unwrap();

                trace!(message = "Found already running container.", id = %id, names = ?names);

                if self.exclude_self(id.as_str()) {
                    info!(message = "Excluded self container.", id = %id);
                    return;
                }

                if !self.esb.core.config.container_name_or_id_included(
                    id.as_str(),
                    names.iter().map(|s| {
                        // bollard/shiplift returns names with a leading '/', so it needs to be removed.
                        let s = s.as_str();
                        if s.starts_with('/') {
                            s.split_at('/'.len_utf8()).1
                        } else {
                            s
                        }
                    }),
                ) {
                    info!(message = "Excluded container.", id = %id);
                    return;
                }

                let id = ContainerId::new(id);
                self.containers.insert(id.clone(), self.esb.start(id, None));
            });

        Ok(self)
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                value = self.main_recv.recv() => {
                    match value {
                        Some(Ok(info)) => {
                            let state = self
                                .containers
                                .get_mut(&info.id)
                                .expect("Every ContainerLogInfo has its ContainerState");
                            if state.return_info(info) {
                                self.esb.restart(state);
                            }
                        },
                        Some(Err((id, persistence))) => {
                            let state = self
                                .containers
                                .remove(&id)
                                .expect("Every started ContainerId has its ContainerState");
                            match persistence {
                                ErrorPersistence::Transient => if state.is_running() {
                                    let backoff = Some(self.backoff_duration);
                                    self.containers.insert(id.clone(), self.esb.start(id, backoff));
                                }
                                // Forget the container since the error is permanent.
                                ErrorPersistence::Permanent => (),
                            }
                        }
                        None => {
                            error!(message = "The docker_logs source main stream has ended unexpectedly.");
                            info!(message = "Shutting down docker_logs source.");
                            return;
                        }
                    };
                }
                value = self.events.next() => {
                    match value {
                        Some(Ok(mut event)) => {
                            let action = event.action.unwrap();
                            let actor = event.actor.take().unwrap();
                            let id = actor.id.unwrap();
                            let attributes = actor.attributes.unwrap();

                            emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });

                            let id = ContainerId::new(id.to_owned());

                            // Update container status.
                            match action.as_str() {
                                "die" | "pause" => {
                                    if let Some(state) = self.containers.get_mut(&id) {
                                        state.stopped();
                                    }
                                }
                                "start" | "unpause" => {
                                    if let Some(state) = self.containers.get_mut(&id) {
                                        state.running();
                                        self.esb.restart(state);
                                    } else {
                                        let include_name =
                                            self.esb.core.config.container_name_or_id_included(
                                                id.as_str(),
                                                attributes.get("name").map(|s| s.as_str()),
                                            );

                                        let exclude_self = self.exclude_self(id.as_str());

                                        if include_name && !exclude_self {
                                            self.containers.insert(id.clone(), self.esb.start(id, None));
                                        }
                                    }
                                }
                                _ => {},
                            };
                        }
                        Some(Err(error)) => {
                            emit!(DockerLogsCommunicationError {
                                error,
                                container_id: None,
                            });
                            return;
                        },
                        None => {
                            // TODO: This could be retried, but should be attempted with some delay and exponential backoff.
                            error!(message = "Docker log event stream has ended unexpectedly.");
                            info!(message = "Shutting down docker_logs source.");
                            return;
                        }
                    };
                }
            };
        }
    }

    fn exclude_self(&self, id: &str) -> bool {
        self.hostname
            .as_ref()
            .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
            .unwrap_or(false)
    }
}

/// Used to construct and start event stream futures
#[derive(Clone)]
struct EventStreamBuilder {
    host_key: OptionalValuePath,
    hostname: Option<String>,
    core: Arc<DockerLogsSourceCore>,
    /// Event stream futures send events through this
    out: SourceSender,
    /// End through which event stream futures send ContainerLogInfo to main future
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Self and event streams will end on this.
    shutdown: ShutdownSignal,
    log_namespace: LogNamespace,
}

impl EventStreamBuilder {
    /// Spawns a task that runs the event stream until shutdown.
    fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
        let this = self.clone();
        tokio::spawn(
            async move {
                if let Some(duration) = backoff {
                    tokio::time::sleep(duration).await;
                }

                match this
                    .core
                    .docker
                    .inspect_container(id.as_str(), None::<InspectContainerOptions>)
                    .await
                {
                    Ok(details) => match ContainerMetadata::from_details(details) {
                        Ok(metadata) => {
                            let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
                            this.run_event_stream(info).await;
                            return;
                        }
                        Err(error) => emit!(DockerLogsTimestampParseError {
                            error,
                            container_id: id.as_str()
                        }),
                    },
                    Err(error) => emit!(DockerLogsContainerMetadataFetchError {
                        error,
                        container_id: id.as_str()
                    }),
                }

                this.finish(Err((id, ErrorPersistence::Transient)));
            }
            .in_current_span(),
        );

        ContainerState::new_running()
    }

    /// If info is present, restarts the event stream, which runs until shutdown.
    fn restart(&self, container: &mut ContainerState) {
        if let Some(info) = container.take_info() {
            let this = self.clone();
            tokio::spawn(this.run_event_stream(info).in_current_span());
        }
    }

    async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
        // Establish connection
        let options = Some(
            LogsOptionsBuilder::new()
                .follow(true)
                .stdout(true)
                .stderr(true)
                .since(info.log_since() as i32) // 2038 bug (I think)
                .timestamps(true)
                .build(),
        );

        let stream = self.core.docker.logs(info.id.as_str(), options);
        emit!(DockerLogsContainerWatch {
            container_id: info.id.as_str()
        });

        // Create event streamer
        let mut partial_event_merge_state = None;

        let core = Arc::clone(&self.core);

        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));

        let mut error = None;
        let events_stream = stream
            .map(|value| {
                match value {
                    Ok(message) => Ok(info.new_event(
                        message,
                        core.config.partial_event_marker_field.clone(),
                        core.config.auto_partial_merge,
                        &mut partial_event_merge_state,
                        &bytes_received,
                        self.log_namespace,
                    )),
                    Err(error) => {
                        // On any error, restart connection
                        match &error {
                            DockerError::DockerResponseServerError { status_code, .. }
                                if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
                            {
                                emit!(DockerLogsLoggingDriverUnsupportedError {
                                    error,
                                    container_id: info.id.as_str(),
                                });
                                Err(ErrorPersistence::Permanent)
                            }
                            _ => {
                                emit!(DockerLogsCommunicationError {
                                    error,
                                    container_id: Some(info.id.as_str())
                                });
                                Err(ErrorPersistence::Transient)
                            }
                        }
                    }
                }
            })
            .take_while(|v| {
                error = v.as_ref().err().cloned();
                ready(v.is_ok())
            })
            .filter_map(|v| ready(v.ok().flatten()))
            .take_until(self.shutdown.clone());

        let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
            if let Some(ref line_agg_config) = core.line_agg_config {
                Box::new(line_agg_adapter(
                    events_stream,
                    line_agg::Logic::new(line_agg_config.clone()),
                    self.log_namespace,
                ))
            } else {
                Box::new(events_stream)
            };

        let host_key = self.host_key.clone().path;
        let hostname = self.hostname.clone();
        let result = {
            let mut stream = events_stream
                .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
            self.out.send_event_stream(&mut stream).await.map_err(|_| {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
            })
        };

        // End of stream
        emit!(DockerLogsContainerUnwatch {
            container_id: info.id.as_str()
        });

        let result = match (result, error) {
            (Ok(()), None) => Ok(info),
            (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
            (_, Some(occurrence)) => Err((info.id, occurrence)),
        };

        self.finish(result);
    }

    fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
        // This can legally fail when shutting down, and any other
        // reason should have been logged in the main future.
        _ = self.main_send.send(result);
    }
}

fn add_hostname(
    mut log: LogEvent,
    host_key: &Option<OwnedValuePath>,
    hostname: &Option<String>,
    log_namespace: LogNamespace,
) -> LogEvent {
    if let Some(hostname) = hostname {
        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);

        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            legacy_host_key,
            path!("host"),
            hostname.clone(),
        );
    }

    log
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    Transient,
    Permanent,
}

/// Container ID as assigned by Docker.
/// Is actually a string.
#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct ContainerId(Bytes);

impl ContainerId {
    fn new(id: String) -> Self {
        ContainerId(id.into())
    }

    fn as_str(&self) -> &str {
        std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
    }
}

/// Kept by main to keep track of container state.
struct ContainerState {
    /// None if there is an event_stream of this container.
    info: Option<ContainerLogInfo>,
    /// True if the container is currently running.
    running: bool,
    /// Generation of the running state, incremented on each start/unpause.
    generation: u64,
}

impl ContainerState {
    /// Its ContainerLogInfo pair must be created exactly once.
    const fn new_running() -> Self {
        ContainerState {
            info: None,
            running: true,
            generation: 0,
        }
    }

    const fn running(&mut self) {
        self.running = true;
        self.generation += 1;
    }

    const fn stopped(&mut self) {
        self.running = false;
    }

    const fn is_running(&self) -> bool {
        self.running
    }

    /// True if it needs to be restarted.
    #[must_use]
    fn return_info(&mut self, info: ContainerLogInfo) -> bool {
        debug_assert!(self.info.is_none());
        // The generation check is the only one strictly necessary,
        // but also checking `running` restarts the event_stream automatically.
        let restart = self.running || info.generation < self.generation;
        self.info = Some(info);
        restart
    }

    fn take_info(&mut self) -> Option<ContainerLogInfo> {
        self.info.take().map(|mut info| {
            // Update info
            info.generation = self.generation;
            info
        })
    }
}

/// Exchanged between the main future and event_stream futures.
struct ContainerLogInfo {
    /// Container docker ID
    id: ContainerId,
    /// Timestamp of event which created this struct
    created: DateTime<Utc>,
    /// Timestamp of the last log message together with its generation
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// Generation of ContainerState at event_stream creation
    generation: u64,
    metadata: ContainerMetadata,
}

impl ContainerLogInfo {
    /// Creates a ContainerLogInfo from the container's Docker ID, its metadata,
    /// and the timestamp of the event which created it.
    const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
        ContainerLogInfo {
            id,
            created,
            last_log: None,
            generation: 0,
            metadata,
        }
    }

    /// Only logs after or equal to this point need to be fetched
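    ///
    /// The returned value is one second earlier than the last seen timestamp to allow for
    /// the whole-second resolution of the `since` filter; lines re-read because of that
    /// overlap are dropped again by the ordering check in `new_event`.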
    fn log_since(&self) -> i64 {
        self.last_log
            .as_ref()
            .map(|(d, _)| d.timestamp())
            .unwrap_or_else(|| self.created.timestamp())
            - 1
    }

    /// Expects timestamp at the beginning of message.
    /// Expects messages to be ordered by timestamps.
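    ///
    /// With `timestamps(true)` set on the logs request, the Docker API prefixes every line
    /// with an RFC 3339 timestamp, e.g. `2016-10-05T00:00:30.082640485Z some log line`;
    /// that prefix is parsed and stripped below.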
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        bytes_received.emit(ByteSize(bytes_message.len()));

        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Timestamp check. This is included to avoid processing the same log multiple times, which can
                // occur when a container changes generations, and to avoid processing logs with timestamps before
                // the created timestamp.
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Noop - log received in order.
                        } else {
                            // Docker returns logs in order.
                            // If we reach this state, this log is from a previous generation of the container.
                            // It was already processed, so we can safely skip it.
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // Noop - first log to process.
                        } else {
                            // Received a log with a timestamp before that provided to the Docker API.
                            // This should not happen, but if it does, we can just ignore these logs.
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                // Provide the timestamp.
                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // Received bad timestamp, if any at all.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                // So continue normally but without a timestamp.
                None
            }
        };

        // Message is actually one line from stderr or stdout, and they are
        // delimited with newline, so that newline needs to be removed.
        // If there's no newline, the event is considered partial, and will
        // either be merged within the docker source, or marked accordingly
        // before sending out, depending on the configuration.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        // Build the log.
        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container ID
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        // Container image
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        // Container name
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        // Created at timestamp
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        // Labels
        if !self.metadata.labels.is_empty() {
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // This handles the transition from the original timestamp logic. Originally the
        // `timestamp_key` was only populated when a timestamp was parsed from the event.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp {
                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                        log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                    }
                }
            }
        };

        // If automatic partial event merging is requested - perform the
        // merging.
        // Otherwise mark partial events and return all the events with no
        // merging.
        let log = if auto_partial_merge {
            // Partial event merging logic.

            // If event is partial, stash it and return `None`.
            if is_partial {
                // If we already have a partial event merge state, the current
                // message has to be merged into that existing state.
                // Otherwise, create a new partial event merge state with the
                // current message being the initial one.
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    // Depending on the log namespace the actual contents of the log "message" will be
                    // found in either the root of the event ("."), or at the globally configured "message_key".
                    match log_namespace {
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // This is not a partial event. If we have a partial event merge
            // state from before, the current event must be a final event, that
            // would give us a merged event we can return.
            // Otherwise it's just a regular event that we return as-is.
            match partial_event_merge_state.take() {
                // Depending on the log namespace the actual contents of the log "message" will be
                // found in either the root of the event ("."), or at the globally configured "message_key".
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            // If the event is partial, just set the partial event marker field.
            if is_partial {
                // Only add partial event marker field if it's requested.
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            // Return the log event as is, partial or not. No merging here.
            log
        };

        // Partial or not partial - we return the event we got here, because all
        // other cases were handled earlier.
        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
}

struct ContainerMetadata {
    /// label.key -> String
    labels: HashMap<String, String>,
    /// name -> String
    name: Value,
    /// name
    name_str: String,
    /// image -> String
    image: Value,
    /// created_at
    created_at: DateTime<Utc>,
}

impl ContainerMetadata {
    fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
        let config = details.config.unwrap();
        let name = details.name.unwrap();
        let created = details.created.unwrap();

        let labels = config.labels.unwrap_or_default();

        Ok(ContainerMetadata {
            labels,
            name: name.as_str().trim_start_matches('/').to_owned().into(),
            name_str: name,
            image: config.image.unwrap().into(),
            created_at: created.with_timezone(&Utc),
        })
    }
}

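// Adapts the stream of `LogEvent`s into the `(key, line, context)` shape that `LineAgg`
// expects: aggregation is keyed by the container's `stream` field so stdout and stderr
// lines are aggregated independently, and the aggregated message is written back into
// the event afterwards.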
fn line_agg_adapter(
    inner: impl Stream<Item = LogEvent> + Unpin,
    logic: line_agg::Logic<Bytes, LogEvent>,
    log_namespace: LogNamespace,
) -> impl Stream<Item = LogEvent> {
    let line_agg_in = inner.map(move |mut log| {
        let message_value = match log_namespace {
            LogNamespace::Vector => log
                .remove(event_path!())
                .expect("`.` must exist in the event"),
            LogNamespace::Legacy => log
                .remove(
                    log_schema()
                        .message_key_target_path()
                        .expect("global log_schema.message_key to be valid path"),
                )
                .expect("`message` must exist in the event"),
        };
        let stream_value = match log_namespace {
            LogNamespace::Vector => log
                .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
                .expect("`docker_logs.stream` must exist in the metadata"),
            LogNamespace::Legacy => log
                .get(event_path!(STREAM))
                .expect("stream must exist in the event"),
        };

        let stream = stream_value.coerce_to_bytes();
        let message = message_value.coerce_to_bytes();
        (stream, message, log)
    });
    let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
    line_agg_out.map(move |(_, message, mut log, _)| {
        match log_namespace {
            LogNamespace::Vector => log.insert(event_path!(), message),
            LogNamespace::Legacy => log.insert(
                log_schema()
                    .message_key_target_path()
                    .expect("global log_schema.message_key to be valid path"),
                message,
            ),
        };
        log
    })
}