1use std::{
2 collections::{BTreeMap, HashMap},
3 convert::TryFrom,
4};
5
6use futures::{FutureExt, TryFutureExt};
7use vector_lib::{
8 configurable::configurable_component,
9 lookup::{event_path, lookup_v2::ConfigValuePath},
10 schema::Requirement,
11};
12use vrl::value::Kind;
13
14use crate::{
15 codecs::Transformer,
16 config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
17 event::{EventRef, LogEvent, Value},
18 http::{HttpClient, QueryParameters},
19 internal_events::TemplateRenderingError,
20 sinks::{
21 Healthcheck, VectorSink,
22 elasticsearch::{
23 ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
24 ElasticsearchCommonMode, ElasticsearchMode, VersionType,
25 health::ElasticsearchHealthLogic,
26 retry::ElasticsearchRetryLogic,
27 service::{ElasticsearchService, HttpRequestBuilder},
28 sink::ElasticsearchSink,
29 },
30 util::{
31 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
32 service::HealthConfig,
33 },
34 },
35 template::Template,
36 tls::TlsConfig,
37 transforms::metric_to_log::MetricToLogConfig,
38};
39
/// Field name that `DataStreamConfig::remap_timestamp` renames an event's timestamp field to.
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
42
/// The OpenSearch service type the sink is configured for.
///
/// Deserialized from the lowercase variant name (`managed` or `serverless`); `managed` is the
/// default.
// NOTE(review): `as_str` maps the variants to `es` / `aoss`, which look like AWS service
// identifiers used for request signing — confirm against the signing code.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
#[derive(Default)]
pub enum OpenSearchServiceType {
    /// Managed service (the default).
    #[default]
    Managed,
    /// Serverless service.
    Serverless,
}
57
58impl OpenSearchServiceType {
59 pub const fn as_str(&self) -> &'static str {
60 match self {
61 OpenSearchServiceType::Managed => "es",
62 OpenSearchServiceType::Serverless => "aoss",
63 }
64 }
65}
66
/// Configuration for the `elasticsearch` sink.
#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ElasticsearchConfig {
    /// A single Elasticsearch endpoint to send events to.
    ///
    /// Deprecated: use `endpoints` instead.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
    )]
    pub endpoint: Option<String>,

    /// A list of Elasticsearch endpoints to send events to.
    ///
    /// Each entry is a URL. As the examples show, it may include a scheme, a host or IP
    /// address, a port, and embedded `user:password` credentials.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
    pub endpoints: Vec<String>,

    /// The `doc_type` for the index data.
    ///
    /// Defaults to `_doc`.
    #[serde(default = "default_doc_type")]
    #[configurable(metadata(docs::advanced))]
    pub doc_type: String,

    // The Elasticsearch API version to target. The user-facing description is derived from
    // `ElasticsearchApiVersion`.
    #[serde(default)]
    #[configurable(derived)]
    pub api_version: ElasticsearchApiVersion,

    /// Whether to suppress the document type name.
    ///
    /// Deprecated: use `api_version` instead.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `api_version` option should be used instead."
    )]
    pub suppress_type_name: bool,

    /// Whether the retry logic should retry partial failures.
    // Forwarded to `ElasticsearchRetryLogic::retry_partial` when the sink is built.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub request_retry_partial: bool,

    /// The path of the event field to use as the document ID.
    // NOTE(review): semantics inferred from the field name and the `id` / `_id` examples;
    // confirm against the request encoder.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "id"))]
    #[configurable(metadata(docs::examples = "_id"))]
    pub id_key: Option<ConfigValuePath>,

    /// The name of the pipeline to apply.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "pipeline-name"))]
    pub pipeline: Option<String>,

    // Selects which of `bulk` / `data_stream` is used; see `common_mode`.
    #[serde(default)]
    #[configurable(derived)]
    pub mode: ElasticsearchMode,

    #[serde(default)]
    #[configurable(derived)]
    pub compression: Compression,

    // Omitted from serialized output when it equals the default.
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    pub encoding: Transformer,

    #[serde(default)]
    #[configurable(derived)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // `request.tower` supplies the request limit settings used in `build`.
    #[serde(default)]
    #[configurable(derived)]
    pub request: RequestConfig,

    #[configurable(derived)]
    pub auth: Option<ElasticsearchAuthConfig>,

    /// Custom parameters to add to the query string of requests.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: Option<QueryParameters>,

    // Only compiled in when the `aws-core` feature is enabled.
    #[serde(default)]
    #[configurable(derived)]
    #[cfg(feature = "aws-core")]
    pub aws: Option<crate::aws::RegionOrEndpoint>,

    /// The OpenSearch service type.
    #[serde(default)]
    pub opensearch_service_type: OpenSearchServiceType,

    #[serde(default)]
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Exposed in configuration under the key `distribution`; passed to the distributed
    // service as its health configuration in `build`.
    #[serde(default)]
    #[configurable(derived)]
    #[serde(rename = "distribution")]
    pub endpoint_health: Option<HealthConfig>,

    // Also accepted under the alias `normal`. Only consulted by `common_mode` when `mode` is
    // `Bulk`.
    #[serde(alias = "normal", default)]
    #[configurable(derived)]
    pub bulk: BulkConfig,

    // Only consulted by `common_mode` when `mode` is `DataStream`; the `DataStreamConfig`
    // defaults are used when this is absent.
    #[serde(default)]
    #[configurable(derived)]
    pub data_stream: Option<DataStreamConfig>,

    // NOTE(review): presumably controls how metric events are converted to logs before
    // indexing (the sink accepts both metrics and logs) — confirm against the sink code.
    #[serde(default)]
    #[configurable(derived)]
    pub metrics: Option<MetricToLogConfig>,

    // Deserializes from either a bare boolean or a full struct (`bool_or_struct`); omitted
    // from serialized output when it equals the default.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(derived)]
    pub acknowledgements: AcknowledgementsConfig,
}
225
/// Serde default for `ElasticsearchConfig::doc_type`: the string `_doc`.
fn default_doc_type() -> String {
    String::from("_doc")
}
229
/// Builds the example value referenced by the `docs::examples` metadata of the `query` option:
/// a single `X-Powered-By` => `Vector` entry.
fn query_examples() -> HashMap<String, String> {
    let mut examples = HashMap::with_capacity(1);
    examples.insert(String::from("X-Powered-By"), String::from("Vector"));
    examples
}
233
234impl Default for ElasticsearchConfig {
235 fn default() -> Self {
236 Self {
237 endpoint: None,
238 endpoints: vec![],
239 doc_type: default_doc_type(),
240 api_version: Default::default(),
241 suppress_type_name: false,
242 request_retry_partial: false,
243 id_key: None,
244 pipeline: None,
245 mode: Default::default(),
246 compression: Default::default(),
247 encoding: Default::default(),
248 batch: Default::default(),
249 request: Default::default(),
250 auth: None,
251 query: None,
252 #[cfg(feature = "aws-core")]
253 aws: None,
254 opensearch_service_type: Default::default(),
255 tls: None,
256 endpoint_health: None,
257 bulk: BulkConfig::default(), data_stream: None,
259 metrics: None,
260 acknowledgements: Default::default(),
261 }
262 }
263}
264
265impl ElasticsearchConfig {
266 pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
267 match self.mode {
268 ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
269 index: self.bulk.index.clone(),
270 template_fallback_index: self.bulk.template_fallback_index.clone(),
271 action: self.bulk.action.clone(),
272 version: self.bulk.version.clone(),
273 version_type: self.bulk.version_type,
274 }),
275 ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
276 self.data_stream.clone().unwrap_or_default(),
277 )),
278 }
279 }
280}
281
/// Settings used when the sink operates in bulk mode.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct BulkConfig {
    /// The bulk action to use for each event.
    ///
    /// This is a template; it defaults to `index`.
    #[serde(default = "default_bulk_action")]
    #[configurable(metadata(docs::examples = "create"))]
    #[configurable(metadata(docs::examples = "{{ action }}"))]
    pub action: Template,

    /// The name of the index to write events to.
    ///
    /// This is a template; it defaults to `vector-%Y.%m.%d`.
    #[serde(default = "default_index")]
    #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "{{ index }}"))]
    pub index: Template,

    /// The fallback index name.
    // NOTE(review): presumably used when the `index` template cannot be rendered for an
    // event — confirm against the sink's index resolution code.
    #[configurable(metadata(docs::examples = "test-index"))]
    pub template_fallback_index: Option<String>,

    /// The document version to send, as a template.
    #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "123"))]
    pub version: Option<Template>,

    /// The versioning scheme to use.
    ///
    /// Defaults to `internal`.
    #[serde(default = "default_version_type")]
    #[configurable(metadata(docs::examples = "internal"))]
    #[configurable(metadata(docs::examples = "external"))]
    pub version_type: VersionType,
}
322
323fn default_bulk_action() -> Template {
324 Template::try_from("index").expect("unable to parse template")
325}
326
327fn default_index() -> Template {
328 Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
329}
330
/// Serde default for `BulkConfig::version_type`: `VersionType::Internal`.
const fn default_version_type() -> VersionType {
    VersionType::Internal
}
334
335impl Default for BulkConfig {
336 fn default() -> Self {
337 Self {
338 action: default_bulk_action(),
339 index: default_index(),
340 template_fallback_index: Default::default(),
341 version: Default::default(),
342 version_type: default_version_type(),
343 }
344 }
345}
346
/// Settings used when the sink operates in data stream mode.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case")]
pub struct DataStreamConfig {
    /// The data stream type used to construct the data stream name.
    ///
    /// Configured under the key `type`. This is a template; it defaults to `logs`.
    #[serde(rename = "type", default = "DataStreamConfig::default_type")]
    #[configurable(metadata(docs::examples = "metrics"))]
    #[configurable(metadata(docs::examples = "synthetics"))]
    #[configurable(metadata(docs::examples = "{{ type }}"))]
    pub dtype: Template,

