vector/sinks/util/service/
concurrency.rs

1use std::{cell::RefCell, fmt};
2
3use serde::Serializer;
4use serde_json::Value;
5use vector_lib::configurable::attributes::CustomAttribute;
6use vector_lib::configurable::{
7    schema::{
8        apply_base_metadata, generate_const_string_schema, generate_number_schema,
9        generate_one_of_schema, SchemaGenerator, SchemaObject,
10    },
11    Configurable, GenerateError, Metadata, ToValue,
12};
13
14use serde::{
15    de::{self, Unexpected, Visitor},
16    Deserialize, Deserializer, Serialize,
17};
18
19/// Configuration for outbound request concurrency.
20///
21/// This can be set either to one of the below enum values or to a positive integer, which denotes
22/// a fixed concurrency limit.
23#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
24pub enum Concurrency {
25    /// A fixed concurrency of 1.
26    ///
27    /// Only one request can be outstanding at any given time.
28    None,
29
30    /// Concurrency is managed by the [Adaptive Request Concurrency][arc] feature.
31    ///
32    /// [arc]: https://vector.dev/docs/architecture/arc/
33    Adaptive,
34
35    /// A fixed amount of concurrency is allowed.
36    Fixed(usize),
37}
38
39impl Serialize for Concurrency {
40    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
41    where
42        S: Serializer,
43    {
44        match &self {
45            Concurrency::None => serializer.serialize_str("none"),
46            Concurrency::Adaptive => serializer.serialize_str("adaptive"),
47            Concurrency::Fixed(i) => serializer.serialize_u64(*i as u64),
48        }
49    }
50}
51
52impl Default for Concurrency {
53    fn default() -> Self {
54        Self::Adaptive
55    }
56}
57
58impl Concurrency {
59    pub const fn parse_concurrency(&self) -> Option<usize> {
60        match self {
61            Concurrency::None => Some(1),
62            Concurrency::Adaptive => None,
63            Concurrency::Fixed(limit) => Some(*limit),
64        }
65    }
66}
67
68impl<'de> Deserialize<'de> for Concurrency {
69    // Deserialize either a positive integer or the string "adaptive"
70    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
71    where
72        D: Deserializer<'de>,
73    {
74        struct UsizeOrAdaptive;
75
76        impl Visitor<'_> for UsizeOrAdaptive {
77            type Value = Concurrency;
78
79            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
80                formatter.write_str(r#"positive integer, "adaptive", or "none" "#)
81            }
82
83            fn visit_str<E: de::Error>(self, value: &str) -> Result<Concurrency, E> {
84                if value == "adaptive" {
85                    Ok(Concurrency::Adaptive)
86                } else if value == "none" {
87                    Ok(Concurrency::None)
88                } else {
89                    Err(de::Error::unknown_variant(value, &["adaptive", "none"]))
90                }
91            }
92
93            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Concurrency, E> {
94                if value > 0 {
95                    Ok(Concurrency::Fixed(value as usize))
96                } else {
97                    Err(de::Error::invalid_value(
98                        Unexpected::Signed(value),
99                        &"positive integer",
100                    ))
101                }
102            }
103
104            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Concurrency, E> {
105                if value > 0 {
106                    Ok(Concurrency::Fixed(value as usize))
107                } else {
108                    Err(de::Error::invalid_value(
109                        Unexpected::Unsigned(value),
110                        &"positive integer",
111                    ))
112                }
113            }
114        }
115
116        deserializer.deserialize_any(UsizeOrAdaptive)
117    }
118}
119
120// TODO: Consider an approach for generating schema of "string or number" structure used by this type.
121impl Configurable for Concurrency {
122    fn referenceable_name() -> Option<&'static str> {
123        Some(std::any::type_name::<Self>())
124    }
125
126    fn metadata() -> Metadata {
127        let mut metadata = Metadata::default();
128        metadata.set_description(
129            r"Configuration for outbound request concurrency.
130
131This can be set either to one of the below enum values or to a positive integer, which denotes
132a fixed concurrency limit.",
133        );
134        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
135        metadata
136    }
137
138    fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
139        let mut none_schema = generate_const_string_schema("none".to_string());
140        let mut none_metadata = Metadata::with_title("A fixed concurrency of 1.");
141        none_metadata.set_description("Only one request can be outstanding at any given time.");
142        none_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "None"));
143        apply_base_metadata(&mut none_schema, none_metadata);
144
145        let mut adaptive_schema = generate_const_string_schema("adaptive".to_string());
146        let mut adaptive_metadata = Metadata::with_title(
147            "Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.",
148        );
149        adaptive_metadata.set_description("[arc]: https://vector.dev/docs/architecture/arc/");
150        adaptive_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Adaptive"));
151        apply_base_metadata(&mut adaptive_schema, adaptive_metadata);
152
153        let mut fixed_schema = generate_number_schema::<usize>();
154        let mut fixed_metadata =
155            Metadata::with_description("A fixed amount of concurrency will be allowed.");
156        fixed_metadata.set_transparent();
157        fixed_metadata.add_custom_attribute(CustomAttribute::kv("docs::numeric_type", "uint"));
158        fixed_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Fixed"));
159        apply_base_metadata(&mut fixed_schema, fixed_metadata);
160
161        Ok(generate_one_of_schema(&[
162            none_schema,
163            adaptive_schema,
164            fixed_schema,
165        ]))
166    }
167}
168
169impl ToValue for Concurrency {
170    fn to_value(&self) -> Value {
171        serde_json::to_value(self).expect("Could not convert concurrency to JSON")
172    }
173}
174
175#[test]
176fn is_serialization_reversible() {
177    let variants = [
178        Concurrency::None,
179        Concurrency::Adaptive,
180        Concurrency::Fixed(8),
181    ];
182
183    for v in variants {
184        let value = serde_json::to_value(v).unwrap();
185        let deserialized = serde_json::from_value::<Concurrency>(value)
186            .expect("Failed to deserialize a previously serialized Concurrency value");
187
188        assert_eq!(v, deserialized)
189    }
190}