1use std::sync::{Arc, LazyLock};
2use std::{collections::HashMap, convert::TryFrom, future::ready, pin::Pin, time::Duration};
3
4use bollard::query_parameters::{
5 EventsOptionsBuilder, ListContainersOptionsBuilder, LogsOptionsBuilder,
6};
7use bollard::{
8 container::LogOutput,
9 errors::Error as DockerError,
10 query_parameters::InspectContainerOptions,
11 service::{ContainerInspectResponse, EventMessage},
12 Docker,
13};
14use bytes::{Buf, Bytes};
15use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
16use futures::{Stream, StreamExt};
17use serde_with::serde_as;
18use tokio::sync::mpsc;
19use tracing_futures::Instrument;
20use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
21use vector_lib::config::{LegacyKey, LogNamespace};
22use vector_lib::configurable::configurable_component;
23use vector_lib::internal_event::{
24 ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered,
25};
26use vector_lib::lookup::{
27 lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix,
28};
29use vrl::event_path;
30use vrl::value::{kind::Collection, Kind};
31
32use super::util::MultilineConfig;
33use crate::{
34 config::{log_schema, DataType, SourceConfig, SourceContext, SourceOutput},
35 docker::{docker, DockerTlsConfig},
36 event::{self, merge_state::LogEventMergeState, EstimatedJsonEncodedSizeOf, LogEvent, Value},
37 internal_events::{
38 DockerLogsCommunicationError, DockerLogsContainerEventReceived,
39 DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
40 DockerLogsContainerWatch, DockerLogsEventsReceived,
41 DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
42 },
43 line_agg::{self, LineAgg},
44 shutdown::ShutdownSignal,
45 SourceSender,
46};
47
48#[cfg(test)]
49mod tests;
50
/// Metadata key under which the container's image is stored on each event.
const IMAGE: &str = "image";
/// Metadata key under which the container's creation timestamp is stored.
const CREATED_AT: &str = "container_created_at";
/// Metadata key under which the container's name is stored.
const NAME: &str = "container_name";
/// Metadata key naming the output stream (`stdout`, `stderr`, `console`) a log came from.
const STREAM: &str = "stream";
/// Metadata key under which the container's ID is stored.
const CONTAINER: &str = "container_id";
/// Minimum hostname length required before the hostname is treated as a
/// container ID prefix when deciding whether a container is this source's own.
const MIN_HOSTNAME_LENGTH: usize = 6;

/// Value of the `stream` key for logs read from standard error.
static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
/// Value of the `stream` key for logs read from standard output.
static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
/// Value of the `stream` key for logs read from the console.
static CONSOLE: LazyLock<Bytes> = LazyLock::new(|| "console".into());
62
#[serde_as]
/// Configuration for the `docker_logs` source.
#[configurable_component(source("docker_logs", "Collect container logs from a Docker Daemon."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct DockerLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the global `log_schema.host_key` is used.
    host_key: Option<OptionalValuePath>,

    /// Docker host to connect to.
    ///
    /// The value is handed to the Docker client constructor as-is; how an unset value is
    /// resolved is decided there (NOTE(review): presumably the `DOCKER_HOST` environment
    /// variable or the platform's default socket — confirm against `crate::docker::docker`).
    #[configurable(metadata(docs::examples = "http://localhost:2375"))]
    #[configurable(metadata(docs::examples = "https://localhost:2376"))]
    #[configurable(metadata(docs::examples = "unix:///var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "npipe:////./pipe/docker_engine"))]
    #[configurable(metadata(docs::examples = "/var/run/docker.sock"))]
    #[configurable(metadata(docs::examples = "//./pipe/docker_engine"))]
    docker_host: Option<String>,

    /// A list of container IDs or names of containers to exclude from log collection.
    ///
    /// Matching is prefix-based: a container is excluded when its ID or any of its names
    /// starts with one of these strings. Exclusion takes precedence over
    /// `include_containers`.
    #[configurable(metadata(
        docs::examples = "exclude_",
        docs::examples = "exclude_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    exclude_containers: Option<Vec<String>>,

    /// A list of container IDs or names of containers to include in log collection.
    ///
    /// Matching is prefix-based: a container is included when its ID or any of its names
    /// starts with one of these strings. When unset, all containers are included.
    #[configurable(metadata(
        docs::examples = "include_",
        docs::examples = "include_me_0",
        docs::examples = "ad08cc418cf9"
    ))]
    include_containers: Option<Vec<String>>,

    /// A list of container labels to match against when selecting containers.
    ///
    /// The values are passed to the Docker daemon as the `label` filter.
    #[configurable(metadata(
        docs::examples = "org.opencontainers.image.vendor=Vector",
        docs::examples = "com.mycorp.internal.animal=fish",
    ))]
    include_labels: Option<Vec<String>>,

    /// A list of image names to match against when selecting containers.
    ///
    /// The values are passed to the Docker daemon as the `ancestor` filter when listing
    /// running containers and as the `image` filter when subscribing to events.
    #[configurable(metadata(docs::examples = "httpd", docs::examples = "redis",))]
    include_images: Option<Vec<String>>,

    /// Name of the field used to mark an event as partial.
    ///
    /// Only consulted when `auto_partial_merge` is disabled. An empty string disables
    /// the marker entirely.
    #[serde(default = "default_partial_event_marker_field")]
    partial_event_marker_field: Option<String>,

    /// Enables automatic merging of partial events.
    ///
    /// A log message that does not end with a newline is treated as partial.
    auto_partial_merge: bool,

    /// The amount of time, in whole seconds, to wait before restarting a container's
    /// log stream after a transient error.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_backoff_secs")]
    #[configurable(metadata(docs::human_name = "Retry Backoff"))]
    retry_backoff_secs: Duration,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    /// TLS options handed to the Docker client constructor.
    #[configurable(derived)]
    tls: Option<DockerTlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