    /// The data stream dataset used to construct the data stream name.
    ///
    /// This is a template; it defaults to `generic`.
    #[serde(default = "DataStreamConfig::default_dataset")]
    #[configurable(metadata(docs::examples = "generic"))]
    #[configurable(metadata(docs::examples = "nginx"))]
    #[configurable(metadata(docs::examples = "{{ service }}"))]
    pub dataset: Template,

    /// The data stream namespace used to construct the data stream name.
    ///
    /// This is a template; it defaults to `default`.
    #[serde(default = "DataStreamConfig::default_namespace")]
    #[configurable(metadata(docs::examples = "{{ environment }}"))]
    pub namespace: Template,

    /// Whether the data stream name is taken from the event's own `data_stream` fields.
    ///
    /// When enabled, the `type`, `dataset`, and `namespace` values found under the event's
    /// `data_stream` object take precedence, and the configured templates are only used for
    /// values the event does not provide. When disabled, only the templates are used.
    /// Defaults to `true`.
    #[serde(default = "DataStreamConfig::default_auto_routing")]
    pub auto_routing: bool,

    /// Whether missing `data_stream` fields on the event are filled in from this configuration.
    ///
    /// Values already present on the event are left untouched. Defaults to `true`.
    #[serde(default = "DataStreamConfig::default_sync_fields")]
    pub sync_fields: bool,
}
388
389impl Default for DataStreamConfig {
390 fn default() -> Self {
391 Self {
392 dtype: Self::default_type(),
393 dataset: Self::default_dataset(),
394 namespace: Self::default_namespace(),
395 auto_routing: Self::default_auto_routing(),
396 sync_fields: Self::default_sync_fields(),
397 }
398 }
399}
400
401impl DataStreamConfig {
402 fn default_type() -> Template {
403 Template::try_from("logs").expect("couldn't build default type template")
404 }
405
406 fn default_dataset() -> Template {
407 Template::try_from("generic").expect("couldn't build default dataset template")
408 }
409
410 fn default_namespace() -> Template {
411 Template::try_from("default").expect("couldn't build default namespace template")
412 }
413
414 const fn default_auto_routing() -> bool {
415 true
416 }
417
418 const fn default_sync_fields() -> bool {
419 true
420 }
421
422 pub fn remap_timestamp(&self, log: &mut LogEvent) {
424 if let Some(timestamp_key) = log.timestamp_path().cloned() {
425 if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
426 return;
427 }
428
429 log.rename_key(×tamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
430 }
431 }
432
433 pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
434 self.dtype
435 .render_string(event)
436 .map_err(|error| {
437 emit!(TemplateRenderingError {
438 error,
439 field: Some("data_stream.type"),
440 drop_event: true,
441 });
442 })
443 .ok()
444 }
445
446 pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
447 self.dataset
448 .render_string(event)
449 .map_err(|error| {
450 emit!(TemplateRenderingError {
451 error,
452 field: Some("data_stream.dataset"),
453 drop_event: true,
454 });
455 })
456 .ok()
457 }
458
459 pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
460 self.namespace
461 .render_string(event)
462 .map_err(|error| {
463 emit!(TemplateRenderingError {
464 error,
465 field: Some("data_stream.namespace"),
466 drop_event: true,
467 });
468 })
469 .ok()
470 }
471
472 pub fn sync_fields(&self, log: &mut LogEvent) {
473 if !self.sync_fields {
474 return;
475 }
476
477 let dtype = self.dtype(&*log);
478 let dataset = self.dataset(&*log);
479 let namespace = self.namespace(&*log);
480
481 if log.as_map().is_none() {
482 *log.value_mut() = Value::Object(BTreeMap::new());
483 }
484 let existing = log
485 .as_map_mut()
486 .expect("must be a map")
487 .entry("data_stream".into())
488 .or_insert_with(|| Value::Object(BTreeMap::new()))
489 .as_object_mut_unwrap();
490
491 if let Some(dtype) = dtype {
492 existing
493 .entry("type".into())
494 .or_insert_with(|| dtype.into());
495 }
496 if let Some(dataset) = dataset {
497 existing
498 .entry("dataset".into())
499 .or_insert_with(|| dataset.into());
500 }
501 if let Some(namespace) = namespace {
502 existing
503 .entry("namespace".into())
504 .or_insert_with(|| namespace.into());
505 }
506 }
507
508 pub fn index(&self, log: &LogEvent) -> Option<String> {
509 let (dtype, dataset, namespace) = if !self.auto_routing {
510 (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
511 } else {
512 let data_stream = log
513 .get(event_path!("data_stream"))
514 .and_then(|ds| ds.as_object());
515 let dtype = data_stream
516 .and_then(|ds| ds.get("type"))
517 .map(|value| value.to_string_lossy().into_owned())
518 .or_else(|| self.dtype(log))?;
519 let dataset = data_stream
520 .and_then(|ds| ds.get("dataset"))
521 .map(|value| value.to_string_lossy().into_owned())
522 .or_else(|| self.dataset(log))?;
523 let namespace = data_stream
524 .and_then(|ds| ds.get("namespace"))
525 .map(|value| value.to_string_lossy().into_owned())
526 .or_else(|| self.namespace(log))?;
527 (dtype, dataset, namespace)
528 };
529
530 let name = [dtype, dataset, namespace]
531 .into_iter()
532 .filter(|s| !s.is_empty())
533 .collect::<Vec<_>>()
534 .join("-");
535
536 Some(name)
537 }
538}
539
#[async_trait::async_trait]
#[typetag::serde(name = "elasticsearch")]
impl SinkConfig for ElasticsearchConfig {
    /// Builds the sink and its health check from this configuration.
    ///
    /// One `ElasticsearchCommon` is parsed per endpoint; each gets its own
    /// `ElasticsearchService`, all sharing a single HTTP client, and the services are combined
    /// into one distributed service. The health check succeeds as soon as any endpoint's
    /// health check succeeds.
    ///
    /// # Errors
    ///
    /// Returns an error when parsing the endpoints, creating the HTTP client, or constructing
    /// the sink fails.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
        // NOTE(review): indexing assumes `parse_many` never returns an empty list — confirm,
        // otherwise this panics.
        let common = commons[0].clone();

