vector/sinks/splunk_hec/common/
acknowledgements.rs

1use hyper::Body;
2use serde::{Deserialize, Serialize};
3use std::io::Write;
4use std::{
5    collections::HashMap,
6    num::{NonZeroU8, NonZeroU64},
7    sync::Arc,
8    time::Duration,
9};
10use tokio::sync::{mpsc::Receiver, oneshot::Sender};
11use vector_lib::{configurable::configurable_component, event::EventStatus};
12
13use super::service::{HttpRequestBuilder, MetadataFields};
14use crate::sinks::util::Compressor;
15use crate::{
16    config::AcknowledgementsConfig,
17    http::HttpClient,
18    internal_events::{
19        SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
20        SplunkIndexerAcknowledgementAcksRemoved,
21    },
22};
23
/// Splunk HEC acknowledgement configuration.
#[configurable_component]
#[derive(Clone, Debug)]
// Container-level `default`: fields missing from the input are taken from this type's `Default` impl.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub struct HecClientAcknowledgementsConfig {
    /// Controls if the sink integrates with [Splunk HEC indexer acknowledgements][splunk_indexer_ack_docs] for end-to-end acknowledgements.
    ///
    /// [splunk_indexer_ack_docs]: https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/AboutHECIDXAck
    pub indexer_acknowledgements_enabled: bool,

    /// The amount of time to wait between queries to the Splunk HEC indexer acknowledgement endpoint.
    // Read by `run_acknowledgements` as the period (in whole seconds) of its polling timer.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub query_interval: NonZeroU8,

    /// The maximum number of times an acknowledgement ID is queried for its status.
    // Passed to `HecAckClient::new` and used as the starting retry count of every stored ack id.
    pub retry_limit: NonZeroU8,

    /// The maximum number of pending acknowledgements from events sent to the Splunk HEC collector.
    ///
    /// Once reached, the sink begins applying backpressure.
    // NOTE(review): not read anywhere in this file; presumably enforced by the code
    // that owns the sending half of the ack channel — confirm.
    pub max_pending_acks: NonZeroU64,

    // Flattened, so the fields of `AcknowledgementsConfig` appear at this level
    // rather than under an `inner` key.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or a
    // full struct — confirm against `crate::serde`.
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        flatten,
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub inner: AcknowledgementsConfig,
}
55
56impl Default for HecClientAcknowledgementsConfig {
57    fn default() -> Self {
58        Self {
59            indexer_acknowledgements_enabled: true,
60            query_interval: NonZeroU8::new(10).unwrap(),
61            retry_limit: NonZeroU8::new(30).unwrap(),
62            max_pending_acks: NonZeroU64::new(1_000_000).unwrap(),
63            inner: Default::default(),
64        }
65    }
66}
67
/// Body of an ack status query sent to Splunk HEC.
///
/// Built by `HecAckClient::get_ack_query_body` and serialized with
/// `crate::serde::json::to_bytes` in `HecAckClient::send_ack_query_request`.
#[derive(Deserialize, Serialize, Eq, PartialEq, Debug)]
pub struct HecAckStatusRequest {
    /// The ack ids whose status is being queried.
    pub acks: Vec<u64>,
}
72
/// Body of a successful ack status query response, parsed with `serde_json`.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status per ack id. `HecAckClient::run` finalizes the ids mapped to
    /// `true` as delivered and leaves the ids mapped to `false` pending.
    pub acks: HashMap<u64, bool>,
}
77
/// Failure modes of an ack status query, as produced by
/// `HecAckClient::send_ack_query_request`.
#[derive(Debug)]
pub enum HecAckApiError {
    /// Serializing, compressing, or building the HTTP request failed.
    ClientBuildRequest,
    /// The response had a success status but its body could not be read or parsed.
    ClientParseResponse,
    /// The response had a client-error status.
    ClientSendQuery,
    /// Sending the request failed, or the response status was neither a
    /// success nor a client error.
    ServerSendQuery,
}
85
/// Tracks ack ids that are awaiting confirmation and queries Splunk HEC for
/// their status.
struct HecAckClient {
    /// Pending ack ids, each mapped to its remaining retry count and the
    /// sender used to report the final `EventStatus`.
    acks: HashMap<u64, (u8, Sender<EventStatus>)>,
    /// Retry count assigned to every ack id when it is added.
    retry_limit: u8,
    /// HTTP client used to send ack status queries.
    client: HttpClient,
    /// Builds ack status query requests and supplies the compression setting
    /// applied to their payload.
    http_request_builder: Arc<HttpRequestBuilder>,
}
92
93impl HecAckClient {
94    fn new(
95        retry_limit: u8,
96        client: HttpClient,
97        http_request_builder: Arc<HttpRequestBuilder>,
98    ) -> Self {
99        Self {
100            acks: HashMap::new(),
101            retry_limit,
102            client,
103            http_request_builder,
104        }
105    }
106
107    /// Adds an ack id to be queried
108    fn add(&mut self, ack_id: u64, ack_event_status_sender: Sender<EventStatus>) {
109        self.acks
110            .insert(ack_id, (self.retry_limit, ack_event_status_sender));
111        emit!(SplunkIndexerAcknowledgementAckAdded);
112    }
113
114    /// Queries Splunk HEC with stored ack ids and finalizes events that are successfully acked
115    async fn run(&mut self) {
116        let ack_query_body = self.get_ack_query_body();
117        if !ack_query_body.acks.is_empty() {
118            let ack_query_response = self.send_ack_query_request(&ack_query_body).await;
119
120            match ack_query_response {
121                Ok(ack_query_response) => {
122                    debug!(message = "Received ack statuses.", ?ack_query_response);
123                    let acked_ack_ids = ack_query_response
124                        .acks
125                        .iter()
126                        .filter(|&(_ack_id, ack_status)| *ack_status)
127                        .map(|(ack_id, _ack_status)| *ack_id)
128                        .collect::<Vec<u64>>();
129                    self.finalize_delivered_ack_ids(acked_ack_ids.as_slice());
130                    self.expire_ack_ids_with_status(EventStatus::Rejected);
131                }
132                Err(error) => {
133                    match error {
134                        HecAckApiError::ClientParseResponse | HecAckApiError::ClientSendQuery => {
135                            // If we are permanently unable to interact with
136                            // Splunk HEC indexer acknowledgements (e.g. due to
137                            // request/response format changes in future
138                            // versions), log an error and fall back to default
139                            // behavior.
140                            emit!(SplunkIndexerAcknowledgementAPIError {
141                                message: "Unable to use indexer acknowledgements. Acknowledging based on initial 200 OK.",
142                                error,
143                            });
144                            self.finalize_delivered_ack_ids(
145                                self.acks.keys().copied().collect::<Vec<_>>().as_slice(),
146                            );
147                        }
148                        _ => {
149                            emit!(SplunkIndexerAcknowledgementAPIError {
150                                message: "Unable to send acknowledgement query request. Will retry.",
151                                error,
152                            });
153                            self.expire_ack_ids_with_status(EventStatus::Errored);
154                        }
155                    }
156                }
157            };
158        }
159    }
160
161    /// Removes successfully acked ack ids and finalizes associated events
162    fn finalize_delivered_ack_ids(&mut self, ack_ids: &[u64]) {
163        let mut removed_count = 0.0;
164        for ack_id in ack_ids {
165            if let Some((_, ack_event_status_sender)) = self.acks.remove(ack_id) {
166                _ = ack_event_status_sender.send(EventStatus::Delivered);
167                removed_count += 1.0;
168                debug!(message = "Finalized ack id.", ?ack_id);
169            }
170        }
171        emit!(SplunkIndexerAcknowledgementAcksRemoved {
172            count: removed_count
173        });
174    }
175
176    /// Builds an ack query body with stored ack ids
177    fn get_ack_query_body(&mut self) -> HecAckStatusRequest {
178        HecAckStatusRequest {
179            acks: self.acks.keys().copied().collect::<Vec<u64>>(),
180        }
181    }
182
183    /// Decrements retry count on all stored ack ids by 1
184    fn decrement_retries(&mut self) {
185        for (retries, _) in self.acks.values_mut() {
186            *retries = retries.checked_sub(1).unwrap_or(0);
187        }
188    }
189
190    /// Removes all expired ack ids (those with a retry count of 0) and
191    /// finalizes associated events with the given status
192    fn expire_ack_ids_with_status(&mut self, status: EventStatus) {
193        let expired_ack_ids = self
194            .acks
195            .iter()
196            .filter_map(|(ack_id, (retries, _))| (*retries == 0).then_some(*ack_id))
197            .collect::<Vec<_>>();
198        let mut removed_count = 0.0;
199        for ack_id in expired_ack_ids {
200            if let Some((_, ack_event_status_sender)) = self.acks.remove(&ack_id) {
201                _ = ack_event_status_sender.send(status);
202                removed_count += 1.0;
203            }
204        }
205        emit!(SplunkIndexerAcknowledgementAcksRemoved {
206            count: removed_count
207        });
208    }
209
210    // Sends an ack status query request to Splunk HEC
211    async fn send_ack_query_request(
212        &mut self,
213        request_body: &HecAckStatusRequest,
214    ) -> Result<HecAckStatusResponse, HecAckApiError> {
215        self.decrement_retries();
216        let request_body_bytes = crate::serde::json::to_bytes(request_body)
217            .map_err(|_| HecAckApiError::ClientBuildRequest)?
218            .freeze();
219        let mut compressor = Compressor::from(self.http_request_builder.compression);
220        compressor
221            .write_all(request_body_bytes.as_ref())
222            .map_err(|_| HecAckApiError::ClientBuildRequest)?;
223        let payload = compressor
224            .finish()
225            .map_err(|_| HecAckApiError::ClientBuildRequest)?
226            .freeze();
227        let request = self
228            .http_request_builder
229            .build_request(
230                payload,
231                "/services/collector/ack",
232                None,
233                MetadataFields::default(),
234                false,
235            )
236            .map_err(|_| HecAckApiError::ClientBuildRequest)?;
237
238        let response = self
239            .client
240            .send(request.map(Body::from))
241            .await
242            .map_err(|_| HecAckApiError::ServerSendQuery)?;
243
244        let status = response.status();
245        if status.is_success() {
246            let response_body = hyper::body::to_bytes(response.into_body())
247                .await
248                .map_err(|_| HecAckApiError::ClientParseResponse)?;
249            serde_json::from_slice::<HecAckStatusResponse>(&response_body)
250                .map_err(|_| HecAckApiError::ClientParseResponse)
251        } else if status.is_client_error() {
252            Err(HecAckApiError::ClientSendQuery)
253        } else {
254            Err(HecAckApiError::ServerSendQuery)
255        }
256    }
257}
258
259pub async fn run_acknowledgements(
260    mut receiver: Receiver<(u64, Sender<EventStatus>)>,
261    client: HttpClient,
262    http_request_builder: Arc<HttpRequestBuilder>,
263    indexer_acknowledgements: HecClientAcknowledgementsConfig,
264) {
265    let mut interval = tokio::time::interval(Duration::from_secs(
266        indexer_acknowledgements.query_interval.get() as u64,
267    ));
268    let mut ack_client = HecAckClient::new(
269        indexer_acknowledgements.retry_limit.get(),
270        client,
271        http_request_builder,
272    );
273
274    loop {
275        tokio::select! {
276            _ = interval.tick() => {
277                ack_client.run().await;
278            },
279            ack_info = receiver.recv() => {
280                match ack_info {
281                    Some((ack_id, tx)) => {
282                        ack_client.add(ack_id, tx);
283                        debug!(message = "Stored ack id.", ?ack_id);
284                    },
285                    None => break,
286                }
287            }
288        }
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use std::sync::Arc;
295
296    use futures_util::{StreamExt, stream::FuturesUnordered};
297    use tokio::sync::oneshot::{self, Receiver};
298    use vector_lib::{config::proxy::ProxyConfig, event::EventStatus};
299
300    use super::HecAckClient;
301    use crate::{
302        http::HttpClient,
303        sinks::{
304            splunk_hec::common::{
305                EndpointTarget, acknowledgements::HecAckStatusRequest, service::HttpRequestBuilder,
306            },
307            util::Compression,
308        },
309    };
310
311    fn get_ack_client(retry_limit: u8) -> HecAckClient {
312        let client = HttpClient::new(None, &ProxyConfig::default()).unwrap();
313        let http_request_builder = HttpRequestBuilder::new(
314            String::from(""),
315            EndpointTarget::default(),
316            String::from(""),
317            Compression::default(),
318        );
319        HecAckClient::new(retry_limit, client, Arc::new(http_request_builder))
320    }
321
322    fn populate_ack_client(
323        ack_client: &mut HecAckClient,
324        ack_ids: &[u64],
325    ) -> Vec<Receiver<EventStatus>> {
326        let mut ack_status_rxs = Vec::new();
327        for ack_id in ack_ids {
328            let (tx, rx) = oneshot::channel();
329            ack_client.add(*ack_id, tx);
330            ack_status_rxs.push(rx);
331        }
332        ack_status_rxs
333    }
334
335    #[test]
336    fn test_get_ack_query_body() {
337        let mut ack_client = get_ack_client(1);
338        let ack_ids = (0..100).collect::<Vec<u64>>();
339        _ = populate_ack_client(&mut ack_client, &ack_ids);
340        let expected_ack_body = HecAckStatusRequest { acks: ack_ids };
341
342        let mut ack_request_body = ack_client.get_ack_query_body();
343        ack_request_body.acks.sort_unstable();
344        assert_eq!(expected_ack_body, ack_request_body);
345    }
346
347    #[test]
348    fn test_decrement_retries() {
349        let mut ack_client = get_ack_client(1);
350        let ack_ids = (0..100).collect::<Vec<u64>>();
351        _ = populate_ack_client(&mut ack_client, &ack_ids);
352
353        let mut ack_request_body = ack_client.get_ack_query_body();
354        ack_request_body.acks.sort_unstable();
355        assert_eq!(ack_ids, ack_request_body.acks);
356        ack_client.decrement_retries();
357        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
358
359        let ack_request_body = ack_client.get_ack_query_body();
360        assert!(ack_request_body.acks.is_empty())
361    }
362
363    #[tokio::test]
364    async fn test_finalize_delivered_ack_ids() {
365        let mut ack_client = get_ack_client(1);
366        let ack_ids = (0..100).collect::<Vec<u64>>();
367        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);
368
369        ack_client.finalize_delivered_ack_ids(ack_ids.as_slice());
370        let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
371        while let Some(status) = statuses.next().await {
372            assert_eq!(EventStatus::Delivered, status.unwrap());
373        }
374    }
375
376    #[tokio::test]
377    async fn test_expire_ack_ids_with_status() {
378        let mut ack_client = get_ack_client(1);
379        let ack_ids = (0..100).collect::<Vec<u64>>();
380        let ack_status_rxs = populate_ack_client(&mut ack_client, &ack_ids);
381
382        ack_client.decrement_retries();
383        ack_client.expire_ack_ids_with_status(EventStatus::Rejected);
384        let mut statuses = ack_status_rxs.into_iter().collect::<FuturesUnordered<_>>();
385        while let Some(status) = statuses.next().await {
386            assert_eq!(EventStatus::Rejected, status.unwrap());
387        }
388    }
389}