vector/sources/nginx_metrics/
mod.rs

1use std::{
2    convert::TryFrom,
3    time::{Duration, Instant},
4};
5
6use bytes::Bytes;
7use chrono::Utc;
8use futures::{future::join_all, StreamExt, TryFutureExt};
9use http::{Request, StatusCode};
10use hyper::{body::to_bytes as body_to_bytes, Body, Uri};
11use serde_with::serde_as;
12use snafu::{ResultExt, Snafu};
13use tokio::time;
14use tokio_stream::wrappers::IntervalStream;
15use vector_lib::configurable::configurable_component;
16use vector_lib::{metric_tags, EstimatedJsonEncodedSizeOf};
17
18use crate::{
19    config::{SourceConfig, SourceContext, SourceOutput},
20    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
21    http::{Auth, HttpClient},
22    internal_events::{
23        CollectionCompleted, EndpointBytesReceived, NginxMetricsEventsReceived,
24        NginxMetricsRequestError, NginxMetricsStubStatusParseError, StreamClosedError,
25    },
26    tls::{TlsConfig, TlsSettings},
27};
28
29pub mod parser;
30use parser::NginxStubStatus;
31use vector_lib::config::LogNamespace;
32
/// Builds a [`MetricValue::Counter`] from a numeric expression.
///
/// The expression is converted with an `as f64` cast, so any primitive numeric type is
/// accepted.
macro_rules! counter {
    ($count:expr_2021) => {
        MetricValue::Counter {
            value: $count as f64,
        }
    };
}
40
/// Builds a [`MetricValue::Gauge`] from a numeric expression.
///
/// The expression is converted with an `as f64` cast, so any primitive numeric type is
/// accepted.
macro_rules! gauge {
    ($level:expr_2021) => {
        MetricValue::Gauge {
            value: $level as f64,
        }
    };
}
48
49#[derive(Debug, Snafu)]
50enum NginxBuildError {
51    #[snafu(display("Failed to parse endpoint: {}", source))]
52    HostInvalidUri { source: http::uri::InvalidUri },
53}
54
55#[derive(Debug, Snafu)]
56enum NginxError {
57    #[snafu(display("Invalid response status: {}", status))]
58    InvalidResponseStatus { status: StatusCode },
59}
60
61/// Configuration for the `nginx_metrics` source.
62#[serde_as]
63#[configurable_component(source("nginx_metrics", "Collect metrics from NGINX."))]
64#[derive(Clone, Debug, Default)]
65#[serde(deny_unknown_fields)]
66pub struct NginxMetricsConfig {
67    /// A list of NGINX instances to scrape.
68    ///
69    /// Each endpoint must be a valid HTTP/HTTPS URI pointing to an NGINX instance that has the
70    /// `ngx_http_stub_status_module` module enabled.
71    #[configurable(metadata(docs::examples = "http://localhost:8000/basic_status"))]
72    endpoints: Vec<String>,
73
74    /// The interval between scrapes.
75    #[serde(default = "default_scrape_interval_secs")]
76    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
77    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
78    scrape_interval_secs: Duration,
79
80    /// Overrides the default namespace for the metrics emitted by the source.
81    ///
82    /// If set to an empty string, no namespace is added to the metrics.
83    ///
84    /// By default, `nginx` is used.
85    #[serde(default = "default_namespace")]
86    namespace: String,
87
88    #[configurable(derived)]
89    tls: Option<TlsConfig>,
90
91    #[configurable(derived)]
92    auth: Option<Auth>,
93}
94
95pub(super) const fn default_scrape_interval_secs() -> Duration {
96    Duration::from_secs(15)
97}
98
/// Returns the metric namespace used when `namespace` is not configured: `nginx`.
pub fn default_namespace() -> String {
    String::from("nginx")
}
102
// NOTE(review): this macro is defined elsewhere in the crate. Judging by its name it
// derives the generated example configuration from `NginxMetricsConfig::default()` —
// confirm against the macro definition.
impl_generate_config_from_default!(NginxMetricsConfig);
104
105#[async_trait::async_trait]
106#[typetag::serde(name = "nginx_metrics")]
107impl SourceConfig for NginxMetricsConfig {
108    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
109        let tls = TlsSettings::from_options(self.tls.as_ref())?;
110        let http_client = HttpClient::new(tls, &cx.proxy)?;
111
112        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
113        let mut sources = Vec::with_capacity(self.endpoints.len());
114        for endpoint in self.endpoints.iter() {
115            sources.push(NginxMetrics::new(
116                http_client.clone(),
117                endpoint.clone(),
118                self.auth.clone(),
119                namespace.clone(),
120            )?);
121        }
122
123        let duration = self.scrape_interval_secs;
124        let shutdown = cx.shutdown;
125        Ok(Box::pin(async move {
126            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
127            while interval.next().await.is_some() {
128                let start = Instant::now();
129                let metrics = join_all(sources.iter().map(|nginx| nginx.collect())).await;
130                emit!(CollectionCompleted {
131                    start,
132                    end: Instant::now()
133                });
134
135                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
136                let count = metrics.len();
137
138                if (cx.out.send_batch(metrics).await).is_err() {
139                    emit!(StreamClosedError { count });
140                    return Err(());
141                }
142            }
143
144            Ok(())
145        }))
146    }
147
148    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
149        vec![SourceOutput::new_metrics()]
150    }
151
152    fn can_acknowledge(&self) -> bool {
153        false
154    }
155}
156
157#[derive(Debug)]
158struct NginxMetrics {
159    http_client: HttpClient,
160    endpoint: String,
161    auth: Option<Auth>,
162    namespace: Option<String>,
163    tags: MetricTags,
164}
165
166impl NginxMetrics {
167    fn new(
168        http_client: HttpClient,
169        endpoint: String,
170        auth: Option<Auth>,
171        namespace: Option<String>,
172    ) -> crate::Result<Self> {
173        let tags = metric_tags!(
174            "endpoint" => endpoint.clone(),
175            "host" => Self::get_endpoint_host(&endpoint)?,
176        );
177
178        Ok(Self {
179            http_client,
180            endpoint,
181            auth,
182            namespace,
183            tags,
184        })
185    }
186
187    fn get_endpoint_host(endpoint: &str) -> crate::Result<String> {
188        let uri: Uri = endpoint.parse().context(HostInvalidUriSnafu)?;
189        Ok(match (uri.host().unwrap_or(""), uri.port()) {
190            (host, None) => host.to_owned(),
191            (host, Some(port)) => format!("{host}:{port}"),
192        })
193    }
194
195    async fn collect(&self) -> Vec<Metric> {
196        let (up_value, mut metrics) = match self.collect_metrics().await {
197            Ok(metrics) => (1.0, metrics),
198            Err(()) => (0.0, vec![]),
199        };
200
201        let byte_size = metrics.estimated_json_encoded_size_of();
202
203        metrics.push(self.create_metric("up", gauge!(up_value)));
204
205        emit!(NginxMetricsEventsReceived {
206            count: metrics.len(),
207            byte_size,
208            endpoint: &self.endpoint
209        });
210
211        metrics
212    }
213
214    async fn collect_metrics(&self) -> Result<Vec<Metric>, ()> {
215        let response = self.get_nginx_response().await.map_err(|error| {
216            emit!(NginxMetricsRequestError {
217                error,
218                endpoint: &self.endpoint,
219            })
220        })?;
221        emit!(EndpointBytesReceived {
222            byte_size: response.len(),
223            protocol: "http",
224            endpoint: &self.endpoint,
225        });
226
227        let status = NginxStubStatus::try_from(String::from_utf8_lossy(&response).as_ref())
228            .map_err(|error| {
229                emit!(NginxMetricsStubStatusParseError {
230                    error,
231                    endpoint: &self.endpoint,
232                })
233            })?;
234
235        Ok(vec![
236            self.create_metric("connections_active", gauge!(status.active)),
237            self.create_metric("connections_accepted_total", counter!(status.accepts)),
238            self.create_metric("connections_handled_total", counter!(status.handled)),
239            self.create_metric("http_requests_total", counter!(status.requests)),
240            self.create_metric("connections_reading", gauge!(status.reading)),
241            self.create_metric("connections_writing", gauge!(status.writing)),
242            self.create_metric("connections_waiting", gauge!(status.waiting)),
243        ])
244    }
245
246    async fn get_nginx_response(&self) -> crate::Result<Bytes> {
247        let mut request = Request::get(&self.endpoint).body(Body::empty())?;
248        if let Some(auth) = &self.auth {
249            auth.apply(&mut request);
250        }
251
252        let response = self.http_client.send(request).await?;
253        let (parts, body) = response.into_parts();
254        match parts.status {
255            StatusCode::OK => body_to_bytes(body).err_into().await,
256            status => Err(Box::new(NginxError::InvalidResponseStatus { status })),
257        }
258    }
259
260    fn create_metric(&self, name: &str, value: MetricValue) -> Metric {
261        Metric::new(name, MetricKind::Absolute, value)
262            .with_namespace(self.namespace.clone())
263            .with_tags(Some(self.tags.clone()))
264            .with_timestamp(Some(Utc::now()))
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::NginxMetricsConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generated-configuration check against this source's config type.
    #[test]
    fn generate_config() {
        test_generate_config::<NginxMetricsConfig>();
    }
}
277
#[cfg(all(test, feature = "nginx-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        config::ProxyConfig,
        test_util::components::{run_and_assert_source_compliance_advanced, HTTP_PULL_SOURCE_TAGS},
    };

    /// Reads `var` from the environment, using `fallback` when it cannot be read.
    fn env_or(var: &str, fallback: &str) -> String {
        std::env::var(var).unwrap_or_else(|_| fallback.into())
    }

    /// Address of the NGINX instance that is reached through the proxy.
    fn nginx_proxy_address() -> String {
        env_or("NGINX_PROXY_ADDRESS", "http://nginx-proxy:8000")
    }

    /// Address of the NGINX instance that is reached directly.
    fn nginx_address() -> String {
        env_or("NGINX_ADDRESS", "http://localhost:8000")
    }

    /// Address of the Squid proxy.
    fn squid_address() -> String {
        env_or("SQUID_ADDRESS", "http://localhost:3128")
    }

    /// Runs the source against `endpoint` for three seconds and expects exactly eight
    /// events from it.
    async fn test_nginx(endpoint: String, auth: Option<Auth>, proxy: ProxyConfig) {
        let source_config = NginxMetricsConfig {
            endpoints: vec![endpoint],
            scrape_interval_secs: Duration::from_secs(15),
            namespace: "vector_nginx".to_owned(),
            tls: None,
            auth,
        };
        let run_for = Some(Duration::from_secs(3));

        let received = run_and_assert_source_compliance_advanced(
            source_config,
            move |context: &mut SourceContext| {
                context.proxy = proxy;
            },
            run_for,
            None,
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;

        assert_eq!(received.len(), 8);
    }

    #[tokio::test]
    async fn test_stub_status() {
        let base = nginx_address();
        test_nginx(format!("{}/basic_status", base), None, ProxyConfig::default()).await
    }

    #[tokio::test]
    async fn test_stub_status_auth() {
        let credentials = Auth::Basic {
            user: "vector".to_owned(),
            password: "vector".to_owned().into(),
        };
        let url = format!("{}/basic_status_auth", nginx_address());

        test_nginx(url, Some(credentials), ProxyConfig::default()).await
    }

    // This integration test verifies that proxy support is wired up correctly in Vector.
    // It is the only test of its kind.
    #[tokio::test]
    async fn test_stub_status_with_proxy() {
        let via_squid = ProxyConfig {
            http: Some(squid_address()),
            ..Default::default()
        };
        let url = format!("{}/basic_status", nginx_proxy_address());

        test_nginx(url, None, via_squid).await
    }
}