1use std::{collections::HashMap, error, pin::Pin, sync::Arc, time::Instant};
2
3use futures::{Stream, StreamExt};
4use vector_common::internal_event::{
5 self, register, CountByteSize, EventsSent, InternalEventHandle as _, Registered, DEFAULT_OUTPUT,
6};
7use vector_common::{byte_size_of::ByteSizeOf, json_size::JsonSize, EventDataEq};
8
9use crate::config::{ComponentKey, OutputId};
10use crate::event::EventMutRef;
11use crate::schema::Definition;
12use crate::{
13 config,
14 event::{
15 into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef,
16 },
17 fanout::{self, Fanout},
18 schema,
19};
20
// This module is only compiled in when the `lua` cargo feature is enabled.
#[cfg(feature = "lua")]
pub mod runtime_transform;
23
/// A transform in one of the three flavours defined in this module.
///
/// Every variant owns a boxed trait object of the corresponding transform
/// trait.
pub enum Transform {
    /// A boxed [`FunctionTransform`].
    Function(Box<dyn FunctionTransform>),
    /// A boxed [`SyncTransform`].
    Synchronous(Box<dyn SyncTransform>),
    /// A boxed [`TaskTransform`] working on streams of [`EventArray`].
    Task(Box<dyn TaskTransform<EventArray>>),
}
33
34impl Transform {
35 pub fn function(v: impl FunctionTransform + 'static) -> Self {
43 Transform::Function(Box::new(v))
44 }
45
46 pub fn synchronous(v: impl SyncTransform + 'static) -> Self {
53 Transform::Synchronous(Box::new(v))
54 }
55
56 pub fn task(v: impl TaskTransform<EventArray> + 'static) -> Self {
64 Transform::Task(Box::new(v))
65 }
66
67 pub fn event_task(v: impl TaskTransform<Event> + 'static) -> Self {
79 Transform::Task(Box::new(WrapEventTask(v)))
80 }
81
82 pub fn into_task(self) -> Box<dyn TaskTransform<EventArray>> {
88 match self {
89 Transform::Task(t) => t,
90 _ => {
91 panic!("Called `Transform::into_task` on something that was not a task variant.")
92 }
93 }
94 }
95}
96
/// A transform that is handed one [`Event`] at a time together with an
/// [`OutputBuffer`] to write into.
///
/// Implementors must be `Send + Sync` and clonable through `dyn_clone`.
pub trait FunctionTransform: Send + dyn_clone::DynClone + Sync {
    /// Processes `event`; `output` is the buffer made available for results.
    fn transform(&mut self, output: &mut OutputBuffer, event: Event);
}

