// vector/sinks/prometheus/remote_write/sink.rs

1use std::fmt;
2
3use vector_lib::{
4    byte_size_of::ByteSizeOf,
5    event::Metric,
6    stream::batcher::{data::BatchData, limiter::ByteSizeOfItemSize},
7};
8
9use super::{
10    PartitionKey, PrometheusMetricNormalize,
11    request_builder::{RemoteWriteEncoder, RemoteWriteRequest, RemoteWriteRequestBuilder},
12};
13use crate::sinks::{prelude::*, util::buffer::metrics::MetricSet};
14
/// A single metric paired with the tenant id it should be written under.
///
/// The tenant id is the rendered value of the sink's `tenant_id` template
/// (when configured) and is used for partitioning batches per tenant; it is
/// not part of the metric payload itself.
pub(super) struct RemoteWriteMetric {
    pub(super) metric: Metric,
    // `None` when no tenant template is configured for the sink.
    tenant_id: Option<String>,
}
19
impl Finalizable for RemoteWriteMetric {
    /// Delegates to the wrapped metric so event acknowledgement flows
    /// through this wrapper unchanged.
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.metric.take_finalizers()
    }
}
25
impl GetEventCountTags for RemoteWriteMetric {
    /// Delegates telemetry tag extraction to the wrapped metric.
    fn get_tags(&self) -> TaggedEventsSent {
        self.metric.get_tags()
    }
}
31
impl EstimatedJsonEncodedSizeOf for RemoteWriteMetric {
    /// Delegates the estimated JSON size (used for bytes-sent telemetry)
    /// to the wrapped metric; the tenant id is not counted.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        self.metric.estimated_json_encoded_size_of()
    }
}
37
impl ByteSizeOf for RemoteWriteMetric {
    /// Delegates in-memory size accounting (used by the byte-size batch
    /// limiter) to the wrapped metric; the tenant id is not counted.
    fn allocated_bytes(&self) -> usize {
        self.metric.allocated_bytes()
    }
}
43
/// Marker type carrying the default batch settings for the Prometheus
/// remote-write sink (see the `SinkBatchSettings` impl below).
#[derive(Clone, Copy, Debug, Default)]
pub struct PrometheusRemoteWriteDefaultBatchSettings;
46
impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings {
    // Flush after 1 000 events or 1 second, whichever comes first; no
    // byte-size cap by default.
    const MAX_EVENTS: Option<usize> = Some(1_000);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
52
/// Partitions metrics by their rendered tenant id so that each tenant's
/// metrics are batched (and sent) separately.
pub(super) struct PrometheusTenantIdPartitioner;
54
55impl Partitioner for PrometheusTenantIdPartitioner {
56    type Item = RemoteWriteMetric;
57    type Key = PartitionKey;
58
59    fn partition(&self, item: &Self::Item) -> Self::Key {
60        PartitionKey {
61            tenant_id: item.tenant_id.clone(),
62        }
63    }
64}
65
/// Storage for one in-progress batch of metrics.
pub(super) enum BatchedMetrics {
    /// Incremental metrics merged into a set, so repeated updates to the
    /// same series collapse into one entry.
    Aggregated(MetricSet),
    /// Metrics kept verbatim in arrival order, duplicates included.
    Unaggregated(Vec<Metric>),
}
70
71impl BatchedMetrics {
72    pub(super) fn into_metrics(self) -> Vec<Metric> {
73        match self {
74            BatchedMetrics::Aggregated(metrics) => metrics.into_metrics(),
75            BatchedMetrics::Unaggregated(metrics) => metrics,
76        }
77    }
78
79    pub(super) fn insert_update(&mut self, metric: Metric) {
80        match self {
81            BatchedMetrics::Aggregated(metrics) => metrics.insert_update(metric),
82            BatchedMetrics::Unaggregated(metrics) => metrics.push(metric),
83        }
84    }
85
86    pub(super) fn len(&self) -> usize {
87        match self {
88            BatchedMetrics::Aggregated(metrics) => metrics.len(),
89            BatchedMetrics::Unaggregated(metrics) => metrics.len(),
90        }
91    }
92}
93
/// Accumulator for one batch: the metrics themselves plus the bookkeeping
/// (finalizers and byte counts) needed when the batch is flushed.
pub(super) struct EventCollection {
    /// Finalizers taken from every pushed metric, acknowledged together.
    pub(super) finalizers: EventFinalizers,
    /// The batched metrics (aggregated or verbatim).
    pub(super) events: BatchedMetrics,
    /// Running sum of `size_of()` for pushed items (in-memory size).
    pub(super) events_byte_size: usize,
    /// Running estimated-JSON size grouped by telemetry tags.
    pub(super) events_json_byte_size: GroupedCountByteSize,
}
100
101impl EventCollection {
102    /// Creates a new event collection that will either aggregate the incremental metrics
103    /// or store all the metrics, depending on the value of the `aggregate` parameter.
104    fn new(aggregate: bool) -> Self {
105        Self {
106            finalizers: Default::default(),
107            events: if aggregate {
108                BatchedMetrics::Aggregated(Default::default())
109            } else {
110                BatchedMetrics::Unaggregated(Default::default())
111            },
112            events_byte_size: Default::default(),
113            events_json_byte_size: telemetry().create_request_count_byte_size(),
114        }
115    }
116
117    const fn is_aggregated(&self) -> bool {
118        matches!(self.events, BatchedMetrics::Aggregated(_))
119    }
120}
121
122impl BatchData<RemoteWriteMetric> for EventCollection {
123    type Batch = Self;
124
125    fn len(&self) -> usize {
126        self.events.len()
127    }
128
129    fn take_batch(&mut self) -> Self::Batch {
130        let mut new = Self::new(self.is_aggregated());
131        std::mem::swap(self, &mut new);
132        new
133    }
134
135    fn push_item(&mut self, mut item: RemoteWriteMetric) {
136        self.finalizers
137            .merge(item.metric.metadata_mut().take_finalizers());
138        self.events_byte_size += item.size_of();
139        self.events_json_byte_size
140            .add_event(&item.metric, item.estimated_json_encoded_size_of());
141        self.events.insert_update(item.metric);
142    }
143}
144
/// The Prometheus remote-write stream sink: batches metrics per tenant and
/// drives encoded requests through the wrapped service `S`.
pub(super) struct RemoteWriteSink<S> {
    /// Optional template rendered per metric to pick the tenant partition.
    pub(super) tenant_id: Option<Template>,
    pub(super) batch_settings: BatcherSettings,
    /// Whether incremental metrics are aggregated within a batch.
    pub(super) aggregate: bool,
    pub(super) compression: super::Compression,
    /// Namespace prefixed to metric names that lack one (encoder config).
    pub(super) default_namespace: Option<String>,
    /// Histogram bucket bounds used when encoding distributions.
    pub(super) buckets: Vec<f64>,
    /// Summary quantiles used when encoding distributions.
    pub(super) quantiles: Vec<f64>,
    /// TTL for normalizer state; `None` disables expiry.
    pub(super) expire_metrics_secs: Option<f64>,
    pub(super) service: S,
}
156
impl<S> RemoteWriteSink<S>
where
    S: Service<RemoteWriteRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Runs the sink's event pipeline to completion:
    /// normalize -> partition by tenant -> batch -> encode -> send.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let request_builder = RemoteWriteRequestBuilder {
            compression: self.compression,
            encoder: RemoteWriteEncoder {
                default_namespace: self.default_namespace.clone(),
                buckets: self.buckets.clone(),
                quantiles: self.quantiles.clone(),
            },
        };

        // Pulled out of `self` so the closures below can move/borrow them
        // independently.
        let batch_settings = self.batch_settings;
        let tenant_id = self.tenant_id.clone();
        let service = self.service;
        let expire_metrics_secs = self.expire_metrics_secs;

        input
            // Non-metric events are silently discarded.
            .filter_map(|event| future::ready(event.try_into_metric()))
            // Convert to absolute/incremental form as Prometheus expects,
            // expiring normalizer state after `expire_metrics_secs`.
            .normalized_with_ttl::<PrometheusMetricNormalize>(expire_metrics_secs)
            // Attach the rendered tenant id (drops nothing here; see
            // `make_remote_write_event` for render-failure handling).
            .filter_map(move |event| {
                future::ready(make_remote_write_event(tenant_id.as_ref(), event))
            })
            // One batch stream per tenant, each with its own size/time limits.
            .batched_partitioned(PrometheusTenantIdPartitioner, || {
                batch_settings
                    .as_reducer_config(ByteSizeOfItemSize, EventCollection::new(self.aggregate))
            })
            .request_builder(default_request_builder_concurrency_limit(), request_builder)
            // Encoding failures are logged and the batch is dropped.
            .filter_map(|request| async move {
                match request {
                    Err(e) => {
                        error!("Failed to build Remote Write request: {:?}.", e);
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(service)
            .run()
            .await
    }
}
204
#[async_trait]
impl<S> StreamSink<Event> for RemoteWriteSink<S>
where
    S: Service<RemoteWriteRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Entry point required by `StreamSink`; all work happens in `run_inner`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
217
218fn make_remote_write_event(
219    tenant_id: Option<&Template>,
220    metric: Metric,
221) -> Option<RemoteWriteMetric> {
222    let tenant_id = tenant_id.and_then(|template| {
223        template
224            .render_string(&metric)
225            .map_err(|error| {
226                emit!(TemplateRenderingError {
227                    error,
228                    field: Some("tenant_id"),
229                    drop_event: true,
230                })
231            })
232            .ok()
233    });
234
235    Some(RemoteWriteMetric { metric, tenant_id })
236}