vector/sinks/util/service/
concurrency.rs

1use std::{cell::RefCell, fmt};
2
3use serde::{
4    Deserialize, Deserializer, Serialize, Serializer,
5    de::{self, Unexpected, Visitor},
6};
7use serde_json::Value;
8use vector_lib::configurable::{
9    Configurable, GenerateError, Metadata, ToValue,
10    attributes::CustomAttribute,
11    schema::{
12        SchemaGenerator, SchemaObject, apply_base_metadata, generate_const_string_schema,
13        generate_number_schema, generate_one_of_schema,
14    },
15};
16
/// Configuration for outbound request concurrency.
///
/// This can be set either to one of the below enum values or to a positive integer, which denotes
/// a fixed concurrency limit.
///
/// The default value is [`Concurrency::Adaptive`].
// `Default` is derived natively through the `#[default]` variant attribute, so no additional
// derive macro is needed for it.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum Concurrency {
    /// A fixed concurrency of 1.
    ///
    /// Only one request can be outstanding at any given time.
    None,

    /// Concurrency is managed by the [Adaptive Request Concurrency][arc] feature.
    ///
    /// [arc]: https://vector.dev/docs/architecture/arc/
    #[default]
    Adaptive,

    /// A fixed amount of concurrency is allowed.
    Fixed(usize),
}
37
38impl Serialize for Concurrency {
39    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
40    where
41        S: Serializer,
42    {
43        match &self {
44            Concurrency::None => serializer.serialize_str("none"),
45            Concurrency::Adaptive => serializer.serialize_str("adaptive"),
46            Concurrency::Fixed(i) => serializer.serialize_u64(*i as u64),
47        }
48    }
49}
50
51impl Concurrency {
52    pub const fn parse_concurrency(&self) -> Option<usize> {
53        match self {
54            Concurrency::None => Some(1),
55            Concurrency::Adaptive => None,
56            Concurrency::Fixed(limit) => Some(*limit),
57        }
58    }
59}
60
61impl<'de> Deserialize<'de> for Concurrency {
62    // Deserialize either a positive integer or the string "adaptive"
63    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64    where
65        D: Deserializer<'de>,
66    {
67        struct UsizeOrAdaptive;
68
69        impl Visitor<'_> for UsizeOrAdaptive {
70            type Value = Concurrency;
71
72            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
73                formatter.write_str(r#"positive integer, "adaptive", or "none" "#)
74            }
75
76            fn visit_str<E: de::Error>(self, value: &str) -> Result<Concurrency, E> {
77                if value == "adaptive" {
78                    Ok(Concurrency::Adaptive)
79                } else if value == "none" {
80                    Ok(Concurrency::None)
81                } else {
82                    Err(de::Error::unknown_variant(value, &["adaptive", "none"]))
83                }
84            }
85
86            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Concurrency, E> {
87                if value > 0 {
88                    Ok(Concurrency::Fixed(value as usize))
89                } else {
90                    Err(de::Error::invalid_value(
91                        Unexpected::Signed(value),
92                        &"positive integer",
93                    ))
94                }
95            }
96
97            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Concurrency, E> {
98                if value > 0 {
99                    Ok(Concurrency::Fixed(value as usize))
100                } else {
101                    Err(de::Error::invalid_value(
102                        Unexpected::Unsigned(value),
103                        &"positive integer",
104                    ))
105                }
106            }
107        }
108
109        deserializer.deserialize_any(UsizeOrAdaptive)
110    }
111}
112
113// TODO: Consider an approach for generating schema of "string or number" structure used by this type.
114impl Configurable for Concurrency {
115    fn referenceable_name() -> Option<&'static str> {
116        Some(std::any::type_name::<Self>())
117    }
118
119    fn metadata() -> Metadata {
120        let mut metadata = Metadata::default();
121        metadata.set_description(
122            r"Configuration for outbound request concurrency.
123
124This can be set either to one of the below enum values or to a positive integer, which denotes
125a fixed concurrency limit.",
126        );
127        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
128        metadata
129    }
130
131    fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
132        let mut none_schema = generate_const_string_schema("none".to_string());
133        let mut none_metadata = Metadata::with_title("A fixed concurrency of 1.");
134        none_metadata.set_description("Only one request can be outstanding at any given time.");
135        none_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "None"));
136        apply_base_metadata(&mut none_schema, none_metadata);
137
138        let mut adaptive_schema = generate_const_string_schema("adaptive".to_string());
139        let mut adaptive_metadata = Metadata::with_title(
140            "Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.",
141        );
142        adaptive_metadata.set_description("[arc]: https://vector.dev/docs/architecture/arc/");
143        adaptive_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Adaptive"));
144        apply_base_metadata(&mut adaptive_schema, adaptive_metadata);
145
146        let mut fixed_schema = generate_number_schema::<usize>();
147        let mut fixed_metadata =
148            Metadata::with_description("A fixed amount of concurrency will be allowed.");
149        fixed_metadata.set_transparent();
150        fixed_metadata.add_custom_attribute(CustomAttribute::kv("docs::numeric_type", "uint"));
151        fixed_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Fixed"));
152        apply_base_metadata(&mut fixed_schema, fixed_metadata);
153
154        Ok(generate_one_of_schema(&[
155            none_schema,
156            adaptive_schema,
157            fixed_schema,
158        ]))
159    }
160}
161
162impl ToValue for Concurrency {
163    fn to_value(&self) -> Value {
164        serde_json::to_value(self).expect("Could not convert concurrency to JSON")
165    }
166}
167
// Round-trips one value of each variant through JSON and checks that it comes back unchanged.
#[test]
fn is_serialization_reversible() {
    for original in [Concurrency::None, Concurrency::Adaptive, Concurrency::Fixed(8)] {
        let json = serde_json::to_value(original).unwrap();
        let round_tripped: Concurrency = serde_json::from_value(json)
            .expect("Failed to deserialize a previously serialized Concurrency value");

        assert_eq!(original, round_tripped);
    }
}