vector/sources/eventstoredb_metrics/
mod.rs1use std::time::Duration;
2
3use futures::{FutureExt, StreamExt};
4use http::Uri;
5use hyper::{Body, Request};
6use serde_with::serde_as;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::config::LogNamespace;
9use vector_lib::configurable::configurable_component;
10use vector_lib::internal_event::{
11 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
12};
13use vector_lib::EstimatedJsonEncodedSizeOf;
14
15use self::types::Stats;
16use crate::{
17 config::{SourceConfig, SourceContext, SourceOutput},
18 http::HttpClient,
19 internal_events::{
20 EventStoreDbMetricsHttpError, EventStoreDbStatsParsingError, EventsReceived,
21 StreamClosedError,
22 },
23 tls::TlsSettings,
24};
25
26pub mod types;
27
28#[serde_as]
30#[configurable_component(source(
31 "eventstoredb_metrics",
32 "Receive metrics from collected by a EventStoreDB."
33))]
34#[derive(Clone, Debug, Default)]
35pub struct EventStoreDbConfig {
36 #[serde(default = "default_endpoint")]
38 #[configurable(metadata(docs::examples = "https://localhost:2113/stats"))]
39 endpoint: String,
40
41 #[serde(default = "default_scrape_interval_secs")]
43 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
44 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
45 scrape_interval_secs: Duration,
46
47 #[configurable(metadata(docs::examples = "eventstoredb"))]
51 default_namespace: Option<String>,
52}
53
54const fn default_scrape_interval_secs() -> Duration {
55 Duration::from_secs(15)
56}
57
58pub fn default_endpoint() -> String {
59 "https://localhost:2113/stats".to_string()
60}
61
62impl_generate_config_from_default!(EventStoreDbConfig);
63
64#[async_trait::async_trait]
65#[typetag::serde(name = "eventstoredb_metrics")]
66impl SourceConfig for EventStoreDbConfig {
67 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
68 eventstoredb(
69 self.endpoint.clone(),
70 self.scrape_interval_secs,
71 self.default_namespace.clone(),
72 cx,
73 )
74 }
75
76 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
77 vec![SourceOutput::new_metrics()]
78 }
79
80 fn can_acknowledge(&self) -> bool {
81 false
82 }
83}
84
85fn eventstoredb(
86 endpoint: String,
87 interval: Duration,
88 namespace: Option<String>,
89 mut cx: SourceContext,
90) -> crate::Result<super::Source> {
91 let mut ticks = IntervalStream::new(tokio::time::interval(interval)).take_until(cx.shutdown);
92 let tls_settings = TlsSettings::from_options(None)?;
93 let client = HttpClient::new(tls_settings, &cx.proxy)?;
94 let url: Uri = endpoint.as_str().parse()?;
95
96 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
97 let events_received = register!(EventsReceived);
98
99 Ok(Box::pin(
100 async move {
101 while ticks.next().await.is_some() {
102 let req = Request::get(&url)
103 .header("content-type", "application/json")
104 .body(Body::empty())
105 .expect("Building request should be infallible.");
106
107 match client.send(req).await {
108 Err(error) => {
109 emit!(EventStoreDbMetricsHttpError {
110 error: error.into(),
111 });
112 continue;
113 }
114
115 Ok(resp) => {
116 let bytes = match hyper::body::to_bytes(resp.into_body()).await {
117 Ok(b) => b,
118 Err(error) => {
119 emit!(EventStoreDbMetricsHttpError {
120 error: error.into(),
121 });
122 continue;
123 }
124 };
125 bytes_received.emit(ByteSize(bytes.len()));
126
127 match serde_json::from_slice::<Stats>(bytes.as_ref()) {
128 Err(error) => {
129 emit!(EventStoreDbStatsParsingError { error });
130 continue;
131 }
132
133 Ok(stats) => {
134 let metrics = stats.metrics(namespace.clone());
135 let count = metrics.len();
136 let byte_size = metrics.estimated_json_encoded_size_of();
137
138 events_received.emit(CountByteSize(count, byte_size));
139
140 if (cx.out.send_batch(metrics).await).is_err() {
141 emit!(StreamClosedError { count });
142 break;
143 }
144 }
145 }
146 }
147 }
148 }
149 }
150 .map(Ok)
151 .boxed(),
152 ))
153}
154
155#[cfg(all(test, feature = "eventstoredb_metrics-integration-tests"))]
156mod integration_tests {
157 use tokio::time::Duration;
158
159 use super::*;
160 use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};
161
162 const EVENTSTOREDB_SCRAPE_ADDRESS: &str = "http://eventstoredb:2113/stats";
163
164 #[tokio::test]
165 async fn scrape_something() {
166 let config = EventStoreDbConfig {
167 endpoint: EVENTSTOREDB_SCRAPE_ADDRESS.to_owned(),
168 scrape_interval_secs: Duration::from_secs(1),
169 default_namespace: None,
170 };
171
172 let events =
173 run_and_assert_source_compliance(config, Duration::from_secs(5), &SOURCE_TAGS).await;
174 assert!(!events.is_empty());
175 }
176}