vector/sources/mongodb_metrics/
mod.rs

1use std::time::{Duration, Instant};
2
3use chrono::Utc;
4use futures::{
5    StreamExt,
6    future::{join_all, try_join_all},
7};
8use mongodb::{
9    Client,
10    bson::{self, Bson, Document, doc, from_document},
11    error::Error as MongoError,
12    options::ClientOptions,
13};
14use serde_with::serde_as;
15use snafu::{ResultExt, Snafu};
16use tokio::time;
17use tokio_stream::wrappers::IntervalStream;
18use vector_lib::{
19    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, metric_tags,
20};
21
22use crate::{
23    config::{SourceConfig, SourceContext, SourceOutput},
24    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
25    internal_events::{
26        CollectionCompleted, EndpointBytesReceived, MongoDbMetricsBsonParseError,
27        MongoDbMetricsEventsReceived, MongoDbMetricsRequestError, StreamClosedError,
28    },
29};
30
31mod types;
32use types::{CommandBuildInfo, CommandIsMaster, CommandServerStatus, NodeType};
33use vector_lib::config::LogNamespace;
34
/// Produces a copy of a base tag set, optionally with extra `key => value` pairs.
///
/// The base expression is only cloned, never mutated. Each key is converted with
/// `.into()` and each value with `.to_string()`; a key that already exists is overwritten.
macro_rules! tags {
    ($base:expr_2021) => {
        $base.clone()
    };
    ($base:expr_2021, $($name:expr_2021 => $val:expr_2021),*) => {{
        let mut merged = $base.clone();
        $(merged.replace($name.into(), $val.to_string());)*
        merged
    }};
}
47
/// Wraps a numeric expression in a `MetricValue::Counter`, casting it to `f64`.
///
/// The whole argument expression is evaluated first and then cast, so
/// `counter!(a / 1000)` performs the division in the argument's own type.
macro_rules! counter {
    ($raw:expr_2021) => {{
        let as_float = $raw as f64;
        MetricValue::Counter { value: as_float }
    }};
}
55
/// Wraps a numeric expression in a `MetricValue::Gauge`, casting it to `f64`.
///
/// The whole argument expression is evaluated first and then cast, so
/// `gauge!(a / 1000)` performs the division in the argument's own type.
macro_rules! gauge {
    ($raw:expr_2021) => {{
        let as_float = $raw as f64;
        MetricValue::Gauge { value: as_float }
    }};
}
63
64#[derive(Debug, Snafu)]
65enum BuildError {
66    #[snafu(display("invalid endpoint: {}", source))]
67    InvalidEndpoint { source: MongoError },
68    #[snafu(display("invalid client options: {}", source))]
69    InvalidClientOptions { source: MongoError },
70}
71
72#[derive(Debug)]
73enum CollectError {
74    Mongo(MongoError),
75    Bson(bson::de::Error),
76}
77
78/// Configuration for the `mongodb_metrics` source.
79#[serde_as]
80#[configurable_component(source("mongodb_metrics", "Collect metrics from the MongoDB database."))]
81#[derive(Clone, Debug, Default)]
82#[serde(deny_unknown_fields)]
83pub struct MongoDbMetricsConfig {
84    /// A list of MongoDB instances to scrape.
85    ///
86    /// Each endpoint must be in the [Connection String URI Format](https://www.mongodb.com/docs/manual/reference/connection-string/).
87    #[configurable(metadata(docs::examples = "mongodb://localhost:27017"))]
88    endpoints: Vec<String>,
89
90    /// The interval between scrapes, in seconds.
91    #[serde(default = "default_scrape_interval_secs")]
92    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
93    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
94    scrape_interval_secs: Duration,
95
96    /// Overrides the default namespace for the metrics emitted by the source.
97    ///
98    /// If set to an empty string, no namespace is added to the metrics.
99    ///
100    /// By default, `mongodb` is used.
101    #[serde(default = "default_namespace")]
102    namespace: String,
103}
104
105#[derive(Debug)]
106struct MongoDbMetrics {
107    client: Client,
108    endpoint: String,
109    namespace: Option<String>,
110    tags: MetricTags,
111}
112
/// Default time between scrapes: fifteen seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    Duration::from_millis(15_000)
}
116
/// Default namespace prepended to emitted metric names.
pub fn default_namespace() -> String {
    String::from("mongodb")
}
120
// Presumably renders this source's example configuration from
// `MongoDbMetricsConfig::default()` (inferred from the macro's name).
// NOTE(review): confirm against the macro definition.
impl_generate_config_from_default!(MongoDbMetricsConfig);
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "mongodb_metrics")]
125impl SourceConfig for MongoDbMetricsConfig {
126    async fn build(&self, mut cx: SourceContext) -> crate::Result<super::Source> {
127        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
128
129        let sources = try_join_all(
130            self.endpoints
131                .iter()
132                .map(|endpoint| MongoDbMetrics::new(endpoint, namespace.clone())),
133        )
134        .await?;
135
136        let duration = self.scrape_interval_secs;
137        let shutdown = cx.shutdown;
138        Ok(Box::pin(async move {
139            let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
140            while interval.next().await.is_some() {
141                let start = Instant::now();
142                let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
143                emit!(CollectionCompleted {
144                    start,
145                    end: Instant::now()
146                });
147
148                let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
149                let count = metrics.len();
150
151                if (cx.out.send_batch(metrics).await).is_err() {
152                    emit!(StreamClosedError { count });
153                    return Err(());
154                }
155            }
156
157            Ok(())
158        }))
159    }
160
161    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
162        vec![SourceOutput::new_metrics()]
163    }
164
165    fn can_acknowledge(&self) -> bool {
166        false
167    }
168}
169
170impl MongoDbMetrics {
171    /// Works only with Standalone connection-string. Collect metrics only from specified instance.
172    /// <https://docs.mongodb.com/manual/reference/connection-string/#standard-connection-string-format>
173    async fn new(endpoint: &str, namespace: Option<String>) -> Result<MongoDbMetrics, BuildError> {
174        let mut client_options = ClientOptions::parse(endpoint)
175            .await
176            .context(InvalidEndpointSnafu)?;
177        client_options.direct_connection = Some(true);
178
179        let endpoint = sanitize_endpoint(endpoint, &client_options);
180        let tags = metric_tags!(
181            "endpoint" => endpoint.clone(),
182            "host" => client_options.hosts[0].to_string(),
183        );
184
185        Ok(Self {
186            client: Client::with_options(client_options).context(InvalidClientOptionsSnafu)?,
187            endpoint,
188            namespace,
189            tags,
190        })
191    }
192
193    /// Finding node type for client with `isMaster` command.
194    async fn get_node_type(&self) -> Result<NodeType, CollectError> {
195        let doc = self
196            .client
197            .database("admin")
198            .run_command(doc! { "isMaster": 1 }, None)
199            .await
200            .map_err(CollectError::Mongo)?;
201        let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;
202
203        Ok(if msg.set_name.is_some() || msg.hosts.is_some() {
204            NodeType::Replset
205        } else if msg.msg.map(|msg| msg == "isdbgrid").unwrap_or(false) {
206            // Contains the value isdbgrid when isMaster returns from a mongos instance.
207            // <https://docs.mongodb.com/manual/reference/command/isMaster/#isMaster.msg>
208            // <https://docs.mongodb.com/manual/core/sharded-cluster-query-router/#confirm-connection-to-mongos-instances>
209            NodeType::Mongos
210        } else {
211            NodeType::Mongod
212        })
213    }
214
215    async fn get_build_info(&self) -> Result<CommandBuildInfo, CollectError> {
216        let doc = self
217            .client
218            .database("admin")
219            .run_command(doc! { "buildInfo": 1 }, None)
220            .await
221            .map_err(CollectError::Mongo)?;
222        from_document(doc).map_err(CollectError::Bson)
223    }
224
225    async fn print_version(&self) -> Result<(), CollectError> {
226        if tracing::level_enabled!(tracing::Level::DEBUG) {
227            let node_type = self.get_node_type().await?;
228            let build_info = self.get_build_info().await?;
229            debug!(
230                message = "Connected to server.", endpoint = %self.endpoint, node_type = ?node_type, server_version = ?serde_json::to_string(&build_info).unwrap()
231            );
232        }
233
234        Ok(())
235    }
236
237    fn create_metric(&self, name: &str, value: MetricValue, tags: MetricTags) -> Metric {
238        Metric::new(name, MetricKind::Absolute, value)
239            .with_namespace(self.namespace.clone())
240            .with_tags(Some(tags))
241            .with_timestamp(Some(Utc::now()))
242    }
243
244    async fn collect(&self) -> Vec<Metric> {
245        // `up` metric is `1` if collection is successful, otherwise `0`.
246        let (up_value, mut metrics) = match self.collect_server_status().await {
247            Ok(metrics) => (1.0, metrics),
248            Err(error) => {
249                match error {
250                    CollectError::Mongo(error) => emit!(MongoDbMetricsRequestError {
251                        error,
252                        endpoint: &self.endpoint,
253                    }),
254                    CollectError::Bson(error) => emit!(MongoDbMetricsBsonParseError {
255                        error,
256                        endpoint: &self.endpoint,
257                    }),
258                }
259
260                (0.0, vec![])
261            }
262        };
263
264        metrics.push(self.create_metric("up", gauge!(up_value), tags!(self.tags)));
265
266        emit!(MongoDbMetricsEventsReceived {
267            byte_size: metrics.estimated_json_encoded_size_of(),
268            count: metrics.len(),
269            endpoint: &self.endpoint,
270        });
271
272        metrics
273    }
274
275    /// Collect metrics from `serverStatus` command.
276    /// <https://docs.mongodb.com/manual/reference/command/serverStatus/>
277    async fn collect_server_status(&self) -> Result<Vec<Metric>, CollectError> {
278        self.print_version().await?;
279
280        let mut metrics = vec![];
281
282        let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
283        let db = self.client.database("admin");
284        let doc = db
285            .run_command(command, None)
286            .await
287            .map_err(CollectError::Mongo)?;
288        let byte_size = document_size(&doc);
289        emit!(EndpointBytesReceived {
290            byte_size,
291            protocol: "tcp",
292            endpoint: &self.endpoint,
293        });
294        let status: CommandServerStatus = from_document(doc).map_err(CollectError::Bson)?;
295
296        // asserts_total
297        metrics.push(self.create_metric(
298            "asserts_total",
299            counter!(status.asserts.regular),
300            tags!(self.tags, "type" => "regular"),
301        ));
302        metrics.push(self.create_metric(
303            "asserts_total",
304            counter!(status.asserts.warning),
305            tags!(self.tags, "type" => "warning"),
306        ));
307        metrics.push(self.create_metric(
308            "asserts_total",
309            counter!(status.asserts.msg),
310            tags!(self.tags, "type" => "msg"),
311        ));
312        metrics.push(self.create_metric(
313            "asserts_total",
314            counter!(status.asserts.user),
315            tags!(self.tags, "type" => "user"),
316        ));
317        metrics.push(self.create_metric(
318            "asserts_total",
319            counter!(status.asserts.rollovers),
320            tags!(self.tags, "type" => "rollovers"),
321        ));
322
323        // connections
324        metrics.push(self.create_metric(
325            "connections",
326            counter!(status.connections.active),
327            tags!(self.tags, "state" => "active"),
328        ));
329        metrics.push(self.create_metric(
330            "connections",
331            counter!(status.connections.available),
332            tags!(self.tags, "state" => "available"),
333        ));
334        metrics.push(self.create_metric(
335            "connections",
336            counter!(status.connections.current),
337            tags!(self.tags, "state" => "current"),
338        ));
339
340        // extra_info_*
341        if let Some(value) = status.extra_info.heap_usage_bytes {
342            metrics.push(self.create_metric(
343                "extra_info_heap_usage_bytes",
344                gauge!(value),
345                tags!(self.tags),
346            ));
347        }
348        metrics.push(self.create_metric(
349            "extra_info_page_faults",
350            gauge!(status.extra_info.page_faults),
351            tags!(self.tags),
352        ));
353
354        // instance_*
355        metrics.push(self.create_metric(
356            "instance_local_time",
357            gauge!(status.instance.local_time.timestamp_millis() / 1000),
358            tags!(self.tags),
359        ));
360        metrics.push(self.create_metric(
361            "instance_uptime_estimate_seconds_total",
362            gauge!(status.instance.uptime_estimate),
363            tags!(self.tags),
364        ));
365        metrics.push(self.create_metric(
366            "instance_uptime_seconds_total",
367            gauge!(status.instance.uptime),
368            tags!(self.tags),
369        ));
370
371        // memory
372        metrics.push(self.create_metric(
373            "memory",
374            gauge!(status.memory.resident),
375            tags!(self.tags, "type" => "resident"),
376        ));
377        metrics.push(self.create_metric(
378            "memory",
379            gauge!(status.memory.r#virtual),
380            tags!(self.tags, "type" => "virtual"),
381        ));
382        if let Some(value) = status.memory.mapped {
383            metrics.push(self.create_metric(
384                "memory",
385                gauge!(value),
386                tags!(self.tags, "type" => "mapped"),
387            ))
388        }
389        if let Some(value) = status.memory.mapped_with_journal {
390            metrics.push(self.create_metric(
391                "memory",
392                gauge!(value),
393                tags!(self.tags, "type" => "mapped_with_journal"),
394            ))
395        }
396
397        // mongod_global_lock_*
398        metrics.push(self.create_metric(
399            "mongod_global_lock_total_time_seconds",
400            counter!(status.global_lock.total_time),
401            tags!(self.tags),
402        ));
403        metrics.push(self.create_metric(
404            "mongod_global_lock_active_clients",
405            gauge!(status.global_lock.active_clients.total),
406            tags!(self.tags, "type" => "total"),
407        ));
408        metrics.push(self.create_metric(
409            "mongod_global_lock_active_clients",
410            gauge!(status.global_lock.active_clients.readers),
411            tags!(self.tags, "type" => "readers"),
412        ));
413        metrics.push(self.create_metric(
414            "mongod_global_lock_active_clients",
415            gauge!(status.global_lock.active_clients.writers),
416            tags!(self.tags, "type" => "writers"),
417        ));
418        metrics.push(self.create_metric(
419            "mongod_global_lock_current_queue",
420            gauge!(status.global_lock.current_queue.total),
421            tags!(self.tags, "type" => "total"),
422        ));
423        metrics.push(self.create_metric(
424            "mongod_global_lock_current_queue",
425            gauge!(status.global_lock.current_queue.readers),
426            tags!(self.tags, "type" => "readers"),
427        ));
428        metrics.push(self.create_metric(
429            "mongod_global_lock_current_queue",
430            gauge!(status.global_lock.current_queue.writers),
431            tags!(self.tags, "type" => "writers"),
432        ));
433
434        // mongod_locks_time_*
435        for (r#type, lock) in status.locks {
436            if let Some(modes) = lock.time_acquiring_micros {
437                if let Some(value) = modes.read {
438                    metrics.push(self.create_metric(
439                        "mongod_locks_time_acquiring_global_seconds_total",
440                        counter!(value),
441                        tags!(self.tags, "type" => &r#type, "mode" => "read"),
442                    ));
443                }
444                if let Some(value) = modes.write {
445                    metrics.push(self.create_metric(
446                        "mongod_locks_time_acquiring_global_seconds_total",
447                        counter!(value),
448                        tags!(self.tags, "type" => &r#type, "mode" => "write"),
449                    ));
450                }
451            }
452        }
453
454        // mongod_metrics_cursor_*
455        metrics.push(self.create_metric(
456            "mongod_metrics_cursor_timed_out_total",
457            counter!(status.metrics.cursor.timed_out),
458            tags!(self.tags),
459        ));
460        metrics.push(self.create_metric(
461            "mongod_metrics_cursor_open",
462            gauge!(status.metrics.cursor.open.no_timeout),
463            tags!(self.tags, "state" => "no_timeout"),
464        ));
465        metrics.push(self.create_metric(
466            "mongod_metrics_cursor_open",
467            gauge!(status.metrics.cursor.open.pinned),
468            tags!(self.tags, "state" => "pinned"),
469        ));
470        metrics.push(self.create_metric(
471            "mongod_metrics_cursor_open",
472            gauge!(status.metrics.cursor.open.total),
473            tags!(self.tags, "state" => "total"),
474        ));
475
476        // mongod_metrics_document_total
477        metrics.push(self.create_metric(
478            "mongod_metrics_document_total",
479            counter!(status.metrics.document.deleted),
480            tags!(self.tags, "state" => "deleted"),
481        ));
482        metrics.push(self.create_metric(
483            "mongod_metrics_document_total",
484            counter!(status.metrics.document.inserted),
485            tags!(self.tags, "state" => "inserted"),
486        ));
487        metrics.push(self.create_metric(
488            "mongod_metrics_document_total",
489            counter!(status.metrics.document.returned),
490            tags!(self.tags, "state" => "returned"),
491        ));
492        metrics.push(self.create_metric(
493            "mongod_metrics_document_total",
494            counter!(status.metrics.document.updated),
495            tags!(self.tags, "state" => "updated"),
496        ));
497
498        // mongod_metrics_get_last_error_*
499        metrics.push(self.create_metric(
500            "mongod_metrics_get_last_error_wtime_num",
501            gauge!(status.metrics.get_last_error.wtime.num),
502            tags!(self.tags),
503        ));
504        metrics.push(self.create_metric(
505            "mongod_metrics_get_last_error_wtime_seconds_total",
506            counter!(status.metrics.get_last_error.wtime.total_millis / 1000),
507            tags!(self.tags),
508        ));
509        metrics.push(self.create_metric(
510            "mongod_metrics_get_last_error_wtimeouts_total",
511            counter!(status.metrics.get_last_error.wtimeouts),
512            tags!(self.tags),
513        ));
514
515        // mongod_metrics_operation_total
516        metrics.push(self.create_metric(
517            "mongod_metrics_operation_total",
518            counter!(status.metrics.operation.scan_and_order),
519            tags!(self.tags, "type" => "scan_and_order"),
520        ));
521        metrics.push(self.create_metric(
522            "mongod_metrics_operation_total",
523            counter!(status.metrics.operation.write_conflicts),
524            tags!(self.tags, "type" => "write_conflicts"),
525        ));
526
527        // mongod_metrics_query_executor_total
528        metrics.push(self.create_metric(
529            "mongod_metrics_query_executor_total",
530            counter!(status.metrics.query_executor.scanned),
531            tags!(self.tags, "state" => "scanned"),
532        ));
533        metrics.push(self.create_metric(
534            "mongod_metrics_query_executor_total",
535            counter!(status.metrics.query_executor.scanned_objects),
536            tags!(self.tags, "state" => "scanned_objects"),
537        ));
538        if let Some(doc) = status.metrics.query_executor.collection_scans {
539            metrics.push(self.create_metric(
540                "mongod_metrics_query_executor_total",
541                counter!(doc.total),
542                tags!(self.tags, "state" => "collection_scans"),
543            ));
544        }
545
546        // mongod_metrics_record_moves_total
547        if let Some(record) = status.metrics.record {
548            metrics.push(self.create_metric(
549                "mongod_metrics_record_moves_total",
550                counter!(record.moves),
551                tags!(self.tags),
552            ));
553        }
554
555        // mongod_metrics_repl_apply_
556        metrics.push(self.create_metric(
557            "mongod_metrics_repl_apply_batches_num_total",
558            counter!(status.metrics.repl.apply.batches.num),
559            tags!(self.tags),
560        ));
561        metrics.push(self.create_metric(
562            "mongod_metrics_repl_apply_batches_seconds_total",
563            counter!(status.metrics.repl.apply.batches.total_millis / 1000),
564            tags!(self.tags),
565        ));
566        metrics.push(self.create_metric(
567            "mongod_metrics_repl_apply_ops_total",
568            counter!(status.metrics.repl.apply.ops),
569            tags!(self.tags),
570        ));
571
572        // mongod_metrics_repl_buffer_*
573        metrics.push(self.create_metric(
574            "mongod_metrics_repl_buffer_count",
575            counter!(status.metrics.repl.buffer.count),
576            tags!(self.tags),
577        ));
578        metrics.push(self.create_metric(
579            "mongod_metrics_repl_buffer_max_size_bytes_total",
580            counter!(status.metrics.repl.buffer.max_size_bytes),
581            tags!(self.tags),
582        ));
583        metrics.push(self.create_metric(
584            "mongod_metrics_repl_buffer_size_bytes",
585            counter!(status.metrics.repl.buffer.size_bytes),
586            tags!(self.tags),
587        ));
588
589        // mongod_metrics_repl_executor_*
590        metrics.push(self.create_metric(
591            "mongod_metrics_repl_executor_queue",
592            gauge!(status.metrics.repl.executor.queues.network_in_progress),
593            tags!(self.tags, "type" => "network_in_progress"),
594        ));
595        metrics.push(self.create_metric(
596            "mongod_metrics_repl_executor_queue",
597            gauge!(status.metrics.repl.executor.queues.sleepers),
598            tags!(self.tags, "type" => "sleepers"),
599        ));
600        metrics.push(self.create_metric(
601            "mongod_metrics_repl_executor_unsignaled_events",
602            gauge!(status.metrics.repl.executor.unsignaled_events),
603            tags!(self.tags),
604        ));
605
606        // mongod_metrics_repl_network_*
607        metrics.push(self.create_metric(
608            "mongod_metrics_repl_network_bytes_total",
609            counter!(status.metrics.repl.network.bytes),
610            tags!(self.tags),
611        ));
612        metrics.push(self.create_metric(
613            "mongod_metrics_repl_network_getmores_num_total",
614            counter!(status.metrics.repl.network.getmores.num),
615            tags!(self.tags),
616        ));
617        metrics.push(self.create_metric(
618            "mongod_metrics_repl_network_getmores_seconds_total",
619            counter!(status.metrics.repl.network.getmores.total_millis / 1000),
620            tags!(self.tags),
621        ));
622        metrics.push(self.create_metric(
623            "mongod_metrics_repl_network_ops_total",
624            counter!(status.metrics.repl.network.ops),
625            tags!(self.tags),
626        ));
627        metrics.push(self.create_metric(
628            "mongod_metrics_repl_network_readers_created_total",
629            counter!(status.metrics.repl.network.readers_created),
630            tags!(self.tags),
631        ));
632
633        // mongod_metrics_ttl_*
634        metrics.push(self.create_metric(
635            "mongod_metrics_ttl_deleted_documents_total",
636            counter!(status.metrics.ttl.deleted_documents),
637            tags!(self.tags),
638        ));
639        metrics.push(self.create_metric(
640            "mongod_metrics_ttl_passes_total",
641            counter!(status.metrics.ttl.passes),
642            tags!(self.tags),
643        ));
644
645        // mongod_op_latencies_*
646        for (r#type, stat) in status.op_latencies {
647            for bucket in stat.histogram {
648                metrics.push(self.create_metric(
649                    "mongod_op_latencies_histogram",
650                    gauge!(bucket.count),
651                    tags!(self.tags, "type" => &r#type, "micros" => bucket.micros.to_string()),
652                ));
653            }
654            metrics.push(self.create_metric(
655                "mongod_op_latencies_latency",
656                gauge!(stat.latency),
657                tags!(self.tags, "type" => &r#type),
658            ));
659            metrics.push(self.create_metric(
660                "mongod_op_latencies_ops_total",
661                gauge!(stat.ops),
662                tags!(self.tags, "type" => &r#type),
663            ));
664        }
665
666        // mongod_storage_engine
667        metrics.push(self.create_metric(
668            "mongod_storage_engine",
669            gauge!(1),
670            tags!(self.tags, "engine" => status.storage_engine.name),
671        ));
672
673        // mongod_wiredtiger_*
674        if let Some(stat) = status.wired_tiger {
675            // mongod_wiredtiger_blockmanager_blocks_total
676            metrics.push(self.create_metric(
677                "mongod_wiredtiger_blockmanager_blocks_total",
678                counter!(stat.block_manager.blocks_read),
679                tags!(self.tags, "type" => "blocks_read"),
680            ));
681            metrics.push(self.create_metric(
682                "mongod_wiredtiger_blockmanager_blocks_total",
683                counter!(stat.block_manager.mapped_blocks_read),
684                tags!(self.tags, "type" => "blocks_read_mapped"),
685            ));
686            metrics.push(self.create_metric(
687                "mongod_wiredtiger_blockmanager_blocks_total",
688                counter!(stat.block_manager.blocks_pre_loaded),
689                tags!(self.tags, "type" => "blocks_pre_loaded"),
690            ));
691            metrics.push(self.create_metric(
692                "mongod_wiredtiger_blockmanager_blocks_total",
693                counter!(stat.block_manager.blocks_written),
694                tags!(self.tags, "type" => "blocks_written"),
695            ));
696
697            // mongod_wiredtiger_blockmanager_bytes_total
698            metrics.push(self.create_metric(
699                "mongod_wiredtiger_blockmanager_bytes_total",
700                counter!(stat.block_manager.bytes_read),
701                tags!(self.tags, "type" => "bytes_read"),
702            ));
703            metrics.push(self.create_metric(
704                "mongod_wiredtiger_blockmanager_bytes_total",
705                counter!(stat.block_manager.mapped_bytes_read),
706                tags!(self.tags, "type" => "bytes_read_mapped"),
707            ));
708            metrics.push(self.create_metric(
709                "mongod_wiredtiger_blockmanager_bytes_total",
710                counter!(stat.block_manager.bytes_written),
711                tags!(self.tags, "type" => "bytes_written"),
712            ));
713
714            // mongod_wiredtiger_cache_bytes
715            metrics.push(self.create_metric(
716                "mongod_wiredtiger_cache_bytes",
717                gauge!(stat.cache.bytes_total),
718                tags!(self.tags, "type" => "total"),
719            ));
720            metrics.push(self.create_metric(
721                "mongod_wiredtiger_cache_bytes",
722                gauge!(stat.cache.bytes_dirty),
723                tags!(self.tags, "type" => "dirty"),
724            ));
725            metrics.push(self.create_metric(
726                "mongod_wiredtiger_cache_bytes",
727                gauge!(stat.cache.bytes_internal_pages),
728                tags!(self.tags, "type" => "internal_pages"),
729            ));
730            metrics.push(self.create_metric(
731                "mongod_wiredtiger_cache_bytes",
732                gauge!(stat.cache.bytes_leaf_pages),
733                tags!(self.tags, "type" => "leaf_pages"),
734            ));
735
736            // mongod_wiredtiger_cache_bytes_total
737            metrics.push(self.create_metric(
738                "mongod_wiredtiger_cache_bytes_total",
739                counter!(stat.cache.pages_read_into),
740                tags!(self.tags, "type" => "read"),
741            ));
742            metrics.push(self.create_metric(
743                "mongod_wiredtiger_cache_bytes_total",
744                counter!(stat.cache.pages_written_from),
745                tags!(self.tags, "type" => "written"),
746            ));
747
748            // mongod_wiredtiger_cache_evicted_total
749            metrics.push(self.create_metric(
750                "mongod_wiredtiger_cache_evicted_total",
751                counter!(stat.cache.evicted_modified),
752                tags!(self.tags, "type" => "modified"),
753            ));
754            metrics.push(self.create_metric(
755                "mongod_wiredtiger_cache_evicted_total",
756                counter!(stat.cache.evicted_unmodified),
757                tags!(self.tags, "type" => "unmodified"),
758            ));
759
760            // mongod_wiredtiger_cache_max_bytes
761            metrics.push(self.create_metric(
762                "mongod_wiredtiger_cache_max_bytes",
763                gauge!(stat.cache.max_bytes),
764                tags!(self.tags),
765            ));
766
767            // mongod_wiredtiger_cache_overhead_percent
768            metrics.push(self.create_metric(
769                "mongod_wiredtiger_cache_overhead_percent",
770                gauge!(stat.cache.percent_overhead),
771                tags!(self.tags),
772            ));
773
774            // mongod_wiredtiger_cache_pages
775            metrics.push(self.create_metric(
776                "mongod_wiredtiger_cache_pages",
777                gauge!(stat.cache.pages_total),
778                tags!(self.tags, "type" => "total"),
779            ));
780            metrics.push(self.create_metric(
781                "mongod_wiredtiger_cache_pages",
782                gauge!(stat.cache.pages_dirty),
783                tags!(self.tags, "type" => "dirty"),
784            ));
785
786            // mongod_wiredtiger_cache_pages_total
787            metrics.push(self.create_metric(
788                "mongod_wiredtiger_cache_pages_total",
789                counter!(stat.cache.pages_read_into),
790                tags!(self.tags, "type" => "read"),
791            ));
792            metrics.push(self.create_metric(
793                "mongod_wiredtiger_cache_pages_total",
794                counter!(stat.cache.pages_written_from),
795                tags!(self.tags, "type" => "write"),
796            ));
797
798            // mongod_wiredtiger_concurrent_transactions_*
799            metrics.push(self.create_metric(
800                "mongod_wiredtiger_concurrent_transactions_available_tickets",
801                gauge!(stat.concurrent_transactions.read.available),
802                tags!(self.tags, "type" => "read"),
803            ));
804            metrics.push(self.create_metric(
805                "mongod_wiredtiger_concurrent_transactions_available_tickets",
806                gauge!(stat.concurrent_transactions.write.available),
807                tags!(self.tags, "type" => "write"),
808            ));
809            metrics.push(self.create_metric(
810                "mongod_wiredtiger_concurrent_transactions_out_tickets",
811                gauge!(stat.concurrent_transactions.read.out),
812                tags!(self.tags, "type" => "read"),
813            ));
814            metrics.push(self.create_metric(
815                "mongod_wiredtiger_concurrent_transactions_out_tickets",
816                gauge!(stat.concurrent_transactions.write.out),
817                tags!(self.tags, "type" => "write"),
818            ));
819            metrics.push(self.create_metric(
820                "mongod_wiredtiger_concurrent_transactions_total_tickets",
821                gauge!(stat.concurrent_transactions.read.total_tickets),
822                tags!(self.tags, "type" => "read"),
823            ));
824            metrics.push(self.create_metric(
825                "mongod_wiredtiger_concurrent_transactions_total_tickets",
826                gauge!(stat.concurrent_transactions.write.total_tickets),
827                tags!(self.tags, "type" => "write"),
828            ));
829
830            // mongod_wiredtiger_log_*
831            metrics.push(self.create_metric(
832                "mongod_wiredtiger_log_bytes_total",
833                counter!(stat.log.bytes_payload_data),
834                tags!(self.tags, "type" => "payload"),
835            ));
836            metrics.push(self.create_metric(
837                "mongod_wiredtiger_log_bytes_total",
838                counter!(stat.log.bytes_written),
839                tags!(self.tags, "type" => "written"),
840            ));
841            metrics.push(self.create_metric(
842                "mongod_wiredtiger_log_operations_total",
843                counter!(stat.log.log_writes),
844                tags!(self.tags, "type" => "write"),
845            ));
846            metrics.push(self.create_metric(
847                "mongod_wiredtiger_log_operations_total",
848                counter!(stat.log.log_scans),
849                tags!(self.tags, "type" => "scan"),
850            ));
851            metrics.push(self.create_metric(
852                "mongod_wiredtiger_log_operations_total",
853                counter!(stat.log.log_scans_double),
854                tags!(self.tags, "type" => "scan_double"),
855            ));
856            metrics.push(self.create_metric(
857                "mongod_wiredtiger_log_operations_total",
858                counter!(stat.log.log_syncs),
859                tags!(self.tags, "type" => "sync"),
860            ));
861            metrics.push(self.create_metric(
862                "mongod_wiredtiger_log_operations_total",
863                counter!(stat.log.log_sync_dirs),
864                tags!(self.tags, "type" => "sync_dir"),
865            ));
866            metrics.push(self.create_metric(
867                "mongod_wiredtiger_log_operations_total",
868                counter!(stat.log.log_flushes),
869                tags!(self.tags, "type" => "flush"),
870            ));
871            metrics.push(self.create_metric(
872                "mongod_wiredtiger_log_records_scanned_total",
873                counter!(stat.log.records_compressed),
874                tags!(self.tags, "type" => "compressed"),
875            ));
876            metrics.push(self.create_metric(
877                "mongod_wiredtiger_log_records_scanned_total",
878                counter!(stat.log.records_uncompressed),
879                tags!(self.tags, "type" => "uncompressed"),
880            ));
881            metrics.push(self.create_metric(
882                "mongod_wiredtiger_log_records_total",
883                counter!(stat.log.records_processed_log_scan),
884                tags!(self.tags),
885            ));
886
887            // mongod_wiredtiger_session_open_sessions
888            metrics.push(self.create_metric(
889                "mongod_wiredtiger_session_open_sessions",
890                gauge!(stat.session.sessions),
891                tags!(self.tags),
892            ));
893
894            // mongod_wiredtiger_transactions_*
895            metrics.push(self.create_metric(
896                "mongod_wiredtiger_transactions_checkpoint_seconds",
897                gauge!(stat.transaction.checkpoint_min_ms / 1000),
898                tags!(self.tags, "type" => "min"),
899            ));
900            metrics.push(self.create_metric(
901                "mongod_wiredtiger_transactions_checkpoint_seconds",
902                gauge!(stat.transaction.checkpoint_max_ms / 1000),
903                tags!(self.tags, "type" => "max"),
904            ));
905            metrics.push(self.create_metric(
906                "mongod_wiredtiger_transactions_checkpoint_seconds_total",
907                counter!(stat.transaction.checkpoint_total_ms / 1000),
908                tags!(self.tags),
909            ));
910            metrics.push(self.create_metric(
911                "mongod_wiredtiger_transactions_running_checkpoints",
912                gauge!(stat.transaction.checkpoints_running),
913                tags!(self.tags),
914            ));
915            metrics.push(self.create_metric(
916                "mongod_wiredtiger_transactions_total",
917                counter!(stat.transaction.begins),
918                tags!(self.tags, "type" => "begins"),
919            ));
920            metrics.push(self.create_metric(
921                "mongod_wiredtiger_transactions_total",
922                counter!(stat.transaction.checkpoints),
923                tags!(self.tags, "type" => "checkpoints"),
924            ));
925            metrics.push(self.create_metric(
926                "mongod_wiredtiger_transactions_total",
927                counter!(stat.transaction.committed),
928                tags!(self.tags, "type" => "committed"),
929            ));
930            metrics.push(self.create_metric(
931                "mongod_wiredtiger_transactions_total",
932                counter!(stat.transaction.rolled_back),
933                tags!(self.tags, "type" => "rolledback"),
934            ));
935        }
936
937        // network_*
938        metrics.push(self.create_metric(
939            "network_bytes_total",
940            counter!(status.network.bytes_in),
941            tags!(self.tags, "state" => "bytes_in"),
942        ));
943        metrics.push(self.create_metric(
944            "network_bytes_total",
945            counter!(status.network.bytes_out),
946            tags!(self.tags, "state" => "bytes_out"),
947        ));
948        metrics.push(self.create_metric(
949            "network_metrics_num_requests_total",
950            counter!(status.network.num_requests),
951            tags!(self.tags),
952        ));
953
954        // op_counters_repl_total
955        for (r#type, value) in status.opcounters {
956            metrics.push(self.create_metric(
957                "op_counters_repl_total",
958                counter!(value),
959                tags!(self.tags, "type" => r#type),
960            ));
961        }
962
963        // op_counters_total
964        for (r#type, value) in status.opcounters_repl {
965            metrics.push(self.create_metric(
966                "op_counters_total",
967                counter!(value),
968                tags!(self.tags, "type" => r#type),
969            ));
970        }
971
972        Ok(metrics)
973    }
974}
975
976fn bson_size(value: &Bson) -> usize {
977    match value {
978        Bson::Double(value) => value.size_of(),
979        Bson::String(value) => value.size_of(),
980        Bson::Array(value) => value.iter().map(bson_size).sum(),
981        Bson::Document(value) => document_size(value),
982        Bson::Boolean(_) => std::mem::size_of::<bool>(),
983        Bson::RegularExpression(value) => value.pattern.size_of(),
984        Bson::JavaScriptCode(value) => value.size_of(),
985        Bson::JavaScriptCodeWithScope(value) => value.code.size_of() + document_size(&value.scope),
986        Bson::Int32(value) => value.size_of(),
987        Bson::Int64(value) => value.size_of(),
988        Bson::Timestamp(value) => value.time.size_of() + value.increment.size_of(),
989        Bson::Binary(value) => value.bytes.size_of(),
990        Bson::ObjectId(value) => value.bytes().size_of(),
991        Bson::DateTime(_) => std::mem::size_of::<i64>(),
992        Bson::Symbol(value) => value.size_of(),
993        Bson::Decimal128(value) => value.bytes().size_of(),
994        Bson::DbPointer(_) => {
995            // DbPointer parts are not public and cannot be evaluated
996            0
997        }
998        Bson::Null | Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
999    }
1000}
1001
1002fn document_size(doc: &Document) -> usize {
1003    doc.into_iter()
1004        .map(|(key, value)| key.size_of() + bson_size(value))
1005        .sum()
1006}
1007
1008/// Remove credentials from endpoint.
1009/// URI components: <https://docs.mongodb.com/manual/reference/connection-string/#components>
1010/// It's not possible to use [url::Url](https://docs.rs/url/2.1.1/url/struct.Url.html) because connection string can have multiple hosts.
1011/// Would be nice to serialize [ClientOptions](https://docs.rs/mongodb/1.1.1/mongodb/options/struct.ClientOptions.html) to String, but it's not supported.
1012/// `endpoint` argument would not be required, but field `original_uri` in `ClientOptions` is private.
1013/// `.unwrap()` in function is safe because endpoint was already verified by `ClientOptions`.
1014/// Based on ClientOptions::parse_uri -- <https://github.com/mongodb/mongo-rust-driver/blob/09e1193f93dcd850ebebb7fb82f6ab786fd85de1/src/client/options/mod.rs#L708>
1015fn sanitize_endpoint(endpoint: &str, options: &ClientOptions) -> String {
1016    let mut endpoint = endpoint.to_owned();
1017    if options.credential.is_some() {
1018        let start = endpoint.find("://").unwrap() + 3;
1019
1020        // Split `username:password@host[:port]` and `/defaultauthdb?<options>`
1021        let pre_slash = match endpoint[start..].find('/') {
1022            Some(index) => {
1023                let mut segments = endpoint[start..].split_at(index);
1024                // If we have databases and options
1025                if segments.1.len() > 1 {
1026                    let lstart = start + segments.0.len() + 1;
1027                    let post_slash = &segments.1[1..];
1028                    // Split `/defaultauthdb` and `?<options>`
1029                    if let Some(index) = post_slash.find('?') {
1030                        let segments = post_slash.split_at(index);
1031                        // If we have options
1032                        if segments.1.len() > 1 {
1033                            // Remove authentication options
1034                            let options = segments.1[1..]
1035                                .split('&')
1036                                .filter(|pair| {
1037                                    let (key, _) = pair.split_at(pair.find('=').unwrap());
1038                                    !matches!(
1039                                        key.to_lowercase().as_str(),
1040                                        "authsource" | "authmechanism" | "authmechanismproperties"
1041                                    )
1042                                })
1043                                .collect::<Vec<_>>()
1044                                .join("&");
1045
1046                            // Update options in endpoint
1047                            endpoint = format!(
1048                                "{}{}",
1049                                &endpoint[..lstart + segments.0.len() + 1],
1050                                &options
1051                            );
1052                        }
1053                    }
1054                    segments = endpoint[start..].split_at(index);
1055                }
1056                segments.0
1057            }
1058            None => &endpoint[start..],
1059        };
1060
1061        // Remove `username:password@`
1062        let end = pre_slash.rfind('@').unwrap() + 1;
1063        endpoint = format!("{}{}", &endpoint[0..start], &endpoint[start + end..]);
1064    }
1065    endpoint
1066}
1067
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MongoDbMetricsConfig>();
    }

    /// Parses `endpoint` with the driver and returns the sanitized form of it.
    async fn sanitize(endpoint: &str) -> String {
        let client_options = ClientOptions::parse(endpoint).await.unwrap();
        sanitize_endpoint(endpoint, &client_options)
    }

    #[tokio::test]
    async fn sanitize_endpoint_test() {
        let endpoint = "mongodb://myDBReader:D1fficultP%40ssw0rd@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?authSource=admin&tls=true";
        assert_eq!(
            sanitize(endpoint).await,
            "mongodb://mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/?tls=true"
        );
    }

    #[tokio::test]
    async fn sanitize_endpoint_keeps_default_auth_db_and_other_options() {
        let endpoint =
            "mongodb://user:pass@localhost:27017/admin?authMechanism=SCRAM-SHA-256&replicaSet=rs0";
        assert_eq!(
            sanitize(endpoint).await,
            "mongodb://localhost:27017/admin?replicaSet=rs0"
        );
    }

    #[tokio::test]
    async fn sanitize_endpoint_without_credentials_is_unchanged() {
        let endpoint = "mongodb://localhost:27017/?tls=true";
        assert_eq!(sanitize(endpoint).await, endpoint);
    }
}
1088
#[cfg(all(test, feature = "mongodb_metrics-integration-tests"))]
mod integration_tests {
    use futures::StreamExt;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::{
        SourceSender,
        test_util::{
            components::{PULL_SOURCE_TAGS, assert_source_compliance},
            trace_init,
        },
    };

    /// Upper bound (exclusive), in whole seconds, on how old an emitted metric's
    /// timestamp may be when the test inspects it.
    const MAX_METRIC_AGE_SECS: i64 = 10;

    fn primary_mongo_address() -> String {
        std::env::var("PRIMARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27017".into())
    }

    fn secondary_mongo_address() -> String {
        std::env::var("SECONDARY_MONGODB_ADDRESS")
            .unwrap_or_else(|_| "mongodb://localhost:27019".into())
    }

    fn remove_creds(address: &str) -> String {
        let mut url = url::Url::parse(address).unwrap();
        url.set_password(None).unwrap();
        url.set_username("").unwrap();
        url.to_string()
    }

    async fn test_instance(endpoint: String) {
        assert_source_compliance(&PULL_SOURCE_TAGS, async {
            let host = ClientOptions::parse(endpoint.as_str()).await.unwrap().hosts[0].to_string();
            let namespace = "vector_mongodb";

            let (sender, mut recv) = SourceSender::new_test();

            let endpoints = vec![endpoint.clone()];
            tokio::spawn(async move {
                MongoDbMetricsConfig {
                    endpoints,
                    scrape_interval_secs: Duration::from_secs(15),
                    namespace: namespace.to_owned(),
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });

            // TODO: We should have a simpler/cleaner method for this sort of collection, where we're essentially waiting
            // for a burst of events, and want to debounce ourselves in terms of stopping collection once all events in the
            // burst have been collected. This code here isn't bad or anything... I've just noticed now that we do it in a
            // few places, and we could solve it in a cleaner way, most likely.
            let event = timeout(Duration::from_secs(30), recv.next())
                .await
                .expect("fetch metrics timeout")
                .expect("failed to get metrics from a stream");
            let mut events = vec![event];
            // Stop once the stream closes or no further event arrives within the debounce window.
            while let Ok(Some(event)) = timeout(Duration::from_millis(10), recv.next()).await {
                events.push(event);
            }

            let clean_endpoint = remove_creds(&endpoint);

            assert!(events.len() > 100, "expected more than 100 metrics, got {}", events.len());
            for event in events {
                let metric = event.into_metric();
                // validate namespace
                assert_eq!(metric.namespace(), Some(namespace));
                // validate timestamp: it must not lie a second or more in the future and must
                // be recent. (`timestamp - now` alone is negative for any past timestamp, so it
                // could not detect stale values.)
                let timestamp = metric.timestamp().expect("existed timestamp");
                let age_secs = (Utc::now() - timestamp).num_seconds();
                assert!(
                    (0..MAX_METRIC_AGE_SECS).contains(&age_secs),
                    "metric timestamp {timestamp} is {age_secs}s away from now"
                );
                // validate basic tags
                let tags = metric.tags().expect("existed tags");
                assert_eq!(tags.get("endpoint"), Some(&clean_endpoint[..]));
                assert_eq!(tags.get("host"), Some(&host[..]));
            }
        })
        .await;
    }

    #[tokio::test]
    async fn fetch_metrics_mongod() {
        trace_init();
        test_instance(primary_mongo_address()).await;
    }

    // TODO
    // #[tokio::test]
    // async fn fetch_metrics_mongos() {
    //     trace_init();
    //     test_instance("mongodb://localhost:27018").await;
    // }

    #[tokio::test]
    async fn fetch_metrics_replset() {
        trace_init();
        test_instance(secondary_mongo_address()).await;
    }
}