vector/sinks/elasticsearch/
retry.rs1use http::StatusCode;
2use serde::Deserialize;
3use vector_lib::{EstimatedJsonEncodedSizeOf, json_size::JsonSize};
4
5use crate::{
6 event::Finalizable,
7 http::HttpError,
8 sinks::{
9 elasticsearch::{
10 encoder::ProcessedEvent,
11 service::{ElasticsearchRequest, ElasticsearchResponse},
12 },
13 util::{
14 metadata::RequestMetadataBuilder,
15 request_builder::RequestBuilder,
16 retries::{RetryAction, RetryLogic},
17 },
18 },
19};
20
21#[derive(Deserialize, Debug)]
22struct EsResultResponse {
23 items: Vec<EsResultItem>,
24}
25
26impl EsResultResponse {
27 fn parse(body: &str) -> Result<Self, String> {
28 serde_json::from_str::<EsResultResponse>(body).map_err(|json_error| {
29 format!("some messages failed, could not parse response, error: {json_error}")
30 })
31 }
32
33 fn iter_status(&self) -> impl Iterator<Item = (StatusCode, Option<&EsErrorDetails>)> {
35 self.items.iter().filter_map(|item| {
36 item.result()
37 .status
38 .and_then(|status| StatusCode::from_u16(status).ok())
39 .map(|status| (status, item.result().error.as_ref()))
40 })
41 }
42
43 fn get_error_reason(&self, body: &str) -> String {
48 match self
49 .items
50 .iter()
51 .find_map(|item| item.result().error.as_ref())
52 {
53 Some(error) => format!("error type: {}, reason: {}", error.err_type, error.reason),
54 None => format!("error response: {body}"),
55 }
56 }
57}
58
59#[derive(Deserialize, Debug)]
60enum EsResultItem {
61 #[serde(rename = "index")]
62 Index(EsIndexResult),
63 #[serde(rename = "create")]
64 Create(EsIndexResult),
65 #[serde(rename = "update")]
66 Update(EsIndexResult),
67}
68
69impl EsResultItem {
70 #[allow(clippy::missing_const_for_fn)] fn result(&self) -> &EsIndexResult {
72 match self {
73 EsResultItem::Index(r) => r,
74 EsResultItem::Create(r) => r,
75 EsResultItem::Update(r) => r,
76 }
77 }
78}
79
80#[derive(Deserialize, Debug)]
81struct EsIndexResult {
82 status: Option<u16>,
83 error: Option<EsErrorDetails>,
84}
85
86#[derive(Deserialize, Debug)]
87struct EsErrorDetails {
88 reason: String,
89 #[serde(rename = "type")]
90 err_type: String,
91}
92
/// Retry policy for the Elasticsearch sink.
#[derive(Clone)]
pub struct ElasticsearchRetryLogic {
    /// When `true`, a bulk response whose failed items include `429` or `5xx`
    /// statuses triggers a partial retry that resends only those items' events.
    pub retry_partial: bool,
}
97
98impl RetryLogic for ElasticsearchRetryLogic {
99 type Error = HttpError;
100 type Request = ElasticsearchRequest;
101 type Response = ElasticsearchResponse;
102
103 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
104 true
105 }
106
107 fn should_retry_response(
108 &self,
109 response: &ElasticsearchResponse,
110 ) -> RetryAction<ElasticsearchRequest> {
111 let status = response.http_response.status();
112
113 match status {
114 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
115 StatusCode::NOT_IMPLEMENTED => {
116 RetryAction::DontRetry("endpoint not implemented".into())
117 }
118 _ if status.is_server_error() => RetryAction::Retry(
119 format!(
120 "{}: {}",
121 status,
122 String::from_utf8_lossy(response.http_response.body())
123 )
124 .into(),
125 ),
126 _ if status.is_client_error() => {
127 let body = String::from_utf8_lossy(response.http_response.body());
128 RetryAction::DontRetry(format!("client-side error, {status}: {body}").into())
129 }
130 _ if status.is_success() => {
131 let body = String::from_utf8_lossy(response.http_response.body());
132
133 if body.contains("\"errors\":true") {
134 match EsResultResponse::parse(&body) {
135 Ok(resp) => {
136 if self.retry_partial {
137 let status_codes: Vec<bool> = resp
141 .iter_status()
142 .map(|(status, _)| {
143 status == StatusCode::TOO_MANY_REQUESTS
144 || status.is_server_error()
145 })
146 .collect();
147 if let Some((_status, _error)) =
148 resp.iter_status().find(|(status, _)| {
149 *status == StatusCode::TOO_MANY_REQUESTS
150 || status.is_server_error()
151 })
152 {
153 return RetryAction::RetryPartial(Box::new(
154 move |req: ElasticsearchRequest| {
155 let mut failed_events: Vec<ProcessedEvent> = req
156 .original_events
157 .clone()
158 .into_iter()
159 .zip(status_codes.iter())
160 .filter(|&(_, &flag)| flag)
161 .map(|(item, _)| item)
162 .collect();
163 let finalizers = failed_events.take_finalizers();
164 let batch_size = failed_events.len();
165 let events_byte_size = failed_events
166 .iter()
167 .map(|x| x.log.estimated_json_encoded_size_of())
168 .fold(JsonSize::zero(), |a, b| a + b);
169 let encode_result = match req
170 .elasticsearch_request_builder
171 .encode_events(failed_events.clone())
172 {
173 Ok(s) => s,
174 Err(_) => return req,
175 };
176 let metadata_builder =
177 RequestMetadataBuilder::from_events(&failed_events);
178 let metadata = metadata_builder.build(&encode_result);
179 ElasticsearchRequest {
180 payload: encode_result.into_payload(),
181 finalizers,
182 batch_size,
183 events_byte_size,
184 metadata,
185 original_events: failed_events,
186 elasticsearch_request_builder: req
187 .elasticsearch_request_builder,
188 }
189 },
190 ));
191 }
192 }
193
194 RetryAction::DontRetry(resp.get_error_reason(&body).into())
195 }
196 Err(msg) => RetryAction::DontRetry(msg.into()),
197 }
198 } else {
199 RetryAction::Successful
200 }
201 }
202 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
203 }
204 }
205}
206
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::Response;
    use similar_asserts::assert_eq;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};

    use super::*;
    use crate::event::EventStatus;

    /// Bulk response with one `index` item rejected with a 400 mapping error.
    const INDEX_MAPPING_ERROR: &str = r#"{"took":185,"errors":true,"items":[{"index":{"_index":"test-hgw28jv10u","_type":"log_lines","_id":"3GhQLXEBE62DvOOUKdFH","status":400,"error":{"type":"illegal_argument_exception","reason":"mapper [message] of different type, current_type [long], merged_type [text]"}}}]}"#;

    /// Bulk response with one `index` item rejected with 429 and no error details.
    const INDEX_TOO_MANY_REQUESTS: &str = r#"{"took":34,"errors":true,"items":[{"index":{"_index":"test-asjkf1234","_type":"log_lines","_id":"4Z3QLYEBT52RtoOEKz2H","status":429}}]}"#;

    /// Bulk response with one `create` item rejected with a 400 mapping error.
    const CREATE_MAPPING_ERROR: &str = r#"{"took":3,"errors":true,"items":[{"create":{"_index":"test-hgw28jv10u","_type":"_doc","_id":"aBLq1HcBWD7eBWkW2nj4","status":400,"error":{"type":"mapper_parsing_exception","reason":"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}]}"#;

    /// Runs the retry logic against a `200 OK` response whose body is `json`.
    fn classify(
        json: &'static str,
        retry_partial: bool,
        event_status: EventStatus,
    ) -> RetryAction<ElasticsearchRequest> {
        let http_response = Response::builder()
            .status(StatusCode::OK)
            .body(Bytes::from(json))
            .unwrap();
        let logic = ElasticsearchRetryLogic { retry_partial };
        logic.should_retry_response(&ElasticsearchResponse {
            http_response,
            event_status,
            events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
        })
    }

    /// Returns the error reason derived from `json`, or the parse error message.
    fn error_reason(json: &str) -> String {
        EsResultResponse::parse(json)
            .map(|resp| resp.get_error_reason(json))
            .unwrap_or_else(|msg| msg)
    }

    #[test]
    fn handles_error_response() {
        let action = classify(INDEX_MAPPING_ERROR, false, EventStatus::Rejected);
        assert!(matches!(action, RetryAction::DontRetry(_)));
    }

    #[test]
    fn handles_partial_error_response() {
        let action = classify(INDEX_TOO_MANY_REQUESTS, true, EventStatus::Errored);
        assert!(matches!(action, RetryAction::RetryPartial(_)));
    }

    #[test]
    fn get_index_error_reason() {
        let reason = error_reason(INDEX_MAPPING_ERROR);
        assert_eq!(
            reason,
            "error type: illegal_argument_exception, reason: mapper [message] of different type, current_type [long], merged_type [text]"
        );
    }

    #[test]
    fn get_create_error_reason() {
        let reason = error_reason(CREATE_MAPPING_ERROR);
        assert_eq!(
            reason,
            "error type: mapper_parsing_exception, reason: object mapping for [host] tried to parse field [host] as object, but found a concrete value"
        );
    }
}