// Implements `Clone` for `Box<dyn FunctionTransform>`.
dyn_clone::clone_trait_object!(FunctionTransform);
109
110pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
119 fn transform(
120 self: Box<Self>,
121 task: Pin<Box<dyn Stream<Item = T> + Send>>,
122 ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
123
124 fn transform_events(
127 self: Box<Self>,
128 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
129 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
130 where
131 T: From<Event>,
132 T::IntoIter: Send,
133 {
134 self.transform(task.map(Into::into).boxed())
135 .flat_map(into_event_stream)
136 .boxed()
137 }
138}
139
140pub trait SyncTransform: Send + dyn_clone::DynClone + Sync {
145 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf);
146
147 fn transform_all(&mut self, events: EventArray, output: &mut TransformOutputsBuf) {
148 for event in events.into_events() {
149 self.transform(event, output);
150 }
151 }
152}
153
154dyn_clone::clone_trait_object!(SyncTransform);
155
156impl<T> SyncTransform for T
157where
158 T: FunctionTransform,
159{
160 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
161 FunctionTransform::transform(
162 self,
163 output.primary_buffer.as_mut().expect("no default output"),
164 event,
165 );
166 }
167}
168
169impl SyncTransform for Box<dyn FunctionTransform> {
171 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
172 FunctionTransform::transform(
173 self.as_mut(),
174 output.primary_buffer.as_mut().expect("no default output"),
175 event,
176 );
177 }
178}
179
/// State kept by [`TransformOutputs`] for one output.
struct TransformOutput {
    /// Fanout that buffered event arrays are sent into.
    fanout: Fanout,
    /// Handle used to emit an `EventsSent` count/size after a buffer was sent.
    events_sent: Registered<EventsSent>,
    /// Schema definitions keyed by [`OutputId`]; consulted with an event's
    /// upstream id when its runtime schema definition is updated.
    log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
    /// Id of this output; set as the upstream id of each event before it is
    /// sent.
    output_id: Arc<OutputId>,
}
186
/// The outputs of a transform: an optional primary (port-less) output plus
/// any number of named outputs.
pub struct TransformOutputs {
    /// The output specification passed to `new`; cloned whenever a matching
    /// [`TransformOutputsBuf`] is created.
    outputs_spec: Vec<config::TransformOutput>,
    /// The output whose configured port is `None`, if there is one.
    primary_output: Option<TransformOutput>,
    /// Outputs with a configured port, keyed by port name.
    named_outputs: HashMap<String, TransformOutput>,
}
192
193impl TransformOutputs {
194 pub fn new(
195 outputs_in: Vec<config::TransformOutput>,
196 component_key: &ComponentKey,
197 ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
198 let outputs_spec = outputs_in.clone();
199 let mut primary_output = None;
200 let mut named_outputs = HashMap::new();
201 let mut controls = HashMap::new();
202
203 for output in outputs_in {
204 let (fanout, control) = Fanout::new();
205
206 let log_schema_definitions = output
207 .log_schema_definitions
208 .into_iter()
209 .map(|(id, definition)| (id, Arc::new(definition)))
210 .collect();
211
212 match output.port {
213 None => {
214 primary_output = Some(TransformOutput {
215 fanout,
216 events_sent: register(EventsSent::from(internal_event::Output(Some(
217 DEFAULT_OUTPUT.into(),
218 )))),
219 log_schema_definitions,
220 output_id: Arc::new(OutputId {
221 component: component_key.clone(),
222 port: None,
223 }),
224 });
225 controls.insert(None, control);
226 }
227 Some(name) => {
228 named_outputs.insert(
229 name.clone(),
230 TransformOutput {
231 fanout,
232 events_sent: register(EventsSent::from(internal_event::Output(Some(
233 name.clone().into(),
234 )))),
235 log_schema_definitions,
236 output_id: Arc::new(OutputId {
237 component: component_key.clone(),
238 port: Some(name.clone()),
239 }),
240 },
241 );
242 controls.insert(Some(name.clone()), control);
243 }
244 }
245 }
246
247 let me = Self {
248 outputs_spec,
249 primary_output,
250 named_outputs,
251 };
252
253 (me, controls)
254 }
255
256 pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
257 TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
258 }
259
260 pub async fn send(
267 &mut self,
268 buf: &mut TransformOutputsBuf,
269 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
270 if let Some(primary) = self.primary_output.as_mut() {
271 let buf = buf
272 .primary_buffer
273 .as_mut()
274 .unwrap_or_else(|| unreachable!("mismatched outputs"));
275 Self::send_single_buffer(buf, primary).await?;
276 }
277 for (key, buf) in &mut buf.named_buffers {
278 let output = self
279 .named_outputs
280 .get_mut(key)
281 .unwrap_or_else(|| unreachable!("unknown output"));
282 Self::send_single_buffer(buf, output).await?;
283 }
284 Ok(())
285 }
286
287 async fn send_single_buffer(
288 buf: &mut OutputBuffer,
289 output: &mut TransformOutput,
290 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
291 for event in buf.events_mut() {
292 update_runtime_schema_definition(
293 event,
294 &output.output_id,
295 &output.log_schema_definitions,
296 );
297 }
298 let count = buf.len();
299 let byte_size = buf.estimated_json_encoded_size_of();
300 buf.send(&mut output.fanout).await?;
301 output.events_sent.emit(CountByteSize(count, byte_size));
302 Ok(())
303 }
304}
305
306#[allow(clippy::implicit_hasher)]
307pub fn update_runtime_schema_definition(
311 mut event: EventMutRef,
312 output_id: &Arc<OutputId>,
313 log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
314) {
315 if let EventMutRef::Log(log) = &mut event {
316 if let Some(parent_component_id) = log.metadata().upstream_id() {
317 if let Some(definition) = log_schema_definitions.get(parent_component_id) {
318 log.metadata_mut().set_schema_definition(definition);
319 }
320 } else {
321 if let Some(definition) = log_schema_definitions.values().next() {
325 log.metadata_mut().set_schema_definition(definition);
326 }
327 }
328 }
329 event.metadata_mut().set_upstream_id(Arc::clone(output_id));
330}
331
/// Buffers for the outputs of a transform: an optional primary buffer plus
/// one buffer per named output.
#[derive(Debug, Clone)]
pub struct TransformOutputsBuf {
    /// Buffer of the primary (port-less) output, if there is one.
    primary_buffer: Option<OutputBuffer>,
    /// Buffers of the named outputs, keyed by output name.
    named_buffers: HashMap<String, OutputBuffer>,
}
337
338impl TransformOutputsBuf {
339 pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
340 let mut primary_buffer = None;
341 let mut named_buffers = HashMap::new();
342
343 for output in outputs_in {
344 match output.port {
345 None => {
346 primary_buffer = Some(OutputBuffer::with_capacity(capacity));
347 }
348 Some(name) => {
349 named_buffers.insert(name.clone(), OutputBuffer::default());
350 }
351 }
352 }
353
354 Self {
355 primary_buffer,
356 named_buffers,
357 }
358 }
359
360 pub fn push(&mut self, name: Option<&str>, event: Event) {
366 match name {
367 Some(name) => self.named_buffers.get_mut(name),
368 None => self.primary_buffer.as_mut(),
369 }
370 .expect("unknown output")
371 .push(event);
372 }
373
374 pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
380 self.primary_buffer
381 .as_mut()
382 .expect("no default output")
383 .drain()
384 }
385
386 pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
392 self.named_buffers
393 .get_mut(name)
394 .expect("unknown output")
395 .drain()
396 }
397
398 pub fn take_primary(&mut self) -> OutputBuffer {
404 std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
405 }
406
407 pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
408 std::mem::take(&mut self.named_buffers)
409 }
410}
411
412impl ByteSizeOf for TransformOutputsBuf {
413 fn allocated_bytes(&self) -> usize {
414 self.primary_buffer.size_of()
415 + self
416 .named_buffers
417 .values()
418 .map(ByteSizeOf::size_of)
419 .sum::<usize>()
420 }
421}
422
/// A buffer of events, stored as a sequence of [`EventArray`]s.
///
/// Consecutively pushed events of the same kind share one array; pushing an
/// event of a different kind than the last array starts a new array.
#[derive(Debug, Default, Clone)]
pub struct OutputBuffer(Vec<EventArray>);
425
426impl OutputBuffer {
427 pub fn with_capacity(capacity: usize) -> Self {
428 Self(Vec::with_capacity(capacity))
429 }
430
431 pub fn push(&mut self, event: Event) {
432 match (event, self.0.last_mut()) {
434 (Event::Log(log), Some(EventArray::Logs(logs))) => {
435 logs.push(log);
436 }
437 (Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
438 metrics.push(metric);
439 }
440 (Event::Trace(trace), Some(EventArray::Traces(traces))) => {
441 traces.push(trace);
442 }
443 (event, _) => {
444 self.0.push(event.into());
445 }
446 }
447 }
448
449 pub fn append(&mut self, events: &mut Vec<Event>) {
450 for event in events.drain(..) {
451 self.push(event);
452 }
453 }
454
455 pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
456 for event in events {
457 self.push(event);
458 }
459 }
460
461 pub fn is_empty(&self) -> bool {
462 self.0.is_empty()
463 }
464
465 pub fn len(&self) -> usize {
466 self.0.iter().map(EventArray::len).sum()
467 }
468
469 pub fn capacity(&self) -> usize {
470 self.0.capacity()
471 }
472
473 pub fn first(&self) -> Option<EventRef> {
474 self.0.first().and_then(|first| match first {
475 EventArray::Logs(l) => l.first().map(Into::into),
476 EventArray::Metrics(m) => m.first().map(Into::into),
477 EventArray::Traces(t) => t.first().map(Into::into),
478 })
479 }
480
481 pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
482 self.0.drain(..).flat_map(EventArray::into_events)
483 }
484
485 async fn send(
486 &mut self,
487 output: &mut Fanout,
488 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
489 let send_start = Some(Instant::now());
490 for array in std::mem::take(&mut self.0) {
491 output.send(array, send_start).await?;
492 }
493
494 Ok(())
495 }
496
497 fn iter_events(&self) -> impl Iterator<Item = EventRef> {
498 self.0.iter().flat_map(EventArray::iter_events)
499 }
500
501 fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef> {
502 self.0.iter_mut().flat_map(EventArray::iter_events_mut)
503 }
504
505 pub fn into_events(self) -> impl Iterator<Item = Event> {
506 self.0.into_iter().flat_map(EventArray::into_events)
507 }
508}
509
510impl ByteSizeOf for OutputBuffer {
511 fn allocated_bytes(&self) -> usize {
512 self.0.iter().map(ByteSizeOf::size_of).sum()
513 }
514}
515
516impl EventDataEq<Vec<Event>> for OutputBuffer {
517 fn event_data_eq(&self, other: &Vec<Event>) -> bool {
518 struct Comparator<'a>(EventRef<'a>);
519
520 impl PartialEq<&Event> for Comparator<'_> {
521 fn eq(&self, that: &&Event) -> bool {
522 self.0.event_data_eq(that)
523 }
524 }
525
526 self.iter_events().map(Comparator).eq(other.iter())
527 }
528}
529
530impl EstimatedJsonEncodedSizeOf for OutputBuffer {
531 fn estimated_json_encoded_size_of(&self) -> JsonSize {
532 self.0
533 .iter()
534 .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
535 .sum()
536 }
537}
538
539impl From<Vec<Event>> for OutputBuffer {
540 fn from(events: Vec<Event>) -> Self {
541 let mut result = Self::default();
542 result.extend(events.into_iter());
543 result
544 }
545}
546
/// Adapter around a task transform `T` that works on single [`Event`]s, so
/// that it can be used as a task transform over [`EventArray`]s.
struct WrapEventTask<T>(T);
548
549impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {
550 fn transform(
551 self: Box<Self>,
552 stream: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
553 ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
554 let stream = stream.flat_map(into_event_stream).boxed();
556 Box::new(self.0).transform(stream).map(Into::into).boxed()
557 }
558}
559
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::{LogEvent, Metric, MetricKind, MetricValue};

    /// Checks the total number of events and the number of underlying arrays.
    fn assert_shape(buf: &OutputBuffer, events: usize, arrays: usize) {
        assert_eq!(buf.len(), events);
        assert_eq!(buf.0.len(), arrays);
    }

    #[test]
    fn buffers_output() {
        let mut buf = OutputBuffer::default();
        assert_shape(&buf, 0, 0);

        // The first log starts the first array.
        buf.push(LogEvent::default().into());
        assert_shape(&buf, 1, 1);

        // A second log joins the existing log array.
        buf.push(LogEvent::default().into());
        assert_shape(&buf, 2, 1);

        // A metric cannot join a log array, so a new array is started.
        let counter = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        buf.push(counter.into());
        assert_shape(&buf, 3, 2);

        // A log after a metric starts yet another array.
        buf.push(LogEvent::default().into());
        assert_shape(&buf, 4, 3);
    }
}