170
171impl Default for DockerLogsConfig {
172 fn default() -> Self {
173 Self {
174 host_key: None,
175 docker_host: None,
176 tls: None,
177 exclude_containers: None,
178 include_containers: None,
179 include_labels: None,
180 include_images: None,
181 partial_event_marker_field: default_partial_event_marker_field(),
182 auto_partial_merge: true,
183 multiline: None,
184 retry_backoff_secs: default_retry_backoff_secs(),
185 log_namespace: None,
186 }
187 }
188}
189
190fn default_partial_event_marker_field() -> Option<String> {
191 Some(event::PARTIAL.to_string())
192}
193
/// Default value for `retry_backoff_secs`: two seconds.
const fn default_retry_backoff_secs() -> Duration {
    const DEFAULT_BACKOFF_SECS: u64 = 2;
    Duration::from_secs(DEFAULT_BACKOFF_SECS)
}
197
198impl DockerLogsConfig {
199 fn container_name_or_id_included<'a>(
200 &self,
201 id: &str,
202 names: impl IntoIterator<Item = &'a str>,
203 ) -> bool {
204 let containers: Vec<String> = names.into_iter().map(Into::into).collect();
205
206 self.include_containers
207 .as_ref()
208 .map(|include_list| Self::name_or_id_matches(id, &containers, include_list))
209 .unwrap_or(true)
210 && !(self
211 .exclude_containers
212 .as_ref()
213 .map(|exclude_list| Self::name_or_id_matches(id, &containers, exclude_list))
214 .unwrap_or(false))
215 }
216
217 fn name_or_id_matches(id: &str, names: &[String], items: &[String]) -> bool {
218 items.iter().any(|flag| id.starts_with(flag))
219 || names
220 .iter()
221 .any(|name| items.iter().any(|item| name.starts_with(item)))
222 }
223
224 fn with_empty_partial_event_marker_field_as_none(mut self) -> Self {
225 if let Some(val) = &self.partial_event_marker_field {
226 if val.is_empty() {
227 self.partial_event_marker_field = None;
228 }
229 }
230 self
231 }
232}
233
// NOTE(review): the macro is defined elsewhere; judging by its name it derives the
// generated example configuration from `DockerLogsConfig::default()` — confirm.
impl_generate_config_from_default!(DockerLogsConfig);
235
236#[async_trait::async_trait]
237#[typetag::serde(name = "docker_logs")]
238impl SourceConfig for DockerLogsConfig {
239 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
240 let log_namespace = cx.log_namespace(self.log_namespace);
241 let source = DockerLogsSource::new(
242 self.clone().with_empty_partial_event_marker_field_as_none(),
243 cx.out,
244 cx.shutdown.clone(),
245 log_namespace,
246 )?;
247
248 let fut = async move {
250 match source.handle_running_containers().await {
251 Ok(source) => source.run().await,
252 Err(error) => {
253 error!(
254 message = "Listing currently running containers failed.",
255 %error
256 );
257 }
258 }
259 };
260
261 let shutdown = cx.shutdown;
262 Ok(Box::pin(async move {
264 Ok(tokio::select! {
265 _ = fut => {}
266 _ = shutdown => {}
267 })
268 }))
269 }
270
271 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
272 let host_key = self
273 .host_key
274 .clone()
275 .unwrap_or(log_schema().host_key().cloned().into())
276 .path
277 .map(LegacyKey::Overwrite);
278
279 let schema_definition = BytesDeserializerConfig
280 .schema_definition(global_log_namespace.merge(self.log_namespace))
281 .with_source_metadata(
282 Self::NAME,
283 host_key,
284 &owned_value_path!("host"),
285 Kind::bytes().or_undefined(),
286 Some("host"),
287 )
288 .with_source_metadata(
289 Self::NAME,
290 Some(LegacyKey::Overwrite(owned_value_path!(CONTAINER))),
291 &owned_value_path!(CONTAINER),
292 Kind::bytes(),
293 None,
294 )
295 .with_source_metadata(
296 Self::NAME,
297 Some(LegacyKey::Overwrite(owned_value_path!(IMAGE))),
298 &owned_value_path!(IMAGE),
299 Kind::bytes(),
300 None,
301 )
302 .with_source_metadata(
303 Self::NAME,
304 Some(LegacyKey::Overwrite(owned_value_path!(NAME))),
305 &owned_value_path!(NAME),
306 Kind::bytes(),
307 None,
308 )
309 .with_source_metadata(
310 Self::NAME,
311 Some(LegacyKey::Overwrite(owned_value_path!(CREATED_AT))),
312 &owned_value_path!(CREATED_AT),
313 Kind::timestamp(),
314 None,
315 )
316 .with_source_metadata(
317 Self::NAME,
318 Some(LegacyKey::Overwrite(owned_value_path!("label"))),
319 &owned_value_path!("labels"),
320 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
321 None,
322 )
323 .with_source_metadata(
324 Self::NAME,
325 Some(LegacyKey::Overwrite(owned_value_path!(STREAM))),
326 &owned_value_path!(STREAM),
327 Kind::bytes(),
328 None,
329 )
330 .with_source_metadata(
331 Self::NAME,
332 log_schema()
333 .timestamp_key()
334 .cloned()
335 .map(LegacyKey::Overwrite),
336 &owned_value_path!("timestamp"),
337 Kind::timestamp(),
338 Some("timestamp"),
339 )
340 .with_vector_metadata(
341 log_schema().source_type_key(),
342 &owned_value_path!("source_type"),
343 Kind::bytes(),
344 None,
345 )
346 .with_vector_metadata(
347 None,
348 &owned_value_path!("ingest_timestamp"),
349 Kind::timestamp(),
350 None,
351 );
352
353 vec![SourceOutput::new_maybe_logs(
354 DataType::Log,
355 schema_definition,
356 )]
357 }
358
    /// This source does not support end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool {
        false
    }
