vector/sinks/util/service/
concurrency.rs

1use std::{cell::RefCell, fmt};
2
3use serde::{
4    Deserialize, Deserializer, Serialize, Serializer,
5    de::{self, Unexpected, Visitor},
6};
7use serde_json::Value;
8use vector_lib::configurable::{
9    Configurable, GenerateError, Metadata, ToValue,
10    attributes::CustomAttribute,
11    schema::{
12        SchemaGenerator, SchemaObject, apply_base_metadata, generate_const_string_schema,
13        generate_number_schema, generate_one_of_schema,
14    },
15};
16
17/// Configuration for outbound request concurrency.
18///
19/// This can be set either to one of the below enum values or to a positive integer, which denotes
20/// a fixed concurrency limit.
21#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
22pub enum Concurrency {
23    /// A fixed concurrency of 1.
24    ///
25    /// Only one request can be outstanding at any given time.
26    None,
27
28    /// Concurrency is managed by the [Adaptive Request Concurrency][arc] feature.
29    ///
30    /// [arc]: https://vector.dev/docs/architecture/arc/
31    Adaptive,
32
33    /// A fixed amount of concurrency is allowed.
34    Fixed(usize),
35}
36
37impl Serialize for Concurrency {
38    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
39    where
40        S: Serializer,
41    {
42        match &self {
43            Concurrency::None => serializer.serialize_str("none"),
44            Concurrency::Adaptive => serializer.serialize_str("adaptive"),
45            Concurrency::Fixed(i) => serializer.serialize_u64(*i as u64),
46        }
47    }
48}
49
50impl Default for Concurrency {
51    fn default() -> Self {
52        Self::Adaptive
53    }
54}
55
56impl Concurrency {
57    pub const fn parse_concurrency(&self) -> Option<usize> {
58        match self {
59            Concurrency::None => Some(1),
60            Concurrency::Adaptive => None,
61            Concurrency::Fixed(limit) => Some(*limit),
62        }
63    }
64}
65
66impl<'de> Deserialize<'de> for Concurrency {
67    // Deserialize either a positive integer or the string "adaptive"
68    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69    where
70        D: Deserializer<'de>,
71    {
72        struct UsizeOrAdaptive;
73
74        impl Visitor<'_> for UsizeOrAdaptive {
75            type Value = Concurrency;
76
77            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
78                formatter.write_str(r#"positive integer, "adaptive", or "none" "#)
79            }
80
81            fn visit_str<E: de::Error>(self, value: &str) -> Result<Concurrency, E> {
82                if value == "adaptive" {
83                    Ok(Concurrency::Adaptive)
84                } else if value == "none" {
85                    Ok(Concurrency::None)
86                } else {
87                    Err(de::Error::unknown_variant(value, &["adaptive", "none"]))
88                }
89            }
90
91            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Concurrency, E> {
92                if value > 0 {
93                    Ok(Concurrency::Fixed(value as usize))
94                } else {
95                    Err(de::Error::invalid_value(
96                        Unexpected::Signed(value),
97                        &"positive integer",
98                    ))
99                }
100            }
101
102            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Concurrency, E> {
103                if value > 0 {
104                    Ok(Concurrency::Fixed(value as usize))
105                } else {
106                    Err(de::Error::invalid_value(
107                        Unexpected::Unsigned(value),
108                        &"positive integer",
109                    ))
110                }
111            }
112        }
113
114        deserializer.deserialize_any(UsizeOrAdaptive)
115    }
116}
117
118// TODO: Consider an approach for generating schema of "string or number" structure used by this type.
119impl Configurable for Concurrency {
120    fn referenceable_name() -> Option<&'static str> {
121        Some(std::any::type_name::<Self>())
122    }
123
124    fn metadata() -> Metadata {
125        let mut metadata = Metadata::default();
126        metadata.set_description(
127            r"Configuration for outbound request concurrency.
128
129This can be set either to one of the below enum values or to a positive integer, which denotes
130a fixed concurrency limit.",
131        );
132        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
133        metadata
134    }
135
136    fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
137        let mut none_schema = generate_const_string_schema("none".to_string());
138        let mut none_metadata = Metadata::with_title("A fixed concurrency of 1.");
139        none_metadata.set_description("Only one request can be outstanding at any given time.");
140        none_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "None"));
141        apply_base_metadata(&mut none_schema, none_metadata);
142
143        let mut adaptive_schema = generate_const_string_schema("adaptive".to_string());
144        let mut adaptive_metadata = Metadata::with_title(
145            "Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.",
146        );
147        adaptive_metadata.set_description("[arc]: https://vector.dev/docs/architecture/arc/");
148        adaptive_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Adaptive"));
149        apply_base_metadata(&mut adaptive_schema, adaptive_metadata);
150
151        let mut fixed_schema = generate_number_schema::<usize>();
152        let mut fixed_metadata =
153            Metadata::with_description("A fixed amount of concurrency will be allowed.");
154        fixed_metadata.set_transparent();
155        fixed_metadata.add_custom_attribute(CustomAttribute::kv("docs::numeric_type", "uint"));
156        fixed_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Fixed"));
157        apply_base_metadata(&mut fixed_schema, fixed_metadata);
158
159        Ok(generate_one_of_schema(&[
160            none_schema,
161            adaptive_schema,
162            fixed_schema,
163        ]))
164    }
165}
166
167impl ToValue for Concurrency {
168    fn to_value(&self) -> Value {
169        serde_json::to_value(self).expect("Could not convert concurrency to JSON")
170    }
171}
172
173#[test]
174fn is_serialization_reversible() {
175    let variants = [
176        Concurrency::None,
177        Concurrency::Adaptive,
178        Concurrency::Fixed(8),
179    ];
180
181    for v in variants {
182        let value = serde_json::to_value(v).unwrap();
183        let deserialized = serde_json::from_value::<Concurrency>(value)
184            .expect("Failed to deserialize a previously serialized Concurrency value");
185
186        assert_eq!(v, deserialized)
187    }
188}