vector/sinks/elasticsearch/service.rs

1use std::{
2    sync::Arc,
3    task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::future::BoxFuture;
8use http::{Response, Uri};
9use hyper::{service::Service, Body, Request};
10use tower::ServiceExt;
11use vector_lib::stream::DriverResponse;
12use vector_lib::ByteSizeOf;
13use vector_lib::{
14    json_size::JsonSize,
15    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
16};
17
18use super::{ElasticsearchCommon, ElasticsearchConfig};
19use crate::{
20    event::{EventFinalizers, EventStatus, Finalizable},
21    http::HttpClient,
22    sinks::{
23        elasticsearch::{encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder},
24        util::{
25            auth::Auth,
26            http::{HttpBatchService, RequestConfig},
27            Compression, ElementCount,
28        },
29    },
30};
31
/// A single bulk request destined for Elasticsearch, plus the bookkeeping the sink needs
/// to acknowledge, account for, and (if necessary) rebuild it.
#[derive(Clone, Debug)]
pub struct ElasticsearchRequest {
    /// Encoded request body; `HttpRequestBuilder::build_request` sends it as the HTTP body.
    pub payload: Bytes,
    /// Finalizers for the events in this request, handed out via `Finalizable::take_finalizers`.
    pub finalizers: EventFinalizers,
    /// Number of events in the request, reported via `ElementCount::element_count`.
    pub batch_size: usize,
    /// NOTE(review): presumably the estimated JSON-encoded size of the events; it is not read
    /// anywhere in this file — confirm against the request builder.
    pub events_byte_size: JsonSize,
    /// Request metadata; taken in `ElasticsearchService::call` to compute the sent-bytes
    /// figure reported on the response.
    pub metadata: RequestMetadata,
    /// The original events this request was built from, stored so the request can be
    /// reconstructed when retrying.
    pub original_events: Vec<ProcessedEvent>,
    /// NOTE(review): presumably used together with `original_events` to rebuild the request
    /// on retry; not used in this file.
    pub elasticsearch_request_builder: ElasticsearchRequestBuilder,
}
42
43impl ByteSizeOf for ElasticsearchRequest {
44    fn allocated_bytes(&self) -> usize {
45        self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
46    }
47}
48
49impl ElementCount for ElasticsearchRequest {
50    fn element_count(&self) -> usize {
51        self.batch_size
52    }
53}
54
55impl Finalizable for ElasticsearchRequest {
56    fn take_finalizers(&mut self) -> EventFinalizers {
57        std::mem::take(&mut self.finalizers)
58    }
59}
60
61impl MetaDescriptive for ElasticsearchRequest {
62    fn get_metadata(&self) -> &RequestMetadata {
63        &self.metadata
64    }
65
66    fn metadata_mut(&mut self) -> &mut RequestMetadata {
67        &mut self.metadata
68    }
69}
70
/// Service that turns `ElasticsearchRequest`s into HTTP bulk requests and sends them
/// through an inner `HttpBatchService`.
#[derive(Clone)]
pub struct ElasticsearchService {
    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
    //       This sink should undergo a refactor to utilize the `HttpService`
    //       instead, which extracts much of the boilerplate code for `Service`.
    // The future type is boxed so the request-building closure (which may await AWS
    // signing) can be named in this field's type.
    batch_service: HttpBatchService<
        BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>>,
        ElasticsearchRequest,
    >,
}
81
82impl ElasticsearchService {
83    pub fn new(
84        http_client: HttpClient<Body>,
85        http_request_builder: HttpRequestBuilder,
86    ) -> ElasticsearchService {
87        let http_request_builder = Arc::new(http_request_builder);
88        let batch_service = HttpBatchService::new(http_client, move |req| {
89            let request_builder = Arc::clone(&http_request_builder);
90            let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
91                Box::pin(async move { request_builder.build_request(req).await });
92            future
93        });
94        ElasticsearchService { batch_service }
95    }
96}
97
/// Settings needed to turn an `ElasticsearchRequest` into an HTTP bulk request.
pub struct HttpRequestBuilder {
    /// Target URI; every request is a `POST` to it.
    pub bulk_uri: Uri,
    /// Optional authentication applied to each request (basic auth, or AWS signing when the
    /// `aws-core` feature is enabled).
    pub auth: Option<Auth>,
    /// Passed to the AWS request signer.
    pub service_type: crate::sinks::elasticsearch::OpenSearchServiceType,
    /// Determines the `Content-Encoding` / `Accept-Encoding` headers that are set.
    pub compression: Compression,
    /// User-configured request settings; its `headers` are added to every request.
    pub http_request_config: RequestConfig,
}
105
106impl HttpRequestBuilder {
107    pub fn new(common: &ElasticsearchCommon, config: &ElasticsearchConfig) -> HttpRequestBuilder {
108        HttpRequestBuilder {
109            bulk_uri: common.bulk_uri.clone(),
110            auth: common.auth.clone(),
111            service_type: common.service_type.clone(),
112            compression: config.compression,
113            http_request_config: config.request.clone(),
114        }
115    }
116
117    pub async fn build_request(
118        &self,
119        es_req: ElasticsearchRequest,
120    ) -> Result<Request<Bytes>, crate::Error> {
121        let mut builder = Request::post(&self.bulk_uri);
122
123        builder = builder.header("Content-Type", "application/x-ndjson");
124
125        if let Some(ce) = self.compression.content_encoding() {
126            builder = builder.header("Content-Encoding", ce);
127        }
128
129        if let Some(ae) = self.compression.accept_encoding() {
130            builder = builder.header("Accept-Encoding", ae);
131        }
132
133        for (header, value) in &self.http_request_config.headers {
134            builder = builder.header(&header[..], &value[..]);
135        }
136
137        let mut request = builder
138            .body(es_req.payload)
139            .expect("Invalid http request value used");
140
141        if let Some(auth) = &self.auth {
142            match auth {
143                Auth::Basic(auth) => {
144                    auth.apply(&mut request);
145                }
146                #[cfg(feature = "aws-core")]
147                Auth::Aws {
148                    credentials_provider: provider,
149                    region,
150                } => {
151                    crate::sinks::elasticsearch::sign_request(
152                        &self.service_type,
153                        &mut request,
154                        provider,
155                        Some(region),
156                    )
157                    .await?;
158                }
159            }
160        }
161
162        Ok(request)
163    }
164}
165
/// Outcome of sending one `ElasticsearchRequest`.
pub struct ElasticsearchResponse {
    /// The raw HTTP response returned by the cluster.
    pub http_response: Response<Bytes>,
    /// Delivery status derived from the response by `get_event_status`.
    pub event_status: EventStatus,
    /// Byte-size accounting taken from the request's metadata, reported as the events sent.
    pub events_byte_size: GroupedCountByteSize,
}
171
172impl DriverResponse for ElasticsearchResponse {
173    fn event_status(&self) -> EventStatus {
174        self.event_status
175    }
176
177    fn events_sent(&self) -> &GroupedCountByteSize {
178        &self.events_byte_size
179    }
180}
181
182impl Service<ElasticsearchRequest> for ElasticsearchService {
183    type Response = ElasticsearchResponse;
184    type Error = crate::Error;
185    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
186
187    // Emission of an internal event in case of errors is handled upstream by the caller.
188    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189        Poll::Ready(Ok(()))
190    }
191
192    // Emission of internal events for errors and dropped events is handled upstream by the caller.
193    fn call(&mut self, mut req: ElasticsearchRequest) -> Self::Future {
194        let mut http_service = self.batch_service.clone();
195        Box::pin(async move {
196            http_service.ready().await?;
197            let events_byte_size =
198                std::mem::take(req.metadata_mut()).into_events_estimated_json_encoded_byte_size();
199            let http_response = http_service.call(req).await?;
200
201            let event_status = get_event_status(&http_response);
202            Ok(ElasticsearchResponse {
203                event_status,
204                http_response,
205                events_byte_size,
206            })
207        })
208    }
209}
210
211// This event is not part of the event framework but is kept because some users were depending on it
212// to identify the number of errors returned by Elasticsearch. It can be dropped when we have better
213// telemetry. Ref: #15886
214fn emit_bad_response_error(response: &Response<Bytes>) {
215    let error_code = format!("http_response_{}", response.status().as_u16());
216
217    error!(
218        message =  "Response contained errors.",
219        error_code = error_code,
220        response = ?response,
221    );
222}
223
224fn get_event_status(response: &Response<Bytes>) -> EventStatus {
225    let status = response.status();
226    if status.is_success() {
227        let body = String::from_utf8_lossy(response.body());
228        if body.contains("\"errors\":true") {
229            emit_bad_response_error(response);
230            EventStatus::Rejected
231        } else {
232            EventStatus::Delivered
233        }
234    } else if status.is_server_error() {
235        emit_bad_response_error(response);
236        EventStatus::Errored
237    } else {
238        emit_bad_response_error(response);
239        EventStatus::Rejected
240    }
241}