vector_common/internal_event/
events_sent.rs

1use std::sync::Arc;
2
3use metrics::{counter, Counter};
4use tracing::trace;
5
6use crate::config::ComponentKey;
7
8use super::{CountByteSize, OptionalTag, Output, SharedString};
9
/// The literal output name `"_default"`.
///
/// NOTE(review): presumably the name given to a component's unnamed (default) output;
/// it is not referenced anywhere in this file, so confirm against its callers.
pub const DEFAULT_OUTPUT: &str = "_default";
11
12crate::registered_event!(
13    EventsSent {
14        output: Option<SharedString>,
15    } => {
16        events: Counter = if let Some(output) = &self.output {
17            counter!("component_sent_events_total", "output" => output.clone())
18        } else {
19            counter!("component_sent_events_total")
20        },
21        event_bytes: Counter = if let Some(output) = &self.output {
22            counter!("component_sent_event_bytes_total", "output" => output.clone())
23        } else {
24            counter!("component_sent_event_bytes_total")
25        },
26        output: Option<SharedString> = self.output,
27    }
28
29    fn emit(&self, data: CountByteSize) {
30        let CountByteSize(count, byte_size) = data;
31
32        match &self.output {
33            Some(output) => {
34                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
35            }
36            None => {
37                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
38            }
39        }
40
41        self.events.increment(count as u64);
42        self.event_bytes.increment(byte_size.get() as u64);
43    }
44);
45
46impl From<Output> for EventsSent {
47    fn from(output: Output) -> Self {
48        Self { output: output.0 }
49    }
50}
51
52/// Makes a list of the tags to use with the events sent event.
53fn make_tags(
54    source: &OptionalTag<Arc<ComponentKey>>,
55    service: &OptionalTag<String>,
56) -> Vec<(&'static str, String)> {
57    let mut tags = Vec::new();
58    if let OptionalTag::Specified(tag) = source {
59        tags.push((
60            "source",
61            tag.as_ref()
62                .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
63        ));
64    }
65
66    if let OptionalTag::Specified(tag) = service {
67        tags.push(("service", tag.clone().unwrap_or("-".to_string())));
68    }
69
70    tags
71}
72
73crate::registered_event!(
74    TaggedEventsSent {
75        source: OptionalTag<Arc<ComponentKey>>,
76        service: OptionalTag<String>,
77    } => {
78        events: Counter = {
79            counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
80        },
81        event_bytes: Counter = {
82            counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
83        },
84    }
85
86    fn emit(&self, data: CountByteSize) {
87        let CountByteSize(count, byte_size) = data;
88        trace!(message = "Events sent.", %count, %byte_size);
89
90        self.events.increment(count as u64);
91        self.event_bytes.increment(byte_size.get() as u64);
92    }
93
94    fn register(_fixed: (), tags: TaggedEventsSent) {
95        super::register(tags)
96    }
97);
98
99impl TaggedEventsSent {
100    #[must_use]
101    pub fn new_empty() -> Self {
102        Self {
103            source: OptionalTag::Specified(None),
104            service: OptionalTag::Specified(None),
105        }
106    }
107
108    #[must_use]
109    pub fn new_unspecified() -> Self {
110        Self {
111            source: OptionalTag::Ignored,
112            service: OptionalTag::Ignored,
113        }
114    }
115}