vector/sinks/elasticsearch/
config.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    convert::TryFrom,
4};
5
6use futures::{FutureExt, TryFutureExt};
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10    codecs::Transformer,
11    config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
12    event::{EventRef, LogEvent, Value},
13    http::{HttpClient, QueryParameters},
14    internal_events::TemplateRenderingError,
15    sinks::{
16        elasticsearch::{
17            health::ElasticsearchHealthLogic,
18            retry::ElasticsearchRetryLogic,
19            service::{ElasticsearchService, HttpRequestBuilder},
20            sink::ElasticsearchSink,
21            ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
22            ElasticsearchCommonMode, ElasticsearchMode, VersionType,
23        },
24        util::{
25            http::RequestConfig, service::HealthConfig, BatchConfig, Compression,
26            RealtimeSizeBasedDefaultBatchSettings,
27        },
28        Healthcheck, VectorSink,
29    },
30    template::Template,
31    tls::TlsConfig,
32    transforms::metric_to_log::MetricToLogConfig,
33};
34use vector_lib::lookup::event_path;
35use vector_lib::lookup::lookup_v2::ConfigValuePath;
36use vector_lib::schema::Requirement;
37use vrl::value::Kind;
38
/// The field name for the timestamp required by data stream mode.
///
/// `DataStreamConfig::remap_timestamp` renames the event's timestamp field to this key.
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
41
/// The Amazon OpenSearch service type, either managed or serverless.
///
/// Primarily, this selects the correct AWS service to use when calculating the AWS v4
/// signature. It also disables features unsupported by serverless: Elasticsearch API
/// version autodetection and health checks.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum OpenSearchServiceType {
    /// Elasticsearch or OpenSearch Managed domain.
    Managed,
    /// OpenSearch Serverless collection.
    Serverless,
}
54
55impl OpenSearchServiceType {
56    pub const fn as_str(&self) -> &'static str {
57        match self {
58            OpenSearchServiceType::Managed => "es",
59            OpenSearchServiceType::Serverless => "aoss",
60        }
61    }
62}
63
64impl Default for OpenSearchServiceType {
65    fn default() -> Self {
66        Self::Managed
67    }
68}
69
/// Configuration for the `elasticsearch` sink.
#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ElasticsearchConfig {
    /// The Elasticsearch endpoint to send logs to.
    ///
    /// The endpoint must contain an HTTP scheme, and may specify a
    /// hostname or IP address and port.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
    )]
    pub endpoint: Option<String>,

    /// A list of Elasticsearch endpoints to send logs to.
    ///
    /// The endpoint must contain an HTTP scheme, and may specify a
    /// hostname or IP address and port.
    /// The endpoint may include basic authentication credentials,
    /// e.g., `https://user:password@example.com`. If credentials are provided in the endpoint,
    /// they will be used to authenticate against Elasticsearch.
    ///
    /// If `auth` is specified and the endpoint contains credentials,
    /// a configuration error will be raised.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
    pub endpoints: Vec<String>,

    /// The [`doc_type`][doc_type] for your index data.
    ///
    /// This is only relevant for Elasticsearch <= 6.X. If you are using >= 7.0 you do not need to
    /// set this option since Elasticsearch has removed it.
    ///
    /// [doc_type]: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/actions-index.html
    #[serde(default = "default_doc_type")]
    #[configurable(metadata(docs::advanced))]
    pub doc_type: String,

    /// The API version of Elasticsearch.
    ///
    /// Amazon OpenSearch Serverless requires this option to be set to `auto` (the default).
    #[serde(default)]
    #[configurable(derived)]
    pub api_version: ElasticsearchApiVersion,

    /// Whether or not to send the `type` field to Elasticsearch.
    ///
    /// The `type` field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x.
    ///
    /// If enabled, the `doc_type` option is ignored.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `api_version` option should be used instead."
    )]
    pub suppress_type_name: bool,

    /// Whether or not to retry successful requests containing partial failures.
    ///
    /// To avoid duplicates in Elasticsearch, use the `id_key` option.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub request_retry_partial: bool,

    /// The name of the event key that should map to Elasticsearch's [`_id` field][es_id].
    ///
    /// By default, the `_id` field is not set, which allows Elasticsearch to set this
    /// automatically. Setting your own Elasticsearch IDs can [hinder performance][perf_doc].
    ///
    /// [es_id]: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
    /// [perf_doc]: https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html#_use_auto_generated_ids
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "id"))]
    #[configurable(metadata(docs::examples = "_id"))]
    pub id_key: Option<ConfigValuePath>,

    /// The name of the pipeline to apply.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "pipeline-name"))]
    pub pipeline: Option<String>,

    // Selects between bulk and data stream operation; see `common_mode` for how the
    // `bulk` / `data_stream` sections below are picked up depending on this value.
    #[serde(default)]
    #[configurable(derived)]
    pub mode: ElasticsearchMode,

    #[serde(default)]
    #[configurable(derived)]
    pub compression: Compression,

    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    pub encoding: Transformer,

    #[serde(default)]
    #[configurable(derived)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[serde(default)]
    #[configurable(derived)]
    pub request: RequestConfig,

    #[configurable(derived)]
    pub auth: Option<ElasticsearchAuthConfig>,

    /// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: Option<QueryParameters>,

    // Only present when the crate is built with the `aws-core` feature.
    #[serde(default)]
    #[configurable(derived)]
    #[cfg(feature = "aws-core")]
    pub aws: Option<crate::aws::RegionOrEndpoint>,

    /// Amazon OpenSearch service type
    #[serde(default)]
    pub opensearch_service_type: OpenSearchServiceType,

    #[serde(default)]
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Exposed in the user-facing configuration under the name `distribution`.
    #[serde(default)]
    #[configurable(derived)]
    #[serde(rename = "distribution")]
    pub endpoint_health: Option<HealthConfig>,

    // TODO: `bulk` and `data_stream` are each only relevant if the `mode` is set to their
    // corresponding mode. An improvement to look into would be to extract the `BulkConfig` and
    // `DataStreamConfig` into the `mode` enum variants. Doing so would remove them from the root
    // of the config here and thus any post serde config parsing manual error prone logic.
    //
    // `normal` is accepted as an alias for `bulk` when deserializing.
    #[serde(alias = "normal", default)]
    #[configurable(derived)]
    pub bulk: BulkConfig,

    #[serde(default)]
    #[configurable(derived)]
    pub data_stream: Option<DataStreamConfig>,

    #[serde(default)]
    #[configurable(derived)]
    pub metrics: Option<MetricToLogConfig>,

    // NOTE(review): deserialized through `crate::serde::bool_or_struct`, so presumably either a
    // bare boolean or the full struct form is accepted — confirm against that helper.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(derived)]
    pub acknowledgements: AcknowledgementsConfig,
}
228
/// Serde default for the `doc_type` option.
fn default_doc_type() -> String {
    String::from("_doc")
}
232
/// Builds the example map referenced by the `docs::examples` metadata of the `query` option.
fn query_examples() -> HashMap<String, String> {
    let mut examples = HashMap::with_capacity(1);
    examples.insert(String::from("X-Powered-By"), String::from("Vector"));
    examples
}
236
237impl Default for ElasticsearchConfig {
238    fn default() -> Self {
239        Self {
240            endpoint: None,
241            endpoints: vec![],
242            doc_type: default_doc_type(),
243            api_version: Default::default(),
244            suppress_type_name: false,
245            request_retry_partial: false,
246            id_key: None,
247            pipeline: None,
248            mode: Default::default(),
249            compression: Default::default(),
250            encoding: Default::default(),
251            batch: Default::default(),
252            request: Default::default(),
253            auth: None,
254            query: None,
255            #[cfg(feature = "aws-core")]
256            aws: None,
257            opensearch_service_type: Default::default(),
258            tls: None,
259            endpoint_health: None,
260            bulk: BulkConfig::default(), // the default mode is Bulk
261            data_stream: None,
262            metrics: None,
263            acknowledgements: Default::default(),
264        }
265    }
266}
267
268impl ElasticsearchConfig {
269    pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
270        match self.mode {
271            ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
272                index: self.bulk.index.clone(),
273                template_fallback_index: self.bulk.template_fallback_index.clone(),
274                action: self.bulk.action.clone(),
275                version: self.bulk.version.clone(),
276                version_type: self.bulk.version_type,
277            }),
278            ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
279                self.data_stream.clone().unwrap_or_default(),
280            )),
281        }
282    }
283}
284
/// Elasticsearch bulk mode configuration.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct BulkConfig {
    /// Action to use when making requests to the [Elasticsearch Bulk API][es_bulk].
    ///
    /// Only `index`, `create` and `update` actions are supported.
    ///
    /// [es_bulk]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
    #[serde(default = "default_bulk_action")]
    #[configurable(metadata(docs::examples = "create"))]
    #[configurable(metadata(docs::examples = "{{ action }}"))]
    pub action: Template,

    /// The name of the index to write events to.
    #[serde(default = "default_index")]
    #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "{{ index }}"))]
    pub index: Template,

    /// The default index to write events to if the template in `bulk.index` cannot be resolved.
    #[configurable(metadata(docs::examples = "test-index"))]
    pub template_fallback_index: Option<String>,

    /// Version field value.
    #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "123"))]
    pub version: Option<Template>,

    /// Version type.
    ///
    /// Possible values are `internal`, `external`, `external_gt`, and `external_gte`.
    ///
    /// See the Elasticsearch [index versioning][es_index_versioning] documentation.
    ///
    /// [es_index_versioning]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning
    #[serde(default = "default_version_type")]
    #[configurable(metadata(docs::examples = "internal"))]
    #[configurable(metadata(docs::examples = "external"))]
    pub version_type: VersionType,
}
325
326fn default_bulk_action() -> Template {
327    Template::try_from("index").expect("unable to parse template")
328}
329
330fn default_index() -> Template {
331    Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
332}
333
/// Serde default for `bulk.version_type`; also used by `BulkConfig::default()`.
const fn default_version_type() -> VersionType {
    VersionType::Internal
}
337
338impl Default for BulkConfig {
339    fn default() -> Self {
340        Self {
341            action: default_bulk_action(),
342            index: default_index(),
343            template_fallback_index: Default::default(),
344            version: Default::default(),
345            version_type: default_version_type(),
346        }
347    }
348}
349
/// Elasticsearch data stream mode configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case")]
pub struct DataStreamConfig {
    /// The data stream type used to construct the data stream at index time.
    // Serialized as `type`; the field is named `dtype` because `type` is a Rust keyword.
    #[serde(rename = "type", default = "DataStreamConfig::default_type")]
    #[configurable(metadata(docs::examples = "metrics"))]
    #[configurable(metadata(docs::examples = "synthetics"))]
    #[configurable(metadata(docs::examples = "{{ type }}"))]
    pub dtype: Template,

