1use std::{collections::BTreeMap, num::NonZeroU32, time::Duration};
2
3use chrono::Utc;
4use futures::StreamExt;
5use serde_with::serde_as;
6use tokio::time;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::{
9 ByteSizeOf, EstimatedJsonEncodedSizeOf,
10 config::LogNamespace,
11 configurable::configurable_component,
12 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
13};
14
15use crate::{
16 SourceSender,
17 config::{SourceConfig, SourceContext, SourceOutput},
18 event::{
19 EventMetadata, Metric, MetricKind,
20 metric::{MetricData, MetricName, MetricSeries, MetricTime, MetricValue},
21 },
22 internal_events::{EventsReceived, StreamClosedError},
23 shutdown::ShutdownSignal,
24};
25
26#[serde_as]
28#[configurable_component(source(
29 "static_metrics",
30 "Produce static metrics defined in configuration."
31))]
32#[derive(Clone, Debug)]
33#[serde(deny_unknown_fields)]
34pub struct StaticMetricsConfig {
35 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
37 #[serde(default = "default_interval")]
38 #[configurable(metadata(docs::human_name = "Emitting interval"))]
39 pub interval_secs: Duration,
40
41 #[serde(default = "default_namespace")]
43 pub namespace: String,
44
45 #[configurable(derived)]
46 #[serde(default)]
47 pub metrics: Vec<StaticMetricConfig>,
48}
49
50impl Default for StaticMetricsConfig {
51 fn default() -> Self {
52 Self {
53 interval_secs: default_interval(),
54 metrics: Vec::default(),
55 namespace: default_namespace(),
56 }
57 }
58}
59
60#[configurable_component]
62#[derive(Clone, Debug)]
63#[serde(deny_unknown_fields)]
64pub struct StaticMetricConfig {
65 pub name: String,
67
68 pub value: MetricValue,
70
71 pub kind: MetricKind,
73
74 #[configurable(metadata(
76 docs::additional_props_description = "An individual tag - value pair."
77 ))]
78 pub tags: BTreeMap<String, String>,
79}
80
/// Default emission interval: one second.
fn default_interval() -> Duration {
    Duration::from_secs(1)
}
84
/// Default namespace attached to emitted metrics: `static`.
fn default_namespace() -> String {
    String::from("static")
}
88
89impl_generate_config_from_default!(StaticMetricsConfig);
90
91#[async_trait::async_trait]
92#[typetag::serde(name = "static_metrics")]
93impl SourceConfig for StaticMetricsConfig {
94 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
95 if self.interval_secs.is_zero() {
96 warn!(
97 "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
98 );
99 }
100 let interval = self.interval_secs;
101
102 let namespace = self.namespace.clone();
103
104 let metrics = self.metrics.clone();
105
106 Ok(Box::pin(
107 StaticMetrics {
108 namespace,
109 metrics,
110 interval,
111 out: cx.out,
112 shutdown: cx.shutdown,
113 }
114 .run(),
115 ))
116 }
117
118 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
119 vec![SourceOutput::new_metrics()]
120 }
121
122 fn can_acknowledge(&self) -> bool {
123 false
124 }
125}
126
127struct StaticMetrics {
128 namespace: String,
129 metrics: Vec<StaticMetricConfig>,
130 interval: time::Duration,
131 out: SourceSender,
132 shutdown: ShutdownSignal,
133}
134
135impl StaticMetrics {
136 async fn run(mut self) -> Result<(), ()> {
137 let events_received = register!(EventsReceived);
138 let bytes_received = register!(BytesReceived::from(Protocol::STATIC));
139 let mut interval =
140 IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
141
142 let metrics: Vec<Metric> = self
144 .metrics
145 .into_iter()
146 .map(
147 |StaticMetricConfig {
148 name,
149 value,
150 kind,
151 tags,
152 }| {
153 Metric::from_parts(
154 MetricSeries {
155 name: MetricName {
156 name,
157 namespace: Some(self.namespace.clone()),
158 },
159 tags: Some(tags.into()),
160 },
161 MetricData {
162 time: MetricTime {
163 timestamp: None,
164 interval_ms: NonZeroU32::new(self.interval.as_millis() as u32),
165 },
166 kind,
167 value: value.clone(),
168 },
169 EventMetadata::default(),
170 )
171 },
172 )
173 .collect();
174
175 while interval.next().await.is_some() {
176 let count = metrics.len();
177 let byte_size = metrics.size_of();
178 let json_size = metrics.estimated_json_encoded_size_of();
179
180 bytes_received.emit(ByteSize(byte_size));
181 events_received.emit(CountByteSize(count, json_size));
182
183 let batch = metrics
184 .clone()
185 .into_iter()
186 .map(|metric| metric.with_timestamp(Some(Utc::now())));
187
188 if (self.out.send_batch(batch).await).is_err() {
189 emit!(StreamClosedError { count });
190 return Err(());
191 }
192 }
193
194 Ok(())
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        event::Event,
        test_util::{
            self,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
        },
    };

    /// How long each test lets the source run before collecting its events.
    const RUN_FOR: time::Duration = time::Duration::from_millis(100);

    #[test]
    fn generate_config() {
        test_util::test_generate_config::<StaticMetricsConfig>();
    }

    /// Runs the source built from `config` for `RUN_FOR`, asserting source
    /// compliance, and returns every event it produced.
    async fn events_from_config(config: StaticMetricsConfig) -> Vec<Event> {
        run_and_assert_source_compliance(config, RUN_FOR, &SOURCE_TAGS).await
    }

    /// A minimal metric: empty name, absolute gauge of `0.0`, no tags.
    fn default_metric() -> StaticMetricConfig {
        StaticMetricConfig {
            name: String::new(),
            value: MetricValue::Gauge { value: 0.0 },
            kind: MetricKind::Absolute,
            tags: BTreeMap::new(),
        }
    }

    /// Default source configuration carrying the given metrics.
    fn config_with_metrics(metrics: Vec<StaticMetricConfig>) -> StaticMetricsConfig {
        StaticMetricsConfig {
            metrics,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn default_empty() {
        let events = events_from_config(StaticMetricsConfig::default()).await;

        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn default_namespace() {
        let events = events_from_config(config_with_metrics(vec![default_metric()])).await;

        assert!(!events.is_empty());
        assert_eq!(events[0].as_metric().namespace(), Some("static"));
    }

    #[tokio::test]
    async fn default_namespace_multiple_events() {
        let events =
            events_from_config(config_with_metrics(vec![default_metric(), default_metric()]))
                .await;

        assert!(!events.is_empty());
        let first = &events[0];
        assert_eq!(first.as_metric().namespace(), Some("static"));
        let second = &events[1];
        assert_eq!(second.as_metric().namespace(), Some("static"));
    }

    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";

        let events = events_from_config(StaticMetricsConfig {
            namespace: namespace.to_owned(),
            ..config_with_metrics(vec![default_metric()])
        })
        .await;

        assert!(!events.is_empty());
        assert_eq!(events[0].as_metric().namespace(), Some(namespace));
    }

    #[tokio::test]
    async fn sets_custom_tags() {
        let tagged_metric = StaticMetricConfig {
            name: String::from("test"),
            value: MetricValue::Gauge { value: 2.3 },
            kind: MetricKind::Absolute,
            tags: BTreeMap::from([(
                String::from("custom_tag"),
                String::from("custom_tag_value"),
            )]),
        };

        let events = events_from_config(config_with_metrics(vec![tagged_metric])).await;

        assert!(!events.is_empty());
        let metric = events[0].as_metric();

        assert_eq!(metric.name(), "test");
        assert!(matches!(metric.value(), MetricValue::Gauge { value: 2.3 }));
        assert_eq!(
            metric.tag_value("custom_tag").as_deref(),
            Some("custom_tag_value")
        );
    }
}