vector/sinks/elasticsearch/
mod.rs

1mod common;
2mod config;
3pub mod encoder;
4pub mod health;
5pub mod request_builder;
6pub mod retry;
7pub mod service;
8pub mod sink;
9
10#[cfg(test)]
11mod tests;
12
13#[cfg(test)]
14#[cfg(feature = "es-integration-tests")]
15mod integration_tests;
16
17use std::{convert::TryFrom, fmt};
18
19pub use common::*;
20pub use config::*;
21pub use encoder::ElasticsearchEncoder;
22use http::{Request, uri::InvalidUri};
23use snafu::Snafu;
24use vector_lib::{
25    NamedInternalEvent, configurable::configurable_component, internal_event::InternalEvent,
26    sensitive_string::SensitiveString,
27};
28
29use crate::{
30    event::{EventRef, LogEvent},
31    internal_events::TemplateRenderingError,
32    template::{Template, TemplateParseError},
33};
34
/// Elasticsearch Authentication strategies.
// NOTE: the `///` doc comments and `#[configurable(metadata(...))]` attributes on this type
// appear to feed the generated configuration docs (e.g. `docs::enum_tag_description`), so
// their text is user-facing. Plain `//` comments like this one are not.
#[configurable_component]
#[derive(Clone, Debug)]
// Internally tagged: the variant is selected by the `strategy` key using snake_case names
// (`basic`, `aws`), and unknown keys are rejected.
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(
    docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
))]
pub enum ElasticsearchAuthConfig {
    /// HTTP Basic Authentication.
    Basic {
        /// Basic authentication username.
        #[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
        #[configurable(metadata(docs::examples = "username"))]
        user: String,

        /// Basic authentication password.
        #[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
        #[configurable(metadata(docs::examples = "password"))]
        // Stored as `SensitiveString` rather than `String`; presumably so the value is not
        // exposed verbatim in debug/serialized output — confirm against `SensitiveString`.
        password: SensitiveString,
    },

