1use std::{
2 collections::HashMap,
3 ops::{Add, AddAssign},
4};
5
6use crate::{
7 internal_event::{
8 CountByteSize, InternalEventHandle, RegisterTaggedInternalEvent, RegisteredEventCache,
9 TaggedEventsSent,
10 },
11 json_size::JsonSize,
12};
13
/// Implemented by event types that can report the tag set their counts are grouped under.
pub trait GetEventCountTags {
    /// Returns the `TaggedEventsSent` tag set for this event.
    ///
    /// `GroupedCountByteSize::add_event` uses the returned value as the grouping key when the
    /// counter is in its tagged state.
    fn get_tags(&self) -> TaggedEventsSent;
}
19
/// Running event count and JSON size totals, kept either per tag set or as one overall total.
///
/// Each `CountByteSize` pairs an event count with a `JsonSize`.
#[derive(Clone, Debug)]
pub enum GroupedCountByteSize {
    /// Totals broken down by the `TaggedEventsSent` tag set of the contributing events.
    Tagged {
        /// One accumulated `CountByteSize` per distinct tag set.
        sizes: HashMap<TaggedEventsSent, CountByteSize>,
    },
    /// A single accumulated total that is not broken down by tags.
    Untagged { size: CountByteSize },
}
33
34impl Default for GroupedCountByteSize {
35 fn default() -> Self {
36 Self::Untagged {
37 size: CountByteSize(0, JsonSize::zero()),
38 }
39 }
40}
41
42impl GroupedCountByteSize {
43 #[must_use]
46 pub fn new_tagged() -> Self {
47 Self::Tagged {
48 sizes: HashMap::new(),
49 }
50 }
51
52 #[must_use]
55 pub fn new_untagged() -> Self {
56 Self::Untagged {
57 size: CountByteSize(0, JsonSize::zero()),
58 }
59 }
60
61 #[must_use]
64 #[cfg(any(test, feature = "test"))]
65 pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
66 match self {
67 Self::Tagged { sizes } => Some(sizes),
68 Self::Untagged { .. } => None,
69 }
70 }
71
72 #[must_use]
74 #[cfg(any(test, feature = "test"))]
75 pub fn size(&self) -> Option<CountByteSize> {
76 match self {
77 Self::Tagged { .. } => None,
78 Self::Untagged { size } => Some(*size),
79 }
80 }
81
82 pub fn add_event<E>(&mut self, event: &E, json_size: JsonSize)
84 where
85 E: GetEventCountTags,
86 {
87 match self {
88 Self::Tagged { sizes } => {
89 let size = CountByteSize(1, json_size);
90 let tags = event.get_tags();
91
92 match sizes.get_mut(&tags) {
93 Some(current) => {
94 *current += size;
95 }
96 None => {
97 sizes.insert(tags, size);
98 }
99 }
100 }
101 Self::Untagged { size } => {
102 *size += CountByteSize(1, json_size);
103 }
104 }
105 }
106
107 pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<(), T>)
109 where
110 T: RegisterTaggedInternalEvent<Tags = TaggedEventsSent, Fixed = (), Handle = H>,
111 H: InternalEventHandle<Data = CountByteSize>,
112 {
113 match self {
114 GroupedCountByteSize::Tagged { sizes } => {
115 for (tags, size) in sizes {
116 event_cache.emit(tags, *size);
117 }
118 }
119 GroupedCountByteSize::Untagged { size } => {
120 event_cache.emit(&TaggedEventsSent::new_unspecified(), *size);
121 }
122 }
123 }
124
125 #[must_use]
128 pub fn is_tagged(&self) -> bool {
129 match self {
130 GroupedCountByteSize::Tagged { .. } => true,
131 GroupedCountByteSize::Untagged { .. } => false,
132 }
133 }
134
135 #[must_use]
137 pub fn is_untagged(&self) -> bool {
138 !self.is_tagged()
139 }
140}
141
142impl From<CountByteSize> for GroupedCountByteSize {
143 fn from(value: CountByteSize) -> Self {
144 Self::Untagged { size: value }
145 }
146}
147
148impl AddAssign for GroupedCountByteSize {
149 fn add_assign(&mut self, mut rhs: Self) {
150 if self.is_untagged() && rhs.is_tagged() {
151 *self = match (&self, &mut rhs) {
154 (Self::Untagged { size }, Self::Tagged { sizes }) => {
155 let mut sizes = std::mem::take(sizes);
156 match sizes.get_mut(&TaggedEventsSent::new_empty()) {
157 Some(empty_size) => *empty_size += *size,
158 None => {
159 sizes.insert(TaggedEventsSent::new_empty(), *size);
160 }
161 }
162
163 Self::Tagged { sizes }
164 }
165 _ => {
166 unreachable!()
167 }
168 };
169
170 return;
171 }
172
173 match (self, rhs) {
175 (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
176 for (key, value) in rhs {
177 match lhs.get_mut(&key) {
178 Some(size) => *size += value,
179 None => {
180 lhs.insert(key.clone(), value);
181 }
182 }
183 }
184 }
185
186 (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
187 *lhs = *lhs + rhs;
188 }
189
190 (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
191 match sizes.get_mut(&TaggedEventsSent::new_empty()) {
192 Some(empty_size) => *empty_size += size,
193 None => {
194 sizes.insert(TaggedEventsSent::new_empty(), size);
195 }
196 }
197 }
198 (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
199 }
200 }
201}
202
203impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
204 type Output = GroupedCountByteSize;
205
206 fn add(self, other: &'a Self::Output) -> Self::Output {
207 match (self, other) {
208 (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
209 for (key, value) in rhs {
210 match lhs.get_mut(key) {
211 Some(size) => *size += *value,
212 None => {
213 lhs.insert(key.clone(), *value);
214 }
215 }
216 }
217
218 Self::Tagged { sizes: lhs }
219 }
220
221 (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
222 Self::Untagged { size: lhs + *rhs }
223 }
224
225 (Self::Tagged { mut sizes }, Self::Untagged { size }) => {
227 match sizes.get_mut(&TaggedEventsSent::new_empty()) {
228 Some(empty_size) => *empty_size += *size,
229 None => {
230 sizes.insert(TaggedEventsSent::new_empty(), *size);
231 }
232 }
233
234 Self::Tagged { sizes }
235 }
236 (Self::Untagged { size }, Self::Tagged { sizes }) => {
237 let mut sizes = sizes.clone();
238 match sizes.get_mut(&TaggedEventsSent::new_empty()) {
239 Some(empty_size) => *empty_size += size,
240 None => {
241 sizes.insert(TaggedEventsSent::new_empty(), size);
242 }
243 }
244
245 Self::Tagged { sizes }
246 }
247 }
248 }
249}
250
/// Size and count figures describing a request.
///
/// Values are combined field by field when two instances are added together.
#[derive(Clone, Debug, Default)]
pub struct RequestMetadata {
    /// Number of events. Presumably the events carried by the request; confirm with callers.
    event_count: usize,
    /// Byte size of the events. NOTE(review): the exact measure (e.g. in-memory size) is not
    /// visible here; confirm with callers.
    events_byte_size: usize,
    /// Estimated JSON-encoded size of the events, optionally grouped by tag set.
    events_estimated_json_encoded_byte_size: GroupedCountByteSize,
    /// Encoded size of the request. NOTE(review): presumably before compression; confirm.
    request_encoded_size: usize,
    /// Wire size of the request. NOTE(review): presumably the bytes actually sent; confirm.
    request_wire_size: usize,
}
267
268impl RequestMetadata {
269 #[must_use]
270 pub fn new(
271 event_count: usize,
272 events_byte_size: usize,
273 request_encoded_size: usize,
274 request_wire_size: usize,
275 events_estimated_json_encoded_byte_size: GroupedCountByteSize,
276 ) -> Self {
277 Self {
278 event_count,
279 events_byte_size,
280 events_estimated_json_encoded_byte_size,
281 request_encoded_size,
282 request_wire_size,
283 }
284 }
285
286 #[must_use]
287 pub const fn event_count(&self) -> usize {
288 self.event_count
289 }
290
291 #[must_use]
292 pub const fn events_byte_size(&self) -> usize {
293 self.events_byte_size
294 }
295
296 #[must_use]
297 pub fn events_estimated_json_encoded_byte_size(&self) -> &GroupedCountByteSize {
298 &self.events_estimated_json_encoded_byte_size
299 }
300
301 #[must_use]
304 pub fn into_events_estimated_json_encoded_byte_size(self) -> GroupedCountByteSize {
305 self.events_estimated_json_encoded_byte_size
306 }
307
308 #[must_use]
309 pub const fn request_encoded_size(&self) -> usize {
310 self.request_encoded_size
311 }
312
313 #[must_use]
314 pub const fn request_wire_size(&self) -> usize {
315 self.request_wire_size
316 }
317
318 #[must_use]
320 pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
321 let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, GroupedCountByteSize::default());
322
323 for metadata in metadata_iter {
324 metadata_sum = metadata_sum + &metadata;
325 }
326 metadata_sum
327 }
328}
329
330impl<'a> Add<&'a RequestMetadata> for RequestMetadata {
331 type Output = RequestMetadata;
332
333 fn add(self, other: &'a Self::Output) -> Self::Output {
335 Self::Output {
336 event_count: self.event_count + other.event_count,
337 events_byte_size: self.events_byte_size + other.events_byte_size,
338 events_estimated_json_encoded_byte_size: self.events_estimated_json_encoded_byte_size
339 + &other.events_estimated_json_encoded_byte_size,
340 request_encoded_size: self.request_encoded_size + other.request_encoded_size,
341 request_wire_size: self.request_wire_size + other.request_wire_size,
342 }
343 }
344}
345
/// Implemented by types that carry a `RequestMetadata`.
pub trait MetaDescriptive {
    /// Returns a shared reference to the `RequestMetadata` associated with this value.
    fn get_metadata(&self) -> &RequestMetadata;

