vector/topology/
builder.rs

1use std::{
2    collections::HashMap,
3    future::ready,
4    num::NonZeroUsize,
5    sync::{Arc, LazyLock, Mutex},
6    time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use metrics::gauge;
12use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
13use tokio::{
14    select,
15    sync::{mpsc::UnboundedSender, oneshot},
16    time::timeout,
17};
18use tracing::Instrument;
19use vector_lib::{
20    EstimatedJsonEncodedSizeOf,
21    buffers::{
22        BufferType, WhenFull,
23        topology::{
24            builder::TopologyBuilder,
25            channel::{BufferReceiver, BufferSender, ChannelMetricMetadata, LimitedReceiver},
26        },
27    },
28    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
29    latency::LatencyRecorder,
30    schema::Definition,
31    source_sender::{CHUNK_SIZE, SourceSenderItem},
32    transform::update_runtime_schema_definition,
33};
34use vector_vrl_metrics::MetricsStorage;
35
36use super::{
37    BuiltBuffer, ConfigDiff,
38    fanout::{self, Fanout},
39    schema,
40    task::{Task, TaskOutput, TaskResult},
41};
42use crate::{
43    SourceSender,
44    config::{
45        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
46        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
47    },
48    event::{EventArray, EventContainer},
49    extra_context::ExtraContext,
50    internal_events::EventsReceived,
51    shutdown::SourceShutdownCoordinator,
52    spawn_named,
53    topology::task::TaskError,
54    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
55    utilization::{
56        OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
57        UtilizationRegistry,
58    },
59};
60
// Global registry of enrichment tables. Kept in a static so reloads can check
// `needs_reload` and reuse previously loaded table data across topology builds.
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
// Global VRL metrics storage, cloned into every source/transform/sink context.
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

// Buffer size for each source's output sender; scales with the transform
// concurrency limit so that downstream consumers can be kept busy.
pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

// NOTE(review): capacity of a "ready array" batch — used outside this visible
// chunk; presumably sizes event batches pulled off a channel. Confirm at use site.
const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
// Size of the in-memory buffer placed in front of each transform (see
// `build_transforms`).
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
// Metric name prefix for the per-transform input channel metrics.
const TRANSFORM_CHANNEL_METRIC_PREFIX: &str = "transform_buffer";

// Concurrency limit used when running transforms: the configured worker-thread
// count when available, otherwise the detected number of threads.
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

// Sources that report on Vector itself; they get special handling when
// registered with the shutdown coordinator (see `build_sources`).
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
79
/// Accumulates all the state needed to build the new pieces of a topology from
/// a config diff; consumed by [`Builder::build`].
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    // Coordinates graceful and forced shutdown of the sources built here.
    shutdown_coordinator: SourceShutdownCoordinator,
    // Human-readable build errors; reported all together when the build ends.
    errors: Vec<String>,
    // Fanout control channels, keyed by the output they control.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    // Long-running tasks (pumps, transforms, sinks) keyed by component.
    tasks: HashMap<ComponentKey, Task>,
    // Pre-built sink buffers that can be reused when rebuilding a topology.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    // Input-side senders plus the declared inputs for each consuming component.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    // Triggers that detach a sink from its input stream on rebuild.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    // Only `Some` when this build created a fresh utilization registry.
    utilization_emitter: Option<UtilizationEmitter>,
    utilization_registry: UtilizationRegistry,
}
95
96impl<'a> Builder<'a> {
97    fn new(
98        config: &'a super::Config,
99        diff: &'a ConfigDiff,
100        buffers: HashMap<ComponentKey, BuiltBuffer>,
101        extra_context: ExtraContext,
102        utilization_registry: Option<UtilizationRegistry>,
103    ) -> Self {
104        // If registry is not passed, we need to build a whole new utilization emitter + registry
105        // Otherwise, we just store the registry and reuse it for this build
106        let (emitter, registry) = if let Some(registry) = utilization_registry {
107            (None, registry)
108        } else {
109            let (emitter, registry) = UtilizationEmitter::new();
110            (Some(emitter), registry)
111        };
112        Self {
113            config,
114            diff,
115            buffers,
116            shutdown_coordinator: SourceShutdownCoordinator::default(),
117            errors: vec![],
118            outputs: HashMap::new(),
119            tasks: HashMap::new(),
120            inputs: HashMap::new(),
121            healthchecks: HashMap::new(),
122            detach_triggers: HashMap::new(),
123            extra_context,
124            utilization_emitter: emitter,
125            utilization_registry: registry,
126        }
127    }
128
    /// Builds the new pieces of the topology found in `self.diff`.
    ///
    /// Builds sources, transforms, and sinks in that order, after (re)loading
    /// the enrichment tables they reference. Returns the assembled pieces, or
    /// every error collected along the way.
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // We should have all the data for the enrichment tables loaded now, so switch them over to
        // readonly.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                metrics_storage: METRICS_STORAGE.clone(),
                // The emitter only exists when this build created a fresh
                // utilization registry (see `Builder::new`).
                utilization: self
                    .utilization_emitter
                    .map(|e| (e, self.utilization_registry)),
            })
        } else {
            Err(self.errors)
        }
    }
