vector/sinks/splunk_hec/common/
acknowledgements.rs1use hyper::Body;
2use serde::{Deserialize, Serialize};
3use std::io::Write;
4use std::{
5 collections::HashMap,
6 num::{NonZeroU8, NonZeroU64},
7 sync::Arc,
8 time::Duration,
9};
10use tokio::sync::{mpsc::Receiver, oneshot::Sender};
11use vector_lib::{configurable::configurable_component, event::EventStatus};
12
13use super::service::{HttpRequestBuilder, MetadataFields};
14use crate::sinks::util::Compressor;
15use crate::{
16 config::AcknowledgementsConfig,
17 http::HttpClient,
18 internal_events::{
19 SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
20 SplunkIndexerAcknowledgementAcksRemoved,
21 },
22};
23
/// Settings that control how the Splunk HEC sink tracks indexer acknowledgements.
///
/// `query_interval` and `retry_limit` are consumed by `run_acknowledgements` in this
/// module; the remaining fields are only declared here.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub struct HecClientAcknowledgementsConfig {
    /// Whether indexer acknowledgements are enabled.
    // NOTE(review): this flag is not read anywhere in this module — confirm where it is
    // consumed before relying on this description.
    pub indexer_acknowledgements_enabled: bool,

    /// The amount of time, in seconds, between acknowledgement status queries.
    ///
    /// `run_acknowledgements` uses this value as the period of its query timer.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub query_interval: NonZeroU8,

    /// The number of status queries an acknowledgement ID is given before it is expired.
    ///
    /// Every tracked ID starts with this many retries; one is consumed each time a
    /// status query is attempted.
    pub retry_limit: NonZeroU8,

    /// The maximum number of pending acknowledgements.
    // NOTE(review): not enforced in this module — presumably the sink applies it when
    // queueing ack ids; verify against the caller.
    pub max_pending_acks: NonZeroU64,

    /// The generic acknowledgement settings, flattened into this struct when
    /// (de)serialized.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the full
    // struct form — confirm in `crate::serde`.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        flatten,
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub inner: AcknowledgementsConfig,
}
55
56impl Default for HecClientAcknowledgementsConfig {
57 fn default() -> Self {
58 Self {
59 indexer_acknowledgements_enabled: true,
60 query_interval: NonZeroU8::new(10).unwrap(),
61 retry_limit: NonZeroU8::new(30).unwrap(),
62 max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
63 inner: Default::default(),
64 }
65 }
66}
67
/// JSON body of an acknowledgement status query.
#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
pub struct HecAckStatusRequest {
    /// The acknowledgement IDs whose status is being requested.
    pub acks: Vec<u64>,
}
72
/// JSON body parsed from a successful acknowledgement status response.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status per acknowledgement ID; IDs mapped to `true` are finalized as delivered
    /// by the ack client.
    pub acks: HashMap<u64, bool>,
}
77
/// Failure modes of an acknowledgement status query.
#[derive(Debug)]
pub enum HecAckApiError {
    /// The query could not be serialized, compressed, or turned into an HTTP request.
    ClientBuildRequest,
    /// The response body could not be read or did not parse as an ack status response.
    ClientParseResponse,
    /// The endpoint answered with a client-error (4xx) status.
    ClientSendQuery,
    /// The request could not be sent, or the endpoint answered with a status that is
    /// neither a success nor a client error.
    ServerSendQuery,
}
85
/// Tracks pending acknowledgement IDs and queries the ack endpoint for their status.
struct HecAckClient {
    /// Pending ack IDs, each mapped to its remaining retries and the sender used to
    /// report the final event status.
    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
    /// Number of retries assigned to every newly added ack ID.
    retry_limit: u8,
    /// HTTP client used to send status queries.
    client: HttpClient,
    /// Builds the status query request and supplies the compression setting for its body.
    http_request_builder: Arc<HttpRequestBuilder>,
}
92
93impl HecAckClient {
94 fn new(
95 retry_limit: u8,
96 client: HttpClient,
97 http_request_builder: Arc<HttpRequestBuilder>,
98 ) -> Self {
99 Self {
100 acks: HashMap::new(),
101 retry_limit,
102 client,
103 http_request_builder,
104 }
105 }
106
107 fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
109 self.acks
110 .insert(ack_id, (self.retry_limit, ack_event_status_sender));
111 emit!(SplunkIndexerAcknowledgementAckAdded);
112 }
113
114 async fn run(&mut self) {
116 let ack_query_body = self.get_ack_query_body();
117 if !ack_query_body.acks.is_empty() {
118 let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
119
120 match ack_query_response {
121 Ok(ack_query_response) => {
122 debug!(message = "Received ack statuses.", ?ack_query_response);
123 let acked_ack_ids = ack_query_response
124 .acks
125 .iter()
126 .filter(|&(_ack_id, ack_status)| *ack_status)
127 .map(|(ack_id, _ack_status)| *ack_id)
128 .collect::<Vec<u64>>();
129 self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
130 self.expire_ack_ids_with_status(EventStatus::Rejected);
131 }
132 Err(error) => {
133 match error {
134 HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
135 emit!(SplunkIndexerAcknowledgementAPIError {
141 message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
142 error,
143 });
144 self.finalize_delivered_ack_ids(
145 self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
146 );
147 }
148 _ => {
149 emit!(SplunkIndexerAcknowledgementAPIError {
150 message: "Unable to send acknowledgement query request. Will retry.",
151 error,
152 });
153 self.expire_ack_ids_with_status(EventStatus::Errored);
154 }
155 }
156 }
157 };
158 }
159 }
160
161 fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
163 let mut removed_count = 0.0;
164 for ack_id in ack_ids {
165 if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
166 _ = ack_event_status_sender.send(EventStatus::Delivered);
167 removed_count += 1.0;
168 debug!(message = "Finalized ack id.", ?ack_id);
169 }
170 }
171 emit!(SplunkIndexerAcknowledgementAcksRemoved {
172 count: removed_count
173 });
174 }
175
176 fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
178 HecAckStatusRequest {
179 acks: self.acks.keys().copied().collect::<Vec<u64>>(),
180 }
181 }
182
183 fn decrement_retries(&mut self) {
185 for (retries, _) in self.acks.values_mut() {
186 *retries = retries.checked_sub(1).unwrap_or(0);
187 }
188 }
189
190 fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
193 let expired_ack_ids = self
194 .acks
195 .iter()
196 .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
197 .collect::<Vec<_>>();
198 let mut removed_count = 0.0;
199 for ack_id in expired_ack_ids {
200 if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
201 _ = ack_event_status_sender.send(status);
202 removed_count += 1.0;
203 }
204 }
205 emit!(SplunkIndexerAcknowledgementAcksRemoved {
206 count: removed_count
207 });
208 }
209
210 async fn send_ack_query_request(
212 &mut self,
213 request_body: &HecAckStatusRequest,
214 ) -> Result<HecAckStatusResponse, HecAckApiError> {
215 self.decrement_retries();
216 let request_body_bytes = crate::serde::json::to_bytes(request_body)
217 .map_err(|_| HecAckApiError::ClientBuildRequest)?
218 .freeze();
219 let mut compressor = Compressor::from(self.http_request_builder.compression);
220 compressor
221 .write_all(request_body_bytes.as_ref())
222 .map_err(|_| HecAckApiError::ClientBuildRequest)?;
223 let payload = compressor
224 .finish()
225 .map_err(|_| HecAckApiError::ClientBuildRequest)?
226 .freeze();
227 let request = self
228 .http_request_builder
229 .build_request(
230 payload,
231 "/services/collector/ack",
232 None,
233 MetadataFields::default(),
234 false,
235 )
236 .map_err(|_| HecAckApiError::ClientBuildRequest)?;
237
238 let response = self
239 .client
240 .send(request.map(Body::from))
241 .await
242 .map_err(|_| HecAckApiError::ServerSendQuery)?;
243
244 let status = response.status();
245 if status.is_success() {
246 let response_body = hyper::body::to_bytes(response.into_body())
247 .await
248 .map_err(|_| HecAckApiError::ClientParseResponse)?;
249 serde_json::from_slice::<HecAckStatusResponse>(&response_body)
250 .map_err(|_| HecAckApiError::ClientParseResponse)
251 } else if status.is_client_error() {
252 Err(HecAckApiError::ClientSendQuery)
253 } else {
254 Err(HecAckApiError::ServerSendQuery)
255 }
256 }
257}
258
259pub async fn run_acknowledgements(
260 mut receiver: Receiver<(u64, Sender<EventStatus>)>,
261 client: HttpClient,
262 http_request_builder: Arc<HttpRequestBuilder>,
263 indexer_acknowledgements: HecClientAcknowledgementsConfig,
264) {
265 let mut interval = tokio::time::interval(Duration::from_secs(
266 indexer_acknowledgements.query_interval.get() as u64,
267 ));
268 let mut ack_client = HecAckClient::new(
269 indexer_acknowledgements.retry_limit.get(),
270 client,
271 http_request_builder,
272 );
273
274 loop {
275 tokio::select! {
276 _ = interval.tick() => {
277 ack_client.run().await;
278 },
279 ack_info = receiver.recv() => {
280 match ack_info {
281 Some((ack_id, tx)) => {
282 ack_client.add(ack_id, tx);
283 debug!(message = "Stored ack id.", ?ack_id);
284 },
285 None => break,
286 }
287 }
288 }
289 }
290}
291
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{StreamExt, stream::FuturesUnordered};
    use tokio::sync::oneshot::{self, Receiver};
    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};

    use super::HecAckClient;
    use crate::{
        http::HttpClient,
        sinks::{
            splunk_hec::common::{
                EndpointTarget, acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder,
            },
            util::Compression,
        },
    };

    /// Builds an ack client with the given retry limit and default request settings.
    fn get_ack_client(retry_limit: u8) -> HecAckClient {
        let client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
        let http_request_builder = Arc::new(HttpRequestBuilder::new(
            String::new(),
            EndpointTarget::default(),
            String::new(),
            Compression::default(),
        ));
        HecAckClient::new(retry_limit, client, http_request_builder)
    }

    /// Registers every id in `ack_ids` with the client and returns the matching
    /// status receivers in the same order.
    fn populate_ack_client(
        ack_client: &mut HecAckClient,
        ack_ids: &[u64],
    ) -> Vec<Receiver<EventStatus>> {
        ack_ids
            .iter()
            .map(|&ack_id| {
                let (tx, rx) = oneshot::channel();
                ack_client.add(ack_id, tx);
                rx
            })
            .collect()
    }

    /// The ack ids used by every test: `0..100`, in ascending order.
    fn sample_ack_ids() -> Vec<u64> {
        (0..100).collect()
    }

    /// Waits for every receiver to resolve and asserts that each reported `expected`.
    async fn assert_all_statuses(ack_status_rxs: Vec<Receiver<EventStatus>>, expected: EventStatus) {
        let mut pending = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
        while let Some(outcome) = pending.next().await {
            assert_eq!(expected, outcome.unwrap());
        }
    }

    #[test]
    fn test_get_ack_query_body() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = sample_ack_ids();
        _ = populate_ack_client(&mut ack_client, &ack_ids);
        let expected_ack_body = HecAckStatusRequest { acks: ack_ids };

        // The query body is built from map keys, so sort before comparing.
        let mut ack_request_body = ack_client.get_ack_query_body();
        ack_request_body.acks.sort_unstable();
        assert_eq!(expected_ack_body, ack_request_body);
    }

    #[test]
    fn test_decrement_retries() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = sample_ack_ids();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        let mut body_before = ack_client.get_ack_query_body();
        body_before.acks.sort_unstable();
        assert_eq!(ack_ids, body_before.acks);

        // With a retry limit of 1, a single decrement exhausts every ack id.
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        let body_after = ack_client.get_ack_query_body();
        assert!(body_after.acks.is_empty())
    }

    #[tokio::test]
    async fn test_finalize_delivered_ack_ids() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = sample_ack_ids();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.finalize_delivered_ack_ids(ack_ids.as_slice());
        assert_all_statuses(ack_status_rxs, EventStatus::Delivered).await;
    }

    #[tokio::test]
    async fn test_expire_ack_ids_with_status() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = sample_ack_ids();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
        assert_all_statuses(ack_status_rxs, EventStatus::Rejected).await;
    }
}