    #[cfg(feature = "aws-core")]
    /// Amazon OpenSearch Service-specific authentication.
    // This variant only exists when the crate is built with the `aws-core` feature.
    Aws(crate::aws::AwsAuthentication),
}
60
61/// Elasticsearch Indexing mode.
62#[configurable_component]
63#[derive(Clone, Debug, Eq, PartialEq)]
64#[serde(deny_unknown_fields, rename_all = "snake_case")]
65#[derive(Default)]
66pub enum ElasticsearchMode {
67    /// Ingests documents in bulk, using the bulk API `index` action.
68    #[serde(alias = "normal")]
69    #[default]
70    Bulk,
71
72    /// Ingests documents in bulk, using the bulk API `create` action.
73    ///
74    /// Elasticsearch Data Streams only support the `create` action.
75    ///
76    /// If the mode is set to `data_stream` and a `timestamp` field is present in a message,
77    /// Vector renames this field to the expected `@timestamp` to comply with the Elastic Common Schema.
78    DataStream,
79}
80
81/// Bulk API actions.
82#[configurable_component]
83#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
84#[serde(deny_unknown_fields, rename_all = "snake_case")]
85pub enum BulkAction {
86    /// The `index` action.
87    Index,
88
89    /// The `create` action.
90    Create,
91
92    /// The `update` action.
93    Update,
94}
95
96#[allow(clippy::trivially_copy_pass_by_ref)]
97impl BulkAction {
98    pub const fn as_str(&self) -> &'static str {
99        match self {
100            BulkAction::Index => "index",
101            BulkAction::Create => "create",
102            BulkAction::Update => "update",
103        }
104    }
105
106    pub const fn as_json_pointer(&self) -> &'static str {
107        match self {
108            BulkAction::Index => "/index",
109            BulkAction::Create => "/create",
110            BulkAction::Update => "/update",
111        }
112    }
113}
114
115impl TryFrom<&str> for BulkAction {
116    type Error = String;
117
118    fn try_from(input: &str) -> Result<Self, Self::Error> {
119        match input {
120            "index" => Ok(BulkAction::Index),
121            "create" => Ok(BulkAction::Create),
122            "update" => Ok(BulkAction::Update),
123            _ => Err(format!("Invalid bulk action: {input}")),
124        }
125    }
126}
127
128/// Elasticsearch version types.
129#[configurable_component]
130#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
131#[serde(deny_unknown_fields, rename_all = "snake_case")]
132pub enum VersionType {
133    /// The `internal` type.
134    Internal,
135
136    /// The `external` or `external_gt` type.
137    External,
138
139    /// The `external_gte` type.
140    ExternalGte,
141}
142
143#[allow(clippy::trivially_copy_pass_by_ref)]
144impl VersionType {
145    pub const fn as_str(&self) -> &'static str {
146        match self {
147            Self::Internal => "internal",
148            Self::External => "external",
149            Self::ExternalGte => "external_gte",
150        }
151    }
152}
153
154impl TryFrom<&str> for VersionType {
155    type Error = String;
156
157    fn try_from(input: &str) -> Result<Self, Self::Error> {
158        match input {
159            "internal" => Ok(VersionType::Internal),
160            "external" | "external_gt" => Ok(VersionType::External),
161            "external_gte" => Ok(VersionType::ExternalGte),
162            _ => Err(format!("Invalid versioning mode: {input}")),
163        }
164    }
165}
166
// Generates the sink's example configuration from `ElasticsearchConfig`'s `Default` value
// (inferred from the macro name — confirm against the macro definition).
impl_generate_config_from_default!(ElasticsearchConfig);
168
169#[derive(Debug, Clone)]
170pub enum ElasticsearchCommonMode {
171    Bulk {
172        index: Template,
173        template_fallback_index: Option<String>,
174        action: Template,
175        version: Option<Template>,
176        version_type: VersionType,
177    },
178    DataStream(DataStreamConfig),
179}
180
181#[derive(NamedInternalEvent)]
182struct VersionValueParseError<'a> {
183    value: &'a str,
184}
185
186impl InternalEvent for VersionValueParseError<'_> {
187    fn emit(self) {
188        warn!("{self}")
189    }
190}
191
192impl fmt::Display for VersionValueParseError<'_> {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        write!(f, "Cannot parse version \"{}\" as integer", self.value)
195    }
196}
197
198impl ElasticsearchCommonMode {
199    fn index(&self, log: &LogEvent) -> Option<String> {
200        match self {
201            Self::Bulk {
202                index,
203                template_fallback_index,
204                ..
205            } => index
206                .render_string(log)
207                .or_else(|error| {
208                    if let Some(fallback) = template_fallback_index {
209                        emit!(TemplateRenderingError {
210                            error,
211                            field: Some("index"),
212                            drop_event: false,
213                        });
214                        Ok(fallback.clone())
215                    } else {
216                        emit!(TemplateRenderingError {
217                            error,
218                            field: Some("index"),
219                            drop_event: true,
220                        });
221                        Err(())
222                    }
223                })
224                .ok(),
225            Self::DataStream(ds) => ds.index(log),
226        }
227    }
228
229    fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
230        match self {
231            ElasticsearchCommonMode::Bulk {
232                action: bulk_action_template,
233                ..
234            } => bulk_action_template
235                .render_string(event)
236                .map_err(|error| {
237                    emit!(TemplateRenderingError {
238                        error,
239                        field: Some("bulk_action"),
240                        drop_event: true,
241                    });
242                })
243                .ok()
244                .and_then(|value| BulkAction::try_from(value.as_str()).ok()),
245            // avoid the interpolation
246            ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
247        }
248    }
249
250    fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
251        match self {
252            ElasticsearchCommonMode::Bulk {
253                version: Some(version),
254                ..
255            } => version
256                .render_string(event)
257                .map_err(|error| {
258                    emit!(TemplateRenderingError {
259                        error,
260                        field: Some("version"),
261                        drop_event: true,
262                    });
263                })
264                .ok()
265                .as_ref()
266                .and_then(|value| {
267                    value
268                        .parse()
269                        .map_err(|_| emit!(VersionValueParseError { value }))
270                        .ok()
271                }),
272            _ => None,
273        }
274    }
275
276    const fn version_type(&self) -> Option<VersionType> {
277        match self {
278            ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
279            _ => Some(VersionType::Internal),
280        }
281    }
282
283    const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
284        match self {
285            Self::DataStream(value) => Some(value),
286            _ => None,
287        }
288    }
289}
290
291/// Configuration for Elasticsearch API version.
292#[configurable_component]
293#[derive(Clone, Debug, Eq, PartialEq)]
294#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
295#[serde(deny_unknown_fields, rename_all = "snake_case")]
296#[derive(Default)]
297pub enum ElasticsearchApiVersion {
298    /// Auto-detect the API version.
299    ///
300    /// If the [cluster state version endpoint][es_version] isn't reachable, a warning is logged to
301    /// stdout, and the version is assumed to be V6 if the `suppress_type_name` option is set to
302    /// `true`. Otherwise, the version is assumed to be V8. In the future, the sink instead
303    /// returns an error during configuration parsing, since a wrongly assumed version could lead to
304    /// incorrect API calls.
305    ///
306    /// [es_version]: https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-state.html#cluster-state-api-path-params
307    #[default]
308    Auto,
309    /// Use the Elasticsearch 6.x API.
310    V6,
311    /// Use the Elasticsearch 7.x API.
312    V7,
313    /// Use the Elasticsearch 8.x API.
314    V8,
315}
316
317#[derive(Debug, Snafu)]
318#[snafu(visibility(pub))]
319pub enum ParseError {
320    #[snafu(display("Invalid host {:?}: {:?}", host, source))]
321    InvalidHost { host: String, source: InvalidUri },
322    #[snafu(display("Host {:?} must include hostname", host))]
323    HostMustIncludeHostname { host: String },
324    #[snafu(display("Index template parse error: {}", source))]
325    IndexTemplate { source: TemplateParseError },
326    #[snafu(display("Batch action template parse error: {}", source))]
327    BatchActionTemplate { source: TemplateParseError },
328    #[cfg(feature = "aws-core")]
329    #[snafu(display("aws.region required when AWS authentication is in use"))]
330    RegionRequired,
331    #[snafu(display("Endpoints option must be specified"))]
332    EndpointRequired,
333    #[snafu(display(
334        "`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
335    ))]
336    EndpointsExclusive,
337    #[snafu(display("Tried to use external versioning without specifying the version itself"))]
338    ExternalVersioningWithoutVersion,
339    #[snafu(display("Cannot use external versioning without specifying a document ID"))]
340    ExternalVersioningWithoutDocumentID,
341    #[snafu(display("Your version field will be ignored because you use internal versioning"))]
342    ExternalVersionIgnoredWithInternalVersioning,
343    #[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
344    ServerlessElasticsearchApiVersionMustBeAuto,
345    #[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
346    OpenSearchServerlessRequiresAwsAuth,
347}