vector/sources/docker_logs/
mod.rs

1use std::{
2    collections::HashMap,
3    convert::TryFrom,
4    future::ready,
5    pin::Pin,
6    sync::{Arc, LazyLock},
7    time::Duration,
8};
9
10use bollard::{
11    Docker,
12    container::LogOutput,
13    errors::Error as DockerError,
14    query_parameters::{
15        EventsOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
16        LogsOptionsBuilder,
17    },
18    service::{ContainerInspectResponse, EventMessage},
19};
20use bytes::{Buf, Bytes};
21use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
22use futures::{Stream, StreamExt};
23use serde_with::serde_as;
24use tokio::sync::mpsc;
25use tracing_futures::Instrument;
26use vector_lib::{
27    codecs::{BytesDeserializer, BytesDeserializerConfig},
28    config::{LegacyKey, LogNamespace},
29    configurable::configurable_component,
30    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered},
31    lookup::{
32        OwnedValuePath, PathPrefix, lookup_v2::OptionalValuePath, metadata_path, owned_value_path,
33        path,
34    },
35};
36use vrl::{
37    event_path,
38    value::{Kind, kind::Collection},
39};
40
41use super::util::MultilineConfig;
42use crate::{
43    SourceSender,
44    common::backoff::ExponentialBackoff,
45    config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
46    docker::{DockerTlsConfig, docker},
47    event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
48    internal_events::{
49        DockerLogsCommunicationError, DockerLogsContainerEventReceived,
50        DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
51        DockerLogsContainerWatch, DockerLogsEventsReceived,
52        DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
53    },
54    line_agg::{self, LineAgg},
55    shutdown::ShutdownSignal,
56};
57
58#[cfg(test)]
59mod tests;
60
61const IMAGE: &str = "image";
62const CREATED_AT: &str = "container_created_at";
63const NAME: &str = "container_name";
64const STREAM: &str = "stream";
65const CONTAINER: &str = "container_id";
66// Prevent short hostname from being wrongly recognized as a container's short ID.
67const MIN_HOSTNAME_LENGTH: usize = 6;
68
69static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
70static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
71static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());
72
/// Configuration for the `docker_logs` source.
#[serde_as]
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
// Container-level `default`: fields omitted from the user's configuration are taken from
// the `Default` impl of this struct.
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// Use an HTTPS URL to enable TLS encryption.
    ///
    /// If absent, the `DOCKER_HOST` environment variable is used. If `DOCKER_HOST` is also absent,
    /// the default Docker local socket (`/var/run/docker.sock` on Unix platforms,
    /// `//./pipe/docker_engine` on Windows) is used.
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name started with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `exclude_containers` is configured, any
    /// container that matches a configured exclusion is excluded even if it is also included with
    /// `include_containers`, so care should be taken when using prefix matches as they cannot be overridden by a
    /// corresponding entry in `include_containers`, for example, excluding `foo` by attempting to include `foo-specific-id`.
    ///
    /// This can be used in conjunction with `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>, // Entries are prefixes: compared with `starts_with`, not for equality.

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is prefix first, so specifying a value of `foo` would match any container named `foo` as well as any
    /// container whose name started with `foo`. This applies equally whether matching container IDs or names.
    ///
    /// By default, the source collects logs for all containers. If `include_containers` is configured, only
    /// containers that match a configured inclusion and are also not excluded get matched.
    ///
    /// This can be used in conjunction with `exclude_containers`.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>, // Entries are prefixes: compared with `starts_with`, not for equality.

    /// A list of container object labels to match against when filtering running containers.
    ///
    /// Labels should follow the syntax described in the [Docker object labels](https://docs.docker.com/config/labels-custom-metadata/) documentation.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to match against.
    ///
    /// If not provided, all images are included.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    // An empty string is turned into `None` by
    // `with_empty_partial_event_marker_field_as_none` before the source is built.
    /// Overrides the name of the log field used to mark an event as partial.
    ///
    /// If `auto_partial_merge` is disabled, partial events are emitted with a log field, set by this
    /// configuration value, indicating that the event is not complete.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    auto_partial_merge: bool,

    // Deserialized from a whole number of seconds into a `Duration`.
    /// The amount of time to wait before retrying after an error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    // NOTE(review): no local doc comment here; presumably `derived` takes the description
    // from `DockerTlsConfig` itself — confirm.
    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
