vector/sinks/util/service/
concurrency.rs1use std::{cell::RefCell, fmt};
2
3use serde::Serializer;
4use serde_json::Value;
5use vector_lib::configurable::attributes::CustomAttribute;
6use vector_lib::configurable::{
7 schema::{
8 apply_base_metadata, generate_const_string_schema, generate_number_schema,
9 generate_one_of_schema, SchemaGenerator, SchemaObject,
10 },
11 Configurable, GenerateError, Metadata, ToValue,
12};
13
14use serde::{
15 de::{self, Unexpected, Visitor},
16 Deserialize, Deserializer, Serialize,
17};
18
19#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
24pub enum Concurrency {
25 None,
29
30 Adaptive,
34
35 Fixed(usize),
37}
38
39impl Serialize for Concurrency {
40 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
41 where
42 S: Serializer,
43 {
44 match &self {
45 Concurrency::None => serializer.serialize_str("none"),
46 Concurrency::Adaptive => serializer.serialize_str("adaptive"),
47 Concurrency::Fixed(i) => serializer.serialize_u64(*i as u64),
48 }
49 }
50}
51
52impl Default for Concurrency {
53 fn default() -> Self {
54 Self::Adaptive
55 }
56}
57
58impl Concurrency {
59 pub const fn parse_concurrency(&self) -> Option<usize> {
60 match self {
61 Concurrency::None => Some(1),
62 Concurrency::Adaptive => None,
63 Concurrency::Fixed(limit) => Some(*limit),
64 }
65 }
66}
67
68impl<'de> Deserialize<'de> for Concurrency {
69 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
71 where
72 D: Deserializer<'de>,
73 {
74 struct UsizeOrAdaptive;
75
76 impl Visitor<'_> for UsizeOrAdaptive {
77 type Value = Concurrency;
78
79 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
80 formatter.write_str(r#"positive integer, "adaptive", or "none" "#)
81 }
82
83 fn visit_str<E: de::Error>(self, value: &str) -> Result<Concurrency, E> {
84 if value == "adaptive" {
85 Ok(Concurrency::Adaptive)
86 } else if value == "none" {
87 Ok(Concurrency::None)
88 } else {
89 Err(de::Error::unknown_variant(value, &["adaptive", "none"]))
90 }
91 }
92
93 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Concurrency, E> {
94 if value > 0 {
95 Ok(Concurrency::Fixed(value as usize))
96 } else {
97 Err(de::Error::invalid_value(
98 Unexpected::Signed(value),
99 &"positive integer",
100 ))
101 }
102 }
103
104 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Concurrency, E> {
105 if value > 0 {
106 Ok(Concurrency::Fixed(value as usize))
107 } else {
108 Err(de::Error::invalid_value(
109 Unexpected::Unsigned(value),
110 &"positive integer",
111 ))
112 }
113 }
114 }
115
116 deserializer.deserialize_any(UsizeOrAdaptive)
117 }
118}
119
120impl Configurable for Concurrency {
122 fn referenceable_name() -> Option<&'static str> {
123 Some(std::any::type_name::<Self>())
124 }
125
126 fn metadata() -> Metadata {
127 let mut metadata = Metadata::default();
128 metadata.set_description(
129 r"Configuration for outbound request concurrency.
130
131This can be set either to one of the below enum values or to a positive integer, which denotes
132a fixed concurrency limit.",
133 );
134 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
135 metadata
136 }
137
138 fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
139 let mut none_schema = generate_const_string_schema("none".to_string());
140 let mut none_metadata = Metadata::with_title("A fixed concurrency of 1.");
141 none_metadata.set_description("Only one request can be outstanding at any given time.");
142 none_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "None"));
143 apply_base_metadata(&mut none_schema, none_metadata);
144
145 let mut adaptive_schema = generate_const_string_schema("adaptive".to_string());
146 let mut adaptive_metadata = Metadata::with_title(
147 "Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.",
148 );
149 adaptive_metadata.set_description("[arc]: https://vector.dev/docs/architecture/arc/");
150 adaptive_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Adaptive"));
151 apply_base_metadata(&mut adaptive_schema, adaptive_metadata);
152
153 let mut fixed_schema = generate_number_schema::<usize>();
154 let mut fixed_metadata =
155 Metadata::with_description("A fixed amount of concurrency will be allowed.");
156 fixed_metadata.set_transparent();
157 fixed_metadata.add_custom_attribute(CustomAttribute::kv("docs::numeric_type", "uint"));
158 fixed_metadata.add_custom_attribute(CustomAttribute::kv("logical_name", "Fixed"));
159 apply_base_metadata(&mut fixed_schema, fixed_metadata);
160
161 Ok(generate_one_of_schema(&[
162 none_schema,
163 adaptive_schema,
164 fixed_schema,
165 ]))
166 }
167}
168
169impl ToValue for Concurrency {
170 fn to_value(&self) -> Value {
171 serde_json::to_value(self).expect("Could not convert concurrency to JSON")
172 }
173}
174
175#[test]
176fn is_serialization_reversible() {
177 let variants = [
178 Concurrency::None,
179 Concurrency::Adaptive,
180 Concurrency::Fixed(8),
181 ];
182
183 for v in variants {
184 let value = serde_json::to_value(v).unwrap();
185 let deserialized = serde_json::from_value::<Concurrency>(value)
186 .expect("Failed to deserialize a previously serialized Concurrency value");
187
188 assert_eq!(v, deserialized)
189 }
190}