vector/sinks/splunk_hec/common/acknowledgements.rs

1use http_body::{Body as _, Collected};
2use hyper::Body;
3use serde::{Deserialize, Serialize};
4use std::{
5    collections::HashMap,
6    io::Write,
7    num::{NonZeroU8, NonZeroU64},
8    sync::Arc,
9    time::Duration,
10};
11use tokio::sync::{mpsc::Receiver, oneshot::Sender};
12use vector_lib::{configurable::configurable_component, event::EventStatus};
13
14use super::service::{HttpRequestBuilder, MetadataFields};
15use crate::{
16    config::AcknowledgementsConfig,
17    http::HttpClient,
18    internal_events::{
19        SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
20        SplunkIndexerAcknowledgementAcksRemoved,
21    },
22    sinks::util::Compressor,
23};
24
// Every field may be omitted in user config: `#[serde(default)]` fills missing fields from
// this type's `Default` impl.
/// Splunk HEC acknowledgement configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub struct HecClientAcknowledgementsConfig {
    /// Controls if the sink integrates with [Splunk HEC indexer acknowledgements][splunk_indexer_ack_docs] for end-to-end acknowledgements.
    ///
    /// [splunk_indexer_ack_docs]: https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/AboutHECIDXAck
    pub indexer_acknowledgements_enabled: bool,

    // Interpreted as whole seconds: `run_acknowledgements` passes it to `Duration::from_secs`.
    /// The amount of time to wait between queries to the Splunk HEC indexer acknowledgement endpoint.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub query_interval: NonZeroU8,

    // Each query round consumes one retry from every pending ack id; an ack id whose
    // budget reaches zero is expired (finalized with a non-delivered status).
    /// The maximum number of times an acknowledgement ID is queried for its status.
    pub retry_limit: NonZeroU8,

    // NOTE(review): not read anywhere in this file — presumably enforced by the code that
    // feeds ack ids into `run_acknowledgements`; confirm at the call site.
    /// The maximum number of pending acknowledgements from events sent to the Splunk HEC collector.
    ///
    /// Once reached, the sink begins applying backpressure.
    pub max_pending_acks: NonZeroU64,

    // Generic end-to-end acknowledgement settings, flattened into this same config table.
    // `bool_or_struct` presumably also accepts a bare boolean here — confirm in
    // `crate::serde::bool_or_struct`. Omitted from serialized output when left at default.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        flatten,
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub inner: AcknowledgementsConfig,
}
56
57impl Default for HecClientAcknowledgementsConfig {
58    fn default() -> Self {
59        Self {
60            indexer_acknowledgements_enabled: true,
61            query_interval: NonZeroU8::new(10).unwrap(),
62            retry_limit: NonZeroU8::new(30).unwrap(),
63            max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
64            inner: Default::default(),
65        }
66    }
67}
68
/// JSON body of an ack status query sent to Splunk HEC's `/services/collector/ack` endpoint.
#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
pub struct HecAckStatusRequest {
    /// The ack ids whose status is being queried.
    pub acks: Vec<u64>,
}
73
/// JSON body returned by Splunk HEC for an ack status query.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status per queried ack id; ack ids mapped to `true` are finalized as delivered by
    /// `HecAckClient::run`, the rest stay pending until their retry budget runs out.
    pub acks: HashMap<u64, bool>,
}
78
/// Failure modes of an ack status query (see `HecAckClient::send_ack_query_request`).
#[derive(Debug)]
pub enum HecAckApiError {
    /// Serializing, compressing, or building the query request failed.
    ClientBuildRequest,
    /// A successful (2xx) response body could not be read or parsed as
    /// `HecAckStatusResponse`.
    ClientParseResponse,
    /// Splunk HEC answered the query with a client error (4xx) status.
    ClientSendQuery,
    /// The query could not be sent, or Splunk HEC answered with a status that is neither
    /// a success nor a client error.
    ServerSendQuery,
}
86
/// Tracks pending Splunk HEC indexer acknowledgement ids and queries their status.
struct HecAckClient {
    /// Pending ack ids, each mapped to its remaining retry budget and to the sender that
    /// receives the final status of the events behind that ack id.
    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
    /// Retry budget assigned to every newly added ack id.
    retry_limit: u8,
    /// HTTP client used to send ack status queries.
    client: HttpClient,
    /// Builds ack status query requests; its compression setting is also used to compress
    /// the query body.
    http_request_builder: Arc<HttpRequestBuilder>,
}
93
94impl HecAckClient {
95    fn new(
96        retry_limit: u8,
97        client: HttpClient,
98        http_request_builder: Arc<HttpRequestBuilder>,
99    ) -> Self {
100        Self {
101            acks: HashMap::new(),
102            retry_limit,
103            client,
104            http_request_builder,
105        }
106    }
107
108    /// Adds an ack id to be queried
109    fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
110        self.acks
111            .insert(ack_id, (self.retry_limit, ack_event_status_sender));
112        emit!(SplunkIndexerAcknowledgementAckAdded);
113    }
114
115    /// Queries Splunk HEC with stored ack ids and finalizes events that are successfully acked
116    async fn run(&mut self) {
117        let ack_query_body = self.get_ack_query_body();
118        if !ack_query_body.acks.is_empty() {
119            let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
120
121            match ack_query_response {
122                Ok(ack_query_response) => {
123                    debug!(message = "Received ack statuses.", ?ack_query_response);
124                    let acked_ack_ids = ack_query_response
125                        .acks
126                        .iter()
127                        .filter(|&(_ack_id, ack_status)| *ack_status)
128                        .map(|(ack_id, _ack_status)| *ack_id)
129                        .collect::<Vec<u64>>();
130                    self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
131                    self.expire_ack_ids_with_status(EventStatus::Rejected);
132                }
133                Err(error) => {
134                    match error {
135                        HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
136                            // If we are permanently unable to interact with
137                            // Splunk HEC indexer acknowledgements (e.g. due to
138                            // request/response format changes in future
139                            // versions), log an error and fall back to default
140                            // behavior.
141                            emit!(SplunkIndexerAcknowledgementAPIError {
142                                message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
143                                error,
144                            });
145                            self.finalize_delivered_ack_ids(
146                                self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
147                            );
148                        }
149                        _ => {
150                            emit!(SplunkIndexerAcknowledgementAPIError {
151                                message: "Unable to send acknowledgement query request. Will retry.",
152                                error,
153                            });
154                            self.expire_ack_ids_with_status(EventStatus::Errored);
155                        }
156                    }
157                }
158            };
159        }
160    }
161
162    /// Removes successfully acked ack ids and finalizes associated events
163    fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
164        let mut removed_count = 0.0;
165        for ack_id in ack_ids {
166            if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
167                _ = ack_event_status_sender.send(EventStatus::Delivered);
168                removed_count += 1.0;
169                debug!(message = "Finalized ack id.", ?ack_id);
170            }
171        }
172        emit!(SplunkIndexerAcknowledgementAcksRemoved {
173            count: removed_count
174        });
175    }
176
177    /// Builds an ack query body with stored ack ids
178    fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
179        HecAckStatusRequest {
180            acks: self.acks.keys().copied().collect::<Vec<u64>>(),
181        }
182    }
183
184    /// Decrements retry count on all stored ack ids by 1
185    fn decrement_retries(&mut self) {
186        for (retries, _) in self.acks.values_mut() {
187            *retries = retries.checked_sub(1).unwrap_or(0);
188        }
189    }
190
191    /// Removes all expired ack ids (those with a retry count of 0) and
192    /// finalizes associated events with the given status
193    fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
194        let expired_ack_ids = self
195            .acks
196            .iter()
197            .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
198            .collect::<Vec<_>>();
199        let mut removed_count = 0.0;
200        for ack_id in expired_ack_ids {
201            if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
202                _ = ack_event_status_sender.send(status);
203                removed_count += 1.0;
204            }
205        }
206        emit!(SplunkIndexerAcknowledgementAcksRemoved {
207            count: removed_count
208        });
209    }
210
211    // Sends an ack status query request to Splunk HEC
212    async fn send_ack_query_request(
213        &mut self,
214        request_body: &HecAckStatusRequest,
215    ) -> Result<HecAckStatusResponse, HecAckApiError> {
216        self.decrement_retries();
217        let request_body_bytes = crate::serde::json::to_bytes(request_body)
218            .map_err(|_| HecAckApiError::ClientBuildRequest)?
219            .freeze();
220        let mut compressor = Compressor::from(self.http_request_builder.compression);
221        compressor
222            .write_all(request_body_bytes.as_ref())
223            .map_err(|_| HecAckApiError::ClientBuildRequest)?;
224        let payload = compressor
225            .finish()
226            .map_err(|_| HecAckApiError::ClientBuildRequest)?
227            .freeze();
228        let request = self
229            .http_request_builder
230            .build_request(
231                payload,
232                "/services/collector/ack",
233                None,
234                MetadataFields::default(),
235                false,
236            )
237            .map_err(|_| HecAckApiError::ClientBuildRequest)?;
238
239        let response = self
240            .client
241            .send(request.map(Body::from))
242            .await
243            .map_err(|_| HecAckApiError::ServerSendQuery)?;
244
245        let status = response.status();
246        if status.is_success() {
247            let response_body = response
248                .into_body()
249                .collect()
250                .await
251                .map(Collected::to_bytes)
252                .map_err(|_| HecAckApiError::ClientParseResponse)?;
253            serde_json::from_slice::<HecAckStatusResponse>(&response_body)
254                .map_err(|_| HecAckApiError::ClientParseResponse)
255        } else if status.is_client_error() {
256            Err(HecAckApiError::ClientSendQuery)
257        } else {
258            Err(HecAckApiError::ServerSendQuery)
259        }
260    }
261}
262
263pub async fn run_acknowledgements(
264    mut receiver: Receiver<(u64, Sender<EventStatus>)>,
265    client: HttpClient,
266    http_request_builder: Arc<HttpRequestBuilder>,
267    indexer_acknowledgements: HecClientAcknowledgementsConfig,
268) {
269    let mut interval = tokio::time::interval(Duration::from_secs(
270        indexer_acknowledgements.query_interval.get() as u64,
271    ));
272    let mut ack_client = HecAckClient::new(
273        indexer_acknowledgements.retry_limit.get(),
274        client,
275        http_request_builder,
276    );
277
278    loop {
279        tokio::select! {
280            _ = interval.tick() => {
281                ack_client.run().await;
282            },
283            ack_info = receiver.recv() => {
284                match ack_info {
285                    Some((ack_id, tx)) => {
286                        ack_client.add(ack_id, tx);
287                        debug!(message = "Stored ack id.", ?ack_id);
288                    },
289                    None => break,
290                }
291            }
292        }
293    }
294}
295
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{StreamExt, stream::FuturesUnordered};
    use tokio::sync::oneshot::{self, Receiver};
    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};

    use super::HecAckClient;
    use crate::{
        http::HttpClient,
        sinks::{
            splunk_hec::common::{
                EndpointTarget, acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder,
            },
            util::Compression,
        },
    };

    /// Builds an ack client with the given retry budget and a request builder created from
    /// placeholder (empty) arguments; none of these tests send a request.
    fn get_ack_client(retry_limit: u8) -> HecAckClient {
        let client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
        let http_request_builder = HttpRequestBuilder::new(
            String::from(""),
            EndpointTarget::default(),
            String::from(""),
            Compression::default(),
        );
        HecAckClient::new(retry_limit, client, Arc::new(http_request_builder))
    }

    /// Registers every ack id with `ack_client` and returns the matching status receivers,
    /// in the same order as `ack_ids`.
    fn populate_ack_client(
        ack_client: &mut HecAckClient,
        ack_ids: &[u64],
    ) -> Vec<Receiver<EventStatus>> {
        let mut ack_status_rxs = Vec::new();
        for ack_id in ack_ids {
            let (tx, rx) = oneshot::channel();
            ack_client.add(*ack_id, tx);
            ack_status_rxs.push(rx);
        }
        ack_status_rxs
    }

    #[test]
    fn test_get_ack_query_body() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = (0..100).collect::<Vec<u64>>();
        _ = populate_ack_client(&mut ack_client, &ack_ids);
        let expected_ack_body = HecAckStatusRequest { acks: ack_ids };

        let mut ack_request_body = ack_client.get_ack_query_body();
        ack_request_body.acks.sort_unstable();
        assert_eq!(expected_ack_body, ack_request_body);
    }

    #[test]
    fn test_decrement_retries() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = (0..100).collect::<Vec<u64>>();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        let mut ack_request_body = ack_client.get_ack_query_body();
        ack_request_body.acks.sort_unstable();
        assert_eq!(ack_ids, ack_request_body.acks);
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        let ack_request_body = ack_client.get_ack_query_body();
        assert!(ack_request_body.acks.is_empty());
    }

    /// With a budget larger than one, a single round must only decrement, not expire.
    #[test]
    fn test_decrement_retries_keeps_acks_with_remaining_retries() {
        let mut ack_client = get_ack_client(2);
        let ack_ids = (0..100).collect::<Vec<u64>>();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        // First round leaves a budget of 1: nothing may expire yet.
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
        let mut ack_request_body = ack_client.get_ack_query_body();
        ack_request_body.acks.sort_unstable();
        assert_eq!(ack_ids, ack_request_body.acks);

        // Second round exhausts the budget.
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
        assert!(ack_client.get_ack_query_body().acks.is_empty());
    }

    /// More rounds than the retry budget must stay at zero rather than underflow.
    #[tokio::test]
    async fn test_decrement_retries_saturates_at_zero() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = (0..100).collect::<Vec<u64>>();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.decrement_retries();
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Errored);

        assert!(ack_client.get_ack_query_body().acks.is_empty());
        let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
        while let Some(status) = statuses.next().await {
            assert_eq!(EventStatus::Errored, status.unwrap());
        }
    }

    #[tokio::test]
    async fn test_finalize_delivered_ack_ids() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = (0..100).collect::<Vec<u64>>();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.finalize_delivered_ack_ids(ack_ids.as_slice());
        let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
        while let Some(status) = statuses.next().await {
            assert_eq!(EventStatus::Delivered, status.unwrap());
        }
    }

    #[tokio::test]
    async fn test_expire_ack_ids_with_status() {
        let mut ack_client = get_ack_client(1);
        let ack_ids = (0..100).collect::<Vec<u64>>();
        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
        let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
        while let Some(status) = statuses.next().await {
            assert_eq!(EventStatus::Rejected, status.unwrap());
        }
    }
}