1use std::{
2 collections::{BTreeMap, HashMap},
3 convert::TryFrom,
4};
5
6use futures::{FutureExt, TryFutureExt};
7use vector_lib::{
8 configurable::configurable_component,
9 lookup::{event_path, lookup_v2::ConfigValuePath},
10 schema::Requirement,
11};
12use vrl::value::Kind;
13
14use crate::{
15 codecs::Transformer,
16 config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
17 event::{EventRef, LogEvent, Value},
18 http::{HttpClient, QueryParameters},
19 internal_events::TemplateRenderingError,
20 sinks::{
21 Healthcheck, VectorSink,
22 elasticsearch::{
23 ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
24 ElasticsearchCommonMode, ElasticsearchMode, VersionType,
25 health::ElasticsearchHealthLogic,
26 retry::ElasticsearchRetryLogic,
27 service::{ElasticsearchService, HttpRequestBuilder},
28 sink::ElasticsearchSink,
29 },
30 util::{
31 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
32 service::HealthConfig,
33 },
34 },
35 template::Template,
36 tls::TlsConfig,
37 transforms::metric_to_log::MetricToLogConfig,
38};
39
/// Field name that `DataStreamConfig::remap_timestamp` renames an event's timestamp field to.
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
42
/// The OpenSearch service type selected in the sink configuration.
///
/// Variants are (de)serialized by their lowercase names: `managed` and `serverless`.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum OpenSearchServiceType {
    /// The managed service type; `as_str` maps it to `"es"`.
    Managed,
    /// The serverless service type; `as_str` maps it to `"aoss"`.
    Serverless,
}
55
56impl OpenSearchServiceType {
57 pub const fn as_str(&self) -> &'static str {
58 match self {
59 OpenSearchServiceType::Managed => "es",
60 OpenSearchServiceType::Serverless => "aoss",
61 }
62 }
63}
64
65impl Default for OpenSearchServiceType {
66 fn default() -> Self {
67 Self::Managed
68 }
69}
70
/// Configuration for the `elasticsearch` sink.
#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ElasticsearchConfig {
    /// A single endpoint to send events to.
    ///
    /// Deprecated: use `endpoints` instead.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
    )]
    pub endpoint: Option<String>,

    /// A list of endpoint URLs to send events to.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
    pub endpoints: Vec<String>,

    /// The `doc_type` value.
    ///
    /// Defaults to `_doc`.
    #[serde(default = "default_doc_type")]
    #[configurable(metadata(docs::advanced))]
    pub doc_type: String,

    /// The Elasticsearch API version, for example `v7` or `auto`.
    #[serde(default)]
    #[configurable(derived)]
    pub api_version: ElasticsearchApiVersion,

    /// Deprecated flag; use `api_version` instead.
    #[serde(default)]
    #[configurable(
        deprecated = "This option has been deprecated, the `api_version` option should be used instead."
    )]
    pub suppress_type_name: bool,

    /// Handed to the retry logic as `retry_partial` when the sink is built.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub request_retry_partial: bool,

    /// An optional event field path, for example `id` or `_id`.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "id"))]
    #[configurable(metadata(docs::examples = "_id"))]
    pub id_key: Option<ConfigValuePath>,

    /// An optional pipeline name.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "pipeline-name"))]
    pub pipeline: Option<String>,

    /// The operating mode; `common_mode` uses it to choose between the `bulk` and
    /// `data_stream` settings.
    #[serde(default)]
    #[configurable(derived)]
    pub mode: ElasticsearchMode,

    /// Compression settings.
    #[serde(default)]
    #[configurable(derived)]
    pub compression: Compression,

    /// Event transformer settings; left out of serialized output when equal to the default.
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    pub encoding: Transformer,

    /// Batching settings.
    #[serde(default)]
    #[configurable(derived)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Request settings; `request.tower` supplies the request limits used when the sink is built.
    #[serde(default)]
    #[configurable(derived)]
    pub request: RequestConfig,

    /// Optional authentication settings.
    #[configurable(derived)]
    pub auth: Option<ElasticsearchAuthConfig>,

    /// Optional query string parameters.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: Option<QueryParameters>,

    /// Optional AWS region or endpoint; only present when the `aws-core` feature is enabled.
    #[serde(default)]
    #[configurable(derived)]
    #[cfg(feature = "aws-core")]
    pub aws: Option<crate::aws::RegionOrEndpoint>,

    /// The OpenSearch service type.
    ///
    /// Defaults to `managed`.
    #[serde(default)]
    pub opensearch_service_type: OpenSearchServiceType,

    /// Optional TLS settings.
    #[serde(default)]
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Optional endpoint health settings, configured under the `distribution` key.
    #[serde(default)]
    #[configurable(derived)]
    #[serde(rename = "distribution")]
    pub endpoint_health: Option<HealthConfig>,

    /// Settings used in bulk mode; also accepted under the alias `normal`.
    #[serde(alias = "normal", default)]
    #[configurable(derived)]
    pub bulk: BulkConfig,

    /// Settings used in data stream mode; defaults apply when this is unset.
    #[serde(default)]
    #[configurable(derived)]
    pub data_stream: Option<DataStreamConfig>,

    /// Optional metric-to-log conversion settings.
    #[serde(default)]
    #[configurable(derived)]
    pub metrics: Option<MetricToLogConfig>,

    /// Acknowledgement settings; deserialized from either a boolean or a struct.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(derived)]
    pub acknowledgements: AcknowledgementsConfig,
}
229
/// Serde default for `ElasticsearchConfig::doc_type`: the string `_doc`.
fn default_doc_type() -> String {
    String::from("_doc")
}
233
/// Example value for the `query` option: a single `X-Powered-By` => `Vector` pair.
fn query_examples() -> HashMap<String, String> {
    let example = ("X-Powered-By".to_owned(), "Vector".to_owned());
    HashMap::from([example])
}
237
238impl Default for ElasticsearchConfig {
239 fn default() -> Self {
240 Self {
241 endpoint: None,
242 endpoints: vec![],
243 doc_type: default_doc_type(),
244 api_version: Default::default(),
245 suppress_type_name: false,
246 request_retry_partial: false,
247 id_key: None,
248 pipeline: None,
249 mode: Default::default(),
250 compression: Default::default(),
251 encoding: Default::default(),
252 batch: Default::default(),
253 request: Default::default(),
254 auth: None,
255 query: None,
256 #[cfg(feature = "aws-core")]
257 aws: None,
258 opensearch_service_type: Default::default(),
259 tls: None,
260 endpoint_health: None,
261 bulk: BulkConfig::default(), data_stream: None,
263 metrics: None,
264 acknowledgements: Default::default(),
265 }
266 }
267}
268
269impl ElasticsearchConfig {
270 pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
271 match self.mode {
272 ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
273 index: self.bulk.index.clone(),
274 template_fallback_index: self.bulk.template_fallback_index.clone(),
275 action: self.bulk.action.clone(),
276 version: self.bulk.version.clone(),
277 version_type: self.bulk.version_type,
278 }),
279 ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
280 self.data_stream.clone().unwrap_or_default(),
281 )),
282 }
283 }
284}
285
/// Settings used when the sink runs in bulk mode.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct BulkConfig {
    /// Template for the bulk action.
    ///
    /// Defaults to `index`.
    #[serde(default = "default_bulk_action")]
    #[configurable(metadata(docs::examples = "create"))]
    #[configurable(metadata(docs::examples = "{{ action }}"))]
    pub action: Template,

    /// Template for the index name.
    ///
    /// Defaults to `vector-%Y.%m.%d`.
    #[serde(default = "default_index")]
    #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "{{ index }}"))]
    pub index: Template,

    /// An optional fallback index name.
    #[configurable(metadata(docs::examples = "test-index"))]
    pub template_fallback_index: Option<String>,

    /// An optional template for the document version.
    #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "123"))]
    pub version: Option<Template>,

    /// The version type.
    ///
    /// Defaults to `internal`.
    #[serde(default = "default_version_type")]
    #[configurable(metadata(docs::examples = "internal"))]
    #[configurable(metadata(docs::examples = "external"))]
    pub version_type: VersionType,
}
326
327fn default_bulk_action() -> Template {
328 Template::try_from("index").expect("unable to parse template")
329}
330
331fn default_index() -> Template {
332 Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
333}
334
335const fn default_version_type() -> VersionType {
336 VersionType::Internal
337}
338
339impl Default for BulkConfig {
340 fn default() -> Self {
341 Self {
342 action: default_bulk_action(),
343 index: default_index(),
344 template_fallback_index: Default::default(),
345 version: Default::default(),
346 version_type: default_version_type(),
347 }
348 }
349}
350
/// Settings used when the sink runs in data stream mode.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case")]
pub struct DataStreamConfig {
    /// Template for the data stream type, configured under the `type` key.
    ///
    /// Defaults to `logs`.
    #[serde(rename = "type", default = "DataStreamConfig::default_type")]
    #[configurable(metadata(docs::examples = "metrics"))]
    #[configurable(metadata(docs::examples = "synthetics"))]
    #[configurable(metadata(docs::examples = "{{ type }}"))]
    pub dtype: Template,

