vector/transforms/window/
config.rs1use vector_lib::{
2 config::{LogNamespace, clone_input_definitions},
3 configurable::configurable_component,
4};
5
6use super::transform::Window;
7use crate::{
8 conditions::AnyCondition,
9 config::{
10 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11 TransformOutput,
12 },
13 schema,
14 transforms::Transform,
15};
16
17#[configurable_component(transform(
19 "window",
20 "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
21))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct WindowConfig {
25 pub forward_when: Option<AnyCondition>,
31
32 pub flush_when: AnyCondition,
37
38 #[serde(default = "default_events_before")]
40 pub num_events_before: usize,
41
42 #[serde(default = "default_events_after")]
44 pub num_events_after: usize,
45}
46
47impl GenerateConfig for WindowConfig {
48 fn generate_config() -> toml::Value {
49 toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap()
50 }
51}
52
53const fn default_events_before() -> usize {
54 100
55}
56
57const fn default_events_after() -> usize {
58 0
59}
60
61#[async_trait::async_trait]
62#[typetag::serde(name = "window")]
63impl TransformConfig for WindowConfig {
64 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
65 Ok(Transform::function(
66 Window::new(
67 self.forward_when
68 .as_ref()
69 .map(|condition| condition.build(&context.enrichment_tables))
70 .transpose()?,
71 self.flush_when.build(&context.enrichment_tables)?,
72 self.num_events_before,
73 self.num_events_after,
74 )
75 .unwrap(),
76 ))
77 }
78
79 fn input(&self) -> Input {
80 Input::new(DataType::Log)
81 }
82
83 fn outputs(
84 &self,
85 _: vector_lib::enrichment::TableRegistry,
86 input_definitions: &[(OutputId, schema::Definition)],
87 _: LogNamespace,
88 ) -> Vec<TransformOutput> {
89 vec![TransformOutput::new(
91 DataType::Log,
92 clone_input_definitions(input_definitions),
93 )]
94 }
95}