1use std::{
2 cell::RefCell,
3 collections::{HashMap, HashSet},
4 path::PathBuf,
5};
6
7use async_trait::async_trait;
8use dyn_clone::DynClone;
9use serde::Serialize;
10use vector_lib::{
11 config::{GlobalOptions, Input, LogNamespace, TransformOutput},
12 configurable::{
13 Configurable, GenerateError, Metadata, NamedComponent,
14 attributes::CustomAttribute,
15 configurable_component,
16 schema::{SchemaGenerator, SchemaObject},
17 },
18 id::Inputs,
19 schema,
20 transform::Transform,
21};
22
23use super::{ComponentKey, OutputId, dot_graph::GraphConfig, schema::Options as SchemaOptions};
24use crate::extra_context::ExtraContext;
25
/// A heap-allocated, dynamically dispatched [`TransformConfig`] trait object.
///
/// Any concrete `TransformConfig + 'static` value converts into this alias via `From`/`Into`.
pub type BoxedTransform = Box<dyn TransformConfig>;
27
28impl Configurable for BoxedTransform {
29 fn referenceable_name() -> Option<&'static str> {
30 Some("vector::transforms::Transforms")
31 }
32
33 fn metadata() -> Metadata {
34 let mut metadata = Metadata::default();
35 metadata.set_description("Configurable transforms in Vector.");
36 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
37 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
38 metadata
39 }
40
41 fn generate_schema(
42 generator: &RefCell<SchemaGenerator>,
43 ) -> Result<SchemaObject, GenerateError> {
44 vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
45 }
46}
47
48impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
49 fn from(that: T) -> Self {
50 Box::new(that)
51 }
52}
53
/// A transform component together with its inputs and graph settings.
///
/// `T` is the type used to identify each input; `map_inputs`/`with_inputs` convert between
/// input identifier types while keeping the wrapped transform and graph settings.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "transform"))]
#[derive(Clone, Debug)]
pub struct TransformOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // Graph settings for this component. Defaults when absent from the input and is omitted
    // from serialized output while it still equals the default.
    // NOTE(review): presumably controls how the component is rendered in DOT graph output —
    // confirm against `dot_graph::GraphConfig`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // The inputs feeding this transform, identified by values of type `T`.
    #[configurable(derived)]
    pub inputs: Inputs<T>,

    // The concrete transform configuration. Flattened by serde, so its fields (including the
    // `type` tag used by `TransformConfig`) appear at the same level as `inputs` and `graph`.
    // Marked hidden for documentation purposes.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub inner: BoxedTransform,
}
73
74impl<T> TransformOuter<T>
75where
76 T: Configurable + Serialize,
77{
78 pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
79 where
80 I: IntoIterator<Item = T>,
81 IT: Into<BoxedTransform>,
82 {
83 let inputs = Inputs::from_iter(inputs);
84 let inner = inner.into();
85 TransformOuter {
86 inputs,
87 inner,
88 graph: Default::default(),
89 }
90 }
91
92 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
93 where
94 U: Configurable + Serialize,
95 {
96 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
97 self.with_inputs(inputs)
98 }
99
100 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
101 where
102 I: IntoIterator<Item = U>,
103 U: Configurable + Serialize,
104 {
105 TransformOuter {
106 inputs: Inputs::from_iter(inputs),
107 inner: self.inner,
108 graph: self.graph,
109 }
110 }
111}
112
/// Context handed to [`TransformConfig::build`] when a transform is constructed.
///
/// `Default` yields a context with no key, default globals/tables/schema options, a single
/// empty `None` entry in `schema_definitions`, and `schema::Definition::any()` as the merged
/// definition.
pub struct TransformContext {
    /// Key of the component being built, when known. Optional, so a context can be created
    /// without one (the `Default` impl leaves it unset).
    pub key: Option<ComponentKey>,

    /// Global options made available to the transform.
    pub globals: GlobalOptions,

    /// Registry of enrichment tables made available to the transform.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,

    /// Schema definitions grouped by an optional string key, then by [`OutputId`].
    ///
    /// NOTE(review): the outer key is presumably the transform's output port name (`None`
    /// for the default output) and the inner map the definitions per upstream output —
    /// confirm against the code that populates this map.
    pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,

