1use std::{
2 collections::HashMap,
3 future::ready,
4 num::NonZeroUsize,
5 sync::{Arc, LazyLock, Mutex},
6 time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use metrics::gauge;
12use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
13use tokio::{
14 select,
15 sync::{mpsc::UnboundedSender, oneshot},
16 time::timeout,
17};
18use tracing::Instrument;
19use vector_lib::{
20 EstimatedJsonEncodedSizeOf,
21 buffers::{
22 BufferType, WhenFull,
23 topology::{
24 builder::TopologyBuilder,
25 channel::{BufferReceiver, BufferSender, ChannelMetricMetadata, LimitedReceiver},
26 },
27 },
28 internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
29 latency::LatencyRecorder,
30 schema::Definition,
31 source_sender::{CHUNK_SIZE, SourceSenderItem},
32 transform::update_runtime_schema_definition,
33};
34use vector_vrl_metrics::MetricsStorage;
35
36use super::{
37 BuiltBuffer, ConfigDiff,
38 fanout::{self, Fanout},
39 schema,
40 task::{Task, TaskOutput, TaskResult},
41};
42use crate::{
43 SourceSender,
44 config::{
45 ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
46 ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
47 },
48 event::{EventArray, EventContainer},
49 extra_context::ExtraContext,
50 internal_events::EventsReceived,
51 shutdown::SourceShutdownCoordinator,
52 spawn_named,
53 topology::task::TaskError,
54 transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
55 utilization::{
56 OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
57 UtilizationRegistry,
58 },
59};
60
// Process-wide enrichment table registry, shared across topology (re)builds so a
// reload can detect and reuse tables that are already loaded.
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
// Process-wide VRL metrics storage handle, cloned into every component's build context.
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

// Capacity of each source's output channel: one chunk per concurrent transform slot.
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

// Upper bound on events coalesced into one batch for a concurrent transform task.
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
// Size of the in-memory buffer placed in front of every transform.
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
// Metric name prefix used for transform input-buffer channel metrics.
const TRANSFORM_CHANNEL_METRIC_PREFIX: &str = "transform_buffer";

// Maximum number of in-flight batches for a concurrent transform: the configured
// worker-thread count, falling back to the detected number of threads.
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

// Source types given special treatment when registered with the shutdown
// coordinator. NOTE(review): exact shutdown semantics live in
// `SourceShutdownCoordinator::register_source` — confirm there.
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
79
/// Accumulates everything produced while building a topology from a config
/// diff: spawned tasks, wired inputs/outputs, buffers, healthchecks, and any
/// errors encountered along the way.
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    // Human-readable build errors; a non-empty list fails the whole build.
    errors: Vec<String>,
    // Fanout control channels for every component output, keyed by output id.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    // Pre-built sink buffers to reuse; consumed (removed) as sinks are built.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    // Buffer sender plus configured upstream outputs for each consuming component.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    // Triggers that detach a sink from its input stream during shutdown.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    // `Some` only when this builder created the emitter itself (no registry was
    // passed in); handed back to the caller via `TopologyPieces::utilization`.
    utilization_emitter: Option<UtilizationEmitter>,
    utilization_registry: UtilizationRegistry,
}
95
96impl<'a> Builder<'a> {
97 fn new(
98 config: &'a super::Config,
99 diff: &'a ConfigDiff,
100 buffers: HashMap<ComponentKey, BuiltBuffer>,
101 extra_context: ExtraContext,
102 utilization_registry: Option<UtilizationRegistry>,
103 ) -> Self {
104 let (emitter, registry) = if let Some(registry) = utilization_registry {
107 (None, registry)
108 } else {
109 let (emitter, registry) = UtilizationEmitter::new();
110 (Some(emitter), registry)
111 };
112 Self {
113 config,
114 diff,
115 buffers,
116 shutdown_coordinator: SourceShutdownCoordinator::default(),
117 errors: vec![],
118 outputs: HashMap::new(),
119 tasks: HashMap::new(),
120 inputs: HashMap::new(),
121 healthchecks: HashMap::new(),
122 detach_triggers: HashMap::new(),
123 extra_context,
124 utilization_emitter: emitter,
125 utilization_registry: registry,
126 }
127 }
128
    /// Builds all sources, transforms, and sinks that are new or changed in the
    /// diff, then either returns the assembled pieces or every error collected
    /// along the way.
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        // Enrichment tables must be (re)loaded before the components that use them.
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // Seal the registry now that every component has been built.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                metrics_storage: METRICS_STORAGE.clone(),
                // Present only when this builder created the emitter itself.
                utilization: self
                    .utilization_emitter
                    .map(|e| (e, self.utilization_registry)),
            })
        } else {
            Err(self.errors)
        }
    }