362}
363
/// State shared by the main source loop and every per-container log task.
struct DockerLogsSourceCore {
    /// The source configuration.
    config: DockerLogsConfig,
    /// Multiline aggregation settings derived from `config.multiline`, if configured.
    line_agg_config: Option<line_agg::Config>,
    /// Client used for all Docker daemon requests.
    docker: Docker,
    /// Instant the source was created; events and logs are only collected from
    /// this point on.
    now_timestamp: DateTime<Utc>,
}
371
372impl DockerLogsSourceCore {
373 fn new(config: DockerLogsConfig) -> crate::Result<Self> {
374 let docker = docker(config.docker_host.clone(), config.tls.clone())?;
377
378 let now = Local::now();
380 info!(
381 message = "Capturing logs from now on.",
382 now = %now.to_rfc3339()
383 );
384
385 let line_agg_config = if let Some(ref multiline_config) = config.multiline {
386 Some(line_agg::Config::try_from(multiline_config)?)
387 } else {
388 None
389 };
390
391 Ok(DockerLogsSourceCore {
392 config,
393 line_agg_config,
394 docker,
395 now_timestamp: now.into(),
396 })
397 }
398
399 fn docker_logs_event_stream(
401 &self,
402 ) -> impl Stream<Item = Result<EventMessage, DockerError>> + Send + use<> {
403 let mut filters = HashMap::new();
404
405 filters.insert(
412 "event".to_owned(),
413 vec![
414 "start".to_owned(),
415 "unpause".to_owned(),
416 "die".to_owned(),
417 "pause".to_owned(),
418 ],
419 );
420 filters.insert("type".to_owned(), vec!["container".to_owned()]);
421
422 if let Some(include_labels) = &self.config.include_labels {
424 filters.insert("label".to_owned(), include_labels.clone());
425 }
426
427 if let Some(include_images) = &self.config.include_images {
428 filters.insert("image".to_owned(), include_images.clone());
429 }
430
431 self.docker.events(Some(
432 EventsOptionsBuilder::new()
433 .since(&self.now_timestamp.timestamp().to_string())
434 .filters(&filters)
435 .build(),
436 ))
437 }
438}
439
/// Main state of the source: tracks watched containers and reacts both to
/// Docker events and to results reported by per-container log tasks.
struct DockerLogsSource {
    /// Spawns and restarts the per-container log streaming tasks.
    esb: EventStreamBuilder,
    /// Stream of Docker container events.
    events: Pin<Box<dyn Stream<Item = Result<EventMessage, DockerError>> + Send>>,
    /// State of every container currently being tracked, keyed by container ID.
    containers: HashMap<ContainerId, ContainerState>,
    /// Receives the outcome of each finished log task: the container's log info on a
    /// clean end, or the container ID and error persistence on failure.
    main_recv: mpsc::UnboundedReceiver<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Local hostname, used to recognize and skip this source's own container.
    hostname: Option<String>,
    /// Delay applied before restarting a log task after a transient error.
    backoff_duration: Duration,
}
463
464impl DockerLogsSource {
465 fn new(
466 config: DockerLogsConfig,
467 out: SourceSender,
468 shutdown: ShutdownSignal,
469 log_namespace: LogNamespace,
470 ) -> crate::Result<DockerLogsSource> {
471 let backoff_secs = config.retry_backoff_secs;
472
473 let host_key = config
474 .host_key
475 .clone()
476 .unwrap_or(log_schema().host_key().cloned().into());
477 let hostname = crate::get_hostname().ok();
478
479 let core = DockerLogsSourceCore::new(config)?;
481
482 let events = core.docker_logs_event_stream();
484 info!(message = "Listening to docker log events.");
485
486 let (main_send, main_recv) =
488 mpsc::unbounded_channel::<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>();
489
490 let esb = EventStreamBuilder {
499 host_key,
500 hostname: hostname.clone(),
501 core: Arc::new(core),
502 out,
503 main_send,
504 shutdown,
505 log_namespace,
506 };
507
508 Ok(DockerLogsSource {
509 esb,
510 events: Box::pin(events),
511 containers: HashMap::new(),
512 main_recv,
513 hostname,
514 backoff_duration: backoff_secs,
515 })
516 }
517
518 async fn handle_running_containers(mut self) -> crate::Result<Self> {
520 let mut filters = HashMap::new();
521
522 if let Some(include_labels) = &self.esb.core.config.include_labels {
524 filters.insert("label".to_owned(), include_labels.clone());
525 }
526
527 if let Some(include_images) = &self.esb.core.config.include_images {
528 filters.insert("ancestor".to_owned(), include_images.clone());
529 }
530
531 self.esb
532 .core
533 .docker
534 .list_containers(Some(
535 ListContainersOptionsBuilder::new()
536 .all(false)
537 .filters(&filters)
538 .build(),
539 ))
540 .await?
541 .into_iter()
542 .for_each(|container| {
543 let id = container.id.unwrap();
544 let names = container.names.unwrap();
545
546 trace!(message = "Found already running container.", id = %id, names = ?names);
547
548 if self.exclude_self(id.as_str()) {
549 info!(message = "Excluded self container.", id = %id);
550 return;
551 }
552
553 if !self.esb.core.config.container_name_or_id_included(
554 id.as_str(),
555 names.iter().map(|s| {
556 let s = s.as_str();
558 if s.starts_with('/') {
559 s.split_at('/'.len_utf8()).1
560 } else {
561 s
562 }
563 }),
564 ) {
565 info!(message = "Excluded container.", id = %id);
566 return;
567 }
568
569 let id = ContainerId::new(id);
570 self.containers.insert(id.clone(), self.esb.start(id, None));
571 });
572
573 Ok(self)
574 }
575
    /// Main loop of the source.
    ///
    /// Concurrently waits for (a) results reported by per-container log tasks
    /// and (b) Docker container events, and updates `self.containers`
    /// accordingly. Returns when the result channel closes, the event stream
    /// ends, or the event stream yields an error.
    ///
    /// # Panics
    ///
    /// Panics if a reported result refers to a container that is not tracked,
    /// or if a Docker event lacks its action, actor, actor ID, or attributes.
    async fn run(mut self) {
        loop {
            tokio::select! {
                // A per-container log task has finished.
                value = self.main_recv.recv() => {
                    match value {
                        Some(Ok(info)) => {
                            // The log stream ended cleanly: hand the info back to the
                            // container's state and restart the stream if the state asks for it.
                            let state = self
                                .containers
                                .get_mut(&info.id)
                                .expect("Every ContainerLogInfo has it's ContainerState");
                            if state.return_info(info) {
                                self.esb.restart(state);
                            }
                        },
                        Some(Err((id,persistence))) => {
                            // The task failed: forget the container, and start over after the
                            // backoff delay if the error is transient and the container is
                            // still considered running.
                            let state = self
                                .containers
                                .remove(&id)
                                .expect("Every started ContainerId has it's ContainerState");
                            match persistence{
                                ErrorPersistence::Transient => if state.is_running() {
                                    let backoff= Some(self.backoff_duration);
                                    self.containers.insert(id.clone(), self.esb.start(id, backoff));
                                }
                                // Permanent errors are not retried.
                                ErrorPersistence::Permanent => (),
                            }
                        }
                        None => {
                            error!(message = "The docker_logs source main stream has ended unexpectedly.");
                            info!(message = "Shutting down docker_logs source.");
                            return;
                        }
                    };
                }
                // The Docker daemon reported a container event.
                value = self.events.next() => {
                    match value {
                        Some(Ok(mut event)) => {
                            let action = event.action.unwrap();
                            let actor = event.actor.take().unwrap();
                            let id = actor.id.unwrap();
                            let attributes = actor.attributes.unwrap();

                            emit!(DockerLogsContainerEventReceived { container_id: &id, action: &action });

                            let id = ContainerId::new(id.to_owned());

                            match action.as_str() {
                                // The container stopped producing logs.
                                "die" | "pause" => {
                                    if let Some(state) = self.containers.get_mut(&id) {
                                        state.stopped();
                                    }
                                }
                                "start" | "unpause" => {
                                    if let Some(state) = self.containers.get_mut(&id) {
                                        // Already tracked: mark it running and resume its log stream.
                                        state.running();
                                        self.esb.restart(state);
                                    } else {
                                        // New container: apply the include/exclude lists and the
                                        // self-exclusion check before watching it.
                                        let include_name =
                                            self.esb.core.config.container_name_or_id_included(
                                                id.as_str(),
                                                attributes.get("name").map(|s| s.as_str()),
                                            );

                                        let exclude_self = self.exclude_self(id.as_str());

                                        if include_name && !exclude_self {
                                            self.containers.insert(id.clone(), self.esb.start(id, None));
                                        }
                                    }
                                }
                                // Other actions are not acted upon.
                                _ => {},
                            };
                        }
                        Some(Err(error)) => {
                            emit!(DockerLogsCommunicationError {
                                error,
                                container_id: None,
                            });
                            return;
                        },
                        None => {
                            error!(message = "Docker log event stream has ended unexpectedly.");
                            info!(message = "Shutting down docker_logs source.");
                            return;
                        }
                    };
                }
            };
        }
    }
669
670 fn exclude_self(&self, id: &str) -> bool {
671 self.hostname
672 .as_ref()
673 .map(|hostname| id.starts_with(hostname) && hostname.len() >= MIN_HOSTNAME_LENGTH)
674 .unwrap_or(false)
675 }
676}
677
/// Everything a per-container log task needs; cloned into each spawned task.
#[derive(Clone)]
struct EventStreamBuilder {
    /// Path of the field the hostname is written to in the legacy namespace.
    host_key: OptionalValuePath,
    /// Local hostname added to every event, if known.
    hostname: Option<String>,
    /// Shared configuration and Docker client.
    core: Arc<DockerLogsSourceCore>,
    /// Sink the produced events are sent to.
    out: SourceSender,
    /// Reports the outcome of a finished log task back to the main loop.
    main_send: mpsc::UnboundedSender<Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>>,
    /// Signal that ends every log stream on shutdown.
    shutdown: ShutdownSignal,
    /// Namespace that decides where event data and metadata are written.
    log_namespace: LogNamespace,
}
692
693impl EventStreamBuilder {
694 fn start(&self, id: ContainerId, backoff: Option<Duration>) -> ContainerState {
696 let this = self.clone();
697 tokio::spawn(
698 async move {
699 if let Some(duration) = backoff {
700 tokio::time::sleep(duration).await;
701 }
702
703 match this
704 .core
705 .docker
706 .inspect_container(id.as_str(), None::<InspectContainerOptions>)
707 .await
708 {
709 Ok(details) => match ContainerMetadata::from_details(details) {
710 Ok(metadata) => {
711 let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
712 this.run_event_stream(info).await;
713 return;
714 }
715 Err(error) => emit!(DockerLogsTimestampParseError {
716 error,
717 container_id: id.as_str()
718 }),
719 },
720 Err(error) => emit!(DockerLogsContainerMetadataFetchError {
721 error,
722 container_id: id.as_str()
723 }),
724 }
725
726 this.finish(Err((id, ErrorPersistence::Transient)));
727 }
728 .in_current_span(),
729 );
730
731 ContainerState::new_running()
732 }
733
734 fn restart(&self, container: &mut ContainerState) {
736 if let Some(info) = container.take_info() {
737 let this = self.clone();
738 tokio::spawn(this.run_event_stream(info).in_current_span());
739 }
740 }
741
742 async fn run_event_stream(mut self, mut info: ContainerLogInfo) {
743 let options = Some(
745 LogsOptionsBuilder::new()
746 .follow(true)
747 .stdout(true)
748 .stderr(true)
749 .since(info.log_since() as i32) .timestamps(true)
751 .build(),
752 );
753
754 let stream = self.core.docker.logs(info.id.as_str(), options);
755 emit!(DockerLogsContainerWatch {
756 container_id: info.id.as_str()
757 });
758
759 let mut partial_event_merge_state = None;
761
762 let core = Arc::clone(&self.core);
763
764 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
765
766 let mut error = None;
767 let events_stream = stream
768 .map(|value| {
769 match value {
770 Ok(message) => Ok(info.new_event(
771 message,
772 core.config.partial_event_marker_field.clone(),
773 core.config.auto_partial_merge,
774 &mut partial_event_merge_state,
775 &bytes_received,
776 self.log_namespace,
777 )),
778 Err(error) => {
779 match &error {
781 DockerError::DockerResponseServerError { status_code, .. }
782 if *status_code == http::StatusCode::NOT_IMPLEMENTED =>
783 {
784 emit!(DockerLogsLoggingDriverUnsupportedError {
785 error,
786 container_id: info.id.as_str(),
787 });
788 Err(ErrorPersistence::Permanent)
789 }
790 _ => {
791 emit!(DockerLogsCommunicationError {
792 error,
793 container_id: Some(info.id.as_str())
794 });
795 Err(ErrorPersistence::Transient)
796 }
797 }
798 }
799 }
800 })
801 .take_while(|v| {
802 error = v.as_ref().err().cloned();
803 ready(v.is_ok())
804 })
805 .filter_map(|v| ready(v.ok().flatten()))
806 .take_until(self.shutdown.clone());
807
808 let events_stream: Box<dyn Stream<Item = LogEvent> + Unpin + Send> =
809 if let Some(ref line_agg_config) = core.line_agg_config {
810 Box::new(line_agg_adapter(
811 events_stream,
812 line_agg::Logic::new(line_agg_config.clone()),
813 self.log_namespace,
814 ))
815 } else {
816 Box::new(events_stream)
817 };
818
819 let host_key = self.host_key.clone().path;
820 let hostname = self.hostname.clone();
821 let result = {
822 let mut stream = events_stream
823 .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
824 self.out.send_event_stream(&mut stream).await.map_err(|_| {
825 let (count, _) = stream.size_hint();
826 emit!(StreamClosedError { count });
827 })
828 };
829
830 emit!(DockerLogsContainerUnwatch {
832 container_id: info.id.as_str()
833 });
834
835 let result = match (result, error) {
836 (Ok(()), None) => Ok(info),
837 (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
838 (_, Some(occurrence)) => Err((info.id, occurrence)),
839 };
840
841 self.finish(result);
842 }
843
844 fn finish(self, result: Result<ContainerLogInfo, (ContainerId, ErrorPersistence)>) {
845 _ = self.main_send.send(result);
848 }
849}
850
851fn add_hostname(
852 mut log: LogEvent,
853 host_key: &Option<OwnedValuePath>,
854 hostname: &Option<String>,
855 log_namespace: LogNamespace,
856) -> LogEvent {
857 if let Some(hostname) = hostname {
858 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
859
860 log_namespace.insert_source_metadata(
861 DockerLogsConfig::NAME,
862 &mut log,
863 legacy_host_key,
864 path!("host"),
865 hostname.clone(),
866 );
867 }
868
869 log
870}
871
/// Tells the main loop whether a failed log task is worth retrying.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum ErrorPersistence {
    /// The log task is restarted after the backoff delay if the container is still running.
    Transient,
    /// The container is dropped without a retry.
    Permanent,
}
877
878#[derive(Hash, Clone, Eq, PartialEq, Ord, PartialOrd)]
881struct ContainerId(Bytes);
882
883impl ContainerId {
884 fn new(id: String) -> Self {
885 ContainerId(id.into())
886 }
887
888 fn as_str(&self) -> &str {
889 std::str::from_utf8(&self.0).expect("Container Id Bytes aren't String")
890 }
891}
892
893struct ContainerState {
895 info: Option<ContainerLogInfo>,
897 running: bool,
899 generation: u64,
901}
902
903impl ContainerState {
904 const fn new_running() -> Self {
906 ContainerState {
907 info: None,
908 running: true,
909 generation: 0,
910 }
911 }
912
913 const fn running(&mut self) {
914 self.running = true;
915 self.generation += 1;
916 }
917
918 const fn stopped(&mut self) {
919 self.running = false;
920 }
921
922 const fn is_running(&self) -> bool {
923 self.running
924 }
925
926 #[must_use]
928 fn return_info(&mut self, info: ContainerLogInfo) -> bool {
929 debug_assert!(self.info.is_none());
930 let restart = self.running || info.generation < self.generation;
933 self.info = Some(info);
934 restart
935 }
936
937 fn take_info(&mut self) -> Option<ContainerLogInfo> {
938 self.info.take().map(|mut info| {
939 info.generation = self.generation;
941 info
942 })
943 }
944}
945
/// Per-container state owned by whichever log task is currently streaming
/// the container's logs.
struct ContainerLogInfo {
    /// ID of the container the logs belong to.
    id: ContainerId,
    /// Until a first log has been accepted, logs timestamped at or before this
    /// instant are dropped.
    created: DateTime<Utc>,
    /// Timestamp of the last accepted log and the generation it was accepted in.
    last_log: Option<(DateTime<FixedOffset>, u64)>,
    /// Generation of the container state this info was last taken from.
    generation: u64,
    /// Container metadata attached to every event.
    metadata: ContainerMetadata,
}
958
959impl ContainerLogInfo {
960 const fn new(id: ContainerId, metadata: ContainerMetadata, created: DateTime<Utc>) -> Self {
963 ContainerLogInfo {
964 id,
965 created,
966 last_log: None,
967 generation: 0,
968 metadata,
969 }
970 }
971
972 fn log_since(&self) -> i64 {
974 self.last_log
975 .as_ref()
976 .map(|(d, _)| d.timestamp())
977 .unwrap_or_else(|| self.created.timestamp())
978 - 1
979 }
980
    /// Converts one chunk of Docker log output into a log event.
    ///
    /// The leading RFC 3339 timestamp is parsed and stripped from the message, a
    /// trailing newline (and preceding carriage return) is removed, container
    /// metadata is attached, and partial messages are merged or marked
    /// depending on `auto_partial_merge`.
    ///
    /// Returns `None` when:
    /// - the output comes from stdin;
    /// - the log is not newer than what was already accepted (or not newer than
    ///   `created` when nothing was accepted yet);
    /// - the message is partial and was stashed in `partial_event_merge_state`
    ///   to be merged with the messages that follow.
    fn new_event(
        &mut self,
        log_output: LogOutput,
        partial_event_marker_field: Option<String>,
        auto_partial_merge: bool,
        partial_event_merge_state: &mut Option<LogEventMergeState>,
        bytes_received: &Registered<BytesReceived>,
        log_namespace: LogNamespace,
    ) -> Option<LogEvent> {
        let (stream, mut bytes_message) = match log_output {
            LogOutput::StdErr { message } => (STDERR.clone(), message),
            LogOutput::StdOut { message } => (STDOUT.clone(), message),
            LogOutput::Console { message } => (CONSOLE.clone(), message),
            LogOutput::StdIn { message: _ } => return None,
        };

        bytes_received.emit(ByteSize(bytes_message.len()));

        // The message starts with a timestamp, separated from the log text by whitespace.
        let message = String::from_utf8_lossy(&bytes_message);
        let mut splitter = message.splitn(2, char::is_whitespace);
        let timestamp_str = splitter.next()?;
        let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
            Ok(timestamp) => {
                // Drop logs that were already seen.
                match self.last_log.as_ref() {
                    Some(&(last, generation)) => {
                        if last < timestamp || (last == timestamp && generation == self.generation)
                        {
                            // Newer than the last log, or same instant within this
                            // generation: accept.
                        } else {
                            trace!(
                                message = "Received log from previous container generation.",
                                log_timestamp = %timestamp_str,
                                last_log_timestamp = %last,
                            );
                            return None;
                        }
                    }
                    None => {
                        if self.created < timestamp.with_timezone(&Utc) {
                            // First log, and newer than `created`: accept.
                        } else {
                            trace!(
                                message = "Received log from before created timestamp.",
                                log_timestamp = %timestamp_str,
                                created_timestamp = %self.created
                            );
                            return None;
                        }
                    }
                }

                self.last_log = Some((timestamp, self.generation));

                // Strip the timestamp and its separator from the raw bytes.
                let log_len = splitter.next().map(|log| log.len()).unwrap_or(0);
                let remove_len = message.len() - log_len;
                bytes_message.advance(remove_len);

                Some(timestamp.with_timezone(&Utc))
            }
            Err(error) => {
                // The event is still produced, without a timestamp and with the
                // message left untouched.
                emit!(DockerLogsTimestampParseError {
                    error,
                    container_id: self.id.as_str()
                });
                None
            }
        };

        // A message that does not end with a newline is partial. The trailing
        // `\n` (and a preceding `\r`) is removed from complete messages.
        let is_partial = if bytes_message
            .last()
            .map(|&b| b as char == '\n')
            .unwrap_or(false)
        {
            bytes_message.truncate(bytes_message.len() - 1);
            if bytes_message
                .last()
                .map(|&b| b as char == '\r')
                .unwrap_or(false)
            {
                bytes_message.truncate(bytes_message.len() - 1);
            }
            false
        } else {
            true
        };

        let deserializer = BytesDeserializer;
        let mut log = deserializer.parse_single(bytes_message, log_namespace);

        // Container metadata.
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CONTAINER))),
            path!(CONTAINER),
            self.id.0.clone(),
        );
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(IMAGE))),
            path!(IMAGE),
            self.metadata.image.clone(),
        );
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(NAME))),
            path!(NAME),
            self.metadata.name.clone(),
        );
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(CREATED_AT))),
            path!(CREATED_AT),
            self.metadata.created_at,
        );
        if !self.metadata.labels.is_empty() {
            // Labels go under `label.<key>` (legacy) or `labels.<key>` (Vector namespace).
            for (key, value) in self.metadata.labels.iter() {
                log_namespace.insert_source_metadata(
                    DockerLogsConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(path!("label", key))),
                    path!("labels", key),
                    value.clone(),
                )
            }
        }
        log_namespace.insert_source_metadata(
            DockerLogsConfig::NAME,
            &mut log,
            Some(LegacyKey::Overwrite(path!(STREAM))),
            path!(STREAM),
            stream,
        );

        log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            path!("source_type"),
            Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
        );

        // Timestamps: the Vector namespace stores the log timestamp and the ingest
        // time as metadata; the legacy namespace writes the log timestamp to the
        // global timestamp key.
        match log_namespace {
            LogNamespace::Vector => {
                if let Some(timestamp) = timestamp {
                    log.insert(
                        metadata_path!(DockerLogsConfig::NAME, "timestamp"),
                        timestamp,
                    );
                }

                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
            }
            LogNamespace::Legacy => {
                if let Some(timestamp) = timestamp {
                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                        log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
                    }
                }
            }
        };

        let log = if auto_partial_merge {
            if is_partial {
                // Stash the partial message: merge it into the pending state, or
                // start a new one. Nothing is emitted yet.
                if let Some(partial_event_merge_state) = partial_event_merge_state {
                    match log_namespace {
                        LogNamespace::Vector => {
                            partial_event_merge_state.merge_in_next_event(log, &["."]);
                        }
                        LogNamespace::Legacy => {
                            partial_event_merge_state.merge_in_next_event(
                                log,
                                &[log_schema()
                                    .message_key()
                                    .expect("global log_schema.message_key to be valid path")
                                    .to_string()],
                            );
                        }
                    }
                } else {
                    *partial_event_merge_state = Some(LogEventMergeState::new(log));
                };
                return None;
            };

            // Complete message: if partial messages are pending, this one finishes them.
            match partial_event_merge_state.take() {
                Some(partial_event_merge_state) => match log_namespace {
                    LogNamespace::Vector => {
                        partial_event_merge_state.merge_in_final_event(log, &["."])
                    }
                    LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event(
                        log,
                        &[log_schema()
                            .message_key()
                            .expect("global log_schema.message_key to be valid path")
                            .to_string()],
                    ),
                },
                None => log,
            }
        } else {
            if is_partial {
                // Merging is disabled: only flag the event as partial, if a marker
                // field is configured.
                if let Some(partial_event_marker_field) = partial_event_marker_field {
                    log_namespace.insert_source_metadata(
                        DockerLogsConfig::NAME,
                        &mut log,
                        Some(LegacyKey::Overwrite(path!(
                            partial_event_marker_field.as_str()
                        ))),
                        path!(event::PARTIAL),
                        true,
                    );
                }
            }
            log
        };

        emit!(DockerLogsEventsReceived {
            byte_size: log.estimated_json_encoded_size_of(),
            container_id: self.id.as_str(),
            container_name: &self.metadata.name_str
        });

        Some(log)
    }
