vector/sources/
static_metrics.rs

1use std::collections::BTreeMap;
2use std::num::NonZeroU32;
3use std::time::Duration;
4
5use chrono::Utc;
6use futures::StreamExt;
7use serde_with::serde_as;
8use tokio::time;
9use tokio_stream::wrappers::IntervalStream;
10use vector_lib::configurable::configurable_component;
11use vector_lib::internal_event::{
12    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
13};
14use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};
15
16use crate::{
17    config::{SourceConfig, SourceContext, SourceOutput},
18    event::{
19        metric::{MetricData, MetricName, MetricSeries, MetricTime, MetricValue},
20        EventMetadata, Metric, MetricKind,
21    },
22    internal_events::{EventsReceived, StreamClosedError},
23    shutdown::ShutdownSignal,
24    SourceSender,
25};
26
27/// Configuration for the `static_metrics` source.
28#[serde_as]
29#[configurable_component(source(
30    "static_metrics",
31    "Produce static metrics defined in configuration."
32))]
33#[derive(Clone, Debug)]
34#[serde(deny_unknown_fields)]
35pub struct StaticMetricsConfig {
36    /// The interval between metric emitting, in seconds.
37    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
38    #[serde(default = "default_interval")]
39    #[configurable(metadata(docs::human_name = "Emitting interval"))]
40    pub interval_secs: Duration,
41
42    /// Overrides the default namespace for the metrics emitted by the source.
43    #[serde(default = "default_namespace")]
44    pub namespace: String,
45
46    #[configurable(derived)]
47    #[serde(default)]
48    pub metrics: Vec<StaticMetricConfig>,
49}
50
51impl Default for StaticMetricsConfig {
52    fn default() -> Self {
53        Self {
54            interval_secs: default_interval(),
55            metrics: Vec::default(),
56            namespace: default_namespace(),
57        }
58    }
59}
60
61/// Tag configuration for the `internal_metrics` source.
62#[configurable_component]
63#[derive(Clone, Debug)]
64#[serde(deny_unknown_fields)]
65pub struct StaticMetricConfig {
66    /// Name of the static metric
67    pub name: String,
68
69    /// "Observed" value of the static metric
70    pub value: MetricValue,
71
72    /// Kind of the static metric - either absolute or incremental
73    pub kind: MetricKind,
74
75    /// Key-value pairs representing tags and their values to add to the metric.
76    #[configurable(metadata(
77        docs::additional_props_description = "An individual tag - value pair."
78    ))]
79    pub tags: BTreeMap<String, String>,
80}
81
/// Default emitting interval: one second.
fn default_interval() -> Duration {
    Duration::from_secs(1)
}
85
/// Default namespace applied to emitted metrics: `static`.
fn default_namespace() -> String {
    String::from("static")
}
89
90impl_generate_config_from_default!(StaticMetricsConfig);
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "static_metrics")]
94impl SourceConfig for StaticMetricsConfig {
95    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
96        if self.interval_secs.is_zero() {
97            warn!(
98                "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
99            );
100        }
101        let interval = self.interval_secs;
102
103        let namespace = self.namespace.clone();
104
105        let metrics = self.metrics.clone();
106
107        Ok(Box::pin(
108            StaticMetrics {
109                namespace,
110                metrics,
111                interval,
112                out: cx.out,
113                shutdown: cx.shutdown,
114            }
115            .run(),
116        ))
117    }
118
119    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
120        vec![SourceOutput::new_metrics()]
121    }
122
123    fn can_acknowledge(&self) -> bool {
124        false
125    }
126}
127
128struct StaticMetrics {
129    namespace: String,
130    metrics: Vec<StaticMetricConfig>,
131    interval: time::Duration,
132    out: SourceSender,
133    shutdown: ShutdownSignal,
134}
135
136impl StaticMetrics {
137    async fn run(mut self) -> Result<(), ()> {
138        let events_received = register!(EventsReceived);
139        let bytes_received = register!(BytesReceived::from(Protocol::STATIC));
140        let mut interval =
141            IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
142
143        // Prepare metrics, since they are static and won't change
144        let metrics: Vec<Metric> = self
145            .metrics
146            .into_iter()
147            .map(
148                |StaticMetricConfig {
149                     name,
150                     value,
151                     kind,
152                     tags,
153                 }| {
154                    Metric::from_parts(
155                        MetricSeries {
156                            name: MetricName {
157                                name,
158                                namespace: Some(self.namespace.clone()),
159                            },
160                            tags: Some(tags.into()),
161                        },
162                        MetricData {
163                            time: MetricTime {
164                                timestamp: None,
165                                interval_ms: NonZeroU32::new(self.interval.as_millis() as u32),
166                            },
167                            kind,
168                            value: value.clone(),
169                        },
170                        EventMetadata::default(),
171                    )
172                },
173            )
174            .collect();
175
176        while interval.next().await.is_some() {
177            let count = metrics.len();
178            let byte_size = metrics.size_of();
179            let json_size = metrics.estimated_json_encoded_size_of();
180
181            bytes_received.emit(ByteSize(byte_size));
182            events_received.emit(CountByteSize(count, json_size));
183
184            let batch = metrics
185                .clone()
186                .into_iter()
187                .map(|metric| metric.with_timestamp(Some(Utc::now())));
188
189            if (self.out.send_batch(batch).await).is_err() {
190                emit!(StreamClosedError { count });
191                return Err(());
192            }
193        }
194
195        Ok(())
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        event::Event,
        test_util::{
            self,
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
        },
    };

    /// How long each test lets the source run before collecting its output.
    const RUN_FOR: time::Duration = time::Duration::from_millis(100);

    /// Runs the source built from `config` and returns the events it produced.
    async fn events_from_config(config: StaticMetricsConfig) -> Vec<Event> {
        run_and_assert_source_compliance(config, RUN_FOR, &SOURCE_TAGS).await
    }

    /// A minimal metric definition: unnamed absolute gauge of zero without tags.
    fn default_metric() -> StaticMetricConfig {
        StaticMetricConfig {
            name: String::new(),
            value: MetricValue::Gauge { value: 0.0 },
            kind: MetricKind::Absolute,
            tags: BTreeMap::new(),
        }
    }

    #[test]
    fn generate_config() {
        test_util::test_generate_config::<StaticMetricsConfig>();
    }

    #[tokio::test]
    async fn default_empty() {
        let config = StaticMetricsConfig::default();

        let events = events_from_config(config).await;

        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn default_namespace() {
        let config = StaticMetricsConfig {
            metrics: vec![default_metric()],
            ..Default::default()
        };

        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        assert_eq!(events[0].as_metric().namespace(), Some("static"));
    }

    #[tokio::test]
    async fn default_namespace_multiple_events() {
        let config = StaticMetricsConfig {
            metrics: vec![default_metric(), default_metric()],
            ..Default::default()
        };

        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        // Both metrics of the first batch must carry the default namespace.
        let (first, second) = (&events[0], &events[1]);
        assert_eq!(first.as_metric().namespace(), Some("static"));
        assert_eq!(second.as_metric().namespace(), Some("static"));
    }

    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";
        let config = StaticMetricsConfig {
            namespace: namespace.to_owned(),
            metrics: vec![default_metric()],
            ..StaticMetricsConfig::default()
        };

        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        assert_eq!(events[0].as_metric().namespace(), Some(namespace));
    }

    #[tokio::test]
    async fn sets_custom_tags() {
        let tags = BTreeMap::from([("custom_tag".to_string(), "custom_tag_value".to_string())]);
        let config = StaticMetricsConfig {
            metrics: vec![StaticMetricConfig {
                name: "test".to_string(),
                value: MetricValue::Gauge { value: 2.3 },
                kind: MetricKind::Absolute,
                tags,
            }],
            ..Default::default()
        };

        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        let metric = events[0].as_metric();

        assert_eq!(metric.name(), "test");
        assert!(matches!(metric.value(), MetricValue::Gauge { value: 2.3 }));
        assert_eq!(
            metric.tag_value("custom_tag"),
            Some("custom_tag_value".to_string())
        );
    }
}