1#![allow(missing_docs)]
2pub(super) use vector_lib::fanout;
11pub mod schema;
12
13pub mod builder;
14mod controller;
15mod ready_arrays;
16mod running;
17mod task;
18
19#[cfg(test)]
20mod test;
21
22use std::{
23 panic::AssertUnwindSafe,
24 sync::{Arc, Mutex},
25};
26
27use futures::{Future, FutureExt};
28use tokio::sync::mpsc;
29use vector_lib::buffers::topology::channel::{BufferReceiverStream, BufferSender};
30
31use self::task::{Task, TaskError, TaskResult};
32pub use self::{
33 builder::TopologyPieces,
34 controller::{ReloadOutcome, SharedTopologyController, TopologyController},
35 running::{RunningTopology, ShutdownErrorReceiver},
36};
37use crate::{
38 config::{ComponentKey, Config, ConfigDiff},
39 event::EventArray,
40 signal::ShutdownError,
41};
42
/// Join handle of a Tokio task whose output is a [`TaskResult`].
type TaskHandle = tokio::task::JoinHandle<TaskResult>;
44
/// The pair of handles describing a buffer that carries [`EventArray`]s.
///
/// The first element is the buffer's sender. The second is the buffer's receiver stream,
/// stored in a shared (`Arc`), mutex-guarded `Option`.
// NOTE(review): the `Option` presumably lets a consumer take the receiver out of the shared
// slot -- confirm against the users of this alias.
type BuiltBuffer = (
    BufferSender<EventArray>,
    Arc<Mutex<Option<BufferReceiverStream<EventArray>>>>,
);
49
50pub(super) fn take_healthchecks(
51 diff: &ConfigDiff,
52 pieces: &mut TopologyPieces,
53) -> Vec<(ComponentKey, Task)> {
54 (&diff.sinks.to_change | &diff.sinks.to_add)
55 .into_iter()
56 .filter_map(|id| pieces.healthchecks.remove(&id).map(move |task| (id, task)))
57 .collect()
58}
59
60async fn handle_errors(
61 task: impl Future<Output = TaskResult>,
62 abort_tx: mpsc::UnboundedSender<ShutdownError>,
63 error: impl FnOnce(String) -> ShutdownError,
64) -> TaskResult {
65 AssertUnwindSafe(task)
66 .catch_unwind()
67 .await
68 .map_err(|_| TaskError::Panicked)
69 .and_then(|res| res)
70 .map_err(|e| {
71 error!("An error occurred that Vector couldn't handle: {}.", e);
72 _ = abort_tx.send(error(e.to_string()));
73 e
74 })
75}
76
/// Keeps only the elements of `vec` for which `retain_filter` returns `true`.
///
/// The filter is called exactly once per element, in order, and receives a mutable reference
/// so it may update an element while deciding whether to keep it. Elements for which it
/// returns `false` are removed; the relative order of the remaining elements is preserved.
///
/// This delegates to [`Vec::retain_mut`], which compacts the vector in a single pass rather
/// than shifting the tail once per removed element.
fn retain<T>(vec: &mut Vec<T>, retain_filter: impl FnMut(&mut T) -> bool) {
    vec.retain_mut(retain_filter);
}