1257}
1258
/// Container details attached to every event produced for the container.
struct ContainerMetadata {
    /// Container labels, keyed by label name.
    labels: HashMap<String, String>,
    /// Container name with leading `/` characters removed, as an event value.
    name: Value,
    /// Container name exactly as reported by the inspect response.
    name_str: String,
    /// Image the container was created from, as an event value.
    image: Value,
    /// When the container was created.
    created_at: DateTime<Utc>,
}
1271
1272impl ContainerMetadata {
1273 fn from_details(details: ContainerInspectResponse) -> Result<Self, ParseError> {
1274 let config = details.config.unwrap();
1275 let name = details.name.unwrap();
1276 let created = details.created.unwrap();
1277
1278 let labels = config.labels.unwrap_or_default();
1279
1280 Ok(ContainerMetadata {
1281 labels,
1282 name: name.as_str().trim_start_matches('/').to_owned().into(),
1283 name_str: name,
1284 image: config.image.unwrap().into(),
1285 created_at: created.with_timezone(&Utc),
1286 })
1287 }
1288}
1289
1290fn line_agg_adapter(
1291 inner: impl Stream<Item = LogEvent> + Unpin,
1292 logic: line_agg::Logic<Bytes, LogEvent>,
1293 log_namespace: LogNamespace,
1294) -> impl Stream<Item = LogEvent> {
1295 let line_agg_in = inner.map(move |mut log| {
1296 let message_value = match log_namespace {
1297 LogNamespace::Vector => log
1298 .remove(event_path!())
1299 .expect("`.` must exist in the event"),
1300 LogNamespace::Legacy => log
1301 .remove(
1302 log_schema()
1303 .message_key_target_path()
1304 .expect("global log_schema.message_key to be valid path"),
1305 )
1306 .expect("`message` must exist in the event"),
1307 };
1308 let stream_value = match log_namespace {
1309 LogNamespace::Vector => log
1310 .get(metadata_path!(DockerLogsConfig::NAME, STREAM))
1311 .expect("`docker_logs.stream` must exist in the metadata"),
1312 LogNamespace::Legacy => log
1313 .get(event_path!(STREAM))
1314 .expect("stream must exist in the event"),
1315 };
1316
1317 let stream = stream_value.coerce_to_bytes();
1318 let message = message_value.coerce_to_bytes();
1319 (stream, message, log)
1320 });
1321 let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
1322 line_agg_out.map(move |(_, message, mut log, _)| {
1323 match log_namespace {
1324 LogNamespace::Vector => log.insert(event_path!(), message),
1325 LogNamespace::Legacy => log.insert(
1326 log_schema()
1327 .message_key_target_path()
1328 .expect("global log_schema.message_key to be valid path"),
1329 message,
1330 ),
1331 };
1332 log
1333 })
1334}