vector/topology/
mod.rs

1#![allow(missing_docs)]
//! Topology contains all topology-based types.
//!
//! Topology is broken up into two main sections. The first
//! section contains all the main topology types, including `Topology`
//! and the ability to start, stop and reload a config. The second
//! part contains config-related items, including config traits for
//! each type of component.
9
10pub(super) use vector_lib::fanout;
11pub mod schema;
12
13pub mod builder;
14mod controller;
15mod ready_arrays;
16mod running;
17mod task;
18
19#[cfg(test)]
20mod test;
21
22use std::{
23    panic::AssertUnwindSafe,
24    sync::{Arc, Mutex},
25};
26
27use futures::{Future, FutureExt};
28use tokio::sync::mpsc;
29use vector_lib::buffers::topology::channel::{BufferReceiverStream, BufferSender};
30
31pub use self::builder::TopologyPieces;
32pub use self::controller::{ReloadOutcome, SharedTopologyController, TopologyController};
33pub use self::running::{RunningTopology, ShutdownErrorReceiver};
34
35use self::task::{Task, TaskError, TaskResult};
36use crate::{
37    config::{ComponentKey, Config, ConfigDiff},
38    event::EventArray,
39    signal::ShutdownError,
40};
41
/// Join handle of a spawned Tokio task whose output is a [`TaskResult`].
type TaskHandle = tokio::task::JoinHandle<TaskResult>;

/// The two halves of a built buffer carrying [`EventArray`]s: the sender, and
/// the receiver stream wrapped in `Arc<Mutex<Option<_>>>` so it can be shared.
///
/// NOTE(review): the `Option` presumably lets a consumer take the receiver
/// out exactly once — confirm against the code that builds and consumes it.
type BuiltBuffer = (
    BufferSender<EventArray>,
    Arc<Mutex<Option<BufferReceiverStream<EventArray>>>>,
);
48
49pub(super) fn take_healthchecks(
50    diff: &ConfigDiff,
51    pieces: &mut TopologyPieces,
52) -> Vec<(ComponentKey, Task)> {
53    (&diff.sinks.to_change | &diff.sinks.to_add)
54        .into_iter()
55        .filter_map(|id| pieces.healthchecks.remove(&id).map(move |task| (id, task)))
56        .collect()
57}
58
59async fn handle_errors(
60    task: impl Future<Output = TaskResult>,
61    abort_tx: mpsc::UnboundedSender<ShutdownError>,
62    error: impl FnOnce(String) -> ShutdownError,
63) -> TaskResult {
64    AssertUnwindSafe(task)
65        .catch_unwind()
66        .await
67        .map_err(|_| TaskError::Panicked)
68        .and_then(|res| res)
69        .map_err(|e| {
70            error!("An error occurred that Vector couldn't handle: {}.", e);
71            _ = abort_tx.send(error(e.to_string()));
72            e
73        })
74}
75
/// Removes, in place, every element of `vec` for which `retain_filter` returns `false`.
///
/// The closure receives a mutable reference, so it may modify the elements it
/// keeps. Each element is visited exactly once, in its original order, and the
/// surviving elements keep their relative order.
///
/// Delegates to [`Vec::retain_mut`], which compacts the vector in a single
/// pass instead of shifting the tail on every individual removal.
fn retain<T>(vec: &mut Vec<T>, retain_filter: impl FnMut(&mut T) -> bool) {
    vec.retain_mut(retain_filter);
}