vector/topology/builder.rs

use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    buffers::{
        BufferType, WhenFull,
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender, ChannelMetricMetadata},
        },
    },
    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
    schema::Definition,
    source_sender::{CHUNK_SIZE, SourceSenderItem},
    transform::update_runtime_schema_definition,
};
use vector_vrl_metrics::MetricsStorage;

use super::{
    BuiltBuffer, ConfigDiff,
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
};
use crate::{
    SourceSender,
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{UtilizationComponentSender, UtilizationEmitter, UtilizationRegistry, wrap},
};
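// Process-wide registries shared across topology rebuilds: enrichment tables are loaded once and
// swapped on reload, and the VRL metrics storage persists for the lifetime of the process.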
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
const TRANSFORM_CHANNEL_METRIC_PREFIX: &str = "transform_buffer";

static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
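/// Incrementally builds the pieces (sources, transforms, sinks, and their plumbing) for the parts
/// of the topology that changed according to `diff`, collecting any errors along the way.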
struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: Option<UtilizationEmitter>,
    utilization_registry: UtilizationRegistry,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Self {
        // If no registry is passed, we need to build a whole new utilization emitter + registry.
        // Otherwise, we just store the provided registry and reuse it for this build.
        let (emitter, registry) = if let Some(registry) = utilization_registry {
            (None, registry)
        } else {
            let (emitter, registry) = UtilizationEmitter::new();
            (Some(emitter), registry)
        };
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: emitter,
            utilization_registry: registry,
        }
    }

    /// Builds the new pieces of the topology found in `self.diff`.
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // We should have all the data for the enrichment tables loaded now, so switch them over to
        // readonly.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                metrics_storage: METRICS_STORAGE.clone(),
                utilization: self
                    .utilization_emitter
                    .map(|e| (e, self.utilization_registry)),
            })
        } else {
            Err(self.errors)
        }
    }
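    /// Groups the flat `OutputId` -> control channel map by component, keyed by output port.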
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

    /// Loads or reloads the enrichment tables.
    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        // Build enrichment tables
        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    // If this is an existing enrichment table, we need to store the indexes so we
                    // can reapply them after the load.
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                // If there is an error adding an index, we do not want to use the
                                // reloaded data; the previously loaded data will continue to be
                                // used. Just report the error and move on.
                                error!(message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error);
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }
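    /// Builds every new source (including enrichment tables acting as sources), wiring each
    /// source output to a fanout pump, and returns the source server tasks keyed by component.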
    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder()
                .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
                .with_timeout(source.inner.send_timeout())
                .with_ewma_alpha(self.config.global.buffer_utilization_ewma_alpha);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn all of the per-output pumps and then await their completion.
                //
                // If any of the pumps complete with an error, or panic/are cancelled, we return
                // immediately.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // Immediately send the error to the source's wrapper future, but ignore any
                        // errors during the send, since nested errors wouldn't make any sense here.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                shutdown: shutdown_signal,
                out: builder.build(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let server = match source.inner.build(context).await {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            // Build a wrapper future that drives the actual source future, but returns early if we've
            // been signalled to forcefully shut down, or if the source pump encounters an error.
            //
            // The forceful shutdown will only resolve if the source itself doesn't shut down gracefully
            // within the allotted time window. This can occur normally for certain sources, like stdin,
            // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
            // to shut down unless some input is given.
            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    // We've been told that we must forcefully shut down.
                    _ = force_shutdown_tripwire => Ok(()),

                    // The source pump encountered an error, which we're now bubbling up here to stop
                    // the source as well, since the source running makes no sense without the pump.
                    //
                    // We only match receiving a message, not the error of the sender being dropped,
                    // just to keep things simpler.
                    Ok(e) = &mut pump_error_rx => Err(e),

                    // The source finished normally.
                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // Even though we already tried to receive any pump task error above, we may have exited
                // on the source itself returning an error due to task scheduling, where the pump task
                // encountered an error, sent it over the oneshot, but we were polling the source
                // already and hit an error trying to send to the now-shutdown pump task.
                //
                // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
                // time to see if the pump task encountered an error, using _that_ instead if so, to
                // propagate the true error that caused the source to have to stop.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }
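    /// Builds every new transform, resolving its input schema definitions, wiring an input
    /// buffer in front of it, and registering its output fanout controls.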
    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component_id = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // We have received an error whilst retrieving the definitions, so there is no
                    // point in continuing.
                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                // We may not have any definitions if all the inputs are from metrics sources.
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );
            let _span = span.enter();

            // Create a map of the outputs to the list of possible definitions from those outputs.
            let schema_definitions = transform
                .inner
                .outputs(
                    &TransformContext {
                        enrichment_tables: enrichment_tables.clone(),
                        metrics_storage: METRICS_STORAGE.clone(),
                        schema: self.config.schema,
                        ..Default::default()
                    },
                    &input_definitions,
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node =
                TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let metrics = ChannelMetricMetadata::new(TRANSFORM_CHANNEL_METRIC_PREFIX, None);
            let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
                TOPOLOGY_BUFFER_SIZE,
                WhenFull::Block,
                &span,
                Some(metrics),
                self.config.global.buffer_utilization_ewma_alpha,
            );

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) =
                build_transform(transform, node, input_rx, &self.utilization_registry);

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }
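    /// Builds every new sink (including enrichment tables acting as sinks), validating its schema
    /// expectations, reusing or building its buffer, and creating its healthcheck task.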
    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // At this point, we've validated all transforms, including any transform that mutates
            // the schema provided by its sources. We can now validate the schema expectations of
            // each individual sink.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                metrics_storage: METRICS_STORAGE.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_registry
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // Why is this `Arc<Mutex<Option<_>>>` needed? If building the pieces errors, this
                // future won't be run, so this `rx` won't be taken, which lets us reuse `rx` to
                // rebuild the old configuration by passing this `Arc<Mutex<Option<_>>>` yet again.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}
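/// Reloads any enrichment tables that report needing a reload, reapplying their existing indexes,
/// and then marks the global table registry as fully loaded again.
///
/// # Examples
///
/// ```ignore
/// // A minimal sketch of intended usage, assuming a loaded `Config` is in scope:
/// reload_enrichment_tables(&config).await;
/// ```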
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();
    // Build enrichment tables
    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!("Enrichment table \"{name}\" reload failed: {error}");
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            // If there is an error adding an index, we do not want to use the
                            // reloaded data; the previously loaded data will continue to be used.
                            // Just report the error and move on.
                            error!(
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}
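/// The runnable pieces of a (partially) built topology: component inputs and outputs, the spawned
/// tasks and healthchecks, and the shutdown/detach machinery needed to run or roll back a build.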
pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) metrics_storage: MetricsStorage,
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}

/// Builder for constructing `TopologyPieces` with a fluent API.
///
/// # Examples
///
/// ```ignore
/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
///     .with_buffers(buffers)
///     .with_extra_context(extra_context)
///     .build()
///     .await?;
/// ```
pub struct TopologyPiecesBuilder<'a> {
    config: &'a Config,
    diff: &'a ConfigDiff,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    extra_context: ExtraContext,
    utilization_registry: Option<UtilizationRegistry>,
}

impl<'a> TopologyPiecesBuilder<'a> {
    /// Creates a new builder with required parameters.
    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
        Self {
            config,
            diff,
            buffers: HashMap::new(),
            extra_context: ExtraContext::default(),
            utilization_registry: None,
        }
    }

    /// Sets the buffers for the topology.
    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
        self.buffers = buffers;
        self
    }

    /// Sets the extra context for the topology.
    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
        self.extra_context = extra_context;
        self
    }

    /// Sets the utilization registry for the topology.
    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
        self.utilization_registry = registry;
        self
    }

    /// Builds the topology pieces, returning errors if any occur.
    ///
    /// Use this method when you need to handle errors explicitly,
    /// such as in tests or validation code.
    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
        Builder::new(
            self.config,
            self.diff,
            self.buffers,
            self.extra_context,
            self.utilization_registry,
        )
        .build()
        .await
    }

    /// Builds the topology pieces, logging any errors that occur.
    ///
    /// Use this method for runtime configuration loading where
    /// errors should be logged and execution should continue.
    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
        match self.build().await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }
}

impl TopologyPieces {
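    /// Convenience wrapper around [`TopologyPiecesBuilder`] that builds the new pieces and logs
    /// any configuration errors instead of returning them.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // A rough sketch, assuming `config`, `diff`, and `buffers` are already available:
    /// let pieces = TopologyPieces::build_or_log_errors(
    ///     &config,
    ///     &diff,
    ///     buffers,
    ///     ExtraContext::default(),
    ///     None,
    /// )
    /// .await;
    /// ```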
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Option<Self> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build_or_log_errors()
            .await
    }

    /// Builds only the new pieces, and doesn't check their topology.
    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Result<Self, Vec<String>> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build()
            .await
    }
}
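/// Returns whether an event array's variant (logs, metrics, or traces) is accepted by the given
/// input `DataType`.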
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}
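/// The static details of a configured transform needed to wire it into the topology, extracted
/// from its `TransformOuter` configuration.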
#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        context: &TransformContext,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(context, schema_definition),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}
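/// Dispatches to the appropriate task builder for the given transform flavor (function,
/// synchronous, or task), returning the spawned task and its output control channels.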
fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        // TODO: avoid the double boxing for function transforms here
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_registry)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_registry),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_registry,
        ),
    }
}
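/// Builds the task for a synchronous transform, running it either inline or concurrently
/// (depending on `enable_concurrency`) via a [`Runner`].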
fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_registry.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}
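/// Drives a synchronous transform: reads event arrays from the input buffer, applies the
/// transform, forwards the outputs, and reports utilization and `EventsReceived` metrics.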
struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }
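    /// Processes input event arrays one at a time on the current task, transforming each batch
    /// and sending its outputs before pulling the next.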
    async fn run_inline(mut self) -> TaskResult {
        // 128 is an arbitrary, smallish constant
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }
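    /// Processes input event arrays by keeping up to `TRANSFORM_CONCURRENCY_LIMIT` transform
    /// tasks in flight, forwarding their outputs downstream in input order as they complete.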
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(outputs_buf)) => {
                            let mut outputs_buf: TransformOutputsBuf = outputs_buf;
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}
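/// Builds the task for a task-style transform, which owns its input stream and writes only to the
/// default output via a fanout.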
fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_registry.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    // Task transforms can only write to the default output, so only a single schema def map is needed
    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}