use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    buffers::{
        BufferType, WhenFull,
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
    },
    config::LogNamespace,
    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
    schema::Definition,
    transform::update_runtime_schema_definition,
};

use super::{
    BuiltBuffer, ConfigDiff,
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
};
use crate::{
    SourceSender,
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    source_sender::{CHUNK_SIZE, SourceSenderItem},
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{UtilizationComponentSender, UtilizationEmitter, UtilizationRegistry, wrap},
};

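/// Registry of enrichment tables shared across topology builds, so that a
/// config reload can reuse already-loaded tables instead of rebuilding them.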
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);

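/// Buffer size for the channel between a source and its pump tasks, sized so
/// that every transform worker can have a full chunk in flight.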
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

/// Target capacity, in events, for batching ready event arrays in `run_concurrently`.
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
/// Capacity of the in-memory buffer placed in front of each transform.
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

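/// Number of batches a single transform may process concurrently, matching the
/// topology's worker thread count.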
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

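/// Sources given special treatment by the shutdown coordinator, so that
/// Vector's own telemetry can keep flowing for as long as possible.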
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

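/// Incrementally builds the pieces of a running topology (tasks, buffers,
/// control channels, and healthchecks) from a config and the diff against the
/// previous config, collecting errors as it goes.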
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: Option<UtilizationEmitter>,
    utilization_registry: UtilizationRegistry,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Self {
        // Reuse the caller's utilization registry when one is provided;
        // otherwise create a fresh emitter/registry pair for this topology.
        let (emitter, registry) = if let Some(registry) = utilization_registry {
            (None, registry)
        } else {
            let (emitter, registry) = UtilizationEmitter::new();
            (Some(emitter), registry)
        };
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: emitter,
            utilization_registry: registry,
        }
    }

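    /// Builds the new pieces of the topology described by `self.diff`,
    /// returning the assembled pieces on success or the accumulated errors
    /// otherwise.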
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // All components have been built, so signal that the enrichment tables
        // are fully loaded and ready for use.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                utilization: self
                    .utilization_emitter
                    .map(|e| (e, self.utilization_registry)),
            })
        } else {
            Err(self.errors)
        }
    }

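    /// Groups the per-output fanout control channels by component, keyed by
    /// output port.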
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

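    /// Builds or rebuilds all enrichment tables that report needing a reload,
    /// reusing the index definitions of tables already in the global registry.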
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                // For a reloaded table, carry over the indexes that components
                // added to the previous instance; a newly added table has none yet.
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                error!(
                                    message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error
                                );
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }

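    /// Builds all new sources in the diff. Each source gets a pump task that
    /// fans its output out to downstream components, plus the source task
    /// itself.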
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                // The pump moves events from the source's output channel into
                // the fanout, tagging them with their origin as it goes.
                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn a task per pump and wait for them all to finish,
                // stopping at the first pump error.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // Notify the source task so it can shut down, too.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let pipeline = builder.build();

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                shutdown: shutdown_signal,
                out: pipeline,
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let source = source.inner.build(context).await;
            let server = match source {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            // Wrap the source's own future so that it can be interrupted by a
            // forced shutdown or by an error in one of its pumps.
            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    // Forced shutdown takes precedence and counts as success.
                    _ = force_shutdown_tripwire => Ok(()),

                    // An error in a pump aborts the source.
                    Ok(e) = &mut pump_error_rx => Err(e),

                    // Otherwise, run the source to completion.
                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // Even if the source itself finished cleanly, surface any pump
                // error that raced with its completion.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }

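    /// Builds all new transforms in the diff, wiring each one to an in-memory
    /// input buffer and computing its output schema definitions.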
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component_id = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // Schema errors are collected during the config build, so
                    // there is nothing more to do here.
                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );

            // Map each output port to its schema definitions.
            let schema_definitions = transform
                .inner
                .outputs(
                    enrichment_tables.clone(),
                    &input_definitions,
                    self.config.schema.log_namespace(),
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node = TransformNode::from_parts(
                key.clone(),
                enrichment_tables.clone(),
                transform,
                &input_definitions,
                self.config.schema.log_namespace(),
            );

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let (input_tx, input_rx) =
                TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span)
                    .await;

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) = {
                let _span = span.enter();
                build_transform(transform, node, input_rx, &self.utilization_registry)
            };

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }

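    /// Builds all new sinks in the diff, creating their buffers, healthcheck
    /// tasks, and the sink tasks themselves.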
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // Check that the sink's input schema requirements are satisfied by
            // its upstream components.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_registry
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // The input stream sits behind a mutex so that a finished sink
                // can hand its receiver back for reuse; take it out now that
                // the task is actually running.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}

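/// Rebuilds any enrichment tables that report needing a reload, preserving
/// their existing indexes, then marks the registry as fully loaded again.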
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();

    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!("Enrichment table \"{name}\" reload failed: {error}");
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            error!(
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}

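/// The runtime pieces of a built topology: component tasks, their input
/// senders and output controls, healthchecks, and shutdown plumbing.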
pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}

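/// Builder-style entry point for constructing [`TopologyPieces`] from a config
/// and diff, with optional pre-built buffers, extra context, and an existing
/// utilization registry.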
pub struct TopologyPiecesBuilder<'a> {
    config: &'a Config,
    diff: &'a ConfigDiff,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    extra_context: ExtraContext,
    utilization_registry: Option<UtilizationRegistry>,
}

impl<'a> TopologyPiecesBuilder<'a> {
    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
        Self {
            config,
            diff,
            buffers: HashMap::new(),
            extra_context: ExtraContext::default(),
            utilization_registry: None,
        }
    }

    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
        self.buffers = buffers;
        self
    }

    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
        self.extra_context = extra_context;
        self
    }

    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
        self.utilization_registry = registry;
        self
    }

    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
        Builder::new(
            self.config,
            self.diff,
            self.buffers,
            self.extra_context,
            self.utilization_registry,
        )
        .build()
        .await
    }

    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
        match self.build().await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }
}

impl TopologyPieces {
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Option<Self> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build_or_log_errors()
            .await
    }

    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Result<Self, Vec<String>> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build()
            .await
    }
}

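/// Returns `true` if the given event array matches the data types accepted by
/// a component's input.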
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}

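/// Everything needed to wire a built transform into the topology, captured
/// from its config before the transform itself is built.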
#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
        global_log_namespace: LogNamespace,
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(
                enrichment_tables,
                schema_definition,
                global_log_namespace,
            ),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}

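/// Builds the task for a transform, dispatching on whether it is a function,
/// synchronous, or task transform.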
fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_registry)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_registry),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_registry,
        ),
    }
}

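/// Builds the task for a synchronous transform, running it either inline or
/// concurrently depending on the node's concurrency setting.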
fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_registry.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}

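/// Drives a synchronous transform: reads event arrays from the input buffer,
/// feeds them through the transform, and forwards the outputs, reporting
/// utilization and `EventsReceived` along the way.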
struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

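    /// Runs the transform on the task's own thread, processing one event
    /// array at a time.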
    async fn run_inline(mut self) -> TaskResult {
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

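    /// Runs clones of the transform on a pool of spawned tasks, keeping up to
    /// `TRANSFORM_CONCURRENCY_LIMIT` batches in flight while preserving the
    /// input order of outputs via `FuturesOrdered`.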
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                biased;

                // Drain completed batches first, in input order.
                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(outputs_buf)) => {
                            let mut outputs_buf: TransformOutputsBuf = outputs_buf;
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                // Accept more input only while below the concurrency limit.
                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            // Run a clone of the transform over the batch on a
                            // separately spawned task.
                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}

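/// Builds the task for a task transform, wiring its single output through a
/// fanout and updating runtime schema definitions on the way out.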
fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_registry.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    // Task transforms have a single, unnamed (default port) output.
    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}