// vector/sources/nginx_metrics/mod.rs
use std::{
    convert::TryFrom,
    time::{Duration, Instant},
};

use bytes::Bytes;
use chrono::Utc;
use futures::{StreamExt, TryFutureExt, future::join_all};
use http::{Request, StatusCode};
use hyper::{Body, Uri, body::to_bytes as body_to_bytes};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::{EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags};

use crate::{
    config::{SourceConfig, SourceContext, SourceOutput},
    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
    http::{Auth, HttpClient},
    internal_events::{
        CollectionCompleted, EndpointBytesReceived, NginxMetricsEventsReceived,
        NginxMetricsRequestError, NginxMetricsStubStatusParseError, StreamClosedError,
    },
    tls::{TlsConfig, TlsSettings},
};

pub mod parser;
use parser::NginxStubStatus;
use vector_lib::config::LogNamespace;

/// Wraps a numeric expression in a `MetricValue::Counter`.
///
/// The expression is converted with `as f64`, so very large integers may lose precision.
macro_rules! counter {
    ($value:expr_2021) => {
        MetricValue::Counter { value: $value as f64 }
    };
}

/// Wraps a numeric expression in a `MetricValue::Gauge`.
///
/// The expression is converted with `as f64`, so very large integers may lose precision.
macro_rules! gauge {
    ($value:expr_2021) => {
        MetricValue::Gauge { value: $value as f64 }
    };
}

48#[derive(Debug, Snafu)]
49enum NginxBuildError {
50 #[snafu(display("Failed to parse endpoint: {}", source))]
51 HostInvalidUri { source: http::uri::InvalidUri },
52}
53
54#[derive(Debug, Snafu)]
55enum NginxError {
56 #[snafu(display("Invalid response status: {}", status))]
57 InvalidResponseStatus { status: StatusCode },
58}
59
60#[serde_as]
62#[configurable_component(source("nginx_metrics", "Collect metrics from NGINX."))]
63#[derive(Clone, Debug, Default)]
64#[serde(deny_unknown_fields)]
65pub struct NginxMetricsConfig {
66 #[configurable(metadata(docs::examples = "http://localhost:8000/basic_status"))]
71 endpoints: Vec<String>,
72
73 #[serde(default = "default_scrape_interval_secs")]
75 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
76 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
77 scrape_interval_secs: Duration,
78
79 #[serde(default = "default_namespace")]
85 namespace: String,
86
87 #[configurable(derived)]
88 tls: Option<TlsConfig>,
89
90 #[configurable(derived)]
91 auth: Option<Auth>,
92}
93
94pub(super) const fn default_scrape_interval_secs() -> Duration {
95 Duration::from_secs(15)
96}
97
/// Returns the metric namespace used when `namespace` is not configured: `"nginx"`.
pub fn default_namespace() -> String {
    String::from("nginx")
}

