1use bytes::{Buf, Bytes};
2use http::{Response, StatusCode, Uri};
3use hyper::{body, Body};
4use serde::Deserialize;
5use snafu::ResultExt;
6use vector_lib::config::proxy::ProxyConfig;
7use vector_lib::config::LogNamespace;
8
9use super::{
10 request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder,
11 InvalidHostSnafu, Request, VersionType,
12};
13use crate::{
14 http::{HttpClient, MaybeAuth, ParameterValue, QueryParameterValue, QueryParameters},
15 sinks::{
16 elasticsearch::{
17 ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
18 OpenSearchServiceType, ParseError,
19 },
20 util::{auth::Auth, http::RequestConfig, UriSerde},
21 HealthcheckError,
22 },
23 tls::TlsSettings,
24 transforms::metric_to_log::MetricToLog,
25};
26
27#[derive(Debug, Clone)]
28pub struct ElasticsearchCommon {
29 pub base_url: String,
30 pub bulk_uri: Uri,
31 pub auth: Option<Auth>,
32 pub service_type: OpenSearchServiceType,
33 pub mode: ElasticsearchCommonMode,
34 pub request_builder: ElasticsearchRequestBuilder,
35 pub tls_settings: TlsSettings,
36 pub request: RequestConfig,
37 pub query_params: QueryParameters,
38 pub metric_to_log: MetricToLog,
39}
40
41impl ElasticsearchCommon {
42 pub async fn parse_config(
43 config: &ElasticsearchConfig,
44 endpoint: &str,
45 proxy_config: &ProxyConfig,
46 version: &mut Option<usize>,
47 ) -> crate::Result<Self> {
48 Self::check_endpoint(endpoint)?;
50
51 let uri = endpoint.parse::<UriSerde>()?;
52
53 let auth = Self::extract_auth(config, proxy_config, &uri).await?;
55
56 if config.opensearch_service_type == OpenSearchServiceType::Serverless {
57 match &config.auth {
58 #[cfg(feature = "aws-core")]
59 Some(ElasticsearchAuthConfig::Aws(_)) => (),
60 _ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
61 }
62 }
63
64 let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
65
66 let mode = config.common_mode()?;
67
68 let tower_request = config.request.tower.into_settings();
69
70 if config.bulk.version.is_some() && config.bulk.version_type == VersionType::Internal {
71 return Err(ParseError::ExternalVersionIgnoredWithInternalVersioning.into());
72 }
73 if config.bulk.version.is_some()
74 && (config.bulk.version_type == VersionType::External
75 || config.bulk.version_type == VersionType::ExternalGte)
76 && config.id_key.is_none()
77 {
78 return Err(ParseError::ExternalVersioningWithoutDocumentID.into());
79 }
80 if config.bulk.version.is_none()
81 && (config.bulk.version_type == VersionType::External
82 || config.bulk.version_type == VersionType::ExternalGte)
83 {
84 return Err(ParseError::ExternalVersioningWithoutVersion.into());
85 }
86
87 let mut query_params = config.query.clone().unwrap_or_default();
88 query_params.insert(
89 "timeout".into(),
90 QueryParameterValue::SingleParam(ParameterValue::String(format!(
91 "{}s",
92 tower_request.timeout.as_secs()
93 ))),
94 );
95
96 if let Some(pipeline) = &config.pipeline {
97 if !pipeline.is_empty() {
98 query_params.insert(
99 "pipeline".into(),
100 QueryParameterValue::SingleParam(ParameterValue::String(pipeline.into())),
101 );
102 }
103 }
104
105 let bulk_url = {
106 let mut query = url::form_urlencoded::Serializer::new(String::new());
107 for (param_name, param_value) in &query_params {
109 match param_value {
110 QueryParameterValue::SingleParam(param) => {
111 query.append_pair(param_name, param.value());
113 }
114 QueryParameterValue::MultiParams(params) => {
115 for value in params {
117 query.append_pair(param_name, value.value());
118 }
119 }
120 }
121 }
122 format!("{}/_bulk?{}", base_url, query.finish())
123 };
124 let bulk_uri = bulk_url.parse::<Uri>().unwrap();
125
126 let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
127 let config = config.clone();
128 let request = config.request;
129
130 let metric_config = config.metrics.clone().unwrap_or_default();
131 let metric_to_log = MetricToLog::new(
132 metric_config.host_tag.as_deref(),
133 metric_config.timezone.unwrap_or_default(),
134 LogNamespace::Legacy,
135 metric_config.metric_tag_values,
136 );
137
138 let service_type = config.opensearch_service_type;
139
140 let version = if service_type == OpenSearchServiceType::Serverless {
141 if config.api_version != ElasticsearchApiVersion::Auto {
142 return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
143 }
144 8
147 } else if let Some(version) = *version {
148 version
149 } else {
150 let ver = match config.api_version {
151 ElasticsearchApiVersion::V6 => 6,
152 ElasticsearchApiVersion::V7 => 7,
153 ElasticsearchApiVersion::V8 => 8,
154 ElasticsearchApiVersion::Auto => {
155 match get_version(
156 &base_url,
157 auth.as_ref(),
158 #[cfg(feature = "aws-core")]
159 &service_type,
160 &request,
161 &tls_settings,
162 proxy_config,
163 )
164 .await
165 {
166 Ok(version) => {
167 debug!(message = "Auto-detected Elasticsearch API version.", %version);
168 version
169 }
170 Err(error) => {
173 let assumed_version = if config.suppress_type_name { 6 } else { 8 };
180 debug!(message = "Assumed Elasticsearch API version based on config setting suppress_type_name.",
181 %assumed_version,
182 %config.suppress_type_name
183 );
184 warn!(message = "Failed to determine Elasticsearch API version. Please fix the reported error or set an API version explicitly via `api_version`.",
185 %assumed_version,
186 %error
187 );
188 assumed_version
189 }
190 }
191 }
192 };
193 *version = Some(ver);
194 ver
195 };
196
197 let doc_type = config.doc_type.clone();
198 let suppress_type_name = if config.suppress_type_name {
199 warn!(message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead.");
200 config.suppress_type_name
201 } else {
202 version >= 7
203 };
204 let request_builder = ElasticsearchRequestBuilder {
205 compression: config.compression,
206 encoder: ElasticsearchEncoder {
207 transformer: config.encoding.clone(),
208 doc_type,
209 suppress_type_name,
210 },
211 };
212
213 Ok(Self {
214 auth,
215 service_type,
216 base_url,
217 bulk_uri,
218 mode,
219 request_builder,
220 query_params,
221 request,
222 tls_settings,
223 metric_to_log,
224 })
225 }
226
227 fn check_endpoint(endpoint: &str) -> crate::Result<()> {
228 let uri = format!("{endpoint}/_test");
229 let uri = uri
230 .parse::<Uri>()
231 .with_context(|_| InvalidHostSnafu { host: endpoint })?;
232 if uri.host().is_none() {
233 return Err(ParseError::HostMustIncludeHostname {
234 host: endpoint.to_string(),
235 }
236 .into());
237 }
238 Ok(())
239 }
240
241 async fn extract_auth(
243 config: &ElasticsearchConfig,
244 #[cfg_attr(not(feature = "aws-core"), allow(unused_variables))] proxy_config: &ProxyConfig,
245 uri: &UriSerde,
246 ) -> crate::Result<Option<Auth>> {
247 let auth = match &config.auth {
248 Some(ElasticsearchAuthConfig::Basic { user, password }) => {
249 let auth = Some(crate::http::Auth::Basic {
250 user: user.clone(),
251 password: password.clone(),
252 });
253 let auth = auth.choose_one(&uri.auth)?.unwrap();
255 Some(Auth::Basic(auth))
256 }
257 #[cfg(feature = "aws-core")]
258 Some(ElasticsearchAuthConfig::Aws(aws)) => {
259 let region = config
260 .aws
261 .as_ref()
262 .map(|config| config.region())
263 .ok_or(ParseError::RegionRequired)?
264 .ok_or(ParseError::RegionRequired)?;
265 Some(Auth::Aws {
266 credentials_provider: aws
267 .credentials_provider(region.clone(), proxy_config, config.tls.as_ref())
268 .await?,
269 region,
270 })
271 }
272 None => {
273 uri.auth.as_ref().and_then(|auth| match auth {
275 crate::http::Auth::Basic { user, password } => {
276 Some(Auth::Basic(crate::http::Auth::Basic {
277 user: user.clone(),
278 password: password.clone(),
279 }))
280 }
281 _ => None,
282 })
283 }
284 };
285 Ok(auth)
286 }
287
288 pub async fn parse_many(
290 config: &ElasticsearchConfig,
291 proxy_config: &ProxyConfig,
292 ) -> crate::Result<Vec<Self>> {
293 let mut version = None;
294 if let Some(endpoint) = config.endpoint.as_ref() {
295 warn!(message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead.");
296 if config.endpoints.is_empty() {
297 Ok(vec![
298 Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
299 ])
300 } else {
301 Err(ParseError::EndpointsExclusive.into())
302 }
303 } else if config.endpoints.is_empty() {
304 Err(ParseError::EndpointRequired.into())
305 } else {
306 let mut commons = Vec::new();
307 for endpoint in config.endpoints.iter() {
308 commons
309 .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
310 }
311 Ok(commons)
312 }
313 }
314
315 #[cfg(test)]
317 pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
318 let mut commons =
319 Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
320 assert_eq!(commons.len(), 1);
321 Ok(commons.remove(0))
322 }
323
324 pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
325 if self.service_type == OpenSearchServiceType::Serverless {
326 warn!(message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck...");
327 Ok(())
328 } else {
329 match get(
330 &self.base_url,
331 self.auth.as_ref(),
332 #[cfg(feature = "aws-core")]
333 &self.service_type,
334 &self.request,
335 client,
336 "/_cluster/health",
337 )
338 .await?
339 .status()
340 {
341 StatusCode::OK => Ok(()),
342 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
343 }
344 }
345 }
346}
347
348#[cfg(feature = "aws-core")]
349pub async fn sign_request(
350 service_type: &OpenSearchServiceType,
351 request: &mut http::Request<Bytes>,
352 credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
353 region: Option<&aws_types::region::Region>,
354) -> crate::Result<()> {
355 crate::aws::sign_request(
359 service_type.as_str(),
360 request,
361 credentials_provider,
362 region,
363 *service_type == OpenSearchServiceType::Serverless,
364 )
365 .await
366}
367
368async fn get_version(
369 base_url: &str,
370 auth: Option<&Auth>,
371 #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
372 request: &RequestConfig,
373 tls_settings: &TlsSettings,
374 proxy_config: &ProxyConfig,
375) -> crate::Result<usize> {
376 #[derive(Deserialize)]
377 struct Version {
378 number: Option<String>,
379 }
380 #[derive(Deserialize)]
381 struct ResponsePayload {
382 version: Option<Version>,
383 }
384
385 let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
386 let response = get(
387 base_url,
388 auth,
389 #[cfg(feature = "aws-core")]
390 service_type,
391 request,
392 client,
393 "/",
394 )
395 .await
396 .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?;
397
398 let (_, body) = response.into_parts();
399 let mut body = body::aggregate(body).await?;
400 let body = body.copy_to_bytes(body.remaining());
401 let ResponsePayload { version } = serde_json::from_slice(&body)?;
402 if let Some(version) = version {
403 if let Some(number) = version.number {
404 let v: Vec<&str> = number.split('.').collect();
405 if !v.is_empty() {
406 if let Ok(major_version) = v[0].parse::<usize>() {
407 return Ok(major_version);
408 }
409 }
410 }
411 }
412 Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
413}
414
415async fn get(
416 base_url: &str,
417 auth: Option<&Auth>,
418 #[cfg(feature = "aws-core")] service_type: &OpenSearchServiceType,
419 request: &RequestConfig,
420 client: HttpClient,
421 path: &str,
422) -> crate::Result<Response<Body>> {
423 let mut builder = Request::get(format!("{base_url}{path}"));
424
425 for (header, value) in &request.headers {
426 builder = builder.header(&header[..], &value[..]);
427 }
428 let mut request = builder.body(Bytes::new())?;
429
430 if let Some(auth) = auth {
431 match auth {
432 Auth::Basic(http_auth) => {
433 http_auth.apply(&mut request);
434 }
435 #[cfg(feature = "aws-core")]
436 Auth::Aws {
437 credentials_provider: provider,
438 region,
439 } => {
440 let region = region.clone();
441 sign_request(service_type, &mut request, provider, Some(®ion)).await?;
442 }
443 }
444 }
445
446 client
447 .send(request.map(hyper::Body::from))
448 .await
449 .map_err(Into::into)
450}