    /// The data stream dataset used to construct the data stream at index time.
    #[serde(default = "DataStreamConfig::default_dataset")]
    #[configurable(metadata(docs::examples = "generic"))]
    #[configurable(metadata(docs::examples = "nginx"))]
    #[configurable(metadata(docs::examples = "{{ service }}"))]
    pub dataset: Template,

    /// The data stream namespace used to construct the data stream at index time.
    #[serde(default = "DataStreamConfig::default_namespace")]
    #[configurable(metadata(docs::examples = "{{ environment }}"))]
    pub namespace: Template,

    /// Automatically routes events by deriving the data stream name using specific event fields.
    ///
    /// The format of the data stream name is `<type>-<dataset>-<namespace>`, where each value comes
    /// from the `data_stream` configuration field of the same name.
    ///
    /// If enabled, the value of the `data_stream.type`, `data_stream.dataset`, and
    /// `data_stream.namespace` event fields are used if they are present. Otherwise, the values
    /// set in this configuration are used.
    #[serde(default = "DataStreamConfig::default_auto_routing")]
    pub auto_routing: bool,

    /// Automatically adds and syncs the `data_stream.*` event fields if they are missing from the event.
    ///
    /// This ensures that fields match the name of the data stream that is receiving events.
    #[serde(default = "DataStreamConfig::default_sync_fields")]
    pub sync_fields: bool,
}
391
392impl Default for DataStreamConfig {
393    fn default() -> Self {
394        Self {
395            dtype: Self::default_type(),
396            dataset: Self::default_dataset(),
397            namespace: Self::default_namespace(),
398            auto_routing: Self::default_auto_routing(),
399            sync_fields: Self::default_sync_fields(),
400        }
401    }
402}
403
404impl DataStreamConfig {
405    fn default_type() -> Template {
406        Template::try_from("logs").expect("couldn't build default type template")
407    }
408
409    fn default_dataset() -> Template {
410        Template::try_from("generic").expect("couldn't build default dataset template")
411    }
412
413    fn default_namespace() -> Template {
414        Template::try_from("default").expect("couldn't build default namespace template")
415    }
416
417    const fn default_auto_routing() -> bool {
418        true
419    }
420
421    const fn default_sync_fields() -> bool {
422        true
423    }
424
425    /// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
426    pub fn remap_timestamp(&self, log: &mut LogEvent) {
427        if let Some(timestamp_key) = log.timestamp_path().cloned() {
428            if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
429                return;
430            }
431
432            log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
433        }
434    }
435
436    pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
437        self.dtype
438            .render_string(event)
439            .map_err(|error| {
440                emit!(TemplateRenderingError {
441                    error,
442                    field: Some("data_stream.type"),
443                    drop_event: true,
444                });
445            })
446            .ok()
447    }
448
449    pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
450        self.dataset
451            .render_string(event)
452            .map_err(|error| {
453                emit!(TemplateRenderingError {
454                    error,
455                    field: Some("data_stream.dataset"),
456                    drop_event: true,
457                });
458            })
459            .ok()
460    }
461
462    pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
463        self.namespace
464            .render_string(event)
465            .map_err(|error| {
466                emit!(TemplateRenderingError {
467                    error,
468                    field: Some("data_stream.namespace"),
469                    drop_event: true,
470                });
471            })
472            .ok()
473    }
474
475    pub fn sync_fields(&self, log: &mut LogEvent) {
476        if !self.sync_fields {
477            return;
478        }
479
480        let dtype = self.dtype(&*log);
481        let dataset = self.dataset(&*log);
482        let namespace = self.namespace(&*log);
483
484        if log.as_map().is_none() {
485            *log.value_mut() = Value::Object(BTreeMap::new());
486        }
487        let existing = log
488            .as_map_mut()
489            .expect("must be a map")
490            .entry("data_stream".into())
491            .or_insert_with(|| Value::Object(BTreeMap::new()))
492            .as_object_mut_unwrap();
493
494        if let Some(dtype) = dtype {
495            existing
496                .entry("type".into())
497                .or_insert_with(|| dtype.into());
498        }
499        if let Some(dataset) = dataset {
500            existing
501                .entry("dataset".into())
502                .or_insert_with(|| dataset.into());
503        }
504        if let Some(namespace) = namespace {
505            existing
506                .entry("namespace".into())
507                .or_insert_with(|| namespace.into());
508        }
509    }
510
511    pub fn index(&self, log: &LogEvent) -> Option<String> {
512        let (dtype, dataset, namespace) = if !self.auto_routing {
513            (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
514        } else {
515            let data_stream = log
516                .get(event_path!("data_stream"))
517                .and_then(|ds| ds.as_object());
518            let dtype = data_stream
519                .and_then(|ds| ds.get("type"))
520                .map(|value| value.to_string_lossy().into_owned())
521                .or_else(|| self.dtype(log))?;
522            let dataset = data_stream
523                .and_then(|ds| ds.get("dataset"))
524                .map(|value| value.to_string_lossy().into_owned())
525                .or_else(|| self.dataset(log))?;
526            let namespace = data_stream
527                .and_then(|ds| ds.get("namespace"))
528                .map(|value| value.to_string_lossy().into_owned())
529                .or_else(|| self.namespace(log))?;
530            (dtype, dataset, namespace)
531        };
532
533        let name = [dtype, dataset, namespace]
534            .into_iter()
535            .filter(|s| !s.is_empty())
536            .collect::<Vec<_>>()
537            .join("-");
538
539        Some(name)
540    }
541}
542
543#[async_trait::async_trait]
544#[typetag::serde(name = "elasticsearch")]
545impl SinkConfig for ElasticsearchConfig {
546    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
547        let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
548        let common = commons[0].clone();
549
550        let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
551
552        let request_limits = self.request.tower.into_settings();
553
554        let health_config = self.endpoint_health.clone().unwrap_or_default();
555
556        let services = commons
557            .iter()
558            .cloned()
559            .map(|common| {
560                let endpoint = common.base_url.clone();
561
562                let http_request_builder = HttpRequestBuilder::new(&common, self);
563                let service = ElasticsearchService::new(client.clone(), http_request_builder);
564
565                (endpoint, service)
566            })
567            .collect::<Vec<_>>();
568
569        let service = request_limits.distributed_service(
570            ElasticsearchRetryLogic {
571                retry_partial: self.request_retry_partial,
572            },
573            services,
574            health_config,
575            ElasticsearchHealthLogic,
576            1,
577        );
578
579        let sink = ElasticsearchSink::new(&common, self, service)?;
580
581        let stream = VectorSink::from_event_streamsink(sink);
582
583        let healthcheck = futures::future::select_ok(
584            commons
585                .into_iter()
586                .map(move |common| common.healthcheck(client.clone()).boxed()),
587        )
588        .map_ok(|((), _)| ())
589        .boxed();
590        Ok((stream, healthcheck))
591    }
592
593    fn input(&self) -> Input {
594        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
595
596        Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
597    }
598
599    fn acknowledgements(&self) -> &AcknowledgementsConfig {
600        &self.acknowledgements
601    }
602}
603
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `source` (TOML) into an `ElasticsearchConfig`, panicking on invalid input.
    fn parse(source: &str) -> ElasticsearchConfig {
        toml::from_str(source).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ElasticsearchConfig>();
    }

    // AWS auth must parse both with and without an assumed role.
    #[test]
    fn parse_aws_auth() {
        let _with_role = parse(
            r#"
            endpoints = [""]
            auth.strategy = "aws"
            auth.assume_role = "role"
        "#,
        );

        let _without_role = parse(
            r#"
            endpoints = [""]
            auth.strategy = "aws"
        "#,
        );
    }

    #[test]
    fn parse_mode() {
        let parsed = parse(
            r#"
            endpoints = [""]
            mode = "data_stream"
            data_stream.type = "synthetics"
        "#,
        );
        assert!(matches!(parsed.mode, ElasticsearchMode::DataStream));
        assert!(parsed.data_stream.is_some());
    }

    // `endpoint_health` is configured under the `distribution` key.
    #[test]
    fn parse_distribution() {
        let _parsed = parse(
            r#"
            endpoints = ["", ""]
            distribution.retry_initial_backoff_secs = 10
        "#,
        );
    }

    #[test]
    fn parse_version() {
        let parsed = parse(
            r#"
            endpoints = [""]
            api_version = "v7"
        "#,
        );
        assert_eq!(parsed.api_version, ElasticsearchApiVersion::V7);
    }

    #[test]
    fn parse_version_auto() {
        let parsed = parse(
            r#"
            endpoints = [""]
            api_version = "auto"
        "#,
        );
        assert_eq!(parsed.api_version, ElasticsearchApiVersion::Auto);
    }

    // With no mode given, bulk mode and the default bulk settings apply.
    #[test]
    fn parse_default_bulk() {
        let parsed = parse(
            r#"
            endpoints = [""]
        "#,
        );
        assert_eq!(parsed.mode, ElasticsearchMode::Bulk);
        assert_eq!(parsed.bulk, BulkConfig::default());
    }
}