vector/sources/
static_metrics.rs

1use std::{collections::BTreeMap, num::NonZeroU32, time::Duration};
2
3use chrono::Utc;
4use futures::StreamExt;
5use serde_with::serde_as;
6use tokio::time;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::{
9    ByteSizeOf, EstimatedJsonEncodedSizeOf,
10    config::LogNamespace,
11    configurable::configurable_component,
12    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
13};
14
15use crate::{
16    SourceSender,
17    config::{SourceConfig, SourceContext, SourceOutput},
18    event::{
19        EventMetadata, Metric, MetricKind,
20        metric::{MetricData, MetricName, MetricSeries, MetricTime, MetricValue},
21    },
22    internal_events::{EventsReceived, StreamClosedError},
23    shutdown::ShutdownSignal,
24};
25
26/// Configuration for the `static_metrics` source.
27#[serde_as]
28#[configurable_component(source(
29    "static_metrics",
30    "Produce static metrics defined in configuration."
31))]
32#[derive(Clone, Debug)]
33#[serde(deny_unknown_fields)]
34pub struct StaticMetricsConfig {
35    /// The interval between metric emitting, in seconds.
36    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
37    #[serde(default = "default_interval")]
38    #[configurable(metadata(docs::human_name = "Emitting interval"))]
39    pub interval_secs: Duration,
40
41    /// Overrides the default namespace for the metrics emitted by the source.
42    #[serde(default = "default_namespace")]
43    pub namespace: String,
44
45    #[configurable(derived)]
46    #[serde(default)]
47    pub metrics: Vec<StaticMetricConfig>,
48}
49
50impl Default for StaticMetricsConfig {
51    fn default() -> Self {
52        Self {
53            interval_secs: default_interval(),
54            metrics: Vec::default(),
55            namespace: default_namespace(),
56        }
57    }
58}
59
60/// Tag configuration for the `internal_metrics` source.
61#[configurable_component]
62#[derive(Clone, Debug)]
63#[serde(deny_unknown_fields)]
64pub struct StaticMetricConfig {
65    /// Name of the static metric
66    pub name: String,
67
68    /// "Observed" value of the static metric
69    pub value: MetricValue,
70
71    /// Kind of the static metric - either absolute or incremental
72    pub kind: MetricKind,
73
74    /// Key-value pairs representing tags and their values to add to the metric.
75    #[configurable(metadata(
76        docs::additional_props_description = "An individual tag - value pair."
77    ))]
78    pub tags: BTreeMap<String, String>,
79}
80
/// Default time between two emissions: one second.
fn default_interval() -> Duration {
    Duration::from_secs(1)
}
84
/// Default namespace applied to emitted metrics: `static`.
fn default_namespace() -> String {
    String::from("static")
}
88
89impl_generate_config_from_default!(StaticMetricsConfig);
90
91#[async_trait::async_trait]
92#[typetag::serde(name = "static_metrics")]
93impl SourceConfig for StaticMetricsConfig {
94    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
95        if self.interval_secs.is_zero() {
96            warn!(
97                "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
98            );
99        }
100        let interval = self.interval_secs;
101
102        let namespace = self.namespace.clone();
103
104        let metrics = self.metrics.clone();
105
106        Ok(Box::pin(
107            StaticMetrics {
108                namespace,
109                metrics,
110                interval,
111                out: cx.out,
112                shutdown: cx.shutdown,
113            }
114            .run(),
115        ))
116    }
117
118    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
119        vec![SourceOutput::new_metrics()]
120    }
121
122    fn can_acknowledge(&self) -> bool {
123        false
124    }
125}
126
127struct StaticMetrics {
128    namespace: String,
129    metrics: Vec<StaticMetricConfig>,
130    interval: time::Duration,
131    out: SourceSender,
132    shutdown: ShutdownSignal,
133}
134
135impl StaticMetrics {
136    async fn run(mut self) -> Result<(), ()> {
137        let events_received = register!(EventsReceived);
138        let bytes_received = register!(BytesReceived::from(Protocol::STATIC));
139        let mut interval =
140            IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
141
142        // Prepare metrics, since they are static and won't change
143        let metrics: Vec<Metric> = self
144            .metrics
145            .into_iter()
146            .map(
147                |StaticMetricConfig {
148                     name,
149                     value,
150                     kind,
151                     tags,
152                 }| {
153                    Metric::from_parts(
154                        MetricSeries {
155                            name: MetricName {
156                                name,
157                                namespace: Some(self.namespace.clone()),
158                            },
159                            tags: Some(tags.into()),
160                        },
161                        MetricData {
162                            time: MetricTime {
163                                timestamp: None,
164                                interval_ms: NonZeroU32::new(self.interval.as_millis() as u32),
165                            },
166                            kind,
167                            value: value.clone(),
168                        },
169                        EventMetadata::default(),
170                    )
171                },
172            )
173            .collect();
174
175        while interval.next().await.is_some() {
176            let count = metrics.len();
177            let byte_size = metrics.size_of();
178            let json_size = metrics.estimated_json_encoded_size_of();
179
180            bytes_received.emit(ByteSize(byte_size));
181            events_received.emit(CountByteSize(count, json_size));
182
183            let batch = metrics
184                .clone()
185                .into_iter()
186                .map(|metric| metric.with_timestamp(Some(Utc::now())));
187
188            if (self.out.send_batch(batch).await).is_err() {
189                emit!(StreamClosedError { count });
190                return Err(());
191            }
192        }
193
194        Ok(())
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        event::Event,
        test_util::{
            self,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
        },
    };

    #[test]
    fn generate_config() {
        test_util::test_generate_config::<StaticMetricsConfig>();
    }

    /// Runs the source built from `config` through the source-compliance helper with a
    /// 100 ms duration and returns the events it yields.
    async fn events_from_config(config: StaticMetricsConfig) -> Vec<Event> {
        run_and_assert_source_compliance(config, time::Duration::from_millis(100), &SOURCE_TAGS)
            .await
    }

    /// A minimal metric definition: unnamed absolute gauge of `0.0` without tags.
    fn default_metric() -> StaticMetricConfig {
        StaticMetricConfig {
            name: "".to_string(),
            value: MetricValue::Gauge { value: 0.0 },
            kind: MetricKind::Absolute,
            tags: BTreeMap::default(),
        }
    }

    #[tokio::test]
    async fn default_empty() {
        let events = events_from_config(StaticMetricsConfig::default()).await;

        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn default_namespace() {
        let events = events_from_config(StaticMetricsConfig {
            metrics: vec![default_metric()],
            ..Default::default()
        })
        .await;

        let event = events.first().expect("source should emit at least one event");
        assert_eq!(event.as_metric().namespace(), Some("static"));
    }

    #[tokio::test]
    async fn default_namespace_multiple_events() {
        let events = events_from_config(StaticMetricsConfig {
            metrics: vec![default_metric(), default_metric()],
            ..Default::default()
        })
        .await;

        // Two metrics are configured, so check the length explicitly instead of letting a
        // missing second event surface as an out-of-bounds panic.
        assert!(events.len() >= 2, "expected at least two events, got {}", events.len());
        for event in &events {
            assert_eq!(event.as_metric().namespace(), Some("static"));
        }
    }

    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";

        let config = StaticMetricsConfig {
            namespace: namespace.to_owned(),
            metrics: vec![default_metric()],
            ..StaticMetricsConfig::default()
        };

        let events = events_from_config(config).await;
        let event = events.first().expect("source should emit at least one event");

        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }

    #[tokio::test]
    async fn sets_custom_tags() {
        let events = events_from_config(StaticMetricsConfig {
            metrics: vec![StaticMetricConfig {
                name: "test".to_string(),
                value: MetricValue::Gauge { value: 2.3 },
                kind: MetricKind::Absolute,
                tags: BTreeMap::from([("custom_tag".to_string(), "custom_tag_value".to_string())]),
            }],
            ..Default::default()
        })
        .await;

        let event = events.first().expect("source should emit at least one event");
        let metric = event.as_metric();

        assert_eq!(metric.name(), "test");
        assert!(matches!(metric.value(), MetricValue::Gauge { value: 2.3 }));
        assert_eq!(
            metric.tag_value("custom_tag"),
            Some("custom_tag_value".to_string())
        );
    }
}
304}