    /// Template for the data stream dataset.
    ///
    /// Defaults to `generic`.
    #[serde(default = "DataStreamConfig::default_dataset")]
    #[configurable(metadata(docs::examples = "generic"))]
    #[configurable(metadata(docs::examples = "nginx"))]
    #[configurable(metadata(docs::examples = "{{ service }}"))]
    pub dataset: Template,

    /// Template for the data stream namespace.
    ///
    /// Defaults to `default`.
    #[serde(default = "DataStreamConfig::default_namespace")]
    #[configurable(metadata(docs::examples = "{{ environment }}"))]
    pub namespace: Template,

    /// When enabled, `index` prefers the event's own `data_stream.type`,
    /// `data_stream.dataset` and `data_stream.namespace` fields and only falls back to the
    /// templates for fields the event does not carry.
    ///
    /// Defaults to `true`.
    #[serde(default = "DataStreamConfig::default_auto_routing")]
    pub auto_routing: bool,

    /// When enabled, `sync_fields` writes the rendered type, dataset and namespace into
    /// the event's `data_stream` object for any of those keys that are missing.
    ///
    /// Defaults to `true`.
    #[serde(default = "DataStreamConfig::default_sync_fields")]
    pub sync_fields: bool,
}
392
393impl Default for DataStreamConfig {
394 fn default() -> Self {
395 Self {
396 dtype: Self::default_type(),
397 dataset: Self::default_dataset(),
398 namespace: Self::default_namespace(),
399 auto_routing: Self::default_auto_routing(),
400 sync_fields: Self::default_sync_fields(),
401 }
402 }
403}
404
405impl DataStreamConfig {
406 fn default_type() -> Template {
407 Template::try_from("logs").expect("couldn't build default type template")
408 }
409
410 fn default_dataset() -> Template {
411 Template::try_from("generic").expect("couldn't build default dataset template")
412 }
413
414 fn default_namespace() -> Template {
415 Template::try_from("default").expect("couldn't build default namespace template")
416 }
417
418 const fn default_auto_routing() -> bool {
419 true
420 }
421
422 const fn default_sync_fields() -> bool {
423 true
424 }
425
426 pub fn remap_timestamp(&self, log: &mut LogEvent) {
428 if let Some(timestamp_key) = log.timestamp_path().cloned() {
429 if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
430 return;
431 }
432
433 log.rename_key(×tamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
434 }
435 }
436
437 pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
438 self.dtype
439 .render_string(event)
440 .map_err(|error| {
441 emit!(TemplateRenderingError {
442 error,
443 field: Some("data_stream.type"),
444 drop_event: true,
445 });
446 })
447 .ok()
448 }
449
450 pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
451 self.dataset
452 .render_string(event)
453 .map_err(|error| {
454 emit!(TemplateRenderingError {
455 error,
456 field: Some("data_stream.dataset"),
457 drop_event: true,
458 });
459 })
460 .ok()
461 }
462
463 pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
464 self.namespace
465 .render_string(event)
466 .map_err(|error| {
467 emit!(TemplateRenderingError {
468 error,
469 field: Some("data_stream.namespace"),
470 drop_event: true,
471 });
472 })
473 .ok()
474 }
475
476 pub fn sync_fields(&self, log: &mut LogEvent) {
477 if !self.sync_fields {
478 return;
479 }
480
481 let dtype = self.dtype(&*log);
482 let dataset = self.dataset(&*log);
483 let namespace = self.namespace(&*log);
484
485 if log.as_map().is_none() {
486 *log.value_mut() = Value::Object(BTreeMap::new());
487 }
488 let existing = log
489 .as_map_mut()
490 .expect("must be a map")
491 .entry("data_stream".into())
492 .or_insert_with(|| Value::Object(BTreeMap::new()))
493 .as_object_mut_unwrap();
494
495 if let Some(dtype) = dtype {
496 existing
497 .entry("type".into())
498 .or_insert_with(|| dtype.into());
499 }
500 if let Some(dataset) = dataset {
501 existing
502 .entry("dataset".into())
503 .or_insert_with(|| dataset.into());
504 }
505 if let Some(namespace) = namespace {
506 existing
507 .entry("namespace".into())
508 .or_insert_with(|| namespace.into());
509 }
510 }
511
512 pub fn index(&self, log: &LogEvent) -> Option<String> {
513 let (dtype, dataset, namespace) = if !self.auto_routing {
514 (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
515 } else {
516 let data_stream = log
517 .get(event_path!("data_stream"))
518 .and_then(|ds| ds.as_object());
519 let dtype = data_stream
520 .and_then(|ds| ds.get("type"))
521 .map(|value| value.to_string_lossy().into_owned())
522 .or_else(|| self.dtype(log))?;
523 let dataset = data_stream
524 .and_then(|ds| ds.get("dataset"))
525 .map(|value| value.to_string_lossy().into_owned())
526 .or_else(|| self.dataset(log))?;
527 let namespace = data_stream
528 .and_then(|ds| ds.get("namespace"))
529 .map(|value| value.to_string_lossy().into_owned())
530 .or_else(|| self.namespace(log))?;
531 (dtype, dataset, namespace)
532 };
533
534 let name = [dtype, dataset, namespace]
535 .into_iter()
536 .filter(|s| !s.is_empty())
537 .collect::<Vec<_>>()
538 .join("-");
539
540 Some(name)
541 }
542}
543
#[async_trait::async_trait]
#[typetag::serde(name = "elasticsearch")]
impl SinkConfig for ElasticsearchConfig {
    /// Builds the sink and its healthcheck from this configuration.
    ///
    /// One `ElasticsearchService` is created per entry returned by
    /// `ElasticsearchCommon::parse_many`, all sharing one `HttpClient`, and the services
    /// are combined through `distributed_service`. The healthcheck runs every entry's
    /// healthcheck through `select_ok`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `parse_many`, `HttpClient::new` and `ElasticsearchSink::new`.
    ///
    /// # Panics
    ///
    /// `commons[0]` panics if `parse_many` returns an empty list.
    /// NOTE(review): presumably `parse_many` rejects a configuration without endpoints —
    /// confirm.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
        // The first entry supplies the TLS settings of the shared client and is the one
        // handed to the sink itself.
        let common = commons[0].clone();

