vector/sinks/elasticsearch/
config.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    convert::TryFrom,
4};
5
6use futures::{FutureExt, TryFutureExt};
7use vector_lib::{
8    configurable::configurable_component,
9    lookup::{event_path, lookup_v2::ConfigValuePath},
10    schema::Requirement,
11};
12use vrl::value::Kind;
13
14use crate::{
15    codecs::Transformer,
16    config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
17    event::{EventRef, LogEvent, Value},
18    http::{HttpClient, QueryParameters},
19    internal_events::TemplateRenderingError,
20    sinks::{
21        Healthcheck, VectorSink,
22        elasticsearch::{
23            ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
24            ElasticsearchCommonMode, ElasticsearchMode, VersionType,
25            health::ElasticsearchHealthLogic,
26            retry::ElasticsearchRetryLogic,
27            service::{ElasticsearchService, HttpRequestBuilder},
28            sink::ElasticsearchSink,
29        },
30        util::{
31            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
32            service::HealthConfig,
33        },
34    },
35    template::Template,
36    tls::TlsConfig,
37    transforms::metric_to_log::MetricToLogConfig,
38};
39
40/// The field name for the timestamp required by data stream mode
41pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
42
43/// The Amazon OpenSearch service type, either managed or serverless; primarily, selects the
44/// correct AWS service to use when calculating the AWS v4 signature + disables features
45/// unsupported by serverless: Elasticsearch API version autodetection, health checks
46#[configurable_component]
47#[derive(Clone, Debug, Eq, PartialEq)]
48#[serde(deny_unknown_fields, rename_all = "lowercase")]
49pub enum OpenSearchServiceType {
50    /// Elasticsearch or OpenSearch Managed domain
51    Managed,
52    /// OpenSearch Serverless collection
53    Serverless,
54}
55
56impl OpenSearchServiceType {
57    pub const fn as_str(&self) -> &'static str {
58        match self {
59            OpenSearchServiceType::Managed => "es",
60            OpenSearchServiceType::Serverless => "aoss",
61        }
62    }
63}
64
65impl Default for OpenSearchServiceType {
66    fn default() -> Self {
67        Self::Managed
68    }
69}
70
71/// Configuration for the `elasticsearch` sink.
72#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
73#[derive(Clone, Debug)]
74#[serde(deny_unknown_fields)]
75pub struct ElasticsearchConfig {
76    /// The Elasticsearch endpoint to send logs to.
77    ///
78    /// The endpoint must contain an HTTP scheme, and may specify a
79    /// hostname or IP address and port.
80    #[serde(default)]
81    #[configurable(
82        deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
83    )]
84    pub endpoint: Option<String>,
85
86    /// A list of Elasticsearch endpoints to send logs to.
87    ///
88    /// The endpoint must contain an HTTP scheme, and may specify a
89    /// hostname or IP address and port.
90    /// The endpoint may include basic authentication credentials,
91    /// e.g., `https://user:password@example.com`. If credentials are provided in the endpoint,
92    /// they will be used to authenticate against Elasticsearch.
93    ///
94    /// If `auth` is specified and the endpoint contains credentials,
95    /// a configuration error will be raised.
96    #[serde(default)]
97    #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
98    #[configurable(metadata(docs::examples = "https://example.com"))]
99    #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
100    pub endpoints: Vec<String>,
101
102    /// The [`doc_type`][doc_type] for your index data.
103    ///
104    /// This is only relevant for Elasticsearch <= 6.X. If you are using >= 7.0 you do not need to
105    /// set this option since Elasticsearch has removed it.
106    ///
107    /// [doc_type]: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/actions-index.html
108    #[serde(default = "default_doc_type")]
109    #[configurable(metadata(docs::advanced))]
110    pub doc_type: String,
111
112    /// The API version of Elasticsearch.
113    ///
114    /// Amazon OpenSearch Serverless requires this option to be set to `auto` (the default).
115    #[serde(default)]
116    #[configurable(derived)]
117    pub api_version: ElasticsearchApiVersion,
118
119    /// Whether or not to send the `type` field to Elasticsearch.
120    ///
121    /// The `type` field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x.
122    ///
123    /// If enabled, the `doc_type` option is ignored.
124    #[serde(default)]
125    #[configurable(
126        deprecated = "This option has been deprecated, the `api_version` option should be used instead."
127    )]
128    pub suppress_type_name: bool,
129
130    /// Whether or not to retry successful requests containing partial failures.
131    ///
132    /// To avoid duplicates in Elasticsearch, please use option `id_key`.
133    #[serde(default)]
134    #[configurable(metadata(docs::advanced))]
135    pub request_retry_partial: bool,
136
137    /// The name of the event key that should map to Elasticsearch’s [`_id` field][es_id].
138    ///
139    /// By default, the `_id` field is not set, which allows Elasticsearch to set this
140    /// automatically. Setting your own Elasticsearch IDs can [hinder performance][perf_doc].
141    ///
142    /// [es_id]: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
143    /// [perf_doc]: https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html#_use_auto_generated_ids
144    #[serde(default)]
145    #[configurable(metadata(docs::advanced))]
146    #[configurable(metadata(docs::examples = "id"))]
147    #[configurable(metadata(docs::examples = "_id"))]
148    pub id_key: Option<ConfigValuePath>,
149
150    /// The name of the pipeline to apply.
151    #[serde(default)]
152    #[configurable(metadata(docs::advanced))]
153    #[configurable(metadata(docs::examples = "pipeline-name"))]
154    pub pipeline: Option<String>,
155
156    #[serde(default)]
157    #[configurable(derived)]
158    pub mode: ElasticsearchMode,
159
160    #[serde(default)]
161    #[configurable(derived)]
162    pub compression: Compression,
163
164    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
165    #[configurable(derived)]
166    #[configurable(metadata(docs::advanced))]
167    pub encoding: Transformer,
168
169    #[serde(default)]
170    #[configurable(derived)]
171    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
172
173    #[serde(default)]
174    #[configurable(derived)]
175    pub request: RequestConfig,
176
177    #[configurable(derived)]
178    pub auth: Option<ElasticsearchAuthConfig>,
179
180    /// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
181    #[serde(default)]
182    #[configurable(metadata(docs::advanced))]
183    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
184    #[configurable(metadata(docs::examples = "query_examples()"))]
185    pub query: Option<QueryParameters>,
186
187    #[serde(default)]
188    #[configurable(derived)]
189    #[cfg(feature = "aws-core")]
190    pub aws: Option<crate::aws::RegionOrEndpoint>,
191
192    /// Amazon OpenSearch service type
193    #[serde(default)]
194    pub opensearch_service_type: OpenSearchServiceType,
195
196    #[serde(default)]
197    #[configurable(derived)]
198    pub tls: Option<TlsConfig>,
199
200    #[serde(default)]
201    #[configurable(derived)]
202    #[serde(rename = "distribution")]
203    pub endpoint_health: Option<HealthConfig>,
204
205    // TODO: `bulk` and `data_stream` are each only relevant if the `mode` is set to their
206    // corresponding mode. An improvement to look into would be to extract the `BulkConfig` and
207    // `DataStreamConfig` into the `mode` enum variants. Doing so would remove them from the root
208    // of the config here and thus any post serde config parsing manual error prone logic.
209    #[serde(alias = "normal", default)]
210    #[configurable(derived)]
211    pub bulk: BulkConfig,
212
213    #[serde(default)]
214    #[configurable(derived)]
215    pub data_stream: Option<DataStreamConfig>,
216
217    #[serde(default)]
218    #[configurable(derived)]
219    pub metrics: Option<MetricToLogConfig>,
220
221    #[serde(
222        default,
223        deserialize_with = "crate::serde::bool_or_struct",
224        skip_serializing_if = "crate::serde::is_default"
225    )]
226    #[configurable(derived)]
227    pub acknowledgements: AcknowledgementsConfig,
228}
229
230fn default_doc_type() -> String {
231    "_doc".to_owned()
232}
233
234fn query_examples() -> HashMap<String, String> {
235    HashMap::<_, _>::from_iter([("X-Powered-By".to_owned(), "Vector".to_owned())])
236}
237
238impl Default for ElasticsearchConfig {
239    fn default() -> Self {
240        Self {
241            endpoint: None,
242            endpoints: vec![],
243            doc_type: default_doc_type(),
244            api_version: Default::default(),
245            suppress_type_name: false,
246            request_retry_partial: false,
247            id_key: None,
248            pipeline: None,
249            mode: Default::default(),
250            compression: Default::default(),
251            encoding: Default::default(),
252            batch: Default::default(),
253            request: Default::default(),
254            auth: None,
255            query: None,
256            #[cfg(feature = "aws-core")]
257            aws: None,
258            opensearch_service_type: Default::default(),
259            tls: None,
260            endpoint_health: None,
261            bulk: BulkConfig::default(), // the default mode is Bulk
262            data_stream: None,
263            metrics: None,
264            acknowledgements: Default::default(),
265        }
266    }
267}
268
269impl ElasticsearchConfig {
270    pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
271        match self.mode {
272            ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
273                index: self.bulk.index.clone(),
274                template_fallback_index: self.bulk.template_fallback_index.clone(),
275                action: self.bulk.action.clone(),
276                version: self.bulk.version.clone(),
277                version_type: self.bulk.version_type,
278            }),
279            ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
280                self.data_stream.clone().unwrap_or_default(),
281            )),
282        }
283    }
284}
285
286/// Elasticsearch bulk mode configuration.
287#[configurable_component]
288#[derive(Clone, Debug, PartialEq)]
289#[serde(rename_all = "snake_case")]
290pub struct BulkConfig {
291    /// Action to use when making requests to the [Elasticsearch Bulk API][es_bulk].
292    ///
293    /// Only `index`, `create` and `update` actions are supported.
294    ///
295    /// [es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
296    #[serde(default = "default_bulk_action")]
297    #[configurable(metadata(docs::examples = "create"))]
298    #[configurable(metadata(docs::examples = "{{ action }}"))]
299    pub action: Template,
300
301    /// The name of the index to write events to.
302    #[serde(default = "default_index")]
303    #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
304    #[configurable(metadata(docs::examples = "{{ index }}"))]
305    pub index: Template,
306
307    /// The default index to write events to if the template in `bulk.index` cannot be resolved
308    #[configurable(metadata(docs::examples = "test-index"))]
309    pub template_fallback_index: Option<String>,
310
311    /// Version field value.
312    #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
313    #[configurable(metadata(docs::examples = "123"))]
314    pub version: Option<Template>,
315
316    /// Version type.
317    ///
318    /// Possible values are `internal`, `external` or `external_gt` and `external_gte`.
319    ///
320    /// [es_index_versioning]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning
321    #[serde(default = "default_version_type")]
322    #[configurable(metadata(docs::examples = "internal"))]
323    #[configurable(metadata(docs::examples = "external"))]
324    pub version_type: VersionType,
325}
326
327fn default_bulk_action() -> Template {
328    Template::try_from("index").expect("unable to parse template")
329}
330
331fn default_index() -> Template {
332    Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
333}
334
335const fn default_version_type() -> VersionType {
336    VersionType::Internal
337}
338
339impl Default for BulkConfig {
340    fn default() -> Self {
341        Self {
342            action: default_bulk_action(),
343            index: default_index(),
344            template_fallback_index: Default::default(),
345            version: Default::default(),
346            version_type: default_version_type(),
347        }
348    }
349}
350
351/// Elasticsearch data stream mode configuration.
352#[configurable_component]
353#[derive(Clone, Debug)]
354#[serde(rename_all = "snake_case")]
355pub struct DataStreamConfig {
356    /// The data stream type used to construct the data stream at index time.
357    #[serde(rename = "type", default = "DataStreamConfig::default_type")]
358    #[configurable(metadata(docs::examples = "metrics"))]
359    #[configurable(metadata(docs::examples = "synthetics"))]
360    #[configurable(metadata(docs::examples = "{{ type }}"))]
361    pub dtype: Template,
362
363    /// The data stream dataset used to construct the data stream at index time.
364    #[serde(default = "DataStreamConfig::default_dataset")]
365    #[configurable(metadata(docs::examples = "generic"))]
366    #[configurable(metadata(docs::examples = "nginx"))]
367    #[configurable(metadata(docs::examples = "{{ service }}"))]
368    pub dataset: Template,
369
370    /// The data stream namespace used to construct the data stream at index time.
371    #[serde(default = "DataStreamConfig::default_namespace")]
372    #[configurable(metadata(docs::examples = "{{ environment }}"))]
373    pub namespace: Template,
374
375    /// Automatically routes events by deriving the data stream name using specific event fields.
376    ///
377    /// The format of the data stream name is `<type>-<dataset>-<namespace>`, where each value comes
378    /// from the `data_stream` configuration field of the same name.
379    ///
380    /// If enabled, the value of the `data_stream.type`, `data_stream.dataset`, and
381    /// `data_stream.namespace` event fields are used if they are present. Otherwise, the values
382    /// set in this configuration are used.
383    #[serde(default = "DataStreamConfig::default_auto_routing")]
384    pub auto_routing: bool,
385
386    /// Automatically adds and syncs the `data_stream.*` event fields if they are missing from the event.
387    ///
388    /// This ensures that fields match the name of the data stream that is receiving events.
389    #[serde(default = "DataStreamConfig::default_sync_fields")]
390    pub sync_fields: bool,
391}
392
393impl Default for DataStreamConfig {
394    fn default() -> Self {
395        Self {
396            dtype: Self::default_type(),
397            dataset: Self::default_dataset(),
398            namespace: Self::default_namespace(),
399            auto_routing: Self::default_auto_routing(),
400            sync_fields: Self::default_sync_fields(),
401        }
402    }
403}
404
405impl DataStreamConfig {
406    fn default_type() -> Template {
407        Template::try_from("logs").expect("couldn't build default type template")
408    }
409
410    fn default_dataset() -> Template {
411        Template::try_from("generic").expect("couldn't build default dataset template")
412    }
413
414    fn default_namespace() -> Template {
415        Template::try_from("default").expect("couldn't build default namespace template")
416    }
417
418    const fn default_auto_routing() -> bool {
419        true
420    }
421
422    const fn default_sync_fields() -> bool {
423        true
424    }
425
426    /// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
427    pub fn remap_timestamp(&self, log: &mut LogEvent) {
428        if let Some(timestamp_key) = log.timestamp_path().cloned() {
429            if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
430                return;
431            }
432
433            log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
434        }
435    }
436
437    pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
438        self.dtype
439            .render_string(event)
440            .map_err(|error| {
441                emit!(TemplateRenderingError {
442                    error,
443                    field: Some("data_stream.type"),
444                    drop_event: true,
445                });
446            })
447            .ok()
448    }
449
450    pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
451        self.dataset
452            .render_string(event)
453            .map_err(|error| {
454                emit!(TemplateRenderingError {
455                    error,
456                    field: Some("data_stream.dataset"),
457                    drop_event: true,
458                });
459            })
460            .ok()
461    }
462
463    pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
464        self.namespace
465            .render_string(event)
466            .map_err(|error| {
467                emit!(TemplateRenderingError {
468                    error,
469                    field: Some("data_stream.namespace"),
470                    drop_event: true,
471                });
472            })
473            .ok()
474    }
475
476    pub fn sync_fields(&self, log: &mut LogEvent) {
477        if !self.sync_fields {
478            return;
479        }
480
481        let dtype = self.dtype(&*log);
482        let dataset = self.dataset(&*log);
483        let namespace = self.namespace(&*log);
484
485        if log.as_map().is_none() {
486            *log.value_mut() = Value::Object(BTreeMap::new());
487        }
488        let existing = log
489            .as_map_mut()
490            .expect("must be a map")
491            .entry("data_stream".into())
492            .or_insert_with(|| Value::Object(BTreeMap::new()))
493            .as_object_mut_unwrap();
494
495        if let Some(dtype) = dtype {
496            existing
497                .entry("type".into())
498                .or_insert_with(|| dtype.into());
499        }
500        if let Some(dataset) = dataset {
501            existing
502                .entry("dataset".into())
503                .or_insert_with(|| dataset.into());
504        }
505        if let Some(namespace) = namespace {
506            existing
507                .entry("namespace".into())
508                .or_insert_with(|| namespace.into());
509        }
510    }
511
512    pub fn index(&self, log: &LogEvent) -> Option<String> {
513        let (dtype, dataset, namespace) = if !self.auto_routing {
514            (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
515        } else {
516            let data_stream = log
517                .get(event_path!("data_stream"))
518                .and_then(|ds| ds.as_object());
519            let dtype = data_stream
520                .and_then(|ds| ds.get("type"))
521                .map(|value| value.to_string_lossy().into_owned())
522                .or_else(|| self.dtype(log))?;
523            let dataset = data_stream
524                .and_then(|ds| ds.get("dataset"))
525                .map(|value| value.to_string_lossy().into_owned())
526                .or_else(|| self.dataset(log))?;
527            let namespace = data_stream
528                .and_then(|ds| ds.get("namespace"))
529                .map(|value| value.to_string_lossy().into_owned())
530                .or_else(|| self.namespace(log))?;
531            (dtype, dataset, namespace)
532        };
533
534        let name = [dtype, dataset, namespace]
535            .into_iter()
536            .filter(|s| !s.is_empty())
537            .collect::<Vec<_>>()
538            .join("-");
539
540        Some(name)
541    }
542}
543
544#[async_trait::async_trait]
545#[typetag::serde(name = "elasticsearch")]
546impl SinkConfig for ElasticsearchConfig {
547    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
548        let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
549        let common = commons[0].clone();
550
551        let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
552
553        let request_limits = self.request.tower.into_settings();
554
555        let health_config = self.endpoint_health.clone().unwrap_or_default();
556
557        let services = commons
558            .iter()
559            .cloned()
560            .map(|common| {
561                let endpoint = common.base_url.clone();
562
563                let http_request_builder = HttpRequestBuilder::new(&common, self);
564                let service = ElasticsearchService::new(client.clone(), http_request_builder);
565
566                (endpoint, service)
567            })
568            .collect::<Vec<_>>();
569
570        let service = request_limits.distributed_service(
571            ElasticsearchRetryLogic {
572                retry_partial: self.request_retry_partial,
573            },
574            services,
575            health_config,
576            ElasticsearchHealthLogic,
577            1,
578        );
579
580        let sink = ElasticsearchSink::new(&common, self, service)?;
581
582        let stream = VectorSink::from_event_streamsink(sink);
583
584        let healthcheck = futures::future::select_ok(
585            commons
586                .into_iter()
587                .map(move |common| common.healthcheck(client.clone()).boxed()),
588        )
589        .map_ok(|((), _)| ())
590        .boxed();
591        Ok((stream, healthcheck))
592    }
593
594    fn input(&self) -> Input {
595        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
596
597        Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
598    }
599
600    fn acknowledgements(&self) -> &AcknowledgementsConfig {
601        &self.acknowledgements
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use super::*;
608
609    #[test]
610    fn generate_config() {
611        crate::test_util::test_generate_config::<ElasticsearchConfig>();
612    }
613
614    #[test]
615    fn parse_aws_auth() {
616        toml::from_str::<ElasticsearchConfig>(
617            r#"
618            endpoints = [""]
619            auth.strategy = "aws"
620            auth.assume_role = "role"
621        "#,
622        )
623        .unwrap();
624
625        toml::from_str::<ElasticsearchConfig>(
626            r#"
627            endpoints = [""]
628            auth.strategy = "aws"
629        "#,
630        )
631        .unwrap();
632    }
633
634    #[test]
635    fn parse_mode() {
636        let config = toml::from_str::<ElasticsearchConfig>(
637            r#"
638            endpoints = [""]
639            mode = "data_stream"
640            data_stream.type = "synthetics"
641        "#,
642        )
643        .unwrap();
644        assert!(matches!(config.mode, ElasticsearchMode::DataStream));
645        assert!(config.data_stream.is_some());
646    }
647
648    #[test]
649    fn parse_distribution() {
650        toml::from_str::<ElasticsearchConfig>(
651            r#"
652            endpoints = ["", ""]
653            distribution.retry_initial_backoff_secs = 10
654        "#,
655        )
656        .unwrap();
657    }
658
659    #[test]
660    fn parse_version() {
661        let config = toml::from_str::<ElasticsearchConfig>(
662            r#"
663            endpoints = [""]
664            api_version = "v7"
665        "#,
666        )
667        .unwrap();
668        assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
669    }
670
671    #[test]
672    fn parse_version_auto() {
673        let config = toml::from_str::<ElasticsearchConfig>(
674            r#"
675            endpoints = [""]
676            api_version = "auto"
677        "#,
678        )
679        .unwrap();
680        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
681    }
682
683    #[test]
684    fn parse_default_bulk() {
685        let config = toml::from_str::<ElasticsearchConfig>(
686            r#"
687            endpoints = [""]
688        "#,
689        )
690        .unwrap();
691        assert_eq!(config.mode, ElasticsearchMode::Bulk);
692        assert_eq!(config.bulk, BulkConfig::default());
693    }
694
695    #[test]
696    fn parse_opensearch_service_type_managed() {
697        let config = toml::from_str::<ElasticsearchConfig>(
698            r#"
699            endpoints = [""]
700            opensearch_service_type = "managed"
701        "#,
702        )
703        .unwrap();
704        assert_eq!(
705            config.opensearch_service_type,
706            OpenSearchServiceType::Managed
707        );
708    }
709
710    #[test]
711    fn parse_opensearch_service_type_serverless() {
712        let config = toml::from_str::<ElasticsearchConfig>(
713            r#"
714            endpoints = [""]
715            opensearch_service_type = "serverless"
716            auth.strategy = "aws"
717            api_version = "auto"
718        "#,
719        )
720        .unwrap();
721        assert_eq!(
722            config.opensearch_service_type,
723            OpenSearchServiceType::Serverless
724        );
725    }
726
727    #[test]
728    fn parse_opensearch_service_type_default() {
729        let config = toml::from_str::<ElasticsearchConfig>(
730            r#"
731            endpoints = [""]
732        "#,
733        )
734        .unwrap();
735        assert_eq!(
736            config.opensearch_service_type,
737            OpenSearchServiceType::Managed
738        );
739    }
740
741    #[cfg(feature = "aws-core")]
742    #[test]
743    fn parse_opensearch_serverless_with_aws_auth() {
744        let config = toml::from_str::<ElasticsearchConfig>(
745            r#"
746            endpoints = [""]
747            opensearch_service_type = "serverless"
748            auth.strategy = "aws"
749            api_version = "auto"
750        "#,
751        )
752        .unwrap();
753        assert_eq!(
754            config.opensearch_service_type,
755            OpenSearchServiceType::Serverless
756        );
757        assert!(matches!(config.auth, Some(ElasticsearchAuthConfig::Aws(_))));
758        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
759    }
760}