// vector/sinks/elasticsearch/mod.rs
mod common;
2mod config;
3pub mod encoder;
4pub mod health;
5pub mod request_builder;
6pub mod retry;
7pub mod service;
8pub mod sink;
9
10#[cfg(test)]
11mod tests;
12
13#[cfg(test)]
14#[cfg(feature = "es-integration-tests")]
15mod integration_tests;
16
17use std::{convert::TryFrom, fmt};
18
19pub use common::*;
20pub use config::*;
21pub use encoder::ElasticsearchEncoder;
22use http::{uri::InvalidUri, Request};
23use snafu::Snafu;
24use vector_lib::sensitive_string::SensitiveString;
25use vector_lib::{configurable::configurable_component, internal_event};
26
27use crate::{
28 event::{EventRef, LogEvent},
29 internal_events::TemplateRenderingError,
30 template::{Template, TemplateParseError},
31};
32
33#[configurable_component]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
37#[configurable(metadata(
38 docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
39))]
40pub enum ElasticsearchAuthConfig {
41 Basic {
43 #[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
45 #[configurable(metadata(docs::examples = "username"))]
46 user: String,
47
48 #[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
50 #[configurable(metadata(docs::examples = "password"))]
51 password: SensitiveString,
52 },
53
54 #[cfg(feature = "aws-core")]
55 Aws(crate::aws::AwsAuthentication),
57}
58
59#[configurable_component]
61#[derive(Clone, Debug, Eq, PartialEq)]
62#[serde(deny_unknown_fields, rename_all = "snake_case")]
63pub enum ElasticsearchMode {
64 #[serde(alias = "normal")]
66 Bulk,
67
68 DataStream,
75}
76
77impl Default for ElasticsearchMode {
78 fn default() -> Self {
79 Self::Bulk
80 }
81}
82
83#[configurable_component]
85#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
86#[serde(deny_unknown_fields, rename_all = "snake_case")]
87pub enum BulkAction {
88 Index,
90
91 Create,
93
94 Update,
96}
97
98#[allow(clippy::trivially_copy_pass_by_ref)]
99impl BulkAction {
100 pub const fn as_str(&self) -> &'static str {
101 match self {
102 BulkAction::Index => "index",
103 BulkAction::Create => "create",
104 BulkAction::Update => "update",
105 }
106 }
107
108 pub const fn as_json_pointer(&self) -> &'static str {
109 match self {
110 BulkAction::Index => "/index",
111 BulkAction::Create => "/create",
112 BulkAction::Update => "/update",
113 }
114 }
115}
116
117impl TryFrom<&str> for BulkAction {
118 type Error = String;
119
120 fn try_from(input: &str) -> Result<Self, Self::Error> {
121 match input {
122 "index" => Ok(BulkAction::Index),
123 "create" => Ok(BulkAction::Create),
124 "update" => Ok(BulkAction::Update),
125 _ => Err(format!("Invalid bulk action: {input}")),
126 }
127 }
128}
129
130#[configurable_component]
132#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
133#[serde(deny_unknown_fields, rename_all = "snake_case")]
134pub enum VersionType {
135 Internal,
137
138 External,
140
141 ExternalGte,
143}
144
145#[allow(clippy::trivially_copy_pass_by_ref)]
146impl VersionType {
147 pub const fn as_str(&self) -> &'static str {
148 match self {
149 Self::Internal => "internal",
150 Self::External => "external",
151 Self::ExternalGte => "external_gte",
152 }
153 }
154}
155
156impl TryFrom<&str> for VersionType {
157 type Error = String;
158
159 fn try_from(input: &str) -> Result<Self, Self::Error> {
160 match input {
161 "internal" => Ok(VersionType::Internal),
162 "external" | "external_gt" => Ok(VersionType::External),
163 "external_gte" => Ok(VersionType::ExternalGte),
164 _ => Err(format!("Invalid versioning mode: {input}")),
165 }
166 }
167}
168
169impl_generate_config_from_default!(ElasticsearchConfig);
170
171#[derive(Debug, Clone)]
172pub enum ElasticsearchCommonMode {
173 Bulk {
174 index: Template,
175 template_fallback_index: Option<String>,
176 action: Template,
177 version: Option<Template>,
178 version_type: VersionType,
179 },
180 DataStream(DataStreamConfig),
181}
182
/// Internal event reported when a rendered version string cannot be parsed as
/// an integer.
struct VersionValueParseError<'a> {
    /// The rendered text that failed to parse.
    value: &'a str,
}
186
187impl internal_event::InternalEvent for VersionValueParseError<'_> {
188 fn emit(self) {
189 warn!("{self}")
190 }
191}
192
193impl fmt::Display for VersionValueParseError<'_> {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 write!(f, "Cannot parse version \"{}\" as integer", self.value)
196 }
197}
198
199impl ElasticsearchCommonMode {
200 fn index(&self, log: &LogEvent) -> Option<String> {
201 match self {
202 Self::Bulk {
203 index,
204 template_fallback_index,
205 ..
206 } => index
207 .render_string(log)
208 .or_else(|error| {
209 if let Some(fallback) = template_fallback_index {
210 emit!(TemplateRenderingError {
211 error,
212 field: Some("index"),
213 drop_event: false,
214 });
215 Ok(fallback.clone())
216 } else {
217 emit!(TemplateRenderingError {
218 error,
219 field: Some("index"),
220 drop_event: true,
221 });
222 Err(())
223 }
224 })
225 .ok(),
226 Self::DataStream(ds) => ds.index(log),
227 }
228 }
229
230 fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
231 match self {
232 ElasticsearchCommonMode::Bulk {
233 action: bulk_action_template,
234 ..
235 } => bulk_action_template
236 .render_string(event)
237 .map_err(|error| {
238 emit!(TemplateRenderingError {
239 error,
240 field: Some("bulk_action"),
241 drop_event: true,
242 });
243 })
244 .ok()
245 .and_then(|value| BulkAction::try_from(value.as_str()).ok()),
246 ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
248 }
249 }
250
251 fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
252 match self {
253 ElasticsearchCommonMode::Bulk {
254 version: Some(version),
255 ..
256 } => version
257 .render_string(event)
258 .map_err(|error| {
259 emit!(TemplateRenderingError {
260 error,
261 field: Some("version"),
262 drop_event: true,
263 });
264 })
265 .ok()
266 .as_ref()
267 .and_then(|value| {
268 value
269 .parse()
270 .map_err(|_| emit!(VersionValueParseError { value }))
271 .ok()
272 }),
273 _ => None,
274 }
275 }
276
277 const fn version_type(&self) -> Option<VersionType> {
278 match self {
279 ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
280 _ => Some(VersionType::Internal),
281 }
282 }
283
284 const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
285 match self {
286 Self::DataStream(value) => Some(value),
287 _ => None,
288 }
289 }
290}
291
292#[configurable_component]
294#[derive(Clone, Debug, Eq, PartialEq)]
295#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
296#[serde(deny_unknown_fields, rename_all = "snake_case")]
297pub enum ElasticsearchApiVersion {
298 Auto,
308 V6,
310 V7,
312 V8,
314}
315
316impl Default for ElasticsearchApiVersion {
317 fn default() -> Self {
318 Self::Auto
319 }
320}
321
322#[derive(Debug, Snafu)]
323#[snafu(visibility(pub))]
324pub enum ParseError {
325 #[snafu(display("Invalid host {:?}: {:?}", host, source))]
326 InvalidHost { host: String, source: InvalidUri },
327 #[snafu(display("Host {:?} must include hostname", host))]
328 HostMustIncludeHostname { host: String },
329 #[snafu(display("Index template parse error: {}", source))]
330 IndexTemplate { source: TemplateParseError },
331 #[snafu(display("Batch action template parse error: {}", source))]
332 BatchActionTemplate { source: TemplateParseError },
333 #[cfg(feature = "aws-core")]
334 #[snafu(display("aws.region required when AWS authentication is in use"))]
335 RegionRequired,
336 #[snafu(display("Endpoints option must be specified"))]
337 EndpointRequired,
338 #[snafu(display(
339 "`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
340 ))]
341 EndpointsExclusive,
342 #[snafu(display("Tried to use external versioning without specifying the version itself"))]
343 ExternalVersioningWithoutVersion,
344 #[snafu(display("Cannot use external versioning without specifying a document ID"))]
345 ExternalVersioningWithoutDocumentID,
346 #[snafu(display("Your version field will be ignored because you use internal versioning"))]
347 ExternalVersionIgnoredWithInternalVersioning,
348 #[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
349 ServerlessElasticsearchApiVersionMustBeAuto,
350 #[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
351 OpenSearchServerlessRequiresAwsAuth,
352}