vector_common/internal_event/
events_sent.rs

1use std::sync::Arc;
2
3use metrics::{Counter, counter};
4use tracing::trace;
5
6use super::{CountByteSize, OptionalTag, Output, SharedString};
7use crate::config::ComponentKey;
8
/// The output name `"_default"`.
///
/// NOTE(review): presumably the name given to a component's unnamed/default
/// output; it is not referenced elsewhere in this file, so confirm against
/// callers.
pub const DEFAULT_OUTPUT: &str = "_default";
10
11crate::registered_event!(
12    EventsSent {
13        output: Option<SharedString>,
14    } => {
15        events: Counter = if let Some(output) = &self.output {
16            counter!("component_sent_events_total", "output" => output.clone())
17        } else {
18            counter!("component_sent_events_total")
19        },
20        event_bytes: Counter = if let Some(output) = &self.output {
21            counter!("component_sent_event_bytes_total", "output" => output.clone())
22        } else {
23            counter!("component_sent_event_bytes_total")
24        },
25        output: Option<SharedString> = self.output,
26    }
27
28    fn emit(&self, data: CountByteSize) {
29        let CountByteSize(count, byte_size) = data;
30
31        match &self.output {
32            Some(output) => {
33                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
34            }
35            None => {
36                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
37            }
38        }
39
40        self.events.increment(count as u64);
41        self.event_bytes.increment(byte_size.get() as u64);
42    }
43);
44
45impl From<Output> for EventsSent {
46    fn from(output: Output) -> Self {
47        Self { output: output.0 }
48    }
49}
50
51/// Makes a list of the tags to use with the events sent event.
52fn make_tags(
53    source: &OptionalTag<Arc<ComponentKey>>,
54    service: &OptionalTag<String>,
55) -> Vec<(&'static str, String)> {
56    let mut tags = Vec::new();
57    if let OptionalTag::Specified(tag) = source {
58        tags.push((
59            "source",
60            tag.as_ref()
61                .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
62        ));
63    }
64
65    if let OptionalTag::Specified(tag) = service {
66        tags.push(("service", tag.clone().unwrap_or("-".to_string())));
67    }
68
69    tags
70}
71
72crate::registered_event!(
73    TaggedEventsSent {
74        source: OptionalTag<Arc<ComponentKey>>,
75        service: OptionalTag<String>,
76    } => {
77        events: Counter = {
78            counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
79        },
80        event_bytes: Counter = {
81            counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
82        },
83    }
84
85    fn emit(&self, data: CountByteSize) {
86        let CountByteSize(count, byte_size) = data;
87        trace!(message = "Events sent.", %count, %byte_size);
88
89        self.events.increment(count as u64);
90        self.event_bytes.increment(byte_size.get() as u64);
91    }
92
93    fn register(_fixed: (), tags: TaggedEventsSent) {
94        super::register(tags)
95    }
96);
97
98impl TaggedEventsSent {
99    #[must_use]
100    pub fn new_empty() -> Self {
101        Self {
102            source: OptionalTag::Specified(None),
103            service: OptionalTag::Specified(None),
104        }
105    }
106
107    #[must_use]
108    pub fn new_unspecified() -> Self {
109        Self {
110            source: OptionalTag::Ignored,
111            service: OptionalTag::Ignored,
112        }
113    }
114}