1use std::{collections::HashMap, error, pin::Pin, sync::Arc, time::Instant};
2
3use futures::{Stream, StreamExt};
4use vector_common::{
5 EventDataEq,
6 byte_size_of::ByteSizeOf,
7 internal_event::{
8 self, CountByteSize, DEFAULT_OUTPUT, EventsSent, InternalEventHandle as _, Registered,
9 register,
10 },
11 json_size::JsonSize,
12};
13
14use crate::{
15 config,
16 config::{ComponentKey, OutputId},
17 event::{
18 EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventMutRef, EventRef,
19 into_event_stream,
20 },
21 fanout::{self, Fanout},
22 schema,
23 schema::Definition,
24};
25
26#[cfg(feature = "lua")]
27pub mod runtime_transform;
28
29pub enum Transform {
34 Function(Box<dyn FunctionTransform>),
35 Synchronous(Box<dyn SyncTransform>),
36 Task(Box<dyn TaskTransform<EventArray>>),
37}
38
39impl Transform {
40 pub fn function(v: impl FunctionTransform + 'static) -> Self {
48 Transform::Function(Box::new(v))
49 }
50
51 pub fn synchronous(v: impl SyncTransform + 'static) -> Self {
58 Transform::Synchronous(Box::new(v))
59 }
60
61 pub fn task(v: impl TaskTransform<EventArray> + 'static) -> Self {
69 Transform::Task(Box::new(v))
70 }
71
72 pub fn event_task(v: impl TaskTransform<Event> + 'static) -> Self {
84 Transform::Task(Box::new(WrapEventTask(v)))
85 }
86
87 pub fn into_task(self) -> Box<dyn TaskTransform<EventArray>> {
93 match self {
94 Transform::Task(t) => t,
95 _ => {
96 panic!("Called `Transform::into_task` on something that was not a task variant.")
97 }
98 }
99 }
100}
101
102pub trait FunctionTransform: Send + dyn_clone::DynClone + Sync {
110 fn transform(&mut self, output: &mut OutputBuffer, event: Event);
111}
112
113dyn_clone::clone_trait_object!(FunctionTransform);
114
115pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
124 fn transform(
125 self: Box<Self>,
126 task: Pin<Box<dyn Stream<Item = T> + Send>>,
127 ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
128
129 fn transform_events(
132 self: Box<Self>,
133 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
134 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
135 where
136 T: From<Event>,
137 T::IntoIter: Send,
138 {
139 self.transform(task.map(Into::into).boxed())
140 .flat_map(into_event_stream)
141 .boxed()
142 }
143}
144
145pub trait SyncTransform: Send + dyn_clone::DynClone + Sync {
150 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf);
151
152 fn transform_all(&mut self, events: EventArray, output: &mut TransformOutputsBuf) {
153 for event in events.into_events() {
154 self.transform(event, output);
155 }
156 }
157}
158
159dyn_clone::clone_trait_object!(SyncTransform);
160
161impl<T> SyncTransform for T
162where
163 T: FunctionTransform,
164{
165 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
166 FunctionTransform::transform(
167 self,
168 output.primary_buffer.as_mut().expect("no default output"),
169 event,
170 );
171 }
172}
173
174impl SyncTransform for Box<dyn FunctionTransform> {
176 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
177 FunctionTransform::transform(
178 self.as_mut(),
179 output.primary_buffer.as_mut().expect("no default output"),
180 event,
181 );
182 }
183}
184
185struct TransformOutput {
186 fanout: Fanout,
187 events_sent: Registered<EventsSent>,
188 log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
189 output_id: Arc<OutputId>,
190}
191
192pub struct TransformOutputs {
193 outputs_spec: Vec<config::TransformOutput>,
194 primary_output: Option<TransformOutput>,
195 named_outputs: HashMap<String, TransformOutput>,
196}
197
198impl TransformOutputs {
199 pub fn new(
200 outputs_in: Vec<config::TransformOutput>,
201 component_key: &ComponentKey,
202 ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
203 let outputs_spec = outputs_in.clone();
204 let mut primary_output = None;
205 let mut named_outputs = HashMap::new();
206 let mut controls = HashMap::new();
207
208 for output in outputs_in {
209 let (fanout, control) = Fanout::new();
210
211 let log_schema_definitions = output
212 .log_schema_definitions
213 .into_iter()
214 .map(|(id, definition)| (id, Arc::new(definition)))
215 .collect();
216
217 match output.port {
218 None => {
219 primary_output = Some(TransformOutput {
220 fanout,
221 events_sent: register(EventsSent::from(internal_event::Output(Some(
222 DEFAULT_OUTPUT.into(),
223 )))),
224 log_schema_definitions,
225 output_id: Arc::new(OutputId {
226 component: component_key.clone(),
227 port: None,
228 }),
229 });
230 controls.insert(None, control);
231 }
232 Some(name) => {
233 named_outputs.insert(
234 name.clone(),
235 TransformOutput {
236 fanout,
237 events_sent: register(EventsSent::from(internal_event::Output(Some(
238 name.clone().into(),
239 )))),
240 log_schema_definitions,
241 output_id: Arc::new(OutputId {
242 component: component_key.clone(),
243 port: Some(name.clone()),
244 }),
245 },
246 );
247 controls.insert(Some(name.clone()), control);
248 }
249 }
250 }
251
252 let me = Self {
253 outputs_spec,
254 primary_output,
255 named_outputs,
256 };
257
258 (me, controls)
259 }
260
261 pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
262 TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
263 }
264
265 pub async fn send(
272 &mut self,
273 buf: &mut TransformOutputsBuf,
274 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
275 if let Some(primary) = self.primary_output.as_mut() {
276 let buf = buf
277 .primary_buffer
278 .as_mut()
279 .unwrap_or_else(|| unreachable!("mismatched outputs"));
280 Self::send_single_buffer(buf, primary).await?;
281 }
282 for (key, buf) in &mut buf.named_buffers {
283 let output = self
284 .named_outputs
285 .get_mut(key)
286 .unwrap_or_else(|| unreachable!("unknown output"));
287 Self::send_single_buffer(buf, output).await?;
288 }
289 Ok(())
290 }
291
292 async fn send_single_buffer(
293 buf: &mut OutputBuffer,
294 output: &mut TransformOutput,
295 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
296 for event in buf.events_mut() {
297 update_runtime_schema_definition(
298 event,
299 &output.output_id,
300 &output.log_schema_definitions,
301 );
302 }
303 let count = buf.len();
304 let byte_size = buf.estimated_json_encoded_size_of();
305 buf.send(&mut output.fanout).await?;
306 output.events_sent.emit(CountByteSize(count, byte_size));
307 Ok(())
308 }
309}
310
311#[allow(clippy::implicit_hasher)]
312pub fn update_runtime_schema_definition(
316 mut event: EventMutRef,
317 output_id: &Arc<OutputId>,
318 log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
319) {
320 if let EventMutRef::Log(log) = &mut event {
321 if let Some(parent_component_id) = log.metadata().upstream_id() {
322 if let Some(definition) = log_schema_definitions.get(parent_component_id) {
323 log.metadata_mut().set_schema_definition(definition);
324 }
325 } else {
326 if let Some(definition) = log_schema_definitions.values().next() {
330 log.metadata_mut().set_schema_definition(definition);
331 }
332 }
333 }
334 event.metadata_mut().set_upstream_id(Arc::clone(output_id));
335}
336
337#[derive(Debug, Clone)]
338pub struct TransformOutputsBuf {
339 primary_buffer: Option<OutputBuffer>,
340 named_buffers: HashMap<String, OutputBuffer>,
341}
342
343impl TransformOutputsBuf {
344 pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
345 let mut primary_buffer = None;
346 let mut named_buffers = HashMap::new();
347
348 for output in outputs_in {
349 match output.port {
350 None => {
351 primary_buffer = Some(OutputBuffer::with_capacity(capacity));
352 }
353 Some(name) => {
354 named_buffers.insert(name.clone(), OutputBuffer::default());
355 }
356 }
357 }
358
359 Self {
360 primary_buffer,
361 named_buffers,
362 }
363 }
364
365 pub fn push(&mut self, name: Option<&str>, event: Event) {
371 match name {
372 Some(name) => self.named_buffers.get_mut(name),
373 None => self.primary_buffer.as_mut(),
374 }
375 .expect("unknown output")
376 .push(event);
377 }
378
379 pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
385 self.primary_buffer
386 .as_mut()
387 .expect("no default output")
388 .drain()
389 }
390
391 pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
397 self.named_buffers
398 .get_mut(name)
399 .expect("unknown output")
400 .drain()
401 }
402
403 pub fn take_primary(&mut self) -> OutputBuffer {
409 std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
410 }
411
412 pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
413 std::mem::take(&mut self.named_buffers)
414 }
415}
416
417impl ByteSizeOf for TransformOutputsBuf {
418 fn allocated_bytes(&self) -> usize {
419 self.primary_buffer.size_of()
420 + self
421 .named_buffers
422 .values()
423 .map(ByteSizeOf::size_of)
424 .sum::<usize>()
425 }
426}
427
428#[derive(Debug, Default, Clone)]
429pub struct OutputBuffer(Vec<EventArray>);
430
431impl OutputBuffer {
432 pub fn with_capacity(capacity: usize) -> Self {
433 Self(Vec::with_capacity(capacity))
434 }
435
436 pub fn push(&mut self, event: Event) {
437 match (event, self.0.last_mut()) {
439 (Event::Log(log), Some(EventArray::Logs(logs))) => {
440 logs.push(log);
441 }
442 (Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
443 metrics.push(metric);
444 }
445 (Event::Trace(trace), Some(EventArray::Traces(traces))) => {
446 traces.push(trace);
447 }
448 (event, _) => {
449 self.0.push(event.into());
450 }
451 }
452 }
453
454 pub fn append(&mut self, events: &mut Vec<Event>) {
455 for event in events.drain(..) {
456 self.push(event);
457 }
458 }
459
460 pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
461 for event in events {
462 self.push(event);
463 }
464 }
465
466 pub fn is_empty(&self) -> bool {
467 self.0.is_empty()
468 }
469
470 pub fn len(&self) -> usize {
471 self.0.iter().map(EventArray::len).sum()
472 }
473
474 pub fn capacity(&self) -> usize {
475 self.0.capacity()
476 }
477
478 pub fn first(&self) -> Option<EventRef<'_>> {
479 self.0.first().and_then(|first| match first {
480 EventArray::Logs(l) => l.first().map(Into::into),
481 EventArray::Metrics(m) => m.first().map(Into::into),
482 EventArray::Traces(t) => t.first().map(Into::into),
483 })
484 }
485
486 pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
487 self.0.drain(..).flat_map(EventArray::into_events)
488 }
489
490 async fn send(
491 &mut self,
492 output: &mut Fanout,
493 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
494 let send_start = Some(Instant::now());
495 for array in std::mem::take(&mut self.0) {
496 output.send(array, send_start).await?;
497 }
498
499 Ok(())
500 }
501
502 fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
503 self.0.iter().flat_map(EventArray::iter_events)
504 }
505
506 fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
507 self.0.iter_mut().flat_map(EventArray::iter_events_mut)
508 }
509
510 pub fn into_events(self) -> impl Iterator<Item = Event> {
511 self.0.into_iter().flat_map(EventArray::into_events)
512 }
513}
514
515impl ByteSizeOf for OutputBuffer {
516 fn allocated_bytes(&self) -> usize {
517 self.0.iter().map(ByteSizeOf::size_of).sum()
518 }
519}
520
521impl EventDataEq<Vec<Event>> for OutputBuffer {
522 fn event_data_eq(&self, other: &Vec<Event>) -> bool {
523 struct Comparator<'a>(EventRef<'a>);
524
525 impl PartialEq<&Event> for Comparator<'_> {
526 fn eq(&self, that: &&Event) -> bool {
527 self.0.event_data_eq(that)
528 }
529 }
530
531 self.iter_events().map(Comparator).eq(other.iter())
532 }
533}
534
535impl EstimatedJsonEncodedSizeOf for OutputBuffer {
536 fn estimated_json_encoded_size_of(&self) -> JsonSize {
537 self.0
538 .iter()
539 .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
540 .sum()
541 }
542}
543
544impl From<Vec<Event>> for OutputBuffer {
545 fn from(events: Vec<Event>) -> Self {
546 let mut result = Self::default();
547 result.extend(events.into_iter());
548 result
549 }
550}
551
552struct WrapEventTask<T>(T);
553
554impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {
555 fn transform(
556 self: Box<Self>,
557 stream: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
558 ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
559 let stream = stream.flat_map(into_event_stream).boxed();
561 Box::new(self.0).transform(stream).map(Into::into).boxed()
562 }
563}
564
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::{LogEvent, Metric, MetricKind, MetricValue};

    /// A default log event.
    fn log_event() -> Event {
        LogEvent::default().into()
    }

    /// An absolute counter metric with value 1.0.
    fn counter_event() -> Event {
        Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        )
        .into()
    }

    /// Asserts the total event count and the number of underlying event arrays.
    #[track_caller]
    fn assert_shape(buf: &OutputBuffer, events: usize, arrays: usize) {
        assert_eq!(buf.len(), events);
        assert_eq!(buf.0.len(), arrays);
    }

    #[test]
    fn buffers_output() {
        let mut buf = OutputBuffer::default();
        assert_shape(&buf, 0, 0);

        // The first log starts a log array.
        buf.push(log_event());
        assert_shape(&buf, 1, 1);

        // A second log joins that same array.
        buf.push(log_event());
        assert_shape(&buf, 2, 1);

        // A metric cannot join a log array, so it starts a metric array.
        buf.push(counter_event());
        assert_shape(&buf, 3, 2);

        // A log after the metric starts a new log array rather than reusing the first.
        buf.push(log_event());
        assert_shape(&buf, 4, 3);
    }
}