1use bytes::{Buf, Bytes};
2use http::{Response, StatusCode, Uri};
3use http_body::Body as _;
4use hyper::Body;
5use serde::Deserialize;
6use snafu::ResultExt;
7use vector_lib::config::{LogNamespace, proxy::ProxyConfig};
8
9use super::{
10 ElasticsearchApiVersion, ElasticsearchEncoder, InvalidHostSnafu, Request, VersionType,
11 request_builder::ElasticsearchRequestBuilder,
12};
13use crate::{
14 http::{HttpClient, MaybeAuth, ParameterValue, QueryParameterValue, QueryParameters},
15 sinks::{
16 HealthcheckError,
17 elasticsearch::{
18 ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
19 OpenSearchServiceType, ParseError,
20 },
21 util::{UriSerde, auth::Auth, http::RequestConfig},
22 },
23 tls::TlsSettings,
24 transforms::metric_to_log::MetricToLog,
25};
26
27#[derive(Debug, Clone)]
28pub struct ElasticsearchCommon {
29 pub base_url: String,
30 pub bulk_uri: Uri,
31 pub auth: Option<Auth>,
32 pub service_type: OpenSearchServiceType,
33 pub mode: ElasticsearchCommonMode,
34 pub request_builder: ElasticsearchRequestBuilder,
35 pub tls_settings: TlsSettings,
36 pub request: RequestConfig,
37 pub query_params: QueryParameters,
38 pub metric_to_log: MetricToLog,
39}
40
41impl ElasticsearchCommon {
42 pub async fn parse_config(
43 config: &ElasticsearchConfig,
44 endpoint: &str,
45 proxy_config: &ProxyConfig,
46 version: &mut Option<usize>,
47 ) -> crate::Result<Self> {
48 Self::check_endpoint(endpoint)?;
50
51 let uri = endpoint.parse::<UriSerde>()?;
52
53 let auth = Self::extract_auth(config, proxy_config, &uri).await?;
55
56 if config.opensearch_service_type == OpenSearchServiceType::Serverless {
57 match &config.auth {
58 #[cfg(feature = "aws-core")]
59 Some(ElasticsearchAuthConfig::Aws(_)) => (),
60 _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
61 }
62 }
63
64 let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
65
66 let mode = config.common_mode()?;
67
68 let tower_request = config.request.tower.into_settings();
69
70 if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
71 return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
72 }
73 if config.bulk.version.is_some()
74 && (config.bulk.version_type == VersionType::External
75 || config.bulk.version_type == VersionType::ExternalGte)
76 && config.id_key.is_none()
77 {
78 return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
79 }
80 if config.bulk.version.is_none()
81 && (config.bulk.version_type == VersionType::External
82 || config.bulk.version_type == VersionType::ExternalGte)
83 {
84 return Err(ParseError::ExternalVersioningWithoutVersion.into());
85 }
86
87 let mut query_params = config.query.clone().unwrap_or_default();
88 query_params.insert(
89 "timeout".into(),
90 QueryParameterValue::SingleParam(ParameterValue::String(format!(
91 "{}s",
92 tower_request.timeout.as_secs()
93 ))),
94 );
95
96 if let Some(pipeline) = &config.pipeline
97 && !pipeline.is_empty()
98 {
99 query_params.insert(
100 "pipeline".into(),
101 QueryParameterValue::SingleParam(ParameterValue::String(pipeline.into())),
102 );
103 }
104
105 let bulk_url = {
106 let mut query = url::form_urlencoded::Serializer::new(String::new());
107 for (param_name, param_value) in &query_params {
109 match param_value {
110 QueryParameterValue::SingleParam(param) => {
111 query.append_pair(param_name, param.value());
113 }
114 QueryParameterValue::MultiParams(params) => {
115 for value in params {
117 query.append_pair(param_name, value.value());
118 }
119 }
120 }
121 }
122 format!("{}/_bulk?{}", base_url, query.finish())
123 };
124 let bulk_uri = bulk_url.parse::<Uri>().unwrap();
125
126 let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
127 let config = config.clone();
128 let request = config.request;
129
130 let metric_config = config.metrics.clone().unwrap_or_default();
131 let metric_to_log = MetricToLog::new(
132 metric_config.host_tag.as_deref(),
133 metric_config.timezone.unwrap_or_default(),
134 LogNamespace::Legacy,
135 metric_config.metric_tag_values,
136 );
137
138 let service_type = config.opensearch_service_type;
139
140 let version = if service_type == OpenSearchServiceType::Serverless {
141 if config.api_version != ElasticsearchApiVersion::Auto {
142 return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
143 }
144 8
147 } else if let Some(version) = *version {
148 version
149 } else {
150 let ver = match config.api_version {
151 ElasticsearchApiVersion::V6 => 6,
152 ElasticsearchApiVersion::V7 => 7,
153 ElasticsearchApiVersion::V8 => 8,
154 ElasticsearchApiVersion::Auto => {
155 match get_version(
156 &base_url,
157 auth.as_ref(),
158 #[cfg(feature = "aws-core")]
159 &service_type,
160 &request,
161 &tls_settings,
162 proxy_config,
163 )
164 .await
165 {
166 Ok(version) => {
167 debug!(message = "Auto-detected Elasticsearch API version.", %version);
168 version
169 }
170 Err(error) => {
173 let assumed_version = if config.suppress_type_name { 6 } else { 8 };
180 debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
181 %assumed_version,
182 %config.suppress_type_name
183 );
184 warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
185 %assumed_version,
186 %error
187 );
188 assumed_version
189 }
190 }
191 }
192 };
193 *version = Some(ver);
194 ver
195 };
196
197 let doc_type = config.doc_type.clone();
198 let suppress_type_name = if config.suppress_type_name {
199 warn!(
200 message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead."
201 );
202 config.suppress_type_name
203 } else {
204 version >= 7
205 };
206 let request_builder = ElasticsearchRequestBuilder {
207 compression: config.compression,
208 encoder: ElasticsearchEncoder {
209 transformer: config.encoding.clone(),
210 doc_type,
211 suppress_type_name,
212 },
213 };
214
215 Ok(Self {
216 auth,
217 service_type,
218 base_url,
219 bulk_uri,
220 mode,
221 request_builder,
222 query_params,
223 request,
224 tls_settings,
225 metric_to_log,
226 })
227 }
228
229 fn check_endpoint(endpoint: &str) -> crate::Result<()> {
230 let uri = format!("{endpoint}/_test");
231 let uri = uri
232 .parse::<Uri>()
233 .with_context(|_| InvalidHostSnafu { host: endpoint })?;
234 if uri.host().is_none() {
235 return Err(ParseError::HostMustIncludeHostname {
236 host: endpoint.to_string(),
237 }
238 .into());
239 }
240 Ok(())
241 }
242
243 async fn extract_auth(
245 config: &ElasticsearchConfig,
246 #[cfg_attr(not(feature = "aws-core"), allow(unused_variables))] proxy_config: &ProxyConfig,
247 uri: &UriSerde,
248 ) -> crate::Result<Option<Auth>> {
249 let auth = match &config.auth {
250 Some(ElasticsearchAuthConfig::Basic { user, password }) => {
251 let auth = Some(crate::http::Auth::Basic {
252 user: user.clone(),
253 password: password.clone(),
254 });
255 let auth = auth.choose_one(&uri.auth)?.unwrap();
257 Some(Auth::Basic(auth))
258 }
259 #[cfg(feature = "aws-core")]
260 Some(ElasticsearchAuthConfig::Aws(aws)) => {
261 let region = config
262 .aws
263 .as_ref()
264 .map(|config| config.region())
265 .ok_or(ParseError::RegionRequired)?
266 .ok_or(ParseError::RegionRequired)?;
267 Some(Auth::Aws {
268 credentials_provider: aws
269 .credentials_provider(region.clone(), proxy_config, config.tls.as_ref())
270 .await?,
271 region,
272 })
273 }
274 None => {
275 uri.auth.as_ref().and_then(|auth| match auth {
277 crate::http::Auth::Basic { user, password } => {
278 Some(Auth::Basic(crate::http::Auth::Basic {
279 user: user.clone(),
280 password: password.clone(),
281 }))
282 }
283 _ => None,
284 })
285 }
286 };
287 Ok(auth)
288 }
289
290 pub async fn parse_many(
292 config: &ElasticsearchConfig,
293 proxy_config: &ProxyConfig,
294 ) -> crate::Result<Vec<Self>> {
295 let mut version = None;
296 if let Some(endpoint) = config.endpoint.as_ref() {
297 warn!(
298 message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead."
299 );
300 if config.endpoints.is_empty() {
301 Ok(vec![
302 Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
303 ])
304 } else {
305 Err(ParseError::EndpointsExclusive.into())
306 }
307 } else if config.endpoints.is_empty() {
308 Err(ParseError::EndpointRequired.into())
309 } else {
310 let mut commons = Vec::new();
311 for endpoint in config.endpoints.iter() {
312 commons
313 .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
314 }
315 Ok(commons)
316 }
317 }
318
/// Test-only helper: parses `config` with the proxy settings of a default
/// sink context and returns the single resulting `ElasticsearchCommon`.
///
/// # Panics
///
/// Panics when the configuration does not yield exactly one endpoint.
#[cfg(test)]
pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
    let context = crate::config::SinkContext::default();
    let mut commons = Self::parse_many(config, context.proxy()).await?;
    assert_eq!(commons.len(), 1);
    Ok(commons.remove(0))
}
327
328 pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
329 if self.service_type == OpenSearchServiceType::Serverless {
330 warn!(
331 message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck..."
332 );
333 Ok(())
334 } else {
335 match get(
336 &self.base_url,
337 self.auth.as_ref(),
338 #[cfg(feature = "aws-core")]
339 &self.service_type,
340 &self.request,
341 client,
342 "/_cluster/health",
343 )
344 .await?
345 .status()
346 {
347 StatusCode::OK => Ok(()),
348 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
349 }
350 }
351 }
352}
353
/// Signs `request` by delegating to `crate::aws::sign_request`.
///
/// The service name passed on is `service_type.as_str()`, and the final flag
/// is `true` only when the service type is `Serverless`.
#[cfg(feature = "aws-core")]
pub async fn sign_request(
    service_type: &OpenSearchServiceType,
    request: &mut http::Request<Bytes>,
    credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
    region: Option<&aws_types::region::Region>,
) -> crate::Result<()> {
    let is_serverless = *service_type == OpenSearchServiceType::Serverless;
    crate::aws::sign_request(
        service_type.as_str(),
        request,
        credentials_provider,
        region,
        is_serverless,
    )
    .await
}
373
374async fn get_version(
375 base_url: &str,
376 auth: Option<&Auth>,
377 #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
378 request: &RequestConfig,
379 tls_settings: &TlsSettings,
380 proxy_config: &ProxyConfig,
381) -> crate::Result<usize> {
382 #[derive(Deserialize)]
383 struct Version {
384 number: Option<String>,
385 }
386 #[derive(Deserialize)]
387 struct ResponsePayload {
388 version: Option<Version>,
389 }
390
391 let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
392 let response = get(
393 base_url,
394 auth,
395 #[cfg(feature = "aws-core")]
396 service_type,
397 request,
398 client,
399 "/",
400 )
401 .await
402 .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?;
403
404 let (_, body) = response.into_parts();
405 let mut body = body.collect().await?.aggregate();
406 let body = body.copy_to_bytes(body.remaining());
407 let ResponsePayload { version } = serde_json::from_slice(&body)?;
408 if let Some(version) = version
409 && let Some(number) = version.number
410 {
411 let v: Vec<&str> = number.split('.').collect();
412 if !v.is_empty()
413 && let Ok(major_version) = v[0].parse::<usize>()
414 {
415 return Ok(major_version);
416 }
417 }
418 Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
419}
420
421async fn get(
422 base_url: &str,
423 auth: Option<&Auth>,
424 #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
425 request: &RequestConfig,
426 client: HttpClient,
427 path: &str,
428) -> crate::Result<Response<Body>> {
429 let mut builder = Request::get(format!("{base_url}{path}"));
430
431 for (header, value) in &request.headers {
432 builder = builder.header(&header[..], &value[..]);
433 }
434 let mut request = builder.body(Bytes::new())?;
435
436 if let Some(auth) = auth {
437 match auth {
438 Auth::Basic(http_auth) => {
439 http_auth.apply(&mut request);
440 }
441 #[cfg(feature = "aws-core")]
442 Auth::Aws {
443 credentials_provider: provider,
444 region,
445 } => {
446 let region = region.clone();
447 sign_request(service_type, &mut request, provider, Some(®ion)).await?;
448 }
449 }
450 }
451
452 client
453 .send(request.map(hyper::Body::from))
454 .await
455 .map_err(Into::into)
456}