vector/sinks/splunk_hec/common/
acknowledgements.rs1use http_body::{Body as _, Collected};
2use hyper::Body;
3use serde::{Deserialize, Serialize};
4use std::{
5 collections::HashMap,
6 io::Write,
7 num::{NonZeroU8, NonZeroU64},
8 sync::Arc,
9 time::Duration,
10};
11use tokio::sync::{mpsc::Receiver, oneshot::Sender};
12use vector_lib::{configurable::configurable_component, event::EventStatus};
13
14use super::service::{HttpRequestBuilder, MetadataFields};
15use crate::{
16 config::AcknowledgementsConfig,
17 http::HttpClient,
18 internal_events::{
19 SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
20 SplunkIndexerAcknowledgementAcksRemoved,
21 },
22 sinks::util::Compressor,
23};
24
25#[configurable_component]
27#[derive(Clone, Debug)]
28#[serde(default)]
29#[configurable(metadata(docs::advanced))]
30pub struct HecClientAcknowledgementsConfig {
31 pub indexer_acknowledgements_enabled: bool,
35
36 #[configurable(metadata(docs::type_unit = "seconds"))]
38 pub query_interval: NonZeroU8,
39
40 pub retry_limit: NonZeroU8,
42
43 pub max_pending_acks: NonZeroU64,
47
48 #[serde(
49 default,
50 deserialize_with = "crate::serde::bool_or_struct",
51 flatten,
52 skip_serializing_if = "crate::serde::is_default"
53 )]
54 pub inner: AcknowledgementsConfig,
55}
56
57impl Default for HecClientAcknowledgementsConfig {
58 fn default() -> Self {
59 Self {
60 indexer_acknowledgements_enabled: true,
61 query_interval: NonZeroU8::new(10).unwrap(),
62 retry_limit: NonZeroU8::new(30).unwrap(),
63 max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
64 inner: Default::default(),
65 }
66 }
67}
68
69#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
70pub struct HecAckStatusRequest {
71 pub acks: Vec<u64>,
72}
73
74#[derive(Deserialize, Serialize, Debug)]
75pub struct HecAckStatusResponse {
76 pub acks: HashMap<u64, bool>,
77}
78
79#[derive(Debug)]
80pub enum HecAckApiError {
81 ClientBuildRequest,
82 ClientParseResponse,
83 ClientSendQuery,
84 ServerSendQuery,
85}
86
87struct HecAckClient {
88 acks: HashMap<u64, (u8, Sender<EventStatus>)>,
89 retry_limit: u8,
90 client: HttpClient,
91 http_request_builder: Arc<HttpRequestBuilder>,
92}
93
94impl HecAckClient {
95 fn new(
96 retry_limit: u8,
97 client: HttpClient,
98 http_request_builder: Arc<HttpRequestBuilder>,
99 ) -> Self {
100 Self {
101 acks: HashMap::new(),
102 retry_limit,
103 client,
104 http_request_builder,
105 }
106 }
107
108 fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
110 self.acks
111 .insert(ack_id, (self.retry_limit, ack_event_status_sender));
112 emit!(SplunkIndexerAcknowledgementAckAdded);
113 }
114
115 async fn run(&mut self) {
117 let ack_query_body = self.get_ack_query_body();
118 if !ack_query_body.acks.is_empty() {
119 let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
120
121 match ack_query_response {
122 Ok(ack_query_response) => {
123 debug!(message = "Received ack statuses.", ?ack_query_response);
124 let acked_ack_ids = ack_query_response
125 .acks
126 .iter()
127 .filter(|&(_ack_id, ack_status)| *ack_status)
128 .map(|(ack_id, _ack_status)| *ack_id)
129 .collect::<Vec<u64>>();
130 self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
131 self.expire_ack_ids_with_status(EventStatus::Rejected);
132 }
133 Err(error) => {
134 match error {
135 HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
136 emit!(SplunkIndexerAcknowledgementAPIError {
142 message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
143 error,
144 });
145 self.finalize_delivered_ack_ids(
146 self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
147 );
148 }
149 _ => {
150 emit!(SplunkIndexerAcknowledgementAPIError {
151 message: "Unable to send acknowledgement query request. Will retry.",
152 error,
153 });
154 self.expire_ack_ids_with_status(EventStatus::Errored);
155 }
156 }
157 }
158 };
159 }
160 }
161
162 fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
164 let mut removed_count = 0.0;
165 for ack_id in ack_ids {
166 if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
167 _ = ack_event_status_sender.send(EventStatus::Delivered);
168 removed_count += 1.0;
169 debug!(message = "Finalized ack id.", ?ack_id);
170 }
171 }
172 emit!(SplunkIndexerAcknowledgementAcksRemoved {
173 count: removed_count
174 });
175 }
176
177 fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
179 HecAckStatusRequest {
180 acks: self.acks.keys().copied().collect::<Vec<u64>>(),
181 }
182 }
183
184 fn decrement_retries(&mut self) {
186 for (retries, _) in self.acks.values_mut() {
187 *retries = retries.checked_sub(1).unwrap_or(0);
188 }
189 }
190
191 fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
194 let expired_ack_ids = self
195 .acks
196 .iter()
197 .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
198 .collect::<Vec<_>>();
199 let mut removed_count = 0.0;
200 for ack_id in expired_ack_ids {
201 if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
202 _ = ack_event_status_sender.send(status);
203 removed_count += 1.0;
204 }
205 }
206 emit!(SplunkIndexerAcknowledgementAcksRemoved {
207 count: removed_count
208 });
209 }
210
211 async fn send_ack_query_request(
213 &mut self,
214 request_body: &HecAckStatusRequest,
215 ) -> Result<HecAckStatusResponse, HecAckApiError> {
216 self.decrement_retries();
217 let request_body_bytes = crate::serde::json::to_bytes(request_body)
218 .map_err(|_| HecAckApiError::ClientBuildRequest)?
219 .freeze();
220 let mut compressor = Compressor::from(self.http_request_builder.compression);
221 compressor
222 .write_all(request_body_bytes.as_ref())
223 .map_err(|_| HecAckApiError::ClientBuildRequest)?;
224 let payload = compressor
225 .finish()
226 .map_err(|_| HecAckApiError::ClientBuildRequest)?
227 .freeze();
228 let request = self
229 .http_request_builder
230 .build_request(
231 payload,
232 "/services/collector/ack",
233 None,
234 MetadataFields::default(),
235 false,
236 )
237 .map_err(|_| HecAckApiError::ClientBuildRequest)?;
238
239 let response = self
240 .client
241 .send(request.map(Body::from))
242 .await
243 .map_err(|_| HecAckApiError::ServerSendQuery)?;
244
245 let status = response.status();
246 if status.is_success() {
247 let response_body = response
248 .into_body()
249 .collect()
250 .await
251 .map(Collected::to_bytes)
252 .map_err(|_| HecAckApiError::ClientParseResponse)?;
253 serde_json::from_slice::<HecAckStatusResponse>(&response_body)
254 .map_err(|_| HecAckApiError::ClientParseResponse)
255 } else if status.is_client_error() {
256 Err(HecAckApiError::ClientSendQuery)
257 } else {
258 Err(HecAckApiError::ServerSendQuery)
259 }
260 }
261}
262
263pub async fn run_acknowledgements(
264 mut receiver: Receiver<(u64, Sender<EventStatus>)>,
265 client: HttpClient,
266 http_request_builder: Arc<HttpRequestBuilder>,
267 indexer_acknowledgements: HecClientAcknowledgementsConfig,
268) {
269 let mut interval = tokio::time::interval(Duration::from_secs(
270 indexer_acknowledgements.query_interval.get() as u64,
271 ));
272 let mut ack_client = HecAckClient::new(
273 indexer_acknowledgements.retry_limit.get(),
274 client,
275 http_request_builder,
276 );
277
278 loop {
279 tokio::select! {
280 _ = interval.tick() => {
281 ack_client.run().await;
282 },
283 ack_info = receiver.recv() => {
284 match ack_info {
285 Some((ack_id, tx)) => {
286 ack_client.add(ack_id, tx);
287 debug!(message = "Stored ack id.", ?ack_id);
288 },
289 None => break,
290 }
291 }
292 }
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use std::sync::Arc;
299
300 use futures_util::{StreamExt, stream::FuturesUnordered};
301 use tokio::sync::oneshot::{self, Receiver};
302 use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};
303
304 use super::HecAckClient;
305 use crate::{
306 http::HttpClient,
307 sinks::{
308 splunk_hec::common::{
309 EndpointTarget, acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder,
310 },
311 util::Compression,
312 },
313 };
314
315 fn get_ack_client(retry_limit: u8) -> HecAckClient {
316 let client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
317 let http_request_builder = HttpRequestBuilder::new(
318 String::from(""),
319 EndpointTarget::default(),
320 String::from(""),
321 Compression::default(),
322 );
323 HecAckClient::new(retry_limit, client, Arc::new(http_request_builder))
324 }
325
326 fn populate_ack_client(
327 ack_client: &mut HecAckClient,
328 ack_ids: &[u64],
329 ) -> Vec<Receiver<EventStatus>> {
330 let mut ack_status_rxs = Vec::new();
331 for ack_id in ack_ids {
332 let (tx, rx) = oneshot::channel();
333 ack_client.add(*ack_id, tx);
334 ack_status_rxs.push(rx);
335 }
336 ack_status_rxs
337 }
338
339 #[test]
340 fn test_get_ack_query_body() {
341 let mut ack_client = get_ack_client(1);
342 let ack_ids = (0..100).collect::<Vec<u64>>();
343 _ = populate_ack_client(&mut ack_client, &ack_ids);
344 let expected_ack_body = HecAckStatusRequest { acks: ack_ids };
345
346 let mut ack_request_body = ack_client.get_ack_query_body();
347 ack_request_body.acks.sort_unstable();
348 assert_eq!(expected_ack_body, ack_request_body);
349 }
350
351 #[test]
352 fn test_decrement_retries() {
353 let mut ack_client = get_ack_client(1);
354 let ack_ids = (0..100).collect::<Vec<u64>>();
355 _ = populate_ack_client(&mut ack_client, &ack_ids);
356
357 let mut ack_request_body = ack_client.get_ack_query_body();
358 ack_request_body.acks.sort_unstable();
359 assert_eq!(ack_ids, ack_request_body.acks);
360 ack_client.decrement_retries();
361 ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
362
363 let ack_request_body = ack_client.get_ack_query_body();
364 assert!(ack_request_body.acks.is_empty())
365 }
366
367 #[tokio::test]
368 async fn test_finalize_delivered_ack_ids() {
369 let mut ack_client = get_ack_client(1);
370 let ack_ids = (0..100).collect::<Vec<u64>>();
371 let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);
372
373 ack_client.finalize_delivered_ack_ids(ack_ids.as_slice());
374 let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
375 while let Some(status) = statuses.next().await {
376 assert_eq!(EventStatus::Delivered, status.unwrap());
377 }
378 }
379
380 #[tokio::test]
381 async fn test_expire_ack_ids_with_status() {
382 let mut ack_client = get_ack_client(1);
383 let ack_ids = (0..100).collect::<Vec<u64>>();
384 let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);
385
386 ack_client.decrement_retries();
387 ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
388 let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
389 while let Some(status) = statuses.next().await {
390 assert_eq!(EventStatus::Rejected, status.unwrap());
391 }
392 }
393}