vector/sinks/elasticsearch/
mod.rs

1mod common;
2mod config;
3pub mod encoder;
4pub mod health;
5pub mod request_builder;
6pub mod retry;
7pub mod service;
8pub mod sink;
9
10#[cfg(test)]
11mod tests;
12
13#[cfg(test)]
14#[cfg(feature = "es-integration-tests")]
15mod integration_tests;
16
17use std::{convert::TryFrom, fmt};
18
19pub use common::*;
20pub use config::*;
21pub use encoder::ElasticsearchEncoder;
22use http::{Request, uri::InvalidUri};
23use snafu::Snafu;
24use vector_lib::{
25    configurable::configurable_component, internal_event, sensitive_string::SensitiveString,
26};
27
28use crate::{
29    event::{EventRef, LogEvent},
30    internal_events::TemplateRenderingError,
31    template::{Template, TemplateParseError},
32};
33
/// Elasticsearch Authentication strategies.
#[configurable_component]
#[derive(Clone, Debug)]
// Internally tagged representation: the `strategy` field selects the variant, variant names are
// matched in snake_case (`basic`, `aws`), and unknown fields are rejected.
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(
    docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
))]
pub enum ElasticsearchAuthConfig {
    /// HTTP Basic Authentication.
    Basic {
        /// Basic authentication username.
        #[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
        #[configurable(metadata(docs::examples = "username"))]
        user: String,

        /// Basic authentication password.
        // Stored as a `SensitiveString` rather than a plain `String`.
        #[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
        #[configurable(metadata(docs::examples = "password"))]
        password: SensitiveString,
    },