180
181impl Default for DockerLogsConfig {
182    fn default() -> Self {
183        Self {
184            host_key: None,
185            docker_host: None,
186            tls: None,
187            exclude_containers: None,
188            include_containers: None,
189            include_labels: None,
190            include_images: None,
191            partial_event_marker_field: default_partial_event_marker_field(),
192            auto_partial_merge: true,
193            multiline: None,
194            retry_backoff_secs: default_retry_backoff_secs(),
195            log_namespace: None,
196        }
197    }
198}
199
200fn default_partial_event_marker_field() -> Option<String> {
201    Some(event::PARTIAL.to_string())
202}
203
/// Default for `retry_backoff_secs`: wait two seconds before retrying after an error.
const fn default_retry_backoff_secs() -> Duration {
    const RETRY_BACKOFF_SECS: u64 = 2;
    Duration::from_secs(RETRY_BACKOFF_SECS)
}
207
208impl DockerLogsConfig {
209    fn container_name_or_id_included<'a>(
210        &self,
211        id: &str,
212        names: impl IntoIterator<Item = &'a str>,
213    ) -> bool {
214        let containers: Vec<String> = names.into_iter().map(Into::into).collect();
215
216        self.include_containers
217            .as_ref()
218            .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
219            .unwrap_or(true)
220            && !(self
221                .exclude_containers
222                .as_ref()
223                .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
224                .unwrap_or(false))
225    }
226
227    fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
228        items.iter().any(|flag| id.starts_with(flag))
229            || names
230                .iter()
231                .any(|name| items.iter().any(|item| name.starts_with(item)))
232    }
233
234    fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
235        if let Some(val) = &self.partial_event_marker_field
236            && val.is_empty()
237        {
238            self.partial_event_marker_field = None;
239        }
240        self
241    }
242}
243
244impl_generate_config_from_default!(DockerLogsConfig);
245
246#[async_trait::async_trait]
247#[typetag::serde(name = "docker_logs")]
248impl SourceConfig for DockerLogsConfig {
249    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
250        let log_namespace = cx.log_namespace(self.log_namespace);
251        let source = DockerLogsSource::new(
252            self.clone().with_empty_partial_event_marker_field_as_none(),
253            cx.out,
254            cx.shutdown.clone(),
255            log_namespace,
256        )?;
257
258        // Capture currently running containers, and do main future(run)
259        let fut = async move {
260            match source.handle_running_containers().await {
261                Ok(source) => source.run().await,
262                Err(error) => {
263                    error!(
264                        message = "Listing currently running containers failed.",
265                        ?error
266                    );
267                }
268            }
269        };
270
271        let shutdown = cx.shutdown;
272        // Once this ShutdownSignal resolves it will drop DockerLogsSource and by extension it's ShutdownSignal.
273        Ok(Box::pin(async move {
274            Ok(tokio::select! {
275                _ = fut => {}
276                _ = shutdown => {}
277            })
278        }))
279    }
280
281    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
282        let host_key = self
283            .host_key
284            .clone()
285            .unwrap_or(log_schema().host_key().cloned().into())
286            .path
287            .map(LegacyKey::Overwrite);
288
289        let schema_definition = BytesDeserializerConfig
290            .schema_definition(global_log_namespace.merge(self.log_namespace))
291            .with_source_metadata(
292                Self::NAME,
293                host_key,
294                &owned_value_path!("host"),
295                Kind::bytes().or_undefined(),
296                Some("host"),
297            )
298            .with_source_metadata(
299                Self::NAME,
300                Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
301                &owned_value_path!(CONTAINER),
302                Kind::bytes(),
303                None,
304            )
305            .with_source_metadata(
306                Self::NAME,
307                Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
308                &owned_value_path!(IMAGE),
309                Kind::bytes(),
310                None,
311            )
312            .with_source_metadata(
313                Self::NAME,
314                Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
315                &owned_value_path!(NAME),
316                Kind::bytes(),
317                None,
318            )
319            .with_source_metadata(
320                Self::NAME,
321                Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
322                &owned_value_path!(CREATED_AT),
323                Kind::timestamp(),
324                None,
325            )
326            .with_source_metadata(
327                Self::NAME,
328                Some(LegacyKey::Overwrite(owned_value_path!("label"))),
329                &owned_value_path!("labels"),
330                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
331                None,
332            )
333            .with_source_metadata(
334                Self::NAME,
335                Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
336                &owned_value_path!(STREAM),
337                Kind::bytes(),
338                None,
339            )
340            .with_source_metadata(
341                Self::NAME,
342                log_schema()
343                    .timestamp_key()
344                    .cloned()
345                    .map(LegacyKey::Overwrite),
346                &owned_value_path!("timestamp"),
347                Kind::timestamp(),
348                Some("timestamp"),
349            )
350            .with_vector_metadata(
351                log_schema().source_type_key(),
352                &owned_value_path!("source_type"),
353                Kind::bytes(),
354                None,
355            )
356            .with_vector_metadata(
357                None,
358                &owned_value_path!("ingest_timestamp"),
359                Kind::timestamp(),
360                None,
361            );
362
363        vec![SourceOutput::new_maybe_logs(
364            DataType::Log,
365            schema_definition,
366        )]
367    }
368
    /// Always `false`: this source reports that it cannot acknowledge events.
    fn can_acknowledge(&self) -> bool {
        false
    }
