vector/sinks/elasticsearch/common.rs

1use bytes::{Buf, Bytes};
2use http::{Response, StatusCode, Uri};
3use hyper::{body, Body};
4use serde::Deserialize;
5use snafu::ResultExt;
6use vector_lib::config::proxy::ProxyConfig;
7use vector_lib::config::LogNamespace;
8
9use super::{
10    request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder,
11    InvalidHostSnafu, Request, VersionType,
12};
13use crate::{
14    http::{HttpClient, MaybeAuth, ParameterValue, QueryParameterValue, QueryParameters},
15    sinks::{
16        elasticsearch::{
17            ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
18            OpenSearchServiceType, ParseError,
19        },
20        util::{auth::Auth, http::RequestConfig, UriSerde},
21        HealthcheckError,
22    },
23    tls::TlsSettings,
24    transforms::metric_to_log::MetricToLog,
25};
26
27#[derive(Debug, Clone)]
28pub struct ElasticsearchCommon {
29    pub base_url: String,
30    pub bulk_uri: Uri,
31    pub auth: Option<Auth>,
32    pub service_type: OpenSearchServiceType,
33    pub mode: ElasticsearchCommonMode,
34    pub request_builder: ElasticsearchRequestBuilder,
35    pub tls_settings: TlsSettings,
36    pub request: RequestConfig,
37    pub query_params: QueryParameters,
38    pub metric_to_log: MetricToLog,
39}
40
41impl ElasticsearchCommon {
42    pub async fn parse_config(
43        config: &ElasticsearchConfig,
44        endpoint: &str,
45        proxy_config: &ProxyConfig,
46        version: &mut Option<usize>,
47    ) -> crate::Result<Self> {
48        // Test the configured host
49        Self::check_endpoint(endpoint)?;
50
51        let uri = endpoint.parse::<UriSerde>()?;
52
53        // get auth from config or uri
54        let auth = Self::extract_auth(config, proxy_config, &uri).await?;
55
56        if config.opensearch_service_type == OpenSearchServiceType::Serverless {
57            match &config.auth {
58                #[cfg(feature = "aws-core")]
59                Some(ElasticsearchAuthConfig::Aws(_)) => (),
60                _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
61            }
62        }
63
64        let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
65
66        let mode = config.common_mode()?;
67
68        let tower_request = config.request.tower.into_settings();
69
70        if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
71            return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
72        }
73        if config.bulk.version.is_some()
74            && (config.bulk.version_type == VersionType::External
75                || config.bulk.version_type == VersionType::ExternalGte)
76            && config.id_key.is_none()
77        {
78            return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
79        }
80        if config.bulk.version.is_none()
81            && (config.bulk.version_type == VersionType::External
82                || config.bulk.version_type == VersionType::ExternalGte)
83        {
84            return Err(ParseError::ExternalVersioningWithoutVersion.into());
85        }
86
87        let mut query_params = config.query.clone().unwrap_or_default();
88        query_params.insert(
89            "timeout".into(),
90            QueryParameterValue::SingleParam(ParameterValue::String(format!(
91                "{}s",
92                tower_request.timeout.as_secs()
93            ))),
94        );
95
96        if let Some(pipeline) = &config.pipeline {
97            if !pipeline.is_empty() {
98                query_params.insert(
99                    "pipeline".into(),
100                    QueryParameterValue::SingleParam(ParameterValue::String(pipeline.into())),
101                );
102            }
103        }
104
105        let bulk_url = {
106            let mut query = url::form_urlencoded::Serializer::new(String::new());
107            // Iterate through the HashMap
108            for (param_name, param_value) in &query_params {
109                match param_value {
110                    QueryParameterValue::SingleParam(param) => {
111                        // For single parameter, just append one pair
112                        query.append_pair(param_name, param.value());
113                    }
114                    QueryParameterValue::MultiParams(params) => {
115                        // For multiple parameters, append the same key multiple times
116                        for value in params {
117                            query.append_pair(param_name, value.value());
118                        }
119                    }
120                }
121            }
122            format!("{}/_bulk?{}", base_url, query.finish())
123        };
124        let bulk_uri = bulk_url.parse::<Uri>().unwrap();
125
126        let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
127        let config = config.clone();
128        let request = config.request;
129
130        let metric_config = config.metrics.clone().unwrap_or_default();
131        let metric_to_log = MetricToLog::new(
132            metric_config.host_tag.as_deref(),
133            metric_config.timezone.unwrap_or_default(),
134            LogNamespace::Legacy,
135            metric_config.metric_tag_values,
136        );
137
138        let service_type = config.opensearch_service_type;
139
140        let version = if service_type == OpenSearchServiceType::Serverless {
141            if config.api_version != ElasticsearchApiVersion::Auto {
142                return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
143            }
144            // Amazon OpenSearch Serverless does not support the cluster-version API; hardcode
145            // well-known API version
146            8
147        } else if let Some(version) = *version {
148            version
149        } else {
150            let ver = match config.api_version {
151                ElasticsearchApiVersion::V6 => 6,
152                ElasticsearchApiVersion::V7 => 7,
153                ElasticsearchApiVersion::V8 => 8,
154                ElasticsearchApiVersion::Auto => {
155                    match get_version(
156                        &base_url,
157                        auth.as_ref(),
158                        #[cfg(feature = "aws-core")]
159                        &service_type,
160                        &request,
161                        &tls_settings,
162                        proxy_config,
163                    )
164                    .await
165                    {
166                        Ok(version) => {
167                            debug!(message = "Auto-detected Elasticsearch API version.", %version);
168                            version
169                        }
170                        // This error should be fatal, but for now we only emit it as a warning
171                        // to make the transition smoother.
172                        Err(error) => {
173                            // For now, estimate version.
174                            // The `suppress_type_name` option is only valid up to V6, so if a user
175                            // specified that is true, then we will assume they need API V6.
176                            // Otherwise, assume the latest version (V8).
177                            // This is by no means a perfect assumption but it's the best we can
178                            // make with the data we have.
179                            let assumed_version = if config.suppress_type_name { 6 } else { 8 };
180                            debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
181                                   %assumed_version,
182                                   %config.suppress_type_name
183                            );
184                            warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
185                                  %assumed_version,
186                                  %error
187                            );
188                            assumed_version
189                        }
190                    }
191                }
192            };
193            *version = Some(ver);
194            ver
195        };
196
197        let doc_type = config.doc_type.clone();
198        let suppress_type_name = if config.suppress_type_name {
199            warn!(message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead.");
200            config.suppress_type_name
201        } else {
202            version >= 7
203        };
204        let request_builder = ElasticsearchRequestBuilder {
205            compression: config.compression,
206            encoder: ElasticsearchEncoder {
207                transformer: config.encoding.clone(),
208                doc_type,
209                suppress_type_name,
210            },
211        };
212
213        Ok(Self {
214            auth,
215            service_type,
216            base_url,
217            bulk_uri,
218            mode,
219            request_builder,
220            query_params,
221            request,
222            tls_settings,
223            metric_to_log,
224        })
225    }
226
227    fn check_endpoint(endpoint: &str) -> crate::Result<()> {
228        let uri = format!("{endpoint}/_test");
229        let uri = uri
230            .parse::<Uri>()
231            .with_context(|_| InvalidHostSnafu { host: endpoint })?;
232        if uri.host().is_none() {
233            return Err(ParseError::HostMustIncludeHostname {
234                host: endpoint.to_string(),
235            }
236            .into());
237        }
238        Ok(())
239    }
240
241    // extract the authentication from config or endpoint
242    async fn extract_auth(
243        config: &ElasticsearchConfig,
244        #[cfg_attr(not(feature = "aws-core"), allow(unused_variables))] proxy_config: &ProxyConfig,
245        uri: &UriSerde,
246    ) -> crate::Result<Option<Auth>> {
247        let auth = match &config.auth {
248            Some(ElasticsearchAuthConfig::Basic { user, password }) => {
249                let auth = Some(crate::http::Auth::Basic {
250                    user: user.clone(),
251                    password: password.clone(),
252                });
253                // get whichever auth is provided between config and uri, prevent duplicate auth.
254                let auth = auth.choose_one(&uri.auth)?.unwrap();
255                Some(Auth::Basic(auth))
256            }
257            #[cfg(feature = "aws-core")]
258            Some(ElasticsearchAuthConfig::Aws(aws)) => {
259                let region = config
260                    .aws
261                    .as_ref()
262                    .map(|config| config.region())
263                    .ok_or(ParseError::RegionRequired)?
264                    .ok_or(ParseError::RegionRequired)?;
265                Some(Auth::Aws {
266                    credentials_provider: aws
267                        .credentials_provider(region.clone(), proxy_config, config.tls.as_ref())
268                        .await?,
269                    region,
270                })
271            }
272            None => {
273                // Use the authentication from the URL if it exists
274                uri.auth.as_ref().and_then(|auth| match auth {
275                    crate::http::Auth::Basic { user, password } => {
276                        Some(Auth::Basic(crate::http::Auth::Basic {
277                            user: user.clone(),
278                            password: password.clone(),
279                        }))
280                    }
281                    _ => None,
282                })
283            }
284        };
285        Ok(auth)
286    }
287
288    /// Parses endpoints into a vector of ElasticsearchCommons. The resulting vector is guaranteed to not be empty.
289    pub async fn parse_many(
290        config: &ElasticsearchConfig,
291        proxy_config: &ProxyConfig,
292    ) -> crate::Result<Vec<Self>> {
293        let mut version = None;
294        if let Some(endpoint) = config.endpoint.as_ref() {
295            warn!(message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead.");
296            if config.endpoints.is_empty() {
297                Ok(vec![
298                    Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
299                ])
300            } else {
301                Err(ParseError::EndpointsExclusive.into())
302            }
303        } else if config.endpoints.is_empty() {
304            Err(ParseError::EndpointRequired.into())
305        } else {
306            let mut commons = Vec::new();
307            for endpoint in config.endpoints.iter() {
308                commons
309                    .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
310            }
311            Ok(commons)
312        }
313    }
314
315    /// Parses a single endpoint, else panics.
316    #[cfg(test)]
317    pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
318        let mut commons =
319            Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
320        assert_eq!(commons.len(), 1);
321        Ok(commons.remove(0))
322    }
323
324    pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
325        if self.service_type == OpenSearchServiceType::Serverless {
326            warn!(message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck...");
327            Ok(())
328        } else {
329            match get(
330                &self.base_url,
331                self.auth.as_ref(),
332                #[cfg(feature = "aws-core")]
333                &self.service_type,
334                &self.request,
335                client,
336                "/_cluster/health",
337            )
338            .await?
339            .status()
340            {
341                StatusCode::OK => Ok(()),
342                status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
343            }
344        }
345    }
346}
347
/// Signs `request` for the given OpenSearch `service_type` by delegating to
/// `crate::aws::sign_request`.
///
/// The service name comes from `service_type.as_str()`. The final flag passed to the shared
/// signer is `true` only for OpenSearch Serverless.
#[cfg(feature = "aws-core")]
pub async fn sign_request(
    service_type: &OpenSearchServiceType,
    request: &mut http::Request<Bytes>,
    credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
    region: Option<&aws_types::region::Region>,
) -> crate::Result<()> {
    let service_name = service_type.as_str();
    // Amazon OpenSearch Serverless requires the x-amz-content-sha256 header when calculating
    // the AWS v4 signature:
    // https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-clients.html#serverless-signing
    let is_serverless = *service_type == OpenSearchServiceType::Serverless;

    crate::aws::sign_request(
        service_name,
        request,
        credentials_provider,
        region,
        is_serverless,
    )
    .await
}
367
368async fn get_version(
369    base_url: &str,
370    auth: Option<&Auth>,
371    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
372    request: &RequestConfig,
373    tls_settings: &TlsSettings,
374    proxy_config: &ProxyConfig,
375) -> crate::Result<usize> {
376    #[derive(Deserialize)]
377    struct Version {
378        number: Option<String>,
379    }
380    #[derive(Deserialize)]
381    struct ResponsePayload {
382        version: Option<Version>,
383    }
384
385    let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
386    let response = get(
387        base_url,
388        auth,
389        #[cfg(feature = "aws-core")]
390        service_type,
391        request,
392        client,
393        "/",
394    )
395    .await
396    .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?;
397
398    let (_, body) = response.into_parts();
399    let mut body = body::aggregate(body).await?;
400    let body = body.copy_to_bytes(body.remaining());
401    let ResponsePayload { version } = serde_json::from_slice(&body)?;
402    if let Some(version) = version {
403        if let Some(number) = version.number {
404            let v: Vec<&str> = number.split('.').collect();
405            if !v.is_empty() {
406                if let Ok(major_version) = v[0].parse::<usize>() {
407                    return Ok(major_version);
408                }
409            }
410        }
411    }
412    Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
413}
414
415async fn get(
416    base_url: &str,
417    auth: Option<&Auth>,
418    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
419    request: &RequestConfig,
420    client: HttpClient,
421    path: &str,
422) -> crate::Result<Response<Body>> {
423    let mut builder = Request::get(format!("{base_url}{path}"));
424
425    for (header, value) in &request.headers {
426        builder = builder.header(&header[..], &value[..]);
427    }
428    let mut request = builder.body(Bytes::new())?;
429
430    if let Some(auth) = auth {
431        match auth {
432            Auth::Basic(http_auth) => {
433                http_auth.apply(&mut request);
434            }
435            #[cfg(feature = "aws-core")]
436            Auth::Aws {
437                credentials_provider: provider,
438                region,
439            } => {
440                let region = region.clone();
441                sign_request(service_type, &mut request, provider, Some(&region)).await?;
442            }
443        }
444    }
445
446    client
447        .send(request.map(hyper::Body::from))
448        .await
449        .map_err(Into::into)
450}