vector/sources/apache_metrics/
mod.rs1use std::{future::ready, time::Duration};
2
3use chrono::Utc;
4use futures::{FutureExt, StreamExt, TryFutureExt, stream};
5use http::uri::Scheme;
6use hyper::{Body, Request};
7use serde_with::serde_as;
8use snafu::ResultExt;
9use tokio_stream::wrappers::IntervalStream;
10use vector_lib::{EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags};
11
12use crate::{
13 SourceSender,
14 config::{GenerateConfig, ProxyConfig, SourceConfig, SourceContext, SourceOutput},
15 event::metric::{Metric, MetricKind, MetricValue},
16 http::HttpClient,
17 internal_events::{
18 ApacheMetricsEventsReceived, ApacheMetricsParseError, EndpointBytesReceived,
19 HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError,
20 },
21 shutdown::ShutdownSignal,
22};
23
24mod parser;
25
26pub use parser::ParseError;
27use vector_lib::config::LogNamespace;
28
/// Configuration for the `apache_metrics` source.
#[serde_as]
#[configurable_component(source("apache_metrics", "Collect metrics from Apache's HTTPD server."))]
#[derive(Clone, Debug)]
pub struct ApacheMetricsConfig {
    /// The list of Apache status endpoints to scrape metrics from.
    ///
    /// Each entry must parse as a URI; building the source fails otherwise.
    #[configurable(metadata(docs::examples = "http://localhost:8080/server-status/?auto"))]
    endpoints: Vec<String>,

    /// The interval between scrapes, in seconds.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// The namespace of the metric.
    ///
    /// Disabled if empty.
    #[serde(default = "default_namespace")]
    namespace: String,
}
50
/// Returns the default delay between two scrapes of an endpoint: 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SECS)
}
54
/// Returns the default metric namespace, `"apache"`.
pub fn default_namespace() -> String {
    String::from("apache")
}
58
59impl GenerateConfig for ApacheMetricsConfig {
60 fn generate_config() -> toml::Value {
61 toml::Value::try_from(Self {
62 endpoints: vec!["http://localhost:8080/server-status/?auto".to_owned()],
63 scrape_interval_secs: default_scrape_interval_secs(),
64 namespace: default_namespace(),
65 })
66 .unwrap()
67 }
68}
69
70#[async_trait::async_trait]
71#[typetag::serde(name = "apache_metrics")]
72impl SourceConfig for ApacheMetricsConfig {
73 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
74 let urls = self
75 .endpoints
76 .iter()
77 .map(|endpoint| endpoint.parse::<http::Uri>())
78 .collect::<Result<Vec<_>, _>>()
79 .context(super::UriParseSnafu)?;
80
81 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
82
83 Ok(apache_metrics(
84 urls,
85 self.scrape_interval_secs,
86 namespace,
87 cx.shutdown,
88 cx.out,
89 cx.proxy,
90 ))
91 }
92
93 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
94 vec![SourceOutput::new_metrics()]
95 }
96
97 fn can_acknowledge(&self) -> bool {
98 false
99 }
100}
101
/// Helpers for rendering a URI without its userinfo component, so that credentials embedded in
/// an endpoint do not end up in metric tags or internal events.
trait UriExt {
    /// Returns the URI as a string built from its scheme, sanitized authority, path and query.
    fn to_sanitized_string(&self) -> String;

