vector_common/internal_event/
mod.rs1mod bytes_received;
2mod bytes_sent;
3pub mod cached_event;
4pub mod component_events_dropped;
5mod events_received;
6mod events_sent;
7mod optional_tag;
8mod prelude;
9pub mod service;
10
11use std::ops::{Add, AddAssign};
12
13pub use metrics::SharedString;
14
15pub use bytes_received::BytesReceived;
16pub use bytes_sent::BytesSent;
17#[allow(clippy::module_name_repetitions)]
18pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};
19pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
20pub use events_received::EventsReceived;
21pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT};
22pub use optional_tag::OptionalTag;
23pub use prelude::{error_stage, error_type};
24pub use service::{CallError, PollReadyError};
25
26use crate::json_size::JsonSize;
27
/// An internal event that is consumed when it is emitted.
pub trait InternalEvent: Sized {
    /// Emits the event, taking ownership of it.
    fn emit(self);

    /// Returns the static name of this event, if it has one.
    ///
    /// The default implementation reports no name (`None`).
    fn name(&self) -> Option<&'static str> {
        None
    }
}
36
37#[allow(clippy::module_name_repetitions)]
38pub trait RegisterInternalEvent: Sized {
39 type Handle: InternalEventHandle;
40
41 fn register(self) -> Self::Handle;
42
43 fn name(&self) -> Option<&'static str> {
44 None
45 }
46}
47
/// A handle through which data for an already-registered event is emitted.
// The trait name deliberately repeats the enclosing `internal_event` module name.
#[allow(clippy::module_name_repetitions)]
pub trait InternalEventHandle: Sized {
    /// The payload accepted by each call to [`emit`](Self::emit).
    type Data: Sized;

