vector/api/schema/components/
state.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{Arc, LazyLock, RwLock},
4};
5
6use super::{sink, source, transform, Component};
7use crate::config::{ComponentKey, OutputId};
8
/// Panic message passed to `expect` whenever a read or write lock on
/// [`COMPONENTS`] cannot be acquired.
pub const INVARIANT: &str = "Couldn't acquire lock on Vector components. Please report this.";
10
11pub static COMPONENTS: LazyLock<Arc<RwLock<HashMap<ComponentKey, Component>>>> =
12    LazyLock::new(|| Arc::new(RwLock::new(HashMap::new())));
13
14/// Filter components with the provided `map_func`
15pub fn filter_components<T>(map_func: impl Fn((&ComponentKey, &Component)) -> Option<T>) -> Vec<T> {
16    COMPONENTS
17        .read()
18        .expect(INVARIANT)
19        .iter()
20        .filter_map(map_func)
21        .collect()
22}
23
24/// Returns all components
25pub fn get_components() -> Vec<Component> {
26    filter_components(|(_component_key, components)| Some(components.clone()))
27}
28
29/// Filters components, and returns a clone of sources
30pub fn get_sources() -> Vec<source::Source> {
31    filter_components(|(_, components)| match components {
32        Component::Source(s) => Some(s.clone()),
33        _ => None,
34    })
35}
36
37/// Filters components, and returns a clone of transforms
38pub fn get_transforms() -> Vec<transform::Transform> {
39    filter_components(|(_, components)| match components {
40        Component::Transform(t) => Some(t.clone()),
41        _ => None,
42    })
43}
44
45/// Filters components, and returns a clone of sinks
46pub fn get_sinks() -> Vec<sink::Sink> {
47    filter_components(|(_, components)| match components {
48        Component::Sink(s) => Some(s.clone()),
49        _ => None,
50    })
51}
52
53/// Returns the current component component_keys as a HashSet
54pub fn get_component_keys() -> HashSet<ComponentKey> {
55    COMPONENTS
56        .read()
57        .expect(INVARIANT)
58        .keys()
59        .cloned()
60        .collect::<HashSet<ComponentKey>>()
61}
62
63/// Gets a component by component_key
64pub fn component_by_component_key(component_key: &ComponentKey) -> Option<Component> {
65    Some(
66        COMPONENTS
67            .read()
68            .expect(INVARIANT)
69            .get(component_key)?
70            .clone(),
71    )
72}
73
74/// Gets a component by output_id
75pub fn component_by_output_id(output_id: &OutputId) -> Option<Component> {
76    filter_components(|(key, component)| {
77        if key == &output_id.component {
78            Some(component.clone())
79        } else {
80            None
81        }
82    })
83    .pop()
84}
85
86/// Overwrites component state with new components.
87pub fn update(new_components: HashMap<ComponentKey, Component>) {
88    *COMPONENTS.write().expect(INVARIANT) = new_components
89}