vector/sinks/elasticsearch/
mod.rs1mod common;
2mod config;
3pub mod encoder;
4pub mod health;
5pub mod request_builder;
6pub mod retry;
7pub mod service;
8pub mod sink;
9
10#[cfg(test)]
11mod tests;
12
13#[cfg(test)]
14#[cfg(feature = "es-integration-tests")]
15mod integration_tests;
16
17use std::{convert::TryFrom, fmt};
18
19pub use common::*;
20pub use config::*;
21pub use encoder::ElasticsearchEncoder;
22use http::{Request, uri::InvalidUri};
23use snafu::Snafu;
24use vector_lib::{
25 NamedInternalEvent, configurable::configurable_component, internal_event::InternalEvent,
26 sensitive_string::SensitiveString,
27};
28
29use crate::{
30 event::{EventRef, LogEvent},
31 internal_events::TemplateRenderingError,
32 template::{Template, TemplateParseError},
33};
34
35#[configurable_component]
37#[derive(Clone, Debug)]
38#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
39#[configurable(metadata(
40 docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
41))]
42pub enum ElasticsearchAuthConfig {
43 Basic {
45 #[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
47 #[configurable(metadata(docs::examples = "username"))]
48 user: String,
49
50 #[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
52 #[configurable(metadata(docs::examples = "password"))]
53 password: SensitiveString,
54 },
55
56 #[cfg(feature = "aws-core")]
57 Aws(crate::aws::AwsAuthentication),
59}
60
61#[configurable_component]
63#[derive(Clone, Debug, Eq, PartialEq)]
64#[serde(deny_unknown_fields, rename_all = "snake_case")]
65#[derive(Default)]
66pub enum ElasticsearchMode {
67 #[serde(alias = "normal")]
69 #[default]
70 Bulk,
71
72 DataStream,
79}
80
81#[configurable_component]
83#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
84#[serde(deny_unknown_fields, rename_all = "snake_case")]
85pub enum BulkAction {
86 Index,
88
89 Create,
91
92 Update,
94}
95
96#[allow(clippy::trivially_copy_pass_by_ref)]
97impl BulkAction {
98 pub const fn as_str(&self) -> &'static str {
99 match self {
100 BulkAction::Index => "index",
101 BulkAction::Create => "create",
102 BulkAction::Update => "update",
103 }
104 }
105
106 pub const fn as_json_pointer(&self) -> &'static str {
107 match self {
108 BulkAction::Index => "/index",
109 BulkAction::Create => "/create",
110 BulkAction::Update => "/update",
111 }
112 }
113}
114
115impl TryFrom<&str> for BulkAction {
116 type Error = String;
117
118 fn try_from(input: &str) -> Result<Self, Self::Error> {
119 match input {
120 "index" => Ok(BulkAction::Index),
121 "create" => Ok(BulkAction::Create),
122 "update" => Ok(BulkAction::Update),
123 _ => Err(format!("Invalid bulk action: {input}")),
124 }
125 }
126}
127
128#[configurable_component]
130#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
131#[serde(deny_unknown_fields, rename_all = "snake_case")]
132pub enum VersionType {
133 Internal,
135
136 External,
138
139 ExternalGte,
141}
142
143#[allow(clippy::trivially_copy_pass_by_ref)]
144impl VersionType {
145 pub const fn as_str(&self) -> &'static str {
146 match self {
147 Self::Internal => "internal",
148 Self::External => "external",
149 Self::ExternalGte => "external_gte",
150 }
151 }
152}
153
154impl TryFrom<&str> for VersionType {
155 type Error = String;
156
157 fn try_from(input: &str) -> Result<Self, Self::Error> {
158 match input {
159 "internal" => Ok(VersionType::Internal),
160 "external" | "external_gt" => Ok(VersionType::External),
161 "external_gte" => Ok(VersionType::ExternalGte),
162 _ => Err(format!("Invalid versioning mode: {input}")),
163 }
164 }
165}
166
// NOTE(review): presumably derives the generated example configuration for
// `ElasticsearchConfig` from its `Default` value; the macro is defined
// elsewhere, so confirm there.
impl_generate_config_from_default!(ElasticsearchConfig);
168
169#[derive(Debug, Clone)]
170pub enum ElasticsearchCommonMode {
171 Bulk {
172 index: Template,
173 template_fallback_index: Option<String>,
174 action: Template,
175 version: Option<Template>,
176 version_type: VersionType,
177 },
178 DataStream(DataStreamConfig),
179}
180
181#[derive(NamedInternalEvent)]
182struct VersionValueParseError<'a> {
183 value: &'a str,
184}
185
186impl InternalEvent for VersionValueParseError<'_> {
187 fn emit(self) {
188 warn!("{self}")
189 }
190}
191
192impl fmt::Display for VersionValueParseError<'_> {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 write!(f, "Cannot parse version \"{}\" as integer", self.value)
195 }
196}
197
198impl ElasticsearchCommonMode {
199 fn index(&self, log: &LogEvent) -> Option<String> {
200 match self {
201 Self::Bulk {
202 index,
203 template_fallback_index,
204 ..
205 } => index
206 .render_string(log)
207 .or_else(|error| {
208 if let Some(fallback) = template_fallback_index {
209 emit!(TemplateRenderingError {
210 error,
211 field: Some("index"),
212 drop_event: false,
213 });
214 Ok(fallback.clone())
215 } else {
216 emit!(TemplateRenderingError {
217 error,
218 field: Some("index"),
219 drop_event: true,
220 });
221 Err(())
222 }
223 })
224 .ok(),
225 Self::DataStream(ds) => ds.index(log),
226 }
227 }
228
229 fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
230 match self {
231 ElasticsearchCommonMode::Bulk {
232 action: bulk_action_template,
233 ..
234 } => bulk_action_template
235 .render_string(event)
236 .map_err(|error| {
237 emit!(TemplateRenderingError {
238 error,
239 field: Some("bulk_action"),
240 drop_event: true,
241 });
242 })
243 .ok()
244 .and_then(|value| BulkAction::try_from(value.as_str()).ok()),
245 ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
247 }
248 }
249
250 fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
251 match self {
252 ElasticsearchCommonMode::Bulk {
253 version: Some(version),
254 ..
255 } => version
256 .render_string(event)
257 .map_err(|error| {
258 emit!(TemplateRenderingError {
259 error,
260 field: Some("version"),
261 drop_event: true,
262 });
263 })
264 .ok()
265 .as_ref()
266 .and_then(|value| {
267 value
268 .parse()
269 .map_err(|_| emit!(VersionValueParseError { value }))
270 .ok()
271 }),
272 _ => None,
273 }
274 }
275
276 const fn version_type(&self) -> Option<VersionType> {
277 match self {
278 ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
279 _ => Some(VersionType::Internal),
280 }
281 }
282
283 const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
284 match self {
285 Self::DataStream(value) => Some(value),
286 _ => None,
287 }
288 }
289}
290
291#[configurable_component]
293#[derive(Clone, Debug, Eq, PartialEq)]
294#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
295#[serde(deny_unknown_fields, rename_all = "snake_case")]
296#[derive(Default)]
297pub enum ElasticsearchApiVersion {
298 #[default]
308 Auto,
309 V6,
311 V7,
313 V8,
315}
316
317#[derive(Debug, Snafu)]
318#[snafu(visibility(pub))]
319pub enum ParseError {
320 #[snafu(display("Invalid host {:?}: {:?}", host, source))]
321 InvalidHost { host: String, source: InvalidUri },
322 #[snafu(display("Host {:?} must include hostname", host))]
323 HostMustIncludeHostname { host: String },
324 #[snafu(display("Index template parse error: {}", source))]
325 IndexTemplate { source: TemplateParseError },
326 #[snafu(display("Batch action template parse error: {}", source))]
327 BatchActionTemplate { source: TemplateParseError },
328 #[cfg(feature = "aws-core")]
329 #[snafu(display("aws.region required when AWS authentication is in use"))]
330 RegionRequired,
331 #[snafu(display("Endpoints option must be specified"))]
332 EndpointRequired,
333 #[snafu(display(
334 "`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
335 ))]
336 EndpointsExclusive,
337 #[snafu(display("Tried to use external versioning without specifying the version itself"))]
338 ExternalVersioningWithoutVersion,
339 #[snafu(display("Cannot use external versioning without specifying a document ID"))]
340 ExternalVersioningWithoutDocumentID,
341 #[snafu(display("Your version field will be ignored because you use internal versioning"))]
342 ExternalVersionIgnoredWithInternalVersioning,
343 #[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
344 ServerlessElasticsearchApiVersionMustBeAuto,
345 #[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
346 OpenSearchServerlessRequiresAwsAuth,
347}