vector/sinks/splunk_hec/common/
acknowledgements.rs1use std::{
2 collections::HashMap,
3 num::{NonZeroU64, NonZeroU8},
4 sync::Arc,
5 time::Duration,
6};
7
8use hyper::Body;
9use serde::{Deserialize, Serialize};
10use tokio::sync::{mpsc::Receiver, oneshot::Sender};
11use vector_lib::configurable::configurable_component;
12use vector_lib::event::EventStatus;
13
14use super::service::{HttpRequestBuilder, MetadataFields};
15use crate::{
16 config::AcknowledgementsConfig,
17 http::HttpClient,
18 internal_events::{
19 SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
20 SplunkIndexerAcknowledgementAcksRemoved,
21 },
22};
23
/// Splunk HEC indexer acknowledgement configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub struct HecClientAcknowledgementsConfig {
    /// Controls whether Splunk HEC indexer acknowledgements are used.
    // NOTE(review): this flag is not read anywhere in this file; presumably the
    // code that spawns `run_acknowledgements` checks it — confirm against the caller.
    pub indexer_acknowledgements_enabled: bool,

    /// The amount of time, in seconds, to wait between acknowledgement status queries.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub query_interval: NonZeroU8,

    /// The maximum number of times an acknowledgement ID is queried for its status.
    pub retry_limit: NonZeroU8,

    /// The maximum number of pending acknowledgements.
    // NOTE(review): not enforced in this file; presumably this bounds the channel
    // feeding `run_acknowledgements` — confirm against the caller.
    pub max_pending_acks: NonZeroU64,

    // Flattened into this struct when (de)serialized, and omitted from serialized
    // output when it equals its default value.
    // NOTE(review): the `bool_or_struct` helper name suggests the value may be
    // given either as a bare boolean or as a full struct — confirm.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        flatten,
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub inner: AcknowledgementsConfig,
}
55
56impl Default for HecClientAcknowledgementsConfig {
57 fn default() -> Self {
58 Self {
59 indexer_acknowledgements_enabled: true,
60 query_interval: NonZeroU8::new(10).unwrap(),
61 retry_limit: NonZeroU8::new(30).unwrap(),
62 max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
63 inner: Default::default(),
64 }
65 }
66}
67
/// Body of an ack status query: the ack ids whose status is being requested.
#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
pub struct HecAckStatusRequest {
    /// Ack ids to query.
    pub acks: Vec<u64>,
}
72
/// Body of an ack status query response.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Maps each ack id to its status; ids mapped to `true` are treated as
    /// acknowledged and finalized as delivered.
    pub acks: HashMap<u64, bool>,
}
77
/// Failure modes of an ack status query.
#[derive(Debug)]
pub enum HecAckApiError {
    /// The query body could not be serialized, or the HTTP request could not be built.
    ClientBuildRequest,
    /// The body of a successful response could not be read or deserialized.
    ClientParseResponse,
    /// The server answered with a client-error status code.
    ClientSendQuery,
    /// The request could not be sent, or the server answered with a status that
    /// is neither a success nor a client error.
    ServerSendQuery,
}
85
/// Tracks pending ack ids and queries their status over HTTP.
struct HecAckClient {
    /// Pending ack ids, each mapped to its remaining query attempts and the
    /// channel used to report the final status of the associated events.
    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
    /// Number of query attempts granted to every newly added ack id.
    retry_limit: u8,
    /// HTTP client used to send ack status queries.
    client: HttpClient,
    /// Builder used to construct the ack status query requests.
    http_request_builder: Arc<HttpRequestBuilder>,
}
92
93impl HecAckClient {
94 fn new(
95 retry_limit: u8,
96 client: HttpClient,
97 http_request_builder: Arc<HttpRequestBuilder>,
98 ) -> Self {
99 Self {
100 acks: HashMap::new(),
101 retry_limit,
102 client,
103 http_request_builder,
104 }
105 }
106
107 fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
109 self.acks
110 .insert(ack_id, (self.retry_limit, ack_event_status_sender));
111 emit!(SplunkIndexerAcknowledgementAckAdded);
112 }
113
114 async fn run(&mut self) {
116 let ack_query_body = self.get_ack_query_body();
117 if !ack_query_body.acks.is_empty() {
118 let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
119
120 match ack_query_response {
121 Ok(ack_query_response) => {
122 debug!(message = "Received ack statuses.", ?ack_query_response);
123 let acked_ack_ids = ack_query_response
124 .acks
125 .iter()
126 .filter(|&(_ack_id, ack_status)| *ack_status)
127 .map(|(ack_id, _ack_status)| *ack_id)
128 .collect::<Vec<u64>>();
129 self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
130 self.expire_ack_ids_with_status(EventStatus::Rejected);
131 }
132 Err(error) => {
133 match error {
134 HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
135 emit!(SplunkIndexerAcknowledgementAPIError {
141 message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
142 error,
143 });
144 self.finalize_delivered_ack_ids(
145 self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
146 );
147 }
148 _ => {
149 emit!(SplunkIndexerAcknowledgementAPIError {
150 message:
151 "Unable to send acknowledgement query request. Will retry.",
152 error,
153 });
154 self.expire_ack_ids_with_status(EventStatus::Errored);
155 }
156 }
157 }
158 };
159 }
160 }
161
162 fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
164 let mut removed_count = 0.0;
165 for ack_id in ack_ids {
166 if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
167 _ = ack_event_status_sender.send(EventStatus::Delivered);
168 removed_count += 1.0;
169 debug!(message = "Finalized ack id.", ?ack_id);
170 }
171 }
172 emit!(SplunkIndexerAcknowledgementAcksRemoved {
173 count: removed_count
174 });
175 }
176
177 fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
179 HecAckStatusRequest {
180 acks: self.acks.keys().copied().collect::<Vec<u64>>(),
181 }
182 }
183
184 fn decrement_retries(&mut self) {
186 for (retries, _) in self.acks.values_mut() {
187 *retries = retries.checked_sub(1).unwrap_or(0);
188 }
189 }
190
191 fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
194 let expired_ack_ids = self
195 .acks
196 .iter()
197 .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
198 .collect::<Vec<_>>();
199 let mut removed_count = 0.0;
200 for ack_id in expired_ack_ids {
201 if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
202 _ = ack_event_status_sender.send(status);
203 removed_count += 1.0;
204 }
205 }
206 emit!(SplunkIndexerAcknowledgementAcksRemoved {
207 count: removed_count
208 });
209 }
210
211 async fn send_ack_query_request(
213 &mut self,
214 request_body: &HecAckStatusRequest,
215 ) -> Result<HecAckStatusResponse, HecAckApiError> {
216 self.decrement_retries();
217 let request_body_bytes = crate::serde::json::to_bytes(request_body)
218 .map_err(|_| HecAckApiError::ClientBuildRequest)?
219 .freeze();
220 let request = self
221 .http_request_builder
222 .build_request(
223 request_body_bytes,
224 "/services/collector/ack",
225 None,
226 MetadataFields::default(),
227 false,
228 )
229 .map_err(|_| HecAckApiError::ClientBuildRequest)?;
230
231 let response = self
232 .client
233 .send(request.map(Body::from))
234 .await
235 .map_err(|_| HecAckApiError::ServerSendQuery)?;
236
237 let status = response.status();
238 if status.is_success() {
239 let response_body = hyper::body::to_bytes(response.into_body())
240 .await
241 .map_err(|_| HecAckApiError::ClientParseResponse)?;
242 serde_json::from_slice::<HecAckStatusResponse>(&response_body)
243 .map_err(|_| HecAckApiError::ClientParseResponse)
244 } else if status.is_client_error() {
245 Err(HecAckApiError::ClientSendQuery)
246 } else {
247 Err(HecAckApiError::ServerSendQuery)
248 }
249 }
250}
251
252pub async fn run_acknowledgements(
253 mut receiver: Receiver<(u64, Sender<EventStatus>)>,
254 client: HttpClient,
255 http_request_builder: Arc<HttpRequestBuilder>,
256 indexer_acknowledgements: HecClientAcknowledgementsConfig,
257) {
258 let mut interval = tokio::time::interval(Duration::from_secs(
259 indexer_acknowledgements.query_interval.get() as u64,
260 ));
261 let mut ack_client = HecAckClient::new(
262 indexer_acknowledgements.retry_limit.get(),
263 client,
264 http_request_builder,
265 );
266
267 loop {
268 tokio::select! {
269 _ = interval.tick() => {
270 ack_client.run().await;
271 },
272 ack_info = receiver.recv() => {
273 match ack_info {
274 Some((ack_id, tx)) => {
275 ack_client.add(ack_id, tx);
276 debug!(message = "Stored ack id.", ?ack_id);
277 },
278 None => break,
279 }
280 }
281 }
282 }
283}
284
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{stream::FuturesUnordered, StreamExt};
    use tokio::sync::oneshot::{self, Receiver};
    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};

    use super::HecAckClient;
    use crate::{
        http::HttpClient,
        sinks::{
            splunk_hec::common::{
                acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder, EndpointTarget,
            },
            util::Compression,
        },
    };

    /// Builds an ack client with default/empty request settings; no request is
    /// ever sent by these tests.
    fn get_ack_client(retry_limit: u8) -> HecAckClient {
        let http_client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
        let request_builder = Arc::new(HttpRequestBuilder::new(
            String::from(""),
            EndpointTarget::default(),
            String::from(""),
            Compression::default(),
        ));
        HecAckClient::new(retry_limit, http_client, request_builder)
    }

    /// Registers every id in `ack_ids` with the client and returns the
    /// receivers on which their final statuses are reported, in the same order.
    fn populate_ack_client(
        ack_client: &mut HecAckClient,
        ack_ids: &[u64],
    ) -> Vec<Receiver<EventStatus>> {
        ack_ids
            .iter()
            .map(|&ack_id| {
                let (tx, rx) = oneshot::channel();
                ack_client.add(ack_id, tx);
                rx
            })
            .collect()
    }

    /// The query body contains exactly the stored ack ids.
    #[test]
    fn test_get_ack_query_body() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        let mut actual = ack_client.get_ack_query_body();
        // Map iteration order is unspecified, so normalize before comparing.
        actual.acks.sort_unstable();

        let expected = HecAckStatusRequest { acks: ack_ids };
        assert_eq!(expected, actual);
    }

    /// With a retry limit of 1, a single decrement expires every ack id.
    #[test]
    fn test_decrement_retries() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        let mut before = ack_client.get_ack_query_body();
        before.acks.sort_unstable();
        assert_eq!(ack_ids, before.acks);

        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        let after = ack_client.get_ack_query_body();
        assert!(after.acks.is_empty())
    }

    /// Finalized ack ids report `Delivered` on their status channels.
    #[tokio::test]
    async fn test_finalize_delivered_ack_ids() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.finalize_delivered_ack_ids(&ack_ids);

        let mut pending: FuturesUnordered<_> = ack_status_rxs.into_iter().collect();
        while let Some(status) = pending.next().await {
            assert_eq!(EventStatus::Delivered, status.unwrap());
        }
    }

    /// Expired ack ids report the status passed to the expiry call.
    #[tokio::test]
    async fn test_expire_ack_ids_with_status() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        let mut pending: FuturesUnordered<_> = ack_status_rxs.into_iter().collect();
        while let Some(status) = pending.next().await {
            assert_eq!(EventStatus::Rejected, status.unwrap());
        }
    }
}