use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    buffers::{
        BufferType, WhenFull,
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
    },
    config::LogNamespace,
    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
    schema::Definition,
    transform::update_runtime_schema_definition,
};

use super::{
    BuiltBuffer, ConfigDiff,
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
};
use crate::{
    SourceSender,
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    source_sender::{CHUNK_SIZE, SourceSenderItem},
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{UtilizationComponentSender, UtilizationEmitter, wrap},
};

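/// Process-wide registry of enrichment tables, kept across topology reloads so
/// existing tables and their indexes can be reused.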
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);

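/// Buffer size of the channel between a source and its output pump: one
/// `CHUNK_SIZE` chunk per concurrent transform worker.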
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

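/// Number of events at which `ReadyArrays` flushes buffered event arrays to a
/// concurrently running transform.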
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
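/// Capacity of the in-memory channel placed in front of each transform.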
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

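/// Upper bound on concurrently running transform tasks: the configured number
/// of worker threads, or the detected CPU count when that is unset.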
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

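/// Source types treated as internal telemetry when registering with the
/// shutdown coordinator.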
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

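/// Incrementally assembles the pieces of a topology from a config diff,
/// accumulating errors instead of failing on the first one.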
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Self {
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: UtilizationEmitter::new(),
        }
    }

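    /// Builds all new pieces in dependency order: enrichment tables first
    /// (sources, transforms, and sinks may reference them), then sources,
    /// transforms, and sinks. Returns the accumulated errors if any step failed.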
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                utilization_emitter: Some(self.utilization_emitter),
            })
        } else {
            Err(self.errors)
        }
    }

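    /// Regroups the flat `OutputId -> control channel` map by component key,
    /// with output ports as the inner keys.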
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

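    /// (Re)builds every enrichment table the global registry reports as needing
    /// a reload, restoring existing indexes for tables that survived the config
    /// diff, and loads the results into `ENRICHMENT_TABLES`.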
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                error!(
                                    message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error,
                                    internal_log_rate_limit = true
                                );
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }

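    /// Builds every new source, including sources exposed by enrichment tables.
    /// Each source gets one pump task per output (fanning events out to its
    /// consumers) plus a supervisor that forwards pump errors to the source's
    /// server task. Returns the server tasks keyed by component.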
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let pipeline = builder.build();

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                shutdown: shutdown_signal,
                out: pipeline,
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let source = source.inner.build(context).await;
            let server = match source {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    _ = force_shutdown_tripwire => Ok(()),

                    Ok(e) = &mut pump_error_rx => Err(e),

                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // A pump error may have raced with normal server completion;
                // if one arrived, prefer reporting it.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }

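    /// Builds every new transform along with its input buffer, threading schema
    /// definitions from the transform's inputs through to its outputs.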
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // Input schema resolution failed; abort building the
                    // remaining transforms.
                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );

            let schema_definitions = transform
                .inner
                .outputs(
                    enrichment_tables.clone(),
                    &input_definitions,
                    self.config.schema.log_namespace(),
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node = TransformNode::from_parts(
                key.clone(),
                enrichment_tables.clone(),
                transform,
                &input_definitions,
                self.config.schema.log_namespace(),
            );

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let (input_tx, input_rx) =
                TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span)
                    .await;

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) = {
                let _span = span.enter();
                build_transform(transform, node, input_rx, &mut self.utilization_emitter)
            };

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }

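    /// Builds every new sink, including sinks exposed by enrichment tables,
    /// reusing a buffer preserved from the previous topology when one exists.
    /// Also builds each sink's healthcheck task.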
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_emitter
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}

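/// Rebuilds enrichment tables flagged as needing a reload outside of a full
/// topology build, preserving their existing indexes.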
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();
    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!(
                        internal_log_rate_limit = true,
                        "Enrichment table \"{name}\" reload failed: {error}",
                    );
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            error!(
                                internal_log_rate_limit = true,
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}

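/// Everything produced by a topology build: component tasks, their input and
/// output channels, healthchecks, and shutdown plumbing.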
pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Option<Self> {
        match TopologyPieces::build(config, diff, buffers, extra_context).await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }

    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Result<Self, Vec<String>> {
        Builder::new(config, diff, buffers, extra_context)
            .build()
            .await
    }
}

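/// Returns whether the event array's type is accepted by `data_type`; used to
/// drop events a component does not consume.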
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}

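/// Static description of a transform needed to wire it into the topology,
/// separate from the built `Transform` itself.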
#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
        global_log_namespace: LogNamespace,
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(
                enrichment_tables,
                schema_definition,
                global_log_namespace,
            ),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}

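/// Builds the task and output control channels for a transform, dispatching on
/// whether it runs synchronously or as a task transform that owns its stream.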
fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_emitter,
        ),
    }
}

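/// Wraps a synchronous transform in a `Runner` (inline or concurrent, per the
/// node's concurrency setting) and exposes one control channel per output port.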
fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}

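/// Drives a synchronous transform: pulls input arrays, tracks utilization and
/// `EventsReceived` metrics, and forwards transformed output downstream.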
struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

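    /// Processes input arrays one at a time on the current task, flushing
    /// outputs after each array.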
    async fn run_inline(mut self) -> TaskResult {
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

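    /// Processes batches of input arrays on spawned tasks, keeping at most
    /// `TRANSFORM_CONCURRENCY_LIMIT` in flight while preserving output order
    /// via `FuturesOrdered`.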
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(mut outputs_buf)) => {
                            self.send_outputs(&mut outputs_buf)
                                .await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(
                                async move {
                                    for events in input_arrays {
                                        t.transform_all(events, &mut outputs_buf);
                                    }
                                    outputs_buf
                                }
                                .in_current_span(),
                            );
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}

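/// Builds the task for a `TaskTransform`, which consumes its whole input
/// stream: events are filtered by type, counted, transformed, stamped with
/// updated schema definitions, and fanned out to downstream consumers.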
fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}