vector/sources/splunk_hec/
acknowledgements.rs

1use std::{
2    collections::HashMap,
3    num::NonZeroU64,
4    sync::{
5        Arc, Mutex, RwLock,
6        atomic::{AtomicU64, Ordering},
7    },
8    time::{Duration, Instant},
9};
10
11use futures::StreamExt;
12use roaring::RoaringTreemap;
13use serde::{Deserialize, Serialize};
14use tokio::time::interval;
15use vector_lib::{
16    configurable::configurable_component, finalization::BatchStatusReceiver,
17    finalizer::UnorderedFinalizer,
18};
19use warp::Rejection;
20
21use super::ApiError;
22use crate::{event::BatchStatus, shutdown::ShutdownSignal};
23
// `#[serde(default)]` below applies to the whole struct: any field omitted from the
// deserialized input takes its value from this type's `Default` implementation.
/// Acknowledgement configuration for the `splunk_hec` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct HecAcknowledgementsConfig {
    // Tri-state: the `Default` impl leaves this as `None`, while `From<bool>` stores `Some(..)`.
    // NOTE(review): how `None` is resolved is not decided by this type — confirm at the use site.
    /// Enables end-to-end acknowledgements.
    pub enabled: Option<bool>,

    // Compared against the running total in `IndexerAcknowledgement::get_ack_id_from_channel`.
    /// The maximum number of acknowledgement statuses pending query across all channels.
    ///
    /// Equivalent to the `max_number_of_acked_requests_pending_query` Splunk HEC setting.
    ///
    /// Minimum of `1`.
    #[configurable(metadata(docs::human_name = "Max Number of Pending Acknowledgements"))]
    pub max_pending_acks: NonZeroU64,

    // Enforced in `IndexerAcknowledgement::create_or_get_channel` when an unknown channel id is seen.
    /// The maximum number of Splunk HEC channels clients can use with this source.
    ///
    /// Minimum of `1`.
    #[configurable(metadata(docs::human_name = "Max Number of Acknowledgement Channels"))]
    pub max_number_of_ack_channels: NonZeroU64,

    // Handed to every `Channel`; its finalizer task evicts the smallest stored ack id once the
    // stored set grows past this limit.
    /// The maximum number of acknowledgement statuses pending query for a single channel.
    ///
    /// Equivalent to the `max_number_of_acked_requests_pending_query_per_ack_channel` Splunk HEC setting.
    ///
    /// Minimum of `1`.
    #[configurable(metadata(
        docs::human_name = "Max Number of Pending Acknowledgements Per Channel"
    ))]
    pub max_pending_acks_per_channel: NonZeroU64,

    // When `true`, `IndexerAcknowledgement::new` spawns the periodic cleanup task.
    /// Whether or not to remove channels after idling for `max_idle_time` seconds.
    ///
    /// A channel is idling if it is not used for sending data or querying acknowledgement statuses.
    #[configurable(metadata(docs::human_name = "Acknowledgement Idle Cleanup"))]
    pub ack_idle_cleanup: bool,

    // Used both as the tick period of the cleanup task and as the idle threshold it applies.
    /// The amount of time, in seconds, a channel is allowed to idle before removal.
    ///
    /// Channels can potentially idle for longer than this setting but clients should not rely on such behavior.
    ///
    /// Minimum of `1`.
    pub max_idle_time: NonZeroU64,
}
69
70impl Default for HecAcknowledgementsConfig {
71    fn default() -> Self {
72        Self {
73            enabled: None,
74            max_pending_acks: NonZeroU64::new(10_000_000).unwrap(),
75            max_number_of_ack_channels: NonZeroU64::new(1_000_000).unwrap(),
76            max_pending_acks_per_channel: NonZeroU64::new(1_000_000).unwrap(),
77            ack_idle_cleanup: false,
78            max_idle_time: NonZeroU64::new(300).unwrap(),
79        }
80    }
81}
82
83impl From<bool> for HecAcknowledgementsConfig {
84    fn from(enabled: bool) -> Self {
85        Self {
86            enabled: Some(enabled),
87            ..Default::default()
88        }
89    }
90}
91
/// Tracks indexer acknowledgements for every channel of a `splunk_hec` source.
///
/// Owns the map from channel id to [`Channel`] and applies the limits that span channels:
/// the number of channels and the total number of pending acknowledgements.
pub struct IndexerAcknowledgement {
    /// Limit that `total_pending_acks` is compared against when a new ack id is requested.
    max_pending_acks: u64,
    /// Per-channel limit handed to every `Channel` this tracker creates.
    max_pending_acks_per_channel: u64,
    /// Maximum number of entries allowed in `channels`.
    max_number_of_ack_channels: u64,
    /// Channels keyed by channel id; also reachable from the idle-cleanup task when it is enabled.
    channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>>,
    /// Cloned into each new `Channel`, which passes it to its finalizer.
    shutdown: ShutdownSignal,
    /// Running count of ack ids handed out and not yet released; maintained by the methods
    /// of this type.
    total_pending_acks: AtomicU64,
}
100
101impl IndexerAcknowledgement {
102    pub fn new(config: HecAcknowledgementsConfig, shutdown: ShutdownSignal) -> Self {
103        let channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>> =
104            Arc::new(tokio::sync::Mutex::new(HashMap::new()));
105        let max_idle_time = u64::from(config.max_idle_time);
106        let idle_task_channels = Arc::clone(&channels);
107
108        if config.ack_idle_cleanup {
109            tokio::spawn(async move {
110                let mut interval = interval(Duration::from_secs(max_idle_time));
111                loop {
112                    interval.tick().await;
113                    let mut channels = idle_task_channels.lock().await;
114                    let now = Instant::now();
115
116                    channels.retain(|_, channel| {
117                        now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time
118                    });
119                }
120            });
121        }
122
123        Self {
124            max_pending_acks: u64::from(config.max_pending_acks),
125            max_pending_acks_per_channel: u64::from(config.max_pending_acks_per_channel),
126            max_number_of_ack_channels: u64::from(config.max_number_of_ack_channels),
127            channels,
128            shutdown,
129            total_pending_acks: AtomicU64::new(0),
130        }
131    }
132
133    /// Creates a channel with the specified id if it does not exist.
134    async fn create_or_get_channel(&self, id: String) -> Result<Arc<Channel>, Rejection> {
135        let mut channels = self.channels.lock().await;
136        if let Some(channel) = channels.get(&id) {
137            return Ok(Arc::clone(channel));
138        }
139
140        if channels.len() < self.max_number_of_ack_channels as usize {
141            // Create the channel if it does not exist
142            let channel = Arc::new(Channel::new(
143                self.max_pending_acks_per_channel,
144                self.shutdown.clone(),
145            ));
146            channels.insert(id, Arc::clone(&channel));
147            Ok(channel)
148        } else {
149            Err(Rejection::from(ApiError::ServiceUnavailable))
150        }
151    }
152
153    /// Gets the next available ack id from a specified channel, creating the channel if it does not exist
154    pub async fn get_ack_id_from_channel(
155        &self,
156        channel_id: String,
157        batch_rx: BatchStatusReceiver,
158    ) -> Result<u64, Rejection> {
159        let channel = self.create_or_get_channel(channel_id).await?;
160        let total_pending_acks = self.total_pending_acks.fetch_add(1, Ordering::Relaxed) + 1;
161        if total_pending_acks > self.max_pending_acks
162            && !self.drop_oldest_pending_ack_from_channels().await
163        {
164            self.total_pending_acks.fetch_sub(1, Ordering::Relaxed);
165            return Err(Rejection::from(ApiError::ServiceUnavailable));
166        }
167
168        let ack_id = channel.get_ack_id(batch_rx);
169        Ok(ack_id)
170    }
171
172    /// Gets the requested ack id statuses from a specified channel, creating the channel if it does not exist
173    pub async fn get_acks_status_from_channel(
174        &self,
175        channel_id: String,
176        ack_ids: &[u64],
177    ) -> Result<HashMap<u64, bool>, Rejection> {
178        let channel = self.create_or_get_channel(channel_id).await?;
179        let acks_status = channel.get_acks_status(ack_ids);
180        let dropped_ack_count = acks_status.values().filter(|status| **status).count();
181        self.total_pending_acks
182            .fetch_sub(dropped_ack_count as u64, Ordering::Relaxed);
183        Ok(acks_status)
184    }
185
186    /// Drops the oldest ack id (if one exists) across all channels
187    async fn drop_oldest_pending_ack_from_channels(&self) -> bool {
188        let channels = self.channels.lock().await;
189        let mut ordered_channels = channels.values().collect::<Vec<_>>();
190        ordered_channels.sort_by_key(|a| a.get_last_used());
191        ordered_channels
192            .iter()
193            .any(|channel| channel.drop_oldest_pending_ack())
194    }
195}
196
/// State of a single acknowledgement channel: ack id allocation plus the set of delivered
/// ids that are waiting to be queried.
pub struct Channel {
    /// Time of the most recent `get_ack_id` or `get_acks_status` call; initialised to the
    /// creation time.
    last_used_timestamp: RwLock<Instant>,
    /// Next ack id to hand out; starts at 0 and grows by one per allocation.
    currently_available_ack_id: AtomicU64,
    /// Ack ids whose batch finished with `BatchStatus::Delivered` and that have been neither
    /// queried nor evicted yet. Shared with the task spawned in `Channel::new`.
    ack_ids_status: Arc<Mutex<RoaringTreemap>>,
    /// Accepts `(ack id, batch receiver)` registrations; the stream it feeds is consumed by
    /// the task spawned in `Channel::new`.
    ack_event_finalizer: UnorderedFinalizer<u64>,
}
203
204impl Channel {
205    fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self {
206        let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new()));
207        let finalizer_ack_ids_status = Arc::clone(&ack_ids_status);
208        let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown));
209        tokio::spawn(async move {
210            while let Some((status, ack_id)) = ack_stream.next().await {
211                if status == BatchStatus::Delivered {
212                    let mut ack_ids_status = finalizer_ack_ids_status.lock().unwrap();
213                    ack_ids_status.insert(ack_id);
214                    if ack_ids_status.len() > max_pending_acks_per_channel {
215                        match ack_ids_status.min() {
216                            Some(min) => ack_ids_status.remove(min),
217                            // max pending acks per channel is guaranteed to be >= 1,
218                            // thus there must be at least one ack id available to remove
219                            None => unreachable!(
220                                "Indexer acknowledgements channel must allow at least one pending ack"
221                            ),
222                        };
223                    }
224                }
225            }
226        });
227
228        Self {
229            last_used_timestamp: RwLock::new(Instant::now()),
230            currently_available_ack_id: AtomicU64::new(0),
231            ack_ids_status,
232            ack_event_finalizer,
233        }
234    }
235
236    fn get_ack_id(&self, batch_rx: BatchStatusReceiver) -> u64 {
237        {
238            let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
239            *last_used_timestamp = Instant::now();
240        }
241        let ack_id = self
242            .currently_available_ack_id
243            .fetch_add(1, Ordering::Relaxed);
244        self.ack_event_finalizer.add(ack_id, batch_rx);
245        ack_id
246    }
247
248    fn get_acks_status(&self, acks: &[u64]) -> HashMap<u64, bool> {
249        {
250            let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
251            *last_used_timestamp = Instant::now();
252        }
253        let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
254        acks.iter()
255            .map(|ack_id| (*ack_id, ack_ids_status.remove(*ack_id)))
256            .collect()
257    }
258
259    fn get_last_used(&self) -> Instant {
260        let last_used_timestamp = self.last_used_timestamp.read().unwrap();
261        *last_used_timestamp
262    }
263
264    fn drop_oldest_pending_ack(&self) -> bool {
265        let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
266        match ack_ids_status.min() {
267            Some(ack_id) => ack_ids_status.remove(ack_id),
268            None => false,
269        }
270    }
271}
272
/// Serializable payload carrying a list of ack ids.
///
/// NOTE(review): presumably the body of an ack status query; the handler that uses it is not
/// in this file — confirm there.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusRequest {
    /// Ack ids, in the order given by the sender.
    pub acks: Vec<u64>,
}
277
/// Serializable payload mapping ack ids to a boolean status.
///
/// The map has the same shape as the value returned by
/// `IndexerAcknowledgement::get_acks_status_from_channel`.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status per ack id.
    pub acks: HashMap<u64, bool>,
}
282
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use tokio::{time, time::sleep};
    use vector_lib::event::{BatchNotifier, EventFinalizer, EventStatus};

    use super::{Channel, HecAcknowledgementsConfig, IndexerAcknowledgement};
    use crate::shutdown::ShutdownSignal;

    // Delivered events must be reported as `true`.
    #[tokio::test]
    async fn test_channel_get_ack_id_and_acks_status() {
        channel_get_ack_id_and_status(EventStatus::Delivered, true).await;
    }

    // Rejected events must be reported as `false`.
    #[tokio::test]
    async fn test_channel_get_ack_id_and_nacks_status() {
        channel_get_ack_id_and_status(EventStatus::Rejected, false).await;
    }

    // Shared body of the two tests above: allocates ten ack ids on a fresh channel, finalizes
    // each one with `status`, and expects every queried id to report `result`.
    async fn channel_get_ack_id_and_status(status: EventStatus, result: bool) {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            let (tx, batch_rx) = BatchNotifier::new_with_receiver();
            // Ids are handed out sequentially starting at 0.
            assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
            EventFinalizer::new(tx).update_status(status);
        }
        // Let the ack finalizer task run
        sleep(time::Duration::from_secs(1)).await;
        assert!(
            channel
                .get_acks_status(&expected_ack_ids)
                .values()
                .all(|&status| status == result)
        );
    }

    // A status query consumes the ids it reports as `true`.
    #[tokio::test]
    async fn test_channel_get_acks_status_repeat() {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            // `_tx` is dropped at the end of each iteration without an explicit status; the
            // first assertion below expects these ids to be reported as `true`.
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
        }
        // Let the ack finalizer task run
        sleep(time::Duration::from_secs(1)).await;
        assert!(
            channel
                .get_acks_status(&expected_ack_ids)
                .values()
                .all(|status| *status)
        );
        // Subsequent queries for the same ackId's should result in false
        assert!(
            channel
                .get_acks_status(&expected_ack_ids)
                .values()
                .all(|status| !*status)
        );
    }

    // Twenty ids against a per-channel limit of ten: the finalizer task evicts the smallest
    // stored id each time the limit is exceeded, so only the last ten remain.
    #[tokio::test]
    async fn test_channel_get_ack_id_exceed_max_pending_acks_per_channel() {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
        let expected_ack_ids: Vec<u64> = (10..20).collect();

        for ack_id in dropped_pending_ack_ids
            .iter()
            .chain(expected_ack_ids.iter())
        {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*ack_id, channel.get_ack_id(batch_rx));
        }
        // Let the ack finalizer task run
        sleep(time::Duration::from_secs(1)).await;
        // The first 10 pending ack ids are dropped
        assert!(
            channel
                .get_acks_status(&dropped_pending_ack_ids)
                .values()
                .all(|status| !*status)
        );
        // The second 10 pending ack ids can be queried
        assert!(
            channel
                .get_acks_status(&expected_ack_ids)
                .values()
                .all(|status| *status)
        );
    }

    // Same scenario as above, but the limit that is hit is the global `max_pending_acks`
    // (the per-channel limit keeps its much larger default).
    #[tokio::test]
    async fn test_indexer_ack_exceed_max_pending_acks_drop_acks() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks: NonZeroU64::new(10).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");

        let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
        let expected_ack_ids: Vec<u64> = (10..20).collect();

        for ack_id in dropped_pending_ack_ids
            .iter()
            .chain(expected_ack_ids.iter())
        {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(
                *ack_id,
                idx_ack
                    .get_ack_id_from_channel(channel.clone(), batch_rx)
                    .await
                    .unwrap()
            );
            // NOTE(review): presumably this pause lets the finalizer task store earlier ids
            // so that a later over-limit request finds something to drop — confirm.
            sleep(time::Duration::from_millis(100)).await;
        }
        sleep(time::Duration::from_secs(1)).await;
        assert!(
            idx_ack
                .get_acks_status_from_channel(channel.clone(), &dropped_pending_ack_ids)
                .await
                .unwrap()
                .values()
                .all(|status| !*status)
        );
        assert!(
            idx_ack
                .get_acks_status_from_channel(channel, &expected_ack_ids)
                .await
                .unwrap()
                .values()
                .all(|status| *status)
        );
    }

    // With a global limit of one, a second request is rejected when nothing can be dropped.
    #[tokio::test]
    async fn test_indexer_ack_exceed_max_pending_acks_server_busy() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");

        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        idx_ack
            .get_ack_id_from_channel(channel.clone(), batch_rx)
            .await
            .unwrap();

        // Shadowing `_tx` does not drop the first notifier: both stay alive until the end of
        // the test, so the first batch has not been finalized when the second id is requested.
        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        assert!(
            idx_ack
                .get_ack_id_from_channel(channel.clone(), batch_rx)
                .await
                .is_err()
        );
    }

    // Distinct channel ids get distinct channels, each numbering its ack ids from 0.
    #[tokio::test]
    async fn test_indexer_ack_create_channels() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);

        let channel_one = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();
        let channel_two = idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .unwrap();

        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        let channel_one_ack_id = channel_one.get_ack_id(batch_rx);
        drop(_tx);
        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        let channel_two_ack_id = channel_two.get_ack_id(batch_rx);
        drop(_tx);

        assert_eq!(0, channel_one_ack_id);
        assert_eq!(0, channel_two_ack_id);
    }

    // With a channel limit of one, a second channel id is rejected.
    #[tokio::test]
    async fn test_indexer_ack_create_channels_exceed_max_number_of_ack_channels() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);

        let _channel_one = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();

        assert!(
            idx_ack
                .create_or_get_channel(String::from("channel-id-2"))
                .await
                .is_err()
        );
    }

    // Idle cleanup removes an unused channel, which frees the single channel slot again.
    #[tokio::test]
    async fn test_indexer_ack_channel_idle_expiration() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ack_idle_cleanup: true,
            max_idle_time: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let _channel = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();
        // Allow channel to expire and free up the max channel limit of 1
        sleep(time::Duration::from_secs(3)).await;
        assert!(
            idx_ack
                .create_or_get_channel(String::from("channel-id-2"))
                .await
                .is_ok()
        );
    }

    // A channel that was used within the idle window keeps its stored acks.
    #[tokio::test]
    async fn test_indexer_ack_channel_active_does_not_expire() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ack_idle_cleanup: true,
            max_idle_time: NonZeroU64::new(2).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(
                *expected_ack_id,
                idx_ack
                    .get_ack_id_from_channel(channel.clone(), batch_rx)
                    .await
                    .unwrap()
            );
        }
        // The pause equals `max_idle_time`; the cleanup task keeps channels whose idle time
        // in whole seconds is `<= max_idle_time`.
        sleep(time::Duration::from_secs(2)).await;
        assert!(
            idx_ack
                .get_acks_status_from_channel(channel.clone(), &expected_ack_ids)
                .await
                .unwrap()
                .values()
                .all(|status| *status)
        );
    }
}