vector/sinks/elasticsearch/
common.rs

1use bytes::{Buf, Bytes};
2use http::{Response, StatusCode, Uri};
3use http_body::Body as _;
4use hyper::Body;
5use serde::Deserialize;
6use snafu::ResultExt;
7use vector_lib::config::{LogNamespace, proxy::ProxyConfig};
8
9use super::{
10    ElasticsearchApiVersion, ElasticsearchEncoder, InvalidHostSnafu, Request, VersionType,
11    request_builder::ElasticsearchRequestBuilder,
12};
13use crate::{
14    http::{HttpClient, MaybeAuth, ParameterValue, QueryParameterValue, QueryParameters},
15    sinks::{
16        HealthcheckError,
17        elasticsearch::{
18            ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
19            OpenSearchServiceType, ParseError,
20        },
21        util::{UriSerde, auth::Auth, http::RequestConfig},
22    },
23    tls::TlsSettings,
24    transforms::metric_to_log::MetricToLog,
25};
26
27#[derive(Debug, Clone)]
28pub struct ElasticsearchCommon {
29    pub base_url: String,
30    pub bulk_uri: Uri,
31    pub auth: Option<Auth>,
32    pub service_type: OpenSearchServiceType,
33    pub mode: ElasticsearchCommonMode,
34    pub request_builder: ElasticsearchRequestBuilder,
35    pub tls_settings: TlsSettings,
36    pub request: RequestConfig,
37    pub query_params: QueryParameters,
38    pub metric_to_log: MetricToLog,
39}
40
41impl ElasticsearchCommon {
42    pub async fn parse_config(
43        config: &ElasticsearchConfig,
44        endpoint: &str,
45        proxy_config: &ProxyConfig,
46        version: &mut Option<usize>,
47    ) -> crate::Result<Self> {
48        // Test the configured host
49        Self::check_endpoint(endpoint)?;
50
51        let uri = endpoint.parse::<UriSerde>()?;
52
53        // get auth from config or uri
54        let auth = Self::extract_auth(config, proxy_config, &uri).await?;
55
56        if config.opensearch_service_type == OpenSearchServiceType::Serverless {
57            match &config.auth {
58                #[cfg(feature = "aws-core")]
59                Some(ElasticsearchAuthConfig::Aws(_)) => (),
60                _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
61            }
62        }
63
64        let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
65
66        let mode = config.common_mode()?;
67
68        let tower_request = config.request.tower.into_settings();
69
70        if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
71            return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
72        }
73        if config.bulk.version.is_some()
74            && (config.bulk.version_type == VersionType::External
75                || config.bulk.version_type == VersionType::ExternalGte)
76            && config.id_key.is_none()
77        {
78            return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
79        }
80        if config.bulk.version.is_none()
81            && (config.bulk.version_type == VersionType::External
82                || config.bulk.version_type == VersionType::ExternalGte)
83        {
84            return Err(ParseError::ExternalVersioningWithoutVersion.into());
85        }
86
87        let mut query_params = config.query.clone().unwrap_or_default();
88        query_params.insert(
89            "timeout".into(),
90            QueryParameterValue::SingleParam(ParameterValue::String(format!(
91                "{}s",
92                tower_request.timeout.as_secs()
93            ))),
94        );
95
96        if let Some(pipeline) = &config.pipeline
97            && !pipeline.is_empty()
98        {
99            query_params.insert(
100                "pipeline".into(),
101                QueryParameterValue::SingleParam(ParameterValue::String(pipeline.into())),
102            );
103        }
104
105        let bulk_url = {
106            let mut query = url::form_urlencoded::Serializer::new(String::new());
107            // Iterate through the HashMap
108            for (param_name, param_value) in &query_params {
109                match param_value {
110                    QueryParameterValue::SingleParam(param) => {
111                        // For single parameter, just append one pair
112                        query.append_pair(param_name, param.value());
113                    }
114                    QueryParameterValue::MultiParams(params) => {
115                        // For multiple parameters, append the same key multiple times
116                        for value in params {
117                            query.append_pair(param_name, value.value());
118                        }
119                    }
120                }
121            }
122            format!("{}/_bulk?{}", base_url, query.finish())
123        };
124        let bulk_uri = bulk_url.parse::<Uri>().unwrap();
125
126        let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
127        let config = config.clone();
128        let request = config.request;
129
130        let metric_config = config.metrics.clone().unwrap_or_default();
131        let metric_to_log = MetricToLog::new(
132            metric_config.host_tag.as_deref(),
133            metric_config.timezone.unwrap_or_default(),
134            LogNamespace::Legacy,
135            metric_config.metric_tag_values,
136        );
137
138        let service_type = config.opensearch_service_type;
139
140        let version = if service_type == OpenSearchServiceType::Serverless {
141            if config.api_version != ElasticsearchApiVersion::Auto {
142                return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
143            }
144            // Amazon OpenSearch Serverless does not support the cluster-version API; hardcode
145            // well-known API version
146            8
147        } else if let Some(version) = *version {
148            version
149        } else {
150            let ver = match config.api_version {
151                ElasticsearchApiVersion::V6 => 6,
152                ElasticsearchApiVersion::V7 => 7,
153                ElasticsearchApiVersion::V8 => 8,
154                ElasticsearchApiVersion::Auto => {
155                    match get_version(
156                        &base_url,
157                        auth.as_ref(),
158                        #[cfg(feature = "aws-core")]
159                        &service_type,
160                        &request,
161                        &tls_settings,
162                        proxy_config,
163                    )
164                    .await
165                    {
166                        Ok(version) => {
167                            debug!(message = "Auto-detected Elasticsearch API version.", %version);
168                            version
169                        }
170                        // This error should be fatal, but for now we only emit it as a warning
171                        // to make the transition smoother.
172                        Err(error) => {
173                            // For now, estimate version.
174                            // The `suppress_type_name` option is only valid up to V6, so if a user
175                            // specified that is true, then we will assume they need API V6.
176                            // Otherwise, assume the latest version (V8).
177                            // This is by no means a perfect assumption but it's the best we can
178                            // make with the data we have.
179                            let assumed_version = if config.suppress_type_name { 6 } else { 8 };
180                            debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
181                                   %assumed_version,
182                                   %config.suppress_type_name
183                            );
184                            warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
185                                  %assumed_version,
186                                  %error
187                            );
188                            assumed_version
189                        }
190                    }
191                }
192            };
193            *version = Some(ver);
194            ver
195        };
196
197        let doc_type = config.doc_type.clone();
198        let suppress_type_name = if config.suppress_type_name {
199            warn!(
200                message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead."
201            );
202            config.suppress_type_name
203        } else {
204            version >= 7
205        };
206        let request_builder = ElasticsearchRequestBuilder {
207            compression: config.compression,
208            encoder: ElasticsearchEncoder {
209                transformer: config.encoding.clone(),
210                doc_type,
211                suppress_type_name,
212            },
213        };
214
215        Ok(Self {
216            auth,
217            service_type,
218            base_url,
219            bulk_uri,
220            mode,
221            request_builder,
222            query_params,
223            request,
224            tls_settings,
225            metric_to_log,
226        })
227    }
228
229    fn check_endpoint(endpoint: &str) -> crate::Result<()> {
230        let uri = format!("{endpoint}/_test");
231        let uri = uri
232            .parse::<Uri>()
233            .with_context(|_| InvalidHostSnafu { host: endpoint })?;
234        if uri.host().is_none() {
235            return Err(ParseError::HostMustIncludeHostname {
236                host: endpoint.to_string(),
237            }
238            .into());
239        }
240        Ok(())
241    }
242
243    // extract the authentication from config or endpoint
244    async fn extract_auth(
245        config: &ElasticsearchConfig,
246        #[cfg_attr(not(feature = "aws-core"), allow(unused_variables))] proxy_config: &ProxyConfig,
247        uri: &UriSerde,
248    ) -> crate::Result<Option<Auth>> {
249        let auth = match &config.auth {
250            Some(ElasticsearchAuthConfig::Basic { user, password }) => {
251                let auth = Some(crate::http::Auth::Basic {
252                    user: user.clone(),
253                    password: password.clone(),
254                });
255                // get whichever auth is provided between config and uri, prevent duplicate auth.
256                let auth = auth.choose_one(&uri.auth)?.unwrap();
257                Some(Auth::Basic(auth))
258            }
259            #[cfg(feature = "aws-core")]
260            Some(ElasticsearchAuthConfig::Aws(aws)) => {
261                let region = config
262                    .aws
263                    .as_ref()
264                    .map(|config| config.region())
265                    .ok_or(ParseError::RegionRequired)?
266                    .ok_or(ParseError::RegionRequired)?;
267                Some(Auth::Aws {
268                    credentials_provider: aws
269                        .credentials_provider(region.clone(), proxy_config, config.tls.as_ref())
270                        .await?,
271                    region,
272                })
273            }
274            None => {
275                // Use the authentication from the URL if it exists
276                uri.auth.as_ref().and_then(|auth| match auth {
277                    crate::http::Auth::Basic { user, password } => {
278                        Some(Auth::Basic(crate::http::Auth::Basic {
279                            user: user.clone(),
280                            password: password.clone(),
281                        }))
282                    }
283                    _ => None,
284                })
285            }
286        };
287        Ok(auth)
288    }
289
290    /// Parses endpoints into a vector of ElasticsearchCommons. The resulting vector is guaranteed to not be empty.
291    pub async fn parse_many(
292        config: &ElasticsearchConfig,
293        proxy_config: &ProxyConfig,
294    ) -> crate::Result<Vec<Self>> {
295        let mut version = None;
296        if let Some(endpoint) = config.endpoint.as_ref() {
297            warn!(
298                message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead."
299            );
300            if config.endpoints.is_empty() {
301                Ok(vec![
302                    Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
303                ])
304            } else {
305                Err(ParseError::EndpointsExclusive.into())
306            }
307        } else if config.endpoints.is_empty() {
308            Err(ParseError::EndpointRequired.into())
309        } else {
310            let mut commons = Vec::new();
311            for endpoint in config.endpoints.iter() {
312                commons
313                    .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
314            }
315            Ok(commons)
316        }
317    }
318
319    /// Parses a single endpoint, else panics.
320    #[cfg(test)]
321    pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
322        let mut commons =
323            Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
324        assert_eq!(commons.len(), 1);
325        Ok(commons.remove(0))
326    }
327
328    pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
329        if self.service_type == OpenSearchServiceType::Serverless {
330            warn!(
331                message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck..."
332            );
333            Ok(())
334        } else {
335            match get(
336                &self.base_url,
337                self.auth.as_ref(),
338                #[cfg(feature = "aws-core")]
339                &self.service_type,
340                &self.request,
341                client,
342                "/_cluster/health",
343            )
344            .await?
345            .status()
346            {
347                StatusCode::OK => Ok(()),
348                status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
349            }
350        }
351    }
352}
353
/// Signs `request` for the given OpenSearch service type by delegating to
/// `crate::aws::sign_request`.
///
/// `service_type.as_str()` is forwarded as the first argument, and the final boolean argument
/// is `true` only for the serverless service type.
#[cfg(feature = "aws-core")]
pub async fn sign_request(
    service_type: &OpenSearchServiceType,
    request: &mut http::Request<Bytes>,
    credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
    region: Option<&aws_types::region::Region>,
) -> crate::Result<()> {
    let service_name = service_type.as_str();
    // Amazon OpenSearch Serverless requires the x-amz-content-sha256 header when calculating
    // the AWS v4 signature:
    // https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-clients.html#serverless-signing
    let is_serverless = *service_type == OpenSearchServiceType::Serverless;

    crate::aws::sign_request(
        service_name,
        request,
        credentials_provider,
        region,
        is_serverless,
    )
    .await
}
373
374async fn get_version(
375    base_url: &str,
376    auth: Option<&Auth>,
377    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
378    request: &RequestConfig,
379    tls_settings: &TlsSettings,
380    proxy_config: &ProxyConfig,
381) -> crate::Result<usize> {
382    #[derive(Deserialize)]
383    struct Version {
384        number: Option<String>,
385    }
386    #[derive(Deserialize)]
387    struct ResponsePayload {
388        version: Option<Version>,
389    }
390
391    let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
392    let response = get(
393        base_url,
394        auth,
395        #[cfg(feature = "aws-core")]
396        service_type,
397        request,
398        client,
399        "/",
400    )
401    .await
402    .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?;
403
404    let (_, body) = response.into_parts();
405    let mut body = body.collect().await?.aggregate();
406    let body = body.copy_to_bytes(body.remaining());
407    let ResponsePayload { version } = serde_json::from_slice(&body)?;
408    if let Some(version) = version
409        && let Some(number) = version.number
410    {
411        let v: Vec<&str> = number.split('.').collect();
412        if !v.is_empty()
413            && let Ok(major_version) = v[0].parse::<usize>()
414        {
415            return Ok(major_version);
416        }
417    }
418    Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
419}
420
421async fn get(
422    base_url: &str,
423    auth: Option<&Auth>,
424    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
425    request: &RequestConfig,
426    client: HttpClient,
427    path: &str,
428) -> crate::Result<Response<Body>> {
429    let mut builder = Request::get(format!("{base_url}{path}"));
430
431    for (header, value) in &request.headers {
432        builder = builder.header(&header[..], &value[..]);
433    }
434    let mut request = builder.body(Bytes::new())?;
435
436    if let Some(auth) = auth {
437        match auth {
438            Auth::Basic(http_auth) => {
439                http_auth.apply(&mut request);
440            }
441            #[cfg(feature = "aws-core")]
442            Auth::Aws {
443                credentials_provider: provider,
444                region,
445            } => {
446                let region = region.clone();
447                sign_request(service_type, &mut request, provider, Some(&region)).await?;
448            }
449        }
450    }
451
452    client
453        .send(request.map(hyper::Body::from))
454        .await
455        .map_err(Into::into)
456}