vector/sources/apache_metrics/
mod.rs

1use std::{future::ready, time::Duration};
2
3use chrono::Utc;
4use futures::{stream, FutureExt, StreamExt, TryFutureExt};
5use http::uri::Scheme;
6use hyper::{Body, Request};
7use serde_with::serde_as;
8use snafu::ResultExt;
9use tokio_stream::wrappers::IntervalStream;
10use vector_lib::configurable::configurable_component;
11use vector_lib::{metric_tags, EstimatedJsonEncodedSizeOf};
12
13use crate::{
14    config::{GenerateConfig, ProxyConfig, SourceConfig, SourceContext, SourceOutput},
15    event::metric::{Metric, MetricKind, MetricValue},
16    http::HttpClient,
17    internal_events::{
18        ApacheMetricsEventsReceived, ApacheMetricsParseError, EndpointBytesReceived,
19        HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError,
20    },
21    shutdown::ShutdownSignal,
22    SourceSender,
23};
24
25mod parser;
26
27pub use parser::ParseError;
28use vector_lib::config::LogNamespace;
29
30/// Configuration for the `apache_metrics` source.
31#[serde_as]
32#[configurable_component(source("apache_metrics", "Collect metrics from Apache's HTTPD server."))]
33#[derive(Clone, Debug)]
34pub struct ApacheMetricsConfig {
35    /// The list of `mod_status` endpoints to scrape metrics from.
36    #[configurable(metadata(docs::examples = "http://localhost:8080/server-status/?auto"))]
37    endpoints: Vec<String>,
38
39    /// The interval between scrapes.
40    #[serde(default = "default_scrape_interval_secs")]
41    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
42    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
43    scrape_interval_secs: Duration,
44
45    /// The namespace of the metric.
46    ///
47    /// Disabled if empty.
48    #[serde(default = "default_namespace")]
49    namespace: String,
50}
51
/// Returns the scrape interval used when the configuration does not set one: 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;

    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
55
/// Returns the metric namespace used when the configuration does not set one: `"apache"`.
pub fn default_namespace() -> String {
    String::from("apache")
}
59
60impl GenerateConfig for ApacheMetricsConfig {
61    fn generate_config() -> toml::Value {
62        toml::Value::try_from(Self {
63            endpoints: vec!["http://localhost:8080/server-status/?auto".to_owned()],
64            scrape_interval_secs: default_scrape_interval_secs(),
65            namespace: default_namespace(),
66        })
67        .unwrap()
68    }
69}
70
71#[async_trait::async_trait]
72#[typetag::serde(name = "apache_metrics")]
73impl SourceConfig for ApacheMetricsConfig {
74    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
75        let urls = self
76            .endpoints
77            .iter()
78            .map(|endpoint| endpoint.parse::<http::Uri>())
79            .collect::<Result<Vec<_>, _>>()
80            .context(super::UriParseSnafu)?;
81
82        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
83
84        Ok(apache_metrics(
85            urls,
86            self.scrape_interval_secs,
87            namespace,
88            cx.shutdown,
89            cx.out,
90            cx.proxy,
91        ))
92    }
93
94    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
95        vec![SourceOutput::new_metrics()]
96    }
97
98    fn can_acknowledge(&self) -> bool {
99        false
100    }
101}
102
/// Helpers for rendering a URI in a form that is safe to attach to metrics and events.
trait UriExt {
    /// Renders only the host and, when present, the port of the URI's authority.
    fn sanitized_authority(&self) -> String;

