vector/sinks/elasticsearch/
retry.rs1use crate::sinks::{
2 elasticsearch::encoder::ProcessedEvent,
3 util::{metadata::RequestMetadataBuilder, request_builder::RequestBuilder},
4};
5use crate::{
6 event::Finalizable,
7 http::HttpError,
8 sinks::{
9 elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
10 util::retries::{RetryAction, RetryLogic},
11 },
12};
13use http::StatusCode;
14use serde::Deserialize;
15use vector_lib::json_size::JsonSize;
16use vector_lib::EstimatedJsonEncodedSizeOf;
17
18#[derive(Deserialize, Debug)]
19struct EsResultResponse {
20 items: Vec<EsResultItem>,
21}
22
23impl EsResultResponse {
24 fn parse(body: &str) -> Result<Self, String> {
25 serde_json::from_str::<EsResultResponse>(body).map_err(|json_error| {
26 format!("some messages failed, could not parse response, error: {json_error}")
27 })
28 }
29
30 fn iter_status(&self) -> impl Iterator<Item = (StatusCode, Option<&EsErrorDetails>)> {
32 self.items.iter().filter_map(|item| {
33 item.result()
34 .status
35 .and_then(|status| StatusCode::from_u16(status).ok())
36 .map(|status| (status, item.result().error.as_ref()))
37 })
38 }
39
40 fn get_error_reason(&self, body: &str) -> String {
45 match self
46 .items
47 .iter()
48 .find_map(|item| item.result().error.as_ref())
49 {
50 Some(error) => format!("error type: {}, reason: {}", error.err_type, error.reason),
51 None => format!("error response: {body}"),
52 }
53 }
54}
55
56#[derive(Deserialize, Debug)]
57enum EsResultItem {
58 #[serde(rename = "index")]
59 Index(EsIndexResult),
60 #[serde(rename = "create")]
61 Create(EsIndexResult),
62 #[serde(rename = "update")]
63 Update(EsIndexResult),
64}
65
66impl EsResultItem {
67 #[allow(clippy::missing_const_for_fn)] fn result(&self) -> &EsIndexResult {
69 match self {
70 EsResultItem::Index(r) => r,
71 EsResultItem::Create(r) => r,
72 EsResultItem::Update(r) => r,
73 }
74 }
75}
76
77#[derive(Deserialize, Debug)]
78struct EsIndexResult {
79 status: Option<u16>,
80 error: Option<EsErrorDetails>,
81}
82
83#[derive(Deserialize, Debug)]
84struct EsErrorDetails {
85 reason: String,
86 #[serde(rename = "type")]
87 err_type: String,
88}
89
/// Retry policy for responses from the Elasticsearch bulk endpoint.
#[derive(Clone)]
pub struct ElasticsearchRetryLogic {
    /// When `true`, a successful HTTP response that reports item-level errors
    /// is retried for just the items that failed with `429` or a `5xx` status,
    /// instead of being rejected as a whole.
    pub retry_partial: bool,
}
94
95impl RetryLogic for ElasticsearchRetryLogic {
96 type Error = HttpError;
97 type Request = ElasticsearchRequest;
98 type Response = ElasticsearchResponse;
99
100 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
101 true
102 }
103
104 fn should_retry_response(
105 &self,
106 response: &ElasticsearchResponse,
107 ) -> RetryAction<ElasticsearchRequest> {
108 let status = response.http_response.status();
109
110 match status {
111 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
112 StatusCode::NOT_IMPLEMENTED => {
113 RetryAction::DontRetry("endpoint not implemented".into())
114 }
115 _ if status.is_server_error() => RetryAction::Retry(
116 format!(
117 "{}: {}",
118 status,
119 String::from_utf8_lossy(response.http_response.body())
120 )
121 .into(),
122 ),
123 _ if status.is_client_error() => {
124 let body = String::from_utf8_lossy(response.http_response.body());
125 RetryAction::DontRetry(format!("client-side error, {status}: {body}").into())
126 }
127 _ if status.is_success() => {
128 let body = String::from_utf8_lossy(response.http_response.body());
129
130 if body.contains("\"errors\":true") {
131 match EsResultResponse::parse(&body) {
132 Ok(resp) => {
133 if self.retry_partial {
134 let status_codes: Vec<bool> = resp
138 .iter_status()
139 .map(|(status, _)| {
140 status == StatusCode::TOO_MANY_REQUESTS
141 || status.is_server_error()
142 })
143 .collect();
144 if let Some((_status, _error)) =
145 resp.iter_status().find(|(status, _)| {
146 *status == StatusCode::TOO_MANY_REQUESTS
147 || status.is_server_error()
148 })
149 {
150 return RetryAction::RetryPartial(Box::new(
151 move |req: ElasticsearchRequest| {
152 let mut failed_events: Vec<ProcessedEvent> = req
153 .original_events
154 .clone()
155 .into_iter()
156 .zip(status_codes.iter())
157 .filter(|(_, &flag)| flag)
158 .map(|(item, _)| item)
159 .collect();
160 let finalizers = failed_events.take_finalizers();
161 let batch_size = failed_events.len();
162 let events_byte_size = failed_events
163 .iter()
164 .map(|x| x.log.estimated_json_encoded_size_of())
165 .fold(JsonSize::zero(), |a, b| a + b);
166 let encode_result = match req
167 .elasticsearch_request_builder
168 .encode_events(failed_events.clone())
169 {
170 Ok(s) => s,
171 Err(_) => return req,
172 };
173 let metadata_builder =
174 RequestMetadataBuilder::from_events(&failed_events);
175 let metadata = metadata_builder.build(&encode_result);
176 ElasticsearchRequest {
177 payload: encode_result.into_payload(),
178 finalizers,
179 batch_size,
180 events_byte_size,
181 metadata,
182 original_events: failed_events,
183 elasticsearch_request_builder: req
184 .elasticsearch_request_builder,
185 }
186 },
187 ));
188 }
189 }
190
191 RetryAction::DontRetry(resp.get_error_reason(&body).into())
192 }
193 Err(msg) => RetryAction::DontRetry(msg.into()),
194 }
195 } else {
196 RetryAction::Successful
197 }
198 }
199 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
200 }
201 }
202}
203
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::Response;
    use similar_asserts::assert_eq;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};

    use super::*;
    use crate::event::EventStatus;

    /// Bulk response whose single `index` item failed with a mapping error (`400`).
    const INDEX_MAPPING_ERROR: &str = "{\"took\":185,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-hgw28jv10u\",\"_type\":\"log_lines\",\"_id\":\"3GhQLXEBE62DvOOUKdFH\",\"status\":400,\"error\":{\"type\":\"illegal_argument_exception\",\"reason\":\"mapper [message] of different type, current_type [long], merged_type [text]\"}}}]}";

    /// Wraps `body` in a `200 OK` response carrying the given event status.
    fn ok_response(body: &'static str, event_status: EventStatus) -> ElasticsearchResponse {
        let http_response = Response::builder()
            .status(StatusCode::OK)
            .body(Bytes::from(body))
            .unwrap();

        ElasticsearchResponse {
            http_response,
            event_status,
            events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
        }
    }

    /// Parses `json` and returns the reported error reason, or the parse
    /// failure message when the body cannot be deserialized.
    fn error_reason(json: &str) -> String {
        EsResultResponse::parse(json)
            .map(|resp| resp.get_error_reason(json))
            .unwrap_or_else(|msg| msg)
    }

    #[test]
    fn handles_error_response() {
        let logic = ElasticsearchRetryLogic {
            retry_partial: false,
        };

        let action =
            logic.should_retry_response(&ok_response(INDEX_MAPPING_ERROR, EventStatus::Rejected));

        assert!(matches!(action, RetryAction::DontRetry(_)));
    }

    #[test]
    fn handles_partial_error_response() {
        let json = "{\"took\":34,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-asjkf1234\",\"_type\":\"log_lines\",\"_id\":\"4Z3QLYEBT52RtoOEKz2H\",\"status\":429}}]}";
        let logic = ElasticsearchRetryLogic {
            retry_partial: true,
        };

        let action = logic.should_retry_response(&ok_response(json, EventStatus::Errored));

        assert!(matches!(action, RetryAction::RetryPartial(_)));
    }

    #[test]
    fn get_index_error_reason() {
        assert_eq!(error_reason(INDEX_MAPPING_ERROR), "error type: illegal_argument_exception, reason: mapper [message] of different type, current_type [long], merged_type [text]");
    }

    #[test]
    fn get_create_error_reason() {
        let json = "{\"took\":3,\"errors\":true,\"items\":[{\"create\":{\"_index\":\"test-hgw28jv10u\",\"_type\":\"_doc\",\"_id\":\"aBLq1HcBWD7eBWkW2nj4\",\"status\":400,\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"object mapping for [host] tried to parse field [host] as object, but found a concrete value\"}}}]}";

        assert_eq!(error_reason(json), "error type: mapper_parsing_exception, reason: object mapping for [host] tried to parse field [host] as object, but found a concrete value");
    }
}