vector/sinks/util/service/
concurrency.rs1use std::{cell::RefCell, fmt};
2
3use serde::{
4 Deserialize, Deserializer, Serialize, Serializer,
5 de::{self, Unexpected, Visitor},
6};
7use serde_json::Value;
8use vector_lib::configurable::{
9 Configurable, GenerateError, Metadata, ToValue,
10 attributes::CustomAttribute,
11 schema::{
12 SchemaGenerator, SchemaObject, apply_base_metadata, generate_const_string_schema,
13 generate_number_schema, generate_one_of_schema,
14 },
15};
16
17#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
22pub enum Concurrency {
23 None,
27
28 Adaptive,
32
33 Fixed(usize),
35}
36
37impl Serialize for Concurrency {
38 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
39 where
40 S: Serializer,
41 {
42 match &self {
43 Concurrency::None => serializer.serialize_str("none"),
44 Concurrency::Adaptive => serializer.serialize_str("adaptive"),
45 Concurrency::Fixed(i) => serializer.serialize_u64(*i as u64),
46 }
47 }
48}
49
50impl Default for Concurrency {
51 fn default() -> Self {
52 Self::Adaptive
53 }
54}
55
56impl Concurrency {
57 pub const fn parse_concurrency(&self) -> Option<usize> {
58 match self {
59 Concurrency::None => Some(1),
60 Concurrency::Adaptive => None,
61 Concurrency::Fixed(limit) => Some(*limit),
62 }
63 }
64}
65
66impl<'de> Deserialize<'de> for Concurrency {
67 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69 where
70 D: Deserializer<'de>,
71 {
72 struct UsizeOrAdaptive;
73
74 impl Visitor<'_> for UsizeOrAdaptive {
75 type Value = Concurrency;
76
77 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
78 formatter.write_str(r#"positive integer, "adaptive", or "none" "#)
79 }
80
81 fn visit_str<E: de::Error>(self, value: &str) -> Result<Concurrency, E> {
82 if value == "adaptive" {
83 Ok(Concurrency::Adaptive)
84 } else if value == "none" {
85 Ok(Concurrency::None)
86 } else {
87 Err(de::Error::unknown_variant(value, &["adaptive", "none"]))
88 }
89 }
90
91 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Concurrency, E> {
92 if value > 0 {
93 Ok(Concurrency::Fixed(value as usize))
94 } else {
95 Err(de::Error::invalid_value(
96 Unexpected::Signed(value),
97 &"positive integer",
98 ))
99 }
100 }
101
102 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Concurrency, E> {
103 if value > 0 {
104 Ok(Concurrency::Fixed(value as usize))
105 } else {
106 Err(de::Error::invalid_value(
107 Unexpected::Unsigned(value),
108 &"positive integer",
109 ))
110 }
111 }
112 }
113
114 deserializer.deserialize_any(UsizeOrAdaptive)
115 }
116}
117
118impl Configurable for Concurrency {
120 fn referenceable_name() -> Option<&'static str> {
121 Some(std::any::type_name::<Self>())
122 }
123
124 fn metadata() -> Metadata {
125 let mut metadata = Metadata::default();
126 metadata.set_description(
127 r"Configuration for outbound request concurrency.
128
129This can be set either to one of the below enum values or to a positive integer, which denotes
130a fixed concurrency limit.",
131 );
132 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
133 metadata
134 }
135
136 fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
137 let mut none_schema = generate_const_string_schema("none".to_string());
138 let mut none_metadata = Metadata::with_title("A fixed concurrency of 1.");
139 none_metadata.set_description("Only one request can be outstanding at any given time.");
140 none_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "None"));
141 apply_base_metadata(&mut none_schema, none_metadata);
142
143 let mut adaptive_schema = generate_const_string_schema("adaptive".to_string());
144 let mut adaptive_metadata = Metadata::with_title(
145 "Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.",
146 );
147 adaptive_metadata.set_description("[arc]: https://vector.dev/docs/architecture/arc/");
148 adaptive_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Adaptive"));
149 apply_base_metadata(&mut adaptive_schema, adaptive_metadata);
150
151 let mut fixed_schema = generate_number_schema::<usize>();
152 let mut fixed_metadata =
153 Metadata::with_description("A fixed amount of concurrency will be allowed.");
154 fixed_metadata.set_transparent();
155 fixed_metadata.add_custom_attribute(CustomAttribute::kv("docs::numeric_type", "uint"));
156 fixed_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Fixed"));
157 apply_base_metadata(&mut fixed_schema, fixed_metadata);
158
159 Ok(generate_one_of_schema(&[
160 none_schema,
161 adaptive_schema,
162 fixed_schema,
163 ]))
164 }
165}
166
167impl ToValue for Concurrency {
168 fn to_value(&self) -> Value {
169 serde_json::to_value(self).expect("Could not convert concurrency to JSON")
170 }
171}
172
173#[test]
174fn is_serialization_reversible() {
175 let variants = [
176 Concurrency::None,
177 Concurrency::Adaptive,
178 Concurrency::Fixed(8),
179 ];
180
181 for v in variants {
182 let value = serde_json::to_value(v).unwrap();
183 let deserialized = serde_json::from_value::<Concurrency>(value)
184 .expect("Failed to deserialize a previously serialized Concurrency value");
185
186 assert_eq!(v, deserialized)
187 }
188}