vector/sinks/util/service/
concurrency.rs1use std::{cell::RefCell, fmt};
2
3use serde::{
4 Deserialize, Deserializer, Serialize, Serializer,
5 de::{self, Unexpected, Visitor},
6};
7use serde_json::Value;
8use vector_lib::configurable::{
9 Configurable, GenerateError, Metadata, ToValue,
10 attributes::CustomAttribute,
11 schema::{
12 SchemaGenerator, SchemaObject, apply_base_metadata, generate_const_string_schema,
13 generate_number_schema, generate_one_of_schema,
14 },
15};
16
/// Configuration for outbound request concurrency.
///
/// This can be set either to one of the below enum values or to a positive integer, which denotes
/// a fixed concurrency limit.
// `Default` comes from the standard derive together with `#[default]` below, so no other derive
// macro is needed to pick the default variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum Concurrency {
    /// A fixed concurrency of 1.
    ///
    /// Only one request can be outstanding at any given time.
    None,

    /// Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.
    ///
    /// This is the default variant.
    ///
    /// [arc]: https://vector.dev/docs/architecture/arc/
    #[default]
    Adaptive,

    /// A fixed amount of concurrency will be allowed.
    Fixed(usize),
}
37
38impl Serialize for Concurrency {
39 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
40 where
41 S: Serializer,
42 {
43 match &self {
44 Concurrency::None => serializer.serialize_str("none"),
45 Concurrency::Adaptive => serializer.serialize_str("adaptive"),
46 Concurrency::Fixed(i) => serializer.serialize_u64(*i as u64),
47 }
48 }
49}
50
51impl Concurrency {
52 pub const fn parse_concurrency(&self) -> Option<usize> {
53 match self {
54 Concurrency::None => Some(1),
55 Concurrency::Adaptive => None,
56 Concurrency::Fixed(limit) => Some(*limit),
57 }
58 }
59}
60
61impl<'de> Deserialize<'de> for Concurrency {
62 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64 where
65 D: Deserializer<'de>,
66 {
67 struct UsizeOrAdaptive;
68
69 impl Visitor<'_> for UsizeOrAdaptive {
70 type Value = Concurrency;
71
72 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
73 formatter.write_str(r#"positive integer, "adaptive", or "none" "#)
74 }
75
76 fn visit_str<E: de::Error>(self, value: &str) -> Result<Concurrency, E> {
77 if value == "adaptive" {
78 Ok(Concurrency::Adaptive)
79 } else if value == "none" {
80 Ok(Concurrency::None)
81 } else {
82 Err(de::Error::unknown_variant(value, &["adaptive", "none"]))
83 }
84 }
85
86 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Concurrency, E> {
87 if value > 0 {
88 Ok(Concurrency::Fixed(value as usize))
89 } else {
90 Err(de::Error::invalid_value(
91 Unexpected::Signed(value),
92 &"positive integer",
93 ))
94 }
95 }
96
97 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Concurrency, E> {
98 if value > 0 {
99 Ok(Concurrency::Fixed(value as usize))
100 } else {
101 Err(de::Error::invalid_value(
102 Unexpected::Unsigned(value),
103 &"positive integer",
104 ))
105 }
106 }
107 }
108
109 deserializer.deserialize_any(UsizeOrAdaptive)
110 }
111}
112
113impl Configurable for Concurrency {
115 fn referenceable_name() -> Option<&'static str> {
116 Some(std::any::type_name::<Self>())
117 }
118
119 fn metadata() -> Metadata {
120 let mut metadata = Metadata::default();
121 metadata.set_description(
122 r"Configuration for outbound request concurrency.
123
124This can be set either to one of the below enum values or to a positive integer, which denotes
125a fixed concurrency limit.",
126 );
127 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
128 metadata
129 }
130
131 fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
132 let mut none_schema = generate_const_string_schema("none".to_string());
133 let mut none_metadata = Metadata::with_title("A fixed concurrency of 1.");
134 none_metadata.set_description("Only one request can be outstanding at any given time.");
135 none_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "None"));
136 apply_base_metadata(&mut none_schema, none_metadata);
137
138 let mut adaptive_schema = generate_const_string_schema("adaptive".to_string());
139 let mut adaptive_metadata = Metadata::with_title(
140 "Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.",
141 );
142 adaptive_metadata.set_description("[arc]: https://vector.dev/docs/architecture/arc/");
143 adaptive_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Adaptive"));
144 apply_base_metadata(&mut adaptive_schema, adaptive_metadata);
145
146 let mut fixed_schema = generate_number_schema::<usize>();
147 let mut fixed_metadata =
148 Metadata::with_description("A fixed amount of concurrency will be allowed.");
149 fixed_metadata.set_transparent();
150 fixed_metadata.add_custom_attribute(CustomAttribute::kv("docs::numeric_type", "uint"));
151 fixed_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Fixed"));
152 apply_base_metadata(&mut fixed_schema, fixed_metadata);
153
154 Ok(generate_one_of_schema(&[
155 none_schema,
156 adaptive_schema,
157 fixed_schema,
158 ]))
159 }
160}
161
162impl ToValue for Concurrency {
163 fn to_value(&self) -> Value {
164 serde_json::to_value(self).expect("Could not convert concurrency to JSON")
165 }
166}
167
/// Round-trips one value of every variant through JSON and checks that it comes back unchanged.
#[test]
fn is_serialization_reversible() {
    let cases = [
        Concurrency::None,
        Concurrency::Adaptive,
        Concurrency::Fixed(8),
    ];

    for original in cases {
        let json = serde_json::to_value(original).unwrap();
        let round_tripped: Concurrency = serde_json::from_value(json)
            .expect("Failed to deserialize a previously serialized Concurrency value");

        assert_eq!(original, round_tripped);
    }
}