vector/sinks/splunk_hec/common/
acknowledgements.rs

1use std::{
2    collections::HashMap,
3    num::{NonZeroU8, NonZeroU64},
4    sync::Arc,
5    time::Duration,
6};
7
8use hyper::Body;
9use serde::{Deserialize, Serialize};
10use tokio::sync::{mpsc::Receiver, oneshot::Sender};
11use vector_lib::{configurable::configurable_component, event::EventStatus};
12
13use super::service::{HttpRequestBuilder, MetadataFields};
14use crate::{
15    config::AcknowledgementsConfig,
16    http::HttpClient,
17    internal_events::{
18        SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
19        SplunkIndexerAcknowledgementAcksRemoved,
20    },
21    sinks::util::Compression,
22};
23
24/// Splunk HEC acknowledgement configuration.
25#[configurable_component]
26#[derive(Clone, Debug)]
27#[serde(default)]
28#[configurable(metadata(docs::advanced))]
29pub struct HecClientAcknowledgementsConfig {
30    /// Controls if the sink integrates with [Splunk HEC indexer acknowledgements][splunk_indexer_ack_docs] for end-to-end acknowledgements.
31    ///
32    /// [splunk_indexer_ack_docs]: https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/AboutHECIDXAck
33    pub indexer_acknowledgements_enabled: bool,
34
35    /// The amount of time to wait between queries to the Splunk HEC indexer acknowledgement endpoint.
36    #[configurable(metadata(docs::type_unit = "seconds"))]
37    pub query_interval: NonZeroU8,
38
39    /// The maximum number of times an acknowledgement ID is queried for its status.
40    pub retry_limit: NonZeroU8,
41
42    /// The maximum number of pending acknowledgements from events sent to the Splunk HEC collector.
43    ///
44    /// Once reached, the sink begins applying backpressure.
45    pub max_pending_acks: NonZeroU64,
46
47    #[serde(
48        default,
49        deserialize_with = "crate::serde::bool_or_struct",
50        flatten,
51        skip_serializing_if = "crate::serde::is_default"
52    )]
53    pub inner: AcknowledgementsConfig,
54}
55
56impl Default for HecClientAcknowledgementsConfig {
57    fn default() -> Self {
58        Self {
59            indexer_acknowledgements_enabled: true,
60            query_interval: NonZeroU8::new(10).unwrap(),
61            retry_limit: NonZeroU8::new(30).unwrap(),
62            max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
63            inner: Default::default(),
64        }
65    }
66}
67
68#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
69pub struct HecAckStatusRequest {
70    pub acks: Vec<u64>,
71}
72
73#[derive(Deserialize, Serialize, Debug)]
74pub struct HecAckStatusResponse {
75    pub acks: HashMap<u64, bool>,
76}
77
/// Ways a query to the Splunk HEC indexer acknowledgement endpoint can fail.
#[derive(Debug)]
pub enum HecAckApiError {
    /// The query body could not be serialized, or the HTTP request could not be built.
    ClientBuildRequest,
    /// The body of a successful response could not be read or deserialized.
    ClientParseResponse,
    /// The endpoint answered with a 4xx client error status.
    ClientSendQuery,
    /// The request could not be sent, or the endpoint answered with a status that is neither
    /// a success nor a 4xx client error.
    ServerSendQuery,
}
85
86struct HecAckClient {
87    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
88    retry_limit: u8,
89    client: HttpClient,
90    http_request_builder: Arc<HttpRequestBuilder>,
91}
92
93impl HecAckClient {
94    fn new(
95        retry_limit: u8,
96        client: HttpClient,
97        http_request_builder: Arc<HttpRequestBuilder>,
98    ) -> Self {
99        // Reimplement with compression support, see https://github.com/vectordotdev/vector/issues/23748
100        let http_request_builder = Arc::new(HttpRequestBuilder {
101            compression: Compression::None,
102            ..(*http_request_builder).clone()
103        });
104
105        Self {
106            acks: HashMap::new(),
107            retry_limit,
108            client,
109            http_request_builder,
110        }
111    }
112
113    /// Adds an ack id to be queried
114    fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
115        self.acks
116            .insert(ack_id, (self.retry_limit, ack_event_status_sender));
117        emit!(SplunkIndexerAcknowledgementAckAdded);
118    }
119
120    /// Queries Splunk HEC with stored ack ids and finalizes events that are successfully acked
121    async fn run(&mut self) {
122        let ack_query_body = self.get_ack_query_body();
123        if !ack_query_body.acks.is_empty() {
124            let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
125
126            match ack_query_response {
127                Ok(ack_query_response) => {
128                    debug!(message = "Received ack statuses.", ?ack_query_response);
129                    let acked_ack_ids = ack_query_response
130                        .acks
131                        .iter()
132                        .filter(|&(_ack_id, ack_status)| *ack_status)
133                        .map(|(ack_id, _ack_status)| *ack_id)
134                        .collect::<Vec<u64>>();
135                    self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
136                    self.expire_ack_ids_with_status(EventStatus::Rejected);
137                }
138                Err(error) => {
139                    match error {
140                        HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
141                            // If we are permanently unable to interact with
142                            // Splunk HEC indexer acknowledgements (e.g. due to
143                            // request/response format changes in future
144                            // versions), log an error and fall back to default
145                            // behavior.
146                            emit!(SplunkIndexerAcknowledgementAPIError {
147                                message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
148                                error,
149                            });
150                            self.finalize_delivered_ack_ids(
151                                self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
152                            );
153                        }
154                        _ => {
155                            emit!(SplunkIndexerAcknowledgementAPIError {
156                                message: "Unable to send acknowledgement query request. Will retry.",
157                                error,
158                            });
159                            self.expire_ack_ids_with_status(EventStatus::Errored);
160                        }
161                    }
162                }
163            };
164        }
165    }
166
167    /// Removes successfully acked ack ids and finalizes associated events
168    fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
169        let mut removed_count = 0.0;
170        for ack_id in ack_ids {
171            if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
172                _ = ack_event_status_sender.send(EventStatus::Delivered);
173                removed_count += 1.0;
174                debug!(message = "Finalized ack id.", ?ack_id);
175            }
176        }
177        emit!(SplunkIndexerAcknowledgementAcksRemoved {
178            count: removed_count
179        });
180    }
181
182    /// Builds an ack query body with stored ack ids
183    fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
184        HecAckStatusRequest {
185            acks: self.acks.keys().copied().collect::<Vec<u64>>(),
186        }
187    }
188
189    /// Decrements retry count on all stored ack ids by 1
190    fn decrement_retries(&mut self) {
191        for (retries, _) in self.acks.values_mut() {
192            *retries = retries.checked_sub(1).unwrap_or(0);
193        }
194    }
195
196    /// Removes all expired ack ids (those with a retry count of 0) and
197    /// finalizes associated events with the given status
198    fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
199        let expired_ack_ids = self
200            .acks
201            .iter()
202            .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
203            .collect::<Vec<_>>();
204        let mut removed_count = 0.0;
205        for ack_id in expired_ack_ids {
206            if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
207                _ = ack_event_status_sender.send(status);
208                removed_count += 1.0;
209            }
210        }
211        emit!(SplunkIndexerAcknowledgementAcksRemoved {
212            count: removed_count
213        });
214    }
215
216    // Sends an ack status query request to Splunk HEC
217    async fn send_ack_query_request(
218        &mut self,
219        request_body: &HecAckStatusRequest,
220    ) -> Result<HecAckStatusResponse, HecAckApiError> {
221        self.decrement_retries();
222        let request_body_bytes = crate::serde::json::to_bytes(request_body)
223            .map_err(|_| HecAckApiError::ClientBuildRequest)?
224            .freeze();
225        let request = self
226            .http_request_builder
227            .build_request(
228                request_body_bytes,
229                "/services/collector/ack",
230                None,
231                MetadataFields::default(),
232                false,
233            )
234            .map_err(|_| HecAckApiError::ClientBuildRequest)?;
235
236        let response = self
237            .client
238            .send(request.map(Body::from))
239            .await
240            .map_err(|_| HecAckApiError::ServerSendQuery)?;
241
242        let status = response.status();
243        if status.is_success() {
244            let response_body = hyper::body::to_bytes(response.into_body())
245                .await
246                .map_err(|_| HecAckApiError::ClientParseResponse)?;
247            serde_json::from_slice::<HecAckStatusResponse>(&response_body)
248                .map_err(|_| HecAckApiError::ClientParseResponse)
249        } else if status.is_client_error() {
250            Err(HecAckApiError::ClientSendQuery)
251        } else {
252            Err(HecAckApiError::ServerSendQuery)
253        }
254    }
255}
256
257pub async fn run_acknowledgements(
258    mut receiver: Receiver<(u64, Sender<EventStatus>)>,
259    client: HttpClient,
260    http_request_builder: Arc<HttpRequestBuilder>,
261    indexer_acknowledgements: HecClientAcknowledgementsConfig,
262) {
263    let mut interval = tokio::time::interval(Duration::from_secs(
264        indexer_acknowledgements.query_interval.get() as u64,
265    ));
266    let mut ack_client = HecAckClient::new(
267        indexer_acknowledgements.retry_limit.get(),
268        client,
269        http_request_builder,
270    );
271
272    loop {
273        tokio::select! {
274            _ = interval.tick() => {
275                ack_client.run().await;
276            },
277            ack_info = receiver.recv() => {
278                match ack_info {
279                    Some((ack_id, tx)) => {
280                        ack_client.add(ack_id, tx);
281                        debug!(message = "Stored ack id.", ?ack_id);
282                    },
283                    None => break,
284                }
285            }
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{StreamExt, stream::FuturesUnordered};
    use tokio::sync::oneshot::{self, Receiver};
    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};

    use super::HecAckClient;
    use crate::{
        http::HttpClient,
        sinks::{
            splunk_hec::common::{
                EndpointTarget, acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder,
            },
            util::Compression,
        },
    };

    /// Builds an ack client with an empty endpoint; none of these tests send requests.
    fn get_ack_client(retry_limit: u8) -> HecAckClient {
        let client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
        let builder = Arc::new(HttpRequestBuilder::new(
            String::new(),
            EndpointTarget::default(),
            String::new(),
            Compression::default(),
        ));
        HecAckClient::new(retry_limit, client, builder)
    }

    /// Registers every id in `ack_ids` with `ack_client`.
    ///
    /// Returns the matching status receivers in the same order as `ack_ids`.
    fn populate_ack_client(
        ack_client: &mut HecAckClient,
        ack_ids: &[u64],
    ) -> Vec<Receiver<EventStatus>> {
        ack_ids
            .iter()
            .map(|&ack_id| {
                let (status_tx, status_rx) = oneshot::channel();
                ack_client.add(ack_id, status_tx);
                status_rx
            })
            .collect()
    }

    /// Awaits every receiver and asserts that each one resolved to `expected`.
    async fn assert_all_statuses(status_rxs: Vec<Receiver<EventStatus>>, expected: EventStatus) {
        let mut pending: FuturesUnordered<_> = status_rxs.into_iter().collect();
        while let Some(status) = pending.next().await {
            assert_eq!(expected, status.unwrap());
        }
    }

    #[test]
    fn test_get_ack_query_body() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        // The body is built from a hash map, so normalize its order before comparing.
        let mut actual = ack_client.get_ack_query_body();
        actual.acks.sort_unstable();
        assert_eq!(HecAckStatusRequest { acks: ack_ids }, actual);
    }

    #[test]
    fn test_decrement_retries() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        _ = populate_ack_client(&mut ack_client, &ack_ids);

        let mut queried = ack_client.get_ack_query_body().acks;
        queried.sort_unstable();
        assert_eq!(ack_ids, queried);

        // A retry limit of 1 means a single decrement exhausts every id.
        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        assert!(ack_client.get_ack_query_body().acks.is_empty())
    }

    #[tokio::test]
    async fn test_finalize_delivered_ack_ids() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        let status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.finalize_delivered_ack_ids(&ack_ids);

        assert_all_statuses(status_rxs, EventStatus::Delivered).await;
    }

    #[tokio::test]
    async fn test_expire_ack_ids_with_status() {
        let mut ack_client = get_ack_client(1);
        let ack_ids: Vec<u64> = (0..100).collect();
        let status_rxs = populate_ack_client(&mut ack_client, &ack_ids);

        ack_client.decrement_retries();
        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);

        assert_all_statuses(status_rxs, EventStatus::Rejected).await;
    }
}
387}