vector/sinks/gcp/stackdriver/metrics/
request_builder.rs1use std::io;
2
3use bytes::Bytes;
4use chrono::Utc;
5use vector_lib::event::{Metric, MetricValue};
6
7use crate::sinks::{gcp, prelude::*, util::http::HttpRequest};
8
9#[derive(Clone, Debug)]
10pub(super) struct StackdriverMetricsRequestBuilder {
11 pub(super) encoder: StackdriverMetricsEncoder,
12}
13
14impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder {
15 type Metadata = EventFinalizers;
16 type Events = Vec<Metric>;
17 type Encoder = StackdriverMetricsEncoder;
18 type Payload = Bytes;
19 type Request = HttpRequest<()>;
20 type Error = io::Error;
21
22 fn compression(&self) -> Compression {
23 Compression::None
24 }
25
26 fn encoder(&self) -> &Self::Encoder {
27 &self.encoder
28 }
29
30 fn split_input(
31 &self,
32 mut events: Vec<Metric>,
33 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
34 let finalizers = events.take_finalizers();
35 let builder = RequestMetadataBuilder::from_events(&events);
36 (finalizers, builder, events)
37 }
38
39 fn build_request(
40 &self,
41 metadata: Self::Metadata,
42 request_metadata: RequestMetadata,
43 payload: EncodeResult<Self::Payload>,
44 ) -> Self::Request {
45 HttpRequest::new(payload.into_payload(), metadata, request_metadata, ())
46 }
47}
48
49#[derive(Clone, Debug)]
50pub struct StackdriverMetricsEncoder {
51 pub(super) default_namespace: String,
52 pub(super) started: chrono::DateTime<Utc>,
53 pub(super) resource: gcp::GcpTypedResource,
54}
55
56impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder {
57 fn encode_input(
61 &self,
62 input: Vec<Metric>,
63 writer: &mut dyn io::Write,
64 ) -> io::Result<(usize, GroupedCountByteSize)> {
65 let mut byte_size = telemetry().create_request_count_byte_size();
66 let time_series = input
67 .into_iter()
68 .map(|metric| {
69 byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());
70
71 let (series, data, _metadata) = metric.into_parts();
72 let namespace = series
73 .name
74 .namespace
75 .unwrap_or_else(|| self.default_namespace.clone());
76 let metric_type = format!(
77 "custom.googleapis.com/{}/metrics/{}",
78 namespace, series.name.name
79 );
80
81 let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now);
82
83 let (point_value, interval, metric_kind) = match &data.value {
84 MetricValue::Counter { value } => {
85 let interval = gcp::GcpInterval {
86 start_time: Some(self.started),
87 end_time,
88 };
89
90 (*value, interval, gcp::GcpMetricKind::Cumulative)
91 }
92 MetricValue::Gauge { value } => {
93 let interval = gcp::GcpInterval {
94 start_time: None,
95 end_time,
96 };
97
98 (*value, interval, gcp::GcpMetricKind::Gauge)
99 }
100 _ => {
101 unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point")
102 },
103 };
104 let metric_labels = series
105 .tags
106 .unwrap_or_default()
107 .into_iter_single()
108 .collect::<std::collections::HashMap<_, _>>();
109
110 gcp::GcpSerie {
111 metric: gcp::GcpMetric {
112 r#type: metric_type,
113 labels: metric_labels,
114 },
115 resource: gcp::GcpResource {
116 r#type: self.resource.r#type.clone(),
117 labels: self.resource.labels.clone(),
118 },
119 metric_kind,
120 value_type: gcp::GcpValueType::Int64,
121 points: vec![gcp::GcpPoint {
122 interval,
123 value: gcp::GcpPointValue {
124 int64_value: Some(point_value as i64),
125 },
126 }],
127 }
128 })
129 .collect::<Vec<_>>();
130
131 let series = gcp::GcpSeries {
132 time_series: &time_series,
133 };
134
135 let body = crate::serde::json::to_bytes(&series).unwrap().freeze();
136 writer.write_all(&body).map(|()| (body.len(), byte_size))
137 }
138}