vector/sources/eventstoredb_metrics/
mod.rs1use std::time::Duration;
2
3use futures::{FutureExt, StreamExt};
4use http::Uri;
5use http_body::Collected;
6use hyper::{Body, Request};
7use serde_with::serde_as;
8use tokio_stream::wrappers::IntervalStream;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 config::LogNamespace,
12 configurable::configurable_component,
13 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
14};
15
16use self::types::Stats;
17use crate::{
18 config::{SourceConfig, SourceContext, SourceOutput},
19 http::HttpClient,
20 internal_events::{
21 EventStoreDbMetricsHttpError, EventStoreDbStatsParsingError, EventsReceived,
22 StreamClosedError,
23 },
24 tls::TlsSettings,
25};
26
27pub mod types;
28
29#[serde_as]
31#[configurable_component(source(
32 "eventstoredb_metrics",
33 "Receive metrics from collected by a EventStoreDB."
34))]
35#[derive(Clone, Debug, Default)]
36pub struct EventStoreDbConfig {
37 #[serde(default = "default_endpoint")]
39 #[configurable(metadata(docs::examples = "https://localhost:2113/stats"))]
40 endpoint: String,
41
42 #[serde(default = "default_scrape_interval_secs")]
44 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
45 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
46 scrape_interval_secs: Duration,
47
48 #[configurable(metadata(docs::examples = "eventstoredb"))]
52 default_namespace: Option<String>,
53}
54
55const fn default_scrape_interval_secs() -> Duration {
56 Duration::from_secs(15)
57}
58
59pub fn default_endpoint() -> String {
60 "https://localhost:2113/stats".to_string()
61}
62
63impl_generate_config_from_default!(EventStoreDbConfig);
64
65#[async_trait::async_trait]
66#[typetag::serde(name = "eventstoredb_metrics")]
67impl SourceConfig for EventStoreDbConfig {
68 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
69 eventstoredb(
70 self.endpoint.clone(),
71 self.scrape_interval_secs,
72 self.default_namespace.clone(),
73 cx,
74 )
75 }
76
77 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
78 vec![SourceOutput::new_metrics()]
79 }
80
81 fn can_acknowledge(&self) -> bool {
82 false
83 }
84}
85
86fn eventstoredb(
87 endpoint: String,
88 interval: Duration,
89 namespace: Option<String>,
90 mut cx: SourceContext,
91) -> crate::Result<super::Source> {
92 let mut ticks = IntervalStream::new(tokio::time::interval(interval)).take_until(cx.shutdown);
93 let tls_settings = TlsSettings::from_options(None)?;
94 let client = HttpClient::new(tls_settings, &cx.proxy)?;
95 let url: Uri = endpoint.as_str().parse()?;
96
97 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
98 let events_received = register!(EventsReceived);
99
100 Ok(Box::pin(
101 async move {
102 while ticks.next().await.is_some() {
103 let req = Request::get(&url)
104 .header("content-type", "application/json")
105 .body(Body::empty())
106 .expect("Building request should be infallible.");
107
108 match client.send(req).await {
109 Err(error) => {
110 emit!(EventStoreDbMetricsHttpError {
111 error: error.into(),
112 });
113 continue;
114 }
115
116 Ok(resp) => {
117 let bytes = match http_body::Body::collect(resp.into_body())
118 .await
119 .map(Collected::to_bytes)
120 {
121 Ok(b) => b,
122 Err(error) => {
123 emit!(EventStoreDbMetricsHttpError {
124 error: error.into(),
125 });
126 continue;
127 }
128 };
129 bytes_received.emit(ByteSize(bytes.len()));
130
131 match serde_json::from_slice::<Stats>(bytes.as_ref()) {
132 Err(error) => {
133 emit!(EventStoreDbStatsParsingError { error });
134 continue;
135 }
136
137 Ok(stats) => {
138 let metrics = stats.metrics(namespace.clone());
139 let count = metrics.len();
140 let byte_size = metrics.estimated_json_encoded_size_of();
141
142 events_received.emit(CountByteSize(count, byte_size));
143
144 if (cx.out.send_batch(metrics).await).is_err() {
145 emit!(StreamClosedError { count });
146 break;
147 }
148 }
149 }
150 }
151 }
152 }
153 }
154 .map(Ok)
155 .boxed(),
156 ))
157}
158
159#[cfg(all(test, feature = "eventstoredb_metrics-integration-tests"))]
160mod integration_tests {
161 use tokio::time::Duration;
162
163 use super::*;
164 use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};
165
166 const EVENTSTOREDB_SCRAPE_ADDRESS: &str = "http://eventstoredb:2113/stats";
167
168 #[tokio::test]
169 async fn scrape_something() {
170 let config = EventStoreDbConfig {
171 endpoint: EVENTSTOREDB_SCRAPE_ADDRESS.to_owned(),
172 scrape_interval_secs: Duration::from_secs(1),
173 default_namespace: None,
174 };
175
176 let events =
177 run_and_assert_source_compliance(config, Duration::from_secs(5), &SOURCE_TAGS).await;
178 assert!(!events.is_empty());
179 }
180}