vector_common/
request_metadata.rs

1use std::{
2    collections::HashMap,
3    ops::{Add, AddAssign},
4};
5
6use crate::{
7    internal_event::{
8        CountByteSize, InternalEventHandle, RegisterTaggedInternalEvent, RegisteredEventCache,
9        TaggedEventsSent,
10    },
11    json_size::JsonSize,
12};
13
/// Must be implemented by events so that the tags attached to the emitted
/// `component_sent_event_*` metrics can be obtained from each event.
pub trait GetEventCountTags {
    /// Returns the tags under which this event is counted.
    fn get_tags(&self) -> TaggedEventsSent;
}
19
/// Keeps track of the event count and estimated JSON size of a given batch of
/// events, optionally grouped by their tags (source and service).
#[derive(Clone, Debug)]
pub enum GroupedCountByteSize {
    /// Used when the events need to be tracked by certain tags: one running
    /// `CountByteSize` is kept per distinct `TaggedEventsSent`.
    Tagged {
        sizes: HashMap<TaggedEventsSent, CountByteSize>,
    },
    /// Used when the events do not need to be tracked by tags: a single
    /// running total is kept, which avoids allocating a `HashMap`.
    Untagged { size: CountByteSize },
}
33
34impl Default for GroupedCountByteSize {
35    fn default() -> Self {
36        Self::Untagged {
37            size: CountByteSize(0, JsonSize::zero()),
38        }
39    }
40}
41
42impl GroupedCountByteSize {
43    /// Creates a new Tagged variant for when we need to track events by
44    /// certain tags.
45    #[must_use]
46    pub fn new_tagged() -> Self {
47        Self::Tagged {
48            sizes: HashMap::new(),
49        }
50    }
51
52    /// Creates a new Tagged variant for when we do not need to track events by
53    /// tags.
54    #[must_use]
55    pub fn new_untagged() -> Self {
56        Self::Untagged {
57            size: CountByteSize(0, JsonSize::zero()),
58        }
59    }
60
61    /// Returns a `HashMap` of tags => event counts for when we are tracking by tags.
62    /// Returns `None` if we are not tracking by tags.
63    #[must_use]
64    #[cfg(any(test, feature = "test"))]
65    pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
66        match self {
67            Self::Tagged { sizes } => Some(sizes),
68            Self::Untagged { .. } => None,
69        }
70    }
71
72    /// Returns a single count for when we are not tracking by tags.
73    #[must_use]
74    #[cfg(any(test, feature = "test"))]
75    pub fn size(&self) -> Option<CountByteSize> {
76        match self {
77            Self::Tagged { .. } => None,
78            Self::Untagged { size } => Some(*size),
79        }
80    }
81
82    /// Adds the given estimated json size of the event to current count.
83    pub fn add_event<E>(&mut self, event: &E, json_size: JsonSize)
84    where
85        E: GetEventCountTags,
86    {
87        match self {
88            Self::Tagged { sizes } => {
89                let size = CountByteSize(1, json_size);
90                let tags = event.get_tags();
91
92                match sizes.get_mut(&tags) {
93                    Some(current) => {
94                        *current += size;
95                    }
96                    None => {
97                        sizes.insert(tags, size);
98                    }
99                }
100            }
101            Self::Untagged { size } => {
102                *size += CountByteSize(1, json_size);
103            }
104        }
105    }
106
107    /// Emits our counts to a `RegisteredEvent` cached event.
108    pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<(), T>)
109    where
110        T: RegisterTaggedInternalEvent<Tags = TaggedEventsSent, Fixed = (), Handle = H>,
111        H: InternalEventHandle<Data = CountByteSize>,
112    {
113        match self {
114            GroupedCountByteSize::Tagged { sizes } => {
115                for (tags, size) in sizes {
116                    event_cache.emit(tags, *size);
117                }
118            }
119            GroupedCountByteSize::Untagged { size } => {
120                event_cache.emit(&TaggedEventsSent::new_unspecified(), *size);
121            }
122        }
123    }
124
125    /// Returns `true` if we are the `Tagged` variant - keeping track of the byte sizes
126    /// grouped by their relevant tags.
127    #[must_use]
128    pub fn is_tagged(&self) -> bool {
129        match self {
130            GroupedCountByteSize::Tagged { .. } => true,
131            GroupedCountByteSize::Untagged { .. } => false,
132        }
133    }
134
135    /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
136    #[must_use]
137    pub fn is_untagged(&self) -> bool {
138        !self.is_tagged()
139    }
140}
141
142impl From<CountByteSize> for GroupedCountByteSize {
143    fn from(value: CountByteSize) -> Self {
144        Self::Untagged { size: value }
145    }
146}
147
148impl AddAssign for GroupedCountByteSize {
149    fn add_assign(&mut self, mut rhs: Self) {
150        if self.is_untagged() && rhs.is_tagged() {
151            // First handle the case where we are untagged and assigning to a tagged value.
152            // We need to change `self` and so need to ensure our match doesn't take ownership of the object.
153            *self = match (&self, &mut rhs) {
154                (Self::Untagged { size }, Self::Tagged { sizes }) => {
155                    let mut sizes = std::mem::take(sizes);
156                    match sizes.get_mut(&TaggedEventsSent::new_empty()) {
157                        Some(empty_size) => *empty_size += *size,
158                        None => {
159                            sizes.insert(TaggedEventsSent::new_empty(), *size);
160                        }
161                    }
162
163                    Self::Tagged { sizes }
164                }
165                _ => {
166                    unreachable!()
167                }
168            };
169
170            return;
171        }
172
173        // For these cases, we know we won't have to change `self` so the match can take ownership.
174        match (self, rhs) {
175            (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
176                for (key, value) in rhs {
177                    match lhs.get_mut(&key) {
178                        Some(size) => *size += value,
179                        None => {
180                            lhs.insert(key.clone(), value);
181                        }
182                    }
183                }
184            }
185
186            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
187                *lhs = *lhs + rhs;
188            }
189
190            (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
191                match sizes.get_mut(&TaggedEventsSent::new_empty()) {
192                    Some(empty_size) => *empty_size += size,
193                    None => {
194                        sizes.insert(TaggedEventsSent::new_empty(), size);
195                    }
196                }
197            }
198            (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
199        }
200    }
201}
202
203impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
204    type Output = GroupedCountByteSize;
205
206    fn add(self, other: &'a Self::Output) -> Self::Output {
207        match (self, other) {
208            (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
209                for (key, value) in rhs {
210                    match lhs.get_mut(key) {
211                        Some(size) => *size += *value,
212                        None => {
213                            lhs.insert(key.clone(), *value);
214                        }
215                    }
216                }
217
218                Self::Tagged { sizes: lhs }
219            }
220
221            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
222                Self::Untagged { size: lhs + *rhs }
223            }
224
225            // The following two scenarios shouldn't really occur in practice, but are provided for completeness.
226            (Self::Tagged { mut sizes }, Self::Untagged { size }) => {
227                match sizes.get_mut(&TaggedEventsSent::new_empty()) {
228                    Some(empty_size) => *empty_size += *size,
229                    None => {
230                        sizes.insert(TaggedEventsSent::new_empty(), *size);
231                    }
232                }
233
234                Self::Tagged { sizes }
235            }
236            (Self::Untagged { size }, Self::Tagged { sizes }) => {
237                let mut sizes = sizes.clone();
238                match sizes.get_mut(&TaggedEventsSent::new_empty()) {
239                    Some(empty_size) => *empty_size += size,
240                    None => {
241                        sizes.insert(TaggedEventsSent::new_empty(), size);
242                    }
243                }
244
245                Self::Tagged { sizes }
246            }
247        }
248    }
249}
250
/// Metadata for batch requests: event counts and the various size
/// measurements of the events and of the request built from them.
#[derive(Clone, Debug, Default)]
pub struct RequestMetadata {
    /// Number of events represented by this batch request.
    event_count: usize,
    /// Size, in bytes, of the in-memory representation of all events in this batch request.
    events_byte_size: usize,
    /// Event count and estimated JSON-encoded size, in bytes, of all events in this batch
    /// request, optionally grouped by tags.
    events_estimated_json_encoded_byte_size: GroupedCountByteSize,
    /// Uncompressed size, in bytes, of the encoded events in this batch request.
    request_encoded_size: usize,
    /// On-the-wire size, in bytes, of the batch request itself after compression, etc.
    ///
    /// This is akin to the bytes sent/received over the network, regardless of whether or not compression was used.
    request_wire_size: usize,
}
267
268impl RequestMetadata {
269    #[must_use]
270    pub fn new(
271        event_count: usize,
272        events_byte_size: usize,
273        request_encoded_size: usize,
274        request_wire_size: usize,
275        events_estimated_json_encoded_byte_size: GroupedCountByteSize,
276    ) -> Self {
277        Self {
278            event_count,
279            events_byte_size,
280            events_estimated_json_encoded_byte_size,
281            request_encoded_size,
282            request_wire_size,
283        }
284    }
285
286    #[must_use]
287    pub const fn event_count(&self) -> usize {
288        self.event_count
289    }
290
291    #[must_use]
292    pub const fn events_byte_size(&self) -> usize {
293        self.events_byte_size
294    }
295
296    #[must_use]
297    pub fn events_estimated_json_encoded_byte_size(&self) -> &GroupedCountByteSize {
298        &self.events_estimated_json_encoded_byte_size
299    }
300
301    /// Consumes the object and returns the byte size of the request grouped by
302    /// the tags (source and service).
303    #[must_use]
304    pub fn into_events_estimated_json_encoded_byte_size(self) -> GroupedCountByteSize {
305        self.events_estimated_json_encoded_byte_size
306    }
307
308    #[must_use]
309    pub const fn request_encoded_size(&self) -> usize {
310        self.request_encoded_size
311    }
312
313    #[must_use]
314    pub const fn request_wire_size(&self) -> usize {
315        self.request_wire_size
316    }
317
318    /// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided.
319    #[must_use]
320    pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
321        let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, GroupedCountByteSize::default());
322
323        for metadata in metadata_iter {
324            metadata_sum = metadata_sum + &metadata;
325        }
326        metadata_sum
327    }
328}
329
330impl<'a> Add<&'a RequestMetadata> for RequestMetadata {
331    type Output = RequestMetadata;
332
333    /// Adds the other `RequestMetadata` to this one.
334    fn add(self, other: &'a Self::Output) -> Self::Output {
335        Self::Output {
336            event_count: self.event_count + other.event_count,
337            events_byte_size: self.events_byte_size + other.events_byte_size,
338            events_estimated_json_encoded_byte_size: self.events_estimated_json_encoded_byte_size
339                + &other.events_estimated_json_encoded_byte_size,
340            request_encoded_size: self.request_encoded_size + other.request_encoded_size,
341            request_wire_size: self.request_wire_size + other.request_wire_size,
342        }
343    }
344}
345
/// Objects implementing this trait have metadata that describes the request.
pub trait MetaDescriptive {
    /// Returns the `RequestMetadata` associated with this object.
    fn get_metadata(&self) -> &RequestMetadata;