    // This variant only exists when the crate is built with the `aws-core` feature.
    #[cfg(feature = "aws-core")]
    /// Amazon OpenSearch Service-specific authentication.
    Aws(crate::aws::AwsAuthentication),
}
59
/// Elasticsearch Indexing mode.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
// Variant names are matched in snake_case (`bulk`, `data_stream`).
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum ElasticsearchMode {
    /// Ingests documents in bulk, using the bulk API `index` action.
    // `normal` is also accepted as a spelling of this variant when deserializing.
    #[serde(alias = "normal")]
    Bulk,

    /// Ingests documents in bulk, using the bulk API `create` action.
    ///
    /// Elasticsearch Data Streams only support the `create` action.
    ///
    /// If the mode is set to `data_stream` and a `timestamp` field is present in a message,
    /// Vector renames this field to the expected `@timestamp` to comply with the Elastic Common Schema.
    DataStream,
}
77
78impl Default for ElasticsearchMode {
79    fn default() -> Self {
80        Self::Bulk
81    }
82}
83
/// Bulk API actions.
#[configurable_component]
// NOTE(review): `Derivative` is derived here but no `#[derivative(...)]` attribute is visible on
// this type — confirm whether the derive is still needed.
#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
// Variant names are matched in snake_case (`index`, `create`, `update`).
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum BulkAction {
    /// The `index` action.
    Index,

    /// The `create` action.
    Create,

    /// The `update` action.
    Update,
}
98
99#[allow(clippy::trivially_copy_pass_by_ref)]
100impl BulkAction {
101    pub const fn as_str(&self) -> &'static str {
102        match self {
103            BulkAction::Index => "index",
104            BulkAction::Create => "create",
105            BulkAction::Update => "update",
106        }
107    }
108
109    pub const fn as_json_pointer(&self) -> &'static str {
110        match self {
111            BulkAction::Index => "/index",
112            BulkAction::Create => "/create",
113            BulkAction::Update => "/update",
114        }
115    }
116}
117
118impl TryFrom<&str> for BulkAction {
119    type Error = String;
120
121    fn try_from(input: &str) -> Result<Self, Self::Error> {
122        match input {
123            "index" => Ok(BulkAction::Index),
124            "create" => Ok(BulkAction::Create),
125            "update" => Ok(BulkAction::Update),
126            _ => Err(format!("Invalid bulk action: {input}")),
127        }
128    }
129}
130
/// Elasticsearch version types.
#[configurable_component]
// NOTE(review): `Derivative` is derived here but no `#[derivative(...)]` attribute is visible on
// this type — confirm whether the derive is still needed.
#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
// Variant names are matched in snake_case (`internal`, `external`, `external_gte`).
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum VersionType {
    /// The `internal` type.
    Internal,

    /// The `external` or `external_gt` type.
    External,

    /// The `external_gte` type.
    ExternalGte,
}
145
146#[allow(clippy::trivially_copy_pass_by_ref)]
147impl VersionType {
148    pub const fn as_str(&self) -> &'static str {
149        match self {
150            Self::Internal => "internal",
151            Self::External => "external",
152            Self::ExternalGte => "external_gte",
153        }
154    }
155}
156
157impl TryFrom<&str> for VersionType {
158    type Error = String;
159
160    fn try_from(input: &str) -> Result<Self, Self::Error> {
161        match input {
162            "internal" => Ok(VersionType::Internal),
163            "external" | "external_gt" => Ok(VersionType::External),
164            "external_gte" => Ok(VersionType::ExternalGte),
165            _ => Err(format!("Invalid versioning mode: {input}")),
166        }
167    }
168}
169
// NOTE(review): the macro is defined outside this file; judging by its name it presumably
// provides config generation for `ElasticsearchConfig` based on its `Default` impl — confirm
// against the macro definition.
impl_generate_config_from_default!(ElasticsearchConfig);
171
172#[derive(Debug, Clone)]
173pub enum ElasticsearchCommonMode {
174    Bulk {
175        index: Template,
176        template_fallback_index: Option<String>,
177        action: Template,
178        version: Option<Template>,
179        version_type: VersionType,
180    },
181    DataStream(DataStreamConfig),
182}
183
184struct VersionValueParseError<'a> {
185    value: &'a str,
186}
187
188impl internal_event::InternalEvent for VersionValueParseError<'_> {
189    fn emit(self) {
190        warn!("{self}")
191    }
192}
193
194impl fmt::Display for VersionValueParseError<'_> {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        write!(f, "Cannot parse version \"{}\" as integer", self.value)
197    }
198}
199
200impl ElasticsearchCommonMode {
201    fn index(&self, log: &LogEvent) -> Option<String> {
202        match self {
203            Self::Bulk {
204                index,
205                template_fallback_index,
206                ..
207            } => index
208                .render_string(log)
209                .or_else(|error| {
210                    if let Some(fallback) = template_fallback_index {
211                        emit!(TemplateRenderingError {
212                            error,
213                            field: Some("index"),
214                            drop_event: false,
215                        });
216                        Ok(fallback.clone())
217                    } else {
218                        emit!(TemplateRenderingError {
219                            error,
220                            field: Some("index"),
221                            drop_event: true,
222                        });
223                        Err(())
224                    }
225                })
226                .ok(),
227            Self::DataStream(ds) => ds.index(log),
228        }
229    }
230
231    fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
232        match self {
233            ElasticsearchCommonMode::Bulk {
234                action: bulk_action_template,
235                ..
236            } => bulk_action_template
237                .render_string(event)
238                .map_err(|error| {
239                    emit!(TemplateRenderingError {
240                        error,
241                        field: Some("bulk_action"),
242                        drop_event: true,
243                    });
244                })
245                .ok()
246                .and_then(|value| BulkAction::try_from(value.as_str()).ok()),
247            // avoid the interpolation
248            ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
249        }
250    }
251
252    fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
253        match self {
254            ElasticsearchCommonMode::Bulk {
255                version: Some(version),
256                ..
257            } => version
258                .render_string(event)
259                .map_err(|error| {
260                    emit!(TemplateRenderingError {
261                        error,
262                        field: Some("version"),
263                        drop_event: true,
264                    });
265                })
266                .ok()
267                .as_ref()
268                .and_then(|value| {
269                    value
270                        .parse()
271                        .map_err(|_| emit!(VersionValueParseError { value }))
272                        .ok()
273                }),
274            _ => None,
275        }
276    }
277
278    const fn version_type(&self) -> Option<VersionType> {
279        match self {
280            ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
281            _ => Some(VersionType::Internal),
282        }
283    }
284
285    const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
286        match self {
287            Self::DataStream(value) => Some(value),
288            _ => None,
289        }
290    }
291}
292
/// Configuration for Elasticsearch API version.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
// `proptest_derive::Arbitrary` is derived only when the `proptest` feature is enabled.
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
// Variant names are matched in snake_case (`auto`, `v6`, `v7`, `v8`).
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum ElasticsearchApiVersion {
    /// Auto-detect the API version.
    ///
    /// If the [cluster state version endpoint][es_version] isn't reachable, a warning is logged to
    /// stdout, and the version is assumed to be V6 if the `suppress_type_name` option is set to
    /// `true`. Otherwise, the version is assumed to be V8. In the future, the sink instead
    /// returns an error during configuration parsing, since a wrongly assumed version could lead to
    /// incorrect API calls.
    ///
    /// [es_version]: https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-state.html#cluster-state-api-path-params
    Auto,
    /// Use the Elasticsearch 6.x API.
    V6,
    /// Use the Elasticsearch 7.x API.
    V7,
    /// Use the Elasticsearch 8.x API.
    V8,
}
316
317impl Default for ElasticsearchApiVersion {
318    fn default() -> Self {
319        Self::Auto
320    }
321}
322
/// Errors describing an invalid Elasticsearch sink configuration.
#[derive(Debug, Snafu)]
// Makes the Snafu-generated context selectors public.
#[snafu(visibility(pub))]
pub enum ParseError {
    /// The `host` string could not be parsed as a URI.
    #[snafu(display("Invalid host {:?}: {:?}", host, source))]
    InvalidHost { host: String, source: InvalidUri },
    /// The `host` string does not contain a hostname.
    #[snafu(display("Host {:?} must include hostname", host))]
    HostMustIncludeHostname { host: String },
    /// The index template could not be parsed.
    #[snafu(display("Index template parse error: {}", source))]
    IndexTemplate { source: TemplateParseError },
    /// The batch action template could not be parsed.
    #[snafu(display("Batch action template parse error: {}", source))]
    BatchActionTemplate { source: TemplateParseError },
    /// AWS authentication is in use but no `aws.region` was given.
    // Only present when the crate is built with the `aws-core` feature.
    #[cfg(feature = "aws-core")]
    #[snafu(display("aws.region required when AWS authentication is in use"))]
    RegionRequired,
    /// No endpoints were specified.
    #[snafu(display("Endpoints option must be specified"))]
    EndpointRequired,
    /// Both the `endpoint` and `endpoints` options were specified.
    #[snafu(display(
        "`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
    ))]
    EndpointsExclusive,
    /// External versioning was selected without a version.
    #[snafu(display("Tried to use external versioning without specifying the version itself"))]
    ExternalVersioningWithoutVersion,
    /// External versioning was selected without a document ID.
    #[snafu(display("Cannot use external versioning without specifying a document ID"))]
    ExternalVersioningWithoutDocumentID,
    /// A version field was given while internal versioning is in use, so it would be ignored.
    #[snafu(display("Your version field will be ignored because you use internal versioning"))]
    ExternalVersionIgnoredWithInternalVersioning,
    /// Amazon OpenSearch Serverless was requested with an `api_version` other than `auto`.
    #[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
    ServerlessElasticsearchApiVersionMustBeAuto,
    /// Amazon OpenSearch Serverless was requested with an `auth.strategy` other than `aws`.
    #[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
    OpenSearchServerlessRequiresAwsAuth,
}