1use std::{
2 collections::HashMap,
3 convert::TryFrom,
4 future::ready,
5 pin::Pin,
6 sync::{Arc, LazyLock},
7 time::Duration,
8};
9
10use bollard::{
11 Docker,
12 container::LogOutput,
13 errors::Error as DockerError,
14 query_parameters::{
15 EventsOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
16 LogsOptionsBuilder,
17 },
18 service::{ContainerInspectResponse, EventMessage},
19};
20use bytes::{Buf, Bytes};
21use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
22use futures::{Stream, StreamExt};
23use serde_with::serde_as;
24use tokio::sync::mpsc;
25use tracing_futures::Instrument;
26use vector_lib::{
27 codecs::{BytesDeserializer, BytesDeserializerConfig},
28 config::{LegacyKey, LogNamespace},
29 configurable::configurable_component,
30 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered},
31 lookup::{
32 OwnedValuePath, PathPrefix, lookup_v2::OptionalValuePath, metadata_path, owned_value_path,
33 path,
34 },
35};
36use vrl::{
37 event_path,
38 value::{Kind, kind::Collection},
39};
40
41use super::util::MultilineConfig;
42use crate::{
43 SourceSender,
44 config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
45 docker::{DockerTlsConfig, docker},
46 event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
47 internal_events::{
48 DockerLogsCommunicationError, DockerLogsContainerEventReceived,
49 DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
50 DockerLogsContainerWatch, DockerLogsEventsReceived,
51 DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
52 },
53 line_agg::{self, LineAgg},
54 shutdown::ShutdownSignal,
55};
56
57#[cfg(test)]
58mod tests;
59
/// Field/metadata key holding the container's image.
const IMAGE: &str = "image";
/// Field/metadata key holding the container's creation time.
const CREATED_AT: &str = "container_created_at";
/// Field/metadata key holding the container's name.
const NAME: &str = "container_name";
/// Field/metadata key holding the output stream the log line came from.
const STREAM: &str = "stream";
/// Field/metadata key holding the container ID.
const CONTAINER: &str = "container_id";
/// Minimum hostname length before a container whose ID starts with the hostname is treated
/// as this process's own container and excluded (see `DockerLogsSource::exclude_self`).
const MIN_HOSTNAME_LENGTH: usize = 6;

/// `stream` value for lines read from a container's stderr.
static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
/// `stream` value for lines read from a container's stdout.
static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
/// `stream` value for lines read from a container's console (TTY) output.
static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());
71
#[serde_as]
/// Configuration for the `docker_logs` source.
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the global `log_schema.host_key` option is used. The hostname is only
    /// added when it can be determined.
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// Accepts HTTP(S) URLs, Unix sockets and Windows named pipes (see the examples). When
    /// unset, the default connection of the Docker client setup is used.
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is by prefix: a container is excluded if its ID, or any of its names, starts
    /// with one of these values. Exclusion takes precedence over `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>,

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is by prefix: a container is included if its ID, or any of its names, starts
    /// with one of these values. When unset, all containers are included (subject to
    /// `exclude_containers`).
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>,

    /// A list of container labels to filter on, passed to Docker as `label` filters.
    ///
    /// Only logs from containers matching these filters are collected.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to filter on.
    ///
    /// Only logs from containers created from these images are collected.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Name of the field added to partial events when `auto_partial_merge` is disabled.
    ///
    /// A log line is partial when it does not end with a newline. Set this to an empty string
    /// to disable marking partial events.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    ///
    /// When enabled, consecutive partial log lines are merged with the following complete
    /// line into a single event.
    auto_partial_merge: bool,

    /// The amount of time, in seconds, to wait before restarting a container's log stream
    /// after a transient error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
