vector/sinks/elasticsearch/retry.rs

1use http::StatusCode;
2use serde::Deserialize;
3use vector_lib::{EstimatedJsonEncodedSizeOf, json_size::JsonSize};
4
5use crate::{
6    event::Finalizable,
7    http::HttpError,
8    sinks::{
9        elasticsearch::{
10            encoder::ProcessedEvent,
11            service::{ElasticsearchRequest, ElasticsearchResponse},
12        },
13        util::{
14            metadata::RequestMetadataBuilder,
15            request_builder::RequestBuilder,
16            retries::{RetryAction, RetryLogic},
17        },
18    },
19};
20
21#[derive(Deserialize, Debug)]
22struct EsResultResponse {
23    items: Vec<EsResultItem>,
24}
25
26impl EsResultResponse {
27    fn parse(body: &str) -> Result<Self, String> {
28        serde_json::from_str::<EsResultResponse>(body).map_err(|json_error| {
29            format!("some messages failed, could not parse response, error: {json_error}")
30        })
31    }
32
33    /// Returns iterator over status codes for items and optional error details.
34    fn iter_status(&self) -> impl Iterator<Item = (StatusCode, Option<&EsErrorDetails>)> {
35        self.items.iter().filter_map(|item| {
36            item.result()
37                .status
38                .and_then(|status| StatusCode::from_u16(status).ok())
39                .map(|status| (status, item.result().error.as_ref()))
40        })
41    }
42
43    /// Selects the first error since logging all errors would be quite verbose and many are duplicates.
44    /// If partial retry is enabled and we don't retry, this is because there is no retriable error in the
45    /// response, thus all errors are equally interesting so logging the first is sufficient.
46    /// When partial retry is disabled, we don't retry on any error.
47    fn get_error_reason(&self, body: &str) -> String {
48        match self
49            .items
50            .iter()
51            .find_map(|item| item.result().error.as_ref())
52        {
53            Some(error) => format!("error type: {}, reason: {}", error.err_type, error.reason),
54            None => format!("error response: {body}"),
55        }
56    }
57}
58
59#[derive(Deserialize, Debug)]
60enum EsResultItem {
61    #[serde(rename = "index")]
62    Index(EsIndexResult),
63    #[serde(rename = "create")]
64    Create(EsIndexResult),
65    #[serde(rename = "update")]
66    Update(EsIndexResult),
67}
68
69impl EsResultItem {
70    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
71    fn result(&self) -> &EsIndexResult {
72        match self {
73            EsResultItem::Index(r) => r,
74            EsResultItem::Create(r) => r,
75            EsResultItem::Update(r) => r,
76        }
77    }
78}
79
80#[derive(Deserialize, Debug)]
81struct EsIndexResult {
82    status: Option<u16>,
83    error: Option<EsErrorDetails>,
84}
85
86#[derive(Deserialize, Debug)]
87struct EsErrorDetails {
88    reason: String,
89    #[serde(rename = "type")]
90    err_type: String,
91}
92
/// Retry policy for requests sent by the Elasticsearch sink.
#[derive(Clone)]
pub struct ElasticsearchRetryLogic {
    /// When `true`, a successful response whose items report backpressure (`429`) or server
    /// errors (`5xx`) causes only the affected events to be re-sent.
    pub retry_partial: bool,
}
97
98impl RetryLogic for ElasticsearchRetryLogic {
99    type Error = HttpError;
100    type Request = ElasticsearchRequest;
101    type Response = ElasticsearchResponse;
102
103    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
104        true
105    }
106
107    fn should_retry_response(
108        &self,
109        response: &ElasticsearchResponse,
110    ) -> RetryAction<ElasticsearchRequest> {
111        let status = response.http_response.status();
112
113        match status {
114            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
115            StatusCode::NOT_IMPLEMENTED => {
116                RetryAction::DontRetry("endpoint not implemented".into())
117            }
118            _ if status.is_server_error() => RetryAction::Retry(
119                format!(
120                    "{}: {}",
121                    status,
122                    String::from_utf8_lossy(response.http_response.body())
123                )
124                .into(),
125            ),
126            _ if status.is_client_error() => {
127                let body = String::from_utf8_lossy(response.http_response.body());
128                RetryAction::DontRetry(format!("client-side error, {status}: {body}").into())
129            }
130            _ if status.is_success() => {
131                let body = String::from_utf8_lossy(response.http_response.body());
132
133                if body.contains("\"errors\":true") {
134                    match EsResultResponse::parse(&body) {
135                        Ok(resp) => {
136                            if self.retry_partial {
137                                // We will retry if there exists at least one item that
138                                // failed with a retriable error.
139                                // Those are backpressure and server errors.
140                                let status_codes: Vec<bool> = resp
141                                    .iter_status()
142                                    .map(|(status, _)| {
143                                        status == StatusCode::TOO_MANY_REQUESTS
144                                            || status.is_server_error()
145                                    })
146                                    .collect();
147                                if let Some((_status, _error)) =
148                                    resp.iter_status().find(|(status, _)| {
149                                        *status == StatusCode::TOO_MANY_REQUESTS
150                                            || status.is_server_error()
151                                    })
152                                {
153                                    return RetryAction::RetryPartial(Box::new(
154                                        move |req: ElasticsearchRequest| {
155                                            let mut failed_events: Vec<ProcessedEvent> = req
156                                                .original_events
157                                                .clone()
158                                                .into_iter()
159                                                .zip(status_codes.iter())
160                                                .filter(|&(_, &flag)| flag)
161                                                .map(|(item, _)| item)
162                                                .collect();
163                                            let finalizers = failed_events.take_finalizers();
164                                            let batch_size = failed_events.len();
165                                            let events_byte_size = failed_events
166                                                .iter()
167                                                .map(|x| x.log.estimated_json_encoded_size_of())
168                                                .fold(JsonSize::zero(), |a, b| a + b);
169                                            let encode_result = match req
170                                                .elasticsearch_request_builder
171                                                .encode_events(failed_events.clone())
172                                            {
173                                                Ok(s) => s,
174                                                Err(_) => return req,
175                                            };
176                                            let metadata_builder =
177                                                RequestMetadataBuilder::from_events(&failed_events);
178                                            let metadata = metadata_builder.build(&encode_result);
179                                            ElasticsearchRequest {
180                                                payload: encode_result.into_payload(),
181                                                finalizers,
182                                                batch_size,
183                                                events_byte_size,
184                                                metadata,
185                                                original_events: failed_events,
186                                                elasticsearch_request_builder: req
187                                                    .elasticsearch_request_builder,
188                                            }
189                                        },
190                                    ));
191                                }
192                            }
193
194                            RetryAction::DontRetry(resp.get_error_reason(&body).into())
195                        }
196                        Err(msg) => RetryAction::DontRetry(msg.into()),
197                    }
198                } else {
199                    RetryAction::Successful
200                }
201            }
202            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
203        }
204    }
205}
206
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::Response;
    use similar_asserts::assert_eq;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};

    use super::*;
    use crate::event::EventStatus;

    /// Response whose single `index` item failed with a non-retriable `400` mapping error.
    const INDEX_MAPPING_ERROR: &str = r#"{"took":185,"errors":true,"items":[{"index":{"_index":"test-hgw28jv10u","_type":"log_lines","_id":"3GhQLXEBE62DvOOUKdFH","status":400,"error":{"type":"illegal_argument_exception","reason":"mapper [message] of different type, current_type [long], merged_type [text]"}}}]}"#;

    /// Runs the retry logic against a response with the given HTTP status and body.
    fn check_response(
        retry_partial: bool,
        status: StatusCode,
        body: &'static str,
        event_status: EventStatus,
    ) -> RetryAction<ElasticsearchRequest> {
        let http_response = Response::builder()
            .status(status)
            .body(Bytes::from(body))
            .unwrap();
        let logic = ElasticsearchRetryLogic { retry_partial };
        logic.should_retry_response(&ElasticsearchResponse {
            http_response,
            event_status,
            events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
        })
    }

    #[test]
    fn handles_error_response() {
        assert!(matches!(
            check_response(
                false,
                StatusCode::OK,
                INDEX_MAPPING_ERROR,
                EventStatus::Rejected
            ),
            RetryAction::DontRetry(_)
        ));
    }

    #[test]
    fn handles_partial_error_response() {
        let json = r#"{"took":34,"errors":true,"items":[{"index":{"_index":"test-asjkf1234","_type":"log_lines","_id":"4Z3QLYEBT52RtoOEKz2H","status":429}}]}"#;
        assert!(matches!(
            check_response(true, StatusCode::OK, json, EventStatus::Errored),
            RetryAction::RetryPartial(_)
        ));
    }

    #[test]
    fn partial_retry_skips_non_retriable_item_errors() {
        // Partial retry is enabled, but the only failed item is a 400, so nothing is retried.
        assert!(matches!(
            check_response(
                true,
                StatusCode::OK,
                INDEX_MAPPING_ERROR,
                EventStatus::Rejected
            ),
            RetryAction::DontRetry(_)
        ));
    }

    #[test]
    fn handles_success_response() {
        let json = r#"{"took":2,"errors":false,"items":[{"index":{"_index":"test","_id":"1","status":201}}]}"#;
        assert!(matches!(
            check_response(true, StatusCode::OK, json, EventStatus::Delivered),
            RetryAction::Successful
        ));
    }

    #[test]
    fn does_not_retry_unparseable_error_response() {
        assert!(matches!(
            check_response(
                true,
                StatusCode::OK,
                r#"{"errors":true,"items":"#,
                EventStatus::Rejected
            ),
            RetryAction::DontRetry(_)
        ));
    }

    #[test]
    fn retries_server_error() {
        assert!(matches!(
            check_response(
                false,
                StatusCode::SERVICE_UNAVAILABLE,
                "unavailable",
                EventStatus::Errored
            ),
            RetryAction::Retry(_)
        ));
    }

    #[test]
    fn does_not_retry_not_implemented() {
        assert!(matches!(
            check_response(
                false,
                StatusCode::NOT_IMPLEMENTED,
                "",
                EventStatus::Rejected
            ),
            RetryAction::DontRetry(_)
        ));
    }

    #[test]
    fn get_index_error_reason() {
        let reason = match EsResultResponse::parse(INDEX_MAPPING_ERROR) {
            Ok(resp) => resp.get_error_reason(INDEX_MAPPING_ERROR),
            Err(msg) => msg,
        };
        assert_eq!(
            reason,
            "error type: illegal_argument_exception, reason: mapper [message] of different type, current_type [long], merged_type [text]"
        );
    }

    #[test]
    fn get_create_error_reason() {
        let json = r#"{"took":3,"errors":true,"items":[{"create":{"_index":"test-hgw28jv10u","_type":"_doc","_id":"aBLq1HcBWD7eBWkW2nj4","status":400,"error":{"type":"mapper_parsing_exception","reason":"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}]}"#;
        let reason = match EsResultResponse::parse(json) {
            Ok(resp) => resp.get_error_reason(json),
            Err(msg) => msg,
        };
        assert_eq!(
            reason,
            "error type: mapper_parsing_exception, reason: object mapping for [host] tried to parse field [host] as object, but found a concrete value"
        );
    }
}