vector/sources/mongodb_metrics/
mod.rs

1use std::time::{Duration, Instant};
2
3use chrono::Utc;
4use futures::{
5    StreamExt,
6    future::{join_all, try_join_all},
7};
8use mongodb::{
9    Client,
10    bson::{self, Bson, Document, doc, from_document},
11    error::Error as MongoError,
12    options::ClientOptions,
13};
14use serde_with::serde_as;
15use snafu::{ResultExt, Snafu};
16use tokio::time;
17use tokio_stream::wrappers::IntervalStream;
18use vector_lib::{
19    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags,
20};
21
22use crate::{
23    config::{SourceConfig, SourceContext, SourceOutput},
24    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
25    internal_events::{
26        CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
27        MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
28    },
29};
30
31mod types;
32use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
33use vector_lib::config::LogNamespace;
34
/// Builds the tag set for a single metric from a base tag set.
///
/// * `tags!(base)` expands to `base.clone()`.
/// * `tags!(base, key => value, ...)` clones `base` and then calls
///   `replace(key.into(), value.to_string())` on the clone for each listed pair, in order.
///   A trailing comma after the last pair is accepted so multi-line invocations can be
///   formatted one pair per line.
///
/// `base` itself is only cloned, never modified.
macro_rules! tags {
    ($tags:expr_2021) => { $tags.clone() };
    ($tags:expr_2021, $($key:expr_2021 => $value:expr_2021),* $(,)?) => {
        {
            let mut tags = $tags.clone();
            $(
                tags.replace($key.into(), $value.to_string());
            )*
            tags
        }
    };
}
47
/// Wraps a numeric expression in `MetricValue::Counter`.
///
/// The expression is evaluated first and the result is then converted to `f64` with an `as`
/// cast, so any integer arithmetic inside the argument happens before the conversion.
macro_rules! counter {
    ($amount:expr_2021) => {
        MetricValue::Counter { value: $amount as f64 }
    };
}
55
/// Wraps a numeric expression in `MetricValue::Gauge`.
///
/// The expression is evaluated first and the result is then converted to `f64` with an `as`
/// cast, so any integer arithmetic inside the argument happens before the conversion.
macro_rules! gauge {
    ($reading:expr_2021) => {
        MetricValue::Gauge { value: $reading as f64 }
    };
}
63
64#[derive(Debug, Snafu)]
65enum BuildError {
66    #[snafu(display("invalid endpoint: {}", source))]
67    InvalidEndpoint { source: MongoError },
68    #[snafu(display("invalid client options: {}", source))]
69    InvalidClientOptions { source: MongoError },
70}
71
72#[derive(Debug)]
73enum CollectError {
74    Mongo(MongoError),
75    Bson(bson::de::Error),
76}
77
78/// Configuration for the `mongodb_metrics` source.
79#[serde_as]
80#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
81#[derive(Clone, Debug, Default)]
82#[serde(deny_unknown_fields)]
83pub struct MongoDbMetricsConfig {
84    /// A list of MongoDB instances to scrape.
85    ///
86    /// Each endpoint must be in the [Connection String URI Format](https://www.mongodb.com/docs/manual/reference/connection-string/).
87    #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
88    endpoints: Vec<String>,
89
90    /// The interval between scrapes, in seconds.
91    #[serde(default = "default_scrape_interval_secs")]
92    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
93    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
94    scrape_interval_secs: Duration,
95
96    /// Overrides the default namespace for the metrics emitted by the source.
97    ///
98    /// If set to an empty string, no namespace is added to the metrics.
99    ///
100    /// By default, `mongodb` is used.
101    #[serde(default = "default_namespace")]
102    namespace: String,
103}
104
105#[derive(Debug)]
106struct MongoDbMetrics {
107    client: Client,
108    endpoint: String,
109    namespace: Option<String>,
110    tags: MetricTags,
111}
112
/// Default scrape interval used when `scrape_interval_secs` is not configured: 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECS: u64 = 15;
    Duration::new(DEFAULT_SECS, 0)
}
116
/// Default metric namespace used when `namespace` is not configured: `mongodb`.
pub fn default_namespace() -> String {
    String::from("mongodb")
}
120
121impl_generate_config_from_default!(MongoDbMetricsConfig);
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "mongodb_metrics")]
125impl SourceConfig for MongoDbMetricsConfig {
126    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
127        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
128
129        let sources = try_join_all(
130            self.endpoints
131                .iter()
132                .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
133        )
134        .await?;
135
136        let duration = self.scrape_interval_secs;
137        let shutdown = cx.shutdown;
138        Ok(Box::pin(async move {
139            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
140            while interval.next().await.is_some() {
141                let start = Instant::now();
142                let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
143                emit!(CollectionCompleted {
144                    start,
145                    end: Instant::now()
146                });
147
148                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
149                let count = metrics.len();
150
151                if (cx.out.send_batch(metrics).await).is_err() {
152                    emit!(StreamClosedError { count });
153                    return Err(());
154                }
155            }
156
157            Ok(())
158        }))
159    }
160
161    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
162        vec![SourceOutput::new_metrics()]
163    }
164
165    fn can_acknowledge(&self) -> bool {
166        false
167    }
168}
169
170impl MongoDbMetrics {
171    /// Works only with Standalone connection-string. Collect metrics only from specified instance.
172    /// <https://docs.mongodb.com/manual/reference/connection-string/#standard-connection-string-format>
173    async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
174        let mut client_options = ClientOptions::parse(endpoint)
175            .await
176            .context(InvalidEndpointSnafu)?;
177        client_options.direct_connection = Some(true);
178
179        let endpoint = sanitize_endpoint(endpoint, &client_options);
180        let tags = metric_tags!(
181            "endpoint" => endpoint.clone(),
182            "host" => client_options.hosts[0].to_string(),
183        );
184
185        Ok(Self {
186            client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
187            endpoint,
188            namespace,
189            tags,
190        })
191    }
192
193    /// Finding node type for client with `isMaster` command.
194    async fn get_node_type(&self) -> Result<NodeType, CollectError> {
195        let doc = self
196            .client
197            .database("admin")
198            .run_command(doc! { "isMaster": 1 })
199            .await
200            .map_err(CollectError::Mongo)?;
201        let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;
202
203        Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
204            NodeType::Replset
205        } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
206            // Contains the value isdbgrid when isMaster returns from a mongos instance.
207            // <https://docs.mongodb.com/manual/reference/command/isMaster/#isMaster.msg>
208            // <https://docs.mongodb.com/manual/core/sharded-cluster-query-router/#confirm-connection-to-mongos-instances>
209            NodeType::Mongos
210        } else {
211            NodeType::Mongod
212        })
213    }
214
215    async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
216        let doc = self
217            .client
218            .database("admin")
219            .run_command(doc! { "buildInfo": 1 })
220            .await
221            .map_err(CollectError::Mongo)?;
222        from_document(doc).map_err(CollectError::Bson)
223    }
224
225    async fn print_version(&self) -> Result<(), CollectError> {
226        if tracing::level_enabled!(tracing::Level::DEBUG) {
227            let node_type = self.get_node_type().await?;
228            let build_info = self.get_build_info().await?;
229            debug!(
230                message = "Connected to server.", endpoint = %self.endpoint, node_type = ?node_type, server_version = ?serde_json::to_string(&build_info).unwrap()
231            );
232        }
233
234        Ok(())
235    }
236
237    fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
238        Metric::new(name, MetricKind::Absolute, value)
239            .with_namespace(self.namespace.clone())
240            .with_tags(Some(tags))
241            .with_timestamp(Some(Utc::now()))
242    }
243
244    async fn collect(&self) -> Vec<Metric> {
245        // `up` metric is `1` if collection is successful, otherwise `0`.
246        let (up_value, mut metrics) = match self.collect_server_status().await {
247            Ok(metrics) => (1.0, metrics),
248            Err(error) => {
249                match error {
250                    CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
251                        error,
252                        endpoint: &self.endpoint,
253                    }),
254                    CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
255                        error,
256                        endpoint: &self.endpoint,
257                    }),
258                }
259
260                (0.0, vec![])
261            }
262        };
263
264        metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));
265
266        emit!(MongoDbMetricsEventsReceived {
267            byte_size: metrics.estimated_json_encoded_size_of(),
268            count: metrics.len(),
269            endpoint: &self.endpoint,
270        });
271
272        metrics
273    }
274
275    /// Collect metrics from `serverStatus` command.
276    /// <https://docs.mongodb.com/manual/reference/command/serverStatus/>
277    async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
278        self.print_version().await?;
279
280        let mut metrics = vec![];
281
282        let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
283        let db = self.client.database("admin");
284        let doc = db.run_command(command).await.map_err(CollectError::Mongo)?;
285        let byte_size = document_size(&doc);
286        emit!(EndpointBytesReceived {
287            byte_size,
288            protocol: "tcp",
289            endpoint: &self.endpoint,
290        });
291        let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;
292
293        // asserts_total
294        metrics.push(self.create_metric(
295            "asserts_total",
296            counter!(status.asserts.regular),
297            tags!(self.tags, "type" => "regular"),
298        ));
299        metrics.push(self.create_metric(
300            "asserts_total",
301            counter!(status.asserts.warning),
302            tags!(self.tags, "type" => "warning"),
303        ));
304        metrics.push(self.create_metric(
305            "asserts_total",
306            counter!(status.asserts.msg),
307            tags!(self.tags, "type" => "msg"),
308        ));
309        metrics.push(self.create_metric(
310            "asserts_total",
311            counter!(status.asserts.user),
312            tags!(self.tags, "type" => "user"),
313        ));
314        metrics.push(self.create_metric(
315            "asserts_total",
316            counter!(status.asserts.rollovers),
317            tags!(self.tags, "type" => "rollovers"),
318        ));
319
320        // connections
321        metrics.push(self.create_metric(
322            "connections",
323            counter!(status.connections.active),
324            tags!(self.tags, "state" => "active"),
325        ));
326        metrics.push(self.create_metric(
327            "connections",
328            counter!(status.connections.available),
329            tags!(self.tags, "state" => "available"),
330        ));
331        metrics.push(self.create_metric(
332            "connections",
333            counter!(status.connections.current),
334            tags!(self.tags, "state" => "current"),
335        ));
336
337        // extra_info_*
338        if let Some(value) = status.extra_info.heap_usage_bytes {
339            metrics.push(self.create_metric(
340                "extra_info_heap_usage_bytes",
341                gauge!(value),
342                tags!(self.tags),
343            ));
344        }
345        metrics.push(self.create_metric(
346            "extra_info_page_faults",
347            gauge!(status.extra_info.page_faults),
348            tags!(self.tags),
349        ));
350
351        // instance_*
352        metrics.push(self.create_metric(
353            "instance_local_time",
354            gauge!(status.instance.local_time.timestamp_millis() / 1000),
355            tags!(self.tags),
356        ));
357        metrics.push(self.create_metric(
358            "instance_uptime_estimate_seconds_total",
359            gauge!(status.instance.uptime_estimate),
360            tags!(self.tags),
361        ));
362        metrics.push(self.create_metric(
363            "instance_uptime_seconds_total",
364            gauge!(status.instance.uptime),
365            tags!(self.tags),
366        ));
367
368        // memory
369        metrics.push(self.create_metric(
370            "memory",
371            gauge!(status.memory.resident),
372            tags!(self.tags, "type" => "resident"),
373        ));
374        metrics.push(self.create_metric(
375            "memory",
376            gauge!(status.memory.r#virtual),
377            tags!(self.tags, "type" => "virtual"),
378        ));
379        if let Some(value) = status.memory.mapped {
380            metrics.push(self.create_metric(
381                "memory",
382                gauge!(value),
383                tags!(self.tags, "type" => "mapped"),
384            ))
385        }
386        if let Some(value) = status.memory.mapped_with_journal {
387            metrics.push(self.create_metric(
388                "memory",
389                gauge!(value),
390                tags!(self.tags, "type" => "mapped_with_journal"),
391            ))
392        }
393
394        // mongod_global_lock_*
395        metrics.push(self.create_metric(
396            "mongod_global_lock_total_time_seconds",
397            counter!(status.global_lock.total_time),
398            tags!(self.tags),
399        ));
400        metrics.push(self.create_metric(
401            "mongod_global_lock_active_clients",
402            gauge!(status.global_lock.active_clients.total),
403            tags!(self.tags, "type" => "total"),
404        ));
405        metrics.push(self.create_metric(
406            "mongod_global_lock_active_clients",
407            gauge!(status.global_lock.active_clients.readers),
408            tags!(self.tags, "type" => "readers"),
409        ));
410        metrics.push(self.create_metric(
411            "mongod_global_lock_active_clients",
412            gauge!(status.global_lock.active_clients.writers),
413            tags!(self.tags, "type" => "writers"),
414        ));
415        metrics.push(self.create_metric(
416            "mongod_global_lock_current_queue",
417            gauge!(status.global_lock.current_queue.total),
418            tags!(self.tags, "type" => "total"),
419        ));
420        metrics.push(self.create_metric(
421            "mongod_global_lock_current_queue",
422            gauge!(status.global_lock.current_queue.readers),
423            tags!(self.tags, "type" => "readers"),
424        ));
425        metrics.push(self.create_metric(
426            "mongod_global_lock_current_queue",
427            gauge!(status.global_lock.current_queue.writers),
428            tags!(self.tags, "type" => "writers"),
429        ));
430
431        // mongod_locks_time_*
432        for (r#type, lock) in status.locks {
433            if let Some(modes) = lock.time_acquiring_micros {
434                if let Some(value) = modes.read {
435                    metrics.push(self.create_metric(
436                        "mongod_locks_time_acquiring_global_seconds_total",
437                        counter!(value),
438                        tags!(self.tags, "type" => &r#type, "mode" => "read"),
439                    ));
440                }
441                if let Some(value) = modes.write {
442                    metrics.push(self.create_metric(
443                        "mongod_locks_time_acquiring_global_seconds_total",
444                        counter!(value),
445                        tags!(self.tags, "type" => &r#type, "mode" => "write"),
446                    ));
447                }
448            }
449        }
450
451        // mongod_metrics_cursor_*
452        metrics.push(self.create_metric(
453            "mongod_metrics_cursor_timed_out_total",
454            counter!(status.metrics.cursor.timed_out),
455            tags!(self.tags),
456        ));
457        metrics.push(self.create_metric(
458            "mongod_metrics_cursor_open",
459            gauge!(status.metrics.cursor.open.no_timeout),
460            tags!(self.tags, "state" => "no_timeout"),
461        ));
462        metrics.push(self.create_metric(
463            "mongod_metrics_cursor_open",
464            gauge!(status.metrics.cursor.open.pinned),
465            tags!(self.tags, "state" => "pinned"),
466        ));
467        metrics.push(self.create_metric(
468            "mongod_metrics_cursor_open",
469            gauge!(status.metrics.cursor.open.total),
470            tags!(self.tags, "state" => "total"),
471        ));
472
473        // mongod_metrics_document_total
474        metrics.push(self.create_metric(
475            "mongod_metrics_document_total",
476            counter!(status.metrics.document.deleted),
477            tags!(self.tags, "state" => "deleted"),
478        ));
479        metrics.push(self.create_metric(
480            "mongod_metrics_document_total",
481            counter!(status.metrics.document.inserted),
482            tags!(self.tags, "state" => "inserted"),
483        ));
484        metrics.push(self.create_metric(
485            "mongod_metrics_document_total",
486            counter!(status.metrics.document.returned),
487            tags!(self.tags, "state" => "returned"),
488        ));
489        metrics.push(self.create_metric(
490            "mongod_metrics_document_total",
491            counter!(status.metrics.document.updated),
492            tags!(self.tags, "state" => "updated"),
493        ));
494
495        // mongod_metrics_get_last_error_*
496        metrics.push(self.create_metric(
497            "mongod_metrics_get_last_error_wtime_num",
498            gauge!(status.metrics.get_last_error.wtime.num),
499            tags!(self.tags),
500        ));
501        metrics.push(self.create_metric(
502            "mongod_metrics_get_last_error_wtime_seconds_total",
503            counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
504            tags!(self.tags),
505        ));
506        metrics.push(self.create_metric(
507            "mongod_metrics_get_last_error_wtimeouts_total",
508            counter!(status.metrics.get_last_error.wtimeouts),
509            tags!(self.tags),
510        ));
511
512        // mongod_metrics_operation_total
513        metrics.push(self.create_metric(
514            "mongod_metrics_operation_total",
515            counter!(status.metrics.operation.scan_and_order),
516            tags!(self.tags, "type" => "scan_and_order"),
517        ));
518        metrics.push(self.create_metric(
519            "mongod_metrics_operation_total",
520            counter!(status.metrics.operation.write_conflicts),
521            tags!(self.tags, "type" => "write_conflicts"),
522        ));
523
524        // mongod_metrics_query_executor_total
525        metrics.push(self.create_metric(
526            "mongod_metrics_query_executor_total",
527            counter!(status.metrics.query_executor.scanned),
528            tags!(self.tags, "state" => "scanned"),
529        ));
530        metrics.push(self.create_metric(
531            "mongod_metrics_query_executor_total",
532            counter!(status.metrics.query_executor.scanned_objects),
533            tags!(self.tags, "state" => "scanned_objects"),
534        ));
535        if let Some(doc) = status.metrics.query_executor.collection_scans {
536            metrics.push(self.create_metric(
537                "mongod_metrics_query_executor_total",
538                counter!(doc.total),
539                tags!(self.tags, "state" => "collection_scans"),
540            ));
541        }
542
543        // mongod_metrics_record_moves_total
544        if let Some(record) = status.metrics.record {
545            metrics.push(self.create_metric(
546                "mongod_metrics_record_moves_total",
547                counter!(record.moves),
548                tags!(self.tags),
549            ));
550        }
551
552        // mongod_metrics_repl_apply_
553        metrics.push(self.create_metric(
554            "mongod_metrics_repl_apply_batches_num_total",
555            counter!(status.metrics.repl.apply.batches.num),
556            tags!(self.tags),
557        ));
558        metrics.push(self.create_metric(
559            "mongod_metrics_repl_apply_batches_seconds_total",
560            counter!(status.metrics.repl.apply.batches.total_millis / 1000),
561            tags!(self.tags),
562        ));
563        metrics.push(self.create_metric(
564            "mongod_metrics_repl_apply_ops_total",
565            counter!(status.metrics.repl.apply.ops),
566            tags!(self.tags),
567        ));
568
569        // mongod_metrics_repl_buffer_*
570        metrics.push(self.create_metric(
571            "mongod_metrics_repl_buffer_count",
572            counter!(status.metrics.repl.buffer.count),
573            tags!(self.tags),
574        ));
575        metrics.push(self.create_metric(
576            "mongod_metrics_repl_buffer_max_size_bytes_total",
577            counter!(status.metrics.repl.buffer.max_size_bytes),
578            tags!(self.tags),
579        ));
580        metrics.push(self.create_metric(
581            "mongod_metrics_repl_buffer_size_bytes",
582            counter!(status.metrics.repl.buffer.size_bytes),
583            tags!(self.tags),
584        ));
585
586        // mongod_metrics_repl_executor_*
587        metrics.push(self.create_metric(
588            "mongod_metrics_repl_executor_queue",
589            gauge!(status.metrics.repl.executor.queues.network_in_progress),
590            tags!(self.tags, "type" => "network_in_progress"),
591        ));
592        metrics.push(self.create_metric(
593            "mongod_metrics_repl_executor_queue",
594            gauge!(status.metrics.repl.executor.queues.sleepers),
595            tags!(self.tags, "type" => "sleepers"),
596        ));
597        metrics.push(self.create_metric(
598            "mongod_metrics_repl_executor_unsignaled_events",
599            gauge!(status.metrics.repl.executor.unsignaled_events),
600            tags!(self.tags),
601        ));
602
603        // mongod_metrics_repl_network_*
604        metrics.push(self.create_metric(
605            "mongod_metrics_repl_network_bytes_total",
606            counter!(status.metrics.repl.network.bytes),
607            tags!(self.tags),
608        ));
609        metrics.push(self.create_metric(
610            "mongod_metrics_repl_network_getmores_num_total",
611            counter!(status.metrics.repl.network.getmores.num),
612            tags!(self.tags),
613        ));
614        metrics.push(self.create_metric(
615            "mongod_metrics_repl_network_getmores_seconds_total",
616            counter!(status.metrics.repl.network.getmores.total_millis / 1000),
617            tags!(self.tags),
618        ));
619        metrics.push(self.create_metric(
620            "mongod_metrics_repl_network_ops_total",
621            counter!(status.metrics.repl.network.ops),
622            tags!(self.tags),
623        ));
624        metrics.push(self.create_metric(
625            "mongod_metrics_repl_network_readers_created_total",
626            counter!(status.metrics.repl.network.readers_created),
627            tags!(self.tags),
628        ));
629
630        // mongod_metrics_ttl_*
631        metrics.push(self.create_metric(
632            "mongod_metrics_ttl_deleted_documents_total",
633            counter!(status.metrics.ttl.deleted_documents),
634            tags!(self.tags),
635        ));
636        metrics.push(self.create_metric(
637            "mongod_metrics_ttl_passes_total",
638            counter!(status.metrics.ttl.passes),
639            tags!(self.tags),
640        ));
641
642        // mongod_op_latencies_*
643        for (r#type, stat) in status.op_latencies {
644            for bucket in stat.histogram {
645                metrics.push(self.create_metric(
646                    "mongod_op_latencies_histogram",
647                    gauge!(bucket.count),
648                    tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
649                ));
650            }
651            metrics.push(self.create_metric(
652                "mongod_op_latencies_latency",
653                gauge!(stat.latency),
654                tags!(self.tags, "type" => &r#type),
655            ));
656            metrics.push(self.create_metric(
657                "mongod_op_latencies_ops_total",
658                gauge!(stat.ops),
659                tags!(self.tags, "type" => &r#type),
660            ));
661        }
662
663        // mongod_storage_engine
664        metrics.push(self.create_metric(
665            "mongod_storage_engine",
666            gauge!(1),
667            tags!(self.tags, "engine" => status.storage_engine.name),
668        ));
669
670        // mongod_wiredtiger_*
671        if let Some(stat) = status.wired_tiger {
672            // mongod_wiredtiger_blockmanager_blocks_total
673            metrics.push(self.create_metric(
674                "mongod_wiredtiger_blockmanager_blocks_total",
675                counter!(stat.block_manager.blocks_read),
676                tags!(self.tags, "type" => "blocks_read"),
677            ));
678            metrics.push(self.create_metric(
679                "mongod_wiredtiger_blockmanager_blocks_total",
680                counter!(stat.block_manager.mapped_blocks_read),
681                tags!(self.tags, "type" => "blocks_read_mapped"),
682            ));
683            metrics.push(self.create_metric(
684                "mongod_wiredtiger_blockmanager_blocks_total",
685                counter!(stat.block_manager.blocks_pre_loaded),
686                tags!(self.tags, "type" => "blocks_pre_loaded"),
687            ));
688            metrics.push(self.create_metric(
689                "mongod_wiredtiger_blockmanager_blocks_total",
690                counter!(stat.block_manager.blocks_written),
691                tags!(self.tags, "type" => "blocks_written"),
692            ));
693
694            // mongod_wiredtiger_blockmanager_bytes_total
695            metrics.push(self.create_metric(
696                "mongod_wiredtiger_blockmanager_bytes_total",
697                counter!(stat.block_manager.bytes_read),
698                tags!(self.tags, "type" => "bytes_read"),
699            ));
700            metrics.push(self.create_metric(
701                "mongod_wiredtiger_blockmanager_bytes_total",
702                counter!(stat.block_manager.mapped_bytes_read),
703                tags!(self.tags, "type" => "bytes_read_mapped"),
704            ));
705            metrics.push(self.create_metric(
706                "mongod_wiredtiger_blockmanager_bytes_total",
707                counter!(stat.block_manager.bytes_written),
708                tags!(self.tags, "type" => "bytes_written"),
709            ));
710
711            // mongod_wiredtiger_cache_bytes
712            metrics.push(self.create_metric(
713                "mongod_wiredtiger_cache_bytes",
714                gauge!(stat.cache.bytes_total),
715                tags!(self.tags, "type" => "total"),
716            ));
717            metrics.push(self.create_metric(
718                "mongod_wiredtiger_cache_bytes",
719                gauge!(stat.cache.bytes_dirty),
720                tags!(self.tags, "type" => "dirty"),
721            ));
722            metrics.push(self.create_metric(
723                "mongod_wiredtiger_cache_bytes",
724                gauge!(stat.cache.bytes_internal_pages),
725                tags!(self.tags, "type" => "internal_pages"),
726            ));
727            metrics.push(self.create_metric(
728                "mongod_wiredtiger_cache_bytes",
729                gauge!(stat.cache.bytes_leaf_pages),
730                tags!(self.tags, "type" => "leaf_pages"),
731            ));
732
733            // mongod_wiredtiger_cache_bytes_total
734            metrics.push(self.create_metric(
735                "mongod_wiredtiger_cache_bytes_total",
736                counter!(stat.cache.pages_read_into),
737                tags!(self.tags, "type" => "read"),
738            ));
739            metrics.push(self.create_metric(
740                "mongod_wiredtiger_cache_bytes_total",
741                counter!(stat.cache.pages_written_from),
742                tags!(self.tags, "type" => "written"),
743            ));
744
745            // mongod_wiredtiger_cache_evicted_total
746            metrics.push(self.create_metric(
747                "mongod_wiredtiger_cache_evicted_total",
748                counter!(stat.cache.evicted_modified),
749                tags!(self.tags, "type" => "modified"),
750            ));
751            metrics.push(self.create_metric(
752                "mongod_wiredtiger_cache_evicted_total",
753                counter!(stat.cache.evicted_unmodified),
754                tags!(self.tags, "type" => "unmodified"),
755            ));
756
757            // mongod_wiredtiger_cache_max_bytes
758            metrics.push(self.create_metric(
759                "mongod_wiredtiger_cache_max_bytes",
760                gauge!(stat.cache.max_bytes),
761                tags!(self.tags),
762            ));
763
764            // mongod_wiredtiger_cache_overhead_percent
765            metrics.push(self.create_metric(
766                "mongod_wiredtiger_cache_overhead_percent",
767                gauge!(stat.cache.percent_overhead),
768                tags!(self.tags),
769            ));
770
771            // mongod_wiredtiger_cache_pages
772            metrics.push(self.create_metric(
773                "mongod_wiredtiger_cache_pages",
774                gauge!(stat.cache.pages_total),
775                tags!(self.tags, "type" => "total"),
776            ));
777            metrics.push(self.create_metric(
778                "mongod_wiredtiger_cache_pages",
779                gauge!(stat.cache.pages_dirty),
780                tags!(self.tags, "type" => "dirty"),
781            ));
782
783            // mongod_wiredtiger_cache_pages_total
784            metrics.push(self.create_metric(
785                "mongod_wiredtiger_cache_pages_total",
786                counter!(stat.cache.pages_read_into),
787                tags!(self.tags, "type" => "read"),
788            ));
789            metrics.push(self.create_metric(
790                "mongod_wiredtiger_cache_pages_total",
791                counter!(stat.cache.pages_written_from),
792                tags!(self.tags, "type" => "write"),
793            ));
794
795            // mongod_wiredtiger_concurrent_transactions_*
796            metrics.push(self.create_metric(
797                "mongod_wiredtiger_concurrent_transactions_available_tickets",
798                gauge!(stat.concurrent_transactions.read.available),
799                tags!(self.tags, "type" => "read"),
800            ));
801            metrics.push(self.create_metric(
802                "mongod_wiredtiger_concurrent_transactions_available_tickets",
803                gauge!(stat.concurrent_transactions.write.available),
804                tags!(self.tags, "type" => "write"),
805            ));
806            metrics.push(self.create_metric(
807                "mongod_wiredtiger_concurrent_transactions_out_tickets",
808                gauge!(stat.concurrent_transactions.read.out),
809                tags!(self.tags, "type" => "read"),
810            ));
811            metrics.push(self.create_metric(
812                "mongod_wiredtiger_concurrent_transactions_out_tickets",
813                gauge!(stat.concurrent_transactions.write.out),
814                tags!(self.tags, "type" => "write"),
815            ));
816            metrics.push(self.create_metric(
817                "mongod_wiredtiger_concurrent_transactions_total_tickets",
818                gauge!(stat.concurrent_transactions.read.total_tickets),
819                tags!(self.tags, "type" => "read"),
820            ));
821            metrics.push(self.create_metric(
822                "mongod_wiredtiger_concurrent_transactions_total_tickets",
823                gauge!(stat.concurrent_transactions.write.total_tickets),
824                tags!(self.tags, "type" => "write"),
825            ));
826
827            // mongod_wiredtiger_log_*
828            metrics.push(self.create_metric(
829                "mongod_wiredtiger_log_bytes_total",
830                counter!(stat.log.bytes_payload_data),
831                tags!(self.tags, "type" => "payload"),
832            ));
833            metrics.push(self.create_metric(
834                "mongod_wiredtiger_log_bytes_total",
835                counter!(stat.log.bytes_written),
836                tags!(self.tags, "type" => "written"),
837            ));
838            metrics.push(self.create_metric(
839                "mongod_wiredtiger_log_operations_total",
840                counter!(stat.log.log_writes),
841                tags!(self.tags, "type" => "write"),
842            ));
843            metrics.push(self.create_metric(
844                "mongod_wiredtiger_log_operations_total",
845                counter!(stat.log.log_scans),
846                tags!(self.tags, "type" => "scan"),
847            ));
848            metrics.push(self.create_metric(
849                "mongod_wiredtiger_log_operations_total",
850                counter!(stat.log.log_scans_double),
851                tags!(self.tags, "type" => "scan_double"),
852            ));
853            metrics.push(self.create_metric(
854                "mongod_wiredtiger_log_operations_total",
855                counter!(stat.log.log_syncs),
856                tags!(self.tags, "type" => "sync"),
857            ));
858            metrics.push(self.create_metric(
859                "mongod_wiredtiger_log_operations_total",
860                counter!(stat.log.log_sync_dirs),
861                tags!(self.tags, "type" => "sync_dir"),
862            ));
863            metrics.push(self.create_metric(
864                "mongod_wiredtiger_log_operations_total",
865                counter!(stat.log.log_flushes),
866                tags!(self.tags, "type" => "flush"),
867            ));
868            metrics.push(self.create_metric(
869                "mongod_wiredtiger_log_records_scanned_total",
870                counter!(stat.log.records_compressed),
871                tags!(self.tags, "type" => "compressed"),
872            ));
873            metrics.push(self.create_metric(
874                "mongod_wiredtiger_log_records_scanned_total",
875                counter!(stat.log.records_uncompressed),
876                tags!(self.tags, "type" => "uncompressed"),
877            ));
878            metrics.push(self.create_metric(
879                "mongod_wiredtiger_log_records_total",
880                counter!(stat.log.records_processed_log_scan),
881                tags!(self.tags),
882            ));
883
884            // mongod_wiredtiger_session_open_sessions
885            metrics.push(self.create_metric(
886                "mongod_wiredtiger_session_open_sessions",
887                gauge!(stat.session.sessions),
888                tags!(self.tags),
889            ));
890
891            // mongod_wiredtiger_transactions_*
892            metrics.push(self.create_metric(
893                "mongod_wiredtiger_transactions_checkpoint_seconds",
894                gauge!(stat.transaction.checkpoint_min_ms / 1000),
895                tags!(self.tags, "type" => "min"),
896            ));
897            metrics.push(self.create_metric(
898                "mongod_wiredtiger_transactions_checkpoint_seconds",
899                gauge!(stat.transaction.checkpoint_max_ms / 1000),
900                tags!(self.tags, "type" => "max"),
901            ));
902            metrics.push(self.create_metric(
903                "mongod_wiredtiger_transactions_checkpoint_seconds_total",
904                counter!(stat.transaction.checkpoint_total_ms / 1000),
905                tags!(self.tags),
906            ));
907            metrics.push(self.create_metric(
908                "mongod_wiredtiger_transactions_running_checkpoints",
909                gauge!(stat.transaction.checkpoints_running),
910                tags!(self.tags),
911            ));
912            metrics.push(self.create_metric(
913                "mongod_wiredtiger_transactions_total",
914                counter!(stat.transaction.begins),
915                tags!(self.tags, "type" => "begins"),
916            ));
917            metrics.push(self.create_metric(
918                "mongod_wiredtiger_transactions_total",
919                counter!(stat.transaction.checkpoints),
920                tags!(self.tags, "type" => "checkpoints"),
921            ));
922            metrics.push(self.create_metric(
923                "mongod_wiredtiger_transactions_total",
924                counter!(stat.transaction.committed),
925                tags!(self.tags, "type" => "committed"),
926            ));
927            metrics.push(self.create_metric(
928                "mongod_wiredtiger_transactions_total",
929                counter!(stat.transaction.rolled_back),
930                tags!(self.tags, "type" => "rolledback"),
931            ));
932        }
933
934        // network_*
935        metrics.push(self.create_metric(
936            "network_bytes_total",
937            counter!(status.network.bytes_in),
938            tags!(self.tags, "state" => "bytes_in"),
939        ));
940        metrics.push(self.create_metric(
941            "network_bytes_total",
942            counter!(status.network.bytes_out),
943            tags!(self.tags, "state" => "bytes_out"),
944        ));
945        metrics.push(self.create_metric(
946            "network_metrics_num_requests_total",
947            counter!(status.network.num_requests),
948            tags!(self.tags),
949        ));
950
951        // op_counters_repl_total
952        for (r#type, value) in status.opcounters {
953            metrics.push(self.create_metric(
954                "op_counters_repl_total",
955                counter!(value),
956                tags!(self.tags, "type" => r#type),
957            ));
958        }
959
960        // op_counters_total
961        for (r#type, value) in status.opcounters_repl {
962            metrics.push(self.create_metric(
963                "op_counters_total",
964                counter!(value),
965                tags!(self.tags, "type" => r#type),
966            ));
967        }
968
969        Ok(metrics)
970    }
971}
972
973fn bson_size(value: &Bson) -> usize {
974    match value {
975        Bson::Double(value) => value.size_of(),
976        Bson::String(value) => value.size_of(),
977        Bson::Array(value) => value.iter().map(bson_size).sum(),
978        Bson::Document(value) => document_size(value),
979        Bson::Boolean(_) => std::mem::size_of::<bool>(),
980        Bson::RegularExpression(value) => value.pattern.size_of(),
981        Bson::JavaScriptCode(value) => value.size_of(),
982        Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
983        Bson::Int32(value) => value.size_of(),
984        Bson::Int64(value) => value.size_of(),
985        Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
986        Bson::Binary(value) => value.bytes.size_of(),
987        Bson::ObjectId(value) => value.bytes().size_of(),
988        Bson::DateTime(_) => std::mem::size_of::<i64>(),
989        Bson::Symbol(value) => value.size_of(),
990        Bson::Decimal128(value) => value.bytes().size_of(),
991        Bson::DbPointer(_) => {
992            // DbPointer parts are not public and cannot be evaluated
993            0
994        }
995        Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
996    }
997}
998
999fn document_size(doc: &Document) -> usize {
1000    doc.into_iter()
1001        .map(|(key, value)| key.size_of() + bson_size(value))
1002        .sum()
1003}
1004
1005/// Remove credentials from endpoint.
1006/// URI components: <https://docs.mongodb.com/manual/reference/connection-string/#components>
1007/// It's not possible to use [url::Url](https://docs.rs/url/2.1.1/url/struct.Url.html) because connection string can have multiple hosts.
1008/// Would be nice to serialize [ClientOptions](https://docs.rs/mongodb/1.1.1/mongodb/options/struct.ClientOptions.html) to String, but it's not supported.
1009/// `endpoint` argument would not be required, but field `original_uri` in `ClientOptions` is private.
1010/// `.unwrap()` in function is safe because endpoint was already verified by `ClientOptions`.
1011/// Based on ClientOptions::parse_uri -- <https://github.com/mongodb/mongo-rust-driver/blob/09e1193f93dcd850ebebb7fb82f6ab786fd85de1/src/client/options/mod.rs#L708>
1012fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
1013    let mut endpoint = endpoint.to_owned();
1014    if options.credential.is_some() {
1015        let start = endpoint.find("://").unwrap() + 3;
1016
1017        // Split `username:password@host[:port]` and `/defaultauthdb?<options>`
1018        let pre_slash = match endpoint[start..].find('/') {
1019            Some(index) => {
1020                let mut segments = endpoint[start..].split_at(index);
1021                // If we have databases and options
1022                if segments.1.len() > 1 {
1023                    let lstart = start + segments.0.len() + 1;
1024                    let post_slash = &segments.1[1..];
1025                    // Split `/defaultauthdb` and `?<options>`
1026                    if let Some(index) = post_slash.find('?') {
1027                        let segments = post_slash.split_at(index);
1028                        // If we have options
1029                        if segments.1.len() > 1 {
1030                            // Remove authentication options
1031                            let options = segments.1[1..]
1032                                .split('&')
1033                                .filter(|pair| {
1034                                    let (key, _) = pair.split_at(pair.find('=').unwrap());
1035                                    !matches!(
1036                                        key.to_lowercase().as_str(),
1037                                        "authsource" | "authmechanism" | "authmechanismproperties"
1038                                    )
1039                                })
1040                                .collect::<Vec<_>>()
1041                                .join("&");
1042
1043                            // Update options in endpoint
1044                            endpoint = format!(
1045                                "{}{}",
1046                                &endpoint[..lstart + segments.0.len() + 1],
1047                                &options
1048                            );
1049                        }
1050                    }
1051                    segments = endpoint[start..].split_at(index);
1052                }
1053                segments.0
1054            }
1055            None => &endpoint[start..],
1056        };
1057
1058        // Remove `username:password@`
1059        let end = pre_slash.rfind('@').unwrap() + 1;
1060        endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
1061    }
1062    endpoint
1063}
1064
#[cfg(test)]
mod tests {
    use super::*;

    /// The example configuration for this source must be generatable.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
    }

    /// User information and authentication options are stripped; hosts and the remaining
    /// options are preserved.
    #[tokio::test]
    async fn sanitize_endpoint_test() {
        const RAW: &str = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
        const EXPECTED: &str = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true";

        let parsed = ClientOptions::parse(RAW).await.unwrap();
        let sanitized = sanitize_endpoint(RAW, &parsed);

        assert_eq!(sanitized.as_str(), EXPECTED);
    }
}
1085
#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
mod integration_tests {
    use futures::StreamExt;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::{
        SourceSender,
        test_util::{
            components::{PULL_SOURCE_TAGS, assert_source_compliance},
            trace_init,
        },
    };

    /// Connection string used by `fetch_metrics_mongod`; taken from `PRIMARY_MONGODB_ADDRESS`
    /// when that variable is set.
    fn primary_mongo_address() -> String {
        match std::env::var("PRIMARY_MONGODB_ADDRESS") {
            Ok(address) => address,
            Err(_) => "mongodb://localhost:27017".into(),
        }
    }

    /// Connection string used by `fetch_metrics_replset`; taken from
    /// `SECONDARY_MONGODB_ADDRESS` when that variable is set.
    fn secondary_mongo_address() -> String {
        match std::env::var("SECONDARY_MONGODB_ADDRESS") {
            Ok(address) => address,
            Err(_) => "mongodb://localhost:27019".into(),
        }
    }

    /// Reference implementation for credential removal, used to compute the `endpoint` tag the
    /// source is expected to emit.
    fn remove_creds(address: &str) -> String {
        let mut parsed = url::Url::parse(address).unwrap();
        parsed.set_password(None).unwrap();
        parsed.set_username("").unwrap();
        parsed.as_str().to_owned()
    }

    /// Runs the source against `endpoint`, gathers the first burst of events and checks the
    /// namespace, timestamp and basic tags of every metric.
    async fn test_instance(endpoint: String) {
        assert_source_compliance(&PULL_SOURCE_TAGS, async {
            let namespace = "vector_mongodb";
            let client_options = ClientOptions::parse(endpoint.as_str()).await.unwrap();
            let host = client_options.hosts[0].to_string();

            let (sender, mut recv) = SourceSender::new_test();

            let config = MongoDbMetricsConfig {
                endpoints: vec![endpoint.clone()],
                scrape_interval_secs: Duration::from_secs(15),
                namespace: namespace.to_owned(),
            };
            tokio::spawn(async move {
                config
                    .build(SourceContext::new_test(sender, None))
                    .await
                    .unwrap()
                    .await
                    .unwrap()
            });

            // TODO: We should have a simpler/cleaner method for this sort of collection, where we're essentially waiting
            // for a burst of events, and want to debounce ourselves in terms of stopping collection once all events in the
            // burst have been collected. This code here isn't bad or anything... I've just noticed now that we do it in a
            // few places, and we could solve it in a cleaner way, most likely.
            let first = timeout(Duration::from_secs(30), recv.next())
                .await
                .expect("fetch metrics timeout")
                .expect("failed to get metrics from a stream");
            let mut events = vec![first];
            // Keep draining until the stream ends or stays quiet for 10ms.
            while let Ok(Some(event)) = timeout(Duration::from_millis(10), recv.next()).await {
                events.push(event);
            }

            let clean_endpoint = remove_creds(&endpoint);

            assert!(events.len() > 100);
            for metric in events.into_iter().map(|event| event.into_metric()) {
                // validate namespace
                assert!(metric.namespace() == Some(namespace));
                // validate timestamp
                let timestamp = metric.timestamp().expect("existed timestamp");
                assert!((timestamp - Utc::now()).num_seconds() < 1);
                // validate basic tags
                let tags = metric.tags().expect("existed tags");
                assert_eq!(tags.get("endpoint"), Some(clean_endpoint.as_str()));
                assert_eq!(tags.get("host"), Some(host.as_str()));
            }
        })
        .await;
    }

    #[tokio::test]
    async fn fetch_metrics_mongod() {
        trace_init();
        test_instance(primary_mongo_address()).await;
    }

    // TODO
    // #[tokio::test]
    // async fn fetch_metrics_mongos() {
    //     trace_init();
    //     test_instance("mongodb://localhost:27018").await;
    // }

    #[tokio::test]
    async fn fetch_metrics_replset() {
        trace_init();
        test_instance(secondary_mongo_address()).await;
    }
}