use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::config::LogNamespace;
use vector_lib::internal_event::{
    self, CountByteSize, EventsSent, InternalEventHandle as _, Registered,
};
use vector_lib::transform::update_runtime_schema_definition;
use vector_lib::{
    buffers::{
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
        BufferType, WhenFull,
    },
    schema::Definition,
    EstimatedJsonEncodedSizeOf,
};

use super::{
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
    BuiltBuffer, ConfigDiff,
};
use crate::{
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    source_sender::{SourceSenderItem, CHUNK_SIZE},
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{wrap, UtilizationComponentSender, UtilizationEmitter},
    SourceSender,
};

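/// Process-wide registry of enrichment tables, shared across topology (re)builds.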
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);

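/// Buffer size for each source output channel: one chunk per transform worker.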
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

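// Batching capacity for `ReadyArrays` in concurrent transforms, and the size of the
// in-memory buffer placed in front of each transform.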
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

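/// Upper bound on in-flight concurrent transform batches, taken from the configured
/// worker thread count (falling back to `crate::num_threads`).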
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

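/// Sources that emit Vector's own telemetry; they are flagged specially when
/// registering with the shutdown coordinator (see `build_sources`).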
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

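/// Incrementally builds the pieces of a topology from a config diff, collecting any
/// per-component build failures into `errors` rather than stopping at the first one.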
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Self {
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: UtilizationEmitter::new(),
        }
    }

    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

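        // All enrichment table data should be loaded by this point, so flip the
        // registry over to read-only.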
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                utilization_emitter: Some(self.utilization_emitter),
            })
        } else {
            Err(self.errors)
        }
    }

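    /// Regroups the flat `OutputId` → control channel map by component key, with one
    /// entry per named output port (`None` for the default port).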
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

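    /// Builds any enrichment tables that are new or flagged for reload and loads them
    /// into the global registry, re-adding the indexes of tables that already existed.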
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                // For a reloaded (not newly added) table, remember its existing indexes
                // so they can be re-added to the rebuilt table below.
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                error!(
                                    message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error,
                                    internal_log_rate_limit = true
                                );
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }

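    /// Builds every source added by the config diff (including enrichment tables that
    /// act as sources), wiring each source output through a "pump" task into a
    /// `Fanout`. Returns the source server tasks, keyed by component.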
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

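            // A supervisor task drives all of the per-output pumps; the first pump
            // error is forwarded to the source server wrapper below via `pump_error_tx`.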
            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // A send failure here means the source server task has already
                        // completed, so there is nothing left to notify.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let pipeline = builder.build();

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                shutdown: shutdown_signal,
                out: pipeline,
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let source = source.inner.build(context).await;
            let server = match source {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

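            // Wrap the server so that a forced shutdown or a pump failure also
            // terminates the source, with shutdown taking priority over error reporting.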
            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    _ = force_shutdown_tripwire => Ok(()),

                    Ok(e) = &mut pump_error_rx => Err(e),

                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // The server future may have completed before a racing pump error was
                // seen by the select above, so check for one once more.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }

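    /// Builds every transform added by the config diff, deriving its output schema
    /// definitions from its inputs and wiring an in-memory buffer in front of it.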
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );

            let schema_definitions = transform
                .inner
                .outputs(
                    enrichment_tables.clone(),
                    &input_definitions,
                    self.config.schema.log_namespace(),
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node = TransformNode::from_parts(
                key.clone(),
                enrichment_tables.clone(),
                transform,
                &input_definitions,
                self.config.schema.log_namespace(),
            );

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let (input_tx, input_rx) =
                TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span)
                    .await;

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) = {
                let _span = span.enter();
                build_transform(transform, node, input_rx, &mut self.utilization_emitter)
            };

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }

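    /// Builds every sink added by the config diff (including enrichment tables that
    /// act as sinks), along with its buffer, healthcheck task, and detach trigger.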
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // Check that the sink's schema requirements are satisfied by its inputs.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            // Reuse the buffer if one was already built for this sink; otherwise
            // build it now.
            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_emitter
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // The input receiver is stashed behind a mutex so that it can be taken
                // here and handed back via `TaskOutput::Sink` when the sink completes.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}

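/// Rebuilds any enrichment tables that report needing a reload, outside of a full
/// topology rebuild, preserving the indexes of the existing tables.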
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();
    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!(
                        internal_log_rate_limit = true,
                        "Enrichment table \"{name}\" reload failed: {error}",
                    );
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            error!(
                                internal_log_rate_limit = true,
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}

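/// Everything produced by a successful topology build, ready to be spawned and run.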
pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Option<Self> {
        match TopologyPieces::build(config, diff, buffers, extra_context).await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }

    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Result<Self, Vec<String>> {
        Builder::new(config, diff, buffers, extra_context)
            .build()
            .await
    }
}

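/// Returns `true` if the given event array's type is accepted by `data_type`.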
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}

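/// The parts of a transform's configuration needed to wire it into the topology,
/// captured before the transform itself is built.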
#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
        global_log_namespace: LogNamespace,
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(
                enrichment_tables,
                schema_definition,
                global_log_namespace,
            ),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}

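/// Dispatches to the task builder matching the transform's implementation style:
/// function and synchronous transforms share `build_sync_transform`, while task
/// transforms get their own stream-based task.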
fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_emitter,
        ),
    }
}

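/// Builds the task for a synchronous transform, running it either inline or
/// concurrently depending on whether the transform opts into concurrency.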
fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}

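/// Drives a synchronous transform over its input stream, emitting received-event
/// metrics and utilization timing as it goes.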
struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

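    /// Runs the transform inline, processing each input array as it arrives.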
    async fn run_inline(mut self) -> TaskResult {
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

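    /// Runs the transform with concurrency: ready input arrays are batched and handed
    /// to spawned tasks (at most `TRANSFORM_CONCURRENCY_LIMIT` in flight), and outputs
    /// are sent downstream in input order via `FuturesOrdered`.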
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(outputs_buf)) => {
                            let mut outputs_buf: TransformOutputsBuf = outputs_buf;
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            // Clone the transform and hand the batch off to a spawned
                            // task; `FuturesOrdered` preserves input order on the way
                            // back out.
                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            // The input stream is done; drain whatever is still in flight.
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}

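/// Builds the task for a task-style transform, which consumes its input as a stream
/// and fans its single (default-port) output out to downstream components.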
fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}