vector/sources/splunk_hec/
acknowledgements.rs1use std::{
2 collections::HashMap,
3 num::NonZeroU64,
4 sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc, Mutex, RwLock,
7 },
8 time::{Duration, Instant},
9};
10
11use futures::StreamExt;
12use roaring::RoaringTreemap;
13use serde::{Deserialize, Serialize};
14use tokio::time::interval;
15use vector_lib::configurable::configurable_component;
16use vector_lib::{finalization::BatchStatusReceiver, finalizer::UnorderedFinalizer};
17use warp::Rejection;
18
19use super::ApiError;
20use crate::{event::BatchStatus, shutdown::ShutdownSignal};
21
/// Acknowledgement configuration for the `splunk_hec` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct HecAcknowledgementsConfig {
    /// Whether acknowledgements are enabled.
    ///
    /// `None` means the option was not set explicitly.
    pub enabled: Option<bool>,

    /// The maximum number of pending acknowledgements tracked across all channels.
    ///
    /// Minimum of `1`.
    #[configurable(metadata(docs::human_name = "Max Number of Pending Acknowledgements"))]
    pub max_pending_acks: NonZeroU64,

    /// The maximum number of channels that may exist at the same time.
    ///
    /// Minimum of `1`.
    #[configurable(metadata(docs::human_name = "Max Number of Acknowledgement Channels"))]
    pub max_number_of_ack_channels: NonZeroU64,

    /// The maximum number of delivered acknowledgement statuses retained for a single channel.
    ///
    /// Minimum of `1`.
    #[configurable(metadata(
        docs::human_name = "Max Number of Pending Acknowledgements Per Channel"
    ))]
    pub max_pending_acks_per_channel: NonZeroU64,

    /// Whether to periodically remove channels that have been idle for longer than
    /// `max_idle_time` seconds.
    #[configurable(metadata(docs::human_name = "Acknowledgement Idle Cleanup"))]
    pub ack_idle_cleanup: bool,

    /// The amount of time, in seconds, a channel may idle before it becomes eligible for removal.
    ///
    /// Minimum of `1`.
    pub max_idle_time: NonZeroU64,
}
67
68impl Default for HecAcknowledgementsConfig {
69 fn default() -> Self {
70 Self {
71 enabled: None,
72 max_pending_acks: NonZeroU64::new(10_000_000).unwrap(),
73 max_number_of_ack_channels: NonZeroU64::new(1_000_000).unwrap(),
74 max_pending_acks_per_channel: NonZeroU64::new(1_000_000).unwrap(),
75 ack_idle_cleanup: false,
76 max_idle_time: NonZeroU64::new(300).unwrap(),
77 }
78 }
79}
80
81impl From<bool> for HecAcknowledgementsConfig {
82 fn from(enabled: bool) -> Self {
83 Self {
84 enabled: Some(enabled),
85 ..Default::default()
86 }
87 }
88}
89
90pub struct IndexerAcknowledgement {
91 max_pending_acks: u64,
92 max_pending_acks_per_channel: u64,
93 max_number_of_ack_channels: u64,
94 channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>>,
95 shutdown: ShutdownSignal,
96 total_pending_acks: AtomicU64,
97}
98
99impl IndexerAcknowledgement {
100 pub fn new(config: HecAcknowledgementsConfig, shutdown: ShutdownSignal) -> Self {
101 let channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<Channel>>>> =
102 Arc::new(tokio::sync::Mutex::new(HashMap::new()));
103 let max_idle_time = u64::from(config.max_idle_time);
104 let idle_task_channels = Arc::clone(&channels);
105
106 if config.ack_idle_cleanup {
107 tokio::spawn(async move {
108 let mut interval = interval(Duration::from_secs(max_idle_time));
109 loop {
110 interval.tick().await;
111 let mut channels = idle_task_channels.lock().await;
112 let now = Instant::now();
113
114 channels.retain(|_, channel| {
115 now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time
116 });
117 }
118 });
119 }
120
121 Self {
122 max_pending_acks: u64::from(config.max_pending_acks),
123 max_pending_acks_per_channel: u64::from(config.max_pending_acks_per_channel),
124 max_number_of_ack_channels: u64::from(config.max_number_of_ack_channels),
125 channels,
126 shutdown,
127 total_pending_acks: AtomicU64::new(0),
128 }
129 }
130
131 async fn create_or_get_channel(&self, id: String) -> Result<Arc<Channel>, Rejection> {
133 let mut channels = self.channels.lock().await;
134 if let Some(channel) = channels.get(&id) {
135 return Ok(Arc::clone(channel));
136 }
137
138 if channels.len() < self.max_number_of_ack_channels as usize {
139 let channel = Arc::new(Channel::new(
141 self.max_pending_acks_per_channel,
142 self.shutdown.clone(),
143 ));
144 channels.insert(id, Arc::clone(&channel));
145 Ok(channel)
146 } else {
147 Err(Rejection::from(ApiError::ServiceUnavailable))
148 }
149 }
150
151 pub async fn get_ack_id_from_channel(
153 &self,
154 channel_id: String,
155 batch_rx: BatchStatusReceiver,
156 ) -> Result<u64, Rejection> {
157 let channel = self.create_or_get_channel(channel_id).await?;
158 let total_pending_acks = self.total_pending_acks.fetch_add(1, Ordering::Relaxed) + 1;
159 if total_pending_acks > self.max_pending_acks
160 && !self.drop_oldest_pending_ack_from_channels().await
161 {
162 self.total_pending_acks.fetch_sub(1, Ordering::Relaxed);
163 return Err(Rejection::from(ApiError::ServiceUnavailable));
164 }
165
166 let ack_id = channel.get_ack_id(batch_rx);
167 Ok(ack_id)
168 }
169
170 pub async fn get_acks_status_from_channel(
172 &self,
173 channel_id: String,
174 ack_ids: &[u64],
175 ) -> Result<HashMap<u64, bool>, Rejection> {
176 let channel = self.create_or_get_channel(channel_id).await?;
177 let acks_status = channel.get_acks_status(ack_ids);
178 let dropped_ack_count = acks_status.values().filter(|status| **status).count();
179 self.total_pending_acks
180 .fetch_sub(dropped_ack_count as u64, Ordering::Relaxed);
181 Ok(acks_status)
182 }
183
184 async fn drop_oldest_pending_ack_from_channels(&self) -> bool {
186 let channels = self.channels.lock().await;
187 let mut ordered_channels = channels.values().collect::<Vec<_>>();
188 ordered_channels.sort_by_key(|a| a.get_last_used());
189 ordered_channels
190 .iter()
191 .any(|channel| channel.drop_oldest_pending_ack())
192 }
193}
194
195pub struct Channel {
196 last_used_timestamp: RwLock<Instant>,
197 currently_available_ack_id: AtomicU64,
198 ack_ids_status: Arc<Mutex<RoaringTreemap>>,
199 ack_event_finalizer: UnorderedFinalizer<u64>,
200}
201
202impl Channel {
203 fn new(max_pending_acks_per_channel: u64, shutdown: ShutdownSignal) -> Self {
204 let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new()));
205 let finalizer_ack_ids_status = Arc::clone(&ack_ids_status);
206 let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown));
207 tokio::spawn(async move {
208 while let Some((status, ack_id)) = ack_stream.next().await {
209 if status == BatchStatus::Delivered {
210 let mut ack_ids_status = finalizer_ack_ids_status.lock().unwrap();
211 ack_ids_status.insert(ack_id);
212 if ack_ids_status.len() > max_pending_acks_per_channel {
213 match ack_ids_status.min() {
214 Some(min) => ack_ids_status.remove(min),
215 None => unreachable!(
218 "Indexer acknowledgements channel must allow at least one pending ack"
219 ),
220 };
221 }
222 }
223 }
224 });
225
226 Self {
227 last_used_timestamp: RwLock::new(Instant::now()),
228 currently_available_ack_id: AtomicU64::new(0),
229 ack_ids_status,
230 ack_event_finalizer,
231 }
232 }
233
234 fn get_ack_id(&self, batch_rx: BatchStatusReceiver) -> u64 {
235 {
236 let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
237 *last_used_timestamp = Instant::now();
238 }
239 let ack_id = self
240 .currently_available_ack_id
241 .fetch_add(1, Ordering::Relaxed);
242 self.ack_event_finalizer.add(ack_id, batch_rx);
243 ack_id
244 }
245
246 fn get_acks_status(&self, acks: &[u64]) -> HashMap<u64, bool> {
247 {
248 let mut last_used_timestamp = self.last_used_timestamp.write().unwrap();
249 *last_used_timestamp = Instant::now();
250 }
251 let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
252 acks.iter()
253 .map(|ack_id| (*ack_id, ack_ids_status.remove(*ack_id)))
254 .collect()
255 }
256
257 fn get_last_used(&self) -> Instant {
258 let last_used_timestamp = self.last_used_timestamp.read().unwrap();
259 *last_used_timestamp
260 }
261
262 fn drop_oldest_pending_ack(&self) -> bool {
263 let mut ack_ids_status = self.ack_ids_status.lock().unwrap();
264 match ack_ids_status.min() {
265 Some(ack_id) => ack_ids_status.remove(ack_id),
266 None => false,
267 }
268 }
269}
270
/// Body of an acknowledgement status query: the ack IDs whose status is requested.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusRequest {
    /// Ack IDs to look up.
    pub acks: Vec<u64>,
}
275
/// Body of an acknowledgement status reply: one boolean status per ack ID.
#[derive(Deserialize, Serialize, Debug)]
pub struct HecAckStatusResponse {
    /// Status keyed by ack ID.
    pub acks: HashMap<u64, bool>,
}
280
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use tokio::{time, time::sleep};
    use vector_lib::event::{BatchNotifier, EventFinalizer, EventStatus};

    use super::{Channel, HecAcknowledgementsConfig, IndexerAcknowledgement};
    use crate::shutdown::ShutdownSignal;

    // A delivered batch must be reported as acknowledged.
    #[tokio::test]
    async fn test_channel_get_ack_id_and_acks_status() {
        channel_get_ack_id_and_status(EventStatus::Delivered, true).await;
    }

    // A rejected batch must not be reported as acknowledged.
    #[tokio::test]
    async fn test_channel_get_ack_id_and_nacks_status() {
        channel_get_ack_id_and_status(EventStatus::Rejected, false).await;
    }

    // Issues ten ack IDs, finalizes each batch with `status`, and checks that every ack is
    // then reported as `result`.
    async fn channel_get_ack_id_and_status(status: EventStatus, result: bool) {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            let (tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
            EventFinalizer::new(tx).update_status(status);
        }
        // Give the task spawned by `Channel::new` time to record the statuses.
        sleep(time::Duration::from_secs(1)).await;
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|&status| status == result));
    }

    // Querying the same ack IDs twice: the first query consumes them.
    #[tokio::test]
    async fn test_channel_get_acks_status_repeat() {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            // `_tx` is dropped at the end of each iteration without an explicit status; the
            // assertions below expect that to be reported as delivered.
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*expected_ack_id, channel.get_ack_id(batch_rx));
        }
        // Give the task spawned by `Channel::new` time to record the statuses.
        sleep(time::Duration::from_secs(1)).await;
        // First query: every ack is reported as acknowledged.
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|status| *status));
        // Second query: the statuses were removed by the first one.
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|status| !*status));
    }

    // Issuing twice as many acks as the per-channel limit keeps only the newest ten.
    #[tokio::test]
    async fn test_channel_get_ack_id_exceed_max_pending_acks_per_channel() {
        let shutdown = ShutdownSignal::noop();
        let max_pending_acks_per_channel = 10;
        let channel = Channel::new(max_pending_acks_per_channel, shutdown);
        let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
        let expected_ack_ids: Vec<u64> = (10..20).collect();

        for ack_id in dropped_pending_ack_ids
            .iter()
            .chain(expected_ack_ids.iter())
        {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(*ack_id, channel.get_ack_id(batch_rx));
        }
        // Give the task spawned by `Channel::new` time to record the statuses.
        sleep(time::Duration::from_secs(1)).await;
        // The first ten acks were evicted to stay within the per-channel limit.
        assert!(channel
            .get_acks_status(&dropped_pending_ack_ids)
            .values()
            .all(|status| !*status));
        // The last ten acks are still available.
        assert!(channel
            .get_acks_status(&expected_ack_ids)
            .values()
            .all(|status| *status));
    }

    // Exceeding the global pending limit drops the oldest delivered acks to make room.
    #[tokio::test]
    async fn test_indexer_ack_exceed_max_pending_acks_drop_acks() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks: NonZeroU64::new(10).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");

        let dropped_pending_ack_ids: Vec<u64> = (0..10).collect();
        let expected_ack_ids: Vec<u64> = (10..20).collect();

        for ack_id in dropped_pending_ack_ids
            .iter()
            .chain(expected_ack_ids.iter())
        {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(
                *ack_id,
                idx_ack
                    .get_ack_id_from_channel(channel.clone(), batch_rx)
                    .await
                    .unwrap()
            );
            // `_tx` stays alive across this pause and is dropped when the iteration ends.
            sleep(time::Duration::from_millis(100)).await;
        }
        sleep(time::Duration::from_secs(1)).await;
        assert!(idx_ack
            .get_acks_status_from_channel(channel.clone(), &dropped_pending_ack_ids)
            .await
            .unwrap()
            .values()
            .all(|status| !*status));
        assert!(idx_ack
            .get_acks_status_from_channel(channel, &expected_ack_ids)
            .await
            .unwrap()
            .values()
            .all(|status| *status));
    }

    // Exceeding the global pending limit with nothing droppable rejects the request.
    #[tokio::test]
    async fn test_indexer_ack_exceed_max_pending_acks_server_busy() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");

        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        idx_ack
            .get_ack_id_from_channel(channel.clone(), batch_rx)
            .await
            .unwrap();

        // Shadowing does not drop the first `_tx`: it lives until the end of the function, so
        // the first batch is still unfinalized when the second ack ID is requested.
        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        assert!(idx_ack
            .get_ack_id_from_channel(channel.clone(), batch_rx)
            .await
            .is_err());
    }

    // Each channel numbers its ack IDs independently, starting from zero.
    #[tokio::test]
    async fn test_indexer_ack_create_channels() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);

        let channel_one = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();
        let channel_two = idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .unwrap();

        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        let channel_one_ack_id = channel_one.get_ack_id(batch_rx);
        drop(_tx);
        let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
        let channel_two_ack_id = channel_two.get_ack_id(batch_rx);
        drop(_tx);

        assert_eq!(0, channel_one_ack_id);
        assert_eq!(0, channel_two_ack_id);
    }

    // Creating more channels than `max_number_of_ack_channels` is rejected.
    #[tokio::test]
    async fn test_indexer_ack_create_channels_exceed_max_number_of_ack_channels() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);

        let _channel_one = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();

        assert!(idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .is_err());
    }

    // With idle cleanup on, an unused channel is removed and frees its slot.
    #[tokio::test]
    async fn test_indexer_ack_channel_idle_expiration() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ack_idle_cleanup: true,
            max_idle_time: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let _channel = idx_ack
            .create_or_get_channel(String::from("channel-id-1"))
            .await
            .unwrap();
        // Wait well past `max_idle_time` so the cleanup task has removed the idle channel.
        sleep(time::Duration::from_secs(3)).await;
        // The only slot is free again, so a second channel can be created.
        assert!(idx_ack
            .create_or_get_channel(String::from("channel-id-2"))
            .await
            .is_ok());
    }

    // With idle cleanup on, a channel idle for no longer than `max_idle_time` keeps its acks.
    #[tokio::test]
    async fn test_indexer_ack_channel_active_does_not_expire() {
        let shutdown = ShutdownSignal::noop();
        let config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ack_idle_cleanup: true,
            max_idle_time: NonZeroU64::new(2).unwrap(),
            ..Default::default()
        };
        let idx_ack = IndexerAcknowledgement::new(config, shutdown);
        let channel = String::from("channel-id");
        let expected_ack_ids: Vec<u64> = (0..10).collect();

        for expected_ack_id in &expected_ack_ids {
            let (_tx, batch_rx) = BatchNotifier::new_with_receiver();
            assert_eq!(
                *expected_ack_id,
                idx_ack
                    .get_ack_id_from_channel(channel.clone(), batch_rx)
                    .await
                    .unwrap()
            );
        }
        sleep(time::Duration::from_secs(2)).await;
        assert!(idx_ack
            .get_acks_status_from_channel(channel.clone(), &expected_ack_ids)
            .await
            .unwrap()
            .values()
            .all(|status| *status));
    }
}