        // The first endpoint's TLS settings are used for the client shared by all services.
        let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;

        let request_limits = self.request.tower.into_settings();

        let health_config = self.endpoint_health.clone().unwrap_or_default();

        // Pair every endpoint's base URL with a service that builds requests for it.
        let services = commons
            .iter()
            .cloned()
            .map(|common| {
                let endpoint = common.base_url.clone();

                let http_request_builder = HttpRequestBuilder::new(&common, self);
                let service = ElasticsearchService::new(client.clone(), http_request_builder);

                (endpoint, service)
            })
            .collect::<Vec<_>>();

        // NOTE(review): the meaning of the trailing literal `1` is not visible in this file —
        // check the `distributed_service` signature.
        let service = request_limits.distributed_service(
            ElasticsearchRetryLogic {
                retry_partial: self.request_retry_partial,
            },
            services,
            health_config,
            ElasticsearchHealthLogic,
            1,
        );

        let sink = ElasticsearchSink::new(&common, self, service)?;

        let stream = VectorSink::from_event_streamsink(sink);

        // `select_ok` resolves with the first health check that succeeds and only fails when
        // all of them fail. `client` is moved into the closure; each check gets its own clone.
        let healthcheck = futures::future::select_ok(
            commons
                .into_iter()
                .map(move |common| common.healthcheck(client.clone()).boxed()),
        )
        .map_ok(|((), _)| ())
        .boxed();
        Ok((stream, healthcheck))
    }

    /// Declares the accepted input: metrics and logs, with an optional `timestamp` meaning of
    /// timestamp kind.
    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

        Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
    }

    /// Returns the configured acknowledgement settings.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
