vector/sources/splunk_hec/
acknowledgements.rs1use std::{
2 collections::HashMap,
3 num::NonZeroU64,
4 sync::{
5 Arc, Mutex, RwLock,
6 atomic::{AtomicU64, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11use futures::StreamExt;
12use roaring::RoaringTreemap;
13use serde::{Deserialize, Serialize};
14use tokio::time::interval;
15use vector_lib::{
16 configurable::configurable_component, finalization::BatchStatusReceiver,
17 finalizer::UnorderedFinalizer,
18};
19use warp::Rejection;
20
21use super::ApiError;
22use crate::{event::BatchStatus, shutdown::ShutdownSignal};
23
#[configurable_component]
/// Acknowledgement settings consumed by [`IndexerAcknowledgement`].
#[derive(Clone, Debug)]
#[serde(default)]
pub struct HecAcknowledgementsConfig {
    /// Whether acknowledgements are enabled.
    ///
    /// Defaults to unset (`None`). This file never reads the flag; how an unset value is
    /// interpreted is decided by the code that consumes this configuration.
    pub enabled: Option<bool>,

    /// Upper bound on the number of acknowledgements tracked across all channels.
    ///
    /// Once the bound is exceeded, a new acknowledgement ID is only issued if an older
    /// pending acknowledgement can be dropped; otherwise the request is rejected.
    /// Defaults to `10_000_000`.
    #[configurable(metadata(docs::human_name = "Max Number of Pending Acknowledgements"))]
    pub max_pending_acks: NonZeroU64,

    /// Upper bound on the number of channels that may exist at the same time.
    ///
    /// Requests naming a new channel are rejected once this many channels exist.
    /// Defaults to `1_000_000`.
    #[configurable(metadata(docs::human_name = "Max Number of Acknowledgement Channels"))]
    pub max_number_of_ack_channels: NonZeroU64,

    /// Upper bound on the number of delivered-but-not-yet-queried acknowledgements kept for
    /// a single channel.
    ///
    /// When the bound is exceeded the acknowledgement with the smallest ID is discarded.
    /// Defaults to `1_000_000`.
    #[configurable(metadata(
        docs::human_name = "Max Number of Pending Acknowledgements Per Channel"
    ))]
    pub max_pending_acks_per_channel: NonZeroU64,

    /// Whether channels are removed after they have been idle for longer than
    /// `max_idle_time` seconds.
    ///
    /// Defaults to `false`.
    #[configurable(metadata(docs::human_name = "Acknowledgement Idle Cleanup"))]
    pub ack_idle_cleanup: bool,

    /// How long, in seconds, a channel may go unused before idle cleanup removes it.
    ///
    /// The same value is used as the period of the cleanup sweep. Defaults to `300`.
    pub max_idle_time: NonZeroU64,
}
69
70impl Default for HecAcknowledgementsConfig {
71 fn default() -> Self {
72 Self {
73 enabled: None,
74 max_pending_acks: NonZeroU64::new(10_000_000).unwrap(),
75 max_number_of_ack_channels: NonZeroU64::new(1_000_000).unwrap(),
76 max_pending_acks_per_channel: NonZeroU64::new(1_000_000).unwrap(),
77 ack_idle_cleanup: false,
78 max_idle_time: NonZeroU64::new(300).unwrap(),
79 }
80 }
81}
82
83impl From<bool> for HecAcknowledgementsConfig {
84 fn from(enabled: bool) -> Self {
85 Self {
86 enabled: Some(enabled),
87 ..Default::default()
88 }
89 }
90}
91
/// Tracks acknowledgement state for every channel and enforces the configured limits.
pub struct IndexerAcknowledgement {
    /// Limit that `total_pending_acks` is compared against when a new ack ID is requested.
    max_pending_acks: u64,
    /// Per-channel limit handed to every `Channel` created by this tracker.
    max_pending_acks_per_channel: u64,
    /// Maximum number of entries allowed in `channels`.
    max_number_of_ack_channels: u64,
    /// Channels keyed by channel ID; also shared with the optional idle-cleanup task.
    channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>>,
    /// Shutdown signal cloned into every channel that gets created.
    shutdown: ShutdownSignal,
    /// Running count: incremented for each ack ID handed out and decremented by the number
    /// of acks reported as `true` in a status query.
    total_pending_acks: AtomicU64,
}
100
101impl IndexerAcknowledgement {
102 pub fn new(config: HecAcknowledgementsConfig, shutdown: ShutdownSignal) -> Self {
103 let channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>> =
104 Arc::new(tokio::sync::Mutex::new(HashMap::new()));
105 let max_idle_time = u64::from(config.max_idle_time);
106 let idle_task_channels = Arc::clone(&channels);
107
108 if config.ack_idle_cleanup {
109 tokio::spawn(async move {
110 let mut interval = interval(Duration::from_secs(max_idle_time));
111 loop {
112 interval.tick().await;
113 let mut channels = idle_task_channels.lock().await;
114 let now = Instant::now();
115
116 channels.retain(|_, channel| {
117 now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time
118 });
119 }
120 });
121 }
122
123 Self {
124 max_pending_acks: u64::from(config.max_pending_acks),
125 max_pending_acks_per_channel: u64::from(config.max_pending_acks_per_channel),
126 max_number_of_ack_channels: u64::from(config.max_number_of_ack_channels),
127 channels,
128 shutdown,
129 total_pending_acks: AtomicU64::new(0),
130 }
131 }
132
133 async fn create_or_get_channel(&self, id: String) -> Result<Arc<Channel>, Rejection> {
135 let mut channels = self.channels.lock().await;
136 if let Some(channel) = channels.get(&id) {
137 return Ok(Arc::clone(channel));
138 }
139
140 if channels.len() < self.max_number_of_ack_channels as usize {
141 let channel = Arc::new(Channel::new(
143 self.max_pending_acks_per_channel,
144 self.shutdown.clone(),
145 ));
146 channels.insert(id, Arc::clone(&channel));
147 Ok(channel)
148 } else {
149 Err(Rejection::from(ApiError::ServiceUnavailable))
150 }
151 }
152
153 pub async fn get_ack_id_from_channel(
155 &self,
156 channel_id: String,
157 batch_rx: BatchStatusReceiver,
158 ) -> Result<u64, Rejection> {
159 let channel = self.create_or_get_channel(channel_id).await?;
160 let total_pending_acks = self.total_pending_acks.fetch_add(1, Ordering::Relaxed) + 1;
161 if total_pending_acks > self.max_pending_acks
162 && !self.drop_oldest_pending_ack_from_channels().await
163 {
164 self.total_pending_acks.fetch_sub(1, Ordering::Relaxed);
165 return Err(Rejection::from(ApiError::ServiceUnavailable));
166 }
167
168 let ack_id = channel.get_ack_id(batch_rx);
169 Ok(ack_id)
170 }
171
172 pub async fn get_acks_status_from_channel(
174 &self,
175 channel_id: String,
176 ack_ids: &[u64],
177 ) -> Result<HashMap<u64, bool>, Rejection> {
178 let channel = self.create_or_get_channel(channel_id).await?;
179 let acks_status = channel.get_acks_status(ack_ids);
180 let dropped_ack_count = acks_status.values().filter(|status| **status).count();
181 self.total_pending_acks
182 .fetch_sub(dropped_ack_count as u64, Ordering::Relaxed);
183 Ok(acks_status)
184 }
185
186 async fn drop_oldest_pending_ack_from_channels(&self) -> bool {
188 let channels = self.channels.lock().await;
189 let mut ordered_channels = channels.values().collect::<Vec<_>>();
190 ordered_channels.sort_by_key(|a| a.get_last_used());
191 ordered_channels
192 .iter()
193 .any(|channel| channel.drop_oldest_pending_ack())
194 }
195}
196
/// Acknowledgement state of a single channel.
pub struct Channel {
    /// Moment the channel last handed out an ack ID or answered a status query.
    last_used_timestamp: RwLock<Instant>,
    /// Next ack ID to hand out; starts at zero and grows by one per issued ID.
    currently_available_ack_id: AtomicU64,
    /// IDs whose batch was reported as delivered and that have not been queried or dropped
    /// yet. Shared with the task that consumes the finalizer stream.
    ack_ids_status: Arc<Mutex<RoaringTreemap>>,
    /// Finalizer that batch receivers are registered with, tagged with their ack ID.
    ack_event_finalizer: UnorderedFinalizer<u64>,
}
203
204impl Channel {
205 fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self {
206 let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new()));
207 let finalizer_ack_ids_status = Arc::clone(&ack_ids_status);
208 let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown));
209 tokio::spawn(async move {
210 while let Some((status, ack_id)) = ack_stream.next().await {
211 if status == BatchStatus::Delivered {
212 let mut ack_ids_status = finalizer_ack_ids_status.lock().unwrap();
213 ack_ids_status.insert(ack_id);
214 if ack_ids_status.len() > max_pending_acks_per_channel {
215 match ack_ids_status.min() {
216 Some(min) => ack_ids_status.remove(min),
217 None => unreachable!(
220 "Indexer acknowledgements channel must allow at least one pending ack"
221 ),
222 };
223 }
224 }
225 }
226 });
227
228 Self {
229 last_used_timestamp: RwLock::new(Instant::now()),
230 currently_available_ack_id: AtomicU64::new(0),
231 ack_ids_status,
232 ack_event_finalizer,
233 }
234 }
235
236 fn get_ack_id(&self, batch_rx: BatchStatusReceiver) -> u64 {
237 {
238 let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
239 *last_used_timestamp = Instant::now();
240 }
241 let ack_id = self
242 .currently_available_ack_id
243 .fetch_add(1, Ordering::Relaxed);
244 self.ack_event_finalizer.add(ack_id, batch_rx);
245 ack_id
246 }
247
248 fn get_acks_status(&self, acks: &[u64]) -> HashMap<u64, bool> {
249 {
250 let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
251 *last_used_timestamp = Instant::now();
252 }
253 let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
254 acks.iter()
255 .map(|ack_id| (*ack_id, ack_ids_status.remove(*ack_id)))
256 .collect()
257 }
258
259 fn get_last_used(&self) -> Instant {
260 let last_used_timestamp = self.last_used_timestamp.read().unwrap();
261 *last_used_timestamp
262 }
263
264 fn drop_oldest_pending_ack(&self) -> bool {
265 let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
266 match ack_ids_status.min() {
267 Some(ack_id) => ack_ids_status.remove(ack_id),
268 None => false,
269 }
270 }
271}
272
/// Serializable list of ack IDs.
///
/// NOTE(review): not used within this file; presumably the body of an ack status query —
/// confirm at the call site.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusRequest {
    /// Ack IDs carried by the request.
    pub acks: Vec<u64>,
}
277
/// Serializable map from ack ID to a boolean status.
///
/// NOTE(review): not used within this file; the field has the same shape as the value
/// returned by `IndexerAcknowledgement::get_acks_status_from_channel` — confirm at the call
/// site that this is what it carries.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status per ack ID.
    pub acks: HashMap<u64, bool>,
}
282
283#[cfg(test)]
284mod tests {
285 use std::num::NonZeroU64;
286
287 use tokio::{time, time::sleep};
288 use vector_lib::event::{BatchNotifier, EventFinalizer, EventStatus};
289
290 use super::{Channel, HecAcknowledgementsConfig, IndexerAcknowledgement};
291 use crate::shutdown::ShutdownSignal;
292
293 #[tokio::test]
294 async fn test_channel_get_ack_id_and_acks_status() {
295 channel_get_ack_id_and_status(EventStatus::Delivered, true).await;
296 }
297
298 #[tokio::test]
299 async fn test_channel_get_ack_id_and_nacks_status() {
300 channel_get_ack_id_and_status(EventStatus::Rejected, false).await;
301 }
302
303 async fn channel_get_ack_id_and_status(status: EventStatus, result: bool) {
304 let shutdown = ShutdownSignal::noop();
305 let max_pending_acks_per_channel = 10;
306 let channel = Channel::new(max_pending_acks_per_channel, shutdown);
307 let expected_ack_ids: Vec<u64> = (0..10).collect();
308
309 for expected_ack_id in &expected_ack_ids {
310 let (tx, batch_rx) = BatchNotifier::new_with_receiver();
311 assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
312 EventFinalizer::new(tx).update_status(status);
313 }
314 sleep(time::Duration::from_secs(1)).await;
316 assert!(
317 channel
318 .get_acks_status(&expected_ack_ids)
319 .values()
320 .all(|&status| status == result)
321 );
322 }
323
324 #[tokio::test]
325 async fn test_channel_get_acks_status_repeat() {
326 let shutdown = ShutdownSignal::noop();
327 let max_pending_acks_per_channel = 10;
328 let channel = Channel::new(max_pending_acks_per_channel, shutdown);
329 let expected_ack_ids: Vec<u64> = (0..10).collect();
330
331 for expected_ack_id in &expected_ack_ids {
332 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
333 assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
334 }
335 sleep(time::Duration::from_secs(1)).await;
337 assert!(
338 channel
339 .get_acks_status(&expected_ack_ids)
340 .values()
341 .all(|status| *status)
342 );
343 assert!(
345 channel
346 .get_acks_status(&expected_ack_ids)
347 .values()
348 .all(|status| !*status)
349 );
350 }
351
352 #[tokio::test]
353 async fn test_channel_get_ack_id_exceed_max_pending_acks_per_channel() {
354 let shutdown = ShutdownSignal::noop();
355 let max_pending_acks_per_channel = 10;
356 let channel = Channel::new(max_pending_acks_per_channel, shutdown);
357 let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
358 let expected_ack_ids: Vec<u64> = (10..20).collect();
359
360 for ack_id in dropped_pending_ack_ids
361 .iter()
362 .chain(expected_ack_ids.iter())
363 {
364 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
365 assert_eq!(*ack_id, channel.get_ack_id(batch_rx));
366 }
367 sleep(time::Duration::from_secs(1)).await;
369 assert!(
371 channel
372 .get_acks_status(&dropped_pending_ack_ids)
373 .values()
374 .all(|status| !*status)
375 );
376 assert!(
378 channel
379 .get_acks_status(&expected_ack_ids)
380 .values()
381 .all(|status| *status)
382 );
383 }
384
385 #[tokio::test]
386 async fn test_indexer_ack_exceed_max_pending_acks_drop_acks() {
387 let shutdown = ShutdownSignal::noop();
388 let config = HecAcknowledgementsConfig {
389 enabled: Some(true),
390 max_pending_acks: NonZeroU64::new(10).unwrap(),
391 ..Default::default()
392 };
393 let idx_ack = IndexerAcknowledgement::new(config, shutdown);
394 let channel = String::from("channel-id");
395
396 let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
397 let expected_ack_ids: Vec<u64> = (10..20).collect();
398
399 for ack_id in dropped_pending_ack_ids
400 .iter()
401 .chain(expected_ack_ids.iter())
402 {
403 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
404 assert_eq!(
405 *ack_id,
406 idx_ack
407 .get_ack_id_from_channel(channel.clone(), batch_rx)
408 .await
409 .unwrap()
410 );
411 sleep(time::Duration::from_millis(100)).await;
412 }
413 sleep(time::Duration::from_secs(1)).await;
414 assert!(
415 idx_ack
416 .get_acks_status_from_channel(channel.clone(), &dropped_pending_ack_ids)
417 .await
418 .unwrap()
419 .values()
420 .all(|status| !*status)
421 );
422 assert!(
423 idx_ack
424 .get_acks_status_from_channel(channel, &expected_ack_ids)
425 .await
426 .unwrap()
427 .values()
428 .all(|status| *status)
429 );
430 }
431
432 #[tokio::test]
433 async fn test_indexer_ack_exceed_max_pending_acks_server_busy() {
434 let shutdown = ShutdownSignal::noop();
435 let config = HecAcknowledgementsConfig {
436 enabled: Some(true),
437 max_pending_acks: NonZeroU64::new(1).unwrap(),
438 ..Default::default()
439 };
440 let idx_ack = IndexerAcknowledgement::new(config, shutdown);
441 let channel = String::from("channel-id");
442
443 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
444 idx_ack
445 .get_ack_id_from_channel(channel.clone(), batch_rx)
446 .await
447 .unwrap();
448
449 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
450 assert!(
451 idx_ack
452 .get_ack_id_from_channel(channel.clone(), batch_rx)
453 .await
454 .is_err()
455 );
456 }
457
458 #[tokio::test]
459 async fn test_indexer_ack_create_channels() {
460 let shutdown = ShutdownSignal::noop();
461 let config = HecAcknowledgementsConfig {
462 enabled: Some(true),
463 ..Default::default()
464 };
465 let idx_ack = IndexerAcknowledgement::new(config, shutdown);
466
467 let channel_one = idx_ack
468 .create_or_get_channel(String::from("channel-id-1"))
469 .await
470 .unwrap();
471 let channel_two = idx_ack
472 .create_or_get_channel(String::from("channel-id-2"))
473 .await
474 .unwrap();
475
476 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
477 let channel_one_ack_id = channel_one.get_ack_id(batch_rx);
478 drop(_tx);
479 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
480 let channel_two_ack_id = channel_two.get_ack_id(batch_rx);
481 drop(_tx);
482
483 assert_eq!(0, channel_one_ack_id);
484 assert_eq!(0, channel_two_ack_id);
485 }
486
487 #[tokio::test]
488 async fn test_indexer_ack_create_channels_exceed_max_number_of_ack_channels() {
489 let shutdown = ShutdownSignal::noop();
490 let config = HecAcknowledgementsConfig {
491 enabled: Some(true),
492 max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
493 ..Default::default()
494 };
495 let idx_ack = IndexerAcknowledgement::new(config, shutdown);
496
497 let _channel_one = idx_ack
498 .create_or_get_channel(String::from("channel-id-1"))
499 .await
500 .unwrap();
501
502 assert!(
503 idx_ack
504 .create_or_get_channel(String::from("channel-id-2"))
505 .await
506 .is_err()
507 );
508 }
509
510 #[tokio::test]
511 async fn test_indexer_ack_channel_idle_expiration() {
512 let shutdown = ShutdownSignal::noop();
513 let config = HecAcknowledgementsConfig {
514 enabled: Some(true),
515 max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
516 ack_idle_cleanup: true,
517 max_idle_time: NonZeroU64::new(1).unwrap(),
518 ..Default::default()
519 };
520 let idx_ack = IndexerAcknowledgement::new(config, shutdown);
521 let _channel = idx_ack
522 .create_or_get_channel(String::from("channel-id-1"))
523 .await
524 .unwrap();
525 sleep(time::Duration::from_secs(3)).await;
527 assert!(
528 idx_ack
529 .create_or_get_channel(String::from("channel-id-2"))
530 .await
531 .is_ok()
532 );
533 }
534
535 #[tokio::test]
536 async fn test_indexer_ack_channel_active_does_not_expire() {
537 let shutdown = ShutdownSignal::noop();
538 let config = HecAcknowledgementsConfig {
539 enabled: Some(true),
540 ack_idle_cleanup: true,
541 max_idle_time: NonZeroU64::new(2).unwrap(),
542 ..Default::default()
543 };
544 let idx_ack = IndexerAcknowledgement::new(config, shutdown);
545 let channel = String::from("channel-id");
546 let expected_ack_ids: Vec<u64> = (0..10).collect();
547
548 for expected_ack_id in &expected_ack_ids {
549 let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
550 assert_eq!(
551 *expected_ack_id,
552 idx_ack
553 .get_ack_id_from_channel(channel.clone(), batch_rx)
554 .await
555 .unwrap()
556 );
557 }
558 sleep(time::Duration::from_secs(2)).await;
559 assert!(
560 idx_ack
561 .get_acks_status_from_channel(channel.clone(), &expected_ack_ids)
562 .await
563 .unwrap()
564 .values()
565 .all(|status| *status)
566 );
567 }
568}