1use std::{
2 collections::{BTreeMap, HashMap},
3 convert::TryFrom,
4};
5
6use futures::{FutureExt, TryFutureExt};
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10 codecs::Transformer,
11 config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
12 event::{EventRef, LogEvent, Value},
13 http::{HttpClient, QueryParameters},
14 internal_events::TemplateRenderingError,
15 sinks::{
16 elasticsearch::{
17 health::ElasticsearchHealthLogic,
18 retry::ElasticsearchRetryLogic,
19 service::{ElasticsearchService, HttpRequestBuilder},
20 sink::ElasticsearchSink,
21 ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
22 ElasticsearchCommonMode, ElasticsearchMode, VersionType,
23 },
24 util::{
25 http::RequestConfig, service::HealthConfig, BatchConfig, Compression,
26 RealtimeSizeBasedDefaultBatchSettings,
27 },
28 Healthcheck, VectorSink,
29 },
30 template::Template,
31 tls::TlsConfig,
32 transforms::metric_to_log::MetricToLogConfig,
33};
34use vector_lib::lookup::event_path;
35use vector_lib::lookup::lookup_v2::ConfigValuePath;
36use vector_lib::schema::Requirement;
37use vrl::value::Kind;
38
/// Field name that `DataStreamConfig::remap_timestamp` renames a log's timestamp field to.
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
41
42#[configurable_component]
46#[derive(Clone, Debug, Eq, PartialEq)]
47#[serde(deny_unknown_fields, rename_all = "lowercase")]
48pub enum OpenSearchServiceType {
49 Managed,
51 Serverless,
53}
54
55impl OpenSearchServiceType {
56 pub const fn as_str(&self) -> &'static str {
57 match self {
58 OpenSearchServiceType::Managed => "es",
59 OpenSearchServiceType::Serverless => "aoss",
60 }
61 }
62}
63
64impl Default for OpenSearchServiceType {
65 fn default() -> Self {
66 Self::Managed
67 }
68}
69
70#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
72#[derive(Clone, Debug)]
73#[serde(deny_unknown_fields)]
74pub struct ElasticsearchConfig {
75 #[serde(default)]
80 #[configurable(
81 deprecated = "This option has been deprecated, the `endpoints` option should be used instead."
82 )]
83 pub endpoint: Option<String>,
84
85 #[serde(default)]
96 #[configurable(metadata(docs::examples = "http://10.24.32.122:9000"))]
97 #[configurable(metadata(docs::examples = "https://example.com"))]
98 #[configurable(metadata(docs::examples = "https://user:password@example.com"))]
99 pub endpoints: Vec<String>,
100
101 #[serde(default = "default_doc_type")]
108 #[configurable(metadata(docs::advanced))]
109 pub doc_type: String,
110
111 #[serde(default)]
115 #[configurable(derived)]
116 pub api_version: ElasticsearchApiVersion,
117
118 #[serde(default)]
124 #[configurable(
125 deprecated = "This option has been deprecated, the `api_version` option should be used instead."
126 )]
127 pub suppress_type_name: bool,
128
129 #[serde(default)]
133 #[configurable(metadata(docs::advanced))]
134 pub request_retry_partial: bool,
135
136 #[serde(default)]
144 #[configurable(metadata(docs::advanced))]
145 #[configurable(metadata(docs::examples = "id"))]
146 #[configurable(metadata(docs::examples = "_id"))]
147 pub id_key: Option<ConfigValuePath>,
148
149 #[serde(default)]
151 #[configurable(metadata(docs::advanced))]
152 #[configurable(metadata(docs::examples = "pipeline-name"))]
153 pub pipeline: Option<String>,
154
155 #[serde(default)]
156 #[configurable(derived)]
157 pub mode: ElasticsearchMode,
158
159 #[serde(default)]
160 #[configurable(derived)]
161 pub compression: Compression,
162
163 #[serde(skip_serializing_if = "crate::serde::is_default", default)]
164 #[configurable(derived)]
165 #[configurable(metadata(docs::advanced))]
166 pub encoding: Transformer,
167
168 #[serde(default)]
169 #[configurable(derived)]
170 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
171
172 #[serde(default)]
173 #[configurable(derived)]
174 pub request: RequestConfig,
175
176 #[configurable(derived)]
177 pub auth: Option<ElasticsearchAuthConfig>,
178
179 #[serde(default)]
181 #[configurable(metadata(docs::advanced))]
182 #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
183 #[configurable(metadata(docs::examples = "query_examples()"))]
184 pub query: Option<QueryParameters>,
185
186 #[serde(default)]
187 #[configurable(derived)]
188 #[cfg(feature = "aws-core")]
189 pub aws: Option<crate::aws::RegionOrEndpoint>,
190
191 #[serde(default)]
193 pub opensearch_service_type: OpenSearchServiceType,
194
195 #[serde(default)]
196 #[configurable(derived)]
197 pub tls: Option<TlsConfig>,
198
199 #[serde(default)]
200 #[configurable(derived)]
201 #[serde(rename = "distribution")]
202 pub endpoint_health: Option<HealthConfig>,
203
204 #[serde(alias = "normal", default)]
209 #[configurable(derived)]
210 pub bulk: BulkConfig,
211
212 #[serde(default)]
213 #[configurable(derived)]
214 pub data_stream: Option<DataStreamConfig>,
215
216 #[serde(default)]
217 #[configurable(derived)]
218 pub metrics: Option<MetricToLogConfig>,
219
220 #[serde(
221 default,
222 deserialize_with = "crate::serde::bool_or_struct",
223 skip_serializing_if = "crate::serde::is_default"
224 )]
225 #[configurable(derived)]
226 pub acknowledgements: AcknowledgementsConfig,
227}
228
/// Serde default for `ElasticsearchConfig::doc_type`: the string `_doc`.
fn default_doc_type() -> String {
    String::from("_doc")
}
232
/// Builds the single-entry example map (`X-Powered-By` -> `Vector`) referenced by the
/// `docs::examples` metadata of the `query` option.
fn query_examples() -> HashMap<String, String> {
    let name = String::from("X-Powered-By");
    let value = String::from("Vector");
    HashMap::from([(name, value)])
}
236
237impl Default for ElasticsearchConfig {
238 fn default() -> Self {
239 Self {
240 endpoint: None,
241 endpoints: vec![],
242 doc_type: default_doc_type(),
243 api_version: Default::default(),
244 suppress_type_name: false,
245 request_retry_partial: false,
246 id_key: None,
247 pipeline: None,
248 mode: Default::default(),
249 compression: Default::default(),
250 encoding: Default::default(),
251 batch: Default::default(),
252 request: Default::default(),
253 auth: None,
254 query: None,
255 #[cfg(feature = "aws-core")]
256 aws: None,
257 opensearch_service_type: Default::default(),
258 tls: None,
259 endpoint_health: None,
260 bulk: BulkConfig::default(), data_stream: None,
262 metrics: None,
263 acknowledgements: Default::default(),
264 }
265 }
266}
267
268impl ElasticsearchConfig {
269 pub fn common_mode(&self) -> crate::Result<ElasticsearchCommonMode> {
270 match self.mode {
271 ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
272 index: self.bulk.index.clone(),
273 template_fallback_index: self.bulk.template_fallback_index.clone(),
274 action: self.bulk.action.clone(),
275 version: self.bulk.version.clone(),
276 version_type: self.bulk.version_type,
277 }),
278 ElasticsearchMode::DataStream => Ok(ElasticsearchCommonMode::DataStream(
279 self.data_stream.clone().unwrap_or_default(),
280 )),
281 }
282 }
283}
284
285#[configurable_component]
287#[derive(Clone, Debug, PartialEq)]
288#[serde(rename_all = "snake_case")]
289pub struct BulkConfig {
290 #[serde(default = "default_bulk_action")]
296 #[configurable(metadata(docs::examples = "create"))]
297 #[configurable(metadata(docs::examples = "{{ action }}"))]
298 pub action: Template,
299
300 #[serde(default = "default_index")]
302 #[configurable(metadata(docs::examples = "application-{{ application_id }}-%Y-%m-%d"))]
303 #[configurable(metadata(docs::examples = "{{ index }}"))]
304 pub index: Template,
305
306 #[configurable(metadata(docs::examples = "test-index"))]
308 pub template_fallback_index: Option<String>,
309
310 #[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
312 #[configurable(metadata(docs::examples = "123"))]
313 pub version: Option<Template>,
314
315 #[serde(default = "default_version_type")]
321 #[configurable(metadata(docs::examples = "internal"))]
322 #[configurable(metadata(docs::examples = "external"))]
323 pub version_type: VersionType,
324}
325
326fn default_bulk_action() -> Template {
327 Template::try_from("index").expect("unable to parse template")
328}
329
330fn default_index() -> Template {
331 Template::try_from("vector-%Y.%m.%d").expect("unable to parse template")
332}
333
/// Serde default for `BulkConfig::version_type`: `VersionType::Internal`.
const fn default_version_type() -> VersionType {
    VersionType::Internal
}
337
338impl Default for BulkConfig {
339 fn default() -> Self {
340 Self {
341 action: default_bulk_action(),
342 index: default_index(),
343 template_fallback_index: Default::default(),
344 version: Default::default(),
345 version_type: default_version_type(),
346 }
347 }
348}
349
350#[configurable_component]
352#[derive(Clone, Debug)]
353#[serde(rename_all = "snake_case")]
354pub struct DataStreamConfig {
355 #[serde(rename = "type", default = "DataStreamConfig::default_type")]
357 #[configurable(metadata(docs::examples = "metrics"))]
358 #[configurable(metadata(docs::examples = "synthetics"))]
359 #[configurable(metadata(docs::examples = "{{ type }}"))]
360 pub dtype: Template,
361
362 #[serde(default = "DataStreamConfig::default_dataset")]
364 #[configurable(metadata(docs::examples = "generic"))]
365 #[configurable(metadata(docs::examples = "nginx"))]
366 #[configurable(metadata(docs::examples = "{{ service }}"))]
367 pub dataset: Template,
368
369 #[serde(default = "DataStreamConfig::default_namespace")]
371 #[configurable(metadata(docs::examples = "{{ environment }}"))]
372 pub namespace: Template,
373
374 #[serde(default = "DataStreamConfig::default_auto_routing")]
383 pub auto_routing: bool,
384
385 #[serde(default = "DataStreamConfig::default_sync_fields")]
389 pub sync_fields: bool,
390}
391
392impl Default for DataStreamConfig {
393 fn default() -> Self {
394 Self {
395 dtype: Self::default_type(),
396 dataset: Self::default_dataset(),
397 namespace: Self::default_namespace(),
398 auto_routing: Self::default_auto_routing(),
399 sync_fields: Self::default_sync_fields(),
400 }
401 }
402}
403
404impl DataStreamConfig {
405 fn default_type() -> Template {
406 Template::try_from("logs").expect("couldn't build default type template")
407 }
408
409 fn default_dataset() -> Template {
410 Template::try_from("generic").expect("couldn't build default dataset template")
411 }
412
413 fn default_namespace() -> Template {
414 Template::try_from("default").expect("couldn't build default namespace template")
415 }
416
417 const fn default_auto_routing() -> bool {
418 true
419 }
420
421 const fn default_sync_fields() -> bool {
422 true
423 }
424
425 pub fn remap_timestamp(&self, log: &mut LogEvent) {
427 if let Some(timestamp_key) = log.timestamp_path().cloned() {
428 if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
429 return;
430 }
431
432 log.rename_key(×tamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
433 }
434 }
435
436 pub fn dtype<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
437 self.dtype
438 .render_string(event)
439 .map_err(|error| {
440 emit!(TemplateRenderingError {
441 error,
442 field: Some("data_stream.type"),
443 drop_event: true,
444 });
445 })
446 .ok()
447 }
448
449 pub fn dataset<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
450 self.dataset
451 .render_string(event)
452 .map_err(|error| {
453 emit!(TemplateRenderingError {
454 error,
455 field: Some("data_stream.dataset"),
456 drop_event: true,
457 });
458 })
459 .ok()
460 }
461
462 pub fn namespace<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<String> {
463 self.namespace
464 .render_string(event)
465 .map_err(|error| {
466 emit!(TemplateRenderingError {
467 error,
468 field: Some("data_stream.namespace"),
469 drop_event: true,
470 });
471 })
472 .ok()
473 }
474
475 pub fn sync_fields(&self, log: &mut LogEvent) {
476 if !self.sync_fields {
477 return;
478 }
479
480 let dtype = self.dtype(&*log);
481 let dataset = self.dataset(&*log);
482 let namespace = self.namespace(&*log);
483
484 if log.as_map().is_none() {
485 *log.value_mut() = Value::Object(BTreeMap::new());
486 }
487 let existing = log
488 .as_map_mut()
489 .expect("must be a map")
490 .entry("data_stream".into())
491 .or_insert_with(|| Value::Object(BTreeMap::new()))
492 .as_object_mut_unwrap();
493
494 if let Some(dtype) = dtype {
495 existing
496 .entry("type".into())
497 .or_insert_with(|| dtype.into());
498 }
499 if let Some(dataset) = dataset {
500 existing
501 .entry("dataset".into())
502 .or_insert_with(|| dataset.into());
503 }
504 if let Some(namespace) = namespace {
505 existing
506 .entry("namespace".into())
507 .or_insert_with(|| namespace.into());
508 }
509 }
510
511 pub fn index(&self, log: &LogEvent) -> Option<String> {
512 let (dtype, dataset, namespace) = if !self.auto_routing {
513 (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
514 } else {
515 let data_stream = log
516 .get(event_path!("data_stream"))
517 .and_then(|ds| ds.as_object());
518 let dtype = data_stream
519 .and_then(|ds| ds.get("type"))
520 .map(|value| value.to_string_lossy().into_owned())
521 .or_else(|| self.dtype(log))?;
522 let dataset = data_stream
523 .and_then(|ds| ds.get("dataset"))
524 .map(|value| value.to_string_lossy().into_owned())
525 .or_else(|| self.dataset(log))?;
526 let namespace = data_stream
527 .and_then(|ds| ds.get("namespace"))
528 .map(|value| value.to_string_lossy().into_owned())
529 .or_else(|| self.namespace(log))?;
530 (dtype, dataset, namespace)
531 };
532
533 let name = [dtype, dataset, namespace]
534 .into_iter()
535 .filter(|s| !s.is_empty())
536 .collect::<Vec<_>>()
537 .join("-");
538
539 Some(name)
540 }
541}
542
543#[async_trait::async_trait]
544#[typetag::serde(name = "elasticsearch")]
545impl SinkConfig for ElasticsearchConfig {
546 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
547 let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
548 let common = commons[0].clone();
549
550 let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
551
552 let request_limits = self.request.tower.into_settings();
553
554 let health_config = self.endpoint_health.clone().unwrap_or_default();
555
556 let services = commons
557 .iter()
558 .cloned()
559 .map(|common| {
560 let endpoint = common.base_url.clone();
561
562 let http_request_builder = HttpRequestBuilder::new(&common, self);
563 let service = ElasticsearchService::new(client.clone(), http_request_builder);
564
565 (endpoint, service)
566 })
567 .collect::<Vec<_>>();
568
569 let service = request_limits.distributed_service(
570 ElasticsearchRetryLogic {
571 retry_partial: self.request_retry_partial,
572 },
573 services,
574 health_config,
575 ElasticsearchHealthLogic,
576 1,
577 );
578
579 let sink = ElasticsearchSink::new(&common, self, service)?;
580
581 let stream = VectorSink::from_event_streamsink(sink);
582
583 let healthcheck = futures::future::select_ok(
584 commons
585 .into_iter()
586 .map(move |common| common.healthcheck(client.clone()).boxed()),
587 )
588 .map_ok(|((), _)| ())
589 .boxed();
590 Ok((stream, healthcheck))
591 }
592
593 fn input(&self) -> Input {
594 let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
595
596 Input::new(DataType::Metric | DataType::Log).with_schema_requirement(requirements)
597 }
598
599 fn acknowledgements(&self) -> &AcknowledgementsConfig {
600 &self.acknowledgements
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `source` as TOML into an `ElasticsearchConfig`, panicking on failure.
    fn parse(source: &str) -> ElasticsearchConfig {
        toml::from_str(source).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ElasticsearchConfig>();
    }

    /// The `aws` auth strategy parses both with and without `assume_role`.
    #[test]
    fn parse_aws_auth() {
        let with_role = r#"
            endpoints = [""]
            auth.strategy = "aws"
            auth.assume_role = "role"
        "#;
        let without_role = r#"
            endpoints = [""]
            auth.strategy = "aws"
        "#;

        parse(with_role);
        parse(without_role);
    }

    /// `mode = "data_stream"` selects data stream mode and keeps the `data_stream` section.
    #[test]
    fn parse_mode() {
        let config = parse(
            r#"
            endpoints = [""]
            mode = "data_stream"
            data_stream.type = "synthetics"
        "#,
        );

        assert!(matches!(config.mode, ElasticsearchMode::DataStream));
        assert!(config.data_stream.is_some());
    }

    /// The endpoint health settings are accepted under the `distribution` key.
    #[test]
    fn parse_distribution() {
        parse(
            r#"
            endpoints = ["", ""]
            distribution.retry_initial_backoff_secs = 10
        "#,
        );
    }

    #[test]
    fn parse_version() {
        let config = parse(
            r#"
            endpoints = [""]
            api_version = "v7"
        "#,
        );

        assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
    }

    #[test]
    fn parse_version_auto() {
        let config = parse(
            r#"
            endpoints = [""]
            api_version = "auto"
        "#,
        );

        assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
    }

    /// With only endpoints given, the mode is bulk and `bulk` equals its defaults.
    #[test]
    fn parse_default_bulk() {
        let config = parse(
            r#"
            endpoints = [""]
        "#,
        );

        assert_eq!(config.mode, ElasticsearchMode::Bulk);
        assert_eq!(config.bulk, BulkConfig::default());
    }
}