158
159 fn finalize_outputs(
160 outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
161 ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
162 {
163 let mut finalized_outputs = HashMap::new();
164 for (id, output) in outputs {
165 let entry = finalized_outputs
166 .entry(id.component)
167 .or_insert_with(HashMap::new);
168 entry.insert(id.port, output);
169 }
170
171 finalized_outputs
172 }
173
    /// Builds every enrichment table the global registry reports as needing a
    /// (re)load, then swaps the batch into the registry.
    ///
    /// Returns the process-wide registry; the caller must invoke `finish_load`
    /// once all components are built. Build failures are pushed to
    /// `self.errors`; index failures are only logged.
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        // Labeled so an index failure can skip the whole table, not just one index.
        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                // For a pre-existing (reloaded) table, carry over its index fields
                // so lookups keep working after the swap; a newly added table has none.
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                // Note: not pushed to `self.errors` — a failed index
                                // drops this table from the load without failing the build.
                                error!(message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error);
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }
227
    /// Builds every source (and source-capable enrichment table) that is new or
    /// changed in the diff. For each one this wires its outputs into fanouts,
    /// creates a "pump" supervisor task that shuttles events from the source
    /// into those fanouts, and returns the source server tasks keyed by
    /// component; pump tasks are registered in `self.tasks`.
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        // Enrichment tables that expose a source side are built alongside plain sources.
        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder()
                .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
                .with_timeout(source.inner.send_timeout())
                .with_ewma_half_life_seconds(
                    self.config.global.buffer_utilization_ewma_half_life_seconds,
                );
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            // One pump + fanout pair per declared source output (port).
            for output in source_outputs.into_iter() {
                let rx = builder.add_source_output(output.clone(), key.clone());

                let (fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = run_source_output_pump(rx, fanout, source, source_type);

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            // The supervisor forwards the first pump failure to the source server
            // task (below) over this oneshot so the server can fail too.
            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn all pumps and watch them; completion order doesn't matter.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // Send may fail if the server task is already gone; ignored.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                shutdown: shutdown_signal,
                out: builder.build(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let server = match source.inner.build(context).await {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            let server = async move {
                debug!("Source starting.");

                // `biased` gives forced shutdown priority over pump errors, and
                // pump errors priority over normal server completion.
                let mut result = select! {
                    biased;

                    _ = force_shutdown_tripwire => Ok(()),

                    Ok(e) = &mut pump_error_rx => Err(e),

                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // A pump error that raced with the other branches still wins.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }
421
    /// Builds every transform that is new or changed in the diff, wiring each
    /// one behind an in-memory buffer and registering its task, inputs, and
    /// output control channels on `self`.
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        // Shared across iterations so schema resolution work isn't repeated.
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component_id = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // NOTE(review): this aborts building ALL remaining transforms
                    // and pushes nothing to `self.errors` — unlike the `continue`
                    // + error-push pattern used elsewhere in this impl. Presumably
                    // the schema module has already surfaced the failure; confirm.
                    return;
                }
            };

            // A single definition describing the union of all inputs.
            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );
            let _span = span.enter();

            // Resolve the transform's declared outputs into per-port schema definitions.
            let schema_definitions = transform
                .inner
                .outputs(
                    &TransformContext {
                        enrichment_tables: enrichment_tables.clone(),
                        metrics_storage: METRICS_STORAGE.clone(),
                        schema: self.config.schema,
                        ..Default::default()
                    },
                    &input_definitions,
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            // Captured before `build` so the wiring metadata survives the build.
            let node =
                TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            // Every transform sits behind a small blocking in-memory buffer.
            let metrics = ChannelMetricMetadata::new(TRANSFORM_CHANNEL_METRIC_PREFIX, None);
            let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
                TOPOLOGY_BUFFER_SIZE,
                WhenFull::Block,
                &span,
                Some(metrics),
                self.config.global.buffer_utilization_ewma_half_life_seconds,
            );

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) =
                self.build_transform(transform, node, input_rx);

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }
530
    /// Builds every sink (and sink-capable enrichment table) that is new or
    /// changed in the diff: validates schema expectations, reuses or builds the
    /// sink's buffer, and registers the sink task, its healthcheck task, and a
    /// detach trigger on `self`.
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        // Enrichment tables that expose a sink side are built alongside plain sinks.
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            // Both the sink and the global config must opt in to healthchecks.
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // Schema violations are recorded but do not abort this sink's build.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            // Reuse a buffer carried over from a previous topology if one exists;
            // otherwise build a fresh one from the sink's buffer config.
            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("cant ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        // Receiver goes behind a Mutex<Option<..>> so the async task
                        // below can take ownership when it actually starts.
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            // Firing `trigger` (stored in detach_triggers) ends the input stream
            // via `take_until_if` below, detaching the sink from its buffer.
            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_registry
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        // Drop event arrays whose type this sink doesn't accept.
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    // The receiver is returned so it can be reused across reloads.
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            // Outer Err comes from the timeout elapsing.
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
718
719 fn build_transform(
720 &self,
721 transform: Transform,
722 node: TransformNode,
723 input_rx: BufferReceiver<EventArray>,
724 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
725 match transform {
726 Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
728 Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
729 Transform::Task(t) => self.build_task_transform(
730 t,
731 input_rx,
732 node.input_details.data_type(),
733 node.typetag,
734 &node.key,
735 &node.outputs,
736 ),
737 }
738 }
739
740 fn build_sync_transform(
741 &self,
742 t: Box<dyn SyncTransform>,
743 node: TransformNode,
744 input_rx: BufferReceiver<EventArray>,
745 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
746 let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
747
748 let sender = self
749 .utilization_registry
750 .add_component(node.key.clone(), gauge!("utilization"));
751 let runner = Runner::new(
752 t,
753 input_rx,
754 sender,
755 node.input_details.data_type(),
756 outputs,
757 LatencyRecorder::new(self.config.global.latency_ewma_alpha),
758 );
759 let transform = if node.enable_concurrency {
760 runner.run_concurrently().boxed()
761 } else {
762 runner.run_inline().boxed()
763 };
764
765 let transform = async move {
766 debug!("Synchronous transform starting.");
767
768 match transform.await {
769 Ok(v) => {
770 debug!("Synchronous transform finished normally.");
771 Ok(v)
772 }
773 Err(e) => {
774 debug!("Synchronous transform finished with an error.");
775 Err(e)
776 }
777 }
778 };
779
780 let mut output_controls = HashMap::new();
781 for (name, control) in controls {
782 let id = name
783 .map(|name| OutputId::from((&node.key, name)))
784 .unwrap_or_else(|| OutputId::from(&node.key));
785 output_controls.insert(id, control);
786 }
787
788 let task = Task::new(node.key.clone(), node.typetag, transform);
789
790 (task, output_controls)
791 }
792
    /// Wires a task transform into the topology: the transform consumes a
    /// filtered input stream and its output stream is driven into a `Fanout`,
    /// with utilization, latency, received/sent telemetry, and runtime schema
    /// updates layered in along the way.
    fn build_task_transform(
        &self,
        t: Box<dyn TaskTransform<EventArray>>,
        input_rx: BufferReceiver<EventArray>,
        input_type: DataType,
        typetag: &str,
        key: &ComponentKey,
        outputs: &[TransformOutput],
    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
        let (mut fanout, control) = Fanout::new();

        let sender = self
            .utilization_registry
            .add_component(key.clone(), gauge!("utilization"));
        // Same component sender is used on both the input and output side.
        let output_sender = sender.clone();
        let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());

        let events_received = register!(EventsReceived);
        let filtered = input_rx
            // Drop event arrays whose type the transform doesn't accept.
            .filter(move |events| ready(filter_events_type(events, input_type)))
            .inspect(move |events| {
                events_received.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ))
            });
        let events_sent = register!(EventsSent::from(internal_event::Output(None)));
        let output_id = Arc::new(OutputId {
            component: key.clone(),
            port: None,
        });
        let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);

        // Schema definitions for the default port; task transforms only have one output.
        let schema_definition_map = outputs
            .iter()
            .find(|x| x.port.is_none())
            .expect("output for default port required for task transforms")
            .log_schema_definitions
            .clone()
            .into_iter()
            .map(|(key, value)| (key, Arc::new(value)))
            .collect();

        let stream = t
            .transform(Box::pin(filtered))
            .map(move |mut events| {
                // Stamp the runtime schema definition and send latency on each event.
                for event in events.iter_events_mut() {
                    update_runtime_schema_definition(event, &output_id, &schema_definition_map);
                }
                let now = Instant::now();
                latency_recorder.on_send(&mut events, now);
                (events, now)
            })
            .inspect(move |(events, _): &(EventArray, Instant)| {
                events_sent.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ));
            });
        let stream = OutputUtilization::new(output_sender, stream);
        let transform = async move {
            debug!("Task transform starting.");

            match fanout.send_stream(stream).await {
                Ok(()) => {
                    debug!("Task transform finished normally.");
                    Ok(TaskOutput::Transform)
                }
                Err(e) => {
                    debug!("Task transform finished with an error.");
                    Err(TaskError::wrapped(e))
                }
            }
        }
        .boxed();

        let mut outputs = HashMap::new();
        outputs.insert(OutputId::from(key), control);

        let task = Task::new(key.clone(), typetag, transform);

        (task, outputs)
    }