179
180impl Default for DockerLogsConfig {
181 fn default() -> Self {
182 Self {
183 host_key: None,
184 docker_host: None,
185 tls: None,
186 exclude_containers: None,
187 include_containers: None,
188 include_labels: None,
189 include_images: None,
190 partial_event_marker_field: default_partial_event_marker_field(),
191 auto_partial_merge: true,
192 multiline: None,
193 retry_backoff_secs: default_retry_backoff_secs(),
194 log_namespace: None,
195 }
196 }
197}
198
199fn default_partial_event_marker_field() -> Option<String> {
200 Some(event::PARTIAL.to_string())
201}
202
/// Default `retry_backoff_secs`: two seconds.
const fn default_retry_backoff_secs() -> Duration {
    Duration::from_millis(2_000)
}
206
207impl DockerLogsConfig {
208 fn container_name_or_id_included<'a>(
209 &self,
210 id: &str,
211 names: impl IntoIterator<Item = &'a str>,
212 ) -> bool {
213 let containers: Vec<String> = names.into_iter().map(Into::into).collect();
214
215 self.include_containers
216 .as_ref()
217 .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
218 .unwrap_or(true)
219 && !(self
220 .exclude_containers
221 .as_ref()
222 .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
223 .unwrap_or(false))
224 }
225
226 fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
227 items.iter().any(|flag| id.starts_with(flag))
228 || names
229 .iter()
230 .any(|name| items.iter().any(|item| name.starts_with(item)))
231 }
232
233 fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
234 if let Some(val) = &self.partial_event_marker_field
235 && val.is_empty()
236 {
237 self.partial_event_marker_field = None;
238 }
239 self
240 }
241}
242
// Generates the example configuration for this source from `DockerLogsConfig::default()`
// (per the macro's name — confirm against the macro definition).
impl_generate_config_from_default!(DockerLogsConfig);
244
#[async_trait::async_trait]
#[typetag::serde(name = "docker_logs")]
impl SourceConfig for DockerLogsConfig {
    /// Builds the source future.
    ///
    /// The returned future first starts log streams for already-running containers and then
    /// runs the main event loop. It completes when that work ends or when the source's
    /// shutdown signal fires, whichever happens first.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let source = DockerLogsSource::new(
            // An empty marker field name means "do not mark partial events".
            self.clone().with_empty_partial_event_marker_field_as_none(),
            cx.out,
            cx.shutdown.clone(),
            log_namespace,
        )?;

        let fut = async move {
            // If the initial listing fails, the error is logged and the source stops.
            match source.handle_running_containers().await {
                Ok(source) => source.run().await,
                Err(error) => {
                    error!(
                        message = "Listing currently running containers failed.",
                        %error
                    );
                }
            }
        };

        let shutdown = cx.shutdown;
        Ok(Box::pin(async move {
            // Stop on whichever finishes first: the source itself or the shutdown signal.
            Ok(tokio::select! {
                _ = fut => {}
                _ = shutdown => {}
            })
        }))
    }

    /// Describes the single log output and its schema: the raw message plus host, container
    /// ID, image, name, creation time, labels, stream and timestamp source metadata, and the
    /// Vector `source_type` and `ingest_timestamp` metadata.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let host_key = self
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path
            .map(LegacyKey::Overwrite);

        let schema_definition = BytesDeserializerConfig
            .schema_definition(global_log_namespace.merge(self.log_namespace))
            // `host` may be missing when the hostname cannot be determined.
            .with_source_metadata(
                Self::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes().or_undefined(),
                Some("host"),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
                &owned_value_path!(CONTAINER),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
                &owned_value_path!(IMAGE),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
                &owned_value_path!(NAME),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
                &owned_value_path!(CREATED_AT),
                Kind::timestamp(),
                None,
            )
            // Labels live under `label.*` in the legacy namespace and `labels.*` in metadata;
            // absent when the container has no labels.
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("label"))),
                &owned_value_path!("labels"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
                &owned_value_path!(STREAM),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                log_schema()
                    .timestamp_key()
                    .cloned()
                    .map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_vector_metadata(
                log_schema().source_type_key(),
                &owned_value_path!("source_type"),
                Kind::bytes(),
                None,
            )
            .with_vector_metadata(
                None,
                &owned_value_path!("ingest_timestamp"),
                Kind::timestamp(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    /// This source does not support end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool {
        false
    }
}
372
/// State shared (behind an `Arc` in `EventStreamBuilder`) by the main loop and all
/// per-container log tasks.
struct DockerLogsSourceCore {
    /// The source configuration.
    config: DockerLogsConfig,
    /// Multiline aggregation settings, present only when `multiline` is configured.
    line_agg_config: Option<line_agg::Config>,
    /// Docker API client.
    docker: Docker,
    /// Time the source started; events and logs from before this point are not collected.
    now_timestamp: DateTime<Utc>,
}
380
381impl DockerLogsSourceCore {
382 fn new(config: DockerLogsConfig) -> crate::Result<Self> {
383 let docker = docker(config.docker_host.clone(), config.tls.clone())?;
386
387 let now = Local::now();
389 info!(
390 message = "Capturing logs from now on.",
391 now = %now.to_rfc3339()
392 );
393
394 let line_agg_config = if let Some(ref multiline_config) = config.multiline {
395 Some(line_agg::Config::try_from(multiline_config)?)
396 } else {
397 None
398 };
399
400 Ok(DockerLogsSourceCore {
401 config,
402 line_agg_config,
403 docker,
404 now_timestamp: now.into(),
405 })
406 }
407
408 fn docker_logs_event_stream(
410 &self,
411 ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
412 let mut filters = HashMap::new();
413
414 filters.insert(
421 "event".to_owned(),
422 vec![
423 "start".to_owned(),
424 "unpause".to_owned(),
425 "die".to_owned(),
426 "pause".to_owned(),
427 ],
428 );
429 filters.insert("type".to_owned(), vec!["container".to_owned()]);
430
431 if let Some(include_labels) = &self.config.include_labels {
433 filters.insert("label".to_owned(), include_labels.clone());
434 }
435
436 if let Some(include_images) = &self.config.include_images {
437 filters.insert("image".to_owned(), include_images.clone());
438 }
439
440 self.docker.events(Some(
441 EventsOptionsBuilder::new()
442 .since(&self.now_timestamp.timestamp().to_string())
443 .filters(&filters)
444 .build(),
445 ))
446 }
447}
448
/// The main source task: tracks containers and reacts to Docker lifecycle events.
struct DockerLogsSource {
    /// Spawns and restarts per-container log tasks.
    esb: EventStreamBuilder,
    /// Docker container lifecycle events (`start`, `unpause`, `die`, `pause`).
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// State of every container a log task has been started for.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives the outcome of each finished per-container log task.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Hostname of this machine, if known; used to skip this process's own container.
    hostname: Option<String>,
    /// Delay before restarting a container's log task after a transient error.
    backoff_duration: Duration,
}
472
473impl DockerLogsSource {
474 fn new(
475 config: DockerLogsConfig,
476 out: SourceSender,
477 shutdown: ShutdownSignal,
478 log_namespace: LogNamespace,
479 ) -> crate::Result<DockerLogsSource> {
480 let backoff_secs = config.retry_backoff_secs;
481
482 let host_key = config
483 .host_key
484 .clone()
485 .unwrap_or(log_schema().host_key().cloned().into());
486 let hostname = crate::get_hostname().ok();
487
488 let core = DockerLogsSourceCore::new(config)?;
490
491 let events = core.docker_logs_event_stream();
493 info!(message = "Listening to docker log events.");
494
495 let (main_send, main_recv) =
497 mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();
498
499 let esb = EventStreamBuilder {
508 host_key,
509 hostname: hostname.clone(),
510 core: Arc::new(core),
511 out,
512 main_send,
513 shutdown,
514 log_namespace,
515 };
516
517 Ok(DockerLogsSource {
518 esb,
519 events: Box::pin(events),
520 containers: HashMap::new(),
521 main_recv,
522 hostname,
523 backoff_duration: backoff_secs,
524 })
525 }
526
527 async fn handle_running_containers(mut self) -> crate::Result<Self> {
529 let mut filters = HashMap::new();
530
531 if let Some(include_labels) = &self.esb.core.config.include_labels {
533 filters.insert("label".to_owned(), include_labels.clone());
534 }
535
536 if let Some(include_images) = &self.esb.core.config.include_images {
537 filters.insert("ancestor".to_owned(), include_images.clone());
538 }
539
540 self.esb
541 .core
542 .docker
543 .list_containers(Some(
544 ListContainersOptionsBuilder::new()
545 .all(false)
546 .filters(&filters)
547 .build(),
548 ))
549 .await?
550 .into_iter()
551 .for_each(|container| {
552 let id = container.id.unwrap();
553 let names = container.names.unwrap();
554
555 trace!(message = "Found already running container.", id = %id, names = ?names);
556
557 if self.exclude_self(id.as_str()) {
558 info!(message = "Excluded self container.", id = %id);
559 return;
560 }
561
562 if !self.esb.core.config.container_name_or_id_included(
563 id.as_str(),
564 names.iter().map(|s| {
565 let s = s.as_str();
567 if s.starts_with('/') {
568 s.split_at('/'.len_utf8()).1
569 } else {
570 s
571 }
572 }),
573 ) {
574 info!(message = "Excluded container.", id = %id);
575 return;
576 }
577
578 let id = ContainerId::new(id);
579 self.containers.insert(id.clone(), self.esb.start(id, None));
580 });
581
582 Ok(self)
583 }
584
585 async fn run(mut self) {
586 loop {
587 tokio::select! {
588 value = self.main_recv.recv() => {
589 match value {
590 Some(Ok(info)) => {
591 let state = self
592 .containers
593 .get_mut(&info.id)
594 .expect("Every ContainerLogInfo has it's ContainerState");
595 if state.return_info(info) {
596 self.esb.restart(state);
597 }
598 },
599 Some(Err((id,persistence))) => {
600 let state = self
601 .containers
602 .remove(&id)
603 .expect("Every started ContainerId has it's ContainerState");
604 match persistence{
605 ErrorPersistence::Transient => if state.is_running() {
606 let backoff= Some(self.backoff_duration);
607 self.containers.insert(id.clone(), self.esb.start(id, backoff));
608 }
609 ErrorPersistence::Permanent => (),
611 }
612 }
613 None => {
614 error!(message = "The docker_logs source main stream has ended unexpectedly.");
615 info!(message = "Shutting down docker_logs source.");
616 return;
617 }
618 };
619 }
620 value = self.events.next() => {
621 match value {
622 Some(Ok(mut event)) => {
623 let action = event.action.unwrap();
624 let actor = event.actor.take().unwrap();
625 let id = actor.id.unwrap();
626 let attributes = actor.attributes.unwrap();
627
628 emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });
629
630 let id = ContainerId::new(id.to_owned());
631
632 match action.as_str() {
634 "die" | "pause" => {
635 if let Some(state) = self.containers.get_mut(&id) {
636 state.stopped();
637 }
638 }
639 "start" | "unpause" => {
640 if let Some(state) = self.containers.get_mut(&id) {
641 state.running();
642 self.esb.restart(state);
643 } else {
644 let include_name =
645 self.esb.core.config.container_name_or_id_included(
646 id.as_str(),
647 attributes.get("name").map(|s| s.as_str()),
648 );
649
650 let exclude_self = self.exclude_self(id.as_str());
651
652 if include_name && !exclude_self {
653 self.containers.insert(id.clone(), self.esb.start(id, None));
654 }
655 }
656 }
657 _ => {},
658 };
659 }
660 Some(Err(error)) => {
661 emit!(DockerLogsCommunicationError {
662 error,
663 container_id: None,
664 });
665 return;
666 },
667 None => {
668 error!(message = "Docker log event stream has ended unexpectedly.");
670 info!(message = "Shutting down docker_logs source.");
671 return;
672 }
673 };
674 }
675 };
676 }
677 }
678
679 fn exclude_self(&self, id: &str) -> bool {
680 self.hostname
681 .as_ref()
682 .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
683 .unwrap_or(false)
684 }
685}
686
/// Spawns and restarts per-container log tasks; cloned into each task it spawns.
#[derive(Clone)]
struct EventStreamBuilder {
    /// Path under which the hostname is inserted in the legacy namespace.
    host_key: OptionalValuePath,
    /// Hostname added to each event, if it could be determined.
    hostname: Option<String>,
    /// State shared with the main loop and the other log tasks.
    core: Arc<DockerLogsSourceCore>,
    /// Destination for emitted events.
    out: SourceSender,
    /// Reports each log task's outcome back to the main loop.
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Ends log streams when the source shuts down.
    shutdown: ShutdownSignal,
    /// Log namespace used when building events.
    log_namespace: LogNamespace,
}
701
impl EventStreamBuilder {
    /// Spawns a task that waits for the optional `backoff`, inspects container `id`, and then
    /// streams its logs.
    ///
    /// Returns the `ContainerState` the caller should track for this container, created in the
    /// running state. If inspecting the container or extracting its metadata fails, the task
    /// reports a transient error through `main_send`.
    fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
        // The spawned task owns its own copy of the builder.
        let this = self.clone();
        tokio::spawn(
            async move {
                if let Some(duration) = backoff {
                    tokio::time::sleep(duration).await;
                }

                match this
                    .core
                    .docker
                    .inspect_container(id.as_str(), None::<InspectContainerOptions>)
                    .await
                {
                    Ok(details) => match ContainerMetadata::from_details(details) {
                        Ok(metadata) => {
                            // Logs older than the source start time are filtered out.
                            let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
                            this.run_event_stream(info).await;
                            return;
                        }
                        Err(error) => emit!(DockerLogsTimestampParseError {
                            error,
                            container_id: id.as_str()
                        }),
                    },
                    Err(error) => emit!(DockerLogsContainerMetadataFetchError {
                        error,
                        container_id: id.as_str()
                    }),
                }

                this.finish(Err((id, ErrorPersistence::Transient)));
            }
            .in_current_span(),
        );

