vector_common/internal_event/
events_sent.rs1use std::sync::Arc;
2
3use metrics::{Counter, counter};
4use tracing::trace;
5
6use super::{CountByteSize, OptionalTag, Output, SharedString};
7use crate::config::ComponentKey;
8
/// The output name literal `"_default"`.
///
/// NOTE(review): presumably the name used for a component's unnamed/default
/// output -- confirm against the callers that reference this constant.
pub const DEFAULT_OUTPUT: &str = "_default";
10
11crate::registered_event!(
12 EventsSent {
13 output: Option<SharedString>,
14 } => {
15 events: Counter = if let Some(output) = &self.output {
16 counter!("component_sent_events_total", "output" => output.clone())
17 } else {
18 counter!("component_sent_events_total")
19 },
20 event_bytes: Counter = if let Some(output) = &self.output {
21 counter!("component_sent_event_bytes_total", "output" => output.clone())
22 } else {
23 counter!("component_sent_event_bytes_total")
24 },
25 output: Option<SharedString> = self.output,
26 }
27
28 fn emit(&self, data: CountByteSize) {
29 let CountByteSize(count, byte_size) = data;
30
31 match &self.output {
32 Some(output) => {
33 trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
34 }
35 None => {
36 trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
37 }
38 }
39
40 self.events.increment(count as u64);
41 self.event_bytes.increment(byte_size.get() as u64);
42 }
43);
44
45impl From<Output> for EventsSent {
46 fn from(output: Output) -> Self {
47 Self { output: output.0 }
48 }
49}
50
51fn make_tags(
53 source: &OptionalTag<Arc<ComponentKey>>,
54 service: &OptionalTag<String>,
55) -> Vec<(&'static str, String)> {
56 let mut tags = Vec::new();
57 if let OptionalTag::Specified(tag) = source {
58 tags.push((
59 "source",
60 tag.as_ref()
61 .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
62 ));
63 }
64
65 if let OptionalTag::Specified(tag) = service {
66 tags.push(("service", tag.clone().unwrap_or("-".to_string())));
67 }
68
69 tags
70}
71
72crate::registered_event!(
73 TaggedEventsSent {
74 source: OptionalTag<Arc<ComponentKey>>,
75 service: OptionalTag<String>,
76 } => {
77 events: Counter = {
78 counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
79 },
80 event_bytes: Counter = {
81 counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
82 },
83 }
84
85 fn emit(&self, data: CountByteSize) {
86 let CountByteSize(count, byte_size) = data;
87 trace!(message = "Events sent.", %count, %byte_size);
88
89 self.events.increment(count as u64);
90 self.event_bytes.increment(byte_size.get() as u64);
91 }
92
93 fn register(_fixed: (), tags: TaggedEventsSent) {
94 super::register(tags)
95 }
96);
97
98impl TaggedEventsSent {
99 #[must_use]
100 pub fn new_empty() -> Self {
101 Self {
102 source: OptionalTag::Specified(None),
103 service: OptionalTag::Specified(None),
104 }
105 }
106
107 #[must_use]
108 pub fn new_unspecified() -> Self {
109 Self {
110 source: OptionalTag::Ignored,
111 service: OptionalTag::Ignored,
112 }
113 }
114}