vector/sources/apache_metrics/
mod.rs1use std::{future::ready, time::Duration};
2
3use chrono::Utc;
4use futures::{stream, FutureExt, StreamExt, TryFutureExt};
5use http::uri::Scheme;
6use hyper::{Body, Request};
7use serde_with::serde_as;
8use snafu::ResultExt;
9use tokio_stream::wrappers::IntervalStream;
10use vector_lib::configurable::configurable_component;
11use vector_lib::{metric_tags, EstimatedJsonEncodedSizeOf};
12
13use crate::{
14 config::{GenerateConfig, ProxyConfig, SourceConfig, SourceContext, SourceOutput},
15 event::metric::{Metric, MetricKind, MetricValue},
16 http::HttpClient,
17 internal_events::{
18 ApacheMetricsEventsReceived, ApacheMetricsParseError, EndpointBytesReceived,
19 HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError,
20 },
21 shutdown::ShutdownSignal,
22 SourceSender,
23};
24
25mod parser;
26
27pub use parser::ParseError;
28use vector_lib::config::LogNamespace;
29
30#[serde_as]
32#[configurable_component(source("apache_metrics", "Collect metrics from Apache's HTTPD server."))]
33#[derive(Clone, Debug)]
34pub struct ApacheMetricsConfig {
35 #[configurable(metadata(docs::examples = "http://localhost:8080/server-status/?auto"))]
37 endpoints: Vec<String>,
38
39 #[serde(default = "default_scrape_interval_secs")]
41 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
42 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
43 scrape_interval_secs: Duration,
44
45 #[serde(default = "default_namespace")]
49 namespace: String,
50}
51
/// Default pause between two scrapes of the configured endpoints: 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SCRAPE_INTERVAL_SECS: u64 = 15;

    Duration::from_secs(DEFAULT_SCRAPE_INTERVAL_SECS)
}
55
/// Default metric namespace used when the configuration does not set one.
pub fn default_namespace() -> String {
    String::from("apache")
}
59
60impl GenerateConfig for ApacheMetricsConfig {
61 fn generate_config() -> toml::Value {
62 toml::Value::try_from(Self {
63 endpoints: vec!["http://localhost:8080/server-status/?auto".to_owned()],
64 scrape_interval_secs: default_scrape_interval_secs(),
65 namespace: default_namespace(),
66 })
67 .unwrap()
68 }
69}
70
71#[async_trait::async_trait]
72#[typetag::serde(name = "apache_metrics")]
73impl SourceConfig for ApacheMetricsConfig {
74 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
75 let urls = self
76 .endpoints
77 .iter()
78 .map(|endpoint| endpoint.parse::<http::Uri>())
79 .collect::<Result<Vec<_>, _>>()
80 .context(super::UriParseSnafu)?;
81
82 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
83
84 Ok(apache_metrics(
85 urls,
86 self.scrape_interval_secs,
87 namespace,
88 cx.shutdown,
89 cx.out,
90 cx.proxy,
91 ))
92 }
93
94 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
95 vec![SourceOutput::new_metrics()]
96 }
97
98 fn can_acknowledge(&self) -> bool {
99 false
100 }
101}
102
103trait UriExt {
104 fn to_sanitized_string(&self) -> String;
105
106 fn sanitized_authority(&self) -> String;
107}
108
109impl UriExt for http::Uri {
110 fn to_sanitized_string(&self) -> String {
111 let mut s = String::new();
112
113 if let Some(scheme) = self.scheme() {
114 s.push_str(scheme.as_str());
115 s.push_str("://");
116 }
117
118 s.push_str(&self.sanitized_authority());
119
120 s.push_str(self.path());
121
122 if let Some(query) = self.query() {
123 s.push_str(query);
124 }
125
126 s
127 }
128
129 fn sanitized_authority(&self) -> String {
130 let mut s = String::new();
131
132 if let Some(host) = self.host() {
133 s.push_str(host);
134 }
135
136 if let Some(port) = self.port() {
137 s.push(':');
138 s.push_str(port.as_str());
139 }
140
141 s
142 }
143}
144
145fn apache_metrics(
146 urls: Vec<http::Uri>,
147 interval: Duration,
148 namespace: Option<String>,
149 shutdown: ShutdownSignal,
150 mut out: SourceSender,
151 proxy: ProxyConfig,
152) -> super::Source {
153 Box::pin(async move {
154 let mut stream = IntervalStream::new(tokio::time::interval(interval))
155 .take_until(shutdown)
156 .map(move |_| stream::iter(urls.clone()))
157 .flatten()
158 .map(move |url| {
159 let client = HttpClient::new(None, &proxy).expect("HTTPS initialization failed");
160 let sanitized_url = url.to_sanitized_string();
161
162 let request = Request::get(&url)
163 .body(Body::empty())
164 .expect("error creating request");
165
166 let tags = metric_tags! {
167 "endpoint" => sanitized_url.to_string(),
168 "host" => url.sanitized_authority(),
169 };
170
171 let namespace = namespace.clone();
172 client
173 .send(request)
174 .map_err(crate::Error::from)
175 .and_then(|response| async {
176 let (header, body) = response.into_parts();
177 let body = hyper::body::to_bytes(body).await?;
178 Ok((header, body))
179 })
180 .into_stream()
181 .filter_map(move |response| {
182 ready(match response {
183 Ok((header, body)) if header.status == hyper::StatusCode::OK => {
184 let byte_size = body.len();
185 let body = String::from_utf8_lossy(&body);
186 emit!(EndpointBytesReceived {
187 byte_size,
188 protocol: url.scheme().unwrap_or(&Scheme::HTTP).as_str(),
189 endpoint: &sanitized_url,
190 });
191
192 let results = parser::parse(
193 &body,
194 namespace.as_deref(),
195 Utc::now(),
196 Some(&tags),
197 )
198 .chain(vec![Ok(Metric::new(
199 "up",
200 MetricKind::Absolute,
201 MetricValue::Gauge { value: 1.0 },
202 )
203 .with_namespace(namespace.clone())
204 .with_tags(Some(tags.clone()))
205 .with_timestamp(Some(Utc::now())))]);
206
207 let metrics = results
208 .filter_map(|res| match res {
209 Ok(metric) => Some(metric),
210 Err(e) => {
211 emit!(ApacheMetricsParseError {
212 error: e,
213 endpoint: &sanitized_url,
214 });
215 None
216 }
217 })
218 .collect::<Vec<_>>();
219
220 emit!(ApacheMetricsEventsReceived {
221 byte_size: metrics.estimated_json_encoded_size_of(),
222 count: metrics.len(),
223 endpoint: &sanitized_url,
224 });
225 Some(stream::iter(metrics))
226 }
227 Ok((header, _)) => {
228 emit!(HttpClientHttpResponseError {
229 code: header.status,
230 url: sanitized_url.to_owned(),
231 });
232 Some(stream::iter(vec![Metric::new(
233 "up",
234 MetricKind::Absolute,
235 MetricValue::Gauge { value: 1.0 },
236 )
237 .with_namespace(namespace.clone())
238 .with_tags(Some(tags.clone()))
239 .with_timestamp(Some(Utc::now()))]))
240 }
241 Err(error) => {
242 emit!(HttpClientHttpError {
243 error,
244 url: sanitized_url.to_owned(),
245 });
246 Some(stream::iter(vec![Metric::new(
247 "up",
248 MetricKind::Absolute,
249 MetricValue::Gauge { value: 0.0 },
250 )
251 .with_namespace(namespace.clone())
252 .with_tags(Some(tags.clone()))
253 .with_timestamp(Some(Utc::now()))]))
254 }
255 })
256 })
257 .flatten()
258 })
259 .flatten()
260 .boxed();
261
262 match out.send_event_stream(&mut stream).await {
263 Ok(()) => {
264 debug!("Finished sending.");
265 Ok(())
266 }
267 Err(_) => {
268 let (count, _) = stream.size_hint();
269 emit!(StreamClosedError { count });
270 Err(())
271 }
272 }
273 })
274}
275
276#[cfg(test)]
277mod test {
278 use hyper::{
279 service::{make_service_fn, service_fn},
280 Body, Response, Server,
281 };
282 use similar_asserts::assert_eq;
283 use tokio::time::{sleep, Duration};
284
285 use super::*;
286 use crate::{
287 config::SourceConfig,
288 test_util::{
289 collect_ready,
290 components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
291 next_addr, wait_for_tcp,
292 },
293 Error,
294 };
295
296 #[test]
297 fn generate_config() {
298 crate::test_util::test_generate_config::<ApacheMetricsConfig>();
299 }
300
301 #[tokio::test]
302 async fn test_apache_up() {
303 let in_addr = next_addr();
304
305 let make_svc = make_service_fn(|_| async {
306 Ok::<_, Error>(service_fn(|_| async {
307 Ok::<_, Error>(Response::new(Body::from(
308 r"
309localhost
310ServerVersion: Apache/2.4.46 (Unix)
311ServerMPM: event
312Server Built: Aug 5 2020 23:20:17
313CurrentTime: Friday, 21-Aug-2020 18:41:34 UTC
314RestartTime: Friday, 21-Aug-2020 18:41:08 UTC
315ParentServerConfigGeneration: 1
316ParentServerMPMGeneration: 0
317ServerUptimeSeconds: 26
318ServerUptime: 26 seconds
319Load1: 0.00
320Load5: 0.03
321Load15: 0.03
322Total Accesses: 30
323Total kBytes: 217
324Total Duration: 11
325CPUUser: .2
326CPUSystem: .02
327CPUChildrenUser: 0
328CPUChildrenSystem: 0
329CPULoad: .846154
330Uptime: 26
331ReqPerSec: 1.15385
332BytesPerSec: 8546.46
333BytesPerReq: 7406.93
334DurationPerReq: .366667
335BusyWorkers: 1
336IdleWorkers: 74
337Processes: 3
338Stopping: 0
339BusyWorkers: 1
340IdleWorkers: 74
341ConnsTotal: 1
342ConnsAsyncWriting: 0
343ConnsAsyncKeepAlive: 0
344ConnsAsyncClosing: 0
345Scoreboard: ____S_____I______R____I_______KK___D__C__G_L____________W__________________.....................................................................................................................................................................................................................................................................................................................................
346 ",
347 )))
348 }))
349 });
350
351 tokio::spawn(async move {
352 if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
353 error!(message = "Server error.", %error);
354 }
355 });
356 wait_for_tcp(in_addr).await;
357
358 let config = ApacheMetricsConfig {
359 endpoints: vec![format!("http://foo:bar@{}/metrics", in_addr)],
360 scrape_interval_secs: Duration::from_secs(1),
361 namespace: "custom".to_string(),
362 };
363
364 let events = run_and_assert_source_compliance(
365 config,
366 Duration::from_secs(1),
367 &HTTP_PULL_SOURCE_TAGS,
368 )
369 .await;
370 let metrics = events
371 .into_iter()
372 .map(|e| e.into_metric())
373 .collect::<Vec<_>>();
374
375 match metrics.iter().find(|m| m.name() == "up") {
376 Some(m) => {
377 assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 });
378
379 match m.tags() {
380 Some(tags) => {
381 assert_eq!(
382 tags.get("endpoint"),
383 Some(&format!("http://{in_addr}/metrics")[..])
384 );
385 assert_eq!(tags.get("host"), Some(&in_addr.to_string()[..]));
386 }
387 None => error!(message = "No tags for metric.", metric = ?m),
388 }
389 }
390 None => error!(message = "Could not find up metric in.", metrics = ?metrics),
391 }
392 }
393
394 #[tokio::test]
395 async fn test_apache_error() {
396 let in_addr = next_addr();
397
398 let make_svc = make_service_fn(|_| async {
399 Ok::<_, Error>(service_fn(|_| async {
400 Ok::<_, Error>(
401 Response::builder()
402 .status(404)
403 .body(Body::from("not found"))
404 .unwrap(),
405 )
406 }))
407 });
408
409 tokio::spawn(async move {
410 if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
411 error!(message = "Server error.", %error);
412 }
413 });
414 wait_for_tcp(in_addr).await;
415
416 let (tx, rx) = SourceSender::new_test();
417
418 let source = ApacheMetricsConfig {
419 endpoints: vec![format!("http://{}", in_addr)],
420 scrape_interval_secs: Duration::from_secs(1),
421 namespace: "apache".to_string(),
422 }
423 .build(SourceContext::new_test(tx, None))
424 .await
425 .unwrap();
426 tokio::spawn(source);
427
428 sleep(Duration::from_secs(1)).await;
429
430 let metrics = collect_ready(rx)
431 .await
432 .into_iter()
433 .map(|e| e.into_metric())
434 .collect::<Vec<_>>();
435
436 match metrics.iter().find(|m| m.name() == "up") {
440 Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 1.0 }),
441 None => error!(message = "Could not find up metric in.", metrics = ?metrics),
442 }
443 }
444
445 #[tokio::test]
446 async fn test_apache_down() {
447 let in_addr = next_addr();
449
450 let (tx, rx) = SourceSender::new_test();
451
452 let source = ApacheMetricsConfig {
453 endpoints: vec![format!("http://{}", in_addr)],
454 scrape_interval_secs: Duration::from_secs(1),
455 namespace: "custom".to_string(),
456 }
457 .build(SourceContext::new_test(tx, None))
458 .await
459 .unwrap();
460 tokio::spawn(source);
461
462 sleep(Duration::from_secs(1)).await;
463
464 let metrics = collect_ready(rx)
465 .await
466 .into_iter()
467 .map(|e| e.into_metric())
468 .collect::<Vec<_>>();
469
470 match metrics.iter().find(|m| m.name() == "up") {
471 Some(m) => assert_eq!(m.value(), &MetricValue::Gauge { value: 0.0 }),
472 None => error!(message = "Could not find up metric in.", metrics = ?metrics),
473 }
474 }
475}