        ContainerState::new_running()
    }

    /// Resumes the log stream for `container` if its `ContainerLogInfo` is parked in the state
    /// (no log task currently owns it); otherwise does nothing.
    fn restart(&self, container: &mut ContainerState) {
        if let Some(info) = container.take_info() {
            let this = self.clone();
            tokio::spawn(this.run_event_stream(info).in_current_span());
        }
    }

    /// Streams logs for the container described by `info` until the stream ends, hits an
    /// error, or the source shuts down, then reports the outcome via `finish`.
    ///
    /// Outcomes:
    /// - a clean end returns `info`, so the stream can be resumed later;
    /// - a 501 Not Implemented response (unsupported logging driver) is permanent;
    /// - a failure to send to the output is permanent;
    /// - any other Docker error is transient.
    async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
        // Follow stdout and stderr with timestamps; `ContainerLogInfo::new_event` parses the
        // timestamps and uses them to drop already-seen logs.
        let options = Some(
            LogsOptionsBuilder::new()
                .follow(true)
                .stdout(true)
                .stderr(true)
                // NOTE(review): `log_since()` returns i64 and is narrowed with `as i32`, which
                // silently truncates; fine for present-day Unix times but it would wrap after
                // 2038 — confirm the builder's parameter type.
                .since(info.log_since() as i32)
                .timestamps(true)
                .build(),
        );

        let stream = self.core.docker.logs(info.id.as_str(), options);
        emit!(DockerLogsContainerWatch {
            container_id: info.id.as_str()
        });

        // Pending partial (not newline-terminated) lines waiting to be merged.
        let mut partial_event_merge_state = None;

        let core = Arc::clone(&self.core);

        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));

        // Persistence of the Docker error that ended the stream, if any.
        let mut error = None;
        let events_stream = stream
            .map(|value| {
                match value {
                    Ok(message) => Ok(info.new_event(
                        message,
                        core.config.partial_event_marker_field.clone(),
                        core.config.auto_partial_merge,
                        &mut partial_event_merge_state,
                        &bytes_received,
                        self.log_namespace,
                    )),
                    Err(error) => {
                        match &error {
                            // The container's logging driver does not support reading logs,
                            // so retrying would not help.
                            DockerError::DockerResponseServerError { status_code, .. }
                                if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
                            {
                                emit!(DockerLogsLoggingDriverUnsupportedError {
                                    error,
                                    container_id: info.id.as_str(),
                                });
                                Err(ErrorPersistence::Permanent)
                            }
                            _ => {
                                emit!(DockerLogsCommunicationError {
                                    error,
                                    container_id: Some(info.id.as_str())
                                });
                                Err(ErrorPersistence::Transient)
                            }
                        }
                    }
                }
            })
            // Stop at the first error, remembering its persistence in `error`.
            .take_while(|v| {
                error = v.as_ref().err().cloned();
                ready(v.is_ok())
            })
            // Drop frames for which `new_event` produced no event.
            .filter_map(|v| ready(v.ok().flatten()))
            .take_until(self.shutdown.clone());

        let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
            if let Some(ref line_agg_config) = core.line_agg_config {
                Box::new(line_agg_adapter(
                    events_stream,
                    line_agg::Logic::new(line_agg_config.clone()),
                    self.log_namespace,
                ))
            } else {
                Box::new(events_stream)
            };

        let host_key = self.host_key.clone().path;
        let hostname = self.hostname.clone();
        let result = {
            let mut stream = events_stream
                .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
            self.out.send_event_stream(&mut stream).await.map_err(|_| {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
            })
        };

        // The stream (and its borrow of `info`) has been dropped at this point.
        emit!(DockerLogsContainerUnwatch {
            container_id: info.id.as_str()
        });

        let result = match (result, error) {
            (Ok(()), None) => Ok(info),
            (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
            (_, Some(occurrence)) => Err((info.id, occurrence)),
        };

        self.finish(result);
    }

    /// Reports a log task's outcome to the main loop.
    fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
        // A send error only means the receiver is gone; it is deliberately ignored.
        _ = self.main_send.send(result);
    }
}
859
860fn add_hostname(
861 mut log: LogEvent,
862 host_key: &Option<OwnedValuePath>,
863 hostname: &Option<String>,
864 log_namespace: LogNamespace,
865) -> LogEvent {
866 if let Some(hostname) = hostname {
867 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
868
869 log_namespace.insert_source_metadata(
870 DockerLogsConfig::NAME,
871 &mut log,
872 legacy_host_key,
873 path!("host"),
874 hostname.clone(),
875 );
876 }
877
878 log
879}
880
/// Whether a failed container log task should be retried.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    /// Retry after the configured backoff, provided the container is still running.
    Transient,
    /// Do not retry (e.g. unsupported logging driver, or the output is closed).
    Permanent,
}
886
/// A Docker container ID.
///
/// Stored as `Bytes`, which are inserted directly into events as the container ID field.
#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct ContainerId(Bytes);
891
892impl ContainerId {
893 fn new(id: String) -> Self {
894 ContainerId(id.into())
895 }
896
897 fn as_str(&self) -> &str {
898 std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
899 }
900}
901
/// Main-loop bookkeeping for a single container.
struct ContainerState {
    /// The container's log info while no log task holds it; `None` while a task owns it.
    info: Option<ContainerLogInfo>,
    /// Whether the container is currently considered running (not died or paused).
    running: bool,
    /// Incremented each time the container starts running again, so that logs from
    /// different runs can be told apart.
    generation: u64,
}
911
912impl ContainerState {
913 const fn new_running() -> Self {
915 ContainerState {
916 info: None,
917 running: true,
918 generation: 0,
919 }
920 }
921
922 const fn running(&mut self) {
923 self.running = true;
924 self.generation += 1;
925 }
926
927 const fn stopped(&mut self) {
928 self.running = false;
929 }
930
931 const fn is_running(&self) -> bool {
932 self.running
933 }
934
935 #[must_use]
937 fn return_info(&mut self, info: ContainerLogInfo) -> bool {
938 debug_assert!(self.info.is_none());
939 let restart = self.running || info.generation < self.generation;
942 self.info = Some(info);
943 restart
944 }
945
946 fn take_info(&mut self) -> Option<ContainerLogInfo> {
947 self.info.take().map(|mut info| {
948 info.generation = self.generation;
950 info
951 })
952 }
953}
954
/// Per-container log stream state.
///
/// Moved between the log task streaming the container and the container's `ContainerState`.
struct ContainerLogInfo {
    /// Container ID.
    id: ContainerId,
    /// Lower bound for accepted log timestamps until the first log has been accepted.
    created: DateTime<Utc>,
    /// Timestamp of the last accepted log and the generation it was accepted in.
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// Generation of the container run currently being streamed.
    generation: u64,
    /// Metadata attached to every event from this container.
    metadata: ContainerMetadata,
}
967
impl ContainerLogInfo {
    /// Creates log info for container `id`.
    ///
    /// `created` is the lower bound for accepted log timestamps until the first log is
    /// accepted. The generation starts at 0 and no log has been seen yet.
    const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
        ContainerLogInfo {
            id,
            created,
            last_log: None,
            generation: 0,
            metadata,
        }
    }

    /// Returns the `since` value (Unix seconds) to request logs from Docker with.
    ///
    /// This is the timestamp of the last accepted log (or `created` if none was accepted yet)
    /// minus one second. Logs that are not newer than what was already accepted are dropped
    /// again in `new_event`.
    fn log_since(&self) -> i64 {
        self.last_log
            .as_ref()
            .map(|(d, _)| d.timestamp())
            .unwrap_or_else(|| self.created.timestamp())
            - 1
    }

    /// Converts one Docker log frame into a `LogEvent`, or returns `None` if it yields none.
    ///
    /// The frame is expected to start with an RFC 3339 timestamp followed by whitespace. If
    /// the timestamp does not parse, the whole frame becomes the message and the event gets
    /// no timestamp.
    ///
    /// `None` is returned for:
    /// - stdin frames;
    /// - frames not newer than the last accepted log (or than `created`, before any log was
    ///   accepted);
    /// - partial frames absorbed into `partial_event_merge_state` when `auto_partial_merge`
    ///   is enabled.
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        bytes_received.emit(ByteSize(bytes_message.len()));

        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Only accept logs newer than what was already accepted: resumed streams
                // request logs from before the last accepted one (see `log_since`).
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Newer than the last accepted log, or an equal timestamp within
                            // the same generation: accept.
                        } else {
                            // Not newer than the last accepted log: skip.
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // Newer than the lower bound: accept.
                        } else {
                            // At or before the lower bound: skip.
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                // Strip the timestamp and its separator from the raw bytes.
                // `message.len() - log_len` is the length of that prefix, which decodes
                // without replacement characters because it parsed as a timestamp.
                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // Keep the whole frame as the message, without a timestamp.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                None
            }
        };

        // A message ending in `\n` (optionally `\r\n`) is a complete line and has its
        // terminator stripped; anything else is a partial line.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        // The remaining bytes become the event's message.
        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container ID.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        // Container image.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        // Container name.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        // Container creation time.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        // Container labels: `label.<key>` in the legacy namespace, `labels.<key>` otherwise.
        if !self.metadata.labels.is_empty() {
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // Vector namespace: the log timestamp goes into source metadata, next to the ingest
        // time. Legacy namespace: it is written to the global timestamp key with `try_insert`.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp
                    && let Some(timestamp_key) = log_schema().timestamp_key()
                {
                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                }
            }
        };

        let log = if auto_partial_merge {
            // Partial lines are accumulated and produce no event until a complete line arrives.
            if is_partial {
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    // Merge this fragment into the pending state, using the message field
                    // (the whole event root in the Vector namespace).
                    match log_namespace {
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    // First fragment: start a new merge.
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // A complete line: finish any pending merge with it, or pass it through as is.
            match partial_event_merge_state.take() {
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            // No merging: flag partial lines with the marker field, if one is configured.
            if is_partial {
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            log
        };

        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
}
1267
/// Container metadata, taken from an `inspect_container` response and attached to every
/// event from the container.
struct ContainerMetadata {
    /// Container labels.
    labels: HashMap<String, String>,
    /// Container name with any leading `/` removed, as inserted into events.
    name: Value,
    /// Container name exactly as reported by Docker, used for internal telemetry.
    name_str: String,
    /// Image from the container's configuration.
    image: Value,
    /// Container creation time.
    created_at: DateTime<Utc>,
}
1280
1281impl ContainerMetadata {
1282 fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
1283 let config = details.config.unwrap();
1284 let name = details.name.unwrap();
1285 let created = details.created.unwrap();
1286
1287 let labels = config.labels.unwrap_or_default();
1288
1289 Ok(ContainerMetadata {
1290 labels,
1291 name: name.as_str().trim_start_matches('/').to_owned().into(),
1292 name_str: name,
1293 image: config.image.unwrap().into(),
1294 created_at: created.with_timezone(&Utc),
1295 })
1296 }
1297}
1298
1299fn line_agg_adapter(
1300 inner: impl Stream<Item = LogEvent> + Unpin,
1301 logic: line_agg::Logic<Bytes, LogEvent>,
1302 log_namespace: LogNamespace,
1303) -> impl Stream<Item = LogEvent> {
1304 let line_agg_in = inner.map(move |mut log| {
1305 let message_value = match log_namespace {
1306 LogNamespace::Vector => log
1307 .remove(event_path!())
1308 .expect("`.` must exist in the event"),
1309 LogNamespace::Legacy => log
1310 .remove(
1311 log_schema()
1312 .message_key_target_path()
1313 .expect("global log_schema.message_key to be valid path"),
1314 )
1315 .expect("`message` must exist in the event"),
1316 };
1317 let stream_value = match log_namespace {
1318 LogNamespace::Vector => log
1319 .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
1320 .expect("`docker_logs.stream` must exist in the metadata"),
1321 LogNamespace::Legacy => log
1322 .get(event_path!(STREAM))
1323 .expect("stream must exist in the event"),
1324 };
1325
1326 let stream = stream_value.coerce_to_bytes();
1327 let message = message_value.coerce_to_bytes();
1328 (stream, message, log)
1329 });
1330 let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
1331 line_agg_out.map(move |(_, message, mut log, _)| {
1332 match log_namespace {
1333 LogNamespace::Vector => log.insert(event_path!(), message),
1334 LogNamespace::Legacy => log.insert(
1335 log_schema()
1336 .message_key_target_path()
1337 .expect("global log_schema.message_key to be valid path"),
1338 message,
1339 ),
1340 };
1341 log
1342 })
1343}