372}
373
/// State shared, behind an `Arc`, between the main task and the per-container log stream
/// tasks: the source configuration, the multiline settings, and the Docker client.
struct DockerLogsSourceCore {
    /// Source configuration as passed to `DockerLogsSourceCore::new`.
    config: DockerLogsConfig,
    /// Line aggregation settings converted from `config.multiline`; `None` when that is unset.
    line_agg_config: Option<line_agg::Config>,
    /// Docker API client.
    docker: Docker,
    /// Only logs created at, or after this moment are logged.
    now_timestamp: DateTime<Utc>,
}
381
382impl DockerLogsSourceCore {
383    fn new(config: DockerLogsConfig) -> crate::Result<Self> {
384        // ?NOTE: Constructs a new Docker instance for a docker host listening at url specified by an env var DOCKER_HOST.
385        // ?      Otherwise connects to unix socket which requires sudo privileges, or docker group membership.
386        let docker = docker(config.docker_host.clone(), config.tls.clone())?;
387
388        // Only log events created at-or-after this moment are logged.
389        let now = Local::now();
390        info!(
391            message = "Capturing logs from now on.",
392            now = %now.to_rfc3339()
393        );
394
395        let line_agg_config = if let Some(ref multiline_config) = config.multiline {
396            Some(line_agg::Config::try_from(multiline_config)?)
397        } else {
398            None
399        };
400
401        Ok(DockerLogsSourceCore {
402            config,
403            line_agg_config,
404            docker,
405            now_timestamp: now.into(),
406        })
407    }
408
409    /// Returns event stream coming from docker.
410    fn docker_logs_event_stream(
411        &self,
412    ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
413        let mut filters = HashMap::new();
414
415        // event  | emitted on commands
416        // -------+-------------------
417        // start  | docker start, docker run, restart policy, docker restart
418        // unpause | docker unpause
419        // die    | docker restart, docker stop, docker kill, process exited, oom
420        // pause  | docker pause
421        filters.insert(
422            "event".to_owned(),
423            vec![
424                "start".to_owned(),
425                "unpause".to_owned(),
426                "die".to_owned(),
427                "pause".to_owned(),
428            ],
429        );
430        filters.insert("type".to_owned(), vec!["container".to_owned()]);
431
432        // Apply include filters.
433        if let Some(include_labels) = &self.config.include_labels {
434            filters.insert("label".to_owned(), include_labels.clone());
435        }
436
437        if let Some(include_images) = &self.config.include_images {
438            filters.insert("image".to_owned(), include_images.clone());
439        }
440
441        self.docker.events(Some(
442            EventsOptionsBuilder::new()
443                .since(&self.now_timestamp.timestamp().to_string())
444                .filters(&filters)
445                .build(),
446        ))
447    }
448}
449
/// Main future which listens for events coming from docker, and maintains
/// a fan of event_stream futures.
/// Each event_stream corresponds to a running container marked with ContainerLogInfo.
/// While running, event_stream streams Events to the out channel.
/// Once a log stream has ended, it sends ContainerLogInfo back to main.
///
/// Future  channel     Future      channel
///           |<---- event_stream ---->out
/// main <----|<---- event_stream ---->out
///           | ...                 ...out
///
struct DockerLogsSource {
    /// Builder used to start and restart the per-container event_stream futures.
    esb: EventStreamBuilder,
    /// Event stream from docker.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// Mapping of seen container IDs to their state.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives ContainerLogInfo, or the failed container's ID together with the error's
    /// persistence, from the event_stream futures.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// It may contain shortened container id.
    hostname: Option<String>,
    /// Delay applied before a container's log stream is started again after a transient error.
    backoff_duration: Duration,
    /// Backoff strategy for events stream retries
    events_backoff: ExponentialBackoff,
}
475
476impl DockerLogsSource {
477    fn new(
478        config: DockerLogsConfig,
479        out: SourceSender,
480        shutdown: ShutdownSignal,
481        log_namespace: LogNamespace,
482    ) -> crate::Result<DockerLogsSource> {
483        let backoff_secs = config.retry_backoff_secs;
484
485        let host_key = config
486            .host_key
487            .clone()
488            .unwrap_or(log_schema().host_key().cloned().into());
489        let hostname = crate::get_hostname().ok();
490
491        // Only logs created at, or after this moment are logged.
492        let core = DockerLogsSourceCore::new(config)?;
493
494        // main event stream, with whom only newly started/restarted containers will be logged.
495        let events = core.docker_logs_event_stream();
496        info!(message = "Listening to docker log events.");
497
498        // Channel of communication between main future and event_stream futures
499        let (main_send, main_recv) =
500            mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();
501
502        // Starting with logs from now.
503        // TODO: Is this exception acceptable?
504        // Only somewhat exception to this is case where:
505        // t0 -- outside: container running
506        // t1 -- now_timestamp
507        // t2 -- outside: container stopped
508        // t3 -- list_containers
509        // In that case, logs between [t1,t2] will be pulled to vector only on next start/unpause of that container.
510        let esb = EventStreamBuilder {
511            host_key,
512            hostname: hostname.clone(),
513            core: Arc::new(core),
514            out,
515            main_send,
516            shutdown,
517            log_namespace,
518        };
519
520        Ok(DockerLogsSource {
521            esb,
522            events: Box::pin(events),
523            containers: HashMap::new(),
524            main_recv,
525            hostname,
526            backoff_duration: backoff_secs,
527            events_backoff: ExponentialBackoff::default(),
528        })
529    }
530
531    /// Future that captures currently running containers, and starts event streams for them.
532    async fn handle_running_containers(mut self) -> crate::Result<Self> {
533        let mut filters = HashMap::new();
534
535        // Apply include filters
536        if let Some(include_labels) = &self.esb.core.config.include_labels {
537            filters.insert("label".to_owned(), include_labels.clone());
538        }
539
540        if let Some(include_images) = &self.esb.core.config.include_images {
541            filters.insert("ancestor".to_owned(), include_images.clone());
542        }
543
544        self.esb
545            .core
546            .docker
547            .list_containers(Some(
548                ListContainersOptionsBuilder::new()
549                    .all(false)
550                    .filters(&filters)
551                    .build(),
552            ))
553            .await?
554            .into_iter()
555            .for_each(|container| {
556                let id = container.id.unwrap();
557                let names = container.names.unwrap();
558
559                trace!(message = "Found already running container.", id = %id, names = ?names);
560
561                if self.exclude_self(id.as_str()) {
562                    info!(message = "Excluded self container.", id = %id);
563                    return;
564                }
565
566                if !self.esb.core.config.container_name_or_id_included(
567                    id.as_str(),
568                    names.iter().map(|s| {
569                        // In this case bollard / shiplift gives names with starting '/' so it needs to be removed.
570                        let s = s.as_str();
571                        if s.starts_with('/') {
572                            s.split_at('/'.len_utf8()).1
573                        } else {
574                            s
575                        }
576                    }),
577                ) {
578                    info!(message = "Excluded container.", id = %id);
579                    return;
580                }
581
582                let id = ContainerId::new(id);
583                self.containers.insert(id.clone(), self.esb.start(id, None));
584            });
585
586        Ok(self)
587    }
588
589    async fn run(mut self) {
590        loop {
591            tokio::select! {
592                value = self.main_recv.recv() => {
593                    match value {
594                        Some(Ok(info)) => {
595                            let state = self
596                                .containers
597                                .get_mut(&info.id)
598                                .expect("Every ContainerLogInfo has it's ContainerState");
599                            if state.return_info(info) {
600                                self.esb.restart(state);
601                            }
602                        },
603                        Some(Err((id,persistence))) => {
604                            let state = self
605                                .containers
606                                .remove(&id)
607                                .expect("Every started ContainerId has it's ContainerState");
608                            match persistence{
609                                ErrorPersistence::Transient => if state.is_running() {
610                                    let backoff= Some(self.backoff_duration);
611                                    self.containers.insert(id.clone(), self.esb.start(id, backoff));
612                                }
613                                // Forget the container since the error is permanent.
614                                ErrorPersistence::Permanent => (),
615                            }
616                        }
617                        None => {
618                            error!(message = "The docker_logs source main stream has ended unexpectedly.", internal_log_rate_limit = false);
619                            info!(message = "Shutting down docker_logs source.");
620                            return;
621                        }
622                    };
623                }
624                value = self.events.next() => {
625                    match value {
626                        Some(Ok(mut event)) => {
627                            // Reset backoff on successful event
628                            self.events_backoff.reset();
629
630                            let action = event.action.unwrap();
631                            let actor = event.actor.take().unwrap();
632                            let id = actor.id.unwrap();
633                            let attributes = actor.attributes.unwrap();
634
635                            emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });
636
637                            let id = ContainerId::new(id.to_owned());
638
639                            // Update container status
640                            match action.as_str() {
641                                "die" | "pause" => {
642                                    if let Some(state) = self.containers.get_mut(&id) {
643                                        state.stopped();
644                                    }
645                                }
646                                "start" | "unpause" => {
647                                    if let Some(state) = self.containers.get_mut(&id) {
648                                        state.running();
649                                        self.esb.restart(state);
650                                    } else {
651                                        let include_name =
652                                            self.esb.core.config.container_name_or_id_included(
653                                                id.as_str(),
654                                                attributes.get("name").map(|s| s.as_str()),
655                                            );
656
657                                        let exclude_self = self.exclude_self(id.as_str());
658
659                                        if include_name && !exclude_self {
660                                            self.containers.insert(id.clone(), self.esb.start(id, None));
661                                        }
662                                    }
663                                }
664                                _ => {},
665                            };
666                        }
667                        Some(Err(error)) => {
668                            emit!(DockerLogsCommunicationError {
669                                error,
670                                container_id: None,
671                            });
672                            // Retry events stream with exponential backoff
673                            if !self.retry_events_stream_with_backoff("Docker events stream failed").await {
674                                error!("Docker events stream failed and retry exhausted, shutting down.");
675                                return;
676                            }
677                        },
678                        None => {
679                            // Retry events stream with exponential backoff
680                            if !self.retry_events_stream_with_backoff("Docker events stream ended").await {
681                                error!("Docker events stream ended and retry exhausted, shutting down.");
682                                return;
683                            }
684                        }
685                    };
686                }
687            };
688        }
689    }
690
691    /// Retry events stream with exponential backoff
692    /// Returns true if retry was attempted, false if exhausted or shutdown
693    async fn retry_events_stream_with_backoff(&mut self, reason: &str) -> bool {
694        if let Some(delay) = self.events_backoff.next() {
695            warn!(
696                message = reason,
697                action = "retrying with backoff",
698                delay_ms = delay.as_millis()
699            );
700            tokio::select! {
701                _ = tokio::time::sleep(delay) => {
702                    self.events = Box::pin(self.esb.core.docker_logs_event_stream());
703                    true
704                }
705                _ = self.esb.shutdown.clone() => {
706                    info!("Shutdown signal received during retry backoff.");
707                    false
708                }
709            }
710        } else {
711            error!(message = "Events stream retry exhausted.", reason = reason);
712            false
713        }
714    }
715
716    fn exclude_self(&self, id: &str) -> bool {
717        self.hostname
718            .as_ref()
719            .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
720            .unwrap_or(false)
721    }
722}

