vector/sources/apache_metrics/
mod.rs

1use std::{future::ready, time::Duration};
2
3use chrono::Utc;
4use futures::{FutureExt, StreamExt, TryFutureExt, stream};
5use http::uri::Scheme;
6use hyper::{Body, Request};
7use serde_with::serde_as;
8use snafu::ResultExt;
9use tokio_stream::wrappers::IntervalStream;
10use vector_lib::{EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags};
11
12use crate::{
13    SourceSender,
14    config::{GenerateConfig, ProxyConfig, SourceConfig, SourceContext, SourceOutput},
15    event::metric::{Metric, MetricKind, MetricValue},
16    http::HttpClient,
17    internal_events::{
18        ApacheMetricsEventsReceived, ApacheMetricsParseError, EndpointBytesReceived,
19        HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError,
20    },
21    shutdown::ShutdownSignal,
22};
23
24mod parser;
25
26pub use parser::ParseError;
27use vector_lib::config::LogNamespace;
28
/// Configuration for the `apache_metrics` source.
#[serde_as]
#[configurable_component(source("apache_metrics", "Collect metrics from Apache's HTTPD server."))]
#[derive(Clone, Debug)]
pub struct ApacheMetricsConfig {
    /// The list of `mod_status` endpoints to scrape metrics from.
    // Each entry is parsed into an `http::Uri` when the source is built; a
    // value that fails to parse makes `build` return an error.
    #[configurable(metadata(docs::examples = "http://localhost:8080/server-status/?auto"))]
    endpoints: Vec<String>,

    /// The interval between scrapes.
    // (De)serialized as a whole number of seconds via `serde_with`; falls back
    // to `default_scrape_interval_secs()` when omitted.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// The namespace of the metric.
    ///
    /// Disabled if empty.
    // Falls back to `default_namespace()` when omitted; an empty string is
    // turned into `None` by `build`, so metrics are emitted without a namespace.
    #[serde(default = "default_namespace")]
    namespace: String,
}
50
/// Returns the scrape interval used when the configuration does not set one:
/// 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SECS)
}
54
/// Returns the metric namespace used when the configuration does not set one:
/// `"apache"`.
pub fn default_namespace() -> String {
    String::from("apache")
}
58
59impl GenerateConfig for ApacheMetricsConfig {
60    fn generate_config() -> toml::Value {
61        toml::Value::try_from(Self {
62            endpoints: vec!["http://localhost:8080/server-status/?auto".to_owned()],
63            scrape_interval_secs: default_scrape_interval_secs(),
64            namespace: default_namespace(),
65        })
66        .unwrap()
67    }
68}
69
70#[async_trait::async_trait]
71#[typetag::serde(name = "apache_metrics")]
72impl SourceConfig for ApacheMetricsConfig {
73    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
74        let urls = self
75            .endpoints
76            .iter()
77            .map(|endpoint| endpoint.parse::<http::Uri>())
78            .collect::<Result<Vec<_>, _>>()
79            .context(super::UriParseSnafu)?;
80
81        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
82
83        Ok(apache_metrics(
84            urls,
85            self.scrape_interval_secs,
86            namespace,
87            cx.shutdown,
88            cx.out,
89            cx.proxy,
90        ))
91    }
92
93    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
94        vec![SourceOutput::new_metrics()]
95    }
96
97    fn can_acknowledge(&self) -> bool {
98        false
99    }
100}
101
/// Helpers for rendering a URI without the user-info part of its authority, so
/// that the result can be used in metric tags and internal events.
trait UriExt {
    /// Renders the URI as a string, using the sanitized authority in place of
    /// the original one.
    fn to_sanitized_string(&self) -> String;

