1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::{
    collections::{HashMap, HashSet},
    sync::{Arc, LazyLock, RwLock},
};

use super::{sink, source, transform, Component};
use crate::config::{ComponentKey, OutputId};

/// Panic message passed to `expect` whenever acquiring the read or write lock on
/// [`COMPONENTS`] fails (an `RwLock` reports an error when it has been poisoned).
pub const INVARIANT: &str = "Couldn't acquire lock on Vector components. Please report this.";

/// Process-wide registry of components, keyed by [`ComponentKey`].
///
/// Lazily initialized to an empty map on first access. The accessor functions in this
/// module read it through the `RwLock`, and [`update`] replaces its contents wholesale.
pub static COMPONENTS: LazyLock<Arc<RwLock<HashMap<ComponentKey, Component>>>> =
    LazyLock::new(|| Arc::new(RwLock::new(HashMap::new())));

/// Collects the results of applying `map_func` to every registered component.
///
/// `map_func` is called with each `(key, component)` pair. Pairs for which it returns
/// `None` are skipped; the payloads of the `Some` results are returned in whatever order
/// the underlying map yields its entries.
///
/// # Panics
///
/// Panics with [`INVARIANT`] if the read lock on [`COMPONENTS`] cannot be acquired.
pub fn filter_components<T>(map_func: impl Fn((&ComponentKey, &Component)) -> Option<T>) -> Vec<T> {
    // The read guard is held for the whole traversal.
    let registry = COMPONENTS.read().expect(INVARIANT);

    let mut selected = Vec::new();
    for entry in registry.iter() {
        if let Some(item) = map_func(entry) {
            selected.push(item);
        }
    }
    selected
}

/// Returns all components
pub fn get_components() -> Vec<Component> {
    filter_components(|(_component_key, components)| Some(components.clone()))
}

/// Returns clones of all registered components that are sources.
pub fn get_sources() -> Vec<source::Source> {
    filter_components(|(_key, component)| {
        if let Component::Source(source) = component {
            Some(source.clone())
        } else {
            None
        }
    })
}

/// Returns clones of all registered components that are transforms.
pub fn get_transforms() -> Vec<transform::Transform> {
    filter_components(|(_key, component)| {
        if let Component::Transform(transform) = component {
            Some(transform.clone())
        } else {
            None
        }
    })
}

/// Returns clones of all registered components that are sinks.
pub fn get_sinks() -> Vec<sink::Sink> {
    filter_components(|(_key, component)| {
        if let Component::Sink(sink) = component {
            Some(sink.clone())
        } else {
            None
        }
    })
}

/// Returns the keys of all currently registered components as a `HashSet`.
///
/// # Panics
///
/// Panics with [`INVARIANT`] if the read lock on [`COMPONENTS`] cannot be acquired.
pub fn get_component_keys() -> HashSet<ComponentKey> {
    let registry = COMPONENTS.read().expect(INVARIANT);
    // The return type selects the `HashSet` target for `collect`.
    registry.keys().cloned().collect()
}

/// Looks up a component by its key and returns a clone of it.
///
/// Returns `None` when no component is registered under `component_key`.
///
/// # Panics
///
/// Panics with [`INVARIANT`] if the read lock on [`COMPONENTS`] cannot be acquired.
pub fn component_by_component_key(component_key: &ComponentKey) -> Option<Component> {
    let registry = COMPONENTS.read().expect(INVARIANT);
    registry.get(component_key).cloned()
}

/// Gets a component by output_id
pub fn component_by_output_id(output_id: &OutputId) -> Option<Component> {
    filter_components(|(key, component)| {
        if key == &output_id.component {
            Some(component.clone())
        } else {
            None
        }
    })
    .pop()
}

/// Replaces the entire component registry with `new_components`.
///
/// The previous contents of [`COMPONENTS`] are discarded.
///
/// # Panics
///
/// Panics with [`INVARIANT`] if the write lock on [`COMPONENTS`] cannot be acquired.
pub fn update(new_components: HashMap<ComponentKey, Component>) {
    let mut registry = COMPONENTS.write().expect(INVARIANT);
    *registry = new_components;
}