1#![allow(clippy::print_stderr)] #![allow(clippy::print_stdout)] use std::collections::HashSet;
5
6pub async fn load(url: &str) -> Result<String, Box<dyn std::error::Error>> {
10 let response = reqwest::get(url).await?.error_for_status()?;
11 let body = response.text().await?;
12 Ok(body)
13}
14
15fn metrics_regex() -> regex::Regex {
16 regex::RegexBuilder::new(
17 r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?P<labels>\{[^}]*\})? (?P<value>\S+?)( (?P<timestamp>\S+?))?$",
18 )
19 .multi_line(true)
20 .build()
21 .expect("invalid regex")
22}
23
24pub fn extract_component_sent_events_total_sum(
27 metrics: &str,
28) -> Result<u64, Box<dyn std::error::Error>> {
29 metrics_regex()
30 .captures_iter(metrics)
31 .filter_map(|captures| {
32 let metric_name = &captures["name"];
33 let value = &captures["value"];
34 if !metric_name.contains("component_sent_events_total") {
35 return None;
36 }
37 Some(value.to_owned())
38 })
39 .try_fold::<u64, _, Result<u64, Box<dyn std::error::Error>>>(0u64, |acc, value| {
40 let value = value.parse::<u64>()?;
41 let next_acc = acc.checked_add(value).ok_or("u64 overflow")?;
42 Ok(next_acc)
43 })
44}
45
46pub fn extract_vector_started(metrics: &str) -> bool {
48 metrics_regex().captures_iter(metrics).any(|captures| {
49 let metric_name = &captures["name"];
50 let value = &captures["value"];
51 metric_name.contains("vector_started") && value == "1"
52 })
53}
54
55pub async fn get_component_sent_events_total(url: &str) -> Result<u64, Box<dyn std::error::Error>> {
58 let metrics = load(url).await?;
59 extract_component_sent_events_total_sum(&metrics)
60}
61
62pub async fn assert_vector_started(url: &str) -> Result<(), Box<dyn std::error::Error>> {
65 let metrics = load(url).await?;
66 if !extract_vector_started(&metrics) {
67 return Err(format!("`vector_started`-ish metric was not found:\n{metrics}").into());
68 }
69 Ok(())
70}
71
72pub async fn wait_for_vector_started(
76 url: &str,
77 next_attempt_delay: std::time::Duration,
78 deadline: std::time::Instant,
79) -> Result<(), Box<dyn std::error::Error>> {
80 loop {
81 let err = match assert_vector_started(url).await {
82 Ok(()) => break,
83 Err(err) => err,
84 };
85 if std::time::Instant::now() >= deadline {
86 return Err(err);
87 }
88
89 eprintln!(
90 "Waiting for `vector_started`-ish metric to be available, next poll in {} sec, deadline in {} sec",
91 next_attempt_delay.as_secs_f64(),
92 deadline
93 .saturating_duration_since(std::time::Instant::now())
94 .as_secs_f64(),
95 );
96 tokio::time::sleep(next_attempt_delay).await;
97 }
98 Ok(())
99}
100
/// A list of `host_*` metric names.
///
/// The slice has the same `&[&str]` type as the `metrics_list` parameter of
/// `assert_metrics_present`; presumably it is meant to be passed there to
/// check that host metrics are being exported (confirm against callers).
pub const HOST_METRICS: &[&str] = &[
    "host_load1",
    "host_load5",
    "host_cpu_seconds_total",
    "host_filesystem_total_bytes",
];
107
/// A list of `vector_component_*` received/sent event and byte counter names.
///
/// The slice has the same `&[&str]` type as the `metrics_list` parameter of
/// `assert_metrics_present`; presumably these are the metrics a compliant
/// source component is expected to expose (confirm against callers).
pub const SOURCE_COMPLIANCE_METRICS: &[&str] = &[
    "vector_component_received_events_total",
    "vector_component_received_event_bytes_total",
    "vector_component_sent_events_total",
    "vector_component_sent_event_bytes_total",
];
114
115pub async fn assert_metrics_present(
118 url: &str,
119 metrics_list: &[&str],
120) -> Result<(), Box<dyn std::error::Error>> {
121 let metrics = load(url).await?;
122 let mut required_metrics: HashSet<_> = HashSet::from_iter(metrics_list.iter().cloned());
123 for captures in metrics_regex().captures_iter(&metrics) {
124 let metric_name = &captures["name"];
125 required_metrics.remove(metric_name);
126 }
127 if !required_metrics.is_empty() {
128 return Err(format!("Some host metrics were not found:\n{required_metrics:?}").into());
129 }
130 Ok(())
131}
132
// Unit tests for the text-extraction helpers; none of them touch the network.
#[cfg(test)]
mod tests {
    use super::*;

    // Each case is a list of metric lines (joined with `\n` before use) paired
    // with the sum expected from `extract_component_sent_events_total_sum`.
    #[test]
    fn test_extract_component_sent_events_total_sum() {
        let cases = vec![
            (vec![r#""#], 0),
            (vec![r#"component_sent_events_total 123"#], 123),
            (vec![r#"component_sent_events_total{} 123"#], 123),
            (
                vec![r#"component_sent_events_total{method="POST"} 456"#],
                456,
            ),
            (vec![r#"component_sent_events_total{a="b",c="d"} 456"#], 456),
            (
                vec![
                    r#"component_sent_events_total 123"#,
                    r#"component_sent_events_total{method="POST"} 456"#,
                ],
                123 + 456,
            ),
            (vec![r#"other{} 789"#], 0),
            (
                vec![
                    r#"component_sent_events_total{} 123"#,
                    r#"component_sent_events_total{method="POST"} 456"#,
                    r#"other{} 789"#,
                ],
                123 + 456,
            ),
            (
                // A prefixed metric name is counted too (matching is by substring).
                vec![
                    r#"component_sent_events_total 1"#,
                    r#"vector_component_sent_events_total 3"#,
                ],
                1 + 3,
            ),
            (
                // Samples that carry a trailing timestamp.
                vec![
                    r#"component_sent_events_total 1 1607985729161"#,
                    r#"vector_component_sent_events_total 3 1607985729161"#,
                ],
                1 + 3,
            ),
        ];

        for (input, expected_value) in cases {
            let input = input.join("\n");
            let actual_value = extract_component_sent_events_total_sum(&input).unwrap();
            assert_eq!(expected_value, actual_value);
        }
    }

    // Each case is a list of metric lines (joined with `\n` before use) paired
    // with the boolean expected from `extract_vector_started`.
    #[test]
    fn test_extract_vector_started() {
        let cases = vec![
            (vec![r#"vector_started 1"#], true),
            (vec![r#"vector_started_total 1"#], true),
            (vec![r#"vector_vector_started_total 1"#], true),
            (vec![r#""#], false),
            (vec![r#"other{} 1"#], false),
            (
                // `# HELP` / `# TYPE` comment lines precede the sample.
                vec![
                    r#"# HELP vector_started_total vector_started_total"#,
                    r#"# TYPE vector_started_total counter"#,
                    r#"vector_started_total 1"#,
                ],
                true,
            ),
            (
                // Same layout, with a timestamp on the sample line.
                vec![
                    r#"# HELP vector_started_total started_total"#,
                    r#"# TYPE vector_started_total counter"#,
                    r#"vector_started_total 1 1607985729161"#,
                ],
                true,
            ),
        ];

        for (input, expected_value) in cases {
            let input = input.join("\n");
            let actual_value = extract_vector_started(&input);
            assert_eq!(expected_value, actual_value, "input: {input}");
        }
    }
}