    /// Renders the URI from its scheme, sanitized authority, path and query.
    fn to_sanitized_string(&self) -> String;
}
108
109impl UriExt for http::Uri {
110    fn to_sanitized_string(&self) -> String {
111        let mut s = String::new();
112
113        if let Some(scheme) = self.scheme() {
114            s.push_str(scheme.as_str());
115            s.push_str("://");
116        }
117
118        s.push_str(&self.sanitized_authority());
119
120        s.push_str(self.path());
121
122        if let Some(query) = self.query() {
123            s.push_str(query);
124        }
125
126        s
127    }
128
129    fn sanitized_authority(&self) -> String {
130        let mut s = String::new();
131
132        if let Some(host) = self.host() {
133            s.push_str(host);
134        }
135
136        if let Some(port) = self.port() {
137            s.push(':');
138            s.push_str(port.as_str());
139        }
140
141        s
142    }
143}
144
145fn apache_metrics(
146    urls: Vec<http::Uri>,
147    interval: Duration,
148    namespace: Option<String>,
149    shutdown: ShutdownSignal,
150    mut out: SourceSender,
151    proxy: ProxyConfig,
152) -> super::Source {
153    Box::pin(async move {
154        let mut stream = IntervalStream::new(tokio::time::interval(interval))
155            .take_until(shutdown)
156            .map(move |_| stream::iter(urls.clone()))
157            .flatten()
158            .map(move |url| {
159                let client = HttpClient::new(None, &proxy).expect("HTTPS initialization failed");
160                let sanitized_url = url.to_sanitized_string();
161
162                let request = Request::get(&url)
163                    .body(Body::empty())
164                    .expect("error creating request");
165
166                let tags = metric_tags! {
167                    "endpoint" => sanitized_url.to_string(),
168                    "host" => url.sanitized_authority(),
169                };
170
171                let namespace = namespace.clone();
172                client
173                    .send(request)
174                    .map_err(crate::Error::from)
175                    .and_then(|response| async {
176                        let (header, body) = response.into_parts();
177                        let body = hyper::body::to_bytes(body).await?;
178                        Ok((header, body))
179                    })
180                    .into_stream()
181                    .filter_map(move |response| {
182                        ready(match response {
183                            Ok((header, body)) if header.status == hyper::StatusCode::OK => {
184                                let byte_size = body.len();
185                                let body = String::from_utf8_lossy(&body);
186                                emit!(EndpointBytesReceived {
187                                    byte_size,
188                                    protocol: url.scheme().unwrap_or(&Scheme::HTTP).as_str(),
189                                    endpoint: &sanitized_url,
190                                });
191
192                                let results = parser::parse(
193                                    &body,
194                                    namespace.as_deref(),
195                                    Utc::now(),
196                                    Some(&tags),
197                                )
198                                .chain(vec![Ok(Metric::new(
199                                    "up",
200                                    MetricKind::Absolute,
201                                    MetricValue::Gauge { value: 1.0 },
202                                )
203                                .with_namespace(namespace.clone())
204                                .with_tags(Some(tags.clone()))
205                                .with_timestamp(Some(Utc::now())))]);
206
207                                let metrics = results
208                                    .filter_map(|res| match res {
209                                        Ok(metric) => Some(metric),
210                                        Err(e) => {
211                                            emit!(ApacheMetricsParseError {
212                                                error: e,
213                                                endpoint: &sanitized_url,
214                                            });
215                                            None
216                                        }
217                                    })
218                                    .collect::<Vec<_>>();
219
220                                emit!(ApacheMetricsEventsReceived {
221                                    byte_size: metrics.estimated_json_encoded_size_of(),
222                                    count: metrics.len(),
223                                    endpoint: &sanitized_url,
224                                });
225                                Some(stream::iter(metrics))
226                            }
227                            Ok((header, _)) => {
228                                emit!(HttpClientHttpResponseError {
229                                    code: header.status,
230                                    url: sanitized_url.to_owned(),
231                                });
232                                Some(stream::iter(vec![Metric::new(
233                                    "up",
234                                    MetricKind::Absolute,
235                                    MetricValue::Gauge { value: 1.0 },
236                                )
237                                .with_namespace(namespace.clone())
238                                .with_tags(Some(tags.clone()))
239                                .with_timestamp(Some(Utc::now()))]))
240                            }
241                            Err(error) => {
242                                emit!(HttpClientHttpError {
243                                    error,
244                                    url: sanitized_url.to_owned(),
245                                });
246                                Some(stream::iter(vec![Metric::new(
247                                    "up",
248                                    MetricKind::Absolute,
249                                    MetricValue::Gauge { value: 0.0 },
250                                )
251                                .with_namespace(namespace.clone())
252                                .with_tags(Some(tags.clone()))
253                                .with_timestamp(Some(Utc::now()))]))
254                            }
255                        })
256                    })
257                    .flatten()
258            })
259            .flatten()
260            .boxed();
261
262        match out.send_event_stream(&mut stream).await {
263            Ok(()) => {
264                debug!("Finished sending.");
265                Ok(())
266            }
267            Err(_) => {
268                let (count, _) = stream.size_hint();
269                emit!(StreamClosedError { count });
270                Err(())
271            }
272        }
273    })
274}
275
276#[cfg(test)]
277mod test {
278    use hyper::{
279        service::{make_service_fn, service_fn},
280        Body, Response, Server,
281    };
282    use similar_asserts::assert_eq;
283    use tokio::time::{sleep, Duration};
284
285    use super::*;
286    use crate::{
287        config::SourceConfig,
288        test_util::{
289            collect_ready,
290            components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
291            next_addr, wait_for_tcp,
292        },
293        Error,
294    };
295
296    #[test]
297    fn generate_config() {
298        crate::test_util::test_generate_config::<ApacheMetricsConfig>();
299    }
300
301    #[tokio::test]
302    async fn test_apache_up() {
303        let in_addr = next_addr();
304
305        let make_svc = make_service_fn(|_| async {
306            Ok::<_, Error>(service_fn(|_| async {
307                Ok::<_, Error>(Response::new(Body::from(
308                    r"
309localhost
310ServerVersion: Apache/2.4.46 (Unix)
311ServerMPM: event
312Server Built: Aug  5 2020 23:20:17
313CurrentTime: Friday, 21-Aug-2020 18:41:34 UTC
314RestartTime: Friday, 21-Aug-2020 18:41:08 UTC
315ParentServerConfigGeneration: 1
316ParentServerMPMGeneration: 0
317ServerUptimeSeconds: 26
318ServerUptime: 26 seconds
319Load1: 0.00
320Load5: 0.03
321Load15: 0.03
322Total Accesses: 30
323Total kBytes: 217
324Total Duration: 11
325CPUUser: .2
326CPUSystem: .02
327CPUChildrenUser: 0
328CPUChildrenSystem: 0
329CPULoad: .846154
330Uptime: 26
331ReqPerSec: 1.15385
332BytesPerSec: 8546.46
333BytesPerReq: 7406.93
334DurationPerReq: .366667
335BusyWorkers: 1
336IdleWorkers: 74
337Processes: 3
338Stopping: 0
339BusyWorkers: 1
340IdleWorkers: 74
341ConnsTotal: 1
342ConnsAsyncWriting: 0
343ConnsAsyncKeepAlive: 0
344ConnsAsyncClosing: 0
345Scoreboard: ____S_____I______R____I_______KK___D__C__G_L____________W__________________.....................................................................................................................................................................................................................................................................................................................................
346                    ",
347                )))
348            }))
349        });
350
351        tokio::spawn(async move {
352            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
353                error!(message = "Server error.", %error);
354            }
355        });
356        wait_for_tcp(in_addr).await;
357
358        let config = ApacheMetricsConfig {
359            endpoints: vec![format!("http://foo:bar@{}/metrics", in_addr)],
360            scrape_interval_secs: Duration::from_secs(1),
361            namespace: "custom".to_string(),
362        };
363
364        let events = run_and_assert_source_compliance(
365            config,
366            Duration::from_secs(1),
367            &HTTP_PULL_SOURCE_TAGS,
368        )
369        .await;
370        let metrics = events
371            .into_iter()
372            .map(|e| e.into_metric())
373            .collect::<Vec<_>>();
374
375        match metrics.iter().find(|m| m.name() == "up") {
376            Some(m) => {
377                assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 });
378
379                match m.tags() {
380                    Some(tags) => {
381                        assert_eq!(
382                            tags.get("endpoint"),
383                            Some(&format!("http://{in_addr}/metrics")[..])
384                        );
385                        assert_eq!(tags.get("host"), Some(&in_addr.to_string()[..]));
386                    }
387                    None => error!(message = "No tags for metric.", metric = ?m),
388                }
389            }
390            None => error!(message = "Could not find up metric in.", metrics = ?metrics),
391        }
392    }
393
394    #[tokio::test]
395    async fn test_apache_error() {
396        let in_addr = next_addr();
397
398        let make_svc = make_service_fn(|_| async {
399            Ok::<_, Error>(service_fn(|_| async {
400                Ok::<_, Error>(
401                    Response::builder()
402                        .status(404)
403                        .body(Body::from("not found"))
404                        .unwrap(),
405                )
406            }))
407        });
408
409        tokio::spawn(async move {
410            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
411                error!(message = "Server error.", %error);
412            }
413        });
414        wait_for_tcp(in_addr).await;
415
416        let (tx, rx) = SourceSender::new_test();
417
418        let source = ApacheMetricsConfig {
419            endpoints: vec![format!("http://{}", in_addr)],
420            scrape_interval_secs: Duration::from_secs(1),
421            namespace: "apache".to_string(),
422        }
423        .build(SourceContext::new_test(tx, None))
424        .await
425        .unwrap();
426        tokio::spawn(source);
427
428        sleep(Duration::from_secs(1)).await;
429
430        let metrics = collect_ready(rx)
431            .await
432            .into_iter()
433            .map(|e| e.into_metric())
434            .collect::<Vec<_>>();
435
436        // we still publish `up=1` for bad status codes following the pattern of the Prometheus exporter:
437        //
438        // https://github.com/Lusitaniae/apache_exporter/blob/712a6796fb84f741ef3cd562dc11418f2ee8b741/apache_exporter.go#L200
439        match metrics.iter().find(|m| m.name() == "up") {
440            Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 }),
441            None => error!(message = "Could not find up metric in.", metrics = ?metrics),
442        }
443    }
444
445    #[tokio::test]
446    async fn test_apache_down() {
447        // will have nothing bound
448        let in_addr = next_addr();
449
450        let (tx, rx) = SourceSender::new_test();
451
452        let source = ApacheMetricsConfig {
453            endpoints: vec![format!("http://{}", in_addr)],
454            scrape_interval_secs: Duration::from_secs(1),
455            namespace: "custom".to_string(),
456        }
457        .build(SourceContext::new_test(tx, None))
458        .await
459        .unwrap();
460        tokio::spawn(source);
461
462        sleep(Duration::from_secs(1)).await;
463
464        let metrics = collect_ready(rx)
465            .await
466            .into_iter()
467            .map(|e| e.into_metric())
468            .collect::<Vec<_>>();
469
470        match metrics.iter().find(|m| m.name() == "up") {
471            Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 0.0 }),
472            None => error!(message = "Could not find up metric in.", metrics = ?metrics),
473        }
474    }
475}