vector/sinks/elasticsearch/
mod.rs

1mod common;
2mod config;
3pub mod encoder;
4pub mod health;
5pub mod request_builder;
6pub mod retry;
7pub mod service;
8pub mod sink;
9
10#[cfg(test)]
11mod tests;
12
13#[cfg(test)]
14#[cfg(feature = "es-integration-tests")]
15mod integration_tests;
16
17use std::{convert::TryFrom, fmt};
18
19pub use common::*;
20pub use config::*;
21pub use encoder::ElasticsearchEncoder;
22use http::{uri::InvalidUri, Request};
23use snafu::Snafu;
24use vector_lib::sensitive_string::SensitiveString;
25use vector_lib::{configurable::configurable_component, internal_event};
26
27use crate::{
28    event::{EventRef, LogEvent},
29    internal_events::TemplateRenderingError,
30    template::{Template, TemplateParseError},
31};
32
/// Elasticsearch Authentication strategies.
// Serde representation: internally tagged on the `strategy` key, with variant names matched in
// snake_case (`basic`, `aws`). Unknown fields are rejected (`deny_unknown_fields`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(
    docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
))]
pub enum ElasticsearchAuthConfig {
    /// HTTP Basic Authentication.
    Basic {
        /// Basic authentication username.
        #[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
        #[configurable(metadata(docs::examples = "username"))]
        user: String,

        /// Basic authentication password.
        // Held as a `SensitiveString` rather than a plain `String`.
        #[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
        #[configurable(metadata(docs::examples = "password"))]
        password: SensitiveString,
    },

