// vector/sources/mongodb_metrics/mod.rs

1use std::time::{Duration, Instant};
2
3use chrono::Utc;
4use futures::{
5    future::{join_all, try_join_all},
6    StreamExt,
7};
8use mongodb::{
9    bson::{self, doc, from_document, Bson, Document},
10    error::Error as MongoError,
11    options::ClientOptions,
12    Client,
13};
14use serde_with::serde_as;
15use snafu::{ResultExt, Snafu};
16use tokio::time;
17use tokio_stream::wrappers::IntervalStream;
18use vector_lib::configurable::configurable_component;
19use vector_lib::{metric_tags, ByteSizeOf, EstimatedJsonEncodedSizeOf};
20
21use crate::{
22    config::{SourceConfig, SourceContext, SourceOutput},
23    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
24    internal_events::{
25        CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
26        MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
27    },
28};
29
30mod types;
31use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
32use vector_lib::config::LogNamespace;
33
/// Produces an owned copy of a base tag set, optionally overriding/adding
/// `key => value` pairs (keys via `Into`, values via `ToString`).
macro_rules! tags {
    // No overrides: just an owned copy of the base tags.
    ($tags:expr_2021) => { $tags.clone() };
    // Overrides: copy the base, then replace (or insert) each pair in order.
    ($tags:expr_2021, $($key:expr_2021 => $value:expr_2021),*) => {{
        let mut merged = $tags.clone();
        $(merged.replace($key.into(), $value.to_string());)*
        merged
    }};
}
46
/// Wraps any numeric expression as a `MetricValue::Counter`, cast to `f64`.
macro_rules! counter {
    ($value:expr_2021) => {{
        let value = ($value) as f64;
        MetricValue::Counter { value }
    }};
}
54
/// Wraps any numeric expression as a `MetricValue::Gauge`, cast to `f64`.
macro_rules! gauge {
    ($value:expr_2021) => {{
        let value = ($value) as f64;
        MetricValue::Gauge { value }
    }};
}
62
63#[derive(Debug, Snafu)]
64enum BuildError {
65    #[snafu(display("invalid endpoint: {}", source))]
66    InvalidEndpoint { source: MongoError },
67    #[snafu(display("invalid client options: {}", source))]
68    InvalidClientOptions { source: MongoError },
69}
70
/// Failure of a single scrape step against one endpoint.
#[derive(Debug)]
enum CollectError {
    /// A `run_command` call to the server returned a driver error.
    Mongo(MongoError),
    /// A command reply could not be decoded via `from_document` into the expected type.
    Bson(bson::de::Error),
}
76
77/// Configuration for the `mongodb_metrics` source.
78#[serde_as]
79#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
80#[derive(Clone, Debug, Default)]
81#[serde(deny_unknown_fields)]
82pub struct MongoDbMetricsConfig {
83    /// A list of MongoDB instances to scrape.
84    ///
85    /// Each endpoint must be in the [Connection String URI Format](https://www.mongodb.com/docs/manual/reference/connection-string/).
86    #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
87    endpoints: Vec<String>,
88
89    /// The interval between scrapes, in seconds.
90    #[serde(default = "default_scrape_interval_secs")]
91    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
92    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
93    scrape_interval_secs: Duration,
94
95    /// Overrides the default namespace for the metrics emitted by the source.
96    ///
97    /// If set to an empty string, no namespace is added to the metrics.
98    ///
99    /// By default, `mongodb` is used.
100    #[serde(default = "default_namespace")]
101    namespace: String,
102}
103
/// Scraper state for one configured endpoint: a client plus the values stamped
/// onto every metric it produces.
#[derive(Debug)]
struct MongoDbMetrics {
    /// Driver client, created in `new` with `direct_connection = Some(true)`.
    client: Client,
    /// Endpoint string as returned by `sanitize_endpoint`; used in internal events
    /// and as the `endpoint` tag.
    endpoint: String,
    /// Namespace passed to every emitted metric; `None` when the configured
    /// namespace was empty.
    namespace: Option<String>,
    /// Base tags (`endpoint`, `host`) that are cloned into each metric.
    tags: MetricTags,
}
111
/// Default delay between two scrapes: fifteen seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    Duration::from_millis(15 * 1_000)
}
115
/// Default metric namespace for this source.
pub fn default_namespace() -> String {
    String::from("mongodb")
}
119
// NOTE(review): presumably implements `GenerateConfig` from
// `MongoDbMetricsConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(MongoDbMetricsConfig);
121
122#[async_trait::async_trait]
123#[typetag::serde(name = "mongodb_metrics")]
124impl SourceConfig for MongoDbMetricsConfig {
125    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
126        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
127
128        let sources = try_join_all(
129            self.endpoints
130                .iter()
131                .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
132        )
133        .await?;
134
135        let duration = self.scrape_interval_secs;
136        let shutdown = cx.shutdown;
137        Ok(Box::pin(async move {
138            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
139            while interval.next().await.is_some() {
140                let start = Instant::now();
141                let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
142                emit!(CollectionCompleted {
143                    start,
144                    end: Instant::now()
145                });
146
147                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
148                let count = metrics.len();
149
150                if (cx.out.send_batch(metrics).await).is_err() {
151                    emit!(StreamClosedError { count });
152                    return Err(());
153                }
154            }
155
156            Ok(())
157        }))
158    }
159
160    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
161        vec![SourceOutput::new_metrics()]
162    }
163
164    fn can_acknowledge(&self) -> bool {
165        false
166    }
167}
168
169impl MongoDbMetrics {
170    /// Works only with Standalone connection-string. Collect metrics only from specified instance.
171    /// <https://docs.mongodb.com/manual/reference/connection-string/#standard-connection-string-format>
172    async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
173        let mut client_options = ClientOptions::parse(endpoint)
174            .await
175            .context(InvalidEndpointSnafu)?;
176        client_options.direct_connection = Some(true);
177
178        let endpoint = sanitize_endpoint(endpoint, &client_options);
179        let tags = metric_tags!(
180            "endpoint" => endpoint.clone(),
181            "host" => client_options.hosts[0].to_string(),
182        );
183
184        Ok(Self {
185            client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
186            endpoint,
187            namespace,
188            tags,
189        })
190    }
191
192    /// Finding node type for client with `isMaster` command.
193    async fn get_node_type(&self) -> Result<NodeType, CollectError> {
194        let doc = self
195            .client
196            .database("admin")
197            .run_command(doc! { "isMaster": 1 }, None)
198            .await
199            .map_err(CollectError::Mongo)?;
200        let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;
201
202        Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
203            NodeType::Replset
204        } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
205            // Contains the value isdbgrid when isMaster returns from a mongos instance.
206            // <https://docs.mongodb.com/manual/reference/command/isMaster/#isMaster.msg>
207            // <https://docs.mongodb.com/manual/core/sharded-cluster-query-router/#confirm-connection-to-mongos-instances>
208            NodeType::Mongos
209        } else {
210            NodeType::Mongod
211        })
212    }
213
214    async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
215        let doc = self
216            .client
217            .database("admin")
218            .run_command(doc! { "buildInfo": 1 }, None)
219            .await
220            .map_err(CollectError::Mongo)?;
221        from_document(doc).map_err(CollectError::Bson)
222    }
223
224    async fn print_version(&self) -> Result<(), CollectError> {
225        if tracing::level_enabled!(tracing::Level::DEBUG) {
226            let node_type = self.get_node_type().await?;
227            let build_info = self.get_build_info().await?;
228            debug!(
229                message = "Connected to server.", endpoint = %self.endpoint, node_type = ?node_type, server_version = ?serde_json::to_string(&build_info).unwrap()
230            );
231        }
232
233        Ok(())
234    }
235
236    fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
237        Metric::new(name, MetricKind::Absolute, value)
238            .with_namespace(self.namespace.clone())
239            .with_tags(Some(tags))
240            .with_timestamp(Some(Utc::now()))
241    }
242
243    async fn collect(&self) -> Vec<Metric> {
244        // `up` metric is `1` if collection is successful, otherwise `0`.
245        let (up_value, mut metrics) = match self.collect_server_status().await {
246            Ok(metrics) => (1.0, metrics),
247            Err(error) => {
248                match error {
249                    CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
250                        error,
251                        endpoint: &self.endpoint,
252                    }),
253                    CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
254                        error,
255                        endpoint: &self.endpoint,
256                    }),
257                }
258
259                (0.0, vec![])
260            }
261        };
262
263        metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));
264
265        emit!(MongoDbMetricsEventsReceived {
266            byte_size: metrics.estimated_json_encoded_size_of(),
267            count: metrics.len(),
268            endpoint: &self.endpoint,
269        });
270
271        metrics
272    }
273
274    /// Collect metrics from `serverStatus` command.
275    /// <https://docs.mongodb.com/manual/reference/command/serverStatus/>
276    async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
277        self.print_version().await?;
278
279        let mut metrics = vec![];
280
281        let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
282        let db = self.client.database("admin");
283        let doc = db
284            .run_command(command, None)
285            .await
286            .map_err(CollectError::Mongo)?;
287        let byte_size = document_size(&doc);
288        emit!(EndpointBytesReceived {
289            byte_size,
290            protocol: "tcp",
291            endpoint: &self.endpoint,
292        });
293        let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;
294
295        // asserts_total
296        metrics.push(self.create_metric(
297            "asserts_total",
298            counter!(status.asserts.regular),
299            tags!(self.tags, "type" => "regular"),
300        ));
301        metrics.push(self.create_metric(
302            "asserts_total",
303            counter!(status.asserts.warning),
304            tags!(self.tags, "type" => "warning"),
305        ));
306        metrics.push(self.create_metric(
307            "asserts_total",
308            counter!(status.asserts.msg),
309            tags!(self.tags, "type" => "msg"),
310        ));
311        metrics.push(self.create_metric(
312            "asserts_total",
313            counter!(status.asserts.user),
314            tags!(self.tags, "type" => "user"),
315        ));
316        metrics.push(self.create_metric(
317            "asserts_total",
318            counter!(status.asserts.rollovers),
319            tags!(self.tags, "type" => "rollovers"),
320        ));
321
322        // connections
323        metrics.push(self.create_metric(
324            "connections",
325            counter!(status.connections.active),
326            tags!(self.tags, "state" => "active"),
327        ));
328        metrics.push(self.create_metric(
329            "connections",
330            counter!(status.connections.available),
331            tags!(self.tags, "state" => "available"),
332        ));
333        metrics.push(self.create_metric(
334            "connections",
335            counter!(status.connections.current),
336            tags!(self.tags, "state" => "current"),
337        ));
338
339        // extra_info_*
340        if let Some(value) = status.extra_info.heap_usage_bytes {
341            metrics.push(self.create_metric(
342                "extra_info_heap_usage_bytes",
343                gauge!(value),
344                tags!(self.tags),
345            ));
346        }
347        metrics.push(self.create_metric(
348            "extra_info_page_faults",
349            gauge!(status.extra_info.page_faults),
350            tags!(self.tags),
351        ));
352
353        // instance_*
354        metrics.push(self.create_metric(
355            "instance_local_time",
356            gauge!(status.instance.local_time.timestamp_millis() / 1000),
357            tags!(self.tags),
358        ));
359        metrics.push(self.create_metric(
360            "instance_uptime_estimate_seconds_total",
361            gauge!(status.instance.uptime_estimate),
362            tags!(self.tags),
363        ));
364        metrics.push(self.create_metric(
365            "instance_uptime_seconds_total",
366            gauge!(status.instance.uptime),
367            tags!(self.tags),
368        ));
369
370        // memory
371        metrics.push(self.create_metric(
372            "memory",
373            gauge!(status.memory.resident),
374            tags!(self.tags, "type" => "resident"),
375        ));
376        metrics.push(self.create_metric(
377            "memory",
378            gauge!(status.memory.r#virtual),
379            tags!(self.tags, "type" => "virtual"),
380        ));
381        if let Some(value) = status.memory.mapped {
382            metrics.push(self.create_metric(
383                "memory",
384                gauge!(value),
385                tags!(self.tags, "type" => "mapped"),
386            ))
387        }
388        if let Some(value) = status.memory.mapped_with_journal {
389            metrics.push(self.create_metric(
390                "memory",
391                gauge!(value),
392                tags!(self.tags, "type" => "mapped_with_journal"),
393            ))
394        }
395
396        // mongod_global_lock_*
397        metrics.push(self.create_metric(
398            "mongod_global_lock_total_time_seconds",
399            counter!(status.global_lock.total_time),
400            tags!(self.tags),
401        ));
402        metrics.push(self.create_metric(
403            "mongod_global_lock_active_clients",
404            gauge!(status.global_lock.active_clients.total),
405            tags!(self.tags, "type" => "total"),
406        ));
407        metrics.push(self.create_metric(
408            "mongod_global_lock_active_clients",
409            gauge!(status.global_lock.active_clients.readers),
410            tags!(self.tags, "type" => "readers"),
411        ));
412        metrics.push(self.create_metric(
413            "mongod_global_lock_active_clients",
414            gauge!(status.global_lock.active_clients.writers),
415            tags!(self.tags, "type" => "writers"),
416        ));
417        metrics.push(self.create_metric(
418            "mongod_global_lock_current_queue",
419            gauge!(status.global_lock.current_queue.total),
420            tags!(self.tags, "type" => "total"),
421        ));
422        metrics.push(self.create_metric(
423            "mongod_global_lock_current_queue",
424            gauge!(status.global_lock.current_queue.readers),
425            tags!(self.tags, "type" => "readers"),
426        ));
427        metrics.push(self.create_metric(
428            "mongod_global_lock_current_queue",
429            gauge!(status.global_lock.current_queue.writers),
430            tags!(self.tags, "type" => "writers"),
431        ));
432
433        // mongod_locks_time_*
434        for (r#type, lock) in status.locks {
435            if let Some(modes) = lock.time_acquiring_micros {
436                if let Some(value) = modes.read {
437                    metrics.push(self.create_metric(
438                        "mongod_locks_time_acquiring_global_seconds_total",
439                        counter!(value),
440                        tags!(self.tags, "type" => &r#type, "mode" => "read"),
441                    ));
442                }
443                if let Some(value) = modes.write {
444                    metrics.push(self.create_metric(
445                        "mongod_locks_time_acquiring_global_seconds_total",
446                        counter!(value),
447                        tags!(self.tags, "type" => &r#type, "mode" => "write"),
448                    ));
449                }
450            }
451        }
452
453        // mongod_metrics_cursor_*
454        metrics.push(self.create_metric(
455            "mongod_metrics_cursor_timed_out_total",
456            counter!(status.metrics.cursor.timed_out),
457            tags!(self.tags),
458        ));
459        metrics.push(self.create_metric(
460            "mongod_metrics_cursor_open",
461            gauge!(status.metrics.cursor.open.no_timeout),
462            tags!(self.tags, "state" => "no_timeout"),
463        ));
464        metrics.push(self.create_metric(
465            "mongod_metrics_cursor_open",
466            gauge!(status.metrics.cursor.open.pinned),
467            tags!(self.tags, "state" => "pinned"),
468        ));
469        metrics.push(self.create_metric(
470            "mongod_metrics_cursor_open",
471            gauge!(status.metrics.cursor.open.total),
472            tags!(self.tags, "state" => "total"),
473        ));
474
475        // mongod_metrics_document_total
476        metrics.push(self.create_metric(
477            "mongod_metrics_document_total",
478            counter!(status.metrics.document.deleted),
479            tags!(self.tags, "state" => "deleted"),
480        ));
481        metrics.push(self.create_metric(
482            "mongod_metrics_document_total",
483            counter!(status.metrics.document.inserted),
484            tags!(self.tags, "state" => "inserted"),
485        ));
486        metrics.push(self.create_metric(
487            "mongod_metrics_document_total",
488            counter!(status.metrics.document.returned),
489            tags!(self.tags, "state" => "returned"),
490        ));
491        metrics.push(self.create_metric(
492            "mongod_metrics_document_total",
493            counter!(status.metrics.document.updated),
494            tags!(self.tags, "state" => "updated"),
495        ));
496
497        // mongod_metrics_get_last_error_*
498        metrics.push(self.create_metric(
499            "mongod_metrics_get_last_error_wtime_num",
500            gauge!(status.metrics.get_last_error.wtime.num),
501            tags!(self.tags),
502        ));
503        metrics.push(self.create_metric(
504            "mongod_metrics_get_last_error_wtime_seconds_total",
505            counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
506            tags!(self.tags),
507        ));
508        metrics.push(self.create_metric(
509            "mongod_metrics_get_last_error_wtimeouts_total",
510            counter!(status.metrics.get_last_error.wtimeouts),
511            tags!(self.tags),
512        ));
513
514        // mongod_metrics_operation_total
515        metrics.push(self.create_metric(
516            "mongod_metrics_operation_total",
517            counter!(status.metrics.operation.scan_and_order),
518            tags!(self.tags, "type" => "scan_and_order"),
519        ));
520        metrics.push(self.create_metric(
521            "mongod_metrics_operation_total",
522            counter!(status.metrics.operation.write_conflicts),
523            tags!(self.tags, "type" => "write_conflicts"),
524        ));
525
526        // mongod_metrics_query_executor_total
527        metrics.push(self.create_metric(
528            "mongod_metrics_query_executor_total",
529            counter!(status.metrics.query_executor.scanned),
530            tags!(self.tags, "state" => "scanned"),
531        ));
532        metrics.push(self.create_metric(
533            "mongod_metrics_query_executor_total",
534            counter!(status.metrics.query_executor.scanned_objects),
535            tags!(self.tags, "state" => "scanned_objects"),
536        ));
537        if let Some(doc) = status.metrics.query_executor.collection_scans {
538            metrics.push(self.create_metric(
539                "mongod_metrics_query_executor_total",
540                counter!(doc.total),
541                tags!(self.tags, "state" => "collection_scans"),
542            ));
543        }
544
545        // mongod_metrics_record_moves_total
546        if let Some(record) = status.metrics.record {
547            metrics.push(self.create_metric(
548                "mongod_metrics_record_moves_total",
549                counter!(record.moves),
550                tags!(self.tags),
551            ));
552        }
553
554        // mongod_metrics_repl_apply_
555        metrics.push(self.create_metric(
556            "mongod_metrics_repl_apply_batches_num_total",
557            counter!(status.metrics.repl.apply.batches.num),
558            tags!(self.tags),
559        ));
560        metrics.push(self.create_metric(
561            "mongod_metrics_repl_apply_batches_seconds_total",
562            counter!(status.metrics.repl.apply.batches.total_millis / 1000),
563            tags!(self.tags),
564        ));
565        metrics.push(self.create_metric(
566            "mongod_metrics_repl_apply_ops_total",
567            counter!(status.metrics.repl.apply.ops),
568            tags!(self.tags),
569        ));
570
571        // mongod_metrics_repl_buffer_*
572        metrics.push(self.create_metric(
573            "mongod_metrics_repl_buffer_count",
574            counter!(status.metrics.repl.buffer.count),
575            tags!(self.tags),
576        ));
577        metrics.push(self.create_metric(
578            "mongod_metrics_repl_buffer_max_size_bytes_total",
579            counter!(status.metrics.repl.buffer.max_size_bytes),
580            tags!(self.tags),
581        ));
582        metrics.push(self.create_metric(
583            "mongod_metrics_repl_buffer_size_bytes",
584            counter!(status.metrics.repl.buffer.size_bytes),
585            tags!(self.tags),
586        ));
587
588        // mongod_metrics_repl_executor_*
589        metrics.push(self.create_metric(
590            "mongod_metrics_repl_executor_queue",
591            gauge!(status.metrics.repl.executor.queues.network_in_progress),
592            tags!(self.tags, "type" => "network_in_progress"),
593        ));
594        metrics.push(self.create_metric(
595            "mongod_metrics_repl_executor_queue",
596            gauge!(status.metrics.repl.executor.queues.sleepers),
597            tags!(self.tags, "type" => "sleepers"),
598        ));
599        metrics.push(self.create_metric(
600            "mongod_metrics_repl_executor_unsignaled_events",
601            gauge!(status.metrics.repl.executor.unsignaled_events),
602            tags!(self.tags),
603        ));
604
605        // mongod_metrics_repl_network_*
606        metrics.push(self.create_metric(
607            "mongod_metrics_repl_network_bytes_total",
608            counter!(status.metrics.repl.network.bytes),
609            tags!(self.tags),
610        ));
611        metrics.push(self.create_metric(
612            "mongod_metrics_repl_network_getmores_num_total",
613            counter!(status.metrics.repl.network.getmores.num),
614            tags!(self.tags),
615        ));
616        metrics.push(self.create_metric(
617            "mongod_metrics_repl_network_getmores_seconds_total",
618            counter!(status.metrics.repl.network.getmores.total_millis / 1000),
619            tags!(self.tags),
620        ));
621        metrics.push(self.create_metric(
622            "mongod_metrics_repl_network_ops_total",
623            counter!(status.metrics.repl.network.ops),
624            tags!(self.tags),
625        ));
626        metrics.push(self.create_metric(
627            "mongod_metrics_repl_network_readers_created_total",
628            counter!(status.metrics.repl.network.readers_created),
629            tags!(self.tags),
630        ));
631
632        // mongod_metrics_ttl_*
633        metrics.push(self.create_metric(
634            "mongod_metrics_ttl_deleted_documents_total",
635            counter!(status.metrics.ttl.deleted_documents),
636            tags!(self.tags),
637        ));
638        metrics.push(self.create_metric(
639            "mongod_metrics_ttl_passes_total",
640            counter!(status.metrics.ttl.passes),
641            tags!(self.tags),
642        ));
643
644        // mongod_op_latencies_*
645        for (r#type, stat) in status.op_latencies {
646            for bucket in stat.histogram {
647                metrics.push(self.create_metric(
648                    "mongod_op_latencies_histogram",
649                    gauge!(bucket.count),
650                    tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
651                ));
652            }
653            metrics.push(self.create_metric(
654                "mongod_op_latencies_latency",
655                gauge!(stat.latency),
656                tags!(self.tags, "type" => &r#type),
657            ));
658            metrics.push(self.create_metric(
659                "mongod_op_latencies_ops_total",
660                gauge!(stat.ops),
661                tags!(self.tags, "type" => &r#type),
662            ));
663        }
664
665        // mongod_storage_engine
666        metrics.push(self.create_metric(
667            "mongod_storage_engine",
668            gauge!(1),
669            tags!(self.tags, "engine" => status.storage_engine.name),
670        ));
671
672        // mongod_wiredtiger_*
673        if let Some(stat) = status.wired_tiger {
674            // mongod_wiredtiger_blockmanager_blocks_total
675            metrics.push(self.create_metric(
676                "mongod_wiredtiger_blockmanager_blocks_total",
677                counter!(stat.block_manager.blocks_read),
678                tags!(self.tags, "type" => "blocks_read"),
679            ));
680            metrics.push(self.create_metric(
681                "mongod_wiredtiger_blockmanager_blocks_total",
682                counter!(stat.block_manager.mapped_blocks_read),
683                tags!(self.tags, "type" => "blocks_read_mapped"),
684            ));
685            metrics.push(self.create_metric(
686                "mongod_wiredtiger_blockmanager_blocks_total",
687                counter!(stat.block_manager.blocks_pre_loaded),
688                tags!(self.tags, "type" => "blocks_pre_loaded"),
689            ));
690            metrics.push(self.create_metric(
691                "mongod_wiredtiger_blockmanager_blocks_total",
692                counter!(stat.block_manager.blocks_written),
693                tags!(self.tags, "type" => "blocks_written"),
694            ));
695
696            // mongod_wiredtiger_blockmanager_bytes_total
697            metrics.push(self.create_metric(
698                "mongod_wiredtiger_blockmanager_bytes_total",
699                counter!(stat.block_manager.bytes_read),
700                tags!(self.tags, "type" => "bytes_read"),
701            ));
702            metrics.push(self.create_metric(
703                "mongod_wiredtiger_blockmanager_bytes_total",
704                counter!(stat.block_manager.mapped_bytes_read),
705                tags!(self.tags, "type" => "bytes_read_mapped"),
706            ));
707            metrics.push(self.create_metric(
708                "mongod_wiredtiger_blockmanager_bytes_total",
709                counter!(stat.block_manager.bytes_written),
710                tags!(self.tags, "type" => "bytes_written"),
711            ));
712
713            // mongod_wiredtiger_cache_bytes
714            metrics.push(self.create_metric(
715                "mongod_wiredtiger_cache_bytes",
716                gauge!(stat.cache.bytes_total),
717                tags!(self.tags, "type" => "total"),
718            ));
719            metrics.push(self.create_metric(
720                "mongod_wiredtiger_cache_bytes",
721                gauge!(stat.cache.bytes_dirty),
722                tags!(self.tags, "type" => "dirty"),
723            ));
724            metrics.push(self.create_metric(
725                "mongod_wiredtiger_cache_bytes",
726                gauge!(stat.cache.bytes_internal_pages),
727                tags!(self.tags, "type" => "internal_pages"),
728            ));
729            metrics.push(self.create_metric(
730                "mongod_wiredtiger_cache_bytes",
731                gauge!(stat.cache.bytes_leaf_pages),
732                tags!(self.tags, "type" => "leaf_pages"),
733            ));
734
735            // mongod_wiredtiger_cache_bytes_total
736            metrics.push(self.create_metric(
737                "mongod_wiredtiger_cache_bytes_total",
738                counter!(stat.cache.pages_read_into),
739                tags!(self.tags, "type" => "read"),
740            ));
741            metrics.push(self.create_metric(
742                "mongod_wiredtiger_cache_bytes_total",
743                counter!(stat.cache.pages_written_from),
744                tags!(self.tags, "type" => "written"),
745            ));
746
747            // mongod_wiredtiger_cache_evicted_total
748            metrics.push(self.create_metric(
749                "mongod_wiredtiger_cache_evicted_total",
750                counter!(stat.cache.evicted_modified),
751                tags!(self.tags, "type" => "modified"),
752            ));
753            metrics.push(self.create_metric(
754                "mongod_wiredtiger_cache_evicted_total",
755                counter!(stat.cache.evicted_unmodified),
756                tags!(self.tags, "type" => "unmodified"),
757            ));
758
759            // mongod_wiredtiger_cache_max_bytes
760            metrics.push(self.create_metric(
761                "mongod_wiredtiger_cache_max_bytes",
762                gauge!(stat.cache.max_bytes),
763                tags!(self.tags),
764            ));
765
766            // mongod_wiredtiger_cache_overhead_percent
767            metrics.push(self.create_metric(
768                "mongod_wiredtiger_cache_overhead_percent",
769                gauge!(stat.cache.percent_overhead),
770                tags!(self.tags),
771            ));
772
773            // mongod_wiredtiger_cache_pages
774            metrics.push(self.create_metric(
775                "mongod_wiredtiger_cache_pages",
776                gauge!(stat.cache.pages_total),
777                tags!(self.tags, "type" => "total"),
778            ));
779            metrics.push(self.create_metric(
780                "mongod_wiredtiger_cache_pages",
781                gauge!(stat.cache.pages_dirty),
782                tags!(self.tags, "type" => "dirty"),
783            ));
784
785            // mongod_wiredtiger_cache_pages_total
786            metrics.push(self.create_metric(
787                "mongod_wiredtiger_cache_pages_total",
788                counter!(stat.cache.pages_read_into),
789                tags!(self.tags, "type" => "read"),
790            ));
791            metrics.push(self.create_metric(
792                "mongod_wiredtiger_cache_pages_total",
793                counter!(stat.cache.pages_written_from),
794                tags!(self.tags, "type" => "write"),
795            ));
796
797            // mongod_wiredtiger_concurrent_transactions_*
798            metrics.push(self.create_metric(
799                "mongod_wiredtiger_concurrent_transactions_available_tickets",
800                gauge!(stat.concurrent_transactions.read.available),
801                tags!(self.tags, "type" => "read"),
802            ));
803            metrics.push(self.create_metric(
804                "mongod_wiredtiger_concurrent_transactions_available_tickets",
805                gauge!(stat.concurrent_transactions.write.available),
806                tags!(self.tags, "type" => "write"),
807            ));
808            metrics.push(self.create_metric(
809                "mongod_wiredtiger_concurrent_transactions_out_tickets",
810                gauge!(stat.concurrent_transactions.read.out),
811                tags!(self.tags, "type" => "read"),
812            ));
813            metrics.push(self.create_metric(
814                "mongod_wiredtiger_concurrent_transactions_out_tickets",
815                gauge!(stat.concurrent_transactions.write.out),
816                tags!(self.tags, "type" => "write"),
817            ));
818            metrics.push(self.create_metric(
819                "mongod_wiredtiger_concurrent_transactions_total_tickets",
820                gauge!(stat.concurrent_transactions.read.total_tickets),
821                tags!(self.tags, "type" => "read"),
822            ));
823            metrics.push(self.create_metric(
824                "mongod_wiredtiger_concurrent_transactions_total_tickets",
825                gauge!(stat.concurrent_transactions.write.total_tickets),
826                tags!(self.tags, "type" => "write"),
827            ));
828
829            // mongod_wiredtiger_log_*
830            metrics.push(self.create_metric(
831                "mongod_wiredtiger_log_bytes_total",
832                counter!(stat.log.bytes_payload_data),
833                tags!(self.tags, "type" => "payload"),
834            ));
835            metrics.push(self.create_metric(
836                "mongod_wiredtiger_log_bytes_total",
837                counter!(stat.log.bytes_written),
838                tags!(self.tags, "type" => "written"),
839            ));
840            metrics.push(self.create_metric(
841                "mongod_wiredtiger_log_operations_total",
842                counter!(stat.log.log_writes),
843                tags!(self.tags, "type" => "write"),
844            ));
845            metrics.push(self.create_metric(
846                "mongod_wiredtiger_log_operations_total",
847                counter!(stat.log.log_scans),
848                tags!(self.tags, "type" => "scan"),
849            ));
850            metrics.push(self.create_metric(
851                "mongod_wiredtiger_log_operations_total",
852                counter!(stat.log.log_scans_double),
853                tags!(self.tags, "type" => "scan_double"),
854            ));
855            metrics.push(self.create_metric(
856                "mongod_wiredtiger_log_operations_total",
857                counter!(stat.log.log_syncs),
858                tags!(self.tags, "type" => "sync"),
859            ));
860            metrics.push(self.create_metric(
861                "mongod_wiredtiger_log_operations_total",
862                counter!(stat.log.log_sync_dirs),
863                tags!(self.tags, "type" => "sync_dir"),
864            ));
865            metrics.push(self.create_metric(
866                "mongod_wiredtiger_log_operations_total",
867                counter!(stat.log.log_flushes),
868                tags!(self.tags, "type" => "flush"),
869            ));
870            metrics.push(self.create_metric(
871                "mongod_wiredtiger_log_records_scanned_total",
872                counter!(stat.log.records_compressed),
873                tags!(self.tags, "type" => "compressed"),
874            ));
875            metrics.push(self.create_metric(
876                "mongod_wiredtiger_log_records_scanned_total",
877                counter!(stat.log.records_uncompressed),
878                tags!(self.tags, "type" => "uncompressed"),
879            ));
880            metrics.push(self.create_metric(
881                "mongod_wiredtiger_log_records_total",
882                counter!(stat.log.records_processed_log_scan),
883                tags!(self.tags),
884            ));
885
886            // mongod_wiredtiger_session_open_sessions
887            metrics.push(self.create_metric(
888                "mongod_wiredtiger_session_open_sessions",
889                gauge!(stat.session.sessions),
890                tags!(self.tags),
891            ));
892
893            // mongod_wiredtiger_transactions_*
894            metrics.push(self.create_metric(
895                "mongod_wiredtiger_transactions_checkpoint_seconds",
896                gauge!(stat.transaction.checkpoint_min_ms / 1000),
897                tags!(self.tags, "type" => "min"),
898            ));
899            metrics.push(self.create_metric(
900                "mongod_wiredtiger_transactions_checkpoint_seconds",
901                gauge!(stat.transaction.checkpoint_max_ms / 1000),
902                tags!(self.tags, "type" => "max"),
903            ));
904            metrics.push(self.create_metric(
905                "mongod_wiredtiger_transactions_checkpoint_seconds_total",
906                counter!(stat.transaction.checkpoint_total_ms / 1000),
907                tags!(self.tags),
908            ));
909            metrics.push(self.create_metric(
910                "mongod_wiredtiger_transactions_running_checkpoints",
911                gauge!(stat.transaction.checkpoints_running),
912                tags!(self.tags),
913            ));
914            metrics.push(self.create_metric(
915                "mongod_wiredtiger_transactions_total",
916                counter!(stat.transaction.begins),
917                tags!(self.tags, "type" => "begins"),
918            ));
919            metrics.push(self.create_metric(
920                "mongod_wiredtiger_transactions_total",
921                counter!(stat.transaction.checkpoints),
922                tags!(self.tags, "type" => "checkpoints"),
923            ));
924            metrics.push(self.create_metric(
925                "mongod_wiredtiger_transactions_total",
926                counter!(stat.transaction.committed),
927                tags!(self.tags, "type" => "committed"),
928            ));
929            metrics.push(self.create_metric(
930                "mongod_wiredtiger_transactions_total",
931                counter!(stat.transaction.rolled_back),
932                tags!(self.tags, "type" => "rolledback"),
933            ));
934        }
935
936        // network_*
937        metrics.push(self.create_metric(
938            "network_bytes_total",
939            counter!(status.network.bytes_in),
940            tags!(self.tags, "state" => "bytes_in"),
941        ));
942        metrics.push(self.create_metric(
943            "network_bytes_total",
944            counter!(status.network.bytes_out),
945            tags!(self.tags, "state" => "bytes_out"),
946        ));
947        metrics.push(self.create_metric(
948            "network_metrics_num_requests_total",
949            counter!(status.network.num_requests),
950            tags!(self.tags),
951        ));
952
953        // op_counters_repl_total
954        for (r#type, value) in status.opcounters {
955            metrics.push(self.create_metric(
956                "op_counters_repl_total",
957                counter!(value),
958                tags!(self.tags, "type" => r#type),
959            ));
960        }
961
962        // op_counters_total
963        for (r#type, value) in status.opcounters_repl {
964            metrics.push(self.create_metric(
965                "op_counters_total",
966                counter!(value),
967                tags!(self.tags, "type" => r#type),
968            ));
969        }
970
971        Ok(metrics)
972    }
973}
974
975fn bson_size(value: &Bson) -> usize {
976    match value {
977        Bson::Double(value) => value.size_of(),
978        Bson::String(value) => value.size_of(),
979        Bson::Array(value) => value.iter().map(bson_size).sum(),
980        Bson::Document(value) => document_size(value),
981        Bson::Boolean(_) => std::mem::size_of::<bool>(),
982        Bson::RegularExpression(value) => value.pattern.size_of(),
983        Bson::JavaScriptCode(value) => value.size_of(),
984        Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
985        Bson::Int32(value) => value.size_of(),
986        Bson::Int64(value) => value.size_of(),
987        Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
988        Bson::Binary(value) => value.bytes.size_of(),
989        Bson::ObjectId(value) => value.bytes().size_of(),
990        Bson::DateTime(_) => std::mem::size_of::<i64>(),
991        Bson::Symbol(value) => value.size_of(),
992        Bson::Decimal128(value) => value.bytes().size_of(),
993        Bson::DbPointer(_) => {
994            // DbPointer parts are not public and cannot be evaluated
995            0
996        }
997        Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
998    }
999}
1000
1001fn document_size(doc: &Document) -> usize {
1002    doc.into_iter()
1003        .map(|(key, value)| key.size_of() + bson_size(value))
1004        .sum()
1005}
1006
1007/// Remove credentials from endpoint.
1008/// URI components: <https://docs.mongodb.com/manual/reference/connection-string/#components>
1009/// It's not possible to use [url::Url](https://docs.rs/url/2.1.1/url/struct.Url.html) because connection string can have multiple hosts.
1010/// Would be nice to serialize [ClientOptions](https://docs.rs/mongodb/1.1.1/mongodb/options/struct.ClientOptions.html) to String, but it's not supported.
1011/// `endpoint` argument would not be required, but field `original_uri` in `ClientOptions` is private.
1012/// `.unwrap()` in function is safe because endpoint was already verified by `ClientOptions`.
1013/// Based on ClientOptions::parse_uri -- <https://github.com/mongodb/mongo-rust-driver/blob/09e1193f93dcd850ebebb7fb82f6ab786fd85de1/src/client/options/mod.rs#L708>
1014fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
1015    let mut endpoint = endpoint.to_owned();
1016    if options.credential.is_some() {
1017        let start = endpoint.find("://").unwrap() + 3;
1018
1019        // Split `username:password@host[:port]` and `/defaultauthdb?<options>`
1020        let pre_slash = match endpoint[start..].find('/') {
1021            Some(index) => {
1022                let mut segments = endpoint[start..].split_at(index);
1023                // If we have databases and options
1024                if segments.1.len() > 1 {
1025                    let lstart = start + segments.0.len() + 1;
1026                    let post_slash = &segments.1[1..];
1027                    // Split `/defaultauthdb` and `?<options>`
1028                    if let Some(index) = post_slash.find('?') {
1029                        let segments = post_slash.split_at(index);
1030                        // If we have options
1031                        if segments.1.len() > 1 {
1032                            // Remove authentication options
1033                            let options = segments.1[1..]
1034                                .split('&')
1035                                .filter(|pair| {
1036                                    let (key, _) = pair.split_at(pair.find('=').unwrap());
1037                                    !matches!(
1038                                        key.to_lowercase().as_str(),
1039                                        "authsource" | "authmechanism" | "authmechanismproperties"
1040                                    )
1041                                })
1042                                .collect::<Vec<_>>()
1043                                .join("&");
1044
1045                            // Update options in endpoint
1046                            endpoint = format!(
1047                                "{}{}",
1048                                &endpoint[..lstart + segments.0.len() + 1],
1049                                &options
1050                            );
1051                        }
1052                    }
1053                    segments = endpoint[start..].split_at(index);
1054                }
1055                segments.0
1056            }
1057            None => &endpoint[start..],
1058        };
1059
1060        // Remove `username:password@`
1061        let end = pre_slash.rfind('@').unwrap() + 1;
1062        endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
1063    }
1064    endpoint
1065}
1066
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
    }

    /// Parses `endpoint` with the MongoDB driver and returns its sanitized form.
    async fn sanitize(endpoint: &str) -> String {
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        sanitize_endpoint(endpoint, &client_options)
    }

    #[tokio::test]
    async fn sanitize_endpoint_test() {
        let endpoint = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
        assert_eq!(
            sanitize(endpoint).await,
            "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true"
        );
    }

    #[tokio::test]
    async fn sanitize_endpoint_keeps_default_auth_db() {
        // Auth option keys are matched case-insensitively; unrelated options survive.
        let endpoint = "mongodb://user:p%40ss@localhost:27017/admin?authMechanism=SCRAM-SHA-256&appName=vector";
        assert_eq!(
            sanitize(endpoint).await,
            "mongodb://localhost:27017/admin?appName=vector"
        );
    }

    #[tokio::test]
    async fn sanitize_endpoint_without_path() {
        let endpoint = "mongodb://user:secret@localhost:27017";
        assert_eq!(sanitize(endpoint).await, "mongodb://localhost:27017");
    }

    #[tokio::test]
    async fn sanitize_endpoint_without_credentials() {
        let endpoint = "mongodb://localhost:27017/?tls=true";
        assert_eq!(sanitize(endpoint).await, endpoint);
    }
}
1084
#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
mod integration_tests {
    // Integration tests that run the source against live MongoDB instances. The addresses
    // are read from `PRIMARY_MONGODB_ADDRESS` / `SECONDARY_MONGODB_ADDRESS`, falling back
    // to localhost ports.
    use futures::StreamExt;
    use tokio::time::{timeout, Duration};

    use super::*;
    use crate::{
        test_util::{
            components::{assert_source_compliance, PULL_SOURCE_TAGS},
            trace_init,
        },
        SourceSender,
    };

    /// Address of the primary `mongod`; defaults to `mongodb://localhost:27017`.
    fn primary_mongo_address() -> String {
        std::env::var("PRIMARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27017".into())
    }

    /// Address of the replica-set member; defaults to `mongodb://localhost:27019`.
    fn secondary_mongo_address() -> String {
        std::env::var("SECONDARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27019".into())
    }

    /// Clears the username and password from `address` using `url::Url`, producing the value
    /// expected in the `endpoint` tag.
    // NOTE(review): `url::Url` cannot parse multi-host connection strings, so this assumes
    // the test addresses are single-host URIs; it also does not strip `authSource` etc.
    fn remove_creds(address: &str) -> String {
        let mut url = url::Url::parse(address).unwrap();
        url.set_password(None).unwrap();
        url.set_username("").unwrap();
        url.to_string()
    }

    /// Runs the source against `endpoint`, collects the first burst of events and checks
    /// that each metric carries the configured namespace, a timestamp, and the expected
    /// `endpoint` and `host` tags.
    async fn test_instance(endpoint: String) {
        assert_source_compliance(&PULL_SOURCE_TAGS, async {
            // Expected `host` tag: the first host the driver parses out of the endpoint.
            let host = ClientOptions::parse(endpoint.as_str()).await.unwrap().hosts[0].to_string();
            let namespace = "vector_mongodb";

            let (sender, mut recv) = SourceSender::new_test();

            let endpoints = vec![endpoint.clone()];
            // Run the source in the background; events are read from `recv` below.
            tokio::spawn(async move {
                MongoDbMetricsConfig {
                    endpoints,
                    scrape_interval_secs: Duration::from_secs(15),
                    namespace: namespace.to_owned(),
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });

            // TODO: We should have a simpler/cleaner method for this sort of collection, where we're essentially waiting
            // for a burst of events, and want to debounce ourselves in terms of stopping collection once all events in the
            // burst have been collected. This code here isn't bad or anything... I've just noticed now that we do it in a
            // few places, and we could solve it in a cleaner way, most likely.
            // Wait up to 30s for the first event of the first scrape.
            let event = timeout(Duration::from_secs(30), recv.next())
                .await
                .expect("fetch metrics timeout")
                .expect("failed to get metrics from a stream");
            let mut events = vec![event];
            // Drain the rest of the burst: stop once the stream ends or no event arrives
            // within 10ms.
            loop {
                match timeout(Duration::from_millis(10), recv.next()).await {
                    Ok(Some(event)) => events.push(event),
                    Ok(None) => break,
                    Err(_) => break,
                }
            }

            let clean_endpoint = remove_creds(&endpoint);

            assert!(events.len() > 100);
            for event in events {
                let metric = event.into_metric();
                // validate namespace
                assert!(metric.namespace() == Some(namespace));
                // validate timestamp
                // NOTE(review): `timestamp - Utc::now()` is negative for any timestamp in the
                // past, so this only rejects timestamps at least ~1s in the future; it does not
                // bound how old the timestamp is.
                let timestamp = metric.timestamp().expect("existed timestamp");
                assert!((timestamp - Utc::now()).num_seconds() < 1);
                // validate basic tags
                let tags = metric.tags().expect("existed tags");
                assert_eq!(tags.get("endpoint"), Some(&clean_endpoint[..]));
                assert_eq!(tags.get("host"), Some(&host[..]));
            }
        })
        .await;
    }

    /// Scrapes the primary `mongod` instance.
    #[tokio::test]
    async fn fetch_metrics_mongod() {
        trace_init();
        test_instance(primary_mongo_address()).await;
    }

    // TODO: mongos coverage is disabled; note the address below is hard-coded rather than
    // read from the environment like the other tests.
    // #[tokio::test]
    // async fn fetch_metrics_mongos() {
    //     trace_init();
    //     test_instance("mongodb://localhost:27018").await;
    // }

    /// Scrapes the replica-set member at the secondary address.
    #[tokio::test]
    async fn fetch_metrics_replset() {
        trace_init();
        test_instance(secondary_mongo_address()).await;
    }
}