use std::time::{Duration, Instant};

use chrono::Utc;
use futures::{
    StreamExt,
    future::{join_all, try_join_all},
};
use mongodb::{
    Client,
    bson::{self, Bson, Document, doc, from_document},
    error::Error as MongoError,
    options::ClientOptions,
};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::{
    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags,
};

use crate::{
    config::{SourceConfig, SourceContext, SourceOutput},
    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
    internal_events::{
        CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
        MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
    },
};

mod types;
use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
use vector_lib::config::LogNamespace;

macro_rules! tags {
    ($tags:expr_2021) => { $tags.clone() };
    ($tags:expr_2021, $($key:expr_2021 => $value:expr_2021),*) => {
        {
            let mut tags = $tags.clone();
            $(
                tags.replace($key.into(), $value.to_string());
            )*
            tags
        }
    };
}

macro_rules! counter {
    ($value:expr_2021) => {
        MetricValue::Counter {
            value: $value as f64,
        }
    };
}

macro_rules! gauge {
    ($value:expr_2021) => {
        MetricValue::Gauge {
            value: $value as f64,
        }
    };
}
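
// Example expansions (for illustration): `counter!(5u32)` yields
// `MetricValue::Counter { value: 5.0 }`, while `tags!(self.tags, "type" => "regular")`
// clones the base tag set and sets the `type` tag to `"regular"`.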

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("invalid endpoint: {}", source))]
    InvalidEndpoint { source: MongoError },
    #[snafu(display("invalid client options: {}", source))]
    InvalidClientOptions { source: MongoError },
}

#[derive(Debug)]
enum CollectError {
    Mongo(MongoError),
    Bson(bson::de::Error),
}

/// Configuration for the `mongodb_metrics` source.
#[serde_as]
#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MongoDbMetricsConfig {
    /// A list of MongoDB instances to scrape, in connection string URI format.
    #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
    endpoints: Vec<String>,

    /// The interval between scrapes, in seconds.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// Overrides the default namespace for the metrics emitted by the source.
    ///
    /// If set to an empty string, no namespace is added to the metrics.
    #[serde(default = "default_namespace")]
    namespace: String,
}

#[derive(Debug)]
struct MongoDbMetrics {
    client: Client,
    endpoint: String,
    namespace: Option<String>,
    tags: MetricTags,
}

pub const fn default_scrape_interval_secs() -> Duration {
    Duration::from_secs(15)
}

pub fn default_namespace() -> String {
    "mongodb".to_string()
}

impl_generate_config_from_default!(MongoDbMetricsConfig);
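
// A minimal configuration sketch for this source (TOML shown as a comment; the
// `sources.mongodb` key is an arbitrary example name, the fields come from
// `MongoDbMetricsConfig` above):
//
//   [sources.mongodb]
//   type = "mongodb_metrics"
//   endpoints = ["mongodb://localhost:27017"]
//   scrape_interval_secs = 15
//   namespace = "mongodb"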

#[async_trait::async_trait]
#[typetag::serde(name = "mongodb_metrics")]
impl SourceConfig for MongoDbMetricsConfig {
    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());

        let sources = try_join_all(
            self.endpoints
                .iter()
                .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
        )
        .await?;

        let duration = self.scrape_interval_secs;
        let shutdown = cx.shutdown;
        Ok(Box::pin(async move {
            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
            while interval.next().await.is_some() {
                let start = Instant::now();
                let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
                emit!(CollectionCompleted {
                    start,
                    end: Instant::now()
                });

                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
                let count = metrics.len();

                if (cx.out.send_batch(metrics).await).is_err() {
                    emit!(StreamClosedError { count });
                    return Err(());
                }
            }

            Ok(())
        }))
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

impl MongoDbMetrics {
    /// Builds a client for a single endpoint.
    ///
    /// `direct_connection` is enabled so metrics are collected only from the
    /// specified instance, even when it is part of a replica set.
    async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
        let mut client_options = ClientOptions::parse(endpoint)
            .await
            .context(InvalidEndpointSnafu)?;
        client_options.direct_connection = Some(true);

        let endpoint = sanitize_endpoint(endpoint, &client_options);
        let tags = metric_tags!(
            "endpoint" => endpoint.clone(),
            "host" => client_options.hosts[0].to_string(),
        );

        Ok(Self {
            client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
            endpoint,
            namespace,
            tags,
        })
    }

    async fn get_node_type(&self) -> Result<NodeType, CollectError> {
        let doc = self
            .client
            .database("admin")
            .run_command(doc! { "isMaster": 1 }, None)
            .await
            .map_err(CollectError::Mongo)?;
        let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;

        Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
            NodeType::Replset
        } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
            // `isMaster` reports `msg: "isdbgrid"` when the command is answered by a
            // mongos query router.
            NodeType::Mongos
        } else {
            NodeType::Mongod
        })
    }

    async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
        let doc = self
            .client
            .database("admin")
            .run_command(doc! { "buildInfo": 1 }, None)
            .await
            .map_err(CollectError::Mongo)?;
        from_document(doc).map_err(CollectError::Bson)
    }

    async fn print_version(&self) -> Result<(), CollectError> {
        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let node_type = self.get_node_type().await?;
            let build_info = self.get_build_info().await?;
            debug!(
                message = "Connected to server.",
                endpoint = %self.endpoint,
                node_type = ?node_type,
                server_version = ?serde_json::to_string(&build_info).unwrap()
            );
        }

        Ok(())
    }

    fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
        Metric::new(name, MetricKind::Absolute, value)
            .with_namespace(self.namespace.clone())
            .with_tags(Some(tags))
            .with_timestamp(Some(Utc::now()))
    }

    async fn collect(&self) -> Vec<Metric> {
        // Record `up` as 1 when `serverStatus` collection succeeds and 0 otherwise.
        let (up_value, mut metrics) = match self.collect_server_status().await {
            Ok(metrics) => (1.0, metrics),
            Err(error) => {
                match error {
                    CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
                        error,
                        endpoint: &self.endpoint,
                    }),
                    CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
                        error,
                        endpoint: &self.endpoint,
                    }),
                }

                (0.0, vec![])
            }
        };

        metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));

        emit!(MongoDbMetricsEventsReceived {
            byte_size: metrics.estimated_json_encoded_size_of(),
            count: metrics.len(),
            endpoint: &self.endpoint,
        });

        metrics
    }

    /// Collects metrics from the `serverStatus` command run against the `admin` database.
    async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
        self.print_version().await?;

        let mut metrics = vec![];

        let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
        let db = self.client.database("admin");
        let doc = db
            .run_command(command, None)
            .await
            .map_err(CollectError::Mongo)?;
        let byte_size = document_size(&doc);
        emit!(EndpointBytesReceived {
            byte_size,
            protocol: "tcp",
            endpoint: &self.endpoint,
        });
        let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;

        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.regular),
            tags!(self.tags, "type" => "regular"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.warning),
            tags!(self.tags, "type" => "warning"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.msg),
            tags!(self.tags, "type" => "msg"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.user),
            tags!(self.tags, "type" => "user"),
        ));
        metrics.push(self.create_metric(
            "asserts_total",
            counter!(status.asserts.rollovers),
            tags!(self.tags, "type" => "rollovers"),
        ));

        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.active),
            tags!(self.tags, "state" => "active"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.available),
            tags!(self.tags, "state" => "available"),
        ));
        metrics.push(self.create_metric(
            "connections",
            counter!(status.connections.current),
            tags!(self.tags, "state" => "current"),
        ));

        if let Some(value) = status.extra_info.heap_usage_bytes {
            metrics.push(self.create_metric(
                "extra_info_heap_usage_bytes",
                gauge!(value),
                tags!(self.tags),
            ));
        }
        metrics.push(self.create_metric(
            "extra_info_page_faults",
            gauge!(status.extra_info.page_faults),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "instance_local_time",
            gauge!(status.instance.local_time.timestamp_millis() / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_uptime_estimate_seconds_total",
            gauge!(status.instance.uptime_estimate),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "instance_uptime_seconds_total",
            gauge!(status.instance.uptime),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "memory",
            gauge!(status.memory.resident),
            tags!(self.tags, "type" => "resident"),
        ));
        metrics.push(self.create_metric(
            "memory",
            gauge!(status.memory.r#virtual),
            tags!(self.tags, "type" => "virtual"),
        ));
        if let Some(value) = status.memory.mapped {
            metrics.push(self.create_metric(
                "memory",
                gauge!(value),
                tags!(self.tags, "type" => "mapped"),
            ))
        }
        if let Some(value) = status.memory.mapped_with_journal {
            metrics.push(self.create_metric(
                "memory",
                gauge!(value),
                tags!(self.tags, "type" => "mapped_with_journal"),
            ))
        }

        metrics.push(self.create_metric(
            "mongod_global_lock_total_time_seconds",
            counter!(status.global_lock.total_time),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.total),
            tags!(self.tags, "type" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.readers),
            tags!(self.tags, "type" => "readers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_active_clients",
            gauge!(status.global_lock.active_clients.writers),
            tags!(self.tags, "type" => "writers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.total),
            tags!(self.tags, "type" => "total"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.readers),
            tags!(self.tags, "type" => "readers"),
        ));
        metrics.push(self.create_metric(
            "mongod_global_lock_current_queue",
            gauge!(status.global_lock.current_queue.writers),
            tags!(self.tags, "type" => "writers"),
        ));

        for (r#type, lock) in status.locks {
            if let Some(modes) = lock.time_acquiring_micros {
                if let Some(value) = modes.read {
                    metrics.push(self.create_metric(
                        "mongod_locks_time_acquiring_global_seconds_total",
                        counter!(value),
                        tags!(self.tags, "type" => &r#type, "mode" => "read"),
                    ));
                }
                if let Some(value) = modes.write {
                    metrics.push(self.create_metric(
                        "mongod_locks_time_acquiring_global_seconds_total",
                        counter!(value),
                        tags!(self.tags, "type" => &r#type, "mode" => "write"),
                    ));
                }
            }
        }

        metrics.push(self.create_metric(
            "mongod_metrics_cursor_timed_out_total",
            counter!(status.metrics.cursor.timed_out),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.no_timeout),
            tags!(self.tags, "state" => "no_timeout"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.pinned),
            tags!(self.tags, "state" => "pinned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_cursor_open",
            gauge!(status.metrics.cursor.open.total),
            tags!(self.tags, "state" => "total"),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.deleted),
            tags!(self.tags, "state" => "deleted"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.inserted),
            tags!(self.tags, "state" => "inserted"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.returned),
            tags!(self.tags, "state" => "returned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_document_total",
            counter!(status.metrics.document.updated),
            tags!(self.tags, "state" => "updated"),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtime_num",
            gauge!(status.metrics.get_last_error.wtime.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtime_seconds_total",
            counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_get_last_error_wtimeouts_total",
            counter!(status.metrics.get_last_error.wtimeouts),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_operation_total",
            counter!(status.metrics.operation.scan_and_order),
            tags!(self.tags, "type" => "scan_and_order"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_operation_total",
            counter!(status.metrics.operation.write_conflicts),
            tags!(self.tags, "type" => "write_conflicts"),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_query_executor_total",
            counter!(status.metrics.query_executor.scanned),
            tags!(self.tags, "state" => "scanned"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_query_executor_total",
            counter!(status.metrics.query_executor.scanned_objects),
            tags!(self.tags, "state" => "scanned_objects"),
        ));
        if let Some(doc) = status.metrics.query_executor.collection_scans {
            metrics.push(self.create_metric(
                "mongod_metrics_query_executor_total",
                counter!(doc.total),
                tags!(self.tags, "state" => "collection_scans"),
            ));
        }

        if let Some(record) = status.metrics.record {
            metrics.push(self.create_metric(
                "mongod_metrics_record_moves_total",
                counter!(record.moves),
                tags!(self.tags),
            ));
        }

        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_batches_num_total",
            counter!(status.metrics.repl.apply.batches.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_batches_seconds_total",
            counter!(status.metrics.repl.apply.batches.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_apply_ops_total",
            counter!(status.metrics.repl.apply.ops),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_count",
            counter!(status.metrics.repl.buffer.count),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_max_size_bytes_total",
            counter!(status.metrics.repl.buffer.max_size_bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_buffer_size_bytes",
            counter!(status.metrics.repl.buffer.size_bytes),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_queue",
            gauge!(status.metrics.repl.executor.queues.network_in_progress),
            tags!(self.tags, "type" => "network_in_progress"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_queue",
            gauge!(status.metrics.repl.executor.queues.sleepers),
            tags!(self.tags, "type" => "sleepers"),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_executor_unsignaled_events",
            gauge!(status.metrics.repl.executor.unsignaled_events),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_bytes_total",
            counter!(status.metrics.repl.network.bytes),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_getmores_num_total",
            counter!(status.metrics.repl.network.getmores.num),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_getmores_seconds_total",
            counter!(status.metrics.repl.network.getmores.total_millis / 1000),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_ops_total",
            counter!(status.metrics.repl.network.ops),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_repl_network_readers_created_total",
            counter!(status.metrics.repl.network.readers_created),
            tags!(self.tags),
        ));

        metrics.push(self.create_metric(
            "mongod_metrics_ttl_deleted_documents_total",
            counter!(status.metrics.ttl.deleted_documents),
            tags!(self.tags),
        ));
        metrics.push(self.create_metric(
            "mongod_metrics_ttl_passes_total",
            counter!(status.metrics.ttl.passes),
            tags!(self.tags),
        ));

        for (r#type, stat) in status.op_latencies {
            for bucket in stat.histogram {
                metrics.push(self.create_metric(
                    "mongod_op_latencies_histogram",
                    gauge!(bucket.count),
                    tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
                ));
            }
            metrics.push(self.create_metric(
                "mongod_op_latencies_latency",
                gauge!(stat.latency),
                tags!(self.tags, "type" => &r#type),
            ));
            metrics.push(self.create_metric(
                "mongod_op_latencies_ops_total",
                gauge!(stat.ops),
                tags!(self.tags, "type" => &r#type),
            ));
        }

        metrics.push(self.create_metric(
            "mongod_storage_engine",
            gauge!(1),
            tags!(self.tags, "engine" => status.storage_engine.name),
        ));

        if let Some(stat) = status.wired_tiger {
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_read),
                tags!(self.tags, "type" => "blocks_read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.mapped_blocks_read),
                tags!(self.tags, "type" => "blocks_read_mapped"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_pre_loaded),
                tags!(self.tags, "type" => "blocks_pre_loaded"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_blocks_total",
                counter!(stat.block_manager.blocks_written),
                tags!(self.tags, "type" => "blocks_written"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.bytes_read),
                tags!(self.tags, "type" => "bytes_read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.mapped_bytes_read),
                tags!(self.tags, "type" => "bytes_read_mapped"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_blockmanager_bytes_total",
                counter!(stat.block_manager.bytes_written),
                tags!(self.tags, "type" => "bytes_written"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_total),
                tags!(self.tags, "type" => "total"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_dirty),
                tags!(self.tags, "type" => "dirty"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_internal_pages),
                tags!(self.tags, "type" => "internal_pages"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes",
                gauge!(stat.cache.bytes_leaf_pages),
                tags!(self.tags, "type" => "leaf_pages"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes_total",
                counter!(stat.cache.pages_read_into),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_bytes_total",
                counter!(stat.cache.pages_written_from),
                tags!(self.tags, "type" => "written"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_evicted_total",
                counter!(stat.cache.evicted_modified),
                tags!(self.tags, "type" => "modified"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_evicted_total",
                counter!(stat.cache.evicted_unmodified),
                tags!(self.tags, "type" => "unmodified"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_max_bytes",
                gauge!(stat.cache.max_bytes),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_overhead_percent",
                gauge!(stat.cache.percent_overhead),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages",
                gauge!(stat.cache.pages_total),
                tags!(self.tags, "type" => "total"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages",
                gauge!(stat.cache.pages_dirty),
                tags!(self.tags, "type" => "dirty"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages_total",
                counter!(stat.cache.pages_read_into),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_cache_pages_total",
                counter!(stat.cache.pages_written_from),
                tags!(self.tags, "type" => "write"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_available_tickets",
                gauge!(stat.concurrent_transactions.read.available),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_available_tickets",
                gauge!(stat.concurrent_transactions.write.available),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_out_tickets",
                gauge!(stat.concurrent_transactions.read.out),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_out_tickets",
                gauge!(stat.concurrent_transactions.write.out),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_total_tickets",
                gauge!(stat.concurrent_transactions.read.total_tickets),
                tags!(self.tags, "type" => "read"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_concurrent_transactions_total_tickets",
                gauge!(stat.concurrent_transactions.write.total_tickets),
                tags!(self.tags, "type" => "write"),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_bytes_total",
                counter!(stat.log.bytes_payload_data),
                tags!(self.tags, "type" => "payload"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_bytes_total",
                counter!(stat.log.bytes_written),
                tags!(self.tags, "type" => "written"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_writes),
                tags!(self.tags, "type" => "write"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_scans),
                tags!(self.tags, "type" => "scan"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_scans_double),
                tags!(self.tags, "type" => "scan_double"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_syncs),
                tags!(self.tags, "type" => "sync"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_sync_dirs),
                tags!(self.tags, "type" => "sync_dir"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_operations_total",
                counter!(stat.log.log_flushes),
                tags!(self.tags, "type" => "flush"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_scanned_total",
                counter!(stat.log.records_compressed),
                tags!(self.tags, "type" => "compressed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_scanned_total",
                counter!(stat.log.records_uncompressed),
                tags!(self.tags, "type" => "uncompressed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_log_records_total",
                counter!(stat.log.records_processed_log_scan),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_session_open_sessions",
                gauge!(stat.session.sessions),
                tags!(self.tags),
            ));

            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds",
                gauge!(stat.transaction.checkpoint_min_ms / 1000),
                tags!(self.tags, "type" => "min"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds",
                gauge!(stat.transaction.checkpoint_max_ms / 1000),
                tags!(self.tags, "type" => "max"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_checkpoint_seconds_total",
                counter!(stat.transaction.checkpoint_total_ms / 1000),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_running_checkpoints",
                gauge!(stat.transaction.checkpoints_running),
                tags!(self.tags),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.begins),
                tags!(self.tags, "type" => "begins"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.checkpoints),
                tags!(self.tags, "type" => "checkpoints"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.committed),
                tags!(self.tags, "type" => "committed"),
            ));
            metrics.push(self.create_metric(
                "mongod_wiredtiger_transactions_total",
                counter!(stat.transaction.rolled_back),
                tags!(self.tags, "type" => "rolledback"),
            ));
        }

        metrics.push(self.create_metric(
            "network_bytes_total",
            counter!(status.network.bytes_in),
            tags!(self.tags, "state" => "bytes_in"),
        ));
        metrics.push(self.create_metric(
            "network_bytes_total",
            counter!(status.network.bytes_out),
            tags!(self.tags, "state" => "bytes_out"),
        ));
        metrics.push(self.create_metric(
            "network_metrics_num_requests_total",
            counter!(status.network.num_requests),
            tags!(self.tags),
        ));

        for (r#type, value) in status.opcounters {
            metrics.push(self.create_metric(
                "op_counters_total",
                counter!(value),
                tags!(self.tags, "type" => r#type),
            ));
        }

        for (r#type, value) in status.opcounters_repl {
            metrics.push(self.create_metric(
                "op_counters_repl_total",
                counter!(value),
                tags!(self.tags, "type" => r#type),
            ));
        }

        Ok(metrics)
    }
}

/// Estimates the in-memory size of a BSON value for byte-count instrumentation.
fn bson_size(value: &Bson) -> usize {
    match value {
        Bson::Double(value) => value.size_of(),
        Bson::String(value) => value.size_of(),
        Bson::Array(value) => value.iter().map(bson_size).sum(),
        Bson::Document(value) => document_size(value),
        Bson::Boolean(_) => std::mem::size_of::<bool>(),
        Bson::RegularExpression(value) => value.pattern.size_of(),
        Bson::JavaScriptCode(value) => value.size_of(),
        Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
        Bson::Int32(value) => value.size_of(),
        Bson::Int64(value) => value.size_of(),
        Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
        Bson::Binary(value) => value.bytes.size_of(),
        Bson::ObjectId(value) => value.bytes().size_of(),
        Bson::DateTime(_) => std::mem::size_of::<i64>(),
        Bson::Symbol(value) => value.size_of(),
        Bson::Decimal128(value) => value.bytes().size_of(),
        Bson::DbPointer(_) => {
            // The `DbPointer` fields are not public, so its size cannot be measured.
            0
        }
        Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
    }
}

/// Estimates the total size of a BSON document by summing its keys and values.
fn document_size(doc: &Document) -> usize {
    doc.into_iter()
        .map(|(key, value)| key.size_of() + bson_size(value))
        .sum()
}

/// Removes the `username:password@` portion and any auth-related query options from
/// the endpoint so credentials are not exposed in the `endpoint` metric tag.
fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
    let mut endpoint = endpoint.to_owned();
    if options.credential.is_some() {
        let start = endpoint.find("://").unwrap() + 3;

        // Split the authority from the optional `/database?options` suffix.
        let pre_slash = match endpoint[start..].find('/') {
            Some(index) => {
                let mut segments = endpoint[start..].split_at(index);
                if segments.1.len() > 1 {
                    let lstart = start + segments.0.len() + 1;
                    let post_slash = &segments.1[1..];
                    // Drop auth-related pairs from the query string, keeping the rest.
                    if let Some(index) = post_slash.find('?') {
                        let segments = post_slash.split_at(index);
                        if segments.1.len() > 1 {
                            let options = segments.1[1..]
                                .split('&')
                                .filter(|pair| {
                                    let (key, _) = pair.split_at(pair.find('=').unwrap());
                                    !matches!(
                                        key.to_lowercase().as_str(),
                                        "authsource" | "authmechanism" | "authmechanismproperties"
                                    )
                                })
                                .collect::<Vec<_>>()
                                .join("&");

                            endpoint = format!(
                                "{}{}",
                                &endpoint[..lstart + segments.0.len() + 1],
                                &options
                            );
                        }
                    }
                    segments = endpoint[start..].split_at(index);
                }
                segments.0
            }
            None => &endpoint[start..],
        };

        // Strip the credentials up to and including the terminating `@`.
        let end = pre_slash.rfind('@').unwrap() + 1;
        endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
    }
    endpoint
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
    }
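
    // A minimal sanity check for the size-estimation helpers above. Exact values depend
    // on the `ByteSizeOf` implementation, so only relative growth is asserted here.
    #[test]
    fn document_size_grows_with_content() {
        let small = doc! { "a": 1_i64 };
        let large = doc! { "a": 1_i64, "b": "a longer string value" };
        assert!(document_size(&small) > 0);
        assert!(document_size(&large) > document_size(&small));
    }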

    #[tokio::test]
    async fn sanitize_endpoint_test() {
        let endpoint = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        let endpoint = sanitize_endpoint(endpoint, &client_options);
        assert_eq!(
            &endpoint,
            "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true"
        );
    }
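
    // An additional sketch covering the branch of `sanitize_endpoint` that handles a
    // connection string with a default database; the host and database names here are
    // made up for illustration.
    #[tokio::test]
    async fn sanitize_endpoint_with_database_test() {
        let endpoint =
            "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017/mydb?authSource=admin&tls=true";
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        let endpoint = sanitize_endpoint(endpoint, &client_options);
        assert_eq!(
            &endpoint,
            "mongodb://mongos0.example.com:27017/mydb?tls=true"
        );
    }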
}

#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
mod integration_tests {
    use futures::StreamExt;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::{
        SourceSender,
        test_util::{
            components::{PULL_SOURCE_TAGS, assert_source_compliance},
            trace_init,
        },
    };

    fn primary_mongo_address() -> String {
        std::env::var("PRIMARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27017".into())
    }

    fn secondary_mongo_address() -> String {
        std::env::var("SECONDARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27019".into())
    }

    fn remove_creds(address: &str) -> String {
        let mut url = url::Url::parse(address).unwrap();
        url.set_password(None).unwrap();
        url.set_username("").unwrap();
        url.to_string()
    }

    async fn test_instance(endpoint: String) {
        assert_source_compliance(&PULL_SOURCE_TAGS, async {
            let host = ClientOptions::parse(endpoint.as_str()).await.unwrap().hosts[0].to_string();
            let namespace = "vector_mongodb";

            let (sender, mut recv) = SourceSender::new_test();

            let endpoints = vec![endpoint.clone()];
            tokio::spawn(async move {
                MongoDbMetricsConfig {
                    endpoints,
                    scrape_interval_secs: Duration::from_secs(15),
                    namespace: namespace.to_owned(),
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });

            let event = timeout(Duration::from_secs(30), recv.next())
                .await
                .expect("fetch metrics timeout")
                .expect("failed to get metrics from a stream");
            let mut events = vec![event];
            loop {
                match timeout(Duration::from_millis(10), recv.next()).await {
                    Ok(Some(event)) => events.push(event),
                    Ok(None) => break,
                    Err(_) => break,
                }
            }

            let clean_endpoint = remove_creds(&endpoint);

            assert!(events.len() > 100);
            for event in events {
                let metric = event.into_metric();
                assert_eq!(metric.namespace(), Some(namespace));
                let timestamp = metric.timestamp().expect("metric should have a timestamp");
                assert!((timestamp - Utc::now()).num_seconds() < 1);
                let tags = metric.tags().expect("metric should have tags");
                assert_eq!(tags.get("endpoint"), Some(&clean_endpoint[..]));
                assert_eq!(tags.get("host"), Some(&host[..]));
            }
        })
        .await;
    }

    #[tokio::test]
    async fn fetch_metrics_mongod() {
        trace_init();
        test_instance(primary_mongo_address()).await;
    }

    #[tokio::test]
    async fn fetch_metrics_replset() {
        trace_init();
        test_instance(secondary_mongo_address()).await;
    }
}