    /// A single merged schema definition.
    ///
    /// NOTE(review): presumably the merge of all input definitions of the transform, and the
    /// value passed to [`TransformConfig::validate`] — confirm with the caller.
    pub merged_schema_definition: schema::Definition,

    /// Schema options; `log_namespace` on this value is used as the fallback by
    /// [`TransformContext::log_namespace`].
    pub schema: SchemaOptions,

    /// Additional context data carried alongside the built-in fields.
    /// NOTE(review): presumably supplied by the embedding application — confirm.
    pub extra_context: ExtraContext,
}
142
143impl Default for TransformContext {
144 fn default() -> Self {
145 Self {
146 key: Default::default(),
147 globals: Default::default(),
148 enrichment_tables: Default::default(),
149 schema_definitions: HashMap::from([(None, HashMap::new())]),
150 merged_schema_definition: schema::Definition::any(),
151 schema: SchemaOptions::default(),
152 extra_context: Default::default(),
153 }
154 }
155}
156
157impl TransformContext {
158 #[allow(clippy::needless_update)]
161 pub fn new_with_globals(globals: GlobalOptions) -> Self {
162 Self {
163 globals,
164 ..Default::default()
165 }
166 }
167
168 #[cfg(test)]
169 pub fn new_test(
170 schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
171 ) -> Self {
172 Self {
173 schema_definitions,
174 ..Default::default()
175 }
176 }
177
178 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
184 namespace
185 .or(self.schema.log_namespace)
186 .unwrap_or(false)
187 .into()
188 }
189}
190
/// Configuration-side interface implemented by every transform.
///
/// Implementations are (de)serialized as trait objects through `typetag`, using `type` as the
/// tag field, and must be cloneable (`DynClone`), named (`NamedComponent`), debuggable, and
/// safe to share across threads (`Send + Sync`).
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the runnable [`Transform`] from this configuration.
    ///
    /// `globals` is the [`TransformContext`] for the component being built.
    ///
    /// # Errors
    ///
    /// Returns the implementation's error, as `crate::Result`, when the transform cannot be
    /// built.
    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;

    /// Returns the [`Input`] description of this transform.
    fn input(&self) -> Input;

    /// Returns the outputs this transform exposes; each [`TransformOutput`] carries a `port`.
    ///
    /// * `enrichment_tables` - table registry available while computing the outputs.
    /// * `input_definitions` - schema definitions paired with the [`OutputId`] they belong to.
    /// * `global_log_namespace` - the globally configured log namespace.
    fn outputs(
        &self,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],

        global_log_namespace: LogNamespace,
    ) -> Vec<TransformOutput>;

    /// Validates this configuration against a merged schema definition.
    ///
    /// The default implementation accepts everything. On failure the `Err` payload is a list
    /// of strings (presumably human-readable error messages — confirm with callers).
    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
        Ok(())
    }

    /// Whether concurrency should be enabled for this transform. Defaults to `false`.
    fn enable_concurrency(&self) -> bool {
        false
    }

    /// Whether this transform may be nested given the set of parent names in `_parents`.
    /// Defaults to `true`.
    // NOTE(review): `_parents` presumably holds the component type names of the enclosing
    // transforms — confirm with the caller.
    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
        true
    }

    /// Paths of files this transform wants watched. Defaults to none.
    fn files_to_watch(&self) -> Vec<&PathBuf> {
        Vec::new()
    }
}

// Implements `Clone` for `Box<dyn TransformConfig>`, which `TransformOuter`'s derived `Clone`
// relies on for its `inner` field.
dyn_clone::clone_trait_object!(TransformConfig);
266
267pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
270 transform: &T,
271 key: ComponentKey,
272 global_log_namespace: LogNamespace,
273) -> impl Iterator<Item = OutputId> + '_ {
274 transform
275 .outputs(
276 vector_lib::enrichment::TableRegistry::default(),
277 &[(key.clone().into(), schema::Definition::any())],
278 global_log_namespace,
279 )
280 .into_iter()
281 .map(move |output| OutputId {
282 component: key.clone(),
283 port: output.port,
284 })
285}