vector/sources/eventstoredb_metrics/
mod.rs

1use std::time::Duration;
2
3use futures::{FutureExt, StreamExt};
4use http::Uri;
5use http_body::Collected;
6use hyper::{Body, Request};
7use serde_with::serde_as;
8use tokio_stream::wrappers::IntervalStream;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    config::LogNamespace,
12    configurable::configurable_component,
13    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
14};
15
16use self::types::Stats;
17use crate::{
18    config::{SourceConfig, SourceContext, SourceOutput},
19    http::HttpClient,
20    internal_events::{
21        EventStoreDbMetricsHttpError, EventStoreDbStatsParsingError, EventsReceived,
22        StreamClosedError,
23    },
24    tls::TlsSettings,
25};
26
27pub mod types;
28
29/// Configuration for the `eventstoredb_metrics` source.
30#[serde_as]
31#[configurable_component(source(
32    "eventstoredb_metrics",
33    "Receive metrics from collected by a EventStoreDB."
34))]
35#[derive(Clone, Debug, Default)]
36pub struct EventStoreDbConfig {
37    /// Endpoint to scrape stats from.
38    #[serde(default = "default_endpoint")]
39    #[configurable(metadata(docs::examples = "https://localhost:2113/stats"))]
40    endpoint: String,
41
42    /// The interval between scrapes, in seconds.
43    #[serde(default = "default_scrape_interval_secs")]
44    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
45    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
46    scrape_interval_secs: Duration,
47
48    /// Overrides the default namespace for the metrics emitted by the source.
49    ///
50    /// By default, `eventstoredb` is used.
51    #[configurable(metadata(docs::examples = "eventstoredb"))]
52    default_namespace: Option<String>,
53}
54
/// Scrape interval used when `scrape_interval_secs` is left out of the configuration:
/// 15 seconds.
const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SECS)
}
58
/// Stats endpoint used when `endpoint` is left out of the configuration.
pub fn default_endpoint() -> String {
    String::from("https://localhost:2113/stats")
}
62
63impl_generate_config_from_default!(EventStoreDbConfig);
64
65#[async_trait::async_trait]
66#[typetag::serde(name = "eventstoredb_metrics")]
67impl SourceConfig for EventStoreDbConfig {
68    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
69        eventstoredb(
70            self.endpoint.clone(),
71            self.scrape_interval_secs,
72            self.default_namespace.clone(),
73            cx,
74        )
75    }
76
77    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
78        vec![SourceOutput::new_metrics()]
79    }
80
81    fn can_acknowledge(&self) -> bool {
82        false
83    }
84}
85
86fn eventstoredb(
87    endpoint: String,
88    interval: Duration,
89    namespace: Option<String>,
90    mut cx: SourceContext,
91) -> crate::Result<super::Source> {
92    let mut ticks = IntervalStream::new(tokio::time::interval(interval)).take_until(cx.shutdown);
93    let tls_settings = TlsSettings::from_options(None)?;
94    let client = HttpClient::new(tls_settings, &cx.proxy)?;
95    let url: Uri = endpoint.as_str().parse()?;
96
97    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
98    let events_received = register!(EventsReceived);
99
100    Ok(Box::pin(
101        async move {
102            while ticks.next().await.is_some() {
103                let req = Request::get(&url)
104                    .header("content-type", "application/json")
105                    .body(Body::empty())
106                    .expect("Building request should be infallible.");
107
108                match client.send(req).await {
109                    Err(error) => {
110                        emit!(EventStoreDbMetricsHttpError {
111                            error: error.into(),
112                        });
113                        continue;
114                    }
115
116                    Ok(resp) => {
117                        let bytes = match http_body::Body::collect(resp.into_body())
118                            .await
119                            .map(Collected::to_bytes)
120                        {
121                            Ok(b) => b,
122                            Err(error) => {
123                                emit!(EventStoreDbMetricsHttpError {
124                                    error: error.into(),
125                                });
126                                continue;
127                            }
128                        };
129                        bytes_received.emit(ByteSize(bytes.len()));
130
131                        match serde_json::from_slice::<Stats>(bytes.as_ref()) {
132                            Err(error) => {
133                                emit!(EventStoreDbStatsParsingError { error });
134                                continue;
135                            }
136
137                            Ok(stats) => {
138                                let metrics = stats.metrics(namespace.clone());
139                                let count = metrics.len();
140                                let byte_size = metrics.estimated_json_encoded_size_of();
141
142                                events_received.emit(CountByteSize(count, byte_size));
143
144                                if (cx.out.send_batch(metrics).await).is_err() {
145                                    emit!(StreamClosedError { count });
146                                    break;
147                                }
148                            }
149                        }
150                    }
151                }
152            }
153        }
154        .map(Ok)
155        .boxed(),
156    ))
157}
158
#[cfg(all(test, feature = "eventstoredb_metrics-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Stats endpoint that the integration test scrapes.
    const EVENTSTOREDB_SCRAPE_ADDRESS: &str = "http://eventstoredb:2113/stats";

    /// Runs the source for five seconds with a one-second scrape interval and expects at
    /// least one event to have been produced.
    #[tokio::test]
    async fn scrape_something() {
        let scrape_interval_secs = Duration::from_secs(1);
        let run_for = Duration::from_secs(5);

        let config = EventStoreDbConfig {
            endpoint: String::from(EVENTSTOREDB_SCRAPE_ADDRESS),
            scrape_interval_secs,
            default_namespace: None,
        };

        let collected = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;
        assert!(!collected.is_empty());
    }
}