1#![allow(missing_docs)]
2pub(super) use vector_lib::fanout;
11pub mod schema;
12
13pub mod builder;
14mod controller;
15mod ready_arrays;
16mod running;
17mod task;
18
19#[cfg(test)]
20mod test;
21
22use std::{
23 panic::AssertUnwindSafe,
24 sync::{Arc, Mutex},
25};
26
27use futures::{Future, FutureExt};
28use tokio::sync::mpsc;
29use vector_lib::buffers::topology::channel::{BufferReceiverStream, BufferSender};
30
31pub use self::builder::TopologyPieces;
32pub use self::controller::{ReloadOutcome, SharedTopologyController, TopologyController};
33pub use self::running::{RunningTopology, ShutdownErrorReceiver};
34
35use self::task::{Task, TaskError, TaskResult};
36use crate::{
37 config::{ComponentKey, Config, ConfigDiff},
38 event::EventArray,
39 signal::ShutdownError,
40};
41
/// Join handle of a spawned Tokio task whose output is a [`TaskResult`].
42type TaskHandle = tokio::task::JoinHandle<TaskResult>;
43
/// The two halves of a built buffer carrying [`EventArray`]s: the sender, and
/// the receiver stream held in a shared, mutex-guarded `Option`.
///
/// NOTE(review): the `Option` presumably lets one owner take the receiver out
/// exactly once while other clones of the `Arc` observe `None` — confirm
/// against the code that consumes it.
44type BuiltBuffer = (
45 BufferSender<EventArray>,
46 Arc<Mutex<Option<BufferReceiverStream<EventArray>>>>,
47);
48
49pub(super) fn take_healthchecks(
50 diff: &ConfigDiff,
51 pieces: &mut TopologyPieces,
52) -> Vec<(ComponentKey, Task)> {
53 (&diff.sinks.to_change | &diff.sinks.to_add)
54 .into_iter()
55 .filter_map(|id| pieces.healthchecks.remove(&id).map(move |task| (id, task)))
56 .collect()
57}
58
59async fn handle_errors(
60 task: impl Future<Output = TaskResult>,
61 abort_tx: mpsc::UnboundedSender<ShutdownError>,
62 error: impl FnOnce(String) -> ShutdownError,
63) -> TaskResult {
64 AssertUnwindSafe(task)
65 .catch_unwind()
66 .await
67 .map_err(|_| TaskError::Panicked)
68 .and_then(|res| res)
69 .map_err(|e| {
70 error!("An error occurred that Vector couldn't handle: {}.", e);
71 _ = abort_tx.send(error(e.to_string()));
72 e
73 })
74}
75
/// Keeps only the elements of `vec` for which `retain_filter` returns `true`.
///
/// The filter receives a mutable reference, so it may modify an element while
/// deciding whether to keep it. Elements are visited exactly once, in their
/// original order, and the relative order of the kept elements is preserved.
/// An element is dropped when the filter returns `false` for it.
fn retain<T>(vec: &mut Vec<T>, retain_filter: impl FnMut(&mut T) -> bool) {
    // `Vec::retain_mut` compacts in a single pass; removing rejected elements
    // one at a time with `Vec::remove` shifts the tail on every removal.
    vec.retain_mut(retain_filter);
}