1use std::{
2 cell::RefCell,
3 collections::{HashMap, HashSet},
4 path::PathBuf,
5};
6
7use async_trait::async_trait;
8use dyn_clone::DynClone;
9use serde::Serialize;
10use vector_lib::{
11 config::{GlobalOptions, Input, LogNamespace, TransformOutput},
12 configurable::{
13 Configurable, GenerateError, Metadata, NamedComponent,
14 attributes::CustomAttribute,
15 configurable_component,
16 schema::{SchemaGenerator, SchemaObject},
17 },
18 id::Inputs,
19 schema,
20 transform::Transform,
21};
22use vector_vrl_metrics::MetricsStorage;
23
24use super::{ComponentKey, OutputId, dot_graph::GraphConfig, schema::Options as SchemaOptions};
25use crate::extra_context::ExtraContext;
26
27pub type BoxedTransform = Box<dyn TransformConfig>;
28
29impl Configurable for BoxedTransform {
30 fn referenceable_name() -> Option<&'static str> {
31 Some("vector::transforms::Transforms")
32 }
33
34 fn metadata() -> Metadata {
35 let mut metadata = Metadata::default();
36 metadata.set_description("Configurable transforms in Vector.");
37 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
38 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
39 metadata
40 }
41
42 fn generate_schema(
43 generator: &RefCell<SchemaGenerator>,
44 ) -> Result<SchemaObject, GenerateError> {
45 vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
46 }
47}
48
49impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
50 fn from(that: T) -> Self {
51 Box::new(that)
52 }
53}
54
55#[configurable_component]
57#[configurable(metadata(docs::component_base_type = "transform"))]
58#[derive(Clone, Debug)]
59pub struct TransformOuter<T>
60where
61 T: Configurable + Serialize + 'static,
62{
63 #[configurable(derived)]
64 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
65 pub graph: GraphConfig,
66
67 #[configurable(derived)]
68 pub inputs: Inputs<T>,
69
70 #[configurable(metadata(docs::hidden))]
71 #[serde(flatten)]
72 pub inner: BoxedTransform,
73}
74
75impl<T> TransformOuter<T>
76where
77 T: Configurable + Serialize,
78{
79 pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
80 where
81 I: IntoIterator<Item = T>,
82 IT: Into<BoxedTransform>,
83 {
84 let inputs = Inputs::from_iter(inputs);
85 let inner = inner.into();
86 TransformOuter {
87 inputs,
88 inner,
89 graph: Default::default(),
90 }
91 }
92
93 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
94 where
95 U: Configurable + Serialize,
96 {
97 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
98 self.with_inputs(inputs)
99 }
100
101 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
102 where
103 I: IntoIterator<Item = U>,
104 U: Configurable + Serialize,
105 {
106 TransformOuter {
107 inputs: Inputs::from_iter(inputs),
108 inner: self.inner,
109 graph: self.graph,
110 }
111 }
112}
113
114pub struct TransformContext {
115 pub key: Option<ComponentKey>,
119
120 pub globals: GlobalOptions,
121
122 pub enrichment_tables: vector_lib::enrichment::TableRegistry,
123
124 pub metrics_storage: MetricsStorage,
125
126 pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
131
132 pub merged_schema_definition: schema::Definition,
138
139 pub schema: SchemaOptions,
140
141 pub extra_context: ExtraContext,
144}
145
146impl Default for TransformContext {
147 fn default() -> Self {
148 Self {
149 key: Default::default(),
150 globals: Default::default(),
151 enrichment_tables: Default::default(),
152 metrics_storage: Default::default(),
153 schema_definitions: HashMap::from([(None, HashMap::new())]),
154 merged_schema_definition: schema::Definition::any(),
155 schema: SchemaOptions::default(),
156 extra_context: Default::default(),
157 }
158 }
159}
160
161impl TransformContext {
162 #[allow(clippy::needless_update)]
165 pub fn new_with_globals(globals: GlobalOptions) -> Self {
166 Self {
167 globals,
168 ..Default::default()
169 }
170 }
171
172 #[cfg(test)]
173 pub fn new_test(
174 schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
175 ) -> Self {
176 Self {
177 schema_definitions,
178 ..Default::default()
179 }
180 }
181
182 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
188 namespace
189 .or(self.schema.log_namespace)
190 .unwrap_or(false)
191 .into()
192 }
193}
194
195#[async_trait]
197#[typetag::serde(tag = "type")]
198pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
199 async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;
208
209 fn input(&self) -> Input;
211
212 fn outputs(
217 &self,
218 globals: &TransformContext,
219 input_definitions: &[(OutputId, schema::Definition)],
220 ) -> Vec<TransformOutput>;
221
222 fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
232 Ok(())
233 }
234
235 fn enable_concurrency(&self) -> bool {
242 false
243 }
244
245 fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
256 true
257 }
258
259 fn files_to_watch(&self) -> Vec<&PathBuf> {
261 Vec::new()
262 }
263}
264
265dyn_clone::clone_trait_object!(TransformConfig);
266
267pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
270 transform: &T,
271 key: ComponentKey,
272 global_log_namespace: LogNamespace,
273) -> impl Iterator<Item = OutputId> + '_ {
274 transform
275 .outputs(
276 &TransformContext {
277 schema: SchemaOptions {
278 log_namespace: Some(global_log_namespace.into()),
279 ..Default::default()
280 },
281 ..Default::default()
282 },
283 &[(key.clone().into(), schema::Definition::any())],
284 )
285 .into_iter()
286 .map(move |output| OutputId {
287 component: key.clone(),
288 port: output.port,
289 })
290}