    /// Renders the authority as `host[:port]`, leaving out any user-info.
    fn sanitized_authority(&self) -> String;
}
107
108impl UriExt for http::Uri {
109    fn to_sanitized_string(&self) -> String {
110        let mut s = String::new();
111
112        if let Some(scheme) = self.scheme() {
113            s.push_str(scheme.as_str());
114            s.push_str("://");
115        }
116
117        s.push_str(&self.sanitized_authority());
118
119        s.push_str(self.path());
120
121        if let Some(query) = self.query() {
122            s.push_str(query);
123        }
124
125        s
126    }
127
128    fn sanitized_authority(&self) -> String {
129        let mut s = String::new();
130
131        if let Some(host) = self.host() {
132            s.push_str(host);
133        }
134
135        if let Some(port) = self.port() {
136            s.push(':');
137            s.push_str(port.as_str());
138        }
139
140        s
141    }
142}
143
144fn apache_metrics(
145    urls: Vec<http::Uri>,
146    interval: Duration,
147    namespace: Option<String>,
148    shutdown: ShutdownSignal,
149    mut out: SourceSender,
150    proxy: ProxyConfig,
151) -> super::Source {
152    Box::pin(async move {
153        let mut stream = IntervalStream::new(tokio::time::interval(interval))
154            .take_until(shutdown)
155            .map(move |_| stream::iter(urls.clone()))
156            .flatten()
157            .map(move |url| {
158                let client = HttpClient::new(None, &proxy).expect("HTTPS initialization failed");
159                let sanitized_url = url.to_sanitized_string();
160
161                let request = Request::get(&url)
162                    .body(Body::empty())
163                    .expect("error creating request");
164
165                let tags = metric_tags! {
166                    "endpoint" => sanitized_url.to_string(),
167                    "host" => url.sanitized_authority(),
168                };
169
170                let namespace = namespace.clone();
171                client
172                    .send(request)
173                    .map_err(crate::Error::from)
174                    .and_then(|response| async {
175                        let (header, body) = response.into_parts();
176                        let body = hyper::body::to_bytes(body).await?;
177                        Ok((header, body))
178                    })
179                    .into_stream()
180                    .filter_map(move |response| {
181                        ready(match response {
182                            Ok((header, body)) if header.status == hyper::StatusCode::OK => {
183                                let byte_size = body.len();
184                                let body = String::from_utf8_lossy(&body);
185                                emit!(EndpointBytesReceived {
186                                    byte_size,
187                                    protocol: url.scheme().unwrap_or(&Scheme::HTTP).as_str(),
188                                    endpoint: &sanitized_url,
189                                });
190
191                                let results = parser::parse(
192                                    &body,
193                                    namespace.as_deref(),
194                                    Utc::now(),
195                                    Some(&tags),
196                                )
197                                .chain(vec![Ok(Metric::new(
198                                    "up",
199                                    MetricKind::Absolute,
200                                    MetricValue::Gauge { value: 1.0 },
201                                )
202                                .with_namespace(namespace.clone())
203                                .with_tags(Some(tags.clone()))
204                                .with_timestamp(Some(Utc::now())))]);
205
206                                let metrics = results
207                                    .filter_map(|res| match res {
208                                        Ok(metric) => Some(metric),
209                                        Err(e) => {
210                                            emit!(ApacheMetricsParseError {
211                                                error: e,
212                                                endpoint: &sanitized_url,
213                                            });
214                                            None
215                                        }
216                                    })
217                                    .collect::<Vec<_>>();
218
219                                emit!(ApacheMetricsEventsReceived {
220                                    byte_size: metrics.estimated_json_encoded_size_of(),
221                                    count: metrics.len(),
222                                    endpoint: &sanitized_url,
223                                });
224                                Some(stream::iter(metrics))
225                            }
226                            Ok((header, _)) => {
227                                emit!(HttpClientHttpResponseError {
228                                    code: header.status,
229                                    url: sanitized_url.to_owned(),
230                                });
231                                Some(stream::iter(vec![
232                                    Metric::new(
233                                        "up",
234                                        MetricKind::Absolute,
235                                        MetricValue::Gauge { value: 1.0 },
236                                    )
237                                    .with_namespace(namespace.clone())
238                                    .with_tags(Some(tags.clone()))
239                                    .with_timestamp(Some(Utc::now())),
240                                ]))
241                            }
242                            Err(error) => {
243                                emit!(HttpClientHttpError {
244                                    error,
245                                    url: sanitized_url.to_owned(),
246                                });
247                                Some(stream::iter(vec![
248                                    Metric::new(
249                                        "up",
250                                        MetricKind::Absolute,
251                                        MetricValue::Gauge { value: 0.0 },
252                                    )
253                                    .with_namespace(namespace.clone())
254                                    .with_tags(Some(tags.clone()))
255                                    .with_timestamp(Some(Utc::now())),
256                                ]))
257                            }
258                        })
259                    })
260                    .flatten()
261            })
262            .flatten()
263            .boxed();
264
265        match out.send_event_stream(&mut stream).await {
266            Ok(()) => {
267                debug!("Finished sending.");
268                Ok(())
269            }
270            Err(_) => {
271                let (count, _) = stream.size_hint();
272                emit!(StreamClosedError { count });
273                Err(())
274            }
275        }
276    })
277}
278
279#[cfg(test)]
280mod test {
281    use hyper::{
282        Body, Response, Server,
283        service::{make_service_fn, service_fn},
284    };
285    use similar_asserts::assert_eq;
286    use tokio::time::{Duration, sleep};
287
288    use super::*;
289    use crate::{
290        Error,
291        config::SourceConfig,
292        test_util::{
293            collect_ready,
294            components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
295            next_addr, wait_for_tcp,
296        },
297    };
298
299    #[test]
300    fn generate_config() {
301        crate::test_util::test_generate_config::<ApacheMetricsConfig>();
302    }
303
304    #[tokio::test]
305    async fn test_apache_up() {
306        let in_addr = next_addr();
307
308        let make_svc = make_service_fn(|_| async {
309            Ok::<_, Error>(service_fn(|_| async {
310                Ok::<_, Error>(Response::new(Body::from(
311                    r"
312localhost
313ServerVersion: Apache/2.4.46 (Unix)
314ServerMPM: event
315Server Built: Aug  5 2020 23:20:17
316CurrentTime: Friday, 21-Aug-2020 18:41:34 UTC
317RestartTime: Friday, 21-Aug-2020 18:41:08 UTC
318ParentServerConfigGeneration: 1
319ParentServerMPMGeneration: 0
320ServerUptimeSeconds: 26
321ServerUptime: 26 seconds
322Load1: 0.00
323Load5: 0.03
324Load15: 0.03
325Total Accesses: 30
326Total kBytes: 217
327Total Duration: 11
328CPUUser: .2
329CPUSystem: .02
330CPUChildrenUser: 0
331CPUChildrenSystem: 0
332CPULoad: .846154
333Uptime: 26
334ReqPerSec: 1.15385
335BytesPerSec: 8546.46
336BytesPerReq: 7406.93
337DurationPerReq: .366667
338BusyWorkers: 1
339IdleWorkers: 74
340Processes: 3
341Stopping: 0
342BusyWorkers: 1
343IdleWorkers: 74
344ConnsTotal: 1
345ConnsAsyncWriting: 0
346ConnsAsyncKeepAlive: 0
347ConnsAsyncClosing: 0
348Scoreboard: ____S_____I______R____I_______KK___D__C__G_L____________W__________________.....................................................................................................................................................................................................................................................................................................................................
349                    ",
350                )))
351            }))
352        });
353
354        tokio::spawn(async move {
355            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
356                error!(message = "Server error.", %error);
357            }
358        });
359        wait_for_tcp(in_addr).await;
360
361        let config = ApacheMetricsConfig {
362            endpoints: vec![format!("http://foo:bar@{}/metrics", in_addr)],
363            scrape_interval_secs: Duration::from_secs(1),
364            namespace: "custom".to_string(),
365        };
366
367        let events = run_and_assert_source_compliance(
368            config,
369            Duration::from_secs(1),
370            &HTTP_PULL_SOURCE_TAGS,
371        )
372        .await;
373        let metrics = events
374            .into_iter()
375            .map(|e| e.into_metric())
376            .collect::<Vec<_>>();
377
378        match metrics.iter().find(|m| m.name() == "up") {
379            Some(m) => {
380                assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 });
381
382                match m.tags() {
383                    Some(tags) => {
384                        assert_eq!(
385                            tags.get("endpoint"),
386                            Some(&format!("http://{in_addr}/metrics")[..])
387                        );
388                        assert_eq!(tags.get("host"), Some(&in_addr.to_string()[..]));
389                    }
390                    None => error!(message = "No tags for metric.", metric = ?m),
391                }
392            }
393            None => error!(message = "Could not find up metric in.", metrics = ?metrics),
394        }
395    }
396
397    #[tokio::test]
398    async fn test_apache_error() {
399        let in_addr = next_addr();
400
401        let make_svc = make_service_fn(|_| async {
402            Ok::<_, Error>(service_fn(|_| async {
403                Ok::<_, Error>(
404                    Response::builder()
405                        .status(404)
406                        .body(Body::from("not found"))
407                        .unwrap(),
408                )
409            }))
410        });
411
412        tokio::spawn(async move {
413            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
414                error!(message = "Server error.", %error);
415            }
416        });
417        wait_for_tcp(in_addr).await;
418
419        let (tx, rx) = SourceSender::new_test();
420
421        let source = ApacheMetricsConfig {
422            endpoints: vec![format!("http://{}", in_addr)],
423            scrape_interval_secs: Duration::from_secs(1),
424            namespace: "apache".to_string(),
425        }
426        .build(SourceContext::new_test(tx, None))
427        .await
428        .unwrap();
429        tokio::spawn(source);
430
431        sleep(Duration::from_secs(1)).await;
432
433        let metrics = collect_ready(rx)
434            .await
435            .into_iter()
436            .map(|e| e.into_metric())
437            .collect::<Vec<_>>();
438
439        // we still publish `up=1` for bad status codes following the pattern of the Prometheus exporter:
440        //
441        // https://github.com/Lusitaniae/apache_exporter/blob/712a6796fb84f741ef3cd562dc11418f2ee8b741/apache_exporter.go#L200
442        match metrics.iter().find(|m| m.name() == "up") {
443            Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 }),
444            None => error!(message = "Could not find up metric in.", metrics = ?metrics),
445        }
446    }
447
448    #[tokio::test]
449    async fn test_apache_down() {
450        // will have nothing bound
451        let in_addr = next_addr();
452
453        let (tx, rx) = SourceSender::new_test();
454
455        let source = ApacheMetricsConfig {
456            endpoints: vec![format!("http://{}", in_addr)],
457            scrape_interval_secs: Duration::from_secs(1),
458            namespace: "custom".to_string(),
459        }
460        .build(SourceContext::new_test(tx, None))
461        .await
462        .unwrap();
463        tokio::spawn(source);
464
465        sleep(Duration::from_secs(1)).await;
466
467        let metrics = collect_ready(rx)
468            .await
469            .into_iter()
470            .map(|e| e.into_metric())
471            .collect::<Vec<_>>();
472
473        match metrics.iter().find(|m| m.name() == "up") {
474            Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 0.0 }),
475            None => error!(message = "Could not find up metric in.", metrics = ?metrics),
476        }
477    }
478}