vector/sinks/elasticsearch/
common.rs

1use bytes::{Buf, Bytes};
2use http::{Response, StatusCode, Uri};
3use hyper::{Body, body};
4use serde::Deserialize;
5use snafu::ResultExt;
6use vector_lib::config::{LogNamespace, proxy::ProxyConfig};
7
8use super::{
9    ElasticsearchApiVersion, ElasticsearchEncoder, InvalidHostSnafu, Request, VersionType,
10    request_builder::ElasticsearchRequestBuilder,
11};
12use crate::{
13    http::{HttpClient, MaybeAuth, ParameterValue, QueryParameterValue, QueryParameters},
14    sinks::{
15        HealthcheckError,
16        elasticsearch::{
17            ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
18            OpenSearchServiceType, ParseError,
19        },
20        util::{UriSerde, auth::Auth, http::RequestConfig},
21    },
22    tls::TlsSettings,
23    transforms::metric_to_log::MetricToLog,
24};
25
26#[derive(Debug, Clone)]
27pub struct ElasticsearchCommon {
28    pub base_url: String,
29    pub bulk_uri: Uri,
30    pub auth: Option<Auth>,
31    pub service_type: OpenSearchServiceType,
32    pub mode: ElasticsearchCommonMode,
33    pub request_builder: ElasticsearchRequestBuilder,
34    pub tls_settings: TlsSettings,
35    pub request: RequestConfig,
36    pub query_params: QueryParameters,
37    pub metric_to_log: MetricToLog,
38}
39
40impl ElasticsearchCommon {
41    pub async fn parse_config(
42        config: &ElasticsearchConfig,
43        endpoint: &str,
44        proxy_config: &ProxyConfig,
45        version: &mut Option<usize>,
46    ) -> crate::Result<Self> {
47        // Test the configured host
48        Self::check_endpoint(endpoint)?;
49
50        let uri = endpoint.parse::<UriSerde>()?;
51
52        // get auth from config or uri
53        let auth = Self::extract_auth(config, proxy_config, &uri).await?;
54
55        if config.opensearch_service_type == OpenSearchServiceType::Serverless {
56            match &config.auth {
57                #[cfg(feature = "aws-core")]
58                Some(ElasticsearchAuthConfig::Aws(_)) => (),
59                _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
60            }
61        }
62
63        let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
64
65        let mode = config.common_mode()?;
66
67        let tower_request = config.request.tower.into_settings();
68
69        if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
70            return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
71        }
72        if config.bulk.version.is_some()
73            && (config.bulk.version_type == VersionType::External
74                || config.bulk.version_type == VersionType::ExternalGte)
75            && config.id_key.is_none()
76        {
77            return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
78        }
79        if config.bulk.version.is_none()
80            && (config.bulk.version_type == VersionType::External
81                || config.bulk.version_type == VersionType::ExternalGte)
82        {
83            return Err(ParseError::ExternalVersioningWithoutVersion.into());
84        }
85
86        let mut query_params = config.query.clone().unwrap_or_default();
87        query_params.insert(
88            "timeout".into(),
89            QueryParameterValue::SingleParam(ParameterValue::String(format!(
90                "{}s",
91                tower_request.timeout.as_secs()
92            ))),
93        );
94
95        if let Some(pipeline) = &config.pipeline
96            && !pipeline.is_empty()
97        {
98            query_params.insert(
99                "pipeline".into(),
100                QueryParameterValue::SingleParam(ParameterValue::String(pipeline.into())),
101            );
102        }
103
104        let bulk_url = {
105            let mut query = url::form_urlencoded::Serializer::new(String::new());
106            // Iterate through the HashMap
107            for (param_name, param_value) in &query_params {
108                match param_value {
109                    QueryParameterValue::SingleParam(param) => {
110                        // For single parameter, just append one pair
111                        query.append_pair(param_name, param.value());
112                    }
113                    QueryParameterValue::MultiParams(params) => {
114                        // For multiple parameters, append the same key multiple times
115                        for value in params {
116                            query.append_pair(param_name, value.value());
117                        }
118                    }
119                }
120            }
121            format!("{}/_bulk?{}", base_url, query.finish())
122        };
123        let bulk_uri = bulk_url.parse::<Uri>().unwrap();
124
125        let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
126        let config = config.clone();
127        let request = config.request;
128
129        let metric_config = config.metrics.clone().unwrap_or_default();
130        let metric_to_log = MetricToLog::new(
131            metric_config.host_tag.as_deref(),
132            metric_config.timezone.unwrap_or_default(),
133            LogNamespace::Legacy,
134            metric_config.metric_tag_values,
135        );
136
137        let service_type = config.opensearch_service_type;
138
139        let version = if service_type == OpenSearchServiceType::Serverless {
140            if config.api_version != ElasticsearchApiVersion::Auto {
141                return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
142            }
143            // Amazon OpenSearch Serverless does not support the cluster-version API; hardcode
144            // well-known API version
145            8
146        } else if let Some(version) = *version {
147            version
148        } else {
149            let ver = match config.api_version {
150                ElasticsearchApiVersion::V6 => 6,
151                ElasticsearchApiVersion::V7 => 7,
152                ElasticsearchApiVersion::V8 => 8,
153                ElasticsearchApiVersion::Auto => {
154                    match get_version(
155                        &base_url,
156                        auth.as_ref(),
157                        #[cfg(feature = "aws-core")]
158                        &service_type,
159                        &request,
160                        &tls_settings,
161                        proxy_config,
162                    )
163                    .await
164                    {
165                        Ok(version) => {
166                            debug!(message = "Auto-detected Elasticsearch API version.", %version);
167                            version
168                        }
169                        // This error should be fatal, but for now we only emit it as a warning
170                        // to make the transition smoother.
171                        Err(error) => {
172                            // For now, estimate version.
173                            // The `suppress_type_name` option is only valid up to V6, so if a user
174                            // specified that is true, then we will assume they need API V6.
175                            // Otherwise, assume the latest version (V8).
176                            // This is by no means a perfect assumption but it's the best we can
177                            // make with the data we have.
178                            let assumed_version = if config.suppress_type_name { 6 } else { 8 };
179                            debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
180                                   %assumed_version,
181                                   %config.suppress_type_name
182                            );
183                            warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
184                                  %assumed_version,
185                                  %error
186                            );
187                            assumed_version
188                        }
189                    }
190                }
191            };
192            *version = Some(ver);
193            ver
194        };
195
196        let doc_type = config.doc_type.clone();
197        let suppress_type_name = if config.suppress_type_name {
198            warn!(
199                message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead."
200            );
201            config.suppress_type_name
202        } else {
203            version >= 7
204        };
205        let request_builder = ElasticsearchRequestBuilder {
206            compression: config.compression,
207            encoder: ElasticsearchEncoder {
208                transformer: config.encoding.clone(),
209                doc_type,
210                suppress_type_name,
211            },
212        };
213
214        Ok(Self {
215            auth,
216            service_type,
217            base_url,
218            bulk_uri,
219            mode,
220            request_builder,
221            query_params,
222            request,
223            tls_settings,
224            metric_to_log,
225        })
226    }
227
228    fn check_endpoint(endpoint: &str) -> crate::Result<()> {
229        let uri = format!("{endpoint}/_test");
230        let uri = uri
231            .parse::<Uri>()
232            .with_context(|_| InvalidHostSnafu { host: endpoint })?;
233        if uri.host().is_none() {
234            return Err(ParseError::HostMustIncludeHostname {
235                host: endpoint.to_string(),
236            }
237            .into());
238        }
239        Ok(())
240    }
241
242    // extract the authentication from config or endpoint
243    async fn extract_auth(
244        config: &ElasticsearchConfig,
245        #[cfg_attr(not(feature = "aws-core"), allow(unused_variables))] proxy_config: &ProxyConfig,
246        uri: &UriSerde,
247    ) -> crate::Result<Option<Auth>> {
248        let auth = match &config.auth {
249            Some(ElasticsearchAuthConfig::Basic { user, password }) => {
250                let auth = Some(crate::http::Auth::Basic {
251                    user: user.clone(),
252                    password: password.clone(),
253                });
254                // get whichever auth is provided between config and uri, prevent duplicate auth.
255                let auth = auth.choose_one(&uri.auth)?.unwrap();
256                Some(Auth::Basic(auth))
257            }
258            #[cfg(feature = "aws-core")]
259            Some(ElasticsearchAuthConfig::Aws(aws)) => {
260                let region = config
261                    .aws
262                    .as_ref()
263                    .map(|config| config.region())
264                    .ok_or(ParseError::RegionRequired)?
265                    .ok_or(ParseError::RegionRequired)?;
266                Some(Auth::Aws {
267                    credentials_provider: aws
268                        .credentials_provider(region.clone(), proxy_config, config.tls.as_ref())
269                        .await?,
270                    region,
271                })
272            }
273            None => {
274                // Use the authentication from the URL if it exists
275                uri.auth.as_ref().and_then(|auth| match auth {
276                    crate::http::Auth::Basic { user, password } => {
277                        Some(Auth::Basic(crate::http::Auth::Basic {
278                            user: user.clone(),
279                            password: password.clone(),
280                        }))
281                    }
282                    _ => None,
283                })
284            }
285        };
286        Ok(auth)
287    }
288
289    /// Parses endpoints into a vector of ElasticsearchCommons. The resulting vector is guaranteed to not be empty.
290    pub async fn parse_many(
291        config: &ElasticsearchConfig,
292        proxy_config: &ProxyConfig,
293    ) -> crate::Result<Vec<Self>> {
294        let mut version = None;
295        if let Some(endpoint) = config.endpoint.as_ref() {
296            warn!(
297                message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead."
298            );
299            if config.endpoints.is_empty() {
300                Ok(vec![
301                    Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
302                ])
303            } else {
304                Err(ParseError::EndpointsExclusive.into())
305            }
306        } else if config.endpoints.is_empty() {
307            Err(ParseError::EndpointRequired.into())
308        } else {
309            let mut commons = Vec::new();
310            for endpoint in config.endpoints.iter() {
311                commons
312                    .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
313            }
314            Ok(commons)
315        }
316    }
317
318    /// Parses a single endpoint, else panics.
319    #[cfg(test)]
320    pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
321        let mut commons =
322            Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
323        assert_eq!(commons.len(), 1);
324        Ok(commons.remove(0))
325    }
326
327    pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
328        if self.service_type == OpenSearchServiceType::Serverless {
329            warn!(
330                message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck..."
331            );
332            Ok(())
333        } else {
334            match get(
335                &self.base_url,
336                self.auth.as_ref(),
337                #[cfg(feature = "aws-core")]
338                &self.service_type,
339                &self.request,
340                client,
341                "/_cluster/health",
342            )
343            .await?
344            .status()
345            {
346                StatusCode::OK => Ok(()),
347                status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
348            }
349        }
350    }
351}
352
/// Signs `request` with AWS SigV4, using `service_type` as the signing service name.
///
/// Amazon OpenSearch Serverless requires the `x-amz-content-sha256` header when calculating
/// the AWS v4 signature, so the final flag handed to `crate::aws::sign_request` is `true`
/// only for the Serverless service type:
/// https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-clients.html#serverless-signing
#[cfg(feature = "aws-core")]
pub async fn sign_request(
    service_type: &OpenSearchServiceType,
    request: &mut http::Request<Bytes>,
    credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
    region: Option<&aws_types::region::Region>,
) -> crate::Result<()> {
    let service_name = service_type.as_str();
    let is_serverless = *service_type == OpenSearchServiceType::Serverless;

    crate::aws::sign_request(
        service_name,
        request,
        credentials_provider,
        region,
        is_serverless,
    )
    .await
}
372
373async fn get_version(
374    base_url: &str,
375    auth: Option<&Auth>,
376    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
377    request: &RequestConfig,
378    tls_settings: &TlsSettings,
379    proxy_config: &ProxyConfig,
380) -> crate::Result<usize> {
381    #[derive(Deserialize)]
382    struct Version {
383        number: Option<String>,
384    }
385    #[derive(Deserialize)]
386    struct ResponsePayload {
387        version: Option<Version>,
388    }
389
390    let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
391    let response = get(
392        base_url,
393        auth,
394        #[cfg(feature = "aws-core")]
395        service_type,
396        request,
397        client,
398        "/",
399    )
400    .await
401    .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?;
402
403    let (_, body) = response.into_parts();
404    let mut body = body::aggregate(body).await?;
405    let body = body.copy_to_bytes(body.remaining());
406    let ResponsePayload { version } = serde_json::from_slice(&body)?;
407    if let Some(version) = version
408        && let Some(number) = version.number
409    {
410        let v: Vec<&str> = number.split('.').collect();
411        if !v.is_empty()
412            && let Ok(major_version) = v[0].parse::<usize>()
413        {
414            return Ok(major_version);
415        }
416    }
417    Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
418}
419
420async fn get(
421    base_url: &str,
422    auth: Option<&Auth>,
423    #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
424    request: &RequestConfig,
425    client: HttpClient,
426    path: &str,
427) -> crate::Result<Response<Body>> {
428    let mut builder = Request::get(format!("{base_url}{path}"));
429
430    for (header, value) in &request.headers {
431        builder = builder.header(&header[..], &value[..]);
432    }
433    let mut request = builder.body(Bytes::new())?;
434
435    if let Some(auth) = auth {
436        match auth {
437            Auth::Basic(http_auth) => {
438                http_auth.apply(&mut request);
439            }
440            #[cfg(feature = "aws-core")]
441            Auth::Aws {
442                credentials_provider: provider,
443                region,
444            } => {
445                let region = region.clone();
446                sign_request(service_type, &mut request, provider, Some(&region)).await?;
447            }
448        }
449    }
450
451    client
452        .send(request.map(hyper::Body::from))
453        .await
454        .map_err(Into::into)
455}