// vector/sinks/elasticsearch/mod.rs
mod common;
mod config;
pub mod encoder;
pub mod health;
pub mod request_builder;
pub mod retry;
pub mod service;
pub mod sink;
#[cfg(test)]
mod tests;
#[cfg(test)]
#[cfg(feature = "es-integration-tests")]
mod integration_tests;
use std::{convert::TryFrom, fmt};
pub use common::*;
pub use config::*;
pub use encoder::ElasticsearchEncoder;
use http::{uri::InvalidUri, Request};
use snafu::Snafu;
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::{configurable::configurable_component, internal_event};
use crate::{
event::{EventRef, LogEvent},
internal_events::TemplateRenderingError,
template::{Template, TemplateParseError},
};
/// Authentication settings for the Elasticsearch sink.
///
/// The variant is selected by the `strategy` tag; variant names are written in `snake_case`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(
docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
))]
pub enum ElasticsearchAuthConfig {
/// Basic authentication with a username and password.
Basic {
/// The username to authenticate with.
#[configurable(metadata(docs::examples = "${ELASTICSEARCH_USERNAME}"))]
#[configurable(metadata(docs::examples = "username"))]
user: String,
/// The password to authenticate with.
#[configurable(metadata(docs::examples = "${ELASTICSEARCH_PASSWORD}"))]
#[configurable(metadata(docs::examples = "password"))]
password: SensitiveString,
},
/// AWS authentication, configured through `crate::aws::AwsAuthentication`.
// This variant only exists when the `aws-core` feature is enabled.
#[cfg(feature = "aws-core")]
Aws(crate::aws::AwsAuthentication),
}
/// Operating mode of the Elasticsearch sink.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum ElasticsearchMode {
/// Bulk mode (written as `bulk`). This is the default mode.
// The legacy-looking spelling `normal` is also accepted on input via the serde alias.
#[serde(alias = "normal")]
Bulk,
/// Data stream mode (written as `data_stream`).
DataStream,
}
impl Default for ElasticsearchMode {
fn default() -> Self {
Self::Bulk
}
}
/// Bulk action to apply to an event.
///
/// Variant names are written in `snake_case` (`index`, `create`, `update`), matching the
/// strings returned by `BulkAction::as_str`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum BulkAction {
/// The `index` action.
Index,
/// The `create` action.
Create,
/// The `update` action.
Update,
}
#[allow(clippy::trivially_copy_pass_by_ref)]
impl BulkAction {
pub const fn as_str(&self) -> &'static str {
match self {
BulkAction::Index => "index",
BulkAction::Create => "create",
BulkAction::Update => "update",
}
}
pub const fn as_json_pointer(&self) -> &'static str {
match self {
BulkAction::Index => "/index",
BulkAction::Create => "/create",
BulkAction::Update => "/update",
}
}
}
impl TryFrom<&str> for BulkAction {
type Error = String;
fn try_from(input: &str) -> Result<Self, Self::Error> {
match input {
"index" => Ok(BulkAction::Index),
"create" => Ok(BulkAction::Create),
"update" => Ok(BulkAction::Update),
_ => Err(format!("Invalid bulk action: {}", input)),
}
}
}
/// Document versioning mode.
///
/// Variant names are written in `snake_case` (`internal`, `external`, `external_gte`),
/// matching the strings returned by `VersionType::as_str`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, Hash, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum VersionType {
/// The `internal` versioning mode.
Internal,
/// The `external` versioning mode.
External,
/// The `external_gte` versioning mode.
ExternalGte,
}
#[allow(clippy::trivially_copy_pass_by_ref)]
impl VersionType {
pub const fn as_str(&self) -> &'static str {
match self {
Self::Internal => "internal",
Self::External => "external",
Self::ExternalGte => "external_gte",
}
}
}
impl TryFrom<&str> for VersionType {
type Error = String;
fn try_from(input: &str) -> Result<Self, Self::Error> {
match input {
"internal" => Ok(VersionType::Internal),
"external" | "external_gt" => Ok(VersionType::External),
"external_gte" => Ok(VersionType::ExternalGte),
_ => Err(format!("Invalid versioning mode: {}", input)),
}
}
}
// NOTE(review): `impl_generate_config_from_default!` is defined outside this file; judging by
// its name it presumably builds the generated example config for `ElasticsearchConfig` from
// that type's `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(ElasticsearchConfig);
/// Mode-specific settings consulted per event to resolve the index, the bulk action and the
/// document version (see the methods on this type).
#[derive(Debug, Clone)]
pub enum ElasticsearchCommonMode {
/// Bulk mode: index, action and version are rendered from templates.
Bulk {
/// Template rendered against each log event to produce the index name.
index: Template,
/// Index name used when `index` fails to render; when `None`, a render failure yields
/// no index at all.
template_fallback_index: Option<String>,
/// Template rendered against each event; its output is parsed with `BulkAction::try_from`.
action: Template,
/// Optional template rendered against each event; its output is parsed as a `u64` version.
version: Option<Template>,
/// Versioning mode reported by `version_type()`.
version_type: VersionType,
},
/// Data stream mode: the index comes from `DataStreamConfig::index` and the bulk action is
/// always `BulkAction::Create`.
DataStream(DataStreamConfig),
}
/// Internal event emitted when a rendered version string cannot be parsed as an integer.
struct VersionValueParseError<'a> {
// The rendered string that failed to parse.
value: &'a str,
}
impl internal_event::InternalEvent for VersionValueParseError<'_> {
// Logs the event through `warn!`, using its `Display` output as the message.
fn emit(self) {
warn!("{self}")
}
}
impl fmt::Display for VersionValueParseError<'_> {
    /// Writes a one-line message quoting the value that could not be parsed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rejected = self.value;
        f.write_fmt(format_args!(
            "Cannot parse version \"{}\" as integer",
            rejected
        ))
    }
}
impl ElasticsearchCommonMode {
fn index(&self, log: &LogEvent) -> Option<String> {
match self {
Self::Bulk {
index,
template_fallback_index,
..
} => index
.render_string(log)
.or_else(|error| {
if let Some(fallback) = template_fallback_index {
emit!(TemplateRenderingError {
error,
field: Some("index"),
drop_event: false,
});
Ok(fallback.clone())
} else {
emit!(TemplateRenderingError {
error,
field: Some("index"),
drop_event: true,
});
Err(())
}
})
.ok(),
Self::DataStream(ds) => ds.index(log),
}
}
fn bulk_action<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<BulkAction> {
match self {
ElasticsearchCommonMode::Bulk {
action: bulk_action_template,
..
} => bulk_action_template
.render_string(event)
.map_err(|error| {
emit!(TemplateRenderingError {
error,
field: Some("bulk_action"),
drop_event: true,
});
})
.ok()
.and_then(|value| BulkAction::try_from(value.as_str()).ok()),
ElasticsearchCommonMode::DataStream(_) => Some(BulkAction::Create),
}
}
fn version<'a>(&self, event: impl Into<EventRef<'a>>) -> Option<u64> {
match self {
ElasticsearchCommonMode::Bulk {
version: Some(version),
..
} => version
.render_string(event)
.map_err(|error| {
emit!(TemplateRenderingError {
error,
field: Some("version"),
drop_event: true,
});
})
.ok()
.as_ref()
.and_then(|value| {
value
.parse()
.map_err(|_| emit!(VersionValueParseError { value }))
.ok()
}),
_ => None,
}
}
const fn version_type(&self) -> Option<VersionType> {
match self {
ElasticsearchCommonMode::Bulk { version_type, .. } => Some(*version_type),
_ => Some(VersionType::Internal),
}
}
const fn as_data_stream_config(&self) -> Option<&DataStreamConfig> {
match self {
Self::DataStream(value) => Some(value),
_ => None,
}
}
}
/// Elasticsearch API version selection.
///
/// Variant names are written in `snake_case` (`auto`, `v6`, `v7`, `v8`).
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum ElasticsearchApiVersion {
/// The `auto` setting. This is the default.
// NOTE(review): presumably means the version is detected rather than fixed — confirm
// against the code that consumes this value.
Auto,
/// The `v6` setting.
V6,
/// The `v7` setting.
V7,
/// The `v8` setting.
V8,
}
impl Default for ElasticsearchApiVersion {
fn default() -> Self {
Self::Auto
}
}
/// Errors describing invalid Elasticsearch sink configuration.
///
/// Snafu context selectors for these variants are generated with `pub` visibility.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum ParseError {
/// The host string could not be parsed as a URI.
#[snafu(display("Invalid host {:?}: {:?}", host, source))]
InvalidHost { host: String, source: InvalidUri },
/// The host string does not include a hostname.
#[snafu(display("Host {:?} must include hostname", host))]
HostMustIncludeHostname { host: String },
/// The index template failed to parse.
#[snafu(display("Index template parse error: {}", source))]
IndexTemplate { source: TemplateParseError },
/// The batch action template failed to parse.
#[snafu(display("Batch action template parse error: {}", source))]
BatchActionTemplate { source: TemplateParseError },
/// AWS authentication is in use but no `aws.region` was given.
// This variant only exists when the `aws-core` feature is enabled.
#[cfg(feature = "aws-core")]
#[snafu(display("aws.region required when AWS authentication is in use"))]
RegionRequired,
/// No endpoints were specified.
#[snafu(display("Endpoints option must be specified"))]
EndpointRequired,
/// Both `endpoint` and `endpoints` were specified; only `endpoints` should be used.
#[snafu(display(
"`endpoint` and `endpoints` options are mutually exclusive. Please use `endpoints` option."
))]
EndpointsExclusive,
/// External versioning was requested without a version.
#[snafu(display("Tried to use external versioning without specifying the version itself"))]
ExternalVersioningWithoutVersion,
/// External versioning was requested without a document ID.
#[snafu(display("Cannot use external versioning without specifying a document ID"))]
ExternalVersioningWithoutDocumentID,
/// A version field was given together with internal versioning, so it would be ignored.
#[snafu(display("Your version field will be ignored because you use internal versioning"))]
ExternalVersionIgnoredWithInternalVersioning,
/// Amazon OpenSearch Serverless was selected with an `api_version` other than `auto`.
#[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
ServerlessElasticsearchApiVersionMustBeAuto,
/// Amazon OpenSearch Serverless was selected with an `auth.strategy` other than `aws`.
#[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
OpenSearchServerlessRequiresAwsAuth,
}