    /// Emits `data` through this handle; the handle is only borrowed, so it
    /// can be used repeatedly.
    fn emit(&self, data: Self::Data);
}
53
/// Wraps an event together with a fallback name.
pub struct DefaultName<E> {
    /// Name reported when the wrapped event does not provide one itself.
    pub name: &'static str,
    /// The wrapped event.
    pub event: E,
}
59
60impl<E: InternalEvent> InternalEvent for DefaultName<E> {
61 fn emit(self) {
62 self.event.emit();
63 }
64
65 fn name(&self) -> Option<&'static str> {
66 Some(self.event.name().unwrap_or(self.name))
67 }
68}
69
70impl<E: RegisterInternalEvent> RegisterInternalEvent for DefaultName<E> {
71 type Handle = E::Handle;
72
73 fn register(self) -> Self::Handle {
74 self.event.register()
75 }
76
77 fn name(&self) -> Option<&'static str> {
78 Some(self.event.name().unwrap_or(self.name))
79 }
80}
81
82#[cfg(any(test, feature = "test"))]
83pub fn emit(event: impl InternalEvent) {
84 if let Some(name) = event.name() {
85 super::event_test_util::record_internal_event(name);
86 }
87 event.emit();
88}
89
90#[cfg(not(any(test, feature = "test")))]
91pub fn emit(event: impl InternalEvent) {
92 event.emit();
93}
94
95#[cfg(any(test, feature = "test"))]
96pub fn register<E: RegisterInternalEvent>(event: E) -> E::Handle {
97 if let Some(name) = event.name() {
98 super::event_test_util::record_internal_event(name);
99 }
100 event.register()
101}
102
103#[cfg(not(any(test, feature = "test")))]
104pub fn register<E: RegisterInternalEvent>(event: E) -> E::Handle {
105 event.register()
106}
107
108pub type Registered<T> = <T as RegisterInternalEvent>::Handle;
109
/// Newtype wrapper around a `usize`.
///
/// NOTE(review): by its name this carries a size in bytes — confirm with callers.
#[derive(Clone, Copy, Debug)]
pub struct ByteSize(pub usize);
114
/// Newtype wrapper around a `usize`.
///
/// NOTE(review): by its name this carries a number of items — confirm with callers.
#[derive(Clone, Copy, Debug)]
pub struct Count(pub usize);
117
118#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
120pub struct CountByteSize(pub usize, pub JsonSize);
121
122impl AddAssign for CountByteSize {
123 fn add_assign(&mut self, rhs: Self) {
124 self.0 += rhs.0;
125 self.1 += rhs.1;
126 }
127}
128
129impl Add<CountByteSize> for CountByteSize {
130 type Output = CountByteSize;
131
132 fn add(self, rhs: CountByteSize) -> Self::Output {
133 CountByteSize(self.0 + rhs.0, self.1 + rhs.1)
134 }
135}
136
137pub struct Output(pub Option<SharedString>);
140
141pub struct Protocol(pub SharedString);
142
143impl Protocol {
144 pub const HTTP: Protocol = Protocol(SharedString::const_str("http"));
145 pub const HTTPS: Protocol = Protocol(SharedString::const_str("https"));
146 pub const NONE: Protocol = Protocol(SharedString::const_str("none"));
147 pub const TCP: Protocol = Protocol(SharedString::const_str("tcp"));
148 pub const UDP: Protocol = Protocol(SharedString::const_str("udp"));
149 pub const UNIX: Protocol = Protocol(SharedString::const_str("unix"));
150 pub const INTERNAL: Protocol = Protocol(SharedString::const_str("internal"));
151 pub const STATIC: Protocol = Protocol(SharedString::const_str("static"));
152}
153
154impl From<&'static str> for Protocol {
155 fn from(s: &'static str) -> Self {
156 Self(SharedString::const_str(s))
157 }
158}
159
160impl From<Protocol> for SharedString {
161 fn from(value: Protocol) -> Self {
162 value.0
163 }
164}
165
/// Declares an event type together with its registered handle type.
///
/// Three input forms are accepted:
///
/// * `Event => …` declares a unit struct `Event`;
/// * `Event { field: Type, … } => …` declares a struct `Event` with the
///   given public fields;
/// * `=> Event { … } …` is the internal form that the first two forward to
///   after declaring the struct.
///
/// After the `=>`, the remaining input is a braced list of handle fields
/// written as `name: Type = initial_value,`, followed by an
/// `fn emit(&self, data: DataType) { … }` definition and, optionally, an
/// `fn register(fixed: FixedType, tags: TagsType) { … }` definition.
///
/// The expansion defines `<Event>Handle` (name built with `paste`),
/// implements `RegisterInternalEvent` for the event and `InternalEventHandle`
/// for the handle, and, when the optional `register` function is present,
/// implements `RegisterTaggedInternalEvent` for the event.
#[macro_export]
macro_rules! registered_event {
    // Event without fields: declare a unit struct, then fall through to the
    // implementation arm.
    ($event:ident => $($tail:tt)*) => {
        #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
        pub struct $event;

        $crate::registered_event!(=> $event $($tail)*);
    };

    // Event with fields: declare the struct with public fields, then fall
    // through to the implementation arm.
    ($event:ident { $( $field:ident: $type:ty, )* } => $($tail:tt)*) => {
        #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
        pub struct $event {
            $( pub $field: $type, )*
        }

        $crate::registered_event!(=> $event $($tail)*);
    };

    // Implementation arm: generates the handle type and the trait impls.
    (
        => $event:ident {
            $( $field:ident: $type:ty = $value:expr, )*
        }

        fn emit(&$slf:ident, $data_name:ident: $data:ident)
            $emit_body:block

        $(fn register($fixed_name:ident: $fixed_tags:ty, $tags_name:ident: $tags:ty)
            $register_body:block)?
    ) => {
        paste::paste!{
            #[derive(Clone)]
            pub struct [<$event Handle>] {
                $( $field: $type, )*
            }

            impl $crate::internal_event::RegisterInternalEvent for $event {
                type Handle = [<$event Handle>];

                fn name(&self) -> Option<&'static str> {
                    Some(stringify!($event))
                }

                // The receiver reuses the `self` identifier captured from the
                // caller's `emit` signature, so the caller-written `$value`
                // expressions can refer to it.
                fn register($slf) -> Self::Handle {
                    Self::Handle {
                        $( $field: $value, )*
                    }
                }
            }

            impl $crate::internal_event::InternalEventHandle for [<$event Handle>] {
                type Data = $data;

                fn emit(&$slf, $data_name: $data)
                    $emit_body
            }

            // Generated only when the optional `fn register(..)` was supplied.
            $(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event {
                type Tags = $tags;
                type Fixed = $fixed_tags;

                fn register(
                    $fixed_name: $fixed_tags,
                    $tags_name: $tags,
                ) -> <Self as $crate::internal_event::RegisterInternalEvent>::Handle {
                    $register_body
                }
            })?

        }
    };
}