        let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;

        let request_limits = self.request.tower.into_settings();

        let health_config = self.endpoint_health.clone().unwrap_or_default();

        // Pair each entry's base URL with a service that reuses the shared client.
        let services = commons
            .iter()
            .cloned()
            .map(|common| {
                let endpoint = common.base_url.clone();

                let http_request_builder = HttpRequestBuilder::new(&common, self);
                let service = ElasticsearchService::new(client.clone(), http_request_builder);

                (endpoint, service)
            })
            .collect::<Vec<_>>();

        let service = request_limits.distributed_service(
            ElasticsearchRetryLogic {
                retry_partial: self.request_retry_partial,
            },
            services,
            health_config,
            ElasticsearchHealthLogic,
            // NOTE(review): the meaning of this literal is defined by
            // `distributed_service` and is not visible here.
            1,
        );

        let sink = ElasticsearchSink::new(&common, self, service)?;

        let stream = VectorSink::from_event_streamsink(sink);

        // `commons` and `client` are consumed here, so this must come after their last
        // use above.
        let healthcheck = futures::future::select_ok(
            commons
                .into_iter()
                .map(move |common| common.healthcheck(client.clone()).boxed()),
        )
        .map_ok(|((), _)| ())
        .boxed();
        Ok((stream, healthcheck))
    }

    /// Accepts metric and log events, with an optional `timestamp` meaning of timestamp
    /// kind as the only schema requirement.
    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

        Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
    }

    /// Returns the configured acknowledgement settings.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
