vector/sources/eventstoredb_metrics/
mod.rs

1use std::time::Duration;
2
3use futures::{FutureExt, StreamExt};
4use http::Uri;
5use hyper::{Body, Request};
6use serde_with::serde_as;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::config::LogNamespace;
9use vector_lib::configurable::configurable_component;
10use vector_lib::internal_event::{
11    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
12};
13use vector_lib::EstimatedJsonEncodedSizeOf;
14
15use self::types::Stats;
16use crate::{
17    config::{SourceConfig, SourceContext, SourceOutput},
18    http::HttpClient,
19    internal_events::{
20        EventStoreDbMetricsHttpError, EventStoreDbStatsParsingError, EventsReceived,
21        StreamClosedError,
22    },
23    tls::TlsSettings,
24};
25
26pub mod types;
27
28/// Configuration for the `eventstoredb_metrics` source.
29#[serde_as]
30#[configurable_component(source(
31    "eventstoredb_metrics",
32    "Receive metrics from collected by a EventStoreDB."
33))]
34#[derive(Clone, Debug, Default)]
35pub struct EventStoreDbConfig {
36    /// Endpoint to scrape stats from.
37    #[serde(default = "default_endpoint")]
38    #[configurable(metadata(docs::examples = "https://localhost:2113/stats"))]
39    endpoint: String,
40
41    /// The interval between scrapes, in seconds.
42    #[serde(default = "default_scrape_interval_secs")]
43    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
44    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
45    scrape_interval_secs: Duration,
46
47    /// Overrides the default namespace for the metrics emitted by the source.
48    ///
49    /// By default, `eventstoredb` is used.
50    #[configurable(metadata(docs::examples = "eventstoredb"))]
51    default_namespace: Option<String>,
52}
53
54const fn default_scrape_interval_secs() -> Duration {
55    Duration::from_secs(15)
56}
57
58pub fn default_endpoint() -> String {
59    "https://localhost:2113/stats".to_string()
60}
61
62impl_generate_config_from_default!(EventStoreDbConfig);
63
64#[async_trait::async_trait]
65#[typetag::serde(name = "eventstoredb_metrics")]
66impl SourceConfig for EventStoreDbConfig {
67    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
68        eventstoredb(
69            self.endpoint.clone(),
70            self.scrape_interval_secs,
71            self.default_namespace.clone(),
72            cx,
73        )
74    }
75
76    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
77        vec![SourceOutput::new_metrics()]
78    }
79
80    fn can_acknowledge(&self) -> bool {
81        false
82    }
83}
84
85fn eventstoredb(
86    endpoint: String,
87    interval: Duration,
88    namespace: Option<String>,
89    mut cx: SourceContext,
90) -> crate::Result<super::Source> {
91    let mut ticks = IntervalStream::new(tokio::time::interval(interval)).take_until(cx.shutdown);
92    let tls_settings = TlsSettings::from_options(None)?;
93    let client = HttpClient::new(tls_settings, &cx.proxy)?;
94    let url: Uri = endpoint.as_str().parse()?;
95
96    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
97    let events_received = register!(EventsReceived);
98
99    Ok(Box::pin(
100        async move {
101            while ticks.next().await.is_some() {
102                let req = Request::get(&url)
103                    .header("content-type", "application/json")
104                    .body(Body::empty())
105                    .expect("Building request should be infallible.");
106
107                match client.send(req).await {
108                    Err(error) => {
109                        emit!(EventStoreDbMetricsHttpError {
110                            error: error.into(),
111                        });
112                        continue;
113                    }
114
115                    Ok(resp) => {
116                        let bytes = match hyper::body::to_bytes(resp.into_body()).await {
117                            Ok(b) => b,
118                            Err(error) => {
119                                emit!(EventStoreDbMetricsHttpError {
120                                    error: error.into(),
121                                });
122                                continue;
123                            }
124                        };
125                        bytes_received.emit(ByteSize(bytes.len()));
126
127                        match serde_json::from_slice::<Stats>(bytes.as_ref()) {
128                            Err(error) => {
129                                emit!(EventStoreDbStatsParsingError { error });
130                                continue;
131                            }
132
133                            Ok(stats) => {
134                                let metrics = stats.metrics(namespace.clone());
135                                let count = metrics.len();
136                                let byte_size = metrics.estimated_json_encoded_size_of();
137
138                                events_received.emit(CountByteSize(count, byte_size));
139
140                                if (cx.out.send_batch(metrics).await).is_err() {
141                                    emit!(StreamClosedError { count });
142                                    break;
143                                }
144                            }
145                        }
146                    }
147                }
148            }
149        }
150        .map(Ok)
151        .boxed(),
152    ))
153}
154
#[cfg(all(test, feature = "eventstoredb_metrics-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};

    /// Stats endpoint the integration test scrapes.
    const EVENTSTOREDB_SCRAPE_ADDRESS: &str = "http://eventstoredb:2113/stats";

    /// Runs the source against the endpoint above for five seconds, scraping
    /// once per second, and expects at least one event to be produced.
    #[tokio::test]
    async fn scrape_something() {
        let endpoint = EVENTSTOREDB_SCRAPE_ADDRESS.to_owned();
        let scrape_interval_secs = Duration::from_secs(1);
        let config = EventStoreDbConfig {
            endpoint,
            scrape_interval_secs,
            default_namespace: None,
        };

        let run_for = Duration::from_secs(5);
        let collected = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;

        assert!(!collected.is_empty());
    }
}