1use bytes::{Buf, Bytes};
2use http::{Response, StatusCode, Uri};
3use hyper::{Body, body};
4use serde::Deserialize;
5use snafu::ResultExt;
6use vector_lib::config::{LogNamespace, proxy::ProxyConfig};
7
8use super::{
9 ElasticsearchApiVersion, ElasticsearchEncoder, InvalidHostSnafu, Request, VersionType,
10 request_builder::ElasticsearchRequestBuilder,
11};
12use crate::{
13 http::{HttpClient, MaybeAuth, ParameterValue, QueryParameterValue, QueryParameters},
14 sinks::{
15 HealthcheckError,
16 elasticsearch::{
17 ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
18 OpenSearchServiceType, ParseError,
19 },
20 util::{UriSerde, auth::Auth, http::RequestConfig},
21 },
22 tls::TlsSettings,
23 transforms::metric_to_log::MetricToLog,
24};
25
26#[derive(Debug, Clone)]
27pub struct ElasticsearchCommon {
28 pub base_url: String,
29 pub bulk_uri: Uri,
30 pub auth: Option<Auth>,
31 pub service_type: OpenSearchServiceType,
32 pub mode: ElasticsearchCommonMode,
33 pub request_builder: ElasticsearchRequestBuilder,
34 pub tls_settings: TlsSettings,
35 pub request: RequestConfig,
36 pub query_params: QueryParameters,
37 pub metric_to_log: MetricToLog,
38}
39
40impl ElasticsearchCommon {
41 pub async fn parse_config(
42 config: &ElasticsearchConfig,
43 endpoint: &str,
44 proxy_config: &ProxyConfig,
45 version: &mut Option<usize>,
46 ) -> crate::Result<Self> {
47 Self::check_endpoint(endpoint)?;
49
50 let uri = endpoint.parse::<UriSerde>()?;
51
52 let auth = Self::extract_auth(config, proxy_config, &uri).await?;
54
55 if config.opensearch_service_type == OpenSearchServiceType::Serverless {
56 match &config.auth {
57 #[cfg(feature = "aws-core")]
58 Some(ElasticsearchAuthConfig::Aws(_)) => (),
59 _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
60 }
61 }
62
63 let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
64
65 let mode = config.common_mode()?;
66
67 let tower_request = config.request.tower.into_settings();
68
69 if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
70 return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
71 }
72 if config.bulk.version.is_some()
73 && (config.bulk.version_type == VersionType::External
74 || config.bulk.version_type == VersionType::ExternalGte)
75 && config.id_key.is_none()
76 {
77 return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
78 }
79 if config.bulk.version.is_none()
80 && (config.bulk.version_type == VersionType::External
81 || config.bulk.version_type == VersionType::ExternalGte)
82 {
83 return Err(ParseError::ExternalVersioningWithoutVersion.into());
84 }
85
86 let mut query_params = config.query.clone().unwrap_or_default();
87 query_params.insert(
88 "timeout".into(),
89 QueryParameterValue::SingleParam(ParameterValue::String(format!(
90 "{}s",
91 tower_request.timeout.as_secs()
92 ))),
93 );
94
95 if let Some(pipeline) = &config.pipeline
96 && !pipeline.is_empty()
97 {
98 query_params.insert(
99 "pipeline".into(),
100 QueryParameterValue::SingleParam(ParameterValue::String(pipeline.into())),
101 );
102 }
103
104 let bulk_url = {
105 let mut query = url::form_urlencoded::Serializer::new(String::new());
106 for (param_name, param_value) in &query_params {
108 match param_value {
109 QueryParameterValue::SingleParam(param) => {
110 query.append_pair(param_name, param.value());
112 }
113 QueryParameterValue::MultiParams(params) => {
114 for value in params {
116 query.append_pair(param_name, value.value());
117 }
118 }
119 }
120 }
121 format!("{}/_bulk?{}", base_url, query.finish())
122 };
123 let bulk_uri = bulk_url.parse::<Uri>().unwrap();
124
125 let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
126 let config = config.clone();
127 let request = config.request;
128
129 let metric_config = config.metrics.clone().unwrap_or_default();
130 let metric_to_log = MetricToLog::new(
131 metric_config.host_tag.as_deref(),
132 metric_config.timezone.unwrap_or_default(),
133 LogNamespace::Legacy,
134 metric_config.metric_tag_values,
135 );
136
137 let service_type = config.opensearch_service_type;
138
139 let version = if service_type == OpenSearchServiceType::Serverless {
140 if config.api_version != ElasticsearchApiVersion::Auto {
141 return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
142 }
143 8
146 } else if let Some(version) = *version {
147 version
148 } else {
149 let ver = match config.api_version {
150 ElasticsearchApiVersion::V6 => 6,
151 ElasticsearchApiVersion::V7 => 7,
152 ElasticsearchApiVersion::V8 => 8,
153 ElasticsearchApiVersion::Auto => {
154 match get_version(
155 &base_url,
156 auth.as_ref(),
157 #[cfg(feature = "aws-core")]
158 &service_type,
159 &request,
160 &tls_settings,
161 proxy_config,
162 )
163 .await
164 {
165 Ok(version) => {
166 debug!(message = "Auto-detected Elasticsearch API version.", %version);
167 version
168 }
169 Err(error) => {
172 let assumed_version = if config.suppress_type_name { 6 } else { 8 };
179 debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
180 %assumed_version,
181 %config.suppress_type_name
182 );
183 warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
184 %assumed_version,
185 %error
186 );
187 assumed_version
188 }
189 }
190 }
191 };
192 *version = Some(ver);
193 ver
194 };
195
196 let doc_type = config.doc_type.clone();
197 let suppress_type_name = if config.suppress_type_name {
198 warn!(
199 message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead."
200 );
201 config.suppress_type_name
202 } else {
203 version >= 7
204 };
205 let request_builder = ElasticsearchRequestBuilder {
206 compression: config.compression,
207 encoder: ElasticsearchEncoder {
208 transformer: config.encoding.clone(),
209 doc_type,
210 suppress_type_name,
211 },
212 };
213
214 Ok(Self {
215 auth,
216 service_type,
217 base_url,
218 bulk_uri,
219 mode,
220 request_builder,
221 query_params,
222 request,
223 tls_settings,
224 metric_to_log,
225 })
226 }
227
228 fn check_endpoint(endpoint: &str) -> crate::Result<()> {
229 let uri = format!("{endpoint}/_test");
230 let uri = uri
231 .parse::<Uri>()
232 .with_context(|_| InvalidHostSnafu { host: endpoint })?;
233 if uri.host().is_none() {
234 return Err(ParseError::HostMustIncludeHostname {
235 host: endpoint.to_string(),
236 }
237 .into());
238 }
239 Ok(())
240 }
241
242 async fn extract_auth(
244 config: &ElasticsearchConfig,
245 #[cfg_attr(not(feature = "aws-core"), allow(unused_variables))] proxy_config: &ProxyConfig,
246 uri: &UriSerde,
247 ) -> crate::Result<Option<Auth>> {
248 let auth = match &config.auth {
249 Some(ElasticsearchAuthConfig::Basic { user, password }) => {
250 let auth = Some(crate::http::Auth::Basic {
251 user: user.clone(),
252 password: password.clone(),
253 });
254 let auth = auth.choose_one(&uri.auth)?.unwrap();
256 Some(Auth::Basic(auth))
257 }
258 #[cfg(feature = "aws-core")]
259 Some(ElasticsearchAuthConfig::Aws(aws)) => {
260 let region = config
261 .aws
262 .as_ref()
263 .map(|config| config.region())
264 .ok_or(ParseError::RegionRequired)?
265 .ok_or(ParseError::RegionRequired)?;
266 Some(Auth::Aws {
267 credentials_provider: aws
268 .credentials_provider(region.clone(), proxy_config, config.tls.as_ref())
269 .await?,
270 region,
271 })
272 }
273 None => {
274 uri.auth.as_ref().and_then(|auth| match auth {
276 crate::http::Auth::Basic { user, password } => {
277 Some(Auth::Basic(crate::http::Auth::Basic {
278 user: user.clone(),
279 password: password.clone(),
280 }))
281 }
282 _ => None,
283 })
284 }
285 };
286 Ok(auth)
287 }
288
289 pub async fn parse_many(
291 config: &ElasticsearchConfig,
292 proxy_config: &ProxyConfig,
293 ) -> crate::Result<Vec<Self>> {
294 let mut version = None;
295 if let Some(endpoint) = config.endpoint.as_ref() {
296 warn!(
297 message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead."
298 );
299 if config.endpoints.is_empty() {
300 Ok(vec![
301 Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
302 ])
303 } else {
304 Err(ParseError::EndpointsExclusive.into())
305 }
306 } else if config.endpoints.is_empty() {
307 Err(ParseError::EndpointRequired.into())
308 } else {
309 let mut commons = Vec::new();
310 for endpoint in config.endpoints.iter() {
311 commons
312 .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
313 }
314 Ok(commons)
315 }
316 }
317
318 #[cfg(test)]
320 pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
321 let mut commons =
322 Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
323 assert_eq!(commons.len(), 1);
324 Ok(commons.remove(0))
325 }
326
327 pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
328 if self.service_type == OpenSearchServiceType::Serverless {
329 warn!(
330 message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck..."
331 );
332 Ok(())
333 } else {
334 match get(
335 &self.base_url,
336 self.auth.as_ref(),
337 #[cfg(feature = "aws-core")]
338 &self.service_type,
339 &self.request,
340 client,
341 "/_cluster/health",
342 )
343 .await?
344 .status()
345 {
346 StatusCode::OK => Ok(()),
347 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
348 }
349 }
350 }
351}
352
353#[cfg(feature = "aws-core")]
354pub async fn sign_request(
355 service_type: &OpenSearchServiceType,
356 request: &mut http::Request<Bytes>,
357 credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
358 region: Option<&aws_types::region::Region>,
359) -> crate::Result<()> {
360 crate::aws::sign_request(
364 service_type.as_str(),
365 request,
366 credentials_provider,
367 region,
368 *service_type == OpenSearchServiceType::Serverless,
369 )
370 .await
371}
372
373async fn get_version(
374 base_url: &str,
375 auth: Option<&Auth>,
376 #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
377 request: &RequestConfig,
378 tls_settings: &TlsSettings,
379 proxy_config: &ProxyConfig,
380) -> crate::Result<usize> {
381 #[derive(Deserialize)]
382 struct Version {
383 number: Option<String>,
384 }
385 #[derive(Deserialize)]
386 struct ResponsePayload {
387 version: Option<Version>,
388 }
389
390 let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
391 let response = get(
392 base_url,
393 auth,
394 #[cfg(feature = "aws-core")]
395 service_type,
396 request,
397 client,
398 "/",
399 )
400 .await
401 .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?;
402
403 let (_, body) = response.into_parts();
404 let mut body = body::aggregate(body).await?;
405 let body = body.copy_to_bytes(body.remaining());
406 let ResponsePayload { version } = serde_json::from_slice(&body)?;
407 if let Some(version) = version
408 && let Some(number) = version.number
409 {
410 let v: Vec<&str> = number.split('.').collect();
411 if !v.is_empty()
412 && let Ok(major_version) = v[0].parse::<usize>()
413 {
414 return Ok(major_version);
415 }
416 }
417 Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
418}
419
420async fn get(
421 base_url: &str,
422 auth: Option<&Auth>,
423 #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
424 request: &RequestConfig,
425 client: HttpClient,
426 path: &str,
427) -> crate::Result<Response<Body>> {
428 let mut builder = Request::get(format!("{base_url}{path}"));
429
430 for (header, value) in &request.headers {
431 builder = builder.header(&header[..], &value[..]);
432 }
433 let mut request = builder.body(Bytes::new())?;
434
435 if let Some(auth) = auth {
436 match auth {
437 Auth::Basic(http_auth) => {
438 http_auth.apply(&mut request);
439 }
440 #[cfg(feature = "aws-core")]
441 Auth::Aws {
442 credentials_provider: provider,
443 region,
444 } => {
445 let region = region.clone();
446 sign_request(service_type, &mut request, provider, Some(®ion)).await?;
447 }
448 }
449 }
450
451 client
452 .send(request.map(hyper::Body::from))
453 .await
454 .map_err(Into::into)
455}