vector/sinks/elasticsearch/
mod.rs1mod common;
2mod config;
3pub mod encoder;
4pub mod health;
5pub mod request_builder;
6pub mod retry;
7pub mod service;
8pub mod sink;
9
10#[cfg(test)]
11mod tests;
12
13#[cfg(test)]
14#[cfg(feature = "es-integration-tests")]
15mod integration_tests;
16
17use std::{convert::TryFrom, fmt};
18
19pub use common::*;
20pub use config::*;
21pub use encoder::ElasticsearchEncoder;
22use http::{Request, uri::InvalidUri};
23use snafu::Snafu;
24use vector_lib::{
25 configurable::configurable_component, internal_event, sensitive_string::SensitiveString,
26};
27
28use crate::{
29 event::{EventRef, LogEvent},
30 internal_events::TemplateRenderingError,
31 template::{Template, TemplateParseError},
32};
33
34#[configurable_component]
36#[derive(Clone, Debug)]
37#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
38#[configurable(metadata(
39 docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
40))]
41pub enum ElasticsearchAuthConfig {
42 Basic {
44 #[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
46 #[configurable(metadata(docs::examples = "username"))]
47 user: String,
48
49 #[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
51 #[configurable(metadata(docs::examples = "password"))]
52 password: SensitiveString,
53 },
54
55 #[cfg(feature = "aws-core")]
56 Aws(crate::aws::AwsAuthentication),
58}
59
60#[configurable_component]
62#[derive(Clone, Debug, Eq, PartialEq)]
63#[serde(deny_unknown_fields, rename_all = "snake_case")]
64pub enum ElasticsearchMode {
65 #[serde(alias = "normal")]
67 Bulk,
68
69 DataStream,
76}
77
78impl Default for ElasticsearchMode {
79 fn default() -> Self {
80 Self::Bulk
81 }
82}
83
84#[configurable_component]
86#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
87#[serde(deny_unknown_fields, rename_all = "snake_case")]
88pub enum BulkAction {
89 Index,
91
92 Create,
94
95 Update,
97}
98
99#[allow(clippy::trivially_copy_pass_by_ref)]
100impl BulkAction {
101 pub const fn as_str(&self) -> &'static str {
102 match self {
103 BulkAction::Index => "index",
104 BulkAction::Create => "create",
105 BulkAction::Update => "update",
106 }
107 }
108
109 pub const fn as_json_pointer(&self) -> &'static str {
110 match self {
111 BulkAction::Index => "/index",
112 BulkAction::Create => "/create",
113 BulkAction::Update => "/update",
114 }
115 }
116}
117
118impl TryFrom<&str> for BulkAction {
119 type Error = String;
120
121 fn try_from(input: &str) -> Result<Self, Self::Error> {
122 match input {
123 "index" => Ok(BulkAction::Index),
124 "create" => Ok(BulkAction::Create),
125 "update" => Ok(BulkAction::Update),
126 _ => Err(format!("Invalid bulk action: {input}")),
127 }
128 }
129}
130
131#[configurable_component]
133#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
134#[serde(deny_unknown_fields, rename_all = "snake_case")]
135pub enum VersionType {
136 Internal,
138
139 External,
141
142 ExternalGte,
144}
145
146#[allow(clippy::trivially_copy_pass_by_ref)]
147impl VersionType {
148 pub const fn as_str(&self) -> &'static str {
149 match self {
150 Self::Internal => "internal",
151 Self::External => "external",
152 Self::ExternalGte => "external_gte",
153 }
154 }
155}
156
157impl TryFrom<&str> for VersionType {
158 type Error = String;
159
160 fn try_from(input: &str) -> Result<Self, Self::Error> {
161 match input {
162 "internal" => Ok(VersionType::Internal),
163 "external" | "external_gt" => Ok(VersionType::External),
164 "external_gte" => Ok(VersionType::ExternalGte),
165 _ => Err(format!("Invalid versioning mode: {input}")),
166 }
167 }
168}
169
170impl_generate_config_from_default!(ElasticsearchConfig);
171
172#[derive(Debug, Clone)]
173pub enum ElasticsearchCommonMode {
174 Bulk {
175 index: Template,
176 template_fallback_index: Option<String>,
177 action: Template,
178 version: Option<Template>,
179 version_type: VersionType,
180 },
181 DataStream(DataStreamConfig),
182}
183
184struct VersionValueParseError<'a> {
185 value: &'a str,
186}
187
188impl internal_event::InternalEvent for VersionValueParseError<'_> {
189 fn emit(self) {
190 warn!("{self}")
191 }
192}
193
194impl fmt::Display for VersionValueParseError<'_> {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 write!(f, "Cannot parse version \"{}\" as integer", self.value)
197 }
198}
199
200impl ElasticsearchCommonMode {
201 fn index(&self, log: &LogEvent) -> Option<String> {
202 match self {
203 Self::Bulk {
204 index,
205 template_fallback_index,
206 ..
207 } => index
208 .render_string(log)
209 .or_else(|error| {
210 if let Some(fallback) = template_fallback_index {
211 emit!(TemplateRenderingError {
212 error,
213 field: Some("index"),
214 drop_event: false,
215 });
216 Ok(fallback.clone())
217 } else {
218 emit!(TemplateRenderingError {
219 error,
220 field: Some("index"),
221 drop_event: true,
222 });
223 Err(())
224 }
225 })
226 .ok(),
227 Self::DataStream(ds) => ds.index(log),
228 }
229 }
230
231 fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
232 match self {
233 ElasticsearchCommonMode::Bulk {
234 action: bulk_action_template,
235 ..
236 } => bulk_action_template
237 .render_string(event)
238 .map_err(|error| {
239 emit!(TemplateRenderingError {
240 error,
241 field: Some("bulk_action"),
242 drop_event: true,
243 });
244 })
245 .ok()
246 .and_then(|value| BulkAction::try_from(value.as_str()).ok()),
247 ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
249 }
250 }
251
252 fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
253 match self {
254 ElasticsearchCommonMode::Bulk {
255 version: Some(version),
256 ..
257 } => version
258 .render_string(event)
259 .map_err(|error| {
260 emit!(TemplateRenderingError {
261 error,
262 field: Some("version"),
263 drop_event: true,
264 });
265 })
266 .ok()
267 .as_ref()
268 .and_then(|value| {
269 value
270 .parse()
271 .map_err(|_| emit!(VersionValueParseError { value }))
272 .ok()
273 }),
274 _ => None,
275 }
276 }
277
278 const fn version_type(&self) -> Option<VersionType> {
279 match self {
280 ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
281 _ => Some(VersionType::Internal),
282 }
283 }
284
285 const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
286 match self {
287 Self::DataStream(value) => Some(value),
288 _ => None,
289 }
290 }
291}
292
293#[configurable_component]
295#[derive(Clone, Debug, Eq, PartialEq)]
296#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
297#[serde(deny_unknown_fields, rename_all = "snake_case")]
298pub enum ElasticsearchApiVersion {
299 Auto,
309 V6,
311 V7,
313 V8,
315}
316
317impl Default for ElasticsearchApiVersion {
318 fn default() -> Self {
319 Self::Auto
320 }
321}
322
323#[derive(Debug, Snafu)]
324#[snafu(visibility(pub))]
325pub enum ParseError {
326 #[snafu(display("Invalid host {:?}: {:?}", host, source))]
327 InvalidHost { host: String, source: InvalidUri },
328 #[snafu(display("Host {:?} must include hostname", host))]
329 HostMustIncludeHostname { host: String },
330 #[snafu(display("Index template parse error: {}", source))]
331 IndexTemplate { source: TemplateParseError },
332 #[snafu(display("Batch action template parse error: {}", source))]
333 BatchActionTemplate { source: TemplateParseError },
334 #[cfg(feature = "aws-core")]
335 #[snafu(display("aws.region required when AWS authentication is in use"))]
336 RegionRequired,
337 #[snafu(display("Endpoints option must be specified"))]
338 EndpointRequired,
339 #[snafu(display(
340 "`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
341 ))]
342 EndpointsExclusive,
343 #[snafu(display("Tried to use external versioning without specifying the version itself"))]
344 ExternalVersioningWithoutVersion,
345 #[snafu(display("Cannot use external versioning without specifying a document ID"))]
346 ExternalVersioningWithoutDocumentID,
347 #[snafu(display("Your version field will be ignored because you use internal versioning"))]
348 ExternalVersionIgnoredWithInternalVersioning,
349 #[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
350 ServerlessElasticsearchApiVersionMustBeAuto,
351 #[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
352 OpenSearchServerlessRequiresAwsAuth,
353}