vector/sinks/elasticsearch/retry.rs

1use crate::sinks::{
2    elasticsearch::encoder::ProcessedEvent,
3    util::{metadata::RequestMetadataBuilder, request_builder::RequestBuilder},
4};
5use crate::{
6    event::Finalizable,
7    http::HttpError,
8    sinks::{
9        elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
10        util::retries::{RetryAction, RetryLogic},
11    },
12};
13use http::StatusCode;
14use serde::Deserialize;
15use vector_lib::json_size::JsonSize;
16use vector_lib::EstimatedJsonEncodedSizeOf;
17
18#[derive(Deserialize, Debug)]
19struct EsResultResponse {
20    items: Vec<EsResultItem>,
21}
22
23impl EsResultResponse {
24    fn parse(body: &str) -> Result<Self, String> {
25        serde_json::from_str::<EsResultResponse>(body).map_err(|json_error| {
26            format!("some messages failed, could not parse response, error: {json_error}")
27        })
28    }
29
30    /// Returns iterator over status codes for items and optional error details.
31    fn iter_status(&self) -> impl Iterator<Item = (StatusCode, Option<&EsErrorDetails>)> {
32        self.items.iter().filter_map(|item| {
33            item.result()
34                .status
35                .and_then(|status| StatusCode::from_u16(status).ok())
36                .map(|status| (status, item.result().error.as_ref()))
37        })
38    }
39
40    /// Selects the first error since logging all errors would be quite verbose and many are duplicates.
41    /// If partial retry is enabled and we don't retry, this is because there is no retriable error in the
42    /// response, thus all errors are equally interesting so logging the first is sufficient.
43    /// When partial retry is disabled, we don't retry on any error.
44    fn get_error_reason(&self, body: &str) -> String {
45        match self
46            .items
47            .iter()
48            .find_map(|item| item.result().error.as_ref())
49        {
50            Some(error) => format!("error type: {}, reason: {}", error.err_type, error.reason),
51            None => format!("error response: {body}"),
52        }
53    }
54}
55
56#[derive(Deserialize, Debug)]
57enum EsResultItem {
58    #[serde(rename = "index")]
59    Index(EsIndexResult),
60    #[serde(rename = "create")]
61    Create(EsIndexResult),
62    #[serde(rename = "update")]
63    Update(EsIndexResult),
64}
65
66impl EsResultItem {
67    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
68    fn result(&self) -> &EsIndexResult {
69        match self {
70            EsResultItem::Index(r) => r,
71            EsResultItem::Create(r) => r,
72            EsResultItem::Update(r) => r,
73        }
74    }
75}
76
77#[derive(Deserialize, Debug)]
78struct EsIndexResult {
79    status: Option<u16>,
80    error: Option<EsErrorDetails>,
81}
82
83#[derive(Deserialize, Debug)]
84struct EsErrorDetails {
85    reason: String,
86    #[serde(rename = "type")]
87    err_type: String,
88}
89
/// Retry policy for Elasticsearch bulk requests.
#[derive(Clone, Debug)]
pub struct ElasticsearchRetryLogic {
    /// When `true`, a successful HTTP response whose bulk items contain `429` or `5xx`
    /// statuses triggers a retry of only those items. When `false`, item-level failures
    /// are never retried.
    pub retry_partial: bool,
}
94
95impl RetryLogic for ElasticsearchRetryLogic {
96    type Error = HttpError;
97    type Request = ElasticsearchRequest;
98    type Response = ElasticsearchResponse;
99
100    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
101        true
102    }
103
104    fn should_retry_response(
105        &self,
106        response: &ElasticsearchResponse,
107    ) -> RetryAction<ElasticsearchRequest> {
108        let status = response.http_response.status();
109
110        match status {
111            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
112            StatusCode::NOT_IMPLEMENTED => {
113                RetryAction::DontRetry("endpoint not implemented".into())
114            }
115            _ if status.is_server_error() => RetryAction::Retry(
116                format!(
117                    "{}: {}",
118                    status,
119                    String::from_utf8_lossy(response.http_response.body())
120                )
121                .into(),
122            ),
123            _ if status.is_client_error() => {
124                let body = String::from_utf8_lossy(response.http_response.body());
125                RetryAction::DontRetry(format!("client-side error, {status}: {body}").into())
126            }
127            _ if status.is_success() => {
128                let body = String::from_utf8_lossy(response.http_response.body());
129
130                if body.contains("\"errors\":true") {
131                    match EsResultResponse::parse(&body) {
132                        Ok(resp) => {
133                            if self.retry_partial {
134                                // We will retry if there exists at least one item that
135                                // failed with a retriable error.
136                                // Those are backpressure and server errors.
137                                let status_codes: Vec<bool> = resp
138                                    .iter_status()
139                                    .map(|(status, _)| {
140                                        status == StatusCode::TOO_MANY_REQUESTS
141                                            || status.is_server_error()
142                                    })
143                                    .collect();
144                                if let Some((_status, _error)) =
145                                    resp.iter_status().find(|(status, _)| {
146                                        *status == StatusCode::TOO_MANY_REQUESTS
147                                            || status.is_server_error()
148                                    })
149                                {
150                                    return RetryAction::RetryPartial(Box::new(
151                                        move |req: ElasticsearchRequest| {
152                                            let mut failed_events: Vec<ProcessedEvent> = req
153                                                .original_events
154                                                .clone()
155                                                .into_iter()
156                                                .zip(status_codes.iter())
157                                                .filter(|(_, &flag)| flag)
158                                                .map(|(item, _)| item)
159                                                .collect();
160                                            let finalizers = failed_events.take_finalizers();
161                                            let batch_size = failed_events.len();
162                                            let events_byte_size = failed_events
163                                                .iter()
164                                                .map(|x| x.log.estimated_json_encoded_size_of())
165                                                .fold(JsonSize::zero(), |a, b| a + b);
166                                            let encode_result = match req
167                                                .elasticsearch_request_builder
168                                                .encode_events(failed_events.clone())
169                                            {
170                                                Ok(s) => s,
171                                                Err(_) => return req,
172                                            };
173                                            let metadata_builder =
174                                                RequestMetadataBuilder::from_events(&failed_events);
175                                            let metadata = metadata_builder.build(&encode_result);
176                                            ElasticsearchRequest {
177                                                payload: encode_result.into_payload(),
178                                                finalizers,
179                                                batch_size,
180                                                events_byte_size,
181                                                metadata,
182                                                original_events: failed_events,
183                                                elasticsearch_request_builder: req
184                                                    .elasticsearch_request_builder,
185                                            }
186                                        },
187                                    ));
188                                }
189                            }
190
191                            RetryAction::DontRetry(resp.get_error_reason(&body).into())
192                        }
193                        Err(msg) => RetryAction::DontRetry(msg.into()),
194                    }
195                } else {
196                    RetryAction::Successful
197                }
198            }
199            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
200        }
201    }
202}
203
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::Response;
    use similar_asserts::assert_eq;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};

    use super::*;
    use crate::event::EventStatus;

    /// Bulk response with one `index` item rejected (400) because of a mapping conflict.
    const INDEX_MAPPING_ERROR: &str = r#"{"took":185,"errors":true,"items":[{"index":{"_index":"test-hgw28jv10u","_type":"log_lines","_id":"3GhQLXEBE62DvOOUKdFH","status":400,"error":{"type":"illegal_argument_exception","reason":"mapper [message] of different type, current_type [long], merged_type [text]"}}}]}"#;

    /// Wraps `body` in a `200 OK` Elasticsearch response carrying `event_status`.
    fn ok_response(body: &'static str, event_status: EventStatus) -> ElasticsearchResponse {
        ElasticsearchResponse {
            http_response: Response::builder()
                .status(StatusCode::OK)
                .body(Bytes::from(body))
                .unwrap(),
            event_status,
            events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
        }
    }

    /// Parses `body` and returns its error reason, or the parse error message on failure.
    fn error_reason(body: &str) -> String {
        EsResultResponse::parse(body)
            .map_or_else(|parse_error| parse_error, |resp| resp.get_error_reason(body))
    }

    #[test]
    fn handles_error_response() {
        let logic = ElasticsearchRetryLogic {
            retry_partial: false,
        };
        let action =
            logic.should_retry_response(&ok_response(INDEX_MAPPING_ERROR, EventStatus::Rejected));
        assert!(matches!(action, RetryAction::DontRetry(_)));
    }

    #[test]
    fn handles_partial_error_response() {
        let body = r#"{"took":34,"errors":true,"items":[{"index":{"_index":"test-asjkf1234","_type":"log_lines","_id":"4Z3QLYEBT52RtoOEKz2H","status":429}}]}"#;
        let logic = ElasticsearchRetryLogic {
            retry_partial: true,
        };
        let action = logic.should_retry_response(&ok_response(body, EventStatus::Errored));
        assert!(matches!(action, RetryAction::RetryPartial(_)));
    }

    #[test]
    fn get_index_error_reason() {
        assert_eq!(
            error_reason(INDEX_MAPPING_ERROR),
            "error type: illegal_argument_exception, reason: mapper [message] of different type, current_type [long], merged_type [text]"
        );
    }

    #[test]
    fn get_create_error_reason() {
        let body = r#"{"took":3,"errors":true,"items":[{"create":{"_index":"test-hgw28jv10u","_type":"_doc","_id":"aBLq1HcBWD7eBWkW2nj4","status":400,"error":{"type":"mapper_parsing_exception","reason":"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}]}"#;
        assert_eq!(
            error_reason(body),
            "error type: mapper_parsing_exception, reason: object mapping for [host] tried to parse field [host] as object, but found a concrete value"
        );
    }
}