vector/topology/builder.rs

use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    buffers::{
        BufferType, WhenFull,
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
    },
    config::LogNamespace,
    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
    schema::Definition,
    transform::update_runtime_schema_definition,
};

use super::{
    BuiltBuffer, ConfigDiff,
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
};
use crate::{
    SourceSender,
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    source_sender::{CHUNK_SIZE, SourceSenderItem},
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{UtilizationComponentSender, UtilizationEmitter, UtilizationRegistry, wrap},
};

static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);

pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: Option<UtilizationEmitter>,
    utilization_registry: UtilizationRegistry,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Self {
        // If registry is not passed, we need to build a whole new utilization emitter + registry.
        // Otherwise, we just store the registry and reuse it for this build.
        let (emitter, registry) = if let Some(registry) = utilization_registry {
            (None, registry)
        } else {
            let (emitter, registry) = UtilizationEmitter::new();
            (Some(emitter), registry)
        };
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: emitter,
            utilization_registry: registry,
        }
    }

    /// Builds the new pieces of the topology found in `self.diff`.
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // We should have all the data for the enrichment tables loaded now, so switch them over to
        // readonly.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                utilization: self
                    .utilization_emitter
                    .map(|e| (e, self.utilization_registry)),
            })
        } else {
            Err(self.errors)
        }
    }

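    /// Regroups the flat map of `OutputId` to fanout control sender into a per-component map
    /// keyed by optional output port.
    ///
    /// A minimal sketch of the shape change, using illustrative component ids and ports only:
    ///
    /// ```ignore
    /// // before: { "my_source" (default port) => tx_default, "my_source.errors" => tx_errors }
    /// // after:  { "my_source" => { None => tx_default, Some("errors") => tx_errors } }
    /// ```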
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

    /// Loads, or reloads the enrichment tables.
    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        // Build enrichment tables
        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    // If this is an existing enrichment table, we need to store the indexes so we
                    // can reapply them after the reload.
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                // If there is an error adding an index, we do not want to use the
                                // reloaded data; the previously loaded data will still be used.
                                // Just report the error and continue.
                                error!(message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error);
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }

    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn all of the per-output pumps and then await their completion.
                //
                // If any of the pumps complete with an error, or panic/are cancelled, we return
                // immediately.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // Immediately send the error to the source's wrapper future, but ignore any
                        // errors during the send, since nested errors wouldn't make any sense here.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let pipeline = builder.build();

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                shutdown: shutdown_signal,
                out: pipeline,
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let source = source.inner.build(context).await;
            let server = match source {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            // Build a wrapper future that drives the actual source future, but returns early if we've
            // been signalled to forcefully shut down, or if the source pump encounters an error.
            //
            // The forceful shutdown will only resolve if the source itself doesn't shut down gracefully
            // within the allotted time window. This can occur normally for certain sources, like stdin,
            // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
            // to shut down unless some input is given.
            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    // We've been told that we must forcefully shut down.
                    _ = force_shutdown_tripwire => Ok(()),

                    // The source pump encountered an error, which we're now bubbling up here to stop
                    // the source as well, since the source running makes no sense without the pump.
                    //
                    // We only match receiving a message, not the error of the sender being dropped,
                    // just to keep things simpler.
                    Ok(e) = &mut pump_error_rx => Err(e),

                    // The source finished normally.
                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // Even though we already tried to receive any pump task error above, we may have exited
                // on the source itself returning an error due to task scheduling, where the pump task
                // encountered an error, sent it over the oneshot, but we were polling the source
                // already and hit an error trying to send to the now-shutdown pump task.
                //
                // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
                // time to see if the pump task encountered an error, using _that_ instead if so, to
                // propagate the true error that caused the source to have to stop.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }

    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component_id = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // We have received an error whilst retrieving the definitions,
                    // so there is no point in continuing.

                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                // We may not have any definitions if all the inputs are from metrics sources.
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );

            // Create a map of the outputs to the list of possible definitions from those outputs.
            let schema_definitions = transform
                .inner
                .outputs(
                    enrichment_tables.clone(),
                    &input_definitions,
                    self.config.schema.log_namespace(),
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node = TransformNode::from_parts(
                key.clone(),
                enrichment_tables.clone(),
                transform,
                &input_definitions,
                self.config.schema.log_namespace(),
            );

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let (input_tx, input_rx) =
                TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span)
                    .await;

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) = {
                let _span = span.enter();
                build_transform(transform, node, input_rx, &self.utilization_registry)
            };

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }

    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component_id = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // At this point, we've validated that all transforms are valid, including any
            // transform that mutates the schema provided by their sources. We can now validate the
            // schema expectations of each individual sink.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_registry
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // Why is this Arc<Mutex<Option<_>>> needed, you ask? In the case where building
                // the pieces (`build_pieces`) errors, this future won't be run, so this rx won't
                // be taken, which enables us to reuse the rx to rebuild the old configuration by
                // passing this Arc<Mutex<Option<_>>> yet again.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}

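/// Rebuilds any enrichment tables that report needing a reload, reapplies their existing
/// indexes, and swaps the result into the global `ENRICHMENT_TABLES` registry.
///
/// A hedged usage sketch; `load_config` is a hypothetical helper standing in for however the
/// caller obtains a `Config`:
///
/// ```ignore
/// let config: Config = load_config()?;
/// reload_enrichment_tables(&config).await;
/// ```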
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();
    // Build enrichment tables
    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!("Enrichment table \"{name}\" reload failed: {error}");
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            // If there is an error adding an index, we do not want to use the
                            // reloaded data; the previously loaded data will still be used.
                            // Just report the error and continue.
                            error!(
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}

pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}

/// Builder for constructing `TopologyPieces` with a fluent API.
///
/// # Examples
///
/// ```ignore
/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
///     .with_buffers(buffers)
///     .with_extra_context(extra_context)
///     .build()
///     .await?;
/// ```
pub struct TopologyPiecesBuilder<'a> {
    config: &'a Config,
    diff: &'a ConfigDiff,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    extra_context: ExtraContext,
    utilization_registry: Option<UtilizationRegistry>,
}

impl<'a> TopologyPiecesBuilder<'a> {
    /// Creates a new builder with required parameters.
    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
        Self {
            config,
            diff,
            buffers: HashMap::new(),
            extra_context: ExtraContext::default(),
            utilization_registry: None,
        }
    }

    /// Sets the buffers for the topology.
    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
        self.buffers = buffers;
        self
    }

    /// Sets the extra context for the topology.
    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
        self.extra_context = extra_context;
        self
    }

    /// Sets the utilization registry for the topology.
    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
        self.utilization_registry = registry;
        self
    }

    /// Builds the topology pieces, returning errors if any occur.
    ///
    /// Use this method when you need to handle errors explicitly,
    /// such as in tests or validation code.
    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
        Builder::new(
            self.config,
            self.diff,
            self.buffers,
            self.extra_context,
            self.utilization_registry,
        )
        .build()
        .await
    }

    /// Builds the topology pieces, logging any errors that occur.
    ///
    /// Use this method for runtime configuration loading where
    /// errors should be logged and execution should continue.
    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
        match self.build().await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }
}

impl TopologyPieces {
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Option<Self> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build_or_log_errors()
            .await
    }

    /// Builds only the new pieces, and doesn't check their topology.
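    ///
    /// A usage sketch under the assumption that `config`, `diff`, and `buffers` are already in
    /// scope (the names are illustrative):
    ///
    /// ```ignore
    /// let pieces = TopologyPieces::build(&config, &diff, buffers, ExtraContext::default(), None)
    ///     .await
    ///     .map_err(|errors| errors.join("\n"))?;
    /// ```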
    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
        utilization_registry: Option<UtilizationRegistry>,
    ) -> Result<Self, Vec<String>> {
        TopologyPiecesBuilder::new(config, diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context)
            .with_utilization_registry(utilization_registry)
            .build()
            .await
    }
}

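/// Returns whether an `EventArray`'s variant is allowed by a component's declared input
/// `DataType`.
///
/// A small illustrative sketch (the empty-array construction is for demonstration only):
///
/// ```ignore
/// assert!(filter_events_type(&EventArray::Logs(vec![]), DataType::Log));
/// assert!(!filter_events_type(&EventArray::Logs(vec![]), DataType::Metric));
/// ```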
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}

#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
        global_log_namespace: LogNamespace,
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(
                enrichment_tables,
                schema_definition,
                global_log_namespace,
            ),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}

fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        // TODO: avoid the double boxing for function transforms here
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_registry)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_registry),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_registry,
        ),
    }
}

fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_registry.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}

struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

    async fn run_inline(mut self) -> TaskResult {
        // 128 is an arbitrary, smallish constant
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
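        // Drain completed batches first (the `biased` arm), which preserves output order via
        // `FuturesOrdered`, and only pull new input arrays while we are below the concurrency
        // limit and not shutting down. Once the input stream ends, the remaining in-flight
        // batches are flushed before the loop exits.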
        loop {
            tokio::select! {
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(outputs_buf)) => {
                            let mut outputs_buf: TransformOutputsBuf = outputs_buf;
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}

fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_registry.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    // Task transforms can only write to the default output, so only a single schema def map is needed.
    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}