vector_common/internal_event/
events_sent.rs1use std::sync::Arc;
2
3use metrics::{counter, Counter};
4use tracing::trace;
5
6use crate::config::ComponentKey;
7
8use super::{CountByteSize, OptionalTag, Output, SharedString};
9
10pub const DEFAULT_OUTPUT: &str = "_default";
11
12crate::registered_event!(
13 EventsSent {
14 output: Option<SharedString>,
15 } => {
16 events: Counter = if let Some(output) = &self.output {
17 counter!("component_sent_events_total", "output" => output.clone())
18 } else {
19 counter!("component_sent_events_total")
20 },
21 event_bytes: Counter = if let Some(output) = &self.output {
22 counter!("component_sent_event_bytes_total", "output" => output.clone())
23 } else {
24 counter!("component_sent_event_bytes_total")
25 },
26 output: Option<SharedString> = self.output,
27 }
28
29 fn emit(&self, data: CountByteSize) {
30 let CountByteSize(count, byte_size) = data;
31
32 match &self.output {
33 Some(output) => {
34 trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
35 }
36 None => {
37 trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
38 }
39 }
40
41 self.events.increment(count as u64);
42 self.event_bytes.increment(byte_size.get() as u64);
43 }
44);
45
46impl From<Output> for EventsSent {
47 fn from(output: Output) -> Self {
48 Self { output: output.0 }
49 }
50}
51
52fn make_tags(
54 source: &OptionalTag<Arc<ComponentKey>>,
55 service: &OptionalTag<String>,
56) -> Vec<(&'static str, String)> {
57 let mut tags = Vec::new();
58 if let OptionalTag::Specified(tag) = source {
59 tags.push((
60 "source",
61 tag.as_ref()
62 .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
63 ));
64 }
65
66 if let OptionalTag::Specified(tag) = service {
67 tags.push(("service", tag.clone().unwrap_or("-".to_string())));
68 }
69
70 tags
71}
72
73crate::registered_event!(
74 TaggedEventsSent {
75 source: OptionalTag<Arc<ComponentKey>>,
76 service: OptionalTag<String>,
77 } => {
78 events: Counter = {
79 counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
80 },
81 event_bytes: Counter = {
82 counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
83 },
84 }
85
86 fn emit(&self, data: CountByteSize) {
87 let CountByteSize(count, byte_size) = data;
88 trace!(message = "Events sent.", %count, %byte_size);
89
90 self.events.increment(count as u64);
91 self.event_bytes.increment(byte_size.get() as u64);
92 }
93
94 fn register(_fixed: (), tags: TaggedEventsSent) {
95 super::register(tags)
96 }
97);
98
99impl TaggedEventsSent {
100 #[must_use]
101 pub fn new_empty() -> Self {
102 Self {
103 source: OptionalTag::Specified(None),
104 service: OptionalTag::Specified(None),
105 }
106 }
107
108 #[must_use]
109 pub fn new_unspecified() -> Self {
110 Self {
111 source: OptionalTag::Ignored,
112 service: OptionalTag::Ignored,
113 }
114 }
115}