vector/sinks/elasticsearch/
service.rs1use std::{
2 sync::Arc,
3 task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::future::BoxFuture;
8use http::{Response, Uri};
9use hyper::{service::Service, Body, Request};
10use tower::ServiceExt;
11use vector_lib::stream::DriverResponse;
12use vector_lib::ByteSizeOf;
13use vector_lib::{
14 json_size::JsonSize,
15 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
16};
17
18use super::{ElasticsearchCommon, ElasticsearchConfig};
19use crate::{
20 event::{EventFinalizers, EventStatus, Finalizable},
21 http::HttpClient,
22 sinks::{
23 elasticsearch::{encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder},
24 util::{
25 auth::Auth,
26 http::{HttpBatchService, RequestConfig},
27 Compression, ElementCount,
28 },
29 },
30};
31
32#[derive(Clone, Debug)]
33pub struct ElasticsearchRequest {
34 pub payload: Bytes,
35 pub finalizers: EventFinalizers,
36 pub batch_size: usize,
37 pub events_byte_size: JsonSize,
38 pub metadata: RequestMetadata,
39 pub original_events: Vec<ProcessedEvent>, pub elasticsearch_request_builder: ElasticsearchRequestBuilder,
41}
42
43impl ByteSizeOf for ElasticsearchRequest {
44 fn allocated_bytes(&self) -> usize {
45 self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
46 }
47}
48
49impl ElementCount for ElasticsearchRequest {
50 fn element_count(&self) -> usize {
51 self.batch_size
52 }
53}
54
55impl Finalizable for ElasticsearchRequest {
56 fn take_finalizers(&mut self) -> EventFinalizers {
57 std::mem::take(&mut self.finalizers)
58 }
59}
60
61impl MetaDescriptive for ElasticsearchRequest {
62 fn get_metadata(&self) -> &RequestMetadata {
63 &self.metadata
64 }
65
66 fn metadata_mut(&mut self) -> &mut RequestMetadata {
67 &mut self.metadata
68 }
69}
70
71#[derive(Clone)]
72pub struct ElasticsearchService {
73 batch_service: HttpBatchService<
77 BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>>,
78 ElasticsearchRequest,
79 >,
80}
81
82impl ElasticsearchService {
83 pub fn new(
84 http_client: HttpClient<Body>,
85 http_request_builder: HttpRequestBuilder,
86 ) -> ElasticsearchService {
87 let http_request_builder = Arc::new(http_request_builder);
88 let batch_service = HttpBatchService::new(http_client, move |req| {
89 let request_builder = Arc::clone(&http_request_builder);
90 let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
91 Box::pin(async move { request_builder.build_request(req).await });
92 future
93 });
94 ElasticsearchService { batch_service }
95 }
96}
97
98pub struct HttpRequestBuilder {
99 pub bulk_uri: Uri,
100 pub auth: Option<Auth>,
101 pub service_type: crate::sinks::elasticsearch::OpenSearchServiceType,
102 pub compression: Compression,
103 pub http_request_config: RequestConfig,
104}
105
106impl HttpRequestBuilder {
107 pub fn new(common: &ElasticsearchCommon, config: &ElasticsearchConfig) -> HttpRequestBuilder {
108 HttpRequestBuilder {
109 bulk_uri: common.bulk_uri.clone(),
110 auth: common.auth.clone(),
111 service_type: common.service_type.clone(),
112 compression: config.compression,
113 http_request_config: config.request.clone(),
114 }
115 }
116
117 pub async fn build_request(
118 &self,
119 es_req: ElasticsearchRequest,
120 ) -> Result<Request<Bytes>, crate::Error> {
121 let mut builder = Request::post(&self.bulk_uri);
122
123 builder = builder.header("Content-Type", "application/x-ndjson");
124
125 if let Some(ce) = self.compression.content_encoding() {
126 builder = builder.header("Content-Encoding", ce);
127 }
128
129 if let Some(ae) = self.compression.accept_encoding() {
130 builder = builder.header("Accept-Encoding", ae);
131 }
132
133 for (header, value) in &self.http_request_config.headers {
134 builder = builder.header(&header[..], &value[..]);
135 }
136
137 let mut request = builder
138 .body(es_req.payload)
139 .expect("Invalid http request value used");
140
141 if let Some(auth) = &self.auth {
142 match auth {
143 Auth::Basic(auth) => {
144 auth.apply(&mut request);
145 }
146 #[cfg(feature = "aws-core")]
147 Auth::Aws {
148 credentials_provider: provider,
149 region,
150 } => {
151 crate::sinks::elasticsearch::sign_request(
152 &self.service_type,
153 &mut request,
154 provider,
155 Some(region),
156 )
157 .await?;
158 }
159 }
160 }
161
162 Ok(request)
163 }
164}
165
166pub struct ElasticsearchResponse {
167 pub http_response: Response<Bytes>,
168 pub event_status: EventStatus,
169 pub events_byte_size: GroupedCountByteSize,
170}
171
172impl DriverResponse for ElasticsearchResponse {
173 fn event_status(&self) -> EventStatus {
174 self.event_status
175 }
176
177 fn events_sent(&self) -> &GroupedCountByteSize {
178 &self.events_byte_size
179 }
180}
181
182impl Service<ElasticsearchRequest> for ElasticsearchService {
183 type Response = ElasticsearchResponse;
184 type Error = crate::Error;
185 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
186
187 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189 Poll::Ready(Ok(()))
190 }
191
192 fn call(&mut self, mut req: ElasticsearchRequest) -> Self::Future {
194 let mut http_service = self.batch_service.clone();
195 Box::pin(async move {
196 http_service.ready().await?;
197 let events_byte_size =
198 std::mem::take(req.metadata_mut()).into_events_estimated_json_encoded_byte_size();
199 let http_response = http_service.call(req).await?;
200
201 let event_status = get_event_status(&http_response);
202 Ok(ElasticsearchResponse {
203 event_status,
204 http_response,
205 events_byte_size,
206 })
207 })
208 }
209}
210
211fn emit_bad_response_error(response: &Response<Bytes>) {
215 let error_code = format!("http_response_{}", response.status().as_u16());
216
217 error!(
218 message = "Response contained errors.",
219 error_code = error_code,
220 response = ?response,
221 );
222}
223
224fn get_event_status(response: &Response<Bytes>) -> EventStatus {
225 let status = response.status();
226 if status.is_success() {
227 let body = String::from_utf8_lossy(response.body());
228 if body.contains("\"errors\":true") {
229 emit_bad_response_error(response);
230 EventStatus::Rejected
231 } else {
232 EventStatus::Delivered
233 }
234 } else if status.is_server_error() {
235 emit_bad_response_error(response);
236 EventStatus::Errored
237 } else {
238 emit_bad_response_error(response);
239 EventStatus::Rejected
240 }
241}