vector/sources/splunk_hec/
acknowledgements.rs

1use std::{
2    collections::HashMap,
3    num::NonZeroU64,
4    sync::{
5        atomic::{AtomicU64, Ordering},
6        Arc, Mutex, RwLock,
7    },
8    time::{Duration, Instant},
9};
10
11use futures::StreamExt;
12use roaring::RoaringTreemap;
13use serde::{Deserialize, Serialize};
14use tokio::time::interval;
15use vector_lib::configurable::configurable_component;
16use vector_lib::{finalization::BatchStatusReceiver, finalizer::UnorderedFinalizer};
17use warp::Rejection;
18
19use super::ApiError;
20use crate::{event::BatchStatus, shutdown::ShutdownSignal};
21
/// Acknowledgement configuration for the `splunk_hec` source.
// NOTE(review): `configurable_component` appears to turn the `///` text in this struct into the
// user-facing configuration documentation, so implementation notes are kept in `//` comments.
// With `#[serde(default)]`, any field omitted from the user config falls back to the value
// produced by `HecAcknowledgementsConfig::default()`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct HecAcknowledgementsConfig {
    /// Enables end-to-end acknowledgements.
    // `None` means "not explicitly configured"; how that is resolved is decided by the caller
    // (not visible in this file).
    pub enabled: Option<bool>,

    /// The maximum number of acknowledgement statuses pending query across all channels.
    ///
    /// Equivalent to the `max_number_of_acked_requests_pending_query` Splunk HEC setting.
    ///
    /// Minimum of `1`.
    // `NonZeroU64` rejects `0` during deserialization, which enforces the documented minimum.
    #[configurable(metadata(docs::human_name = "Max Number of Pending Acknowledgements"))]
    pub max_pending_acks: NonZeroU64,

    /// The maximum number of Splunk HEC channels clients can use with this source.
    ///
    /// Minimum of `1`.
    // `NonZeroU64` rejects `0` during deserialization, which enforces the documented minimum.
    #[configurable(metadata(docs::human_name = "Max Number of Acknowledgement Channels"))]
    pub max_number_of_ack_channels: NonZeroU64,

    /// The maximum number of acknowledgement statuses pending query for a single channel.
    ///
    /// Equivalent to the `max_number_of_acked_requests_pending_query_per_ack_channel` Splunk HEC setting.
    ///
    /// Minimum of `1`.
    // `NonZeroU64` rejects `0` during deserialization, which enforces the documented minimum.
    #[configurable(metadata(
        docs::human_name = "Max Number of Pending Acknowledgements Per Channel"
    ))]
    pub max_pending_acks_per_channel: NonZeroU64,

    /// Whether or not to remove channels after idling for `max_idle_time` seconds.
    ///
    /// A channel is idling if it is not used for sending data or querying acknowledgement statuses.
    #[configurable(metadata(docs::human_name = "Acknowledgement Idle Cleanup"))]
    pub ack_idle_cleanup: bool,

    /// The amount of time, in seconds, a channel is allowed to idle before removal.
    ///
    /// Channels can potentially idle for longer than this setting but clients should not rely on such behavior.
    ///
    /// Minimum of `1`.
    // Only consulted when `ack_idle_cleanup` is `true`. `NonZeroU64` rejects `0` during
    // deserialization, which enforces the documented minimum.
    pub max_idle_time: NonZeroU64,
}
67
68impl Default for HecAcknowledgementsConfig {
69    fn default() -> Self {
70        Self {
71            enabled: None,
72            max_pending_acks: NonZeroU64::new(10_000_000).unwrap(),
73            max_number_of_ack_channels: NonZeroU64::new(1_000_000).unwrap(),
74            max_pending_acks_per_channel: NonZeroU64::new(1_000_000).unwrap(),
75            ack_idle_cleanup: false,
76            max_idle_time: NonZeroU64::new(300).unwrap(),
77        }
78    }
79}
80
81impl From<bool> for HecAcknowledgementsConfig {
82    fn from(enabled: bool) -> Self {
83        Self {
84            enabled: Some(enabled),
85            ..Default::default()
86        }
87    }
88}
89
90pub struct IndexerAcknowledgement {
91    max_pending_acks: u64,
92    max_pending_acks_per_channel: u64,
93    max_number_of_ack_channels: u64,
94    channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>>,
95    shutdown: ShutdownSignal,
96    total_pending_acks: AtomicU64,
97}
98
99impl IndexerAcknowledgement {
100    pub fn new(config: HecAcknowledgementsConfig, shutdown: ShutdownSignal) -> Self {
101        let channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>> =
102            Arc::new(tokio::sync::Mutex::new(HashMap::new()));
103        let max_idle_time = u64::from(config.max_idle_time);
104        let idle_task_channels = Arc::clone(&channels);
105
106        if config.ack_idle_cleanup {
107            tokio::spawn(async move {
108                let mut interval = interval(Duration::from_secs(max_idle_time));
109                loop {
110                    interval.tick().await;
111                    let mut channels = idle_task_channels.lock().await;
112                    let now = Instant::now();
113
114                    channels.retain(|_, channel| {
115                        now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time
116                    });
117                }
118            });
119        }
120
121        Self {
122            max_pending_acks: u64::from(config.max_pending_acks),
123            max_pending_acks_per_channel: u64::from(config.max_pending_acks_per_channel),
124            max_number_of_ack_channels: u64::from(config.max_number_of_ack_channels),
125            channels,
126            shutdown,
127            total_pending_acks: AtomicU64::new(0),
128        }
129    }
130
131    /// Creates a channel with the specified id if it does not exist.
132    async fn create_or_get_channel(&self, id: String) -> Result<Arc<Channel>, Rejection> {
133        let mut channels = self.channels.lock().await;
134        if let Some(channel) = channels.get(&id) {
135            return Ok(Arc::clone(channel));
136        }
137
138        if channels.len() < self.max_number_of_ack_channels as usize {
139            // Create the channel if it does not exist
140            let channel = Arc::new(Channel::new(
141                self.max_pending_acks_per_channel,
142                self.shutdown.clone(),
143            ));
144            channels.insert(id, Arc::clone(&channel));
145            Ok(channel)
146        } else {
147            Err(Rejection::from(ApiError::ServiceUnavailable))
148        }
149    }
150
151    /// Gets the next available ack id from a specified channel, creating the channel if it does not exist
152    pub async fn get_ack_id_from_channel(
153        &self,
154        channel_id: String,
155        batch_rx: BatchStatusReceiver,
156    ) -> Result<u64, Rejection> {
157        let channel = self.create_or_get_channel(channel_id).await?;
158        let total_pending_acks = self.total_pending_acks.fetch_add(1, Ordering::Relaxed) + 1;
159        if total_pending_acks > self.max_pending_acks
160            && !self.drop_oldest_pending_ack_from_channels().await
161        {
162            self.total_pending_acks.fetch_sub(1, Ordering::Relaxed);
163            return Err(Rejection::from(ApiError::ServiceUnavailable));
164        }
165
166        let ack_id = channel.get_ack_id(batch_rx);
167        Ok(ack_id)
168    }
169
170    /// Gets the requested ack id statuses from a specified channel, creating the channel if it does not exist
171    pub async fn get_acks_status_from_channel(
172        &self,
173        channel_id: String,
174        ack_ids: &[u64],
175    ) -> Result<HashMap<u64, bool>, Rejection> {
176        let channel = self.create_or_get_channel(channel_id).await?;
177        let acks_status = channel.get_acks_status(ack_ids);
178        let dropped_ack_count = acks_status.values().filter(|status| **status).count();
179        self.total_pending_acks
180            .fetch_sub(dropped_ack_count as u64, Ordering::Relaxed);
181        Ok(acks_status)
182    }
183
184    /// Drops the oldest ack id (if one exists) across all channels
185    async fn drop_oldest_pending_ack_from_channels(&self) -> bool {
186        let channels = self.channels.lock().await;
187        let mut ordered_channels = channels.values().collect::<Vec<_>>();
188        ordered_channels.sort_by_key(|a| a.get_last_used());
189        ordered_channels
190            .iter()
191            .any(|channel| channel.drop_oldest_pending_ack())
192    }
193}
194
195pub struct Channel {
196    last_used_timestamp: RwLock<Instant>,
197    currently_available_ack_id: AtomicU64,
198    ack_ids_status: Arc<Mutex<RoaringTreemap>>,
199    ack_event_finalizer: UnorderedFinalizer<u64>,
200}
201
202impl Channel {
203    fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self {
204        let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new()));
205        let finalizer_ack_ids_status = Arc::clone(&ack_ids_status);
206        let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown));
207        tokio::spawn(async move {
208            while let Some((status, ack_id)) = ack_stream.next().await {
209                if status == BatchStatus::Delivered {
210                    let mut ack_ids_status = finalizer_ack_ids_status.lock().unwrap();
211                    ack_ids_status.insert(ack_id);
212                    if ack_ids_status.len() > max_pending_acks_per_channel {
213                        match ack_ids_status.min() {
214                            Some(min) => ack_ids_status.remove(min),
215                            // max pending acks per channel is guaranteed to be >= 1,
216                            // thus there must be at least one ack id available to remove
217                            None => unreachable!(
218                                "Indexer acknowledgements channel must allow at least one pending ack"
219                            ),
220                        };
221                    }
222                }
223            }
224        });
225
226        Self {
227            last_used_timestamp: RwLock::new(Instant::now()),
228            currently_available_ack_id: AtomicU64::new(0),
229            ack_ids_status,
230            ack_event_finalizer,
231        }
232    }
233
234    fn get_ack_id(&self, batch_rx: BatchStatusReceiver) -> u64 {
235        {
236            let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
237            *last_used_timestamp = Instant::now();
238        }
239        let ack_id = self
240            .currently_available_ack_id
241            .fetch_add(1, Ordering::Relaxed);
242        self.ack_event_finalizer.add(ack_id, batch_rx);
243        ack_id
244    }
245
246    fn get_acks_status(&self, acks: &[u64]) -> HashMap<u64, bool> {
247        {
248            let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
249            *last_used_timestamp = Instant::now();
250        }
251        let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
252        acks.iter()
253            .map(|ack_id| (*ack_id, ack_ids_status.remove(*ack_id)))
254            .collect()
255    }
256
257    fn get_last_used(&self) -> Instant {
258        let last_used_timestamp = self.last_used_timestamp.read().unwrap();
259        *last_used_timestamp
260    }
261
262    fn drop_oldest_pending_ack(&self) -> bool {
263        let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
264        match ack_ids_status.min() {
265            Some(ack_id) => ack_ids_status.remove(ack_id),
266            None => false,
267        }
268    }
269}
270
271#[derive(Deserialize, Serialize, Debug)]
272pub struct HecAckStatusRequest {
273    pub acks: Vec<u64>,
274}
275
276#[derive(Deserialize, Serialize, Debug)]
277pub struct HecAckStatusResponse {
278    pub acks: HashMap<u64, bool>,
279}
280
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use tokio::{time, time::sleep};
    use vector_lib::event::{BatchNotifier, EventFinalizer, EventStatus};

    use super::{Channel, HecAcknowledgementsConfig, IndexerAcknowledgement};
    use crate::shutdown::ShutdownSignal;

    #[tokio::test]
    async fn test_channel_get_ack_id_and_acks_status() {
        channel_get_ack_id_and_status(EventStatus::Delivered, true).await;
    }

    #[tokio::test]
    async fn test_channel_get_ack_id_and_nacks_status() {
        channel_get_ack_id_and_status(EventStatus::Rejected, false).await;
    }

    /// Issues ten ack ids on a fresh channel, finalizes each batch with `status`, and checks
    /// that querying all ten ids reports `result` for every one of them.
    async fn channel_get_ack_id_and_status(status: EventStatus, result: bool) {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            let (tx, batch_rx) = BatchNotifier::new_with_receiver();
            // Ack ids are handed out sequentially starting at 0.
            assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
            EventFinalizer::new(tx).update_status(status);
        }
        // Let the ack finalizer task run
        sleep(time::Duration::from_secs(1)).await;
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|&status| status == result));
    }

    #[tokio::test]
    async fn test_channel_get_acks_status_repeat() {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            // `_tx` is dropped at the end of each iteration without an explicit status; the
            // assertions below expect that to count as a delivery.
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
        }
        // Let the ack finalizer task run
        sleep(time::Duration::from_secs(1)).await;
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|status| *status));
        // Subsequent queries for the same ackId's should result in false
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|status| !*status));
    }

    #[tokio::test]
    async fn test_channel_get_ack_id_exceed_max_pending_acks_per_channel() {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
        let expected_ack_ids: Vec<u64> = (10..20).collect();

        // Twenty deliveries against a per-channel limit of ten: the channel evicts the lowest
        // ids as newer ones arrive.
        for ack_id in dropped_pending_ack_ids
            .iter()
            .chain(expected_ack_ids.iter())
        {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*ack_id, channel.get_ack_id(batch_rx));
        }
        // Let the ack finalizer task run
        sleep(time::Duration::from_secs(1)).await;
        // The first 10 pending ack ids are dropped
        assert!(channel
            .get_acks_status(&dropped_pending_ack_ids)
            .values()
            .all(|status| !*status));
        // The second 10 pending ack ids can be queried
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|status| *status));
    }

    #[tokio::test]
    async fn test_indexer_ack_exceed_max_pending_acks_drop_acks() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks: NonZeroU64::new(10).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");

        let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
        let expected_ack_ids: Vec<u64> = (10..20).collect();

        for ack_id in dropped_pending_ack_ids
            .iter()
            .chain(expected_ack_ids.iter())
        {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(
                *ack_id,
                idx_ack
                    .get_ack_id_from_channel(channel.clone(), batch_rx)
                    .await
                    .unwrap()
            );
            // Give the finalizer time to record this delivery so that, once the global limit
            // of 10 is exceeded, an older delivered ack exists to be discarded.
            sleep(time::Duration::from_millis(100)).await;
        }
        sleep(time::Duration::from_secs(1)).await;
        assert!(idx_ack
            .get_acks_status_from_channel(channel.clone(), &dropped_pending_ack_ids)
            .await
            .unwrap()
            .values()
            .all(|status| !*status));
        assert!(idx_ack
            .get_acks_status_from_channel(channel, &expected_ack_ids)
            .await
            .unwrap()
            .values()
            .all(|status| *status));
    }

    #[tokio::test]
    async fn test_indexer_ack_exceed_max_pending_acks_server_busy() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");

        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        idx_ack
            .get_ack_id_from_channel(channel.clone(), batch_rx)
            .await
            .unwrap();

        // Shadowing `_tx` does not drop the first notifier, so the first batch stays
        // unfinalized: nothing is queued that could be discarded, and the request is rejected.
        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        assert!(idx_ack
            .get_ack_id_from_channel(channel.clone(), batch_rx)
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_indexer_ack_create_channels() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);

        let channel_one = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();
        let channel_two = idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .unwrap();

        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        let channel_one_ack_id = channel_one.get_ack_id(batch_rx);
        drop(_tx);
        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        let channel_two_ack_id = channel_two.get_ack_id(batch_rx);
        drop(_tx);

        // Each channel numbers its ack ids independently, starting at 0.
        assert_eq!(0, channel_one_ack_id);
        assert_eq!(0, channel_two_ack_id);
    }

    #[tokio::test]
    async fn test_indexer_ack_create_channels_exceed_max_number_of_ack_channels() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);

        let _channel_one = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();

        assert!(idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_indexer_ack_channel_idle_expiration() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ack_idle_cleanup: true,
            max_idle_time: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let _channel = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();
        // Allow channel to expire and free up the max channel limit of 1
        sleep(time::Duration::from_secs(3)).await;
        assert!(idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_indexer_ack_channel_active_does_not_expire() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ack_idle_cleanup: true,
            max_idle_time: NonZeroU64::new(2).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(
                *expected_ack_id,
                idx_ack
                    .get_ack_id_from_channel(channel.clone(), batch_rx)
                    .await
                    .unwrap()
            );
        }
        // Timing-sensitive: the idle check compares whole seconds against `max_idle_time`
        // (inclusive), so a channel used just before this sleep is still retained.
        sleep(time::Duration::from_secs(2)).await;
        assert!(idx_ack
            .get_acks_status_from_channel(channel.clone(), &expected_ack_ids)
            .await
            .unwrap()
            .values()
            .all(|status| *status));
    }
}