vector/config/
transform.rs1use std::cell::RefCell;
2use std::collections::{HashMap, HashSet};
3use std::path::PathBuf;
4
5use async_trait::async_trait;
6use dyn_clone::DynClone;
7use serde::Serialize;
8use vector_lib::configurable::attributes::CustomAttribute;
9use vector_lib::configurable::{
10 configurable_component,
11 schema::{SchemaGenerator, SchemaObject},
12 Configurable, GenerateError, Metadata, NamedComponent,
13};
14use vector_lib::{
15 config::{GlobalOptions, Input, LogNamespace, TransformOutput},
16 id::Inputs,
17 schema,
18 transform::Transform,
19};
20
21use super::dot_graph::GraphConfig;
22use super::schema::Options as SchemaOptions;
23use super::ComponentKey;
24use super::OutputId;
25use crate::extra_context::ExtraContext;
26
27pub type BoxedTransform = Box<dyn TransformConfig>;
28
29impl Configurable for BoxedTransform {
30 fn referenceable_name() -> Option<&'static str> {
31 Some("vector::transforms::Transforms")
32 }
33
34 fn metadata() -> Metadata {
35 let mut metadata = Metadata::default();
36 metadata.set_description("Configurable transforms in Vector.");
37 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
38 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
39 metadata
40 }
41
42 fn generate_schema(
43 generator: &RefCell<SchemaGenerator>,
44 ) -> Result<SchemaObject, GenerateError> {
45 vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
46 }
47}
48
49impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
50 fn from(that: T) -> Self {
51 Box::new(that)
52 }
53}
54
/// A transform component together with the inputs it reads from and its graph options.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "transform"))]
#[derive(Clone, Debug)]
pub struct TransformOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // Options from the `dot_graph` module (presumably graph-visualization settings — confirm).
    // Skipped during serialization when left at their default value.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // The inputs feeding this transform. `T` is the identifier type; `map_inputs` and
    // `with_inputs` convert a `TransformOuter<T>` into a `TransformOuter<U>`.
    #[configurable(derived)]
    pub inputs: Inputs<T>,

    // The concrete transform configuration. `serde(flatten)` places its fields at the same
    // level as `graph` and `inputs`. It is hidden from the generated docs.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub inner: BoxedTransform,
}
74
75impl<T> TransformOuter<T>
76where
77 T: Configurable + Serialize,
78{
79 pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
80 where
81 I: IntoIterator<Item = T>,
82 IT: Into<BoxedTransform>,
83 {
84 let inputs = Inputs::from_iter(inputs);
85 let inner = inner.into();
86 TransformOuter {
87 inputs,
88 inner,
89 graph: Default::default(),
90 }
91 }
92
93 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
94 where
95 U: Configurable + Serialize,
96 {
97 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
98 self.with_inputs(inputs)
99 }
100
101 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
102 where
103 I: IntoIterator<Item = U>,
104 U: Configurable + Serialize,
105 {
106 TransformOuter {
107 inputs: Inputs::from_iter(inputs),
108 inner: self.inner,
109 graph: self.graph,
110 }
111 }
112}
113
/// Context passed to [`TransformConfig::build`] when a transform is constructed.
pub struct TransformContext {
    /// Key of the component being built, if one was provided. It is `None` in the default context.
    pub key: Option<ComponentKey>,

    /// Global configuration options.
    pub globals: GlobalOptions,

    /// Enrichment table registry made available to the transform.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,

    /// Schema definitions in a two-level map.
    ///
    /// The outer key is an optional name, presumably the transform's output port (with `None`
    /// for the default port — TODO confirm). Each inner map goes from an [`OutputId`] to its
    /// [`schema::Definition`]. The default context holds one empty inner map under `None`.
    pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,

    /// A single merged schema definition. It defaults to `schema::Definition::any()`.
    ///
    /// NOTE(review): presumably the merge of all input definitions — confirm against the caller
    /// that populates it.
    pub merged_schema_definition: schema::Definition,

    /// Schema options. `log_namespace` here is the fallback used by
    /// [`TransformContext::log_namespace`].
    pub schema: SchemaOptions,

    /// Additional context carried alongside the transform's configuration.
    pub extra_context: ExtraContext,
}
143
144impl Default for TransformContext {
145 fn default() -> Self {
146 Self {
147 key: Default::default(),
148 globals: Default::default(),
149 enrichment_tables: Default::default(),
150 schema_definitions: HashMap::from([(None, HashMap::new())]),
151 merged_schema_definition: schema::Definition::any(),
152 schema: SchemaOptions::default(),
153 extra_context: Default::default(),
154 }
155 }
156}
157
158impl TransformContext {
159 #[allow(clippy::needless_update)]
162 pub fn new_with_globals(globals: GlobalOptions) -> Self {
163 Self {
164 globals,
165 ..Default::default()
166 }
167 }
168
169 #[cfg(test)]
170 pub fn new_test(
171 schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
172 ) -> Self {
173 Self {
174 schema_definitions,
175 ..Default::default()
176 }
177 }
178
179 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
185 namespace
186 .or(self.schema.log_namespace)
187 .unwrap_or(false)
188 .into()
189 }
190}
191
/// Configuration for a transform component.
///
/// Implementations are (de)serialized through `typetag`, internally tagged by the `type` field.
/// The `DynClone` supertrait lets them be cloned as trait objects.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the runtime [`Transform`] from this configuration, using the given context.
    ///
    /// # Errors
    ///
    /// Returns an error when the transform cannot be built.
    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;

    /// Describes the input this transform accepts.
    fn input(&self) -> Input;

    /// Returns the outputs this transform produces.
    ///
    /// The outputs are computed from three arguments:
    /// - the enrichment tables;
    /// - the schema definitions of its inputs, each paired with the [`OutputId`] it came from;
    /// - the global log namespace.
    fn outputs(
        &self,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],

        global_log_namespace: LogNamespace,
    ) -> Vec<TransformOutput>;

    /// Validates this configuration against a merged schema definition.
    ///
    /// The definition is presumably the merge of this transform's input definitions — confirm.
    /// Implementations return `Err` with a list of messages on failure. The default
    /// implementation always returns `Ok(())`.
    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
        Ok(())
    }

    /// Returns whether concurrency should be enabled for this transform. Defaults to `false`.
    ///
    /// NOTE(review): presumably this allows multiple instances to run in parallel — confirm.
    fn enable_concurrency(&self) -> bool {
        false
    }

    /// Returns whether this transform may be nested under the given parents. Defaults to `true`.
    ///
    /// NOTE(review): `_parents` presumably holds the names of the enclosing component types —
    /// confirm.
    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
        true
    }

    /// Returns the file paths this transform wants watched. Defaults to an empty list.
    fn files_to_watch(&self) -> Vec<&PathBuf> {
        Vec::new()
    }
}

// Implements `Clone` for `Box<dyn TransformConfig>` through the `DynClone` supertrait.
dyn_clone::clone_trait_object!(TransformConfig);
267
268pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
271 transform: &T,
272 key: ComponentKey,
273 global_log_namespace: LogNamespace,
274) -> impl Iterator<Item = OutputId> + '_ {
275 transform
276 .outputs(
277 vector_lib::enrichment::TableRegistry::default(),
278 &[(key.clone().into(), schema::Definition::any())],
279 global_log_namespace,
280 )
281 .into_iter()
282 .map(move |output| OutputId {
283 component: key.clone(),
284 port: output.port,
285 })
286}