vector/sinks/elasticsearch/
config.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    convert::TryFrom,
4};
5
6use futures::{FutureExt, TryFutureExt};
7use vector_lib::{
8    configurable::configurable_component,
9    lookup::{event_path, lookup_v2::ConfigValuePath},
10    schema::Requirement,
11};
12use vrl::value::Kind;
13
14use crate::{
15    codecs::Transformer,
16    config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
17    event::{EventRef, LogEvent, Value},
18    http::{HttpClient, QueryParameters},
19    internal_events::TemplateRenderingError,
20    sinks::{
21        Healthcheck, VectorSink,
22        elasticsearch::{
23            ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
24            ElasticsearchCommonMode, ElasticsearchMode, VersionType,
25            health::ElasticsearchHealthLogic,
26            retry::ElasticsearchRetryLogic,
27            service::{ElasticsearchService, HttpRequestBuilder},
28            sink::ElasticsearchSink,
29        },
30        util::{
31            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
32            service::HealthConfig,
33        },
34    },
35    template::Template,
36    tls::TlsConfig,
37    transforms::metric_to_log::MetricToLogConfig,
38};
39
40/// The field name for the timestamp required by data stream mode
41pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
42
43/// The Amazon OpenSearch service type, either managed or serverless; primarily, selects the
44/// correct AWS service to use when calculating the AWS v4 signature + disables features
45/// unsupported by serverless: Elasticsearch API version autodetection, health checks
46#[configurable_component]
47#[derive(Clone, Debug, Eq, PartialEq)]
48#[serde(deny_unknown_fields, rename_all = "lowercase")]
49#[derive(Default)]
50pub enum OpenSearchServiceType {
51    /// Elasticsearch or OpenSearch Managed domain
52    #[default]
53    Managed,
54    /// OpenSearch Serverless collection
55    Serverless,
56}
57
58impl OpenSearchServiceType {
59    pub const fn as_str(&self) -> &'static str {
60        match self {
61            OpenSearchServiceType::Managed => "es",
62            OpenSearchServiceType::Serverless => "aoss",
63        }
64    }
65}
66
67/// Configuration for the `elasticsearch` sink.
68#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
69#[derive(Clone, Debug)]
70#[serde(deny_unknown_fields)]
71pub struct ElasticsearchConfig {
72    /// The Elasticsearch endpoint to send logs to.
73    ///
74    /// The endpoint must contain an HTTP scheme, and may specify a
75    /// hostname or IP address and port.
76    #[serde(default)]
77    #[configurable(
78        deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
79    )]
80    pub endpoint: Option<String>,
81
82    /// A list of Elasticsearch endpoints to send logs to.
83    ///
84    /// The endpoint must contain an HTTP scheme, and may specify a
85    /// hostname or IP address and port.
86    /// The endpoint may include basic authentication credentials,
87    /// e.g., `https://user:password@example.com`. If credentials are provided in the endpoint,
88    /// they will be used to authenticate against Elasticsearch.
89    ///
90    /// If `auth` is specified and the endpoint contains credentials,
91    /// a configuration error will be raised.
92    #[serde(default)]
93    #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
94    #[configurable(metadata(docs::examples = "https://example.com"))]
95    #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
96    pub endpoints: Vec<String>,
97
98    /// The [`doc_type`][doc_type] for your index data.
99    ///
100    /// This is only relevant for Elasticsearch <= 6.X. If you are using >= 7.0 you do not need to
101    /// set this option since Elasticsearch has removed it.
102    ///
103    /// [doc_type]: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/actions-index.html
104    #[serde(default = "default_doc_type")]
105    #[configurable(metadata(docs::advanced))]
106    pub doc_type: String,
107
108    /// The API version of Elasticsearch.
109    ///
110    /// Amazon OpenSearch Serverless requires this option to be set to `auto` (the default).
111    #[serde(default)]
112    #[configurable(derived)]
113    pub api_version: ElasticsearchApiVersion,
114
115    /// Whether or not to send the `type` field to Elasticsearch.
116    ///
117    /// The `type` field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x.
118    ///
119    /// If enabled, the `doc_type` option is ignored.
120    #[serde(default)]
121    #[configurable(
122        deprecated = "This option has been deprecated, the `api_version` option should be used instead."
123    )]
124    pub suppress_type_name: bool,
125
126    /// Whether or not to retry successful requests containing partial failures.
127    ///
128    /// To avoid duplicates in Elasticsearch, please use option `id_key`.
129    #[serde(default)]
130    #[configurable(metadata(docs::advanced))]
131    pub request_retry_partial: bool,
132
133    /// The name of the event key that should map to Elasticsearch’s [`_id` field][es_id].
134    ///
135    /// By default, the `_id` field is not set, which allows Elasticsearch to set this
136    /// automatically. Setting your own Elasticsearch IDs can [hinder performance][perf_doc].
137    ///
138    /// [es_id]: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
139    /// [perf_doc]: https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html#_use_auto_generated_ids
140    #[serde(default)]
141    #[configurable(metadata(docs::advanced))]
142    #[configurable(metadata(docs::examples = "id"))]
143    #[configurable(metadata(docs::examples = "_id"))]
144    pub id_key: Option<ConfigValuePath>,
145
146    /// The name of the pipeline to apply.
147    #[serde(default)]
148    #[configurable(metadata(docs::advanced))]
149    #[configurable(metadata(docs::examples = "pipeline-name"))]
150    pub pipeline: Option<String>,
151
152    #[serde(default)]
153    #[configurable(derived)]
154    pub mode: ElasticsearchMode,
155
156    #[serde(default)]
157    #[configurable(derived)]
158    pub compression: Compression,
159
160    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
161    #[configurable(derived)]
162    #[configurable(metadata(docs::advanced))]
163    pub encoding: Transformer,
164
165    #[serde(default)]
166    #[configurable(derived)]
167    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
168
169    #[serde(default)]
170    #[configurable(derived)]
171    pub request: RequestConfig,
172
173    #[configurable(derived)]
174    pub auth: Option<ElasticsearchAuthConfig>,
175
176    /// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
177    #[serde(default)]
178    #[configurable(metadata(docs::advanced))]
179    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
180    #[configurable(metadata(docs::examples = "query_examples()"))]
181    pub query: Option<QueryParameters>,
182
183    #[serde(default)]
184    #[configurable(derived)]
185    #[cfg(feature = "aws-core")]
186    pub aws: Option<crate::aws::RegionOrEndpoint>,
187
188    /// Amazon OpenSearch service type
189    #[serde(default)]
190    pub opensearch_service_type: OpenSearchServiceType,
191
192    #[serde(default)]
193    #[configurable(derived)]
194    pub tls: Option<TlsConfig>,
195
196    #[serde(default)]
197    #[configurable(derived)]
198    #[serde(rename = "distribution")]
199    pub endpoint_health: Option<HealthConfig>,
200
201    // TODO: `bulk` and `data_stream` are each only relevant if the `mode` is set to their
202    // corresponding mode. An improvement to look into would be to extract the `BulkConfig` and
203    // `DataStreamConfig` into the `mode` enum variants. Doing so would remove them from the root
204    // of the config here and thus any post serde config parsing manual error prone logic.
205    #[serde(alias = "normal", default)]
206    #[configurable(derived)]
207    pub bulk: BulkConfig,
208
209    #[serde(default)]
210    #[configurable(derived)]
211    pub data_stream: Option<DataStreamConfig>,
212
213    #[serde(default)]
214    #[configurable(derived)]
215    pub metrics: Option<MetricToLogConfig>,
216
217    #[serde(
218        default,
219        deserialize_with = "crate::serde::bool_or_struct",
220        skip_serializing_if = "crate::serde::is_default"
221    )]
222    #[configurable(derived)]
223    pub acknowledgements: AcknowledgementsConfig,
224}
225
226fn default_doc_type() -> String {
227    "_doc".to_owned()
228}
229
230fn query_examples() -> HashMap<String, String> {
231    HashMap::<_, _>::from_iter([("X-Powered-By".to_owned(), "Vector".to_owned())])
232}
233
234impl Default for ElasticsearchConfig {
235    fn default() -> Self {
236        Self {
237            endpoint: None,
238            endpoints: vec![],
239            doc_type: default_doc_type(),
240            api_version: Default::default(),
241            suppress_type_name: false,
242            request_retry_partial: false,
243            id_key: None,
244            pipeline: None,
245            mode: Default::default(),
246            compression: Default::default(),
247            encoding: Default::default(),
248            batch: Default::default(),
249            request: Default::default(),
250            auth: None,
251            query: None,
252            #[cfg(feature = "aws-core")]
253            aws: None,
254            opensearch_service_type: Default::default(),
255            tls: None,
256            endpoint_health: None,
257            bulk: BulkConfig::default(), // the default mode is Bulk
258            data_stream: None,
259            metrics: None,
260            acknowledgements: Default::default(),
261        }
262    }
263}
264
265impl ElasticsearchConfig {
266    pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
267        match self.mode {
268            ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
269                index: self.bulk.index.clone(),
270                template_fallback_index: self.bulk.template_fallback_index.clone(),
271                action: self.bulk.action.clone(),
272                version: self.bulk.version.clone(),
273                version_type: self.bulk.version_type,
274            }),
275            ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
276                self.data_stream.clone().unwrap_or_default(),
277            )),
278        }
279    }
280}
281
282/// Elasticsearch bulk mode configuration.
283#[configurable_component]
284#[derive(Clone, Debug, PartialEq)]
285#[serde(rename_all = "snake_case")]
286pub struct BulkConfig {
287    /// Action to use when making requests to the [Elasticsearch Bulk API][es_bulk].
288    ///
289    /// Only `index`, `create` and `update` actions are supported.
290    ///
291    /// [es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
292    #[serde(default = "default_bulk_action")]
293    #[configurable(metadata(docs::examples = "create"))]
294    #[configurable(metadata(docs::examples = "{{ action }}"))]
295    pub action: Template,
296
297    /// The name of the index to write events to.
298    #[serde(default = "default_index")]
299    #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
300    #[configurable(metadata(docs::examples = "{{ index }}"))]
301    pub index: Template,
302
303    /// The default index to write events to if the template in `bulk.index` cannot be resolved
304    #[configurable(metadata(docs::examples = "test-index"))]
305    pub template_fallback_index: Option<String>,
306
307    /// Version field value.
308    #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
309    #[configurable(metadata(docs::examples = "123"))]
310    pub version: Option<Template>,
311
312    /// Version type.
313    ///
314    /// Possible values are `internal`, `external` or `external_gt` and `external_gte`.
315    ///
316    /// [es_index_versioning]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning
317    #[serde(default = "default_version_type")]
318    #[configurable(metadata(docs::examples = "internal"))]
319    #[configurable(metadata(docs::examples = "external"))]
320    pub version_type: VersionType,
321}
322
323fn default_bulk_action() -> Template {
324    Template::try_from("index").expect("unable to parse template")
325}
326
327fn default_index() -> Template {
328    Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
329}
330
331const fn default_version_type() -> VersionType {
332    VersionType::Internal
333}
334
335impl Default for BulkConfig {
336    fn default() -> Self {
337        Self {
338            action: default_bulk_action(),
339            index: default_index(),
340            template_fallback_index: Default::default(),
341            version: Default::default(),
342            version_type: default_version_type(),
343        }
344    }
345}
346
347/// Elasticsearch data stream mode configuration.
348#[configurable_component]
349#[derive(Clone, Debug)]
350#[serde(rename_all = "snake_case")]
351pub struct DataStreamConfig {
352    /// The data stream type used to construct the data stream at index time.
353    #[serde(rename = "type", default = "DataStreamConfig::default_type")]
354    #[configurable(metadata(docs::examples = "metrics"))]
355    #[configurable(metadata(docs::examples = "synthetics"))]
356    #[configurable(metadata(docs::examples = "{{ type }}"))]
357    pub dtype: Template,
358
359    /// The data stream dataset used to construct the data stream at index time.
360    #[serde(default = "DataStreamConfig::default_dataset")]
361    #[configurable(metadata(docs::examples = "generic"))]
362    #[configurable(metadata(docs::examples = "nginx"))]
363    #[configurable(metadata(docs::examples = "{{ service }}"))]
364    pub dataset: Template,
365
366    /// The data stream namespace used to construct the data stream at index time.
367    #[serde(default = "DataStreamConfig::default_namespace")]
368    #[configurable(metadata(docs::examples = "{{ environment }}"))]
369    pub namespace: Template,
370
371    /// Automatically routes events by deriving the data stream name using specific event fields.
372    ///
373    /// The format of the data stream name is `<type>-<dataset>-<namespace>`, where each value comes
374    /// from the `data_stream` configuration field of the same name.
375    ///
376    /// If enabled, the value of the `data_stream.type`, `data_stream.dataset`, and
377    /// `data_stream.namespace` event fields are used if they are present. Otherwise, the values
378    /// set in this configuration are used.
379    #[serde(default = "DataStreamConfig::default_auto_routing")]
380    pub auto_routing: bool,
381
382    /// Automatically adds and syncs the `data_stream.*` event fields if they are missing from the event.
383    ///
384    /// This ensures that fields match the name of the data stream that is receiving events.
385    #[serde(default = "DataStreamConfig::default_sync_fields")]
386    pub sync_fields: bool,
387}
388
389impl Default for DataStreamConfig {
390    fn default() -> Self {
391        Self {
392            dtype: Self::default_type(),
393            dataset: Self::default_dataset(),
394            namespace: Self::default_namespace(),
395            auto_routing: Self::default_auto_routing(),
396            sync_fields: Self::default_sync_fields(),
397        }
398    }
399}
400
401impl DataStreamConfig {
402    fn default_type() -> Template {
403        Template::try_from("logs").expect("couldn't build default type template")
404    }
405
406    fn default_dataset() -> Template {
407        Template::try_from("generic").expect("couldn't build default dataset template")
408    }
409
410    fn default_namespace() -> Template {
411        Template::try_from("default").expect("couldn't build default namespace template")
412    }
413
414    const fn default_auto_routing() -> bool {
415        true
416    }
417
418    const fn default_sync_fields() -> bool {
419        true
420    }
421
422    /// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
423    pub fn remap_timestamp(&self, log: &mut LogEvent) {
424        if let Some(timestamp_key) = log.timestamp_path().cloned() {
425            if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
426                return;
427            }
428
429            log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
430        }
431    }
432
433    pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
434        self.dtype
435            .render_string(event)
436            .map_err(|error| {
437                emit!(TemplateRenderingError {
438                    error,
439                    field: Some("data_stream.type"),
440                    drop_event: true,
441                });
442            })
443            .ok()
444    }
445
446    pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
447        self.dataset
448            .render_string(event)
449            .map_err(|error| {
450                emit!(TemplateRenderingError {
451                    error,
452                    field: Some("data_stream.dataset"),
453                    drop_event: true,
454                });
455            })
456            .ok()
457    }
458
459    pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
460        self.namespace
461            .render_string(event)
462            .map_err(|error| {
463                emit!(TemplateRenderingError {
464                    error,
465                    field: Some("data_stream.namespace"),
466                    drop_event: true,
467                });
468            })
469            .ok()
470    }
471
472    pub fn sync_fields(&self, log: &mut LogEvent) {
473        if !self.sync_fields {
474            return;
475        }
476
477        let dtype = self.dtype(&*log);
478        let dataset = self.dataset(&*log);
479        let namespace = self.namespace(&*log);
480
481        if log.as_map().is_none() {
482            *log.value_mut() = Value::Object(BTreeMap::new());
483        }
484        let existing = log
485            .as_map_mut()
486            .expect("must be a map")
487            .entry("data_stream".into())
488            .or_insert_with(|| Value::Object(BTreeMap::new()))
489            .as_object_mut_unwrap();
490
491        if let Some(dtype) = dtype {
492            existing
493                .entry("type".into())
494                .or_insert_with(|| dtype.into());
495        }
496        if let Some(dataset) = dataset {
497            existing
498                .entry("dataset".into())
499                .or_insert_with(|| dataset.into());
500        }
501        if let Some(namespace) = namespace {
502            existing
503                .entry("namespace".into())
504                .or_insert_with(|| namespace.into());
505        }
506    }
507
508    pub fn index(&self, log: &LogEvent) -> Option<String> {
509        let (dtype, dataset, namespace) = if !self.auto_routing {
510            (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
511        } else {
512            let data_stream = log
513                .get(event_path!("data_stream"))
514                .and_then(|ds| ds.as_object());
515            let dtype = data_stream
516                .and_then(|ds| ds.get("type"))
517                .map(|value| value.to_string_lossy().into_owned())
518                .or_else(|| self.dtype(log))?;
519            let dataset = data_stream
520                .and_then(|ds| ds.get("dataset"))
521                .map(|value| value.to_string_lossy().into_owned())
522                .or_else(|| self.dataset(log))?;
523            let namespace = data_stream
524                .and_then(|ds| ds.get("namespace"))
525                .map(|value| value.to_string_lossy().into_owned())
526                .or_else(|| self.namespace(log))?;
527            (dtype, dataset, namespace)
528        };
529
530        let name = [dtype, dataset, namespace]
531            .into_iter()
532            .filter(|s| !s.is_empty())
533            .collect::<Vec<_>>()
534            .join("-");
535
536        Some(name)
537    }
538}
539
540#[async_trait::async_trait]
541#[typetag::serde(name = "elasticsearch")]
542impl SinkConfig for ElasticsearchConfig {
543    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
544        let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
545        let common = commons[0].clone();
546
547        let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
548
549        let request_limits = self.request.tower.into_settings();
550
551        let health_config = self.endpoint_health.clone().unwrap_or_default();
552
553        let services = commons
554            .iter()
555            .cloned()
556            .map(|common| {
557                let endpoint = common.base_url.clone();
558
559                let http_request_builder = HttpRequestBuilder::new(&common, self);
560                let service = ElasticsearchService::new(client.clone(), http_request_builder);
561
562                (endpoint, service)
563            })
564            .collect::<Vec<_>>();
565
566        let service = request_limits.distributed_service(
567            ElasticsearchRetryLogic {
568                retry_partial: self.request_retry_partial,
569            },
570            services,
571            health_config,
572            ElasticsearchHealthLogic,
573            1,
574        );
575
576        let sink = ElasticsearchSink::new(&common, self, service)?;
577
578        let stream = VectorSink::from_event_streamsink(sink);
579
580        let healthcheck = futures::future::select_ok(
581            commons
582                .into_iter()
583                .map(move |common| common.healthcheck(client.clone()).boxed()),
584        )
585        .map_ok(|((), _)| ())
586        .boxed();
587        Ok((stream, healthcheck))
588    }
589
590    fn input(&self) -> Input {
591        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
592
593        Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
594    }
595
596    fn acknowledgements(&self) -> &AcknowledgementsConfig {
597        &self.acknowledgements
598    }
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604
605    #[test]
606    fn generate_config() {
607        crate::test_util::test_generate_config::<ElasticsearchConfig>();
608    }
609
610    #[test]
611    fn parse_aws_auth() {
612        toml::from_str::<ElasticsearchConfig>(
613            r#"
614            endpoints = [""]
615            auth.strategy = "aws"
616            auth.assume_role = "role"
617        "#,
618        )
619        .unwrap();
620
621        toml::from_str::<ElasticsearchConfig>(
622            r#"
623            endpoints = [""]
624            auth.strategy = "aws"
625        "#,
626        )
627        .unwrap();
628    }
629
630    #[test]
631    fn parse_mode() {
632        let config = toml::from_str::<ElasticsearchConfig>(
633            r#"
634            endpoints = [""]
635            mode = "data_stream"
636            data_stream.type = "synthetics"
637        "#,
638        )
639        .unwrap();
640        assert!(matches!(config.mode, ElasticsearchMode::DataStream));
641        assert!(config.data_stream.is_some());
642    }
643
644    #[test]
645    fn parse_distribution() {
646        toml::from_str::<ElasticsearchConfig>(
647            r#"
648            endpoints = ["", ""]
649            distribution.retry_initial_backoff_secs = 10
650        "#,
651        )
652        .unwrap();
653    }
654
655    #[test]
656    fn parse_version() {
657        let config = toml::from_str::<ElasticsearchConfig>(
658            r#"
659            endpoints = [""]
660            api_version = "v7"
661        "#,
662        )
663        .unwrap();
664        assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
665    }
666
667    #[test]
668    fn parse_version_auto() {
669        let config = toml::from_str::<ElasticsearchConfig>(
670            r#"
671            endpoints = [""]
672            api_version = "auto"
673        "#,
674        )
675        .unwrap();
676        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
677    }
678
679    #[test]
680    fn parse_default_bulk() {
681        let config = toml::from_str::<ElasticsearchConfig>(
682            r#"
683            endpoints = [""]
684        "#,
685        )
686        .unwrap();
687        assert_eq!(config.mode, ElasticsearchMode::Bulk);
688        assert_eq!(config.bulk, BulkConfig::default());
689    }
690
691    #[test]
692    fn parse_opensearch_service_type_managed() {
693        let config = toml::from_str::<ElasticsearchConfig>(
694            r#"
695            endpoints = [""]
696            opensearch_service_type = "managed"
697        "#,
698        )
699        .unwrap();
700        assert_eq!(
701            config.opensearch_service_type,
702            OpenSearchServiceType::Managed
703        );
704    }
705
706    #[test]
707    fn parse_opensearch_service_type_serverless() {
708        let config = toml::from_str::<ElasticsearchConfig>(
709            r#"
710            endpoints = [""]
711            opensearch_service_type = "serverless"
712            auth.strategy = "aws"
713            api_version = "auto"
714        "#,
715        )
716        .unwrap();
717        assert_eq!(
718            config.opensearch_service_type,
719            OpenSearchServiceType::Serverless
720        );
721    }
722
723    #[test]
724    fn parse_opensearch_service_type_default() {
725        let config = toml::from_str::<ElasticsearchConfig>(
726            r#"
727            endpoints = [""]
728        "#,
729        )
730        .unwrap();
731        assert_eq!(
732            config.opensearch_service_type,
733            OpenSearchServiceType::Managed
734        );
735    }
736
737    #[cfg(feature = "aws-core")]
738    #[test]
739    fn parse_opensearch_serverless_with_aws_auth() {
740        let config = toml::from_str::<ElasticsearchConfig>(
741            r#"
742            endpoints = [""]
743            opensearch_service_type = "serverless"
744            auth.strategy = "aws"
745            api_version = "auto"
746        "#,
747        )
748        .unwrap();
749        assert_eq!(
750            config.opensearch_service_type,
751            OpenSearchServiceType::Serverless
752        );
753        assert!(matches!(config.auth, Some(ElasticsearchAuthConfig::Aws(_))));
754        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
755    }
756}