// vector/transforms/window/config.rs
use vector_lib::config::{clone_input_definitions, LogNamespace};
2use vector_lib::configurable::configurable_component;
3
4use crate::{
5 conditions::AnyCondition,
6 config::{
7 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
8 TransformOutput,
9 },
10 schema,
11 transforms::Transform,
12};
13
14use super::transform::Window;
15
16#[configurable_component(transform(
18 "window",
19 "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
20))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct WindowConfig {
24 pub forward_when: Option<AnyCondition>,
30
31 pub flush_when: AnyCondition,
36
37 #[serde(default = "default_events_before")]
39 pub num_events_before: usize,
40
41 #[serde(default = "default_events_after")]
43 pub num_events_after: usize,
44}
45
46impl GenerateConfig for WindowConfig {
47 fn generate_config() -> toml::Value {
48 toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap()
49 }
50}
51
/// Returns the value used for `num_events_before` when the field is absent from the
/// configuration (wired in through `#[serde(default = "default_events_before")]`).
const fn default_events_before() -> usize {
    const EVENTS_BEFORE: usize = 100;
    EVENTS_BEFORE
}
55
/// Returns the value used for `num_events_after` when the field is absent from the
/// configuration (wired in through `#[serde(default = "default_events_after")]`).
const fn default_events_after() -> usize {
    const EVENTS_AFTER: usize = 0;
    EVENTS_AFTER
}
59
60#[async_trait::async_trait]
61#[typetag::serde(name = "window")]
62impl TransformConfig for WindowConfig {
63 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
64 Ok(Transform::function(
65 Window::new(
66 self.forward_when
67 .as_ref()
68 .map(|condition| condition.build(&context.enrichment_tables))
69 .transpose()?,
70 self.flush_when.build(&context.enrichment_tables)?,
71 self.num_events_before,
72 self.num_events_after,
73 )
74 .unwrap(),
75 ))
76 }
77
78 fn input(&self) -> Input {
79 Input::new(DataType::Log)
80 }
81
82 fn outputs(
83 &self,
84 _: vector_lib::enrichment::TableRegistry,
85 input_definitions: &[(OutputId, schema::Definition)],
86 _: LogNamespace,
87 ) -> Vec<TransformOutput> {
88 vec![TransformOutput::new(
90 DataType::Log,
91 clone_input_definitions(input_definitions),
92 )]
93 }
94}