102impl_generate_config_from_default!(NginxMetricsConfig);
103
104#[async_trait::async_trait]
105#[typetag::serde(name = "nginx_metrics")]
106impl SourceConfig for NginxMetricsConfig {
107 async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
108 let tls = TlsSettings::from_options(self.tls.as_ref())?;
109 let http_client = HttpClient::new(tls, &cx.proxy)?;
110
111 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
112 let mut sources = Vec::with_capacity(self.endpoints.len());
113 for endpoint in self.endpoints.iter() {
114 sources.push(NginxMetrics::new(
115 http_client.clone(),
116 endpoint.clone(),
117 self.auth.clone(),
118 namespace.clone(),
119 )?);
120 }
121
122 let duration = self.scrape_interval_secs;
123 let shutdown = cx.shutdown;
124 Ok(Box::pin(async move {
125 let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
126 while interval.next().await.is_some() {
127 let start = Instant::now();
128 let metrics = join_all(sources.iter().map(|nginx| nginx.collect())).await;
129 emit!(CollectionCompleted {
130 start,
131 end: Instant::now()
132 });
133
134 let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
135 let count = metrics.len();
136
137 if (cx.out.send_batch(metrics).await).is_err() {
138 emit!(StreamClosedError { count });
139 return Err(());
140 }
141 }
142
143 Ok(())
144 }))
145 }
146
147 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
148 vec![SourceOutput::new_metrics()]
149 }
150
151 fn can_acknowledge(&self) -> bool {
152 false
153 }
154}
155
156#[derive(Debug)]
157struct NginxMetrics {
158 http_client: HttpClient,
159 endpoint: String,
160 auth: Option<Auth>,
161 namespace: Option<String>,
162 tags: MetricTags,
163}
164
165impl NginxMetrics {
166 fn new(
167 http_client: HttpClient,
168 endpoint: String,
169 auth: Option<Auth>,
170 namespace: Option<String>,
171 ) -> crate::Result<Self> {
172 let tags = metric_tags!(
173 "endpoint" => endpoint.clone(),
174 "host" => Self::get_endpoint_host(&endpoint)?,
175 );
176
177 Ok(Self {
178 http_client,
179 endpoint,
180 auth,
181 namespace,
182 tags,
183 })
184 }
185
186 fn get_endpoint_host(endpoint: &str) -> crate::Result<String> {
187 let uri: Uri = endpoint.parse().context(HostInvalidUriSnafu)?;
188 Ok(match (uri.host().unwrap_or(""), uri.port()) {
189 (host, None) => host.to_owned(),
190 (host, Some(port)) => format!("{host}:{port}"),
191 })
192 }
193
194 async fn collect(&self) -> Vec<Metric> {
195 let (up_value, mut metrics) = match self.collect_metrics().await {
196 Ok(metrics) => (1.0, metrics),
197 Err(()) => (0.0, vec![]),
198 };
199
200 let byte_size = metrics.estimated_json_encoded_size_of();
201
202 metrics.push(self.create_metric("up", gauge!(up_value)));
203
204 emit!(NginxMetricsEventsReceived {
205 count: metrics.len(),
206 byte_size,
207 endpoint: &self.endpoint
208 });
209
210 metrics
211 }
212
213 async fn collect_metrics(&self) -> Result<Vec<Metric>, ()> {
214 let response = self.get_nginx_response().await.map_err(|error| {
215 emit!(NginxMetricsRequestError {
216 error,
217 endpoint: &self.endpoint,
218 })
219 })?;
220 emit!(EndpointBytesReceived {
221 byte_size: response.len(),
222 protocol: "http",
223 endpoint: &self.endpoint,
224 });
225
226 let status = NginxStubStatus::try_from(String::from_utf8_lossy(&response).as_ref())
227 .map_err(|error| {
228 emit!(NginxMetricsStubStatusParseError {
229 error,
230 endpoint: &self.endpoint,
231 })
232 })?;
233
234 Ok(vec![
235 self.create_metric("connections_active", gauge!(status.active)),
236 self.create_metric("connections_accepted_total", counter!(status.accepts)),
237 self.create_metric("connections_handled_total", counter!(status.handled)),
238 self.create_metric("http_requests_total", counter!(status.requests)),
239 self.create_metric("connections_reading", gauge!(status.reading)),
240 self.create_metric("connections_writing", gauge!(status.writing)),
241 self.create_metric("connections_waiting", gauge!(status.waiting)),
242 ])
243 }
244
245 async fn get_nginx_response(&self) -> crate::Result<Bytes> {
246 let mut request = Request::get(&self.endpoint).body(Body::empty())?;
247 if let Some(auth) = &self.auth {
248 auth.apply(&mut request);
249 }
250
251 let response = self.http_client.send(request).await?;
252 let (parts, body) = response.into_parts();
253 match parts.status {
254 StatusCode::OK => body_to_bytes(body).err_into().await,
255 status => Err(Box::new(NginxError::InvalidResponseStatus { status })),
256 }
257 }
258
259 fn create_metric(&self, name: &str, value: MetricValue) -> Metric {
260 Metric::new(name, MetricKind::Absolute, value)
261 .with_namespace(self.namespace.clone())
262 .with_tags(Some(self.tags.clone()))
263 .with_timestamp(Some(Utc::now()))
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::NginxMetricsConfig;

    /// Runs the shared generated-configuration check against `NginxMetricsConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<NginxMetricsConfig>();
    }
}

#[cfg(all(test, feature = "nginx-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        config::ProxyConfig,
        test_util::components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance_advanced},
    };

    /// Reads an address from the environment, falling back to `fallback` when it is unset or
    /// unreadable.
    fn env_or(key: &str, fallback: &str) -> String {
        std::env::var(key).unwrap_or_else(|_| fallback.into())
    }

    /// Address of the NGINX instance that is reached through the proxy.
    fn nginx_proxy_address() -> String {
        env_or("NGINX_PROXY_ADDRESS", "http://nginx-proxy:8000")
    }

    /// Address of the NGINX instance that is reached directly.
    fn nginx_address() -> String {
        env_or("NGINX_ADDRESS", "http://localhost:8000")
    }

    /// Address of the Squid HTTP proxy.
    fn squid_address() -> String {
        env_or("SQUID_ADDRESS", "http://localhost:3128")
    }

    /// Runs the source against `endpoint` for a few seconds and checks the emitted events.
    async fn test_nginx(endpoint: String, auth: Option<Auth>, proxy: ProxyConfig) {
        let config = NginxMetricsConfig {
            endpoints: vec![endpoint],
            scrape_interval_secs: Duration::from_secs(15),
            namespace: "vector_nginx".to_owned(),
            tls: None,
            auth,
        };
        let run_duration = Duration::from_secs(3);

        let events = run_and_assert_source_compliance_advanced(
            config,
            move |context: &mut SourceContext| {
                context.proxy = proxy;
            },
            Some(run_duration),
            None,
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;

        // A successful scrape yields seven stub-status metrics plus the `up` gauge.
        let expected_events = 8;
        assert_eq!(events.len(), expected_events);
    }

    #[tokio::test]
    async fn test_stub_status() {
        let url = format!("{}/basic_status", nginx_address());
        test_nginx(url, None, ProxyConfig::default()).await
    }

    #[tokio::test]
    async fn test_stub_status_auth() {
        let url = format!("{}/basic_status_auth", nginx_address());
        let auth = Auth::Basic {
            user: "vector".to_owned(),
            password: "vector".to_owned().into(),
        };
        test_nginx(url, Some(auth), ProxyConfig::default()).await
    }

    // Scrapes NGINX through the Squid HTTP proxy configured via `ProxyConfig::http`.
    #[tokio::test]
    async fn test_stub_status_with_proxy() {
        let url = format!("{}/basic_status", nginx_proxy_address());
        let proxy = ProxyConfig {
            http: Some(squid_address()),
            ..Default::default()
        };
        test_nginx(url, None, proxy).await
    }
}