158
159    fn finalize_outputs(
160        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
161    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
162    {
163        let mut finalized_outputs = HashMap::new();
164        for (id, output) in outputs {
165            let entry = finalized_outputs
166                .entry(id.component)
167                .or_insert_with(HashMap::new);
168            entry.insert(id.port, output);
169        }
170
171        finalized_outputs
172    }
173
    /// Loads, or reloads the enrichment tables.
    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
    ///
    /// Build failures are pushed onto `self.errors`; index-reapplication
    /// failures are only logged, leaving the previously loaded data in place.
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        // Build enrichment tables
        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            // Skip tables whose data is already loaded and current.
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    // If this is an existing enrichment table, we need to store the indexes to reapply
                    // them again post load.
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                // If there is an error adding an index we do not want to use the reloaded
                                // data, the previously loaded data will still need to be used.
                                // Just report the error and continue.
                                error!(message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error);
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        // Swap the freshly built tables into the global registry. Tables that
        // failed above are absent from this map and so are not replaced.
        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }
227
    /// Builds every new source in the diff (including enrichment tables that
    /// act as sources), wiring each source output through a pump task into a
    /// `Fanout`.
    ///
    /// For each source this registers fanout controls in `self.outputs`, the
    /// pump supervisor task in `self.tasks`, and returns the source server
    /// tasks keyed by component. Build failures are pushed onto `self.errors`.
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        // Enrichment tables can also expose a source side; treat those like
        // regular sources below.
        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder()
                .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
                .with_timeout(source.inner.send_timeout())
                .with_ewma_half_life_seconds(
                    self.config.global.buffer_utilization_ewma_half_life_seconds,
                );
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            // One pump + fanout pair per declared source output (port).
            for output in source_outputs.into_iter() {
                let rx = builder.add_source_output(output.clone(), key.clone());

                let (fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = run_source_output_pump(rx, fanout, source, source_type);

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            // The supervisor forwards the first pump error to the source's
            // wrapper future via this oneshot so the source can stop too.
            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn all of the per-output pumps and then await their completion.
                //
                // If any of the pumps complete with an error, or panic/are cancelled, we return
                // immediately.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // Immediately send the error to the source's wrapper future, but ignore any
                        // errors during the send, since nested errors wouldn't make any sense here.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                shutdown: shutdown_signal,
                out: builder.build(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let server = match source.inner.build(context).await {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            // Build a wrapper future that drives the actual source future, but returns early if we've
            // been signalled to forcefully shutdown, or if the source pump encounters an error.
            //
            // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully
            // within the allotted time window. This can occur normally for certain sources, like stdin,
            // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
            // to shutdown unless some input is given.
            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    // We've been told that we must forcefully shut down.
                    _ = force_shutdown_tripwire => Ok(()),

                    // The source pump encountered an error, which we're now bubbling up here to stop
                    // the source as well, since the source running makes no sense without the pump.
                    //
                    // We only match receiving a message, not the error of the sender being dropped,
                    // just to keep things simpler.
                    Ok(e) = &mut pump_error_rx => Err(e),

                    // The source finished normally.
                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // Even though we already tried to receive any pump task error above, we may have exited
                // on the source itself returning an error due to task scheduling, where the pump task
                // encountered an error, sent it over the oneshot, but we were polling the source
                // already and hit an error trying to send to the now-shutdown pump task.
                //
                // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
                // time to see if the pump task encountered an error, using _that_ instead if so, to
                // propagate the true error that caused the source to have to stop.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }
421
    /// Builds every new transform in the diff, computing its input/output
    /// schema definitions and wiring a standalone in-memory buffer in front of
    /// it.
    ///
    /// Registers each transform's input sender in `self.inputs`, its output
    /// controls in `self.outputs`, and its task in `self.tasks`. Build
    /// failures are pushed onto `self.errors`.
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component_id = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // We have received an error whilst retrieving the definitions,
                    // there is no point in continuing.

                    // NOTE(review): this aborts the whole loop (`return`, not
                    // `continue`) and discards the error without pushing onto
                    // `self.errors` — presumably `schema::input_definitions`
                    // reports the failure itself; confirm.
                    return;
                }
            };

            // Merge every upstream definition into one view of this
            // transform's input schema.
            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                // We may not have any definitions if all the inputs are from metrics sources.
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );
            let _span = span.enter();

            // Create a map of the outputs to the list of possible definitions from those outputs.
            let schema_definitions = transform
                .inner
                .outputs(
                    &TransformContext {
                        enrichment_tables: enrichment_tables.clone(),
                        metrics_storage: METRICS_STORAGE.clone(),
                        schema: self.config.schema,
                        ..Default::default()
                    },
                    &input_definitions,
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node =
                TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            // Standalone in-memory channel that feeds this transform; blocks
            // upstream senders when full.
            let metrics = ChannelMetricMetadata::new(TRANSFORM_CHANNEL_METRIC_PREFIX, None);
            let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
                TOPOLOGY_BUFFER_SIZE,
                WhenFull::Block,
                &span,
                Some(metrics),
                self.config.global.buffer_utilization_ewma_half_life_seconds,
            );

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) =
                self.build_transform(transform, node, input_rx);

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }
530
    /// Builds every new sink in the diff (including enrichment tables that act
    /// as sinks), reusing a previously built buffer when one was carried over.
    ///
    /// Registers each sink's input sender in `self.inputs`, its healthcheck in
    /// `self.healthchecks`, its task in `self.tasks`, and its detach trigger in
    /// `self.detach_triggers`. Build failures are pushed onto `self.errors`.
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // At this point, we've validated that all transforms are valid, including any
            // transform that mutates the schema provided by their sources. We can now validate the
            // schema expectations of each individual sink.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            // Reuse the buffer carried over from a previous topology when one
            // exists for this sink; otherwise build a fresh one.
            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("cant ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            // `trigger` lets the topology detach the sink from its input
            // stream; `tripwire` ends the stream below via `take_until_if`.
            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_registry
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // Why is this Arc<Mutex<Option<_>>> needed you ask.
                // In case when this function build_pieces errors
                // this future won't be run so this rx won't be taken
                // which will enable us to reuse rx to rebuild
                // old configuration by passing this Arc<Mutex<Option<_>>>
                // yet again.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    // Hand the receiver back so it can be reused on rebuild.
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
718
719    fn build_transform(
720        &self,
721        transform: Transform,
722        node: TransformNode,
723        input_rx: BufferReceiver<EventArray>,
724    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
725        match transform {
726            // TODO: avoid the double boxing for function transforms here
727            Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
728            Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
729            Transform::Task(t) => self.build_task_transform(
730                t,
731                input_rx,
732                node.input_details.data_type(),
733                node.typetag,
734                &node.key,
735                &node.outputs,
736            ),
737        }
738    }
739
740    fn build_sync_transform(
741        &self,
742        t: Box<dyn SyncTransform>,
743        node: TransformNode,
744        input_rx: BufferReceiver<EventArray>,
745    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
746        let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
747
748        let sender = self
749            .utilization_registry
750            .add_component(node.key.clone(), gauge!("utilization"));
751        let runner = Runner::new(
752            t,
753            input_rx,
754            sender,
755            node.input_details.data_type(),
756            outputs,
757            LatencyRecorder::new(self.config.global.latency_ewma_alpha),
758        );
759        let transform = if node.enable_concurrency {
760            runner.run_concurrently().boxed()
761        } else {
762            runner.run_inline().boxed()
763        };
764
765        let transform = async move {
766            debug!("Synchronous transform starting.");
767
768            match transform.await {
769                Ok(v) => {
770                    debug!("Synchronous transform finished normally.");
771                    Ok(v)
772                }
773                Err(e) => {
774                    debug!("Synchronous transform finished with an error.");
775                    Err(e)
776                }
777            }
778        };
779
780        let mut output_controls = HashMap::new();
781        for (name, control) in controls {
782            let id = name
783                .map(|name| OutputId::from((&node.key, name)))
784                .unwrap_or_else(|| OutputId::from(&node.key));
785            output_controls.insert(id, control);
786        }
787
788        let task = Task::new(node.key.clone(), node.typetag, transform);
789
790        (task, output_controls)
791    }
792
    /// Wires up a task transform: filters and instruments the input stream,
    /// drives it through the transform, stamps schema definitions and latency
    /// on the output, and sends the result into a fanout.
    ///
    /// Returns the ready-to-spawn `Task` and the control channel for the
    /// transform's single (default) output.
    fn build_task_transform(
        &self,
        t: Box<dyn TaskTransform<EventArray>>,
        input_rx: BufferReceiver<EventArray>,
        input_type: DataType,
        typetag: &str,
        key: &ComponentKey,
        outputs: &[TransformOutput],
    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
        let (mut fanout, control) = Fanout::new();

        // Register with the utilization emitter; one clone instruments the
        // input side, the other the output side (see `OutputUtilization` below).
        let sender = self
            .utilization_registry
            .add_component(key.clone(), gauge!("utilization"));
        let output_sender = sender.clone();
        let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());

        // Drop whole arrays whose event kind this transform does not accept,
        // and count what actually gets through.
        let events_received = register!(EventsReceived);
        let filtered = input_rx
            .filter(move |events| ready(filter_events_type(events, input_type)))
            .inspect(move |events| {
                events_received.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ))
            });
        let events_sent = register!(EventsSent::from(internal_event::Output(None)));
        let output_id = Arc::new(OutputId {
            component: key.clone(),
            port: None,
        });
        let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);

        // Task transforms can only write to the default output, so only a single schema def map is needed
        let schema_definition_map = outputs
            .iter()
            .find(|x| x.port.is_none())
            .expect("output for default port required for task transforms")
            .log_schema_definitions
            .clone()
            .into_iter()
            .map(|(key, value)| (key, Arc::new(value)))
            .collect();

        // Post-process the transform's output: attach runtime schema
        // definitions, record per-array send latency, and emit `EventsSent`.
        // The `Instant` is threaded alongside each array as the send reference.
        let stream = t
            .transform(Box::pin(filtered))
            .map(move |mut events| {
                for event in events.iter_events_mut() {
                    update_runtime_schema_definition(event, &output_id, &schema_definition_map);
                }
                let now = Instant::now();
                latency_recorder.on_send(&mut events, now);
                (events, now)
            })
            .inspect(move |(events, _): &(EventArray, Instant)| {
                events_sent.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ));
            });
        let stream = OutputUtilization::new(output_sender, stream);
        let transform = async move {
            debug!("Task transform starting.");

            match fanout.send_stream(stream).await {
                Ok(()) => {
                    debug!("Task transform finished normally.");
                    Ok(TaskOutput::Transform)
                }
                Err(e) => {
                    debug!("Task transform finished with an error.");
                    Err(TaskError::wrapped(e))
                }
            }
        }
        .boxed();

        // Single control channel, keyed by the component's default output id.
        let mut outputs = HashMap::new();
        outputs.insert(OutputId::from(key), control);

        let task = Task::new(key.clone(), typetag, transform);

        (task, outputs)
    }