    /// Returns a mutable reference to the `RequestMetadata` associated with this object.
    fn metadata_mut(&mut self) -> &mut RequestMetadata;
}
354
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{config::ComponentKey, internal_event::OptionalTag};

    use super::*;

    /// Minimal event whose tags are supplied directly by the test.
    struct DummyEvent {
        source: OptionalTag<Arc<ComponentKey>>,
        service: OptionalTag<String>,
    }

    impl DummyEvent {
        /// An event carrying both a source and a service tag.
        fn with_source(source: &'static str, service: &'static str) -> Self {
            Self {
                source: Some(Arc::new(ComponentKey::from(source))).into(),
                service: Some(service.to_string()).into(),
            }
        }

        /// An event whose source tag is ignored and which carries only a service tag.
        fn service_only(service: &'static str) -> Self {
            Self {
                source: OptionalTag::Ignored,
                service: Some(service.to_string()).into(),
            }
        }
    }

    impl GetEventCountTags for DummyEvent {
        fn get_tags(&self) -> TaggedEventsSent {
            let source = self.source.clone();
            let service = self.service.clone();
            TaggedEventsSent { source, service }
        }
    }

    /// The tags expected for an event built with `DummyEvent::service_only`.
    fn service_tags(service: &'static str) -> TaggedEventsSent {
        TaggedEventsSent {
            source: OptionalTag::Ignored,
            service: Some(service.to_string()).into(),
        }
    }

    #[test]
    fn add_request_count_bytesize_event_untagged() {
        let mut bytesize = GroupedCountByteSize::new_untagged();

        // Differently tagged events must all land in the same single total.
        let first = DummyEvent::with_source("carrot", "cabbage");
        bytesize.add_event(&first, JsonSize::new(42));

        let second = DummyEvent::with_source("pea", "potato");
        bytesize.add_event(&second, JsonSize::new(36));

        assert_eq!(Some(CountByteSize(2, JsonSize::new(78))), bytesize.size());
        assert_eq!(None, bytesize.sizes());
    }

    #[test]
    fn add_request_count_bytesize_event_tagged() {
        let mut bytesize = GroupedCountByteSize::new_tagged();

        // Two "cabbage" events share one entry; "tomato" gets its own.
        bytesize.add_event(&DummyEvent::service_only("cabbage"), JsonSize::new(42));
        bytesize.add_event(&DummyEvent::service_only("cabbage"), JsonSize::new(36));
        bytesize.add_event(&DummyEvent::service_only("tomato"), JsonSize::new(23));

        assert_eq!(None, bytesize.size());

        // Map iteration order is unspecified, so sort before comparing.
        let mut sizes = bytesize
            .sizes()
            .unwrap()
            .clone()
            .into_iter()
            .collect::<Vec<_>>();
        sizes.sort();

        let expected = vec![
            (
                service_tags("cabbage"),
                CountByteSize(2, JsonSize::new(78)),
            ),
            (service_tags("tomato"), CountByteSize(1, JsonSize::new(23))),
        ];

        assert_eq!(expected, sizes);
    }
}