use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    buffers::{
        BufferType, WhenFull,
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender, ChannelMetricMetadata},
        },
    },
    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
    schema::Definition,
    source_sender::{CHUNK_SIZE, SourceSenderItem},
    transform::update_runtime_schema_definition,
};
use vector_vrl_metrics::MetricsStorage;

use super::{
    BuiltBuffer, ConfigDiff,
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
};
use crate::{
    SourceSender,
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{UtilizationComponentSender, UtilizationEmitter, UtilizationRegistry, wrap},
};

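// Process-global registries that persist across topology reloads, so rebuilt
// components keep access to already-loaded enrichment tables and VRL metrics.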
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

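/// Buffer size of the channel between a source and its pump stages, sized so
/// a source can keep every transform worker supplied with full send chunks.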
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

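/// Capacity used when gathering ready event arrays in the concurrent
/// transform runner.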
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
const TRANSFORM_CHANNEL_METRIC_PREFIX: &str = "transform_buffer";

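/// Upper bound on concurrently running transform tasks, matching the number
/// of worker threads and falling back to the detected CPU count.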
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

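/// Sources that emit Vector's own telemetry; the shutdown coordinator is told
/// about them so it can treat their shutdown specially.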
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

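/// Accumulates everything needed to run a topology while walking a `Config`
/// and its `ConfigDiff`: component tasks, buffers, fanout controls,
/// healthchecks, and shutdown plumbing. Build errors are collected in
/// `errors` rather than aborting early so all of them can be reported at once.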
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: Option<UtilizationEmitter>,
    utilization_registry: UtilizationRegistry,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Self {
        // Reuse the caller's utilization registry when one is provided (e.g.
        // across a reload); otherwise create a fresh emitter/registry pair.
        let (emitter, registry) = if let Some(registry) = utilization_registry {
            (None, registry)
        } else {
            let (emitter, registry) = UtilizationEmitter::new();
            (Some(emitter), registry)
        };
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: emitter,
            utilization_registry: registry,
        }
    }

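    /// Builds all new or changed components in dependency order (sources,
    /// then transforms, then sinks), returning the assembled pieces or every
    /// error encountered along the way.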
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        // Tables must be (re)loaded before building the components that use them.
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // The registry only becomes queryable once loading is marked complete.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                metrics_storage: METRICS_STORAGE.clone(),
                utilization: self
                    .utilization_emitter
                    .map(|e| (e, self.utilization_registry)),
            })
        } else {
            Err(self.errors)
        }
    }

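    /// Regroups the flat `OutputId → control channel` map by component key
    /// and output port.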
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

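    /// Builds or rebuilds every enrichment table that reports needing a
    /// reload, carrying existing indexes over for tables that are reloaded
    /// rather than newly added. The returned registry is not queryable until
    /// `finish_load()` is called after all components have been built.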
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                // Preserve the indexes of a table that is being reloaded so
                // existing lookups keep working; a newly added table has none.
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                error!(
                                    message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error
                                );
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }

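    /// Builds every new or changed source along with its delivery machinery:
    /// one pump task per source output that stamps events with their origin
    /// and fans them out downstream, a supervisor that surfaces the first
    /// pump error, and the source's server task wired into the shutdown
    /// coordinator. Returns the server tasks keyed by component.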
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        // Enrichment tables can also act as sources; build those alongside
        // the regular ones.
        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder()
                .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
                .with_timeout(source.inner.send_timeout())
                .with_ewma_alpha(self.config.global.buffer_utilization_ewma_alpha);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                // One pump per output: stamp each event array with its origin
                // and forward it to the fanout of connected consumers.
                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn all pumps and watch them as a group, stopping at the
                // first failure.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // The server task may have already finished, so
                        // sending the error is best-effort.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                shutdown: shutdown_signal,
                out: builder.build(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let server = match source.inner.build(context).await {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            let server = async move {
                debug!("Source starting.");

                // Run the source server until it exits on its own, a pump
                // fails, or forced shutdown fires; `biased` makes forced
                // shutdown win when several branches are ready at once.
                let mut result = select! {
                    biased;

                    _ = force_shutdown_tripwire => Ok(()),

                    Ok(e) = &mut pump_error_rx => Err(e),

                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // A pump may have failed after the server already finished;
                // prefer reporting that error over a clean server result.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }

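    /// Builds every new or changed transform, wiring each one to a standalone
    /// in-memory buffer and computing the schema definitions its outputs
    /// expose based on the merged definitions of its inputs.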
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component_id = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // Schema errors have already been reported; there is no
                    // point building the remaining transforms without them.
                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );
            let _span = span.enter();

            // Compute the schema definitions this transform exposes on each
            // of its output ports.
            let schema_definitions = transform
                .inner
                .outputs(
                    &TransformContext {
                        enrichment_tables: enrichment_tables.clone(),
                        metrics_storage: METRICS_STORAGE.clone(),
                        schema: self.config.schema,
                        ..Default::default()
                    },
                    &input_definitions,
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node =
                TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let metrics = ChannelMetricMetadata::new(TRANSFORM_CHANNEL_METRIC_PREFIX, None);
            let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
                TOPOLOGY_BUFFER_SIZE,
                WhenFull::Block,
                &span,
                Some(metrics),
                self.config.global.buffer_utilization_ewma_alpha,
            );

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) =
                build_transform(transform, node, input_rx, &self.utilization_registry);

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }

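    /// Builds every new or changed sink: validates schema expectations,
    /// reuses or builds its buffer, wraps its input stream with utilization
    /// and event-received instrumentation, and spawns the associated
    /// healthcheck task.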
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        // Enrichment tables can also act as sinks; build those alongside the
        // regular ones.
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // Check that the sink's schema requirements are satisfied by the
            // components feeding it.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            }

            // Reuse an existing buffer (e.g. one preserved across a reload)
            // or build a fresh one from the sink's buffer configuration.
            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                None => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_registry
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // Take ownership of the input stream; it is handed back via
                // `TaskOutput::Sink` on completion so it can be reused.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}

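/// Rebuilds any enrichment tables that report needing a reload, preserving
/// their existing indexes, and marks the global registry as loaded again.
/// Unlike `Builder::load_enrichment_tables`, failures are logged rather than
/// collected as build errors.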
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();

    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!("Enrichment table \"{name}\" reload failed: {error}");
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            error!(
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}

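/// Everything needed to spawn a running topology: per-component tasks,
/// input/output wiring, healthchecks, and the shutdown machinery.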
pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) metrics_storage: MetricsStorage,
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}

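/// Fluent builder for [`TopologyPieces`].
///
/// A minimal usage sketch (assuming a loaded `Config` and a `ConfigDiff`
/// computed against the previous configuration):
///
/// ```ignore
/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
///     .with_buffers(HashMap::new())
///     .with_extra_context(ExtraContext::default())
///     .build()
///     .await?;
/// ```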
pub struct TopologyPiecesBuilder<'a> {
    config: &'a Config,
    diff: &'a ConfigDiff,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    extra_context: ExtraContext,
    utilization_registry: Option<UtilizationRegistry>,
}

impl<'a> TopologyPiecesBuilder<'a> {
    /// Starts a builder with no pre-built buffers, default extra context, and
    /// no pre-existing utilization registry.
    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
        Self {
            config,
            diff,
            buffers: HashMap::new(),
            extra_context: ExtraContext::default(),
            utilization_registry: None,
        }
    }

    /// Supplies buffers preserved from a previous topology (e.g. across a
    /// reload) so sinks can resume reading from them.
    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
        self.buffers = buffers;
        self
    }

    /// Supplies additional context made available to components at build time.
    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
        self.extra_context = extra_context;
        self
    }

    /// Reuses an existing utilization registry instead of creating a new one.
    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
        self.utilization_registry = registry;
        self
    }

    /// Builds the topology pieces, returning all accumulated errors on failure.
    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
        Builder::new(
            self.config,
            self.diff,
            self.buffers,
            self.extra_context,
            self.utilization_registry,
        )
        .build()
        .await
    }

    /// Like [`Self::build`], but logs each configuration error and returns
    /// `None` instead of propagating the error list.
    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
        match self.build().await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }
}

impl TopologyPieces {
    /// Convenience wrapper around [`TopologyPiecesBuilder::build_or_log_errors`].
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Option<Self> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build_or_log_errors()
            .await
    }

    /// Convenience wrapper around [`TopologyPiecesBuilder::build`].
    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Result<Self, Vec<String>> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build()
            .await
    }
}

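/// Returns whether an event array is acceptable to a component that consumes
/// the given `DataType`.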
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}

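/// Everything about a configured transform needed to wire it into the
/// topology, captured before the transform itself is moved into its task.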
#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        context: &TransformContext,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(context, schema_definition),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}

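/// Dispatches to the appropriate task builder for each transform flavor:
/// function and synchronous transforms share the `Runner` machinery, while
/// task transforms drive their own event stream.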
fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        // Function transforms are run as a special case of synchronous ones.
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_registry)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_registry),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_registry,
        ),
    }
}

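/// Builds the task for a synchronous transform, running it either inline or
/// concurrently depending on whether the transform opted into concurrency,
/// and returns the fanout control channel for each of its outputs.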
fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_registry.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}

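/// Drives a synchronous transform: pulls event arrays from the input buffer,
/// filters them by the transform's accepted data types, tracks utilization
/// and received-event metrics, and forwards transformed output downstream.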
struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

    async fn run_inline(mut self) -> TaskResult {
        // Preallocate room for a reasonable batch of output events per pass.
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                biased;

                // Drain completed batches first; `FuturesOrdered` guarantees
                // outputs are sent in the order their inputs arrived.
                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(mut outputs_buf)) => {
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                // Accept more input only while below the concurrency limit.
                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            // Each batch runs on its own clone of the
                            // transform so batches can proceed in parallel.
                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}

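/// Builds the task for a task transform, which consumes its input as one
/// stream: events are filtered by accepted type, run through the transform,
/// re-stamped with the default output's schema definitions, and fanned out.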
fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_registry.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    // Task transforms only have a default output; its schema definitions are
    // applied to every event leaving the transform.
    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}