vector/sinks/gcp/stackdriver/metrics/
request_builder.rs

1use std::io;
2
3use bytes::Bytes;
4use chrono::Utc;
5use vector_lib::event::{Metric, MetricValue};
6
7use crate::sinks::{gcp, prelude::*, util::http::HttpRequest};
8
9#[derive(Clone, Debug)]
10pub(super) struct StackdriverMetricsRequestBuilder {
11    pub(super) encoder: StackdriverMetricsEncoder,
12}
13
14impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder {
15    type Metadata = EventFinalizers;
16    type Events = Vec<Metric>;
17    type Encoder = StackdriverMetricsEncoder;
18    type Payload = Bytes;
19    type Request = HttpRequest<()>;
20    type Error = io::Error;
21
22    fn compression(&self) -> Compression {
23        Compression::None
24    }
25
26    fn encoder(&self) -> &Self::Encoder {
27        &self.encoder
28    }
29
30    fn split_input(
31        &self,
32        mut events: Vec<Metric>,
33    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
34        let finalizers = events.take_finalizers();
35        let builder = RequestMetadataBuilder::from_events(&events);
36        (finalizers, builder, events)
37    }
38
39    fn build_request(
40        &self,
41        metadata: Self::Metadata,
42        request_metadata: RequestMetadata,
43        payload: EncodeResult<Self::Payload>,
44    ) -> Self::Request {
45        HttpRequest::new(payload.into_payload(), metadata, request_metadata, ())
46    }
47}
48
49#[derive(Clone, Debug)]
50pub struct StackdriverMetricsEncoder {
51    pub(super) default_namespace: String,
52    pub(super) started: chrono::DateTime<Utc>,
53    pub(super) resource: gcp::GcpTypedResource,
54}
55
56impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder {
57    /// Create the object defined [here][api_docs].
58    ///
59    /// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create
60    fn encode_input(
61        &self,
62        input: Vec<Metric>,
63        writer: &mut dyn io::Write,
64    ) -> io::Result<(usize, GroupedCountByteSize)> {
65        let mut byte_size = telemetry().create_request_count_byte_size();
66        let time_series = input
67            .into_iter()
68            .map(|metric| {
69                byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());
70
71                let (series, data, _metadata) = metric.into_parts();
72                let namespace = series
73                    .name
74                    .namespace
75                    .unwrap_or_else(|| self.default_namespace.clone());
76                let metric_type = format!(
77                    "custom.googleapis.com/{}/metrics/{}",
78                    namespace, series.name.name
79                );
80
81                let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now);
82
83                let (point_value, interval, metric_kind) = match &data.value {
84                    MetricValue::Counter { value } => {
85                        let interval = gcp::GcpInterval {
86                            start_time: Some(self.started),
87                            end_time,
88                        };
89
90                        (*value, interval, gcp::GcpMetricKind::Cumulative)
91                    }
92                    MetricValue::Gauge { value } => {
93                        let interval = gcp::GcpInterval {
94                            start_time: None,
95                            end_time,
96                        };
97
98                        (*value, interval, gcp::GcpMetricKind::Gauge)
99                    }
100                    _ => {
101                        unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point")
102                    },
103                };
104                let metric_labels = series
105                    .tags
106                    .unwrap_or_default()
107                    .into_iter_single()
108                    .collect::<std::collections::HashMap<_, _>>();
109
110                gcp::GcpSerie {
111                    metric: gcp::GcpMetric {
112                        r#type: metric_type,
113                        labels: metric_labels,
114                    },
115                    resource: gcp::GcpResource {
116                        r#type: self.resource.r#type.clone(),
117                        labels: self.resource.labels.clone(),
118                    },
119                    metric_kind,
120                    value_type: gcp::GcpValueType::Int64,
121                    points: vec![gcp::GcpPoint {
122                        interval,
123                        value: gcp::GcpPointValue {
124                            int64_value: Some(point_value as i64),
125                        },
126                    }],
127                }
128            })
129            .collect::<Vec<_>>();
130
131        let series = gcp::GcpSeries {
132            time_series: &time_series,
133        };
134
135        let body = crate::serde::json::to_bytes(&series).unwrap().freeze();
136        writer.write_all(&body).map(|()| (body.len(), byte_size))
137    }
138}