// vector/sinks/prometheus/remote_write/sink.rs

use std::fmt;

use vector_lib::{
    byte_size_of::ByteSizeOf,
    event::Metric,
    stream::batcher::{data::BatchData, limiter::ByteSizeOfItemSize},
};

use super::{
    PartitionKey, PrometheusMetricNormalize,
    request_builder::{RemoteWriteEncoder, RemoteWriteRequest, RemoteWriteRequestBuilder},
};
use crate::sinks::{prelude::*, util::buffer::metrics::MetricSet};
14
/// A metric paired with the tenant it belongs to, as rendered from the
/// sink's `tenant_id` template (if any).
pub(super) struct RemoteWriteMetric {
    pub(super) metric: Metric,
    // Rendered tenant id; used only to partition batches per tenant.
    tenant_id: Option<String>,
}
19
impl Finalizable for RemoteWriteMetric {
    /// Hands off the event finalizers of the wrapped metric.
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.metric.take_finalizers()
    }
}
25
impl GetEventCountTags for RemoteWriteMetric {
    /// Delegates telemetry tagging to the wrapped metric.
    fn get_tags(&self) -> TaggedEventsSent {
        self.metric.get_tags()
    }
}
31
impl EstimatedJsonEncodedSizeOf for RemoteWriteMetric {
    /// Estimated JSON size is that of the wrapped metric alone.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        self.metric.estimated_json_encoded_size_of()
    }
}
37
impl ByteSizeOf for RemoteWriteMetric {
    /// Allocated bytes of the wrapped metric only.
    // NOTE(review): the `tenant_id` string's allocation is not counted here —
    // confirm that is intended for batch size limiting.
    fn allocated_bytes(&self) -> usize {
        self.metric.allocated_bytes()
    }
}
43
/// Marker type supplying the default batch settings for this sink.
#[derive(Clone, Copy, Debug, Default)]
pub struct PrometheusRemoteWriteDefaultBatchSettings;
46
impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings {
    // Flush at 1000 events or after 1s, whichever comes first; no byte cap.
    const MAX_EVENTS: Option<usize> = Some(1_000);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
52
53pub(super) struct PrometheusTenantIdPartitioner;
54
impl Partitioner for PrometheusTenantIdPartitioner {
    type Item = RemoteWriteMetric;
    type Key = PartitionKey;

    /// Keys each metric by its (optional) tenant id.
    fn partition(&self, item: &Self::Item) -> Self::Key {
        PartitionKey {
            tenant_id: item.tenant_id.clone(),
        }
    }
}
65
/// Storage for a batch of metrics: either merged into a `MetricSet`
/// (aggregated) or kept as-is in arrival order (unaggregated).
pub(super) enum BatchedMetrics {
    Aggregated(MetricSet),
    Unaggregated(Vec<Metric>),
}
70
71impl BatchedMetrics {
72 pub(super) fn into_metrics(self) -> Vec<Metric> {
73 match self {
74 BatchedMetrics::Aggregated(metrics) => metrics.into_metrics(),
75 BatchedMetrics::Unaggregated(metrics) => metrics,
76 }
77 }
78
79 pub(super) fn insert_update(&mut self, metric: Metric) {
80 match self {
81 BatchedMetrics::Aggregated(metrics) => metrics.insert_update(metric),
82 BatchedMetrics::Unaggregated(metrics) => metrics.push(metric),
83 }
84 }
85
86 pub(super) fn len(&self) -> usize {
87 match self {
88 BatchedMetrics::Aggregated(metrics) => metrics.len(),
89 BatchedMetrics::Unaggregated(metrics) => metrics.len(),
90 }
91 }
92}
93
/// Accumulator for one in-progress batch: the events themselves plus their
/// finalizers and running size accounting.
pub(super) struct EventCollection {
    pub(super) finalizers: EventFinalizers,
    pub(super) events: BatchedMetrics,
    // Running total of `ByteSizeOf::size_of` for collected events.
    pub(super) events_byte_size: usize,
    // Estimated JSON-encoded size, grouped for request telemetry.
    pub(super) events_json_byte_size: GroupedCountByteSize,
}
100
101impl EventCollection {
102 fn new(aggregate: bool) -> Self {
105 Self {
106 finalizers: Default::default(),
107 events: if aggregate {
108 BatchedMetrics::Aggregated(Default::default())
109 } else {
110 BatchedMetrics::Unaggregated(Default::default())
111 },
112 events_byte_size: Default::default(),
113 events_json_byte_size: telemetry().create_request_count_byte_size(),
114 }
115 }
116
117 const fn is_aggregated(&self) -> bool {
118 matches!(self.events, BatchedMetrics::Aggregated(_))
119 }
120}
121
122impl BatchData<RemoteWriteMetric> for EventCollection {
123 type Batch = Self;
124
125 fn len(&self) -> usize {
126 self.events.len()
127 }
128
129 fn take_batch(&mut self) -> Self::Batch {
130 let mut new = Self::new(self.is_aggregated());
131 std::mem::swap(self, &mut new);
132 new
133 }
134
135 fn push_item(&mut self, mut item: RemoteWriteMetric) {
136 self.finalizers
137 .merge(item.metric.metadata_mut().take_finalizers());
138 self.events_byte_size += item.size_of();
139 self.events_json_byte_size
140 .add_event(&item.metric, item.estimated_json_encoded_size_of());
141 self.events.insert_update(item.metric);
142 }
143}
144
/// Stream sink that batches metrics per tenant and sends them through
/// `service` as Prometheus remote-write requests.
pub(super) struct RemoteWriteSink<S> {
    // Optional template rendered per metric to choose its tenant.
    pub(super) tenant_id: Option<Template>,
    pub(super) batch_settings: BatcherSettings,
    // When true, metrics within a batch are merged into a `MetricSet`.
    pub(super) aggregate: bool,
    pub(super) compression: super::Compression,
    pub(super) default_namespace: Option<String>,
    // Passed to the request encoder for distribution-type metrics.
    pub(super) buckets: Vec<f64>,
    pub(super) quantiles: Vec<f64>,
    // TTL for normalizer state; `None` disables expiry.
    pub(super) expire_metrics_secs: Option<f64>,
    pub(super) service: S,
}
156
impl<S> RemoteWriteSink<S>
where
    S: Service<RemoteWriteRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Drives the sink: turns the incoming event stream into per-tenant
    /// batched, encoded remote-write requests and feeds them to the service.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Builds the request payloads from finished batches, using the
        // configured compression and encoder settings.
        let request_builder = RemoteWriteRequestBuilder {
            compression: self.compression,
            encoder: RemoteWriteEncoder {
                default_namespace: self.default_namespace.clone(),
                buckets: self.buckets.clone(),
                quantiles: self.quantiles.clone(),
            },
        };

        let batch_settings = self.batch_settings;
        let tenant_id = self.tenant_id.clone();
        let service = self.service;
        let expire_metrics_secs = self.expire_metrics_secs;

        input
            // Only metric events are relevant; everything else is discarded.
            .filter_map(|event| future::ready(event.try_into_metric()))
            // Normalize metrics, expiring stale normalizer state after
            // `expire_metrics_secs` when configured.
            .normalized_with_ttl::<PrometheusMetricNormalize>(expire_metrics_secs)
            // Attach the rendered tenant id to each metric.
            .filter_map(move |event| {
                future::ready(make_remote_write_event(tenant_id.as_ref(), event))
            })
            // Batch per tenant so each request targets a single tenant.
            .batched_partitioned(PrometheusTenantIdPartitioner, || {
                batch_settings
                    .as_reducer_config(ByteSizeOfItemSize, EventCollection::new(self.aggregate))
            })
            .request_builder(default_request_builder_concurrency_limit(), request_builder)
            // Log and drop batches that failed to encode; keep the stream alive.
            .filter_map(|request| async move {
                match request {
                    Err(e) => {
                        error!("Failed to build Remote Write request: {:?}.", e);
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(service)
            .run()
            .await
    }
}
204
#[async_trait]
impl<S> StreamSink<Event> for RemoteWriteSink<S>
where
    S: Service<RemoteWriteRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Entry point called by the topology; delegates to `run_inner`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
217
218fn make_remote_write_event(
219 tenant_id: Option<&Template>,
220 metric: Metric,
221) -> Option<RemoteWriteMetric> {
222 let tenant_id = tenant_id.and_then(|template| {
223 template
224 .render_string(&metric)
225 .map_err(|error| {
226 emit!(TemplateRenderingError {
227 error,
228 field: Some("tenant_id"),
229 drop_event: true,
230 })
231 })
232 .ok()
233 });
234
235 Some(RemoteWriteMetric { metric, tenant_id })
236}