use std::time::{Duration, Instant};

use chrono::Utc;
use futures::{
    StreamExt,
    future::{join_all, try_join_all},
};
use mongodb::{
    Client,
    bson::{self, Bson, Document, doc, from_document},
    error::Error as MongoError,
    options::ClientOptions,
};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::{
    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags,
};

use crate::{
    config::{SourceConfig, SourceContext, SourceOutput},
    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
    internal_events::{
        CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
        MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
    },
};

mod types;
use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
use vector_lib::config::LogNamespace;

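// Helper macros used when building the metrics below: `tags!` clones the base endpoint tags
// and optionally overrides or adds key/value pairs, while `counter!` and `gauge!` wrap a
// numeric value in the corresponding `MetricValue`, casting it to `f64`.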
macro_rules! tags {
    ($tags:expr_2021) => { $tags.clone() };
    ($tags:expr_2021, $($key:expr_2021 => $value:expr_2021),*) => {
        {
            let mut tags = $tags.clone();
            $(
                tags.replace($key.into(), $value.to_string());
            )*
            tags
        }
    };
}

macro_rules! counter {
    ($value:expr_2021) => {
        MetricValue::Counter {
            value: $value as f64,
        }
    };
}

macro_rules! gauge {
    ($value:expr_2021) => {
        MetricValue::Gauge {
            value: $value as f64,
        }
    };
}

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("invalid endpoint: {}", source))]
    InvalidEndpoint { source: MongoError },
    #[snafu(display("invalid client options: {}", source))]
    InvalidClientOptions { source: MongoError },
}

#[derive(Debug)]
enum CollectError {
    Mongo(MongoError),
    Bson(bson::de::Error),
}

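// For reference, a minimal configuration enabling this source might look like the
// following (an illustrative sketch; the generated configuration docs are authoritative):
//
//     [sources.in_mongodb]
//     type = "mongodb_metrics"
//     endpoints = ["mongodb://localhost:27017"]
//     scrape_interval_secs = 15
//     namespace = "mongodb"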
#[serde_as]
#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MongoDbMetricsConfig {
    /// A list of MongoDB instances to scrape, in MongoDB connection string format.
    #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
    endpoints: Vec<String>,

    /// The interval between scrapes, in seconds.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// Overrides the default namespace for the metrics emitted by the source.
    ///
    /// By default, `mongodb` is used.
    #[serde(default = "default_namespace")]
    namespace: String,
}

#[derive(Debug)]
struct MongoDbMetrics {
    client: Client,
    endpoint: String,
    namespace: Option<String>,
    tags: MetricTags,
}

pub const fn default_scrape_interval_secs() -> Duration {
    Duration::from_secs(15)
}

pub fn default_namespace() -> String {
    "mongodb".to_string()
}

impl_generate_config_from_default!(MongoDbMetricsConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "mongodb_metrics")]
impl SourceConfig for MongoDbMetricsConfig {
    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());

        let sources = try_join_all(
            self.endpoints
                .iter()
                .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
        )
        .await?;

        let duration = self.scrape_interval_secs;
        let shutdown = cx.shutdown;
        Ok(Box::pin(async move {
            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
            while interval.next().await.is_some() {
                let start = Instant::now();
                let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
                emit!(CollectionCompleted {
                    start,
                    end: Instant::now()
                });

                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
                let count = metrics.len();

                if (cx.out.send_batch(metrics).await).is_err() {
                    emit!(StreamClosedError { count });
                    return Err(());
                }
            }

            Ok(())
        }))
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

impl MongoDbMetrics {
    async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
        let mut client_options = ClientOptions::parse(endpoint)
            .await
            .context(InvalidEndpointSnafu)?;
        client_options.direct_connection = Some(true);

        let endpoint = sanitize_endpoint(endpoint, &client_options);
        let tags = metric_tags!(
            "endpoint" => endpoint.clone(),
            "host" => client_options.hosts[0].to_string(),
        );

        Ok(Self {
            client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
            endpoint,
            namespace,
            tags,
        })
    }

    async fn get_node_type(&self) -> Result<NodeType, CollectError> {
        let doc = self
            .client
            .database("admin")
            .run_command(doc! { "isMaster": 1 })
            .await
            .map_err(CollectError::Mongo)?;
        let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;

        Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
            NodeType::Replset
        } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
            NodeType::Mongos
        } else {
            NodeType::Mongod
        })
    }

    async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
        let doc = self
            .client
            .database("admin")
            .run_command(doc! { "buildInfo": 1 })
            .await
            .map_err(CollectError::Mongo)?;
        from_document(doc).map_err(CollectError::Bson)
    }

    async fn print_version(&self) -> Result<(), CollectError> {
        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let node_type = self.get_node_type().await?;
            let build_info = self.get_build_info().await?;
            debug!(
                message = "Connected to server.",
                endpoint = %self.endpoint,
                node_type = ?node_type,
                server_version = ?serde_json::to_string(&build_info).unwrap()
            );
        }

        Ok(())
    }

    fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
        Metric::new(name, MetricKind::Absolute, value)
            .with_namespace(self.namespace.clone())
            .with_tags(Some(tags))
            .with_timestamp(Some(Utc::now()))
    }

    async fn collect(&self) -> Vec<Metric> {
        let (up_value, mut metrics) = match self.collect_server_status().await {
            Ok(metrics) => (1.0, metrics),
            Err(error) => {
                match error {
                    CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
                        error,
                        endpoint: &self.endpoint,
                    }),
                    CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
                        error,
                        endpoint: &self.endpoint,
                    }),
                }

                (0.0, vec![])
            }
        };

        metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));

        emit!(MongoDbMetricsEventsReceived {
            byte_size: metrics.estimated_json_encoded_size_of(),
            count: metrics.len(),
            endpoint: &self.endpoint,
        });

        metrics
    }

    async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
        self.print_version().await?;

        let mut metrics = vec![];

        let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
        let db = self.client.database("admin");
        let doc = db.run_command(command).await.map_err(CollectError::Mongo)?;
        let byte_size = document_size(&doc);
        emit!(EndpointBytesReceived {
            byte_size,
            protocol: "tcp",
            endpoint: &self.endpoint,
        });
        let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;

        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.regular),
            tags!(self.tags, "type" => "regular"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.warning),
            tags!(self.tags, "type" => "warning"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.msg),
            tags!(self.tags, "type" => "msg"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.user),
            tags!(self.tags, "type" => "user"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.rollovers),
            tags!(self.tags, "type" => "rollovers"),
        ));

        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.active),
            tags!(self.tags, "state" => "active"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.available),
            tags!(self.tags, "state" => "available"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.current),
            tags!(self.tags, "state" => "current"),
        ));

        if let Some(value) = status.extra_info.heap_usage_bytes {
            metrics.push(self.create_metric(
                "extra_info_heap_usage_bytes",
                gauge!(value),
                tags!(self.tags),
            ));
        }
        metrics.push(self.create_metric(
            "extra_info_page_faults",
            gauge!(status.extra_info.page_faults),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "instance_local_time",
            gauge!(status.instance.local_time.timestamp_millis() / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_uptime_estimate_seconds_total",
            gauge!(status.instance.uptime_estimate),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_uptime_seconds_total",
            gauge!(status.instance.uptime),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "memory",
            gauge!(status.memory.resident),
            tags!(self.tags, "type" => "resident"),
        ));
        metrics.push(self.create_metric(
            "memory",
            gauge!(status.memory.r#virtual),
            tags!(self.tags, "type" => "virtual"),
        ));
        if let Some(value) = status.memory.mapped {
            metrics.push(self.create_metric(
                "memory",
                gauge!(value),
                tags!(self.tags, "type" => "mapped"),
            ))
        }
        if let Some(value) = status.memory.mapped_with_journal {
            metrics.push(self.create_metric(
                "memory",
                gauge!(value),
                tags!(self.tags, "type" => "mapped_with_journal"),
            ))
        }

        metrics.push(self.create_metric(
            "mongod_global_lock_total_time_seconds",
            counter!(status.global_lock.total_time),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.total),
            tags!(self.tags, "type" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.readers),
            tags!(self.tags, "type" => "readers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.writers),
            tags!(self.tags, "type" => "writers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.total),
            tags!(self.tags, "type" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.readers),
            tags!(self.tags, "type" => "readers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.writers),
            tags!(self.tags, "type" => "writers"),
        ));

        for (r#type, lock) in status.locks {
            if let Some(modes) = lock.time_acquiring_micros {
                if let Some(value) = modes.read {
                    metrics.push(self.create_metric(
                        "mongod_locks_time_acquiring_global_seconds_total",
                        counter!(value),
                        tags!(self.tags, "type" => &r#type, "mode" => "read"),
                    ));
                }
                if let Some(value) = modes.write {
                    metrics.push(self.create_metric(
                        "mongod_locks_time_acquiring_global_seconds_total",
                        counter!(value),
                        tags!(self.tags, "type" => &r#type, "mode" => "write"),
                    ));
                }
            }
        }

        metrics.push(self.create_metric(
            "mongod_metrics_cursor_timed_out_total",
            counter!(status.metrics.cursor.timed_out),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.no_timeout),
            tags!(self.tags, "state" => "no_timeout"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.pinned),
            tags!(self.tags, "state" => "pinned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.total),
            tags!(self.tags, "state" => "total"),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.deleted),
            tags!(self.tags, "state" => "deleted"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.inserted),
            tags!(self.tags, "state" => "inserted"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.returned),
            tags!(self.tags, "state" => "returned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.updated),
            tags!(self.tags, "state" => "updated"),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtime_num",
            gauge!(status.metrics.get_last_error.wtime.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtime_seconds_total",
            counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtimeouts_total",
            counter!(status.metrics.get_last_error.wtimeouts),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_operation_total",
            counter!(status.metrics.operation.scan_and_order),
            tags!(self.tags, "type" => "scan_and_order"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_operation_total",
            counter!(status.metrics.operation.write_conflicts),
            tags!(self.tags, "type" => "write_conflicts"),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_query_executor_total",
            counter!(status.metrics.query_executor.scanned),
            tags!(self.tags, "state" => "scanned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_query_executor_total",
            counter!(status.metrics.query_executor.scanned_objects),
            tags!(self.tags, "state" => "scanned_objects"),
        ));
        if let Some(doc) = status.metrics.query_executor.collection_scans {
            metrics.push(self.create_metric(
                "mongod_metrics_query_executor_total",
                counter!(doc.total),
                tags!(self.tags, "state" => "collection_scans"),
            ));
        }

        if let Some(record) = status.metrics.record {
            metrics.push(self.create_metric(
                "mongod_metrics_record_moves_total",
                counter!(record.moves),
                tags!(self.tags),
            ));
        }

        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_batches_num_total",
            counter!(status.metrics.repl.apply.batches.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_batches_seconds_total",
            counter!(status.metrics.repl.apply.batches.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_ops_total",
            counter!(status.metrics.repl.apply.ops),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_count",
            counter!(status.metrics.repl.buffer.count),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_max_size_bytes_total",
            counter!(status.metrics.repl.buffer.max_size_bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_size_bytes",
            counter!(status.metrics.repl.buffer.size_bytes),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_queue",
            gauge!(status.metrics.repl.executor.queues.network_in_progress),
            tags!(self.tags, "type" => "network_in_progress"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_queue",
            gauge!(status.metrics.repl.executor.queues.sleepers),
            tags!(self.tags, "type" => "sleepers"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_unsignaled_events",
            gauge!(status.metrics.repl.executor.unsignaled_events),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_bytes_total",
            counter!(status.metrics.repl.network.bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_getmores_num_total",
            counter!(status.metrics.repl.network.getmores.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_getmores_seconds_total",
            counter!(status.metrics.repl.network.getmores.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_ops_total",
            counter!(status.metrics.repl.network.ops),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_readers_created_total",
            counter!(status.metrics.repl.network.readers_created),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_ttl_deleted_documents_total",
            counter!(status.metrics.ttl.deleted_documents),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_ttl_passes_total",
            counter!(status.metrics.ttl.passes),
            tags!(self.tags),
        ));

        for (r#type, stat) in status.op_latencies {
            for bucket in stat.histogram {
                metrics.push(self.create_metric(
                    "mongod_op_latencies_histogram",
                    gauge!(bucket.count),
                    tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
                ));
            }
            metrics.push(self.create_metric(
                "mongod_op_latencies_latency",
                gauge!(stat.latency),
                tags!(self.tags, "type" => &r#type),
            ));
            metrics.push(self.create_metric(
                "mongod_op_latencies_ops_total",
                gauge!(stat.ops),
                tags!(self.tags, "type" => &r#type),
            ));
        }

        metrics.push(self.create_metric(
            "mongod_storage_engine",
            gauge!(1),
            tags!(self.tags, "engine" => status.storage_engine.name),
        ));

        if let Some(stat) = status.wired_tiger {
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_read),
                tags!(self.tags, "type" => "blocks_read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.mapped_blocks_read),
                tags!(self.tags, "type" => "blocks_read_mapped"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_pre_loaded),
                tags!(self.tags, "type" => "blocks_pre_loaded"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_written),
                tags!(self.tags, "type" => "blocks_written"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.bytes_read),
                tags!(self.tags, "type" => "bytes_read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.mapped_bytes_read),
                tags!(self.tags, "type" => "bytes_read_mapped"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.bytes_written),
                tags!(self.tags, "type" => "bytes_written"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_total),
                tags!(self.tags, "type" => "total"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_dirty),
                tags!(self.tags, "type" => "dirty"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_internal_pages),
                tags!(self.tags, "type" => "internal_pages"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_leaf_pages),
                tags!(self.tags, "type" => "leaf_pages"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes_total",
                counter!(stat.cache.pages_read_into),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes_total",
                counter!(stat.cache.pages_written_from),
                tags!(self.tags, "type" => "written"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_evicted_total",
                counter!(stat.cache.evicted_modified),
                tags!(self.tags, "type" => "modified"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_evicted_total",
                counter!(stat.cache.evicted_unmodified),
                tags!(self.tags, "type" => "unmodified"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_max_bytes",
                gauge!(stat.cache.max_bytes),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_overhead_percent",
                gauge!(stat.cache.percent_overhead),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages",
                gauge!(stat.cache.pages_total),
                tags!(self.tags, "type" => "total"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages",
                gauge!(stat.cache.pages_dirty),
                tags!(self.tags, "type" => "dirty"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages_total",
                counter!(stat.cache.pages_read_into),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages_total",
                counter!(stat.cache.pages_written_from),
                tags!(self.tags, "type" => "write"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_available_tickets",
                gauge!(stat.concurrent_transactions.read.available),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_available_tickets",
                gauge!(stat.concurrent_transactions.write.available),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_out_tickets",
                gauge!(stat.concurrent_transactions.read.out),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_out_tickets",
                gauge!(stat.concurrent_transactions.write.out),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_total_tickets",
                gauge!(stat.concurrent_transactions.read.total_tickets),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_total_tickets",
                gauge!(stat.concurrent_transactions.write.total_tickets),
                tags!(self.tags, "type" => "write"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_bytes_total",
                counter!(stat.log.bytes_payload_data),
                tags!(self.tags, "type" => "payload"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_bytes_total",
                counter!(stat.log.bytes_written),
                tags!(self.tags, "type" => "written"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_writes),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_scans),
                tags!(self.tags, "type" => "scan"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_scans_double),
                tags!(self.tags, "type" => "scan_double"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_syncs),
                tags!(self.tags, "type" => "sync"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_sync_dirs),
                tags!(self.tags, "type" => "sync_dir"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_flushes),
                tags!(self.tags, "type" => "flush"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_total",
                counter!(stat.log.records_compressed),
                tags!(self.tags, "type" => "compressed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_total",
                counter!(stat.log.records_uncompressed),
                tags!(self.tags, "type" => "uncompressed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_scanned_total",
                counter!(stat.log.records_processed_log_scan),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_session_open_sessions",
                gauge!(stat.session.sessions),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds",
                gauge!(stat.transaction.checkpoint_min_ms / 1000),
                tags!(self.tags, "type" => "min"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds",
                gauge!(stat.transaction.checkpoint_max_ms / 1000),
                tags!(self.tags, "type" => "max"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds_total",
                counter!(stat.transaction.checkpoint_total_ms / 1000),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_running_checkpoints",
                gauge!(stat.transaction.checkpoints_running),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.begins),
                tags!(self.tags, "type" => "begins"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.checkpoints),
                tags!(self.tags, "type" => "checkpoints"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.committed),
                tags!(self.tags, "type" => "committed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.rolled_back),
                tags!(self.tags, "type" => "rolledback"),
            ));
        }

        metrics.push(self.create_metric(
            "network_bytes_total",
            counter!(status.network.bytes_in),
            tags!(self.tags, "state" => "bytes_in"),
        ));
        metrics.push(self.create_metric(
            "network_bytes_total",
            counter!(status.network.bytes_out),
            tags!(self.tags, "state" => "bytes_out"),
        ));
        metrics.push(self.create_metric(
            "network_metrics_num_requests_total",
            counter!(status.network.num_requests),
            tags!(self.tags),
        ));

        for (r#type, value) in status.opcounters {
            metrics.push(self.create_metric(
                "op_counters_total",
                counter!(value),
                tags!(self.tags, "type" => r#type),
            ));
        }

        for (r#type, value) in status.opcounters_repl {
            metrics.push(self.create_metric(
                "op_counters_repl_total",
                counter!(value),
                tags!(self.tags, "type" => r#type),
            ));
        }

        Ok(metrics)
    }
}

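// Rough, in-memory size accounting for BSON values. `document_size` is used above to
// estimate the payload size reported by `EndpointBytesReceived`; it is an approximation of
// the decoded value sizes rather than the exact BSON wire size.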
fn bson_size(value: &Bson) -> usize {
    match value {
        Bson::Double(value) => value.size_of(),
        Bson::String(value) => value.size_of(),
        Bson::Array(value) => value.iter().map(bson_size).sum(),
        Bson::Document(value) => document_size(value),
        Bson::Boolean(_) => std::mem::size_of::<bool>(),
        Bson::RegularExpression(value) => value.pattern.size_of(),
        Bson::JavaScriptCode(value) => value.size_of(),
        Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
        Bson::Int32(value) => value.size_of(),
        Bson::Int64(value) => value.size_of(),
        Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
        Bson::Binary(value) => value.bytes.size_of(),
        Bson::ObjectId(value) => value.bytes().size_of(),
        Bson::DateTime(_) => std::mem::size_of::<i64>(),
        Bson::Symbol(value) => value.size_of(),
        Bson::Decimal128(value) => value.bytes().size_of(),
        Bson::DbPointer(_) => 0,
        Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
    }
}

fn document_size(doc: &Document) -> usize {
    doc.into_iter()
        .map(|(key, value)| key.size_of() + bson_size(value))
        .sum()
}

/// Removes the credentials and any auth-related query parameters from the endpoint so it
/// can be used safely as a metric tag.
fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
    let mut endpoint = endpoint.to_owned();
    if options.credential.is_some() {
        let start = endpoint.find("://").unwrap() + 3;

        let pre_slash = match endpoint[start..].find('/') {
            Some(index) => {
                let mut segments = endpoint[start..].split_at(index);
                if segments.1.len() > 1 {
                    let lstart = start + segments.0.len() + 1;
                    let post_slash = &segments.1[1..];
                    if let Some(index) = post_slash.find('?') {
                        let segments = post_slash.split_at(index);
                        if segments.1.len() > 1 {
                            let options = segments.1[1..]
                                .split('&')
                                .filter(|pair| {
                                    let (key, _) = pair.split_at(pair.find('=').unwrap());
                                    !matches!(
                                        key.to_lowercase().as_str(),
                                        "authsource" | "authmechanism" | "authmechanismproperties"
                                    )
                                })
                                .collect::<Vec<_>>()
                                .join("&");

                            endpoint = format!(
                                "{}{}",
                                &endpoint[..lstart + segments.0.len() + 1],
                                &options
                            );
                        }
                    }
                    segments = endpoint[start..].split_at(index);
                }
                segments.0
            }
            None => &endpoint[start..],
        };

        let end = pre_slash.rfind('@').unwrap() + 1;
        endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
    }
    endpoint
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
    }

    #[tokio::test]
    async fn sanitize_endpoint_test() {
        let endpoint = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        let endpoint = sanitize_endpoint(endpoint, &client_options);
        assert_eq!(
            &endpoint,
            "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true"
        );
    }
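
    // Additional coverage for `sanitize_endpoint`, added as an illustrative sketch: an
    // endpoint that names a database and mixes auth options with other query parameters
    // should keep the database and the non-auth parameters while dropping the credentials.
    #[tokio::test]
    async fn sanitize_endpoint_with_database_test() {
        let endpoint = "mongodb://user:pass@localhost:27017/admin?authSource=admin&tls=true";
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        let endpoint = sanitize_endpoint(endpoint, &client_options);
        assert_eq!(&endpoint, "mongodb://localhost:27017/admin?tls=true");
    }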
}

#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
mod integration_tests {
    use futures::StreamExt;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::{
        SourceSender,
        test_util::{
            components::{PULL_SOURCE_TAGS, assert_source_compliance},
            trace_init,
        },
    };

    fn primary_mongo_address() -> String {
        std::env::var("PRIMARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27017".into())
    }

    fn secondary_mongo_address() -> String {
        std::env::var("SECONDARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27019".into())
    }

    fn remove_creds(address: &str) -> String {
        let mut url = url::Url::parse(address).unwrap();
        url.set_password(None).unwrap();
        url.set_username("").unwrap();
        url.to_string()
    }

    async fn test_instance(endpoint: String) {
        assert_source_compliance(&PULL_SOURCE_TAGS, async {
            let host = ClientOptions::parse(endpoint.as_str()).await.unwrap().hosts[0].to_string();
            let namespace = "vector_mongodb";

            let (sender, mut recv) = SourceSender::new_test();

            let endpoints = vec![endpoint.clone()];
            tokio::spawn(async move {
                MongoDbMetricsConfig {
                    endpoints,
                    scrape_interval_secs: Duration::from_secs(15),
                    namespace: namespace.to_owned(),
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });

            let event = timeout(Duration::from_secs(30), recv.next())
                .await
                .expect("fetch metrics timeout")
                .expect("failed to get metrics from a stream");
            let mut events = vec![event];
            loop {
                match timeout(Duration::from_millis(10), recv.next()).await {
                    Ok(Some(event)) => events.push(event),
                    Ok(None) => break,
                    Err(_) => break,
                }
            }

            let clean_endpoint = remove_creds(&endpoint);

            assert!(events.len() > 100);
            for event in events {
                let metric = event.into_metric();
                assert!(metric.namespace() == Some(namespace));
                let timestamp = metric.timestamp().expect("existed timestamp");
                assert!((timestamp - Utc::now()).num_seconds() < 1);
                let tags = metric.tags().expect("existed tags");
                assert_eq!(tags.get("endpoint"), Some(&clean_endpoint[..]));
                assert_eq!(tags.get("host"), Some(&host[..]));
            }
        })
        .await;
    }

    #[tokio::test]
    async fn fetch_metrics_mongod() {
        trace_init();
        test_instance(primary_mongo_address()).await;
    }

    #[tokio::test]
    async fn fetch_metrics_replset() {
        trace_init();
        test_instance(secondary_mongo_address()).await;
    }
}