    /// Returns the authority as `host[:port]`, leaving out any userinfo (such as
    /// `user:password@`).
    fn sanitized_authority(&self) -> String;
}
107
108impl UriExt for http::Uri {
109 fn to_sanitized_string(&self) -> String {
110 let mut s = String::new();
111
112 if let Some(scheme) = self.scheme() {
113 s.push_str(scheme.as_str());
114 s.push_str("://");
115 }
116
117 s.push_str(&self.sanitized_authority());
118
119 s.push_str(self.path());
120
121 if let Some(query) = self.query() {
122 s.push_str(query);
123 }
124
125 s
126 }
127
128 fn sanitized_authority(&self) -> String {
129 let mut s = String::new();
130
131 if let Some(host) = self.host() {
132 s.push_str(host);
133 }
134
135 if let Some(port) = self.port() {
136 s.push(':');
137 s.push_str(port.as_str());
138 }
139
140 s
141 }
142}
143
144fn apache_metrics(
145 urls: Vec<http::Uri>,
146 interval: Duration,
147 namespace: Option<String>,
148 shutdown: ShutdownSignal,
149 mut out: SourceSender,
150 proxy: ProxyConfig,
151) -> super::Source {
152 Box::pin(async move {
153 let mut stream = IntervalStream::new(tokio::time::interval(interval))
154 .take_until(shutdown)
155 .map(move |_| stream::iter(urls.clone()))
156 .flatten()
157 .map(move |url| {
158 let client = HttpClient::new(None, &proxy).expect("HTTPS initialization failed");
159 let sanitized_url = url.to_sanitized_string();
160
161 let request = Request::get(&url)
162 .body(Body::empty())
163 .expect("error creating request");
164
165 let tags = metric_tags! {
166 "endpoint" => sanitized_url.to_string(),
167 "host" => url.sanitized_authority(),
168 };
169
170 let namespace = namespace.clone();
171 client
172 .send(request)
173 .map_err(crate::Error::from)
174 .and_then(|response| async {
175 let (header, body) = response.into_parts();
176 let body = hyper::body::to_bytes(body).await?;
177 Ok((header, body))
178 })
179 .into_stream()
180 .filter_map(move |response| {
181 ready(match response {
182 Ok((header, body)) if header.status == hyper::StatusCode::OK => {
183 let byte_size = body.len();
184 let body = String::from_utf8_lossy(&body);
185 emit!(EndpointBytesReceived {
186 byte_size,
187 protocol: url.scheme().unwrap_or(&Scheme::HTTP).as_str(),
188 endpoint: &sanitized_url,
189 });
190
191 let results = parser::parse(
192 &body,
193 namespace.as_deref(),
194 Utc::now(),
195 Some(&tags),
196 )
197 .chain(vec![Ok(Metric::new(
198 "up",
199 MetricKind::Absolute,
200 MetricValue::Gauge { value: 1.0 },
201 )
202 .with_namespace(namespace.clone())
203 .with_tags(Some(tags.clone()))
204 .with_timestamp(Some(Utc::now())))]);
205
206 let metrics = results
207 .filter_map(|res| match res {
208 Ok(metric) => Some(metric),
209 Err(e) => {
210 emit!(ApacheMetricsParseError {
211 error: e,
212 endpoint: &sanitized_url,
213 });
214 None
215 }
216 })
217 .collect::<Vec<_>>();
218
219 emit!(ApacheMetricsEventsReceived {
220 byte_size: metrics.estimated_json_encoded_size_of(),
221 count: metrics.len(),
222 endpoint: &sanitized_url,
223 });
224 Some(stream::iter(metrics))
225 }
226 Ok((header, _)) => {
227 emit!(HttpClientHttpResponseError {
228 code: header.status,
229 url: sanitized_url.to_owned(),
230 });
231 Some(stream::iter(vec![
232 Metric::new(
233 "up",
234 MetricKind::Absolute,
235 MetricValue::Gauge { value: 1.0 },
236 )
237 .with_namespace(namespace.clone())
238 .with_tags(Some(tags.clone()))
239 .with_timestamp(Some(Utc::now())),
240 ]))
241 }
242 Err(error) => {
243 emit!(HttpClientHttpError {
244 error,
245 url: sanitized_url.to_owned(),
246 });
247 Some(stream::iter(vec![
248 Metric::new(
249 "up",
250 MetricKind::Absolute,
251 MetricValue::Gauge { value: 0.0 },
252 )
253 .with_namespace(namespace.clone())
254 .with_tags(Some(tags.clone()))
255 .with_timestamp(Some(Utc::now())),
256 ]))
257 }
258 })
259 })
260 .flatten()
261 })
262 .flatten()
263 .boxed();
264
265 match out.send_event_stream(&mut stream).await {
266 Ok(()) => {
267 debug!("Finished sending.");
268 Ok(())
269 }
270 Err(_) => {
271 let (count, _) = stream.size_hint();
272 emit!(StreamClosedError { count });
273 Err(())
274 }
275 }
276 })
277}
278
279#[cfg(test)]
280mod test {
281 use hyper::{
282 Body, Response, Server,
283 service::{make_service_fn, service_fn},
284 };
285 use similar_asserts::assert_eq;
286 use tokio::time::{Duration, sleep};
287
288 use super::*;
289 use crate::{
290 Error,
291 config::SourceConfig,
292 test_util::{
293 collect_ready,
294 components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
295 next_addr, wait_for_tcp,
296 },
297 };
298
299 #[test]
300 fn generate_config() {
301 crate::test_util::test_generate_config::<ApacheMetricsConfig>();
302 }
303
304 #[tokio::test]
305 async fn test_apache_up() {
306 let in_addr = next_addr();
307
308 let make_svc = make_service_fn(|_| async {
309 Ok::<_, Error>(service_fn(|_| async {
310 Ok::<_, Error>(Response::new(Body::from(
311 r"
312localhost
313ServerVersion: Apache/2.4.46 (Unix)
314ServerMPM: event
315Server Built: Aug 5 2020 23:20:17
316CurrentTime: Friday, 21-Aug-2020 18:41:34 UTC
317RestartTime: Friday, 21-Aug-2020 18:41:08 UTC
318ParentServerConfigGeneration: 1
319ParentServerMPMGeneration: 0
320ServerUptimeSeconds: 26
321ServerUptime: 26 seconds
322Load1: 0.00
323Load5: 0.03
324Load15: 0.03
325Total Accesses: 30
326Total kBytes: 217
327Total Duration: 11
328CPUUser: .2
329CPUSystem: .02
330CPUChildrenUser: 0
331CPUChildrenSystem: 0
332CPULoad: .846154
333Uptime: 26
334ReqPerSec: 1.15385
335BytesPerSec: 8546.46
336BytesPerReq: 7406.93
337DurationPerReq: .366667
338BusyWorkers: 1
339IdleWorkers: 74
340Processes: 3
341Stopping: 0
342BusyWorkers: 1
343IdleWorkers: 74
344ConnsTotal: 1
345ConnsAsyncWriting: 0
346ConnsAsyncKeepAlive: 0
347ConnsAsyncClosing: 0
348Scoreboard: ____S_____I______R____I_______KK___D__C__G_L____________W__________________.....................................................................................................................................................................................................................................................................................................................................
349 ",
350 )))
351 }))
352 });
353
354 tokio::spawn(async move {
355 if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
356 error!(message = "Server error.", %error);
357 }
358 });
359 wait_for_tcp(in_addr).await;
360
361 let config = ApacheMetricsConfig {
362 endpoints: vec![format!("http://foo:bar@{}/metrics", in_addr)],
363 scrape_interval_secs: Duration::from_secs(1),
364 namespace: "custom".to_string(),
365 };
366
367 let events = run_and_assert_source_compliance(
368 config,
369 Duration::from_secs(1),
370 &HTTP_PULL_SOURCE_TAGS,
371 )
372 .await;
373 let metrics = events
374 .into_iter()
375 .map(|e| e.into_metric())
376 .collect::<Vec<_>>();
377
378 match metrics.iter().find(|m| m.name() == "up") {
379 Some(m) => {
380 assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 });
381
382 match m.tags() {
383 Some(tags) => {
384 assert_eq!(
385 tags.get("endpoint"),
386 Some(&format!("http://{in_addr}/metrics")[..])
387 );
388 assert_eq!(tags.get("host"), Some(&in_addr.to_string()[..]));
389 }
390 None => error!(message = "No tags for metric.", metric = ?m),
391 }
392 }
393 None => error!(message = "Could not find up metric in.", metrics = ?metrics),
394 }
395 }
396
397 #[tokio::test]
398 async fn test_apache_error() {
399 let in_addr = next_addr();
400
401 let make_svc = make_service_fn(|_| async {
402 Ok::<_, Error>(service_fn(|_| async {
403 Ok::<_, Error>(
404 Response::builder()
405 .status(404)
406 .body(Body::from("not found"))
407 .unwrap(),
408 )
409 }))
410 });
411
412 tokio::spawn(async move {
413 if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
414 error!(message = "Server error.", %error);
415 }
416 });
417 wait_for_tcp(in_addr).await;
418
419 let (tx, rx) = SourceSender::new_test();
420
421 let source = ApacheMetricsConfig {
422 endpoints: vec![format!("http://{}", in_addr)],
423 scrape_interval_secs: Duration::from_secs(1),
424 namespace: "apache".to_string(),
425 }
426 .build(SourceContext::new_test(tx, None))
427 .await
428 .unwrap();
429 tokio::spawn(source);
430
431 sleep(Duration::from_secs(1)).await;
432
433 let metrics = collect_ready(rx)
434 .await
435 .into_iter()
436 .map(|e| e.into_metric())
437 .collect::<Vec<_>>();
438
439 match metrics.iter().find(|m| m.name() == "up") {
443 Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 }),
444 None => error!(message = "Could not find up metric in.", metrics = ?metrics),
445 }
446 }
447
448 #[tokio::test]
449 async fn test_apache_down() {
450 let in_addr = next_addr();
452
453 let (tx, rx) = SourceSender::new_test();
454
455 let source = ApacheMetricsConfig {
456 endpoints: vec![format!("http://{}", in_addr)],
457 scrape_interval_secs: Duration::from_secs(1),
458 namespace: "custom".to_string(),
459 }
460 .build(SourceContext::new_test(tx, None))
461 .await
462 .unwrap();
463 tokio::spawn(source);
464
465 sleep(Duration::from_secs(1)).await;
466
467 let metrics = collect_ready(rx)
468 .await
469 .into_iter()
470 .map(|e| e.into_metric())
471 .collect::<Vec<_>>();
472
473 match metrics.iter().find(|m| m.name() == "up") {
474 Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 0.0 }),
475 None => error!(message = "Could not find up metric in.", metrics = ?metrics),
476 }
477 }
478}