877}
878
879async fn run_source_output_pump(
880 mut rx: LimitedReceiver<SourceSenderItem>,
881 mut fanout: Fanout,
882 source: Arc<ComponentKey>,
883 source_type: &'static str,
884) -> TaskResult {
885 debug!("Source pump starting.");
886
887 while let Some(SourceSenderItem {
888 events: mut array,
889 send_reference,
890 }) = rx.next().await
891 {
892 let now = Instant::now();
899 array.for_each_metadata_mut(|metadata| {
900 metadata.set_source_id(Arc::clone(&source));
901 metadata.set_source_type(source_type);
902 metadata.set_last_transform_timestamp(now);
903 });
904 fanout
905 .send(array, Some(send_reference))
906 .await
907 .map_err(|e| {
908 debug!("Source pump finished with an error.");
909 TaskError::wrapped(e)
910 })?;
911 }
912
913 debug!("Source pump finished normally.");
914 Ok(TaskOutput::Source)
915}
916
917pub async fn reload_enrichment_tables(config: &Config) {
918 let mut enrichment_tables = HashMap::new();
919 'tables: for (name, table_outer) in config.enrichment_tables.iter() {
921 let table_name = name.to_string();
922 if ENRICHMENT_TABLES.needs_reload(&table_name) {
923 let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
924
925 let mut table = match table_outer.inner.build(&config.global).await {
926 Ok(table) => table,
927 Err(error) => {
928 error!("Enrichment table \"{name}\" reload failed: {error}");
929 continue;
930 }
931 };
932
933 if let Some(indexes) = indexes {
934 for (case, index) in indexes {
935 match table
936 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
937 {
938 Ok(_) => (),
939 Err(error) => {
940 error!(
944 message = "Unable to add index to reloaded enrichment table.",
945 table = ?name.to_string(),
946 %error
947 );
948 continue 'tables;
949 }
950 }
951 }
952 }
953
954 enrichment_tables.insert(table_name, table);
955 }
956 }
957
958 ENRICHMENT_TABLES.load(enrichment_tables);
959 ENRICHMENT_TABLES.finish_load();
960}
961
/// Everything needed to (re)start a topology: the wired component tasks plus
/// the channels and coordinators used to control them at runtime.
pub struct TopologyPieces {
    /// Buffer sender plus configured upstream outputs for each consuming component.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels, keyed by component and then by output port.
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Pump, transform, and sink driver tasks keyed by component.
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// The source server tasks themselves (separate from their pump tasks).
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Triggers that detach sinks from their inputs during shutdown.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) metrics_storage: MetricsStorage,
    /// Present only when the builder created the utilization emitter itself
    /// (no shared registry was supplied).
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
973
/// Fluent, public-facing builder for [`TopologyPieces`]; the heavy lifting is
/// delegated to the private `Builder`.
pub struct TopologyPiecesBuilder<'a> {
    config: &'a Config,
    diff: &'a ConfigDiff,
    // Sink buffers to reuse instead of building fresh ones.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    extra_context: ExtraContext,
    // When `None`, the inner builder creates its own emitter/registry pair.
    utilization_registry: Option<UtilizationRegistry>,
}
992
impl<'a> TopologyPiecesBuilder<'a> {
    /// Starts a builder with no reused buffers, default extra context, and no
    /// shared utilization registry.
    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
        Self {
            config,
            diff,
            buffers: HashMap::new(),
            extra_context: ExtraContext::default(),
            utilization_registry: None,
        }
    }

    /// Reuses the given sink buffers (e.g. carried over from a reload).
    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
        self.buffers = buffers;
        self
    }

    /// Supplies caller-provided context made available to components at build time.
    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
        self.extra_context = extra_context;
        self
    }

    /// Shares an existing utilization registry instead of creating a new one.
    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
        self.utilization_registry = registry;
        self
    }

    /// Builds the topology pieces, returning all accumulated error messages on
    /// failure.
    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
        Builder::new(
            self.config,
            self.diff,
            self.buffers,
            self.extra_context,
            self.utilization_registry,
        )
        .build()
        .await
    }

    /// Like [`Self::build`], but logs each error and returns `None` instead of
    /// `Err`.
    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
        match self.build().await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }
}
1055
impl TopologyPieces {
    /// Convenience wrapper around [`TopologyPiecesBuilder`] that logs each
    /// build error and returns `None` on failure.
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Option<Self> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build_or_log_errors()
            .await
    }

    /// Convenience wrapper around [`TopologyPiecesBuilder`] that returns the
    /// collected error messages on failure.
    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Result<Self, Vec<String>> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build()
            .await
    }
}
1088
1089const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1090 match events {
1091 EventArray::Logs(_) => data_type.contains(DataType::Log),
1092 EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1093 EventArray::Traces(_) => data_type.contains(DataType::Trace),
1094 }
1095}
1096
/// Snapshot of a transform's wiring metadata, captured before the transform
/// itself is consumed by the build.
#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    // Upstream outputs this transform consumes.
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    // Whether the transform may run with multiple batches in flight.
    enable_concurrency: bool,
}
1106
impl TransformNode {
    /// Captures the parts of a transform's configuration needed to wire it into
    /// the topology, resolving its outputs against the given context and input
    /// schema definitions.
    pub fn from_parts(
        key: ComponentKey,
        context: &TransformContext,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(context, schema_definition),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}
1124
/// Drives a synchronous transform: pulls batches from `input_rx`, applies
/// `transform`, and pushes results through `outputs`, reporting utilization,
/// latency, and events-received telemetry along the way.
struct Runner {
    transform: Box<dyn SyncTransform>,
    // `Some` until one of the `run_*` methods takes it; running twice is a bug.
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    // Signals wait-start/wait-stop transitions to the utilization tracker.
    timer_tx: UtilizationComponentSender,
    latency_recorder: LatencyRecorder,
    events_received: Registered<EventsReceived>,
}
1134
impl Runner {
    /// Bundles a synchronous transform with its input stream, outputs, and
    /// telemetry handles.
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
        latency_recorder: LatencyRecorder,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            latency_recorder,
            events_received: register!(EventsReceived),
        }
    }

    /// Marks the end of a wait period for utilization tracking and emits the
    /// events-received telemetry for the batch.
    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    /// Stamps send latency on every output array and forwards the buffer
    /// downstream; the utilization "waiting" timer restarts because the runner
    /// is about to block on the send and then on more input.
    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        let now = Instant::now();
        outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
        self.outputs.send(outputs_buf).await
    }

    /// Runs the transform on the current task, one batch at a time.
    async fn run_inline(mut self) -> TaskResult {
        // Capacity hint for the reusable output buffer.
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            // Drop event arrays whose type the transform doesn't accept.
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    /// Runs the transform across spawned tasks, keeping up to
    /// `TRANSFORM_CONCURRENCY_LIMIT` batches in flight. `FuturesOrdered`
    /// preserves input order when results are sent downstream.
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Coalesce whatever arrays are immediately ready (bounded by
        // READY_ARRAY_CAPACITY) so each spawned task gets a sizeable batch.
        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                // `biased` drains completed work before accepting new input,
                // bounding in-flight memory.
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(mut outputs_buf)) => {
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            // Each task gets its own clone of the transform and a
                            // buffer sized for the whole batch.
                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            // Input ended: stop accepting work but keep draining
                            // `in_flight` until it's empty.
                            shutting_down = true;
                            continue
                        }
                    }
                }

                // Both branches disabled: nothing in flight and no more input.
                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}