604
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `source` (TOML) into an `ElasticsearchConfig`, panicking if it is invalid.
    fn parse(source: &str) -> ElasticsearchConfig {
        toml::from_str::<ElasticsearchConfig>(source).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ElasticsearchConfig>();
    }

    // Both AWS auth forms (with and without `assume_role`) must deserialize.
    #[test]
    fn parse_aws_auth() {
        let with_assume_role = r#"
            endpoints = [""]
            auth.strategy = "aws"
            auth.assume_role = "role"
        "#;
        parse(with_assume_role);

        let strategy_only = r#"
            endpoints = [""]
            auth.strategy = "aws"
        "#;
        parse(strategy_only);
    }

    #[test]
    fn parse_mode() {
        let parsed = parse(
            r#"
            endpoints = [""]
            mode = "data_stream"
            data_stream.type = "synthetics"
        "#,
        );

        assert!(matches!(parsed.mode, ElasticsearchMode::DataStream));
        assert!(parsed.data_stream.is_some());
    }

    // The health settings are configured under the `distribution` key.
    #[test]
    fn parse_distribution() {
        parse(
            r#"
            endpoints = ["", ""]
            distribution.retry_initial_backoff_secs = 10
        "#,
        );
    }

    #[test]
    fn parse_version() {
        let parsed = parse(
            r#"
            endpoints = [""]
            api_version = "v7"
        "#,
        );

        assert_eq!(parsed.api_version, ElasticsearchApiVersion::V7);
    }

    #[test]
    fn parse_version_auto() {
        let parsed = parse(
            r#"
            endpoints = [""]
            api_version = "auto"
        "#,
        );

        assert_eq!(parsed.api_version, ElasticsearchApiVersion::Auto);
    }

    // With nothing but endpoints given, the sink is in bulk mode with default bulk settings.
    #[test]
    fn parse_default_bulk() {
        let parsed = parse(
            r#"
            endpoints = [""]
        "#,
        );

        assert_eq!(parsed.mode, ElasticsearchMode::Bulk);
        assert_eq!(parsed.bulk, BulkConfig::default());
    }

    #[test]
    fn parse_opensearch_service_type_managed() {
        let parsed = parse(
            r#"
            endpoints = [""]
            opensearch_service_type = "managed"
        "#,
        );

        assert_eq!(
            parsed.opensearch_service_type,
            OpenSearchServiceType::Managed
        );
    }

    #[test]
    fn parse_opensearch_service_type_serverless() {
        let parsed = parse(
            r#"
            endpoints = [""]
            opensearch_service_type = "serverless"
            auth.strategy = "aws"
            api_version = "auto"
        "#,
        );

        assert_eq!(
            parsed.opensearch_service_type,
            OpenSearchServiceType::Serverless
        );
    }

    #[test]
    fn parse_opensearch_service_type_default() {
        let parsed = parse(
            r#"
            endpoints = [""]
        "#,
        );

        assert_eq!(
            parsed.opensearch_service_type,
            OpenSearchServiceType::Managed
        );
    }

    #[cfg(feature = "aws-core")]
    #[test]
    fn parse_opensearch_serverless_with_aws_auth() {
        let parsed = parse(
            r#"
            endpoints = [""]
            opensearch_service_type = "serverless"
            auth.strategy = "aws"
            api_version = "auto"
        "#,
        );

        assert_eq!(
            parsed.opensearch_service_type,
            OpenSearchServiceType::Serverless
        );
        assert!(matches!(parsed.auth, Some(ElasticsearchAuthConfig::Aws(_))));
        assert_eq!(parsed.api_version, ElasticsearchApiVersion::Auto);
    }
}