600
#[cfg(test)]
mod tests {
    use super::*;

    // Runs the shared config-generation check for this sink's configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ElasticsearchConfig>();
    }

    // The `aws` auth strategy must parse both with and without `assume_role`.
    #[test]
    fn parse_aws_auth() {
        toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            auth.strategy = "aws"
            auth.assume_role = "role"
        "#,
        )
        .unwrap();

        toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            auth.strategy = "aws"
        "#,
        )
        .unwrap();
    }

    // `mode = "data_stream"` selects data stream mode and keeps the nested `data_stream` table.
    #[test]
    fn parse_mode() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            mode = "data_stream"
            data_stream.type = "synthetics"
        "#,
        )
        .unwrap();
        assert!(matches!(config.mode, ElasticsearchMode::DataStream));
        assert!(config.data_stream.is_some());
    }

    // The health settings are accepted under the renamed `distribution` key.
    #[test]
    fn parse_distribution() {
        toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = ["", ""]
            distribution.retry_initial_backoff_secs = 10
        "#,
        )
        .unwrap();
    }

    // An explicit API version is parsed into the matching enum variant.
    #[test]
    fn parse_version() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            api_version = "v7"
        "#,
        )
        .unwrap();
        assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
    }

    // `api_version = "auto"` is parsed into the `Auto` variant.
    #[test]
    fn parse_version_auto() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            api_version = "auto"
        "#,
        )
        .unwrap();
        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
    }

    // With nothing but endpoints, the mode is bulk and `bulk` equals its `Default`.
    #[test]
    fn parse_default_bulk() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
        "#,
        )
        .unwrap();
        assert_eq!(config.mode, ElasticsearchMode::Bulk);
        assert_eq!(config.bulk, BulkConfig::default());
    }

    // The lowercase name `managed` maps to `OpenSearchServiceType::Managed`.
    #[test]
    fn parse_opensearch_service_type_managed() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            opensearch_service_type = "managed"
        "#,
        )
        .unwrap();
        assert_eq!(
            config.opensearch_service_type,
            OpenSearchServiceType::Managed
        );
    }

    // The lowercase name `serverless` maps to `OpenSearchServiceType::Serverless`.
    #[test]
    fn parse_opensearch_service_type_serverless() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            opensearch_service_type = "serverless"
            auth.strategy = "aws"
            api_version = "auto"
        "#,
        )
        .unwrap();
        assert_eq!(
            config.opensearch_service_type,
            OpenSearchServiceType::Serverless
        );
    }

    // Omitting `opensearch_service_type` falls back to `Managed`.
    #[test]
    fn parse_opensearch_service_type_default() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
        "#,
        )
        .unwrap();
        assert_eq!(
            config.opensearch_service_type,
            OpenSearchServiceType::Managed
        );
    }

    // Serverless together with AWS auth and automatic API version detection parses into the
    // expected values. Only compiled when the `aws-core` feature is enabled.
    #[cfg(feature = "aws-core")]
    #[test]
    fn parse_opensearch_serverless_with_aws_auth() {
        let config = toml::from_str::<ElasticsearchConfig>(
            r#"
            endpoints = [""]
            opensearch_service_type = "serverless"
            auth.strategy = "aws"
            api_version = "auto"
        "#,
        )
        .unwrap();
        assert_eq!(
            config.opensearch_service_type,
            OpenSearchServiceType::Serverless
        );
        assert!(matches!(config.auth, Some(ElasticsearchAuthConfig::Aws(_))));
        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
    }
}