1use std::collections::BTreeMap;
2use std::num::NonZeroU32;
3use std::time::Duration;
4
5use chrono::Utc;
6use futures::StreamExt;
7use serde_with::serde_as;
8use tokio::time;
9use tokio_stream::wrappers::IntervalStream;
10use vector_lib::configurable::configurable_component;
11use vector_lib::internal_event::{
12 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
13};
14use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};
15
16use crate::{
17 config::{SourceConfig, SourceContext, SourceOutput},
18 event::{
19 metric::{MetricData, MetricName, MetricSeries, MetricTime, MetricValue},
20 EventMetadata, Metric, MetricKind,
21 },
22 internal_events::{EventsReceived, StreamClosedError},
23 shutdown::ShutdownSignal,
24 SourceSender,
25};
26
/// Configuration for the `static_metrics` source.
#[serde_as]
#[configurable_component(source(
    "static_metrics",
    "Produce static metrics defined in configuration."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StaticMetricsConfig {
    /// The interval between metric emissions, in seconds (fractions are accepted).
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::human_name = "Emitting interval"))]
    pub interval_secs: Duration,

    /// The namespace attached to every metric emitted by this source.
    #[serde(default = "default_namespace")]
    pub namespace: String,

    // The metrics to emit on every interval; empty when omitted from the configuration.
    #[configurable(derived)]
    #[serde(default)]
    pub metrics: Vec<StaticMetricConfig>,
}
50
51impl Default for StaticMetricsConfig {
52 fn default() -> Self {
53 Self {
54 interval_secs: default_interval(),
55 metrics: Vec::default(),
56 namespace: default_namespace(),
57 }
58 }
59}
60
/// Definition of a single static metric emitted by the `static_metrics` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StaticMetricConfig {
    /// Name of the metric.
    pub name: String,

    /// Value of the metric, emitted unchanged on every interval.
    pub value: MetricValue,

    /// Kind of the metric.
    pub kind: MetricKind,

    /// Tags attached to the metric.
    #[configurable(metadata(
        docs::additional_props_description = "An individual tag - value pair."
    ))]
    pub tags: BTreeMap<String, String>,
}
81
/// Default emission interval: one second.
fn default_interval() -> Duration {
    Duration::from_secs(1)
}
85
/// Default metric namespace: `static`.
fn default_namespace() -> String {
    String::from("static")
}
89
// NOTE(review): presumably derives the example-config generation for this source from its
// `Default` impl (inferred from the macro name) — confirm against the macro definition.
impl_generate_config_from_default!(StaticMetricsConfig);
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "static_metrics")]
94impl SourceConfig for StaticMetricsConfig {
95 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
96 if self.interval_secs.is_zero() {
97 warn!(
98 "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
99 );
100 }
101 let interval = self.interval_secs;
102
103 let namespace = self.namespace.clone();
104
105 let metrics = self.metrics.clone();
106
107 Ok(Box::pin(
108 StaticMetrics {
109 namespace,
110 metrics,
111 interval,
112 out: cx.out,
113 shutdown: cx.shutdown,
114 }
115 .run(),
116 ))
117 }
118
119 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
120 vec![SourceOutput::new_metrics()]
121 }
122
123 fn can_acknowledge(&self) -> bool {
124 false
125 }
126}
127
/// Runtime state of the `static_metrics` source, assembled by `StaticMetricsConfig::build`.
struct StaticMetrics {
    /// Namespace stamped on every emitted metric.
    namespace: String,
    /// Metric definitions, converted into metric events when the task starts.
    metrics: Vec<StaticMetricConfig>,
    /// Period between emissions; also reported as each metric's `interval_ms`.
    interval: time::Duration,
    /// Sender each batch of metrics is pushed into.
    out: SourceSender,
    /// Signal that ends the emit loop.
    shutdown: ShutdownSignal,
}
135
136impl StaticMetrics {
137 async fn run(mut self) -> Result<(), ()> {
138 let events_received = register!(EventsReceived);
139 let bytes_received = register!(BytesReceived::from(Protocol::STATIC));
140 let mut interval =
141 IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
142
143 let metrics: Vec<Metric> = self
145 .metrics
146 .into_iter()
147 .map(
148 |StaticMetricConfig {
149 name,
150 value,
151 kind,
152 tags,
153 }| {
154 Metric::from_parts(
155 MetricSeries {
156 name: MetricName {
157 name,
158 namespace: Some(self.namespace.clone()),
159 },
160 tags: Some(tags.into()),
161 },
162 MetricData {
163 time: MetricTime {
164 timestamp: None,
165 interval_ms: NonZeroU32::new(self.interval.as_millis() as u32),
166 },
167 kind,
168 value: value.clone(),
169 },
170 EventMetadata::default(),
171 )
172 },
173 )
174 .collect();
175
176 while interval.next().await.is_some() {
177 let count = metrics.len();
178 let byte_size = metrics.size_of();
179 let json_size = metrics.estimated_json_encoded_size_of();
180
181 bytes_received.emit(ByteSize(byte_size));
182 events_received.emit(CountByteSize(count, json_size));
183
184 let batch = metrics
185 .clone()
186 .into_iter()
187 .map(|metric| metric.with_timestamp(Some(Utc::now())));
188
189 if (self.out.send_batch(batch).await).is_err() {
190 emit!(StreamClosedError { count });
191 return Err(());
192 }
193 }
194
195 Ok(())
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        event::Event,
        test_util::{
            self,
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
        },
    };

    /// Runs the shared config-generation check against this source's config type.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<StaticMetricsConfig>();
    }

    /// Runs the source for 100 ms under the source compliance harness and returns
    /// the events it produced.
    async fn events_from_config(config: StaticMetricsConfig) -> Vec<Event> {
        let run_time = time::Duration::from_millis(100);
        run_and_assert_source_compliance(config, run_time, &SOURCE_TAGS).await
    }

    /// Minimal metric definition: empty name, absolute gauge of zero, no tags.
    fn default_metric() -> StaticMetricConfig {
        StaticMetricConfig {
            name: String::new(),
            value: MetricValue::Gauge { value: 0.0 },
            kind: MetricKind::Absolute,
            tags: BTreeMap::new(),
        }
    }

    /// With no metrics configured, nothing is emitted.
    #[tokio::test]
    async fn default_empty() {
        let config = StaticMetricsConfig::default();
        let events = events_from_config(config).await;

        assert!(events.is_empty());
    }

    /// Without an explicit namespace, metrics carry the `static` namespace.
    #[tokio::test]
    async fn default_namespace() {
        let config = StaticMetricsConfig {
            metrics: vec![default_metric()],
            ..Default::default()
        };
        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        assert_eq!(events[0].as_metric().namespace(), Some("static"));
    }

    /// The default namespace applies to every metric when several are configured.
    #[tokio::test]
    async fn default_namespace_multiple_events() {
        let config = StaticMetricsConfig {
            metrics: vec![default_metric(), default_metric()],
            ..Default::default()
        };
        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        // Check the first two events, one per configured metric.
        assert_eq!(events[0].as_metric().namespace(), Some("static"));
        assert_eq!(events[1].as_metric().namespace(), Some("static"));
    }

    /// A configured namespace replaces the default one.
    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";
        let config = StaticMetricsConfig {
            namespace: namespace.to_owned(),
            metrics: vec![default_metric()],
            ..StaticMetricsConfig::default()
        };

        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        assert_eq!(events[0].as_metric().namespace(), Some(namespace));
    }

    /// Name, value and tags from the configuration show up on the emitted metric.
    #[tokio::test]
    async fn sets_custom_tags() {
        let custom_metric = StaticMetricConfig {
            name: "test".to_owned(),
            value: MetricValue::Gauge { value: 2.3 },
            kind: MetricKind::Absolute,
            tags: BTreeMap::from([("custom_tag".to_owned(), "custom_tag_value".to_owned())]),
        };
        let config = StaticMetricsConfig {
            metrics: vec![custom_metric],
            ..Default::default()
        };

        let events = events_from_config(config).await;

        assert!(!events.is_empty());
        let metric = events[0].as_metric();

        assert_eq!(metric.name(), "test");
        assert!(matches!(metric.value(), MetricValue::Gauge { value: 2.3 }));
        assert_eq!(
            metric.tag_value("custom_tag"),
            Some("custom_tag_value".to_owned())
        );
    }
}