    // This variant only exists when the crate is built with the `aws-core` feature.
    #[cfg(feature = "aws-core")]
    /// Amazon OpenSearch Service-specific authentication.
    Aws(crate::aws::AwsAuthentication),
}
58
/// Elasticsearch Indexing mode.
// Variant names are (de)serialized in snake_case: `bulk` and `data_stream`.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum ElasticsearchMode {
    /// Ingests documents in bulk, using the bulk API `index` action.
    // `normal` is also accepted as an alias for this variant when deserializing.
    #[serde(alias = "normal")]
    Bulk,

    /// Ingests documents in bulk, using the bulk API `create` action.
    ///
    /// Elasticsearch Data Streams only support the `create` action.
    ///
    /// If the mode is set to `data_stream` and a `timestamp` field is present in a message,
    /// Vector renames this field to the expected `@timestamp` to comply with the Elastic Common Schema.
    DataStream,
}
76
77impl Default for ElasticsearchMode {
78    fn default() -> Self {
79        Self::Bulk
80    }
81}
82
/// Bulk API actions.
// Variant names are (de)serialized in snake_case: `index`, `create`, `update`; these match the
// strings produced by `BulkAction::as_str` and accepted by `TryFrom<&str>`.
// NOTE(review): no `#[derivative(...)]` attribute is visible on this type, so the `Derivative`
// derive looks unused here — confirm before removing.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum BulkAction {
    /// The `index` action.
    Index,

    /// The `create` action.
    Create,

    /// The `update` action.
    Update,
}
97
98#[allow(clippy::trivially_copy_pass_by_ref)]
99impl BulkAction {
100    pub const fn as_str(&self) -> &'static str {
101        match self {
102            BulkAction::Index => "index",
103            BulkAction::Create => "create",
104            BulkAction::Update => "update",
105        }
106    }
107
108    pub const fn as_json_pointer(&self) -> &'static str {
109        match self {
110            BulkAction::Index => "/index",
111            BulkAction::Create => "/create",
112            BulkAction::Update => "/update",
113        }
114    }
115}
116
117impl TryFrom<&str> for BulkAction {
118    type Error = String;
119
120    fn try_from(input: &str) -> Result<Self, Self::Error> {
121        match input {
122            "index" => Ok(BulkAction::Index),
123            "create" => Ok(BulkAction::Create),
124            "update" => Ok(BulkAction::Update),
125            _ => Err(format!("Invalid bulk action: {input}")),
126        }
127    }
128}
129
/// Elasticsearch version types.
// Variant names are (de)serialized in snake_case: `internal`, `external`, `external_gte`.
// NOTE(review): no `#[derivative(...)]` attribute is visible on this type, so the `Derivative`
// derive looks unused here — confirm before removing.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum VersionType {
    /// The `internal` type.
    Internal,

    /// The `external` or `external_gt` type.
    // `TryFrom<&str>` maps both spellings to this variant; `as_str` always yields `external`.
    External,

    /// The `external_gte` type.
    ExternalGte,
}
144
145#[allow(clippy::trivially_copy_pass_by_ref)]
146impl VersionType {
147    pub const fn as_str(&self) -> &'static str {
148        match self {
149            Self::Internal => "internal",
150            Self::External => "external",
151            Self::ExternalGte => "external_gte",
152        }
153    }
154}
155
156impl TryFrom<&str> for VersionType {
157    type Error = String;
158
159    fn try_from(input: &str) -> Result<Self, Self::Error> {
160        match input {
161            "internal" => Ok(VersionType::Internal),
162            "external" | "external_gt" => Ok(VersionType::External),
163            "external_gte" => Ok(VersionType::ExternalGte),
164            _ => Err(format!("Invalid versioning mode: {input}")),
165        }
166    }
167}
168
// Invokes the project macro `impl_generate_config_from_default!` for `ElasticsearchConfig`.
// NOTE(review): presumably this derives config generation from the type's `Default` impl —
// the macro is defined outside this file, so confirm against its definition.
impl_generate_config_from_default!(ElasticsearchConfig);
170
/// Per-event indexing settings, in one of two shapes: templated bulk settings or a data stream
/// configuration.
#[derive(Debug, Clone)]
pub enum ElasticsearchCommonMode {
    /// Bulk mode: index, action and version are resolved per event from templates.
    Bulk {
        /// Template rendered against each log event to obtain the target index.
        index: Template,
        /// Index returned when rendering `index` fails; with `None`, a failed render yields no
        /// index at all.
        template_fallback_index: Option<String>,
        /// Template rendered per event and then parsed into a [`BulkAction`].
        action: Template,
        /// Optional template rendered per event and then parsed as an unsigned integer version.
        version: Option<Template>,
        /// Version type reported by `version_type()` for this mode.
        version_type: VersionType,
    },
    /// Data stream mode: the index comes from the wrapped config, the bulk action is always
    /// `create`, no version is produced, and the version type is `internal`.
    DataStream(DataStreamConfig),
}
182
/// Internal event raised when a rendered version string cannot be parsed as an integer.
struct VersionValueParseError<'a> {
    /// The rendered string that failed to parse.
    value: &'a str,
}
186
impl internal_event::InternalEvent for VersionValueParseError<'_> {
    /// Reports the event by passing its `Display` text to the `warn!` macro.
    fn emit(self) {
        warn!("{self}")
    }
}
192
193impl fmt::Display for VersionValueParseError<'_> {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        write!(f, "Cannot parse version \"{}\" as integer", self.value)
196    }
197}
198
199impl ElasticsearchCommonMode {
200    fn index(&self, log: &LogEvent) -> Option<String> {
201        match self {
202            Self::Bulk {
203                index,
204                template_fallback_index,
205                ..
206            } => index
207                .render_string(log)
208                .or_else(|error| {
209                    if let Some(fallback) = template_fallback_index {
210                        emit!(TemplateRenderingError {
211                            error,
212                            field: Some("index"),
213                            drop_event: false,
214                        });
215                        Ok(fallback.clone())
216                    } else {
217                        emit!(TemplateRenderingError {
218                            error,
219                            field: Some("index"),
220                            drop_event: true,
221                        });
222                        Err(())
223                    }
224                })
225                .ok(),
226            Self::DataStream(ds) => ds.index(log),
227        }
228    }
229
230    fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
231        match self {
232            ElasticsearchCommonMode::Bulk {
233                action: bulk_action_template,
234                ..
235            } => bulk_action_template
236                .render_string(event)
237                .map_err(|error| {
238                    emit!(TemplateRenderingError {
239                        error,
240                        field: Some("bulk_action"),
241                        drop_event: true,
242                    });
243                })
244                .ok()
245                .and_then(|value| BulkAction::try_from(value.as_str()).ok()),
246            // avoid the interpolation
247            ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
248        }
249    }
250
251    fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
252        match self {
253            ElasticsearchCommonMode::Bulk {
254                version: Some(version),
255                ..
256            } => version
257                .render_string(event)
258                .map_err(|error| {
259                    emit!(TemplateRenderingError {
260                        error,
261                        field: Some("version"),
262                        drop_event: true,
263                    });
264                })
265                .ok()
266                .as_ref()
267                .and_then(|value| {
268                    value
269                        .parse()
270                        .map_err(|_| emit!(VersionValueParseError { value }))
271                        .ok()
272                }),
273            _ => None,
274        }
275    }
276
277    const fn version_type(&self) -> Option<VersionType> {
278        match self {
279            ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
280            _ => Some(VersionType::Internal),
281        }
282    }
283
284    const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
285        match self {
286            Self::DataStream(value) => Some(value),
287            _ => None,
288        }
289    }
290}
291
/// Configuration for Elasticsearch API version.
// Variant names are (de)serialized in snake_case: `auto`, `v6`, `v7`, `v8`.
// The `proptest_derive::Arbitrary` derive is only applied when the `proptest` feature is on.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum ElasticsearchApiVersion {
    /// Auto-detect the API version.
    ///
    /// If the [cluster state version endpoint][es_version] isn't reachable, a warning is logged to
    /// stdout, and the version is assumed to be V6 if the `suppress_type_name` option is set to
    /// `true`. Otherwise, the version is assumed to be V8. In the future, the sink instead
    /// returns an error during configuration parsing, since a wrongly assumed version could lead to
    /// incorrect API calls.
    ///
    /// [es_version]: https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-state.html#cluster-state-api-path-params
    Auto,
    /// Use the Elasticsearch 6.x API.
    V6,
    /// Use the Elasticsearch 7.x API.
    V7,
    /// Use the Elasticsearch 8.x API.
    V8,
}
315
316impl Default for ElasticsearchApiVersion {
317    fn default() -> Self {
318        Self::Auto
319    }
320}
321
/// Errors describing an invalid Elasticsearch sink configuration.
///
/// Each variant's user-facing message is given by its `#[snafu(display(...))]` attribute.
// `visibility(pub)` makes the snafu-generated context selectors public.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum ParseError {
    // --- Endpoint / host validation ---
    #[snafu(display("Invalid host {:?}: {:?}", host, source))]
    InvalidHost { host: String, source: InvalidUri },
    #[snafu(display("Host {:?} must include hostname", host))]
    HostMustIncludeHostname { host: String },
    // --- Template parsing ---
    #[snafu(display("Index template parse error: {}", source))]
    IndexTemplate { source: TemplateParseError },
    #[snafu(display("Batch action template parse error: {}", source))]
    BatchActionTemplate { source: TemplateParseError },
    // Only present when the crate is built with the `aws-core` feature.
    #[cfg(feature = "aws-core")]
    #[snafu(display("aws.region required when AWS authentication is in use"))]
    RegionRequired,
    #[snafu(display("Endpoints option must be specified"))]
    EndpointRequired,
    #[snafu(display(
        "`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
    ))]
    EndpointsExclusive,
    // --- Versioning option combinations ---
    #[snafu(display("Tried to use external versioning without specifying the version itself"))]
    ExternalVersioningWithoutVersion,
    #[snafu(display("Cannot use external versioning without specifying a document ID"))]
    ExternalVersioningWithoutDocumentID,
    #[snafu(display("Your version field will be ignored because you use internal versioning"))]
    ExternalVersionIgnoredWithInternalVersioning,
    // --- Amazon OpenSearch Serverless constraints ---
    #[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
    ServerlessElasticsearchApiVersionMustBeAuto,
    #[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
    OpenSearchServerlessRequiresAwsAuth,
}