vector/sinks/greptimedb/metrics/
batch.rs1use vector_lib::{
2 event::{Metric, MetricValue},
3 stream::batcher::limiter::ItemBatchSize,
4};
5
6use super::request_builder::{
7 DISTRIBUTION_QUANTILES, DISTRIBUTION_STAT_FIELD_COUNT, SUMMARY_STAT_FIELD_COUNT,
8};
9
/// Width in bytes of one `f64`, used when estimating encoded value sizes.
const F64_BYTE_SIZE: usize = std::mem::size_of::<f64>();
/// Width in bytes of one `i64`, used when estimating encoded sizes.
const I64_BYTE_SIZE: usize = std::mem::size_of::<i64>();
12
13#[derive(Default)]
15pub struct GreptimeDBBatchSizer;
16
17impl GreptimeDBBatchSizer {
18 pub fn estimated_size_of(&self, item: &Metric) -> usize {
19 item.series().name().name().len()
21 + item.series().name().namespace().map(|s| s.len() + 1).unwrap_or(0)
23 + item.series().tags().map(|t| {
25 t.iter_all().map(|(k, v)| {
26 k.len() + 1 + v.map(|v| v.len()).unwrap_or(0)
27 })
28 .sum()
29 })
30 .unwrap_or(0)
31 + I64_BYTE_SIZE
33 +
34 match item.value() {
36 MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => F64_BYTE_SIZE,
37 MetricValue::Distribution { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
38 MetricValue::AggregatedHistogram { buckets, .. } => F64_BYTE_SIZE * (buckets.len() + SUMMARY_STAT_FIELD_COUNT),
39 MetricValue::AggregatedSummary { quantiles, .. } => F64_BYTE_SIZE * (quantiles.len() + SUMMARY_STAT_FIELD_COUNT),
40 MetricValue::Sketch { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
41 }
42 }
43}
44
45impl ItemBatchSize<Metric> for GreptimeDBBatchSizer {
46 fn size(&self, item: &Metric) -> usize {
47 self.estimated_size_of(item)
48 }
49}