877}
878
879async fn run_source_output_pump(
880    mut rx: LimitedReceiver<SourceSenderItem>,
881    mut fanout: Fanout,
882    source: Arc<ComponentKey>,
883    source_type: &'static str,
884) -> TaskResult {
885    debug!("Source pump starting.");
886
887    while let Some(SourceSenderItem {
888        events: mut array,
889        send_reference,
890    }) = rx.next().await
891    {
892        // Even though we have a `send_reference` timestamp above, that reference time is when
893        // the events were enqueued in the `SourceSender`, not when they were pulled out of the
894        // `rx` stream on this end. Since those times can be quite different (due to blocking
895        // inherent to the fanout send operation), we set the `last_transform_timestamp` to the
896        // current time instead to get an accurate reference for when the events started waiting
897        // for the first transform.
898        let now = Instant::now();
899        array.for_each_metadata_mut(|metadata| {
900            metadata.set_source_id(Arc::clone(&source));
901            metadata.set_source_type(source_type);
902            metadata.set_last_transform_timestamp(now);
903        });
904        fanout
905            .send(array, Some(send_reference))
906            .await
907            .map_err(|e| {
908                debug!("Source pump finished with an error.");
909                TaskError::wrapped(e)
910            })?;
911    }
912
913    debug!("Source pump finished normally.");
914    Ok(TaskOutput::Source)
915}
916
917pub async fn reload_enrichment_tables(config: &Config) {
918    let mut enrichment_tables = HashMap::new();
919    // Build enrichment tables
920    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
921        let table_name = name.to_string();
922        if ENRICHMENT_TABLES.needs_reload(&table_name) {
923            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
924
925            let mut table = match table_outer.inner.build(&config.global).await {
926                Ok(table) => table,
927                Err(error) => {
928                    error!("Enrichment table \"{name}\" reload failed: {error}");
929                    continue;
930                }
931            };
932
933            if let Some(indexes) = indexes {
934                for (case, index) in indexes {
935                    match table
936                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
937                    {
938                        Ok(_) => (),
939                        Err(error) => {
940                            // If there is an error adding an index we do not want to use the reloaded
941                            // data, the previously loaded data will still need to be used.
942                            // Just report the error and continue.
943                            error!(
944                                message = "Unable to add index to reloaded enrichment table.",
945                                table = ?name.to_string(),
946                                %error
947                            );
948                            continue 'tables;
949                        }
950                    }
951                }
952            }
953
954            enrichment_tables.insert(table_name, table);
955        }
956    }
957
958    ENRICHMENT_TABLES.load(enrichment_tables);
959    ENRICHMENT_TABLES.finish_load();
960}
961
/// The assembled runtime pieces of a topology: per-component buffers, tasks,
/// fanout controls, and shutdown machinery, ready to be spawned.
pub struct TopologyPieces {
    // Buffer sender plus the configured upstream inputs for each sink.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    // Fanout control channels keyed by component, then by output port
    // (`None` = default port).
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    // Main driver task for each built component.
    pub(super) tasks: HashMap<ComponentKey, Task>,
    // The pump task that moves events out of each source.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    // One healthcheck task per sink (may be a no-op when disabled).
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    // Coordinates graceful shutdown of all sources.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    // Per-component triggers that detach it from its input when tripped —
    // NOTE(review): presumably used during reload teardown; confirm with callers.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    // Shared VRL metrics storage for this topology.
    pub(crate) metrics_storage: MetricsStorage,
    // Utilization emitter/registry pair, present when this build created them.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
973
/// Builder for constructing TopologyPieces with a fluent API.
///
/// # Examples
///
/// ```ignore
/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
///     .with_buffers(buffers)
///     .with_extra_context(extra_context)
///     .build()
///     .await?;
/// ```
pub struct TopologyPiecesBuilder<'a> {
    // Full topology configuration to build from.
    config: &'a Config,
    // Diff against the previous config; determines which pieces to (re)build.
    diff: &'a ConfigDiff,
    // Pre-built buffers keyed by component, defaults to empty.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    // Caller-supplied context threaded through component builds.
    extra_context: ExtraContext,
    // Optional registry for utilization tracking, defaults to `None`.
    utilization_registry: Option<UtilizationRegistry>,
}
992
993impl<'a> TopologyPiecesBuilder<'a> {
994    /// Creates a new builder with required parameters.
995    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
996        Self {
997            config,
998            diff,
999            buffers: HashMap::new(),
1000            extra_context: ExtraContext::default(),
1001            utilization_registry: None,
1002        }
1003    }
1004
1005    /// Sets the buffers for the topology.
1006    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1007        self.buffers = buffers;
1008        self
1009    }
1010
1011    /// Sets the extra context for the topology.
1012    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1013        self.extra_context = extra_context;
1014        self
1015    }
1016
1017    /// Sets the utilization registry for the topology.
1018    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1019        self.utilization_registry = registry;
1020        self
1021    }
1022
1023    /// Builds the topology pieces, returning errors if any occur.
1024    ///
1025    /// Use this method when you need to handle errors explicitly,
1026    /// such as in tests or validation code.
1027    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1028        Builder::new(
1029            self.config,
1030            self.diff,
1031            self.buffers,
1032            self.extra_context,
1033            self.utilization_registry,
1034        )
1035        .build()
1036        .await
1037    }
1038
1039    /// Builds the topology pieces, logging any errors that occur.
1040    ///
1041    /// Use this method for runtime configuration loading where
1042    /// errors should be logged and execution should continue.
1043    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1044        match self.build().await {
1045            Err(errors) => {
1046                for error in errors {
1047                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1048                }
1049                None
1050            }
1051            Ok(new_pieces) => Some(new_pieces),
1052        }
1053    }
1054}
1055
1056impl TopologyPieces {
1057    pub async fn build_or_log_errors(
1058        config: &Config,
1059        diff: &ConfigDiff,
1060        buffers: HashMap<ComponentKey, BuiltBuffer>,
1061        extra_context: ExtraContext,
1062        utilization_registry: Option<UtilizationRegistry>,
1063    ) -> Option<Self> {
1064        TopologyPiecesBuilder::new(config, diff)
1065            .with_buffers(buffers)
1066            .with_extra_context(extra_context)
1067            .with_utilization_registry(utilization_registry)
1068            .build_or_log_errors()
1069            .await
1070    }
1071
1072    /// Builds only the new pieces, and doesn't check their topology.
1073    pub async fn build(
1074        config: &super::Config,
1075        diff: &ConfigDiff,
1076        buffers: HashMap<ComponentKey, BuiltBuffer>,
1077        extra_context: ExtraContext,
1078        utilization_registry: Option<UtilizationRegistry>,
1079    ) -> Result<Self, Vec<String>> {
1080        TopologyPiecesBuilder::new(config, diff)
1081            .with_buffers(buffers)
1082            .with_extra_context(extra_context)
1083            .with_utilization_registry(utilization_registry)
1084            .build()
1085            .await
1086    }
1087}
1088
1089const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1090    match events {
1091        EventArray::Logs(_) => data_type.contains(DataType::Log),
1092        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1093        EventArray::Traces(_) => data_type.contains(DataType::Trace),
1094    }
1095}
1096
/// A flattened view of a configured transform: everything needed to build its
/// runtime task without holding onto the full `TransformOuter`.
#[derive(Debug, Clone)]
struct TransformNode {
    // Component key identifying this transform in the topology.
    key: ComponentKey,
    // Static component type name, used when labeling the task.
    typetag: &'static str,
    // Upstream output ids this transform reads from.
    inputs: Inputs<OutputId>,
    // Input constraints (accepted event data types).
    input_details: Input,
    // Declared outputs, including per-port schema definitions.
    outputs: Vec<TransformOutput>,
    // Whether this transform may process batches concurrently.
    enable_concurrency: bool,
}
1106
1107impl TransformNode {
1108    pub fn from_parts(
1109        key: ComponentKey,
1110        context: &TransformContext,
1111        transform: &TransformOuter<OutputId>,
1112        schema_definition: &[(OutputId, Definition)],
1113    ) -> Self {
1114        Self {
1115            key,
1116            typetag: transform.inner.get_component_name(),
1117            inputs: transform.inputs.clone(),
1118            input_details: transform.inner.input(),
1119            outputs: transform.inner.outputs(context, schema_definition),
1120            enable_concurrency: transform.inner.enable_concurrency(),
1121        }
1122    }
1123}
1124
/// Drives a synchronous transform: pulls event arrays from the input buffer,
/// runs them through the transform, and flushes results to the outputs, while
/// reporting utilization and latency telemetry.
struct Runner {
    transform: Box<dyn SyncTransform>,
    // Wrapped in `Option` so `run_inline`/`run_concurrently` can take
    // ownership exactly once; running twice panics.
    input_rx: Option<BufferReceiver<EventArray>>,
    // Event data types this transform accepts; other arrays are filtered out.
    input_type: DataType,
    outputs: TransformOutputs,
    // Reports busy/idle transitions to the utilization emitter.
    timer_tx: UtilizationComponentSender,
    latency_recorder: LatencyRecorder,
    events_received: Registered<EventsReceived>,
}
1134
impl Runner {
    /// Bundles a synchronous transform with its input stream, output fanout,
    /// and the telemetry/utilization handles used while running it.
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
        latency_recorder: LatencyRecorder,
    ) -> Self {
        Self {
            transform,
            // `Option` so a run method can take ownership exactly once.
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            latency_recorder,
            events_received: register!(EventsReceived),
        }
    }

    /// Marks the end of an idle (waiting) period for the utilization timer
    /// and emits `EventsReceived` telemetry for the incoming batch.
    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    /// Flushes the buffered outputs downstream, stamping send latency on each
    /// array first. The utilization timer is switched back to "waiting"
    /// because the send may block on downstream backpressure.
    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        let now = Instant::now();
        outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
        self.outputs.send(outputs_buf).await
    }

    /// Runs the transform on the current task, one batch at a time,
    /// preserving strict input ordering. Resolves when the input stream ends.
    async fn run_inline(mut self) -> TaskResult {
        // 128 is an arbitrary, smallish constant
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        // Drop whole arrays whose event kind this transform does not accept.
        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    /// Runs the transform by spawning per-batch tasks, with at most
    /// `TRANSFORM_CONCURRENCY_LIMIT` batches in flight. `FuturesOrdered`
    /// yields results in push order, so outputs are flushed in input order
    /// despite the concurrency.
    async fn run_concurrently(mut self) -> TaskResult {
        // Drop whole arrays whose event kind this transform does not accept.
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Coalesce arrays that are already ready into one batch, amortizing
        // the per-spawn overhead.
        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                // `biased` polls arms top-down: drain completed work before
                // accepting new input, keeping `in_flight` bounded.
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(mut outputs_buf)) => {
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        // `None` is ruled out by the `!is_empty` guard; a join
                        // error would mean the spawned task panicked/aborted.
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                // Only pull new input while under the concurrency limit and
                // the input stream has not ended.
                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            // The spawned task owns a clone of the transform
                            // and returns its output buffer via the join
                            // handle, preserving the caller's tracing span.
                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            // Input stream ended: stop accepting work and let
                            // the in-flight tasks drain.
                            shutting_down = true;
                            continue
                        }
                    }
                }

                // Both arms disabled: nothing in flight and (if shutting
                // down) no more input — we are done.
                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}