/// Used to construct and start event stream futures.
///
/// Cloned into every spawned per-container task (see `start` and `restart`).
#[derive(Clone)]
struct EventStreamBuilder {
    /// Path of the host field; the configured `host_key`, or the global log schema's host key.
    host_key: OptionalValuePath,
    /// Hostname of this machine, if it could be determined.
    hostname: Option<String>,
    /// Configuration and Docker client shared with the main future.
    core: Arc<DockerLogsSourceCore>,
    /// Event stream futures send events through this
    out: SourceSender,
    /// End through which event stream futures send ContainerLogInfo to main future
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Self and event streams will end on this.
    shutdown: ShutdownSignal,
    /// Log namespace that was resolved for this source.
    log_namespace: LogNamespace,
}
738
739impl EventStreamBuilder {
740    /// Spawn a task to runs event stream until shutdown.
741    fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
742        let this = self.clone();
743        tokio::spawn(
744            async move {
745                if let Some(duration) = backoff {
746                    tokio::time::sleep(duration).await;
747                }
748
749                match this
750                    .core
751                    .docker
752                    .inspect_container(id.as_str(), None::<InspectContainerOptions>)
753                    .await
754                {
755                    Ok(details) => match ContainerMetadata::from_details(details) {
756                        Ok(metadata) => {
757                            let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
758                            this.run_event_stream(info).await;
759                            return;
760                        }
761                        Err(error) => emit!(DockerLogsTimestampParseError {
762                            error,
763                            container_id: id.as_str()
764                        }),
765                    },
766                    Err(error) => emit!(DockerLogsContainerMetadataFetchError {
767                        error,
768                        container_id: id.as_str()
769                    }),
770                }
771
772                this.finish(Err((id, ErrorPersistence::Transient)));
773            }
774            .in_current_span(),
775        );
776
777        ContainerState::new_running()
778    }
779
780    /// If info is present, restarts event stream which will run until shutdown.
781    fn restart(&self, container: &mut ContainerState) {
782        if let Some(info) = container.take_info() {
783            let this = self.clone();
784            tokio::spawn(this.run_event_stream(info).in_current_span());
785        }
786    }
787
788    async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
789        // Establish connection
790        let options = Some(
791            LogsOptionsBuilder::new()
792                .follow(true)
793                .stdout(true)
794                .stderr(true)
795                .since(info.log_since() as i32) // 2038 bug (I think)
796                .timestamps(true)
797                .build(),
798        );
799
800        let stream = self.core.docker.logs(info.id.as_str(), options);
801        emit!(DockerLogsContainerWatch {
802            container_id: info.id.as_str()
803        });
804
805        // Create event streamer
806        let mut partial_event_merge_state = None;
807
808        let core = Arc::clone(&self.core);
809
810        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
811
812        let mut error = None;
813        let events_stream = stream
814            .map(|value| {
815                match value {
816                    Ok(message) => Ok(info.new_event(
817                        message,
818                        core.config.partial_event_marker_field.clone(),
819                        core.config.auto_partial_merge,
820                        &mut partial_event_merge_state,
821                        &bytes_received,
822                        self.log_namespace,
823                    )),
824                    Err(error) => {
825                        // On any error, restart connection
826                        match &error {
827                            DockerError::DockerResponseServerError { status_code, .. }
828                                if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
829                            {
830                                emit!(DockerLogsLoggingDriverUnsupportedError {
831                                    error,
832                                    container_id: info.id.as_str(),
833                                });
834                                Err(ErrorPersistence::Permanent)
835                            }
836                            _ => {
837                                emit!(DockerLogsCommunicationError {
838                                    error,
839                                    container_id: Some(info.id.as_str())
840                                });
841                                Err(ErrorPersistence::Transient)
842                            }
843                        }
844                    }
845                }
846            })
847            .take_while(|v| {
848                error = v.as_ref().err().cloned();
849                ready(v.is_ok())
850            })
851            .filter_map(|v| ready(v.ok().flatten()))
852            .take_until(self.shutdown.clone());
853
854        let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
855            if let Some(ref line_agg_config) = core.line_agg_config {
856                Box::new(line_agg_adapter(
857                    events_stream,
858                    line_agg::Logic::new(line_agg_config.clone()),
859                    self.log_namespace,
860                ))
861            } else {
862                Box::new(events_stream)
863            };
864
865        let host_key = self.host_key.clone().path;
866        let hostname = self.hostname.clone();
867        let result = {
868            let mut stream = events_stream
869                .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
870            self.out.send_event_stream(&mut stream).await.map_err(|_| {
871                let (count, _) = stream.size_hint();
872                emit!(StreamClosedError { count });
873            })
874        };
875
876        // End of stream
877        emit!(DockerLogsContainerUnwatch {
878            container_id: info.id.as_str()
879        });
880
881        let result = match (result, error) {
882            (Ok(()), None) => Ok(info),
883            (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
884            (_, Some(occurrence)) => Err((info.id, occurrence)),
885        };
886
887        self.finish(result);
888    }
889
890    fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
891        // This can legally fail when shutting down, and any other
892        // reason should have been logged in the main future.
893        _ = self.main_send.send(result);
894    }
895}
896
897fn add_hostname(
898    mut log: LogEvent,
899    host_key: &Option<OwnedValuePath>,
900    hostname: &Option<String>,
901    log_namespace: LogNamespace,
902) -> LogEvent {
903    if let Some(hostname) = hostname {
904        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
905
906        log_namespace.insert_source_metadata(
907            DockerLogsConfig::NAME,
908            &mut log,
909            legacy_host_key,
910            path!("host"),
911            hostname.clone(),
912        );
913    }
914
915    log
916}
917
/// Classifies the failure an event stream task reports to the main future.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    /// Reported for Docker communication errors and for a failed container
    /// inspection or metadata conversion.
    Transient,
    /// Reported when Docker answers with HTTP 501 (unsupported logging
    /// driver) or when sending events downstream fails.
    Permanent,
}
923
/// Container ID as assigned by Docker.
///
/// The ID is a string that is stored as `Bytes`; `as_str` gives access to it
/// as `&str`.
#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct ContainerId(Bytes);
928
929impl ContainerId {
930    fn new(id: String) -> Self {
931        ContainerId(id.into())
932    }
933
934    fn as_str(&self) -> &str {
935        std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
936    }
937}
938
/// Kept by main to keep track of container state
struct ContainerState {
    /// The log info of the container while no event_stream holds it;
    /// `None` if there is an event_stream of this container.
    info: Option<ContainerLogInfo>,
    /// True if Container is currently running
    running: bool,
    /// Counts how often the container has been marked as running after the
    /// state was created; starts at 0.
    generation: u64,
}
948
949impl ContainerState {
950    /// It's ContainerLogInfo pair must be created exactly once.
951    const fn new_running() -> Self {
952        ContainerState {
953            info: None,
954            running: true,
955            generation: 0,
956        }
957    }
958
959    const fn running(&mut self) {
960        self.running = true;
961        self.generation += 1;
962    }
963
964    const fn stopped(&mut self) {
965        self.running = false;
966    }
967
968    const fn is_running(&self) -> bool {
969        self.running
970    }
971
972    /// True if it needs to be restarted.
973    #[must_use]
974    fn return_info(&mut self, info: ContainerLogInfo) -> bool {
975        debug_assert!(self.info.is_none());
976        // Generation is the only one strictly necessary,
977        // but with v.running, restarting event_stream is automatically done.
978        let restart = self.running || info.generation < self.generation;
979        self.info = Some(info);
980        restart
981    }
982
983    fn take_info(&mut self) -> Option<ContainerLogInfo> {
984        self.info.take().map(|mut info| {
985            // Update info
986            info.generation = self.generation;
987            info
988        })
989    }
990}
991
/// Exchanged between main future and event_stream futures
struct ContainerLogInfo {
    /// Container docker ID
    id: ContainerId,
    /// Timestamp of event which created this struct
    created: DateTime<Utc>,
    /// Timestamp of the last log message together with the generation in
    /// which it was processed; `None` until the first message is processed.
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// generation of ContainerState at event_stream creation
    generation: u64,
    /// Container metadata that is attached to every event.
    metadata: ContainerMetadata,
}
1004
1005impl ContainerLogInfo {
1006    /// Container docker ID
1007    /// Unix timestamp of event which created this struct
1008    const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
1009        ContainerLogInfo {
1010            id,
1011            created,
1012            last_log: None,
1013            generation: 0,
1014            metadata,
1015        }
1016    }
1017
1018    /// Only logs after or equal to this point need to be fetched
1019    fn log_since(&self) -> i64 {
1020        self.last_log
1021            .as_ref()
1022            .map(|(d, _)| d.timestamp())
1023            .unwrap_or_else(|| self.created.timestamp())
1024            - 1
1025    }
1026
    /// Turns one chunk of Docker log output into a log event.
    ///
    /// Expects timestamp at the beginning of message.
    /// Expects messages to be ordered by timestamps.
    ///
    /// The leading timestamp is split off the message and used both as the
    /// event timestamp and to skip lines that were already processed. The
    /// event is then annotated with container metadata (ID, image, name,
    /// creation time, labels, stream) and the source type.
    ///
    /// A message that does not end in a newline is treated as partial: with
    /// `auto_partial_merge` it is stashed in `partial_event_merge_state` until
    /// a non-partial message arrives, otherwise it is marked with
    /// `partial_event_marker_field` (if set) and returned as is.
    ///
    /// Returns `None` when the output is from stdin, when the line is skipped
    /// because of its timestamp, or when a partial message was stashed for
    /// merging; otherwise returns the (possibly merged) event.
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        // Counted before the timestamp is stripped, so the prefix is included.
        bytes_received.emit(ByteSize(bytes_message.len()));

        // Split the message at the first whitespace: timestamp, then the log line.
        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Timestamp check. This is included to avoid processing the same log multiple times, which can
                // occur when a container changes generations, and to avoid processing logs with timestamps before
                // the created timestamp.
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Noop - log received in order.
                        } else {
                            // Docker returns logs in order.
                            // If we reach this state, this log is from a previous generation of the container.
                            // It was already processed, so we can safely skip it.
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // Noop - first log to process.
                        } else {
                            // Received a log with a timestamp before that provided to the Docker API.
                            // This should not happen, but if it does, we can just ignore these logs.
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                // Strip the timestamp and the separator from the raw bytes:
                // everything except the remainder after the first whitespace.
                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                // Provide the timestamp.
                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // Received bad timestamp, if any at all.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                // So continue normally but without a timestamp.
                None
            }
        };

        // Message is actually one line from stderr or stdout, and they are
        // delimited with newline, so that newline needs to be removed.
        // If there's no newline, the event is considered partial, and will
        // either be merged within the docker source, or marked accordingly
        // before sending out, depending on the configuration.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            // Also remove a carriage return that preceded the newline.
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        // Build the log.
        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container ID
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        // Container image
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        // Container name
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        // Created at timestamp
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        // Labels. Note that the legacy key uses `label` while the metadata
        // path uses `labels`.
        if !self.metadata.labels.is_empty() {
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        // Stream the message came from
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        // Source type
        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // This handles the transition from the original timestamp logic. Originally the
        // `timestamp_key` was only populated when a timestamp was parsed from the event.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp
                    && let Some(timestamp_key) = log_schema().timestamp_key()
                {
                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                }
            }
        };

        // If automatic partial event merging is requested - perform the
        // merging.
        // Otherwise mark partial events and return all the events with no
        // merging.
        let log = if auto_partial_merge {
            // Partial events merging logic.

            // If event is partial, stash it and return `None`.
            if is_partial {
                // If we already have a partial event merge state, the current
                // message has to be merged into that existing state.
                // Otherwise, create a new partial event merge state with the
                // current message being the initial one.
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    // Depending on the log namespace the actual contents of the log "message" will be
                    // found in either the root of the event ("."), or at the globally configured "message_key".
                    match log_namespace {
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // This is not a partial event. If we have a partial event merge
            // state from before, the current event must be a final event, that
            // would give us a merged event we can return.
            // Otherwise it's just a regular event that we return as-is.
            match partial_event_merge_state.take() {
                // Depending on the log namespace the actual contents of the log "message" will be
                // found in either the root of the event ("."), or at the globally configured "message_key".
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            // If the event is partial, just set the partial event marker field.
            if is_partial {
                // Only add partial event marker field if it's requested.
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            // Return the log event as is, partial or not. No merging here.
            log
        };

        // Partial or not partial - we return the event we got here, because all
        // other cases were handled earlier.
        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
1303}
1304
/// Metadata of a container that is attached to the events created from its
/// logs.
struct ContainerMetadata {
    /// Container labels, keyed by label name.
    labels: HashMap<String, String>,
    /// Container name with leading `/` characters removed, as an event value.
    name: Value,
    /// Container name exactly as reported by Docker.
    name_str: String,
    /// Container image, as an event value.
    image: Value,
    /// Creation time of the container, in UTC.
    created_at: DateTime<Utc>,
}
1317
1318impl ContainerMetadata {
1319    fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
1320        let config = details.config.unwrap();
1321        let name = details.name.unwrap();
1322        let created = details.created.unwrap();
1323
1324        let labels = config.labels.unwrap_or_default();
1325
1326        Ok(ContainerMetadata {
1327            labels,
1328            name: name.as_str().trim_start_matches('/').to_owned().into(),
1329            name_str: name,
1330            image: config.image.unwrap().into(),
1331            created_at: created.with_timezone(&Utc),
1332        })
1333    }
1334}
1335
1336fn line_agg_adapter(
1337    inner: impl Stream<Item = LogEvent> + Unpin,
1338    logic: line_agg::Logic<Bytes, LogEvent>,
1339    log_namespace: LogNamespace,
1340) -> impl Stream<Item = LogEvent> {
1341    let line_agg_in = inner.map(move |mut log| {
1342        let message_value = match log_namespace {
1343            LogNamespace::Vector => log
1344                .remove(event_path!())
1345                .expect("`.` must exist in the event"),
1346            LogNamespace::Legacy => log
1347                .remove(
1348                    log_schema()
1349                        .message_key_target_path()
1350                        .expect("global log_schema.message_key to be valid path"),
1351                )
1352                .expect("`message` must exist in the event"),
1353        };
1354        let stream_value = match log_namespace {
1355            LogNamespace::Vector => log
1356                .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
1357                .expect("`docker_logs.stream` must exist in the metadata"),
1358            LogNamespace::Legacy => log
1359                .get(event_path!(STREAM))
1360                .expect("stream must exist in the event"),
1361        };
1362
1363        let stream = stream_value.coerce_to_bytes();
1364        let message = message_value.coerce_to_bytes();
1365        (stream, message, log)
1366    });
1367    let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
1368    line_agg_out.map(move |(_, message, mut log, _)| {
1369        match log_namespace {
1370            LogNamespace::Vector => log.insert(event_path!(), message),
1371            LogNamespace::Legacy => log.insert(
1372                log_schema()
1373                    .message_key_target_path()
1374                    .expect("global log_schema.message_key to be valid path"),
1375                message,
1376            ),
1377        };
1378        log
1379    })
1380}