vector/sinks/splunk_hec/common/
acknowledgements.rs

1use std::{
2    collections::HashMap,
3    num::{NonZeroU64, NonZeroU8},
4    sync::Arc,
5    time::Duration,
6};
7
8use hyper::Body;
9use serde::{Deserialize, Serialize};
10use tokio::sync::{mpsc::Receiver, oneshot::Sender};
11use vector_lib::configurable::configurable_component;
12use vector_lib::event::EventStatus;
13
14use super::service::{HttpRequestBuilder, MetadataFields};
15use crate::{
16    config::AcknowledgementsConfig,
17    http::HttpClient,
18    internal_events::{
19        SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
20        SplunkIndexerAcknowledgementAcksRemoved,
21    },
22};
23
24/// Splunk HEC acknowledgement configuration.
25#[configurable_component]
26#[derive(Clone, Debug)]
27#[serde(default)]
28#[configurable(metadata(docs::advanced))]
29pub struct HecClientAcknowledgementsConfig {
30    /// Controls if the sink integrates with [Splunk HEC indexer acknowledgements][splunk_indexer_ack_docs] for end-to-end acknowledgements.
31    ///
32    /// [splunk_indexer_ack_docs]: https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/AboutHECIDXAck
33    pub indexer_acknowledgements_enabled: bool,
34
35    /// The amount of time to wait between queries to the Splunk HEC indexer acknowledgement endpoint.
36    #[configurable(metadata(docs::type_unit = "seconds"))]
37    pub query_interval: NonZeroU8,
38
39    /// The maximum number of times an acknowledgement ID is queried for its status.
40    pub retry_limit: NonZeroU8,
41
42    /// The maximum number of pending acknowledgements from events sent to the Splunk HEC collector.
43    ///
44    /// Once reached, the sink begins applying backpressure.
45    pub max_pending_acks: NonZeroU64,
46
47    #[serde(
48        default,
49        deserialize_with = "crate::serde::bool_or_struct",
50        flatten,
51        skip_serializing_if = "crate::serde::is_default"
52    )]
53    pub inner: AcknowledgementsConfig,
54}
55
56impl Default for HecClientAcknowledgementsConfig {
57    fn default() -> Self {
58        Self {
59            indexer_acknowledgements_enabled: true,
60            query_interval: NonZeroU8::new(10).unwrap(),
61            retry_limit: NonZeroU8::new(30).unwrap(),
62            max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
63            inner: Default::default(),
64        }
65    }
66}
67
68#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
69pub struct HecAckStatusRequest {
70    pub acks: Vec<u64>,
71}
72
73#[derive(Deserialize, Serialize, Debug)]
74pub struct HecAckStatusResponse {
75    pub acks: HashMap<u64, bool>,
76}
77
/// Reasons an ack status query against Splunk HEC can fail.
///
/// The enum carries no payload, so it is cheap to copy and compare.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HecAckApiError {
    /// The query body could not be serialized, or the HTTP request could not
    /// be built.
    ClientBuildRequest,
    /// A successful response was received, but its body could not be read or
    /// decoded as an ack status response.
    ClientParseResponse,
    /// The endpoint answered with a client-error (4xx) status.
    ClientSendQuery,
    /// The request could not be sent, or the endpoint answered with a status
    /// that is neither success nor client error.
    ServerSendQuery,
}
85
86struct HecAckClient {
87    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
88    retry_limit: u8,
89    client: HttpClient,
90    http_request_builder: Arc<HttpRequestBuilder>,
91}
92
93impl HecAckClient {
94    fn new(
95        retry_limit: u8,
96        client: HttpClient,
97        http_request_builder: Arc<HttpRequestBuilder>,
98    ) -> Self {
99        Self {
100            acks: HashMap::new(),
101            retry_limit,
102            client,
103            http_request_builder,
104        }
105    }
106
107    /// Adds an ack id to be queried
108    fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
109        self.acks
110            .insert(ack_id, (self.retry_limit, ack_event_status_sender));
111        emit!(SplunkIndexerAcknowledgementAckAdded);
112    }
113
114    /// Queries Splunk HEC with stored ack ids and finalizes events that are successfully acked
115    async fn run(&mut self) {
116        let ack_query_body = self.get_ack_query_body();
117        if !ack_query_body.acks.is_empty() {
118            let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
119
120            match ack_query_response {
121                Ok(ack_query_response) => {
122                    debug!(message = "Received ack statuses.", ?ack_query_response);
123                    let acked_ack_ids = ack_query_response
124                        .acks
125                        .iter()
126                        .filter(|&(_ack_id, ack_status)| *ack_status)
127                        .map(|(ack_id, _ack_status)| *ack_id)
128                        .collect::<Vec<u64>>();
129                    self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
130                    self.expire_ack_ids_with_status(EventStatus::Rejected);
131                }
132                Err(error) => {
133                    match error {
134                        HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
135                            // If we are permanently unable to interact with
136                            // Splunk HEC indexer acknowledgements (e.g. due to
137                            // request/response format changes in future
138                            // versions), log an error and fall back to default
139                            // behavior.
140                            emit!(SplunkIndexerAcknowledgementAPIError {
141                                message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
142                                error,
143                            });
144                            self.finalize_delivered_ack_ids(
145                                self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
146                            );
147                        }
148                        _ => {
149                            emit!(SplunkIndexerAcknowledgementAPIError {
150                                message:
151                                    "Unable to send acknowledgement query request. Will retry.",
152                                error,
153                            });
154                            self.expire_ack_ids_with_status(EventStatus::Errored);
155                        }
156                    }
157                }
158            };
159        }
160    }
161
162    /// Removes successfully acked ack ids and finalizes associated events
163    fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
164        let mut removed_count = 0.0;
165        for ack_id in ack_ids {
166            if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
167                _ = ack_event_status_sender.send(EventStatus::Delivered);
168                removed_count += 1.0;
169                debug!(message = "Finalized ack id.", ?ack_id);
170            }
171        }
172        emit!(SplunkIndexerAcknowledgementAcksRemoved {
173            count: removed_count
174        });
175    }
176
177    /// Builds an ack query body with stored ack ids
178    fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
179        HecAckStatusRequest {
180            acks: self.acks.keys().copied().collect::<Vec<u64>>(),
181        }
182    }
183
184    /// Decrements retry count on all stored ack ids by 1
185    fn decrement_retries(&mut self) {
186        for (retries, _) in self.acks.values_mut() {
187            *retries = retries.checked_sub(1).unwrap_or(0);
188        }
189    }
190
191    /// Removes all expired ack ids (those with a retry count of 0) and
192    /// finalizes associated events with the given status
193    fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
194        let expired_ack_ids = self
195            .acks
196            .iter()
197            .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
198            .collect::<Vec<_>>();
199        let mut removed_count = 0.0;
200        for ack_id in expired_ack_ids {
201            if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
202                _ = ack_event_status_sender.send(status);
203                removed_count += 1.0;
204            }
205        }
206        emit!(SplunkIndexerAcknowledgementAcksRemoved {
207            count: removed_count
208        });
209    }
210
211    // Sends an ack status query request to Splunk HEC
212    async fn send_ack_query_request(
213        &mut self,
214        request_body: &HecAckStatusRequest,
215    ) -> Result<HecAckStatusResponse, HecAckApiError> {
216        self.decrement_retries();
217        let request_body_bytes = crate::serde::json::to_bytes(request_body)
218            .map_err(|_| HecAckApiError::ClientBuildRequest)?
219            .freeze();
220        let request = self
221            .http_request_builder
222            .build_request(
223                request_body_bytes,
224                "/services/collector/ack",
225                None,
226                MetadataFields::default(),
227                false,
228            )
229            .map_err(|_| HecAckApiError::ClientBuildRequest)?;
230
231        let response = self
232            .client
233            .send(request.map(Body::from))
234            .await
235            .map_err(|_| HecAckApiError::ServerSendQuery)?;
236
237        let status = response.status();
238        if status.is_success() {
239            let response_body = hyper::body::to_bytes(response.into_body())
240                .await
241                .map_err(|_| HecAckApiError::ClientParseResponse)?;
242            serde_json::from_slice::<HecAckStatusResponse>(&response_body)
243                .map_err(|_| HecAckApiError::ClientParseResponse)
244        } else if status.is_client_error() {
245            Err(HecAckApiError::ClientSendQuery)
246        } else {
247            Err(HecAckApiError::ServerSendQuery)
248        }
249    }
250}
251
252pub async fn run_acknowledgements(
253    mut receiver: Receiver<(u64, Sender<EventStatus>)>,
254    client: HttpClient,
255    http_request_builder: Arc<HttpRequestBuilder>,
256    indexer_acknowledgements: HecClientAcknowledgementsConfig,
257) {
258    let mut interval = tokio::time::interval(Duration::from_secs(
259        indexer_acknowledgements.query_interval.get() as u64,
260    ));
261    let mut ack_client = HecAckClient::new(
262        indexer_acknowledgements.retry_limit.get(),
263        client,
264        http_request_builder,
265    );
266
267    loop {
268        tokio::select! {
269            _ = interval.tick() => {
270                ack_client.run().await;
271            },
272            ack_info = receiver.recv() => {
273                match ack_info {
274                    Some((ack_id, tx)) => {
275                        ack_client.add(ack_id, tx);
276                        debug!(message = "Stored ack id.", ?ack_id);
277                    },
278                    None => break,
279                }
280            }
281        }
282    }
283}
284
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{stream::FuturesUnordered, StreamExt};
    use tokio::sync::oneshot::{self, Receiver};
    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};

    use super::HecAckClient;
    use crate::{
        http::HttpClient,
        sinks::{
            splunk_hec::common::{
                acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder, EndpointTarget,
            },
            util::Compression,
        },
    };

    /// Builds an ack client with the given retry limit. None of the tests
    /// below send a request through it.
    fn get_ack_client(retry_limit: u8) -> HecAckClient {
        let request_builder = Arc::new(HttpRequestBuilder::new(
            String::from(""),
            EndpointTarget::default(),
            String::from(""),
            Compression::default(),
        ));
        let http_client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
        HecAckClient::new(retry_limit, http_client, request_builder)
    }

    /// Registers every id in `ack_ids` with the client and returns the
    /// matching status receivers, in the same order.
    fn populate_ack_client(
        ack_client: &mut HecAckClient,
        ack_ids: &[u64],
    ) -> Vec<Receiver<EventStatus>> {
        ack_ids
            .iter()
            .map(|&ack_id| {
                let (status_tx, status_rx) = oneshot::channel();
                ack_client.add(ack_id, status_tx);
                status_rx
            })
            .collect()
    }

    /// The ack ids `0..100`, in ascending order.
    fn sequential_ack_ids() -> Vec<u64> {
        (0..100).collect()
    }

    /// Awaits every receiver and asserts that each one yields `expected`.
    async fn assert_all_statuses(
        ack_status_rxs: Vec<Receiver<EventStatus>>,
        expected: EventStatus,
    ) {
        let mut pending = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
        while let Some(received) = pending.next().await {
            assert_eq!(expected, received.unwrap());
        }
    }

    #[test]
    fn test_get_ack_query_body() {
        let ack_ids = sequential_ack_ids();
        let mut ack_client = get_ack_client(1);
        drop(populate_ack_client(&mut ack_client, &ack_ids));

        // The client stores ids in a map, so sort before comparing.
        let mut actual_body = ack_client.get_ack_query_body();
        actual_body.acks.sort_unstable();

        let expected_body = HecAckStatusRequest { acks: ack_ids };
        assert_eq!(expected_body, actual_body);
    }

    #[test]
    fn test_decrement_retries() {
        let ack_ids = sequential_ack_ids();
        let mut ack_client = get_ack_client(1);
        drop(populate_ack_client(&mut ack_client, &ack_ids));

        let mut body_before = ack_client.get_ack_query_body();
        body_before.acks.sort_unstable();
        assert_eq!(ack_ids, body_before.acks);

        // With a retry limit of 1, one decrement expires every id.
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        let body_after = ack_client.get_ack_query_body();
        assert!(body_after.acks.is_empty());
    }

    #[tokio::test]
    async fn test_finalize_delivered_ack_ids() {
        let ack_ids = sequential_ack_ids();
        let mut ack_client = get_ack_client(1);
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.finalize_delivered_ack_ids(&ack_ids);

        assert_all_statuses(ack_status_rxs, EventStatus::Delivered).await;
    }

    #[tokio::test]
    async fn test_expire_ack_ids_with_status() {
        let ack_ids = sequential_ack_ids();
        let mut ack_client = get_ack_client(1);
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        assert_all_statuses(ack_status_rxs, EventStatus::Rejected).await;
    }
}