vector/topology/
mod.rs

1#![allow(missing_docs)]
2//! Topology contains all topology based types.
3//!
//! Topology is broken up into two main sections. The first
//! section contains all the main topology types, including `Topology`,
//! and the ability to start, stop, and reload a config. The second
//! part contains config-related items, including config traits for
//! each type of component.
9
10pub(super) use vector_lib::fanout;
11pub mod schema;
12
13pub mod builder;
14mod controller;
15mod ready_arrays;
16mod running;
17mod task;
18
19#[cfg(test)]
20mod test;
21
22use std::{
23    panic::AssertUnwindSafe,
24    sync::{Arc, Mutex},
25};
26
27use futures::{Future, FutureExt};
28use tokio::sync::mpsc;
29use vector_lib::buffers::topology::channel::{BufferReceiverStream, BufferSender};
30
31use self::task::{Task, TaskError, TaskResult};
32pub use self::{
33    builder::TopologyPieces,
34    controller::{ReloadOutcome, SharedTopologyController, TopologyController},
35    running::{RunningTopology, ShutdownErrorReceiver},
36};
37use crate::{
38    config::{ComponentKey, Config, ConfigDiff},
39    event::EventArray,
40    signal::ShutdownError,
41};
42
/// Join handle of a spawned Tokio task whose output is a [`TaskResult`].
type TaskHandle = tokio::task::JoinHandle<TaskResult>;

/// The two halves of a built buffer carrying [`EventArray`]s: the sender, and
/// the receiver stream held behind `Arc<Mutex<Option<..>>>`.
///
/// NOTE(review): the `Option` presumably lets the receiver be taken exactly
/// once while the handle itself stays shareable — confirm against the builder.
type BuiltBuffer = (
    BufferSender<EventArray>,
    Arc<Mutex<Option<BufferReceiverStream<EventArray>>>>,
);
49
50pub(super) fn take_healthchecks(
51    diff: &ConfigDiff,
52    pieces: &mut TopologyPieces,
53) -> Vec<(ComponentKey, Task)> {
54    (&diff.sinks.to_change | &diff.sinks.to_add)
55        .into_iter()
56        .filter_map(|id| pieces.healthchecks.remove(&id).map(move |task| (id, task)))
57        .collect()
58}
59
60async fn handle_errors(
61    task: impl Future<Output = TaskResult>,
62    abort_tx: mpsc::UnboundedSender<ShutdownError>,
63    error: impl FnOnce(String) -> ShutdownError,
64) -> TaskResult {
65    AssertUnwindSafe(task)
66        .catch_unwind()
67        .await
68        .map_err(|_| TaskError::Panicked)
69        .and_then(|res| res)
70        .map_err(|e| {
71            error!("An error occurred that Vector couldn't handle: {}.", e);
72            _ = abort_tx.send(error(e.to_string()));
73            e
74        })
75}
76
/// Keeps only the elements of `vec` for which `retain_filter` returns `true`.
///
/// If the closure returns false, then the element is removed. Elements are
/// visited exactly once, in their original order, and the closure receives a
/// mutable reference so it may update the elements it keeps. The relative
/// order of the retained elements is preserved.
///
/// Delegates to [`Vec::retain_mut`], which compacts the vector in a single
/// pass instead of shifting the tail on every removal.
fn retain<T>(vec: &mut Vec<T>, retain_filter: impl FnMut(&mut T) -> bool) {
    vec.retain_mut(retain_filter);
}
87}