vector/transforms/window/
config.rs1use vector_lib::{config::clone_input_definitions, configurable::configurable_component};
2
3use super::transform::Window;
4use crate::{
5 conditions::AnyCondition,
6 config::{
7 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
8 TransformOutput,
9 },
10 schema,
11 transforms::Transform,
12};
13
/// Configuration for the `window` transform.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[configurable_component(transform(
    "window",
    "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct WindowConfig {
    /// An optional condition handed to the window as its "forward" criterion.
    ///
    /// When this is not set, no forward condition is built.
    // NOTE(review): presumably events matching this condition bypass the buffer; the exact
    // semantics live in `Window` and should be confirmed there.
    pub forward_when: Option<AnyCondition>,

    /// The condition handed to the window as its "flush" criterion.
    ///
    /// This field is required.
    // NOTE(review): presumably a matching event triggers a flush of the buffered window;
    // confirm against `Window`.
    pub flush_when: AnyCondition,

    /// The "events before" count passed to the window when it is built.
    ///
    /// Defaults to 100 when not set.
    // NOTE(review): presumably the number of events retained ahead of a `flush_when` match;
    // confirm against `Window`.
    #[serde(default = "default_events_before")]
    pub num_events_before: usize,

    /// The "events after" count passed to the window when it is built.
    ///
    /// Defaults to 0 when not set.
    // NOTE(review): presumably the number of events emitted following a `flush_when` match;
    // confirm against `Window`.
    #[serde(default = "default_events_after")]
    pub num_events_after: usize,
}
43
44impl GenerateConfig for WindowConfig {
45 fn generate_config() -> toml::Value {
46 toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap()
47 }
48}
49
/// Serde default for `num_events_before`, used when the field is absent from the config.
const fn default_events_before() -> usize {
    const EVENTS_BEFORE: usize = 100;
    EVENTS_BEFORE
}
53
/// Serde default for `num_events_after`, used when the field is absent from the config.
const fn default_events_after() -> usize {
    const EVENTS_AFTER: usize = 0;
    EVENTS_AFTER
}
57
58#[async_trait::async_trait]
59#[typetag::serde(name = "window")]
60impl TransformConfig for WindowConfig {
61 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
62 Ok(Transform::function(
63 Window::new(
64 self.forward_when
65 .as_ref()
66 .map(|condition| {
67 condition.build(&context.enrichment_tables, &context.metrics_storage)
68 })
69 .transpose()?,
70 self.flush_when
71 .build(&context.enrichment_tables, &context.metrics_storage)?,
72 self.num_events_before,
73 self.num_events_after,
74 )
75 .unwrap(),
76 ))
77 }
78
79 fn input(&self) -> Input {
80 Input::new(DataType::Log)
81 }
82
83 fn outputs(
84 &self,
85 _: &TransformContext,
86 input_definitions: &[(OutputId, schema::Definition)],
87 ) -> Vec<TransformOutput> {
88 vec![TransformOutput::new(
90 DataType::Log,
91 clone_input_definitions(input_definitions),
92 )]
93 }
94}