vector/sinks/gcp/
mod.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use vector_lib::configurable::configurable_component;
5
6pub mod cloud_storage;
7pub mod pubsub;
8pub mod stackdriver;
9
10/// A monitored resource.
11///
12/// Monitored resources in GCP allow associating logs and metrics specifically with native resources
13/// within Google Cloud Platform. This takes the form of a "type" field which identifies the
14/// resource, and a set of type-specific labels to uniquely identify a resource of that type.
15///
16/// See [Monitored resource types][mon_docs] for more information.
17///
18/// [mon_docs]: https://cloud.google.com/monitoring/api/resources
19#[configurable_component]
20#[derive(Clone, Debug, Default)]
21pub struct GcpTypedResource {
22    /// The monitored resource type.
23    ///
24    /// For example, the type of a Compute Engine VM instance is `gce_instance`.
25    #[configurable(metadata(docs::examples = "global", docs::examples = "gce_instance"))]
26    pub r#type: String,
27
28    /// Type-specific labels.
29    #[serde(flatten)]
30    #[configurable(metadata(
31        docs::additional_props_description = "Values for all of the labels listed in the associated monitored resource descriptor.\n\nFor example, Compute Engine VM instances use the labels `projectId`, `instanceId`, and `zone`."
32    ))]
33    #[configurable(metadata(docs::examples = "label_examples()"))]
34    pub labels: HashMap<String, String>,
35}
36
/// Builds the sample map referenced by the `docs::examples` metadata on
/// `GcpTypedResource::labels`.
///
/// The map holds four string entries: `type`, `projectId`, `instanceId`, and `zone`.
fn label_examples() -> HashMap<String, String> {
    [
        ("type", "global"),
        ("projectId", "vector-123456"),
        ("instanceId", "Twilight"),
        ("zone", "us-central1-a"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect()
}
46
47#[derive(Deserialize, Serialize, Debug, Clone, Copy)]
48#[serde(rename_all = "UPPERCASE")]
49pub enum GcpMetricKind {
50    Cumulative,
51    Gauge,
52}
53
54#[derive(Serialize, Debug, Clone, Copy)]
55#[serde(rename_all = "UPPERCASE")]
56pub enum GcpValueType {
57    Int64,
58}
59
60#[derive(Serialize, Debug, Clone, Copy)]
61pub struct GcpPoint {
62    pub interval: GcpInterval,
63    pub value: GcpPointValue,
64}
65
66#[derive(Serialize, Debug, Clone, Copy)]
67#[serde(rename_all = "camelCase")]
68pub struct GcpInterval {
69    #[serde(
70        skip_serializing_if = "Option::is_none",
71        serialize_with = "serialize_optional_datetime"
72    )]
73    pub start_time: Option<chrono::DateTime<chrono::Utc>>,
74    #[serde(serialize_with = "serialize_datetime")]
75    pub end_time: chrono::DateTime<chrono::Utc>,
76}
77
78#[derive(Serialize, Debug, Clone, Copy)]
79#[serde(rename_all = "camelCase")]
80pub struct GcpPointValue {
81    #[serde(
82        skip_serializing_if = "Option::is_none",
83        serialize_with = "serialize_int64_value"
84    )]
85    pub int64_value: Option<i64>,
86}
87
88#[derive(Serialize, Debug, Clone)]
89#[serde(rename_all = "camelCase")]
90pub struct GcpMetric {
91    pub r#type: String,
92    pub labels: HashMap<String, String>,
93}
94
95#[derive(Serialize, Debug, Clone)]
96#[serde(rename_all = "camelCase")]
97pub struct GcpResource {
98    pub r#type: String,
99    pub labels: HashMap<String, String>,
100}
101
102#[derive(Serialize, Debug, Clone)]
103#[serde(rename_all = "camelCase")]
104pub struct GcpSerie {
105    pub metric: GcpMetric,
106    pub resource: GcpResource,
107    pub metric_kind: GcpMetricKind,
108    pub value_type: GcpValueType,
109    pub points: Vec<GcpPoint>,
110}
111
112#[derive(Serialize, Debug, Clone)]
113#[serde(rename_all = "camelCase")]
114pub struct GcpSeries<'a> {
115    time_series: &'a [GcpSerie],
116}
117
118fn serialize_int64_value<S>(value: &Option<i64>, serializer: S) -> Result<S::Ok, S::Error>
119where
120    S: serde::Serializer,
121{
122    serializer.serialize_str(value.as_ref().expect("always defined").to_string().as_str())
123}
124
125fn serialize_datetime<S>(
126    value: &chrono::DateTime<chrono::Utc>,
127    serializer: S,
128) -> Result<S::Ok, S::Error>
129where
130    S: serde::Serializer,
131{
132    serializer.serialize_str(
133        value
134            .to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)
135            .as_str(),
136    )
137}
138
139fn serialize_optional_datetime<S>(
140    value: &Option<chrono::DateTime<chrono::Utc>>,
141    serializer: S,
142) -> Result<S::Ok, S::Error>
143where
144    S: serde::Serializer,
145{
146    serialize_datetime(value.as_ref().expect("always defined"), serializer)
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Checks that a serialized `GcpSeries` has the JSON layout GCP expects
    /// (https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries).
    #[test]
    fn serialize_gcp_series() {
        // Fixed timestamp so the expected JSON below is stable.
        let end_time = chrono::Utc
            .with_ymd_and_hms(2023, 2, 14, 10, 0, 0)
            .single()
            .expect("invalid timestamp");

        let metric = GcpMetric {
            r#type: "custom.googleapis.com/my_namespace/metrics/my_metric".to_string(),
            labels: [(
                "my_metric_label".to_string(),
                "my_metric_label_value".to_string(),
            )]
            .into(),
        };
        let resource = GcpResource {
            r#type: "my_resource".to_string(),
            labels: [(
                "my_resource_label".to_string(),
                "my_resource_label_value".to_string(),
            )]
            .into(),
        };
        // `start_time` is `None`, so no `startTime` key should appear in the output.
        let point = GcpPoint {
            interval: GcpInterval {
                start_time: None,
                end_time,
            },
            value: GcpPointValue {
                int64_value: Some(10),
            },
        };
        let series = [GcpSerie {
            metric,
            resource,
            metric_kind: GcpMetricKind::Gauge,
            value_type: GcpValueType::Int64,
            points: vec![point],
        }];
        let gcp_series = GcpSeries {
            time_series: &series,
        };

        let expected_json = r#"{"timeSeries":[{"metric":{"type":"custom.googleapis.com/my_namespace/metrics/my_metric","labels":{"my_metric_label":"my_metric_label_value"}},"resource":{"type":"my_resource","labels":{"my_resource_label":"my_resource_label_value"}},"metricKind":"GAUGE","valueType": "INT64","points":[{"interval":{"endTime":"2023-02-14T10:00:00.000000000Z"},"value":{"int64Value":"10"}}]}]}"#;

        // Compare as `serde_json::Value` so that field order does not matter.
        let actual_json = serde_json::to_string(&gcp_series).unwrap();
        let actual: serde_json::Value = serde_json::from_str(&actual_json).unwrap();
        let expected: serde_json::Value = serde_json::from_str(expected_json).unwrap();

        assert_eq!(actual, expected);
    }
}