vector/sinks/splunk_hec/common/
acknowledgements.rs1use std::{
2 collections::HashMap,
3 num::{NonZeroU8, NonZeroU64},
4 sync::Arc,
5 time::Duration,
6};
7
8use hyper::Body;
9use serde::{Deserialize, Serialize};
10use tokio::sync::{mpsc::Receiver, oneshot::Sender};
11use vector_lib::{configurable::configurable_component, event::EventStatus};
12
13use super::service::{HttpRequestBuilder, MetadataFields};
14use crate::{
15 config::AcknowledgementsConfig,
16 http::HttpClient,
17 internal_events::{
18 SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
19 SplunkIndexerAcknowledgementAcksRemoved,
20 },
21 sinks::util::Compression,
22};
23
/// Client-side configuration for Splunk HEC indexer acknowledgements.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub struct HecClientAcknowledgementsConfig {
    /// Enables or disables the use of Splunk HEC indexer acknowledgements.
    // NOTE(review): this flag is not read anywhere in this file; presumably the sink checks it
    // before starting the acknowledgement task — confirm against the caller.
    pub indexer_acknowledgements_enabled: bool,

    /// The amount of time to wait between acknowledgement status queries.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub query_interval: NonZeroU8,

    /// The maximum number of times the status of an acknowledgement ID is queried before the ID
    /// expires.
    pub retry_limit: NonZeroU8,

    /// The maximum number of acknowledgements that may be pending at once.
    // NOTE(review): the limit is not enforced in this file; presumably it bounds the channel that
    // feeds `run_acknowledgements` — confirm against the caller.
    pub max_pending_acks: NonZeroU64,

    /// General acknowledgement settings, flattened into this structure.
    // Deserialized through `crate::serde::bool_or_struct`, so it presumably accepts either a
    // plain boolean or a full structure — confirm against that helper.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        flatten,
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub inner: AcknowledgementsConfig,
}
55
56impl Default for HecClientAcknowledgementsConfig {
57 fn default() -> Self {
58 Self {
59 indexer_acknowledgements_enabled: true,
60 query_interval: NonZeroU8::new(10).unwrap(),
61 retry_limit: NonZeroU8::new(30).unwrap(),
62 max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
63 inner: Default::default(),
64 }
65 }
66}
67
/// Body of an acknowledgement status query, serialized to JSON and sent to the
/// `/services/collector/ack` endpoint.
#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
pub struct HecAckStatusRequest {
    /// The ack IDs whose status is being requested.
    pub acks: Vec<u64>,
}
72
/// Body of a successful acknowledgement status response, parsed from JSON.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status reported for each ack ID; IDs mapped to `true` are treated as delivered.
    pub acks: HashMap<u64, bool>,
}
77
/// Failure modes of a single acknowledgement status query.
#[derive(Debug)]
pub enum HecAckApiError {
    /// The query body could not be serialized, or the HTTP request could not be built.
    ClientBuildRequest,
    /// A successful response was received, but its body could not be read or parsed.
    ClientParseResponse,
    /// The server answered the query with a client-error (4xx) status.
    ClientSendQuery,
    /// The request could not be sent, or the server answered with a status that is neither a
    /// success nor a client error.
    ServerSendQuery,
}
85
/// Tracks pending ack IDs and queries their status over HTTP.
struct HecAckClient {
    /// Pending acks, keyed by ack ID. Each entry holds the number of remaining query attempts
    /// and the sender used to report the final status of the ack.
    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
    /// Number of query attempts granted to each newly added ack ID.
    retry_limit: u8,
    /// HTTP client used to send status queries.
    client: HttpClient,
    /// Builder for status query requests, with compression disabled by `HecAckClient::new`.
    http_request_builder: Arc<HttpRequestBuilder>,
}
92
93impl HecAckClient {
94 fn new(
95 retry_limit: u8,
96 client: HttpClient,
97 http_request_builder: Arc<HttpRequestBuilder>,
98 ) -> Self {
99 let http_request_builder = Arc::new(HttpRequestBuilder {
101 compression: Compression::None,
102 ..(*http_request_builder).clone()
103 });
104
105 Self {
106 acks: HashMap::new(),
107 retry_limit,
108 client,
109 http_request_builder,
110 }
111 }
112
113 fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
115 self.acks
116 .insert(ack_id, (self.retry_limit, ack_event_status_sender));
117 emit!(SplunkIndexerAcknowledgementAckAdded);
118 }
119
120 async fn run(&mut self) {
122 let ack_query_body = self.get_ack_query_body();
123 if !ack_query_body.acks.is_empty() {
124 let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
125
126 match ack_query_response {
127 Ok(ack_query_response) => {
128 debug!(message = "Received ack statuses.", ?ack_query_response);
129 let acked_ack_ids = ack_query_response
130 .acks
131 .iter()
132 .filter(|&(_ack_id, ack_status)| *ack_status)
133 .map(|(ack_id, _ack_status)| *ack_id)
134 .collect::<Vec<u64>>();
135 self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
136 self.expire_ack_ids_with_status(EventStatus::Rejected);
137 }
138 Err(error) => {
139 match error {
140 HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
141 emit!(SplunkIndexerAcknowledgementAPIError {
147 message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
148 error,
149 });
150 self.finalize_delivered_ack_ids(
151 self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
152 );
153 }
154 _ => {
155 emit!(SplunkIndexerAcknowledgementAPIError {
156 message: "Unable to send acknowledgement query request. Will retry.",
157 error,
158 });
159 self.expire_ack_ids_with_status(EventStatus::Errored);
160 }
161 }
162 }
163 };
164 }
165 }
166
167 fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
169 let mut removed_count = 0.0;
170 for ack_id in ack_ids {
171 if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
172 _ = ack_event_status_sender.send(EventStatus::Delivered);
173 removed_count += 1.0;
174 debug!(message = "Finalized ack id.", ?ack_id);
175 }
176 }
177 emit!(SplunkIndexerAcknowledgementAcksRemoved {
178 count: removed_count
179 });
180 }
181
182 fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
184 HecAckStatusRequest {
185 acks: self.acks.keys().copied().collect::<Vec<u64>>(),
186 }
187 }
188
189 fn decrement_retries(&mut self) {
191 for (retries, _) in self.acks.values_mut() {
192 *retries = retries.checked_sub(1).unwrap_or(0);
193 }
194 }
195
196 fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
199 let expired_ack_ids = self
200 .acks
201 .iter()
202 .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
203 .collect::<Vec<_>>();
204 let mut removed_count = 0.0;
205 for ack_id in expired_ack_ids {
206 if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
207 _ = ack_event_status_sender.send(status);
208 removed_count += 1.0;
209 }
210 }
211 emit!(SplunkIndexerAcknowledgementAcksRemoved {
212 count: removed_count
213 });
214 }
215
216 async fn send_ack_query_request(
218 &mut self,
219 request_body: &HecAckStatusRequest,
220 ) -> Result<HecAckStatusResponse, HecAckApiError> {
221 self.decrement_retries();
222 let request_body_bytes = crate::serde::json::to_bytes(request_body)
223 .map_err(|_| HecAckApiError::ClientBuildRequest)?
224 .freeze();
225 let request = self
226 .http_request_builder
227 .build_request(
228 request_body_bytes,
229 "/services/collector/ack",
230 None,
231 MetadataFields::default(),
232 false,
233 )
234 .map_err(|_| HecAckApiError::ClientBuildRequest)?;
235
236 let response = self
237 .client
238 .send(request.map(Body::from))
239 .await
240 .map_err(|_| HecAckApiError::ServerSendQuery)?;
241
242 let status = response.status();
243 if status.is_success() {
244 let response_body = hyper::body::to_bytes(response.into_body())
245 .await
246 .map_err(|_| HecAckApiError::ClientParseResponse)?;
247 serde_json::from_slice::<HecAckStatusResponse>(&response_body)
248 .map_err(|_| HecAckApiError::ClientParseResponse)
249 } else if status.is_client_error() {
250 Err(HecAckApiError::ClientSendQuery)
251 } else {
252 Err(HecAckApiError::ServerSendQuery)
253 }
254 }
255}
256
257pub async fn run_acknowledgements(
258 mut receiver: Receiver<(u64, Sender<EventStatus>)>,
259 client: HttpClient,
260 http_request_builder: Arc<HttpRequestBuilder>,
261 indexer_acknowledgements: HecClientAcknowledgementsConfig,
262) {
263 let mut interval = tokio::time::interval(Duration::from_secs(
264 indexer_acknowledgements.query_interval.get() as u64,
265 ));
266 let mut ack_client = HecAckClient::new(
267 indexer_acknowledgements.retry_limit.get(),
268 client,
269 http_request_builder,
270 );
271
272 loop {
273 tokio::select! {
274 _ = interval.tick() => {
275 ack_client.run().await;
276 },
277 ack_info = receiver.recv() => {
278 match ack_info {
279 Some((ack_id, tx)) => {
280 ack_client.add(ack_id, tx);
281 debug!(message = "Stored ack id.", ?ack_id);
282 },
283 None => break,
284 }
285 }
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{StreamExt, stream::FuturesUnordered};
    use tokio::sync::oneshot::{self, Receiver};
    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};

    use super::HecAckClient;
    use crate::{
        http::HttpClient,
        sinks::{
            splunk_hec::common::{
                EndpointTarget, acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder,
            },
            util::Compression,
        },
    };

    /// Builds an ack client with the given retry limit, backed by a default HTTP client and a
    /// request builder created from empty strings and default settings.
    fn get_ack_client(retry_limit: u8) -> HecAckClient {
        let http_client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
        let request_builder = Arc::new(HttpRequestBuilder::new(
            String::new(),
            EndpointTarget::default(),
            String::new(),
            Compression::default(),
        ));
        HecAckClient::new(retry_limit, http_client, request_builder)
    }

    /// Registers every ID in `ack_ids` with the client and returns the matching status
    /// receivers, in the same order.
    fn populate_ack_client(
        ack_client: &mut HecAckClient,
        ack_ids: &[u64],
    ) -> Vec<Receiver<EventStatus>> {
        ack_ids
            .iter()
            .map(|&id| {
                let (status_tx, status_rx) = oneshot::channel();
                ack_client.add(id, status_tx);
                status_rx
            })
            .collect()
    }

    /// Awaits every receiver and asserts that each one resolves to `expected`.
    async fn assert_all_statuses(receivers: Vec<Receiver<EventStatus>>, expected: EventStatus) {
        let mut pending = receivers.into_iter().collect::<FuturesUnordered<_>>();
        while let Some(received) = pending.next().await {
            assert_eq!(expected, received.unwrap());
        }
    }

    #[test]
    fn test_get_ack_query_body() {
        let mut client = get_ack_client(1);
        let ids: Vec<u64> = (0..100).collect();
        drop(populate_ack_client(&mut client, &ids));

        // The query body has no defined order, so sort before comparing.
        let mut body = client.get_ack_query_body();
        body.acks.sort_unstable();
        assert_eq!(HecAckStatusRequest { acks: ids }, body);
    }

    #[test]
    fn test_decrement_retries() {
        let mut client = get_ack_client(1);
        let ids: Vec<u64> = (0..100).collect();
        drop(populate_ack_client(&mut client, &ids));

        let mut body_before = client.get_ack_query_body();
        body_before.acks.sort_unstable();
        assert_eq!(ids, body_before.acks);

        // With a retry limit of 1, a single decrement exhausts every ack.
        client.decrement_retries();
        client.expire_ack_ids_with_status(EventStatus::Rejected);

        let body_after = client.get_ack_query_body();
        assert!(body_after.acks.is_empty())
    }

    #[tokio::test]
    async fn test_finalize_delivered_ack_ids() {
        let mut client = get_ack_client(1);
        let ids: Vec<u64> = (0..100).collect();
        let receivers = populate_ack_client(&mut client, &ids);

        client.finalize_delivered_ack_ids(&ids);
        assert_all_statuses(receivers, EventStatus::Delivered).await;
    }

    #[tokio::test]
    async fn test_expire_ack_ids_with_status() {
        let mut client = get_ack_client(1);
        let ids: Vec<u64> = (0..100).collect();
        let receivers = populate_ack_client(&mut client, &ids);

        client.decrement_retries();
        client.expire_ack_ids_with_status(EventStatus::Rejected);
        assert_all_statuses(receivers, EventStatus::Rejected).await;
    }
}