1use std::time::{Duration, Instant};
2
3use chrono::Utc;
4use futures::{
5 future::{join_all, try_join_all},
6 StreamExt,
7};
8use mongodb::{
9 bson::{self, doc, from_document, Bson, Document},
10 error::Error as MongoError,
11 options::ClientOptions,
12 Client,
13};
14use serde_with::serde_as;
15use snafu::{ResultExt, Snafu};
16use tokio::time;
17use tokio_stream::wrappers::IntervalStream;
18use vector_lib::configurable::configurable_component;
19use vector_lib::{metric_tags, ByteSizeOf, EstimatedJsonEncodedSizeOf};
20
21use crate::{
22 config::{SourceConfig, SourceContext, SourceOutput},
23 event::metric::{Metric, MetricKind, MetricTags, MetricValue},
24 internal_events::{
25 CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
26 MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
27 },
28};
29
30mod types;
31use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
32use vector_lib::config::LogNamespace;
33
34macro_rules! tags {
35 ($tags:expr_2021) => { $tags.clone() };
36 ($tags:expr_2021, $($key:expr_2021 => $value:expr_2021),*) => {
37 {
38 let mut tags = $tags.clone();
39 $(
40 tags.replace($key.into(), $value.to_string());
41 )*
42 tags
43 }
44 };
45}
46
47macro_rules! counter {
48 ($value:expr_2021) => {
49 MetricValue::Counter {
50 value: $value as f64,
51 }
52 };
53}
54
55macro_rules! gauge {
56 ($value:expr_2021) => {
57 MetricValue::Gauge {
58 value: $value as f64,
59 }
60 };
61}
62
63#[derive(Debug, Snafu)]
64enum BuildError {
65 #[snafu(display("invalid endpoint: {}", source))]
66 InvalidEndpoint { source: MongoError },
67 #[snafu(display("invalid client options: {}", source))]
68 InvalidClientOptions { source: MongoError },
69}
70
71#[derive(Debug)]
72enum CollectError {
73 Mongo(MongoError),
74 Bson(bson::de::Error),
75}
76
77#[serde_as]
79#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
80#[derive(Clone, Debug, Default)]
81#[serde(deny_unknown_fields)]
82pub struct MongoDbMetricsConfig {
83 #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
87 endpoints: Vec<String>,
88
89 #[serde(default = "default_scrape_interval_secs")]
91 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
92 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
93 scrape_interval_secs: Duration,
94
95 #[serde(default = "default_namespace")]
101 namespace: String,
102}
103
104#[derive(Debug)]
105struct MongoDbMetrics {
106 client: Client,
107 endpoint: String,
108 namespace: Option<String>,
109 tags: MetricTags,
110}
111
112pub const fn default_scrape_interval_secs() -> Duration {
113 Duration::from_secs(15)
114}
115
116pub fn default_namespace() -> String {
117 "mongodb".to_string()
118}
119
120impl_generate_config_from_default!(MongoDbMetricsConfig);
121
122#[async_trait::async_trait]
123#[typetag::serde(name = "mongodb_metrics")]
124impl SourceConfig for MongoDbMetricsConfig {
125 async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
126 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
127
128 let sources = try_join_all(
129 self.endpoints
130 .iter()
131 .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
132 )
133 .await?;
134
135 let duration = self.scrape_interval_secs;
136 let shutdown = cx.shutdown;
137 Ok(Box::pin(async move {
138 let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
139 while interval.next().await.is_some() {
140 let start = Instant::now();
141 let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
142 emit!(CollectionCompleted {
143 start,
144 end: Instant::now()
145 });
146
147 let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
148 let count = metrics.len();
149
150 if (cx.out.send_batch(metrics).await).is_err() {
151 emit!(StreamClosedError { count });
152 return Err(());
153 }
154 }
155
156 Ok(())
157 }))
158 }
159
160 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
161 vec![SourceOutput::new_metrics()]
162 }
163
164 fn can_acknowledge(&self) -> bool {
165 false
166 }
167}
168
169impl MongoDbMetrics {
170 async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
173 let mut client_options = ClientOptions::parse(endpoint)
174 .await
175 .context(InvalidEndpointSnafu)?;
176 client_options.direct_connection = Some(true);
177
178 let endpoint = sanitize_endpoint(endpoint, &client_options);
179 let tags = metric_tags!(
180 "endpoint" => endpoint.clone(),
181 "host" => client_options.hosts[0].to_string(),
182 );
183
184 Ok(Self {
185 client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
186 endpoint,
187 namespace,
188 tags,
189 })
190 }
191
192 async fn get_node_type(&self) -> Result<NodeType, CollectError> {
194 let doc = self
195 .client
196 .database("admin")
197 .run_command(doc! { "isMaster": 1 }, None)
198 .await
199 .map_err(CollectError::Mongo)?;
200 let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;
201
202 Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
203 NodeType::Replset
204 } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
205 NodeType::Mongos
209 } else {
210 NodeType::Mongod
211 })
212 }
213
214 async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
215 let doc = self
216 .client
217 .database("admin")
218 .run_command(doc! { "buildInfo": 1 }, None)
219 .await
220 .map_err(CollectError::Mongo)?;
221 from_document(doc).map_err(CollectError::Bson)
222 }
223
224 async fn print_version(&self) -> Result<(), CollectError> {
225 if tracing::level_enabled!(tracing::Level::DEBUG) {
226 let node_type = self.get_node_type().await?;
227 let build_info = self.get_build_info().await?;
228 debug!(
229 message = "Connected to server.", endpoint = %self.endpoint, node_type = ?node_type, server_version = ?serde_json::to_string(&build_info).unwrap()
230 );
231 }
232
233 Ok(())
234 }
235
236 fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
237 Metric::new(name, MetricKind::Absolute, value)
238 .with_namespace(self.namespace.clone())
239 .with_tags(Some(tags))
240 .with_timestamp(Some(Utc::now()))
241 }
242
243 async fn collect(&self) -> Vec<Metric> {
244 let (up_value, mut metrics) = match self.collect_server_status().await {
246 Ok(metrics) => (1.0, metrics),
247 Err(error) => {
248 match error {
249 CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
250 error,
251 endpoint: &self.endpoint,
252 }),
253 CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
254 error,
255 endpoint: &self.endpoint,
256 }),
257 }
258
259 (0.0, vec![])
260 }
261 };
262
263 metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));
264
265 emit!(MongoDbMetricsEventsReceived {
266 byte_size: metrics.estimated_json_encoded_size_of(),
267 count: metrics.len(),
268 endpoint: &self.endpoint,
269 });
270
271 metrics
272 }
273
274 async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
277 self.print_version().await?;
278
279 let mut metrics = vec![];
280
281 let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
282 let db = self.client.database("admin");
283 let doc = db
284 .run_command(command, None)
285 .await
286 .map_err(CollectError::Mongo)?;
287 let byte_size = document_size(&doc);
288 emit!(EndpointBytesReceived {
289 byte_size,
290 protocol: "tcp",
291 endpoint: &self.endpoint,
292 });
293 let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;
294
295 metrics.push(self.create_metric(
297 "asserts_total",
298 counter!(status.asserts.regular),
299 tags!(self.tags, "type" => "regular"),
300 ));
301 metrics.push(self.create_metric(
302 "asserts_total",
303 counter!(status.asserts.warning),
304 tags!(self.tags, "type" => "warning"),
305 ));
306 metrics.push(self.create_metric(
307 "asserts_total",
308 counter!(status.asserts.msg),
309 tags!(self.tags, "type" => "msg"),
310 ));
311 metrics.push(self.create_metric(
312 "asserts_total",
313 counter!(status.asserts.user),
314 tags!(self.tags, "type" => "user"),
315 ));
316 metrics.push(self.create_metric(
317 "asserts_total",
318 counter!(status.asserts.rollovers),
319 tags!(self.tags, "type" => "rollovers"),
320 ));
321
322 metrics.push(self.create_metric(
324 "connections",
325 counter!(status.connections.active),
326 tags!(self.tags, "state" => "active"),
327 ));
328 metrics.push(self.create_metric(
329 "connections",
330 counter!(status.connections.available),
331 tags!(self.tags, "state" => "available"),
332 ));
333 metrics.push(self.create_metric(
334 "connections",
335 counter!(status.connections.current),
336 tags!(self.tags, "state" => "current"),
337 ));
338
339 if let Some(value) = status.extra_info.heap_usage_bytes {
341 metrics.push(self.create_metric(
342 "extra_info_heap_usage_bytes",
343 gauge!(value),
344 tags!(self.tags),
345 ));
346 }
347 metrics.push(self.create_metric(
348 "extra_info_page_faults",
349 gauge!(status.extra_info.page_faults),
350 tags!(self.tags),
351 ));
352
353 metrics.push(self.create_metric(
355 "instance_local_time",
356 gauge!(status.instance.local_time.timestamp_millis() / 1000),
357 tags!(self.tags),
358 ));
359 metrics.push(self.create_metric(
360 "instance_uptime_estimate_seconds_total",
361 gauge!(status.instance.uptime_estimate),
362 tags!(self.tags),
363 ));
364 metrics.push(self.create_metric(
365 "instance_uptime_seconds_total",
366 gauge!(status.instance.uptime),
367 tags!(self.tags),
368 ));
369
370 metrics.push(self.create_metric(
372 "memory",
373 gauge!(status.memory.resident),
374 tags!(self.tags, "type" => "resident"),
375 ));
376 metrics.push(self.create_metric(
377 "memory",
378 gauge!(status.memory.r#virtual),
379 tags!(self.tags, "type" => "virtual"),
380 ));
381 if let Some(value) = status.memory.mapped {
382 metrics.push(self.create_metric(
383 "memory",
384 gauge!(value),
385 tags!(self.tags, "type" => "mapped"),
386 ))
387 }
388 if let Some(value) = status.memory.mapped_with_journal {
389 metrics.push(self.create_metric(
390 "memory",
391 gauge!(value),
392 tags!(self.tags, "type" => "mapped_with_journal"),
393 ))
394 }
395
396 metrics.push(self.create_metric(
398 "mongod_global_lock_total_time_seconds",
399 counter!(status.global_lock.total_time),
400 tags!(self.tags),
401 ));
402 metrics.push(self.create_metric(
403 "mongod_global_lock_active_clients",
404 gauge!(status.global_lock.active_clients.total),
405 tags!(self.tags, "type" => "total"),
406 ));
407 metrics.push(self.create_metric(
408 "mongod_global_lock_active_clients",
409 gauge!(status.global_lock.active_clients.readers),
410 tags!(self.tags, "type" => "readers"),
411 ));
412 metrics.push(self.create_metric(
413 "mongod_global_lock_active_clients",
414 gauge!(status.global_lock.active_clients.writers),
415 tags!(self.tags, "type" => "writers"),
416 ));
417 metrics.push(self.create_metric(
418 "mongod_global_lock_current_queue",
419 gauge!(status.global_lock.current_queue.total),
420 tags!(self.tags, "type" => "total"),
421 ));
422 metrics.push(self.create_metric(
423 "mongod_global_lock_current_queue",
424 gauge!(status.global_lock.current_queue.readers),
425 tags!(self.tags, "type" => "readers"),
426 ));
427 metrics.push(self.create_metric(
428 "mongod_global_lock_current_queue",
429 gauge!(status.global_lock.current_queue.writers),
430 tags!(self.tags, "type" => "writers"),
431 ));
432
433 for (r#type, lock) in status.locks {
435 if let Some(modes) = lock.time_acquiring_micros {
436 if let Some(value) = modes.read {
437 metrics.push(self.create_metric(
438 "mongod_locks_time_acquiring_global_seconds_total",
439 counter!(value),
440 tags!(self.tags, "type" => &r#type, "mode" => "read"),
441 ));
442 }
443 if let Some(value) = modes.write {
444 metrics.push(self.create_metric(
445 "mongod_locks_time_acquiring_global_seconds_total",
446 counter!(value),
447 tags!(self.tags, "type" => &r#type, "mode" => "write"),
448 ));
449 }
450 }
451 }
452
453 metrics.push(self.create_metric(
455 "mongod_metrics_cursor_timed_out_total",
456 counter!(status.metrics.cursor.timed_out),
457 tags!(self.tags),
458 ));
459 metrics.push(self.create_metric(
460 "mongod_metrics_cursor_open",
461 gauge!(status.metrics.cursor.open.no_timeout),
462 tags!(self.tags, "state" => "no_timeout"),
463 ));
464 metrics.push(self.create_metric(
465 "mongod_metrics_cursor_open",
466 gauge!(status.metrics.cursor.open.pinned),
467 tags!(self.tags, "state" => "pinned"),
468 ));
469 metrics.push(self.create_metric(
470 "mongod_metrics_cursor_open",
471 gauge!(status.metrics.cursor.open.total),
472 tags!(self.tags, "state" => "total"),
473 ));
474
475 metrics.push(self.create_metric(
477 "mongod_metrics_document_total",
478 counter!(status.metrics.document.deleted),
479 tags!(self.tags, "state" => "deleted"),
480 ));
481 metrics.push(self.create_metric(
482 "mongod_metrics_document_total",
483 counter!(status.metrics.document.inserted),
484 tags!(self.tags, "state" => "inserted"),
485 ));
486 metrics.push(self.create_metric(
487 "mongod_metrics_document_total",
488 counter!(status.metrics.document.returned),
489 tags!(self.tags, "state" => "returned"),
490 ));
491 metrics.push(self.create_metric(
492 "mongod_metrics_document_total",
493 counter!(status.metrics.document.updated),
494 tags!(self.tags, "state" => "updated"),
495 ));
496
497 metrics.push(self.create_metric(
499 "mongod_metrics_get_last_error_wtime_num",
500 gauge!(status.metrics.get_last_error.wtime.num),
501 tags!(self.tags),
502 ));
503 metrics.push(self.create_metric(
504 "mongod_metrics_get_last_error_wtime_seconds_total",
505 counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
506 tags!(self.tags),
507 ));
508 metrics.push(self.create_metric(
509 "mongod_metrics_get_last_error_wtimeouts_total",
510 counter!(status.metrics.get_last_error.wtimeouts),
511 tags!(self.tags),
512 ));
513
514 metrics.push(self.create_metric(
516 "mongod_metrics_operation_total",
517 counter!(status.metrics.operation.scan_and_order),
518 tags!(self.tags, "type" => "scan_and_order"),
519 ));
520 metrics.push(self.create_metric(
521 "mongod_metrics_operation_total",
522 counter!(status.metrics.operation.write_conflicts),
523 tags!(self.tags, "type" => "write_conflicts"),
524 ));
525
526 metrics.push(self.create_metric(
528 "mongod_metrics_query_executor_total",
529 counter!(status.metrics.query_executor.scanned),
530 tags!(self.tags, "state" => "scanned"),
531 ));
532 metrics.push(self.create_metric(
533 "mongod_metrics_query_executor_total",
534 counter!(status.metrics.query_executor.scanned_objects),
535 tags!(self.tags, "state" => "scanned_objects"),
536 ));
537 if let Some(doc) = status.metrics.query_executor.collection_scans {
538 metrics.push(self.create_metric(
539 "mongod_metrics_query_executor_total",
540 counter!(doc.total),
541 tags!(self.tags, "state" => "collection_scans"),
542 ));
543 }
544
545 if let Some(record) = status.metrics.record {
547 metrics.push(self.create_metric(
548 "mongod_metrics_record_moves_total",
549 counter!(record.moves),
550 tags!(self.tags),
551 ));
552 }
553
554 metrics.push(self.create_metric(
556 "mongod_metrics_repl_apply_batches_num_total",
557 counter!(status.metrics.repl.apply.batches.num),
558 tags!(self.tags),
559 ));
560 metrics.push(self.create_metric(
561 "mongod_metrics_repl_apply_batches_seconds_total",
562 counter!(status.metrics.repl.apply.batches.total_millis / 1000),
563 tags!(self.tags),
564 ));
565 metrics.push(self.create_metric(
566 "mongod_metrics_repl_apply_ops_total",
567 counter!(status.metrics.repl.apply.ops),
568 tags!(self.tags),
569 ));
570
571 metrics.push(self.create_metric(
573 "mongod_metrics_repl_buffer_count",
574 counter!(status.metrics.repl.buffer.count),
575 tags!(self.tags),
576 ));
577 metrics.push(self.create_metric(
578 "mongod_metrics_repl_buffer_max_size_bytes_total",
579 counter!(status.metrics.repl.buffer.max_size_bytes),
580 tags!(self.tags),
581 ));
582 metrics.push(self.create_metric(
583 "mongod_metrics_repl_buffer_size_bytes",
584 counter!(status.metrics.repl.buffer.size_bytes),
585 tags!(self.tags),
586 ));
587
588 metrics.push(self.create_metric(
590 "mongod_metrics_repl_executor_queue",
591 gauge!(status.metrics.repl.executor.queues.network_in_progress),
592 tags!(self.tags, "type" => "network_in_progress"),
593 ));
594 metrics.push(self.create_metric(
595 "mongod_metrics_repl_executor_queue",
596 gauge!(status.metrics.repl.executor.queues.sleepers),
597 tags!(self.tags, "type" => "sleepers"),
598 ));
599 metrics.push(self.create_metric(
600 "mongod_metrics_repl_executor_unsignaled_events",
601 gauge!(status.metrics.repl.executor.unsignaled_events),
602 tags!(self.tags),
603 ));
604
605 metrics.push(self.create_metric(
607 "mongod_metrics_repl_network_bytes_total",
608 counter!(status.metrics.repl.network.bytes),
609 tags!(self.tags),
610 ));
611 metrics.push(self.create_metric(
612 "mongod_metrics_repl_network_getmores_num_total",
613 counter!(status.metrics.repl.network.getmores.num),
614 tags!(self.tags),
615 ));
616 metrics.push(self.create_metric(
617 "mongod_metrics_repl_network_getmores_seconds_total",
618 counter!(status.metrics.repl.network.getmores.total_millis / 1000),
619 tags!(self.tags),
620 ));
621 metrics.push(self.create_metric(
622 "mongod_metrics_repl_network_ops_total",
623 counter!(status.metrics.repl.network.ops),
624 tags!(self.tags),
625 ));
626 metrics.push(self.create_metric(
627 "mongod_metrics_repl_network_readers_created_total",
628 counter!(status.metrics.repl.network.readers_created),
629 tags!(self.tags),
630 ));
631
632 metrics.push(self.create_metric(
634 "mongod_metrics_ttl_deleted_documents_total",
635 counter!(status.metrics.ttl.deleted_documents),
636 tags!(self.tags),
637 ));
638 metrics.push(self.create_metric(
639 "mongod_metrics_ttl_passes_total",
640 counter!(status.metrics.ttl.passes),
641 tags!(self.tags),
642 ));
643
644 for (r#type, stat) in status.op_latencies {
646 for bucket in stat.histogram {
647 metrics.push(self.create_metric(
648 "mongod_op_latencies_histogram",
649 gauge!(bucket.count),
650 tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
651 ));
652 }
653 metrics.push(self.create_metric(
654 "mongod_op_latencies_latency",
655 gauge!(stat.latency),
656 tags!(self.tags, "type" => &r#type),
657 ));
658 metrics.push(self.create_metric(
659 "mongod_op_latencies_ops_total",
660 gauge!(stat.ops),
661 tags!(self.tags, "type" => &r#type),
662 ));
663 }
664
665 metrics.push(self.create_metric(
667 "mongod_storage_engine",
668 gauge!(1),
669 tags!(self.tags, "engine" => status.storage_engine.name),
670 ));
671
672 if let Some(stat) = status.wired_tiger {
674 metrics.push(self.create_metric(
676 "mongod_wiredtiger_blockmanager_blocks_total",
677 counter!(stat.block_manager.blocks_read),
678 tags!(self.tags, "type" => "blocks_read"),
679 ));
680 metrics.push(self.create_metric(
681 "mongod_wiredtiger_blockmanager_blocks_total",
682 counter!(stat.block_manager.mapped_blocks_read),
683 tags!(self.tags, "type" => "blocks_read_mapped"),
684 ));
685 metrics.push(self.create_metric(
686 "mongod_wiredtiger_blockmanager_blocks_total",
687 counter!(stat.block_manager.blocks_pre_loaded),
688 tags!(self.tags, "type" => "blocks_pre_loaded"),
689 ));
690 metrics.push(self.create_metric(
691 "mongod_wiredtiger_blockmanager_blocks_total",
692 counter!(stat.block_manager.blocks_written),
693 tags!(self.tags, "type" => "blocks_written"),
694 ));
695
696 metrics.push(self.create_metric(
698 "mongod_wiredtiger_blockmanager_bytes_total",
699 counter!(stat.block_manager.bytes_read),
700 tags!(self.tags, "type" => "bytes_read"),
701 ));
702 metrics.push(self.create_metric(
703 "mongod_wiredtiger_blockmanager_bytes_total",
704 counter!(stat.block_manager.mapped_bytes_read),
705 tags!(self.tags, "type" => "bytes_read_mapped"),
706 ));
707 metrics.push(self.create_metric(
708 "mongod_wiredtiger_blockmanager_bytes_total",
709 counter!(stat.block_manager.bytes_written),
710 tags!(self.tags, "type" => "bytes_written"),
711 ));
712
713 metrics.push(self.create_metric(
715 "mongod_wiredtiger_cache_bytes",
716 gauge!(stat.cache.bytes_total),
717 tags!(self.tags, "type" => "total"),
718 ));
719 metrics.push(self.create_metric(
720 "mongod_wiredtiger_cache_bytes",
721 gauge!(stat.cache.bytes_dirty),
722 tags!(self.tags, "type" => "dirty"),
723 ));
724 metrics.push(self.create_metric(
725 "mongod_wiredtiger_cache_bytes",
726 gauge!(stat.cache.bytes_internal_pages),
727 tags!(self.tags, "type" => "internal_pages"),
728 ));
729 metrics.push(self.create_metric(
730 "mongod_wiredtiger_cache_bytes",
731 gauge!(stat.cache.bytes_leaf_pages),
732 tags!(self.tags, "type" => "leaf_pages"),
733 ));
734
735 metrics.push(self.create_metric(
737 "mongod_wiredtiger_cache_bytes_total",
738 counter!(stat.cache.pages_read_into),
739 tags!(self.tags, "type" => "read"),
740 ));
741 metrics.push(self.create_metric(
742 "mongod_wiredtiger_cache_bytes_total",
743 counter!(stat.cache.pages_written_from),
744 tags!(self.tags, "type" => "written"),
745 ));
746
747 metrics.push(self.create_metric(
749 "mongod_wiredtiger_cache_evicted_total",
750 counter!(stat.cache.evicted_modified),
751 tags!(self.tags, "type" => "modified"),
752 ));
753 metrics.push(self.create_metric(
754 "mongod_wiredtiger_cache_evicted_total",
755 counter!(stat.cache.evicted_unmodified),
756 tags!(self.tags, "type" => "unmodified"),
757 ));
758
759 metrics.push(self.create_metric(
761 "mongod_wiredtiger_cache_max_bytes",
762 gauge!(stat.cache.max_bytes),
763 tags!(self.tags),
764 ));
765
766 metrics.push(self.create_metric(
768 "mongod_wiredtiger_cache_overhead_percent",
769 gauge!(stat.cache.percent_overhead),
770 tags!(self.tags),
771 ));
772
773 metrics.push(self.create_metric(
775 "mongod_wiredtiger_cache_pages",
776 gauge!(stat.cache.pages_total),
777 tags!(self.tags, "type" => "total"),
778 ));
779 metrics.push(self.create_metric(
780 "mongod_wiredtiger_cache_pages",
781 gauge!(stat.cache.pages_dirty),
782 tags!(self.tags, "type" => "dirty"),
783 ));
784
785 metrics.push(self.create_metric(
787 "mongod_wiredtiger_cache_pages_total",
788 counter!(stat.cache.pages_read_into),
789 tags!(self.tags, "type" => "read"),
790 ));
791 metrics.push(self.create_metric(
792 "mongod_wiredtiger_cache_pages_total",
793 counter!(stat.cache.pages_written_from),
794 tags!(self.tags, "type" => "write"),
795 ));
796
797 metrics.push(self.create_metric(
799 "mongod_wiredtiger_concurrent_transactions_available_tickets",
800 gauge!(stat.concurrent_transactions.read.available),
801 tags!(self.tags, "type" => "read"),
802 ));
803 metrics.push(self.create_metric(
804 "mongod_wiredtiger_concurrent_transactions_available_tickets",
805 gauge!(stat.concurrent_transactions.write.available),
806 tags!(self.tags, "type" => "write"),
807 ));
808 metrics.push(self.create_metric(
809 "mongod_wiredtiger_concurrent_transactions_out_tickets",
810 gauge!(stat.concurrent_transactions.read.out),
811 tags!(self.tags, "type" => "read"),
812 ));
813 metrics.push(self.create_metric(
814 "mongod_wiredtiger_concurrent_transactions_out_tickets",
815 gauge!(stat.concurrent_transactions.write.out),
816 tags!(self.tags, "type" => "write"),
817 ));
818 metrics.push(self.create_metric(
819 "mongod_wiredtiger_concurrent_transactions_total_tickets",
820 gauge!(stat.concurrent_transactions.read.total_tickets),
821 tags!(self.tags, "type" => "read"),
822 ));
823 metrics.push(self.create_metric(
824 "mongod_wiredtiger_concurrent_transactions_total_tickets",
825 gauge!(stat.concurrent_transactions.write.total_tickets),
826 tags!(self.tags, "type" => "write"),
827 ));
828
829 metrics.push(self.create_metric(
831 "mongod_wiredtiger_log_bytes_total",
832 counter!(stat.log.bytes_payload_data),
833 tags!(self.tags, "type" => "payload"),
834 ));
835 metrics.push(self.create_metric(
836 "mongod_wiredtiger_log_bytes_total",
837 counter!(stat.log.bytes_written),
838 tags!(self.tags, "type" => "written"),
839 ));
840 metrics.push(self.create_metric(
841 "mongod_wiredtiger_log_operations_total",
842 counter!(stat.log.log_writes),
843 tags!(self.tags, "type" => "write"),
844 ));
845 metrics.push(self.create_metric(
846 "mongod_wiredtiger_log_operations_total",
847 counter!(stat.log.log_scans),
848 tags!(self.tags, "type" => "scan"),
849 ));
850 metrics.push(self.create_metric(
851 "mongod_wiredtiger_log_operations_total",
852 counter!(stat.log.log_scans_double),
853 tags!(self.tags, "type" => "scan_double"),
854 ));
855 metrics.push(self.create_metric(
856 "mongod_wiredtiger_log_operations_total",
857 counter!(stat.log.log_syncs),
858 tags!(self.tags, "type" => "sync"),
859 ));
860 metrics.push(self.create_metric(
861 "mongod_wiredtiger_log_operations_total",
862 counter!(stat.log.log_sync_dirs),
863 tags!(self.tags, "type" => "sync_dir"),
864 ));
865 metrics.push(self.create_metric(
866 "mongod_wiredtiger_log_operations_total",
867 counter!(stat.log.log_flushes),
868 tags!(self.tags, "type" => "flush"),
869 ));
870 metrics.push(self.create_metric(
871 "mongod_wiredtiger_log_records_scanned_total",
872 counter!(stat.log.records_compressed),
873 tags!(self.tags, "type" => "compressed"),
874 ));
875 metrics.push(self.create_metric(
876 "mongod_wiredtiger_log_records_scanned_total",
877 counter!(stat.log.records_uncompressed),
878 tags!(self.tags, "type" => "uncompressed"),
879 ));
880 metrics.push(self.create_metric(
881 "mongod_wiredtiger_log_records_total",
882 counter!(stat.log.records_processed_log_scan),
883 tags!(self.tags),
884 ));
885
886 metrics.push(self.create_metric(
888 "mongod_wiredtiger_session_open_sessions",
889 gauge!(stat.session.sessions),
890 tags!(self.tags),
891 ));
892
893 metrics.push(self.create_metric(
895 "mongod_wiredtiger_transactions_checkpoint_seconds",
896 gauge!(stat.transaction.checkpoint_min_ms / 1000),
897 tags!(self.tags, "type" => "min"),
898 ));
899 metrics.push(self.create_metric(
900 "mongod_wiredtiger_transactions_checkpoint_seconds",
901 gauge!(stat.transaction.checkpoint_max_ms / 1000),
902 tags!(self.tags, "type" => "max"),
903 ));
904 metrics.push(self.create_metric(
905 "mongod_wiredtiger_transactions_checkpoint_seconds_total",
906 counter!(stat.transaction.checkpoint_total_ms / 1000),
907 tags!(self.tags),
908 ));
909 metrics.push(self.create_metric(
910 "mongod_wiredtiger_transactions_running_checkpoints",
911 gauge!(stat.transaction.checkpoints_running),
912 tags!(self.tags),
913 ));
914 metrics.push(self.create_metric(
915 "mongod_wiredtiger_transactions_total",
916 counter!(stat.transaction.begins),
917 tags!(self.tags, "type" => "begins"),
918 ));
919 metrics.push(self.create_metric(
920 "mongod_wiredtiger_transactions_total",
921 counter!(stat.transaction.checkpoints),
922 tags!(self.tags, "type" => "checkpoints"),
923 ));
924 metrics.push(self.create_metric(
925 "mongod_wiredtiger_transactions_total",
926 counter!(stat.transaction.committed),
927 tags!(self.tags, "type" => "committed"),
928 ));
929 metrics.push(self.create_metric(
930 "mongod_wiredtiger_transactions_total",
931 counter!(stat.transaction.rolled_back),
932 tags!(self.tags, "type" => "rolledback"),
933 ));
934 }
935
936 metrics.push(self.create_metric(
938 "network_bytes_total",
939 counter!(status.network.bytes_in),
940 tags!(self.tags, "state" => "bytes_in"),
941 ));
942 metrics.push(self.create_metric(
943 "network_bytes_total",
944 counter!(status.network.bytes_out),
945 tags!(self.tags, "state" => "bytes_out"),
946 ));
947 metrics.push(self.create_metric(
948 "network_metrics_num_requests_total",
949 counter!(status.network.num_requests),
950 tags!(self.tags),
951 ));
952
953 for (r#type, value) in status.opcounters {
955 metrics.push(self.create_metric(
956 "op_counters_repl_total",
957 counter!(value),
958 tags!(self.tags, "type" => r#type),
959 ));
960 }
961
962 for (r#type, value) in status.opcounters_repl {
964 metrics.push(self.create_metric(
965 "op_counters_total",
966 counter!(value),
967 tags!(self.tags, "type" => r#type),
968 ));
969 }
970
971 Ok(metrics)
972 }
973}
974
975fn bson_size(value: &Bson) -> usize {
976 match value {
977 Bson::Double(value) => value.size_of(),
978 Bson::String(value) => value.size_of(),
979 Bson::Array(value) => value.iter().map(bson_size).sum(),
980 Bson::Document(value) => document_size(value),
981 Bson::Boolean(_) => std::mem::size_of::<bool>(),
982 Bson::RegularExpression(value) => value.pattern.size_of(),
983 Bson::JavaScriptCode(value) => value.size_of(),
984 Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
985 Bson::Int32(value) => value.size_of(),
986 Bson::Int64(value) => value.size_of(),
987 Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
988 Bson::Binary(value) => value.bytes.size_of(),
989 Bson::ObjectId(value) => value.bytes().size_of(),
990 Bson::DateTime(_) => std::mem::size_of::<i64>(),
991 Bson::Symbol(value) => value.size_of(),
992 Bson::Decimal128(value) => value.bytes().size_of(),
993 Bson::DbPointer(_) => {
994 0
996 }
997 Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
998 }
999}
1000
1001fn document_size(doc: &Document) -> usize {
1002 doc.into_iter()
1003 .map(|(key, value)| key.size_of() + bson_size(value))
1004 .sum()
1005}
1006
1007fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
1015 let mut endpoint = endpoint.to_owned();
1016 if options.credential.is_some() {
1017 let start = endpoint.find("://").unwrap() + 3;
1018
1019 let pre_slash = match endpoint[start..].find('/') {
1021 Some(index) => {
1022 let mut segments = endpoint[start..].split_at(index);
1023 if segments.1.len() > 1 {
1025 let lstart = start + segments.0.len() + 1;
1026 let post_slash = &segments.1[1..];
1027 if let Some(index) = post_slash.find('?') {
1029 let segments = post_slash.split_at(index);
1030 if segments.1.len() > 1 {
1032 let options = segments.1[1..]
1034 .split('&')
1035 .filter(|pair| {
1036 let (key, _) = pair.split_at(pair.find('=').unwrap());
1037 !matches!(
1038 key.to_lowercase().as_str(),
1039 "authsource" | "authmechanism" | "authmechanismproperties"
1040 )
1041 })
1042 .collect::<Vec<_>>()
1043 .join("&");
1044
1045 endpoint = format!(
1047 "{}{}",
1048 &endpoint[..lstart + segments.0.len() + 1],
1049 &options
1050 );
1051 }
1052 }
1053 segments = endpoint[start..].split_at(index);
1054 }
1055 segments.0
1056 }
1057 None => &endpoint[start..],
1058 };
1059
1060 let end = pre_slash.rfind('@').unwrap() + 1;
1062 endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
1063 }
1064 endpoint
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069 use super::*;
1070
1071 #[test]
1072 fn generate_config() {
1073 crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
1074 }
1075
1076 #[tokio::test]
1077 async fn sanitize_endpoint_test() {
1078 let endpoint = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
1079 let client_options = ClientOptions::parse(endpoint).await.unwrap();
1080 let endpoint = sanitize_endpoint(endpoint, &client_options);
1081 assert_eq!(&endpoint, "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true");
1082 }
1083}
1084
1085#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
1086mod integration_tests {
1087 use futures::StreamExt;
1088 use tokio::time::{timeout, Duration};
1089
1090 use super::*;
1091 use crate::{
1092 test_util::{
1093 components::{assert_source_compliance, PULL_SOURCE_TAGS},
1094 trace_init,
1095 },
1096 SourceSender,
1097 };
1098
1099 fn primary_mongo_address() -> String {
1100 std::env::var("PRIMARY_MONGODB_ADDRESS")
1101 .unwrap_or_else(|_| "mongodb://localhost:27017".into())
1102 }
1103
1104 fn secondary_mongo_address() -> String {
1105 std::env::var("SECONDARY_MONGODB_ADDRESS")
1106 .unwrap_or_else(|_| "mongodb://localhost:27019".into())
1107 }
1108
1109 fn remove_creds(address: &str) -> String {
1110 let mut url = url::Url::parse(address).unwrap();
1111 url.set_password(None).unwrap();
1112 url.set_username("").unwrap();
1113 url.to_string()
1114 }
1115
1116 async fn test_instance(endpoint: String) {
1117 assert_source_compliance(&PULL_SOURCE_TAGS, async {
1118 let host = ClientOptions::parse(endpoint.as_str()).await.unwrap().hosts[0].to_string();
1119 let namespace = "vector_mongodb";
1120
1121 let (sender, mut recv) = SourceSender::new_test();
1122
1123 let endpoints = vec![endpoint.clone()];
1124 tokio::spawn(async move {
1125 MongoDbMetricsConfig {
1126 endpoints,
1127 scrape_interval_secs: Duration::from_secs(15),
1128 namespace: namespace.to_owned(),
1129 }
1130 .build(SourceContext::new_test(sender, None))
1131 .await
1132 .unwrap()
1133 .await
1134 .unwrap()
1135 });
1136
1137 let event = timeout(Duration::from_secs(30), recv.next())
1142 .await
1143 .expect("fetch metrics timeout")
1144 .expect("failed to get metrics from a stream");
1145 let mut events = vec![event];
1146 loop {
1147 match timeout(Duration::from_millis(10), recv.next()).await {
1148 Ok(Some(event)) => events.push(event),
1149 Ok(None) => break,
1150 Err(_) => break,
1151 }
1152 }
1153
1154 let clean_endpoint = remove_creds(&endpoint);
1155
1156 assert!(events.len() > 100);
1157 for event in events {
1158 let metric = event.into_metric();
1159 assert!(metric.namespace() == Some(namespace));
1161 let timestamp = metric.timestamp().expect("existed timestamp");
1163 assert!((timestamp - Utc::now()).num_seconds() < 1);
1164 let tags = metric.tags().expect("existed tags");
1166 assert_eq!(tags.get("endpoint"), Some(&clean_endpoint[..]));
1167 assert_eq!(tags.get("host"), Some(&host[..]));
1168 }
1169 })
1170 .await;
1171 }
1172
1173 #[tokio::test]
1174 async fn fetch_metrics_mongod() {
1175 trace_init();
1176 test_instance(primary_mongo_address()).await;
1177 }
1178
1179 #[tokio::test]
1187 async fn fetch_metrics_replset() {
1188 trace_init();
1189 test_instance(secondary_mongo_address()).await;
1190 }
1191}