    /// Returns a mutable reference to the `RequestMetadata` associated with this value.
    fn metadata_mut(&mut self) -> &mut RequestMetadata;
}
354
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{config::ComponentKey, internal_event::OptionalTag};

    use super::*;

    /// Minimal event stand-in that carries only the tags under test.
    struct DummyEvent {
        source: OptionalTag<Arc<ComponentKey>>,
        service: OptionalTag<String>,
    }

    impl GetEventCountTags for DummyEvent {
        fn get_tags(&self) -> TaggedEventsSent {
            let Self { source, service } = self;

            TaggedEventsSent {
                source: source.clone(),
                service: service.clone(),
            }
        }
    }

    /// Tag set with an ignored source and the given service name.
    fn service_only_tags(service: &str) -> TaggedEventsSent {
        TaggedEventsSent {
            source: OptionalTag::Ignored,
            service: Some(service.to_string()).into(),
        }
    }

    /// An untagged counter sums every event into one total, whatever tags the events carry.
    #[test]
    fn add_request_count_bytesize_event_untagged() {
        let mut bytesize = GroupedCountByteSize::new_untagged();

        for (source, service, bytes) in [("carrot", "cabbage", 42), ("pea", "potato", 36)] {
            let event = DummyEvent {
                source: Some(Arc::new(ComponentKey::from(source))).into(),
                service: Some(service.to_string()).into(),
            };

            bytesize.add_event(&event, JsonSize::new(bytes));
        }

        assert_eq!(Some(CountByteSize(2, JsonSize::new(78))), bytesize.size());
        assert_eq!(None, bytesize.sizes());
    }

    /// A tagged counter keeps one total per distinct tag set.
    #[test]
    fn add_request_count_bytesize_event_tagged() {
        let mut bytesize = GroupedCountByteSize::new_tagged();

        for (service, bytes) in [("cabbage", 42), ("cabbage", 36), ("tomato", 23)] {
            let event = DummyEvent {
                source: OptionalTag::Ignored,
                service: Some(service.to_string()).into(),
            };

            bytesize.add_event(&event, JsonSize::new(bytes));
        }

        assert_eq!(None, bytesize.size());

        // Map iteration order is unspecified, so sort before comparing.
        let mut sizes: Vec<_> = bytesize.sizes().unwrap().clone().into_iter().collect();
        sizes.sort();

        assert_eq!(
            vec![
                (
                    service_only_tags("cabbage"),
                    CountByteSize(2, JsonSize::new(78))
                ),
                (
                    service_only_tags("tomato"),
                    CountByteSize(1, JsonSize::new(23))
                ),
            ],
            sizes
        );
    }
}