vector/sinks/util/
metadata.rs

1use std::num::NonZeroUsize;
2
3use vector_lib::request_metadata::{GetEventCountTags, GroupedCountByteSize, RequestMetadata};
4use vector_lib::{config, ByteSizeOf, EstimatedJsonEncodedSizeOf};
5
6use super::request_builder::EncodeResult;
7
8#[derive(Clone, Default)]
9pub struct RequestMetadataBuilder {
10    event_count: usize,
11    events_byte_size: usize,
12    grouped_events_byte_size: GroupedCountByteSize,
13}
14
15impl RequestMetadataBuilder {
16    pub fn from_events<E>(events: &[E]) -> Self
17    where
18        E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf,
19    {
20        let mut size = config::telemetry().create_request_count_byte_size();
21
22        let mut events_byte_size = 0;
23
24        for event in events {
25            events_byte_size += event.size_of();
26            size.add_event(event, event.estimated_json_encoded_size_of());
27        }
28
29        Self {
30            event_count: events.len(),
31            events_byte_size,
32            grouped_events_byte_size: size,
33        }
34    }
35
36    pub fn from_event<E>(event: &E) -> Self
37    where
38        E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf,
39    {
40        let mut size = config::telemetry().create_request_count_byte_size();
41        size.add_event(event, event.estimated_json_encoded_size_of());
42
43        Self {
44            event_count: 1,
45            events_byte_size: event.size_of(),
46            grouped_events_byte_size: size,
47        }
48    }
49
50    pub const fn new(
51        event_count: usize,
52        events_byte_size: usize,
53        grouped_events_byte_size: GroupedCountByteSize,
54    ) -> Self {
55        Self {
56            event_count,
57            events_byte_size,
58            grouped_events_byte_size,
59        }
60    }
61
62    pub fn track_event<E>(&mut self, event: E)
63    where
64        E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf,
65    {
66        self.event_count += 1;
67        self.events_byte_size += event.size_of();
68        let json_size = event.estimated_json_encoded_size_of();
69        self.grouped_events_byte_size.add_event(&event, json_size);
70    }
71
72    /// Builds the [`RequestMetadata`] with the given size.
73    /// This is used when there is no encoder in the process to provide an `EncodeResult`
74    pub fn with_request_size(&self, size: NonZeroUsize) -> RequestMetadata {
75        let size = size.get();
76
77        RequestMetadata::new(
78            self.event_count,
79            self.events_byte_size,
80            size,
81            size,
82            self.grouped_events_byte_size.clone(),
83        )
84    }
85
86    /// Builds the [`RequestMetadata`] from the results of encoding.
87    /// `EncodeResult` provides us with the byte size before and after compression
88    /// and the json size of the events after transforming (dropping unwanted fields) but
89    /// before encoding.
90    pub fn build<T>(&self, result: &EncodeResult<T>) -> RequestMetadata {
91        RequestMetadata::new(
92            self.event_count,
93            self.events_byte_size,
94            result.uncompressed_byte_size,
95            result
96                .compressed_byte_size
97                .unwrap_or(result.uncompressed_byte_size),
98            // Building from an encoded result, we take the json size from the encoded since that has the size
99            // after transforming the event.
100            result.transformed_json_size.clone(),
101        )
102    }
103}