vector/topology/builder.rs

1use std::{
2    collections::HashMap,
3    future::ready,
4    num::NonZeroUsize,
5    sync::{Arc, LazyLock, Mutex},
6    time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use metrics::gauge;
12use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
13use tokio::{
14    select,
15    sync::{mpsc::UnboundedSender, oneshot},
16    time::timeout,
17};
18use tracing::Instrument;
19use vector_lib::{
20    EstimatedJsonEncodedSizeOf,
21    buffers::{
22        BufferType, WhenFull,
23        topology::{
24            builder::TopologyBuilder,
25            channel::{BufferReceiver, BufferSender},
26        },
27    },
28    config::LogNamespace,
29    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
30    schema::Definition,
31    transform::update_runtime_schema_definition,
32};
33
34use super::{
35    BuiltBuffer, ConfigDiff,
36    fanout::{self, Fanout},
37    schema,
38    task::{Task, TaskOutput, TaskResult},
39};
40use crate::{
41    SourceSender,
42    config::{
43        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
44        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
45    },
46    event::{EventArray, EventContainer},
47    extra_context::ExtraContext,
48    internal_events::EventsReceived,
49    shutdown::SourceShutdownCoordinator,
50    source_sender::{CHUNK_SIZE, SourceSenderItem},
51    spawn_named,
52    topology::task::TaskError,
53    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
54    utilization::{UtilizationComponentSender, UtilizationEmitter, wrap},
55};
56
57static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
58    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
59
60pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
61    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);
62
63const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
64pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
65
66static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
67    crate::app::worker_threads()
68        .map(std::num::NonZeroUsize::get)
69        .unwrap_or_else(crate::num_threads)
70});
71
72const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
73
/// Accumulates the newly built components of a topology.
///
/// It is populated by `load_enrichment_tables`, `build_sources`, `build_transforms` and
/// `build_sinks`, then converted into a `TopologyPieces` (or a list of errors) by `build`.
struct Builder<'a> {
    /// The full configuration that new components are read from.
    config: &'a super::Config,
    /// Decides which sources, transforms, sinks and enrichment tables are new.
    diff: &'a ConfigDiff,
    /// Every new source is registered here, yielding its shutdown signal and force-shutdown tripwire.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Human-readable build errors; any entry makes `build` return `Err`.
    errors: Vec<String>,
    /// Fanout control channel for each output of every new source and transform.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Source pump supervisor tasks, transform tasks and sink tasks.
    tasks: HashMap<ComponentKey, Task>,
    /// Already-built sink buffers to reuse; a sink missing from this map gets a fresh buffer.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Input buffer sender plus configured inputs of every new transform and sink.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Healthcheck task of every new sink.
    healthchecks: HashMap<ComponentKey, Task>,
    /// Per-sink trigger; its paired tripwire ends the sink's input stream via `take_until_if`.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Cloned into the build context of every source, transform and sink.
    extra_context: ExtraContext,
    /// Utilization tracking: sinks register via `add_component`; `build_transform` also receives it.
    utilization_emitter: UtilizationEmitter,
}
88
89impl<'a> Builder<'a> {
90    fn new(
91        config: &'a super::Config,
92        diff: &'a ConfigDiff,
93        buffers: HashMap<ComponentKey, BuiltBuffer>,
94        extra_context: ExtraContext,
95    ) -> Self {
96        Self {
97            config,
98            diff,
99            buffers,
100            shutdown_coordinator: SourceShutdownCoordinator::default(),
101            errors: vec![],
102            outputs: HashMap::new(),
103            tasks: HashMap::new(),
104            inputs: HashMap::new(),
105            healthchecks: HashMap::new(),
106            detach_triggers: HashMap::new(),
107            extra_context,
108            utilization_emitter: UtilizationEmitter::new(),
109        }
110    }
111
112    /// Builds the new pieces of the topology found in `self.diff`.
113    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
114        let enrichment_tables = self.load_enrichment_tables().await;
115        let source_tasks = self.build_sources(enrichment_tables).await;
116        self.build_transforms(enrichment_tables).await;
117        self.build_sinks(enrichment_tables).await;
118
119        // We should have all the data for the enrichment tables loaded now, so switch them over to
120        // readonly.
121        enrichment_tables.finish_load();
122
123        if self.errors.is_empty() {
124            Ok(TopologyPieces {
125                inputs: self.inputs,
126                outputs: Self::finalize_outputs(self.outputs),
127                tasks: self.tasks,
128                source_tasks,
129                healthchecks: self.healthchecks,
130                shutdown_coordinator: self.shutdown_coordinator,
131                detach_triggers: self.detach_triggers,
132                utilization_emitter: Some(self.utilization_emitter),
133            })
134        } else {
135            Err(self.errors)
136        }
137    }
138
139    fn finalize_outputs(
140        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
141    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
142    {
143        let mut finalized_outputs = HashMap::new();
144        for (id, output) in outputs {
145            let entry = finalized_outputs
146                .entry(id.component)
147                .or_insert_with(HashMap::new);
148            entry.insert(id.port, output);
149        }
150
151        finalized_outputs
152    }
153
154    /// Loads, or reloads the enrichment tables.
155    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
156    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
157        let mut enrichment_tables = HashMap::new();
158
159        // Build enrichment tables
160        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
161            let table_name = name.to_string();
162            if ENRICHMENT_TABLES.needs_reload(&table_name) {
163                let indexes = if !self.diff.enrichment_tables.is_added(name) {
164                    // If this is an existing enrichment table, we need to store the indexes to reapply
165                    // them again post load.
166                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
167                } else {
168                    None
169                };
170
171                let mut table = match table_outer.inner.build(&self.config.global).await {
172                    Ok(table) => table,
173                    Err(error) => {
174                        self.errors
175                            .push(format!("Enrichment Table \"{name}\": {error}"));
176                        continue;
177                    }
178                };
179
180                if let Some(indexes) = indexes {
181                    for (case, index) in indexes {
182                        match table
183                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
184                        {
185                            Ok(_) => (),
186                            Err(error) => {
187                                // If there is an error adding an index we do not want to use the reloaded
188                                // data, the previously loaded data will still need to be used.
189                                // Just report the error and continue.
190                                error!(message = "Unable to add index to reloaded enrichment table.",
191                                    table = ?name.to_string(),
192                                    %error,
193                                    internal_log_rate_limit = true);
194                                continue 'tables;
195                            }
196                        }
197                    }
198                }
199
200                enrichment_tables.insert(table_name, table);
201            }
202        }
203
204        ENRICHMENT_TABLES.load(enrichment_tables);
205
206        &ENRICHMENT_TABLES
207    }
208
209    async fn build_sources(
210        &mut self,
211        enrichment_tables: &vector_lib::enrichment::TableRegistry,
212    ) -> HashMap<ComponentKey, Task> {
213        let mut source_tasks = HashMap::new();
214
215        let table_sources = self
216            .config
217            .enrichment_tables
218            .iter()
219            .filter_map(|(key, table)| table.as_source(key))
220            .collect::<Vec<_>>();
221        for (key, source) in self
222            .config
223            .sources()
224            .filter(|(key, _)| self.diff.sources.contains_new(key))
225            .chain(
226                table_sources
227                    .iter()
228                    .map(|(key, sink)| (key, sink))
229                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
230            )
231        {
232            debug!(component = %key, "Building new source.");
233
234            let typetag = source.inner.get_component_name();
235            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());
236
237            let span = error_span!(
238                "source",
239                component_kind = "source",
240                component_id = %key.id(),
241                component_type = %source.inner.get_component_name(),
242            );
243            let _entered_span = span.enter();
244
245            let task_name = format!(
246                ">> {} ({}, pump) >>",
247                source.inner.get_component_name(),
248                key.id()
249            );
250
251            let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE);
252            let mut pumps = Vec::new();
253            let mut controls = HashMap::new();
254            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
255
256            for output in source_outputs.into_iter() {
257                let mut rx = builder.add_source_output(output.clone(), key.clone());
258
259                let (mut fanout, control) = Fanout::new();
260                let source_type = source.inner.get_component_name();
261                let source = Arc::new(key.clone());
262
263                let pump = async move {
264                    debug!("Source pump starting.");
265
266                    while let Some(SourceSenderItem {
267                        events: mut array,
268                        send_reference,
269                    }) = rx.next().await
270                    {
271                        array.set_output_id(&source);
272                        array.set_source_type(source_type);
273                        fanout
274                            .send(array, Some(send_reference))
275                            .await
276                            .map_err(|e| {
277                                debug!("Source pump finished with an error.");
278                                TaskError::wrapped(e)
279                            })?;
280                    }
281
282                    debug!("Source pump finished normally.");
283                    Ok(TaskOutput::Source)
284                };
285
286                pumps.push(pump.instrument(span.clone()));
287                controls.insert(
288                    OutputId {
289                        component: key.clone(),
290                        port: output.port.clone(),
291                    },
292                    control,
293                );
294
295                let port = output.port.clone();
296                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
297                    schema_definitions.insert(port, definition);
298                }
299            }
300
301            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
302            let pump = async move {
303                debug!("Source pump supervisor starting.");
304
305                // Spawn all of the per-output pumps and then await their completion.
306                //
307                // If any of the pumps complete with an error, or panic/are cancelled, we return
308                // immediately.
309                let mut handles = FuturesUnordered::new();
310                for pump in pumps {
311                    handles.push(spawn_named(pump, task_name.as_ref()));
312                }
313
314                let mut had_pump_error = false;
315                while let Some(output) = handles.try_next().await? {
316                    if let Err(e) = output {
317                        // Immediately send the error to the source's wrapper future, but ignore any
318                        // errors during the send, since nested errors wouldn't make any sense here.
319                        _ = pump_error_tx.send(e);
320                        had_pump_error = true;
321                        break;
322                    }
323                }
324
325                if had_pump_error {
326                    debug!("Source pump supervisor task finished with an error.");
327                } else {
328                    debug!("Source pump supervisor task finished normally.");
329                }
330                Ok(TaskOutput::Source)
331            };
332            let pump = Task::new(key.clone(), typetag, pump);
333
334            let pipeline = builder.build();
335
336            let (shutdown_signal, force_shutdown_tripwire) = self
337                .shutdown_coordinator
338                .register_source(key, INTERNAL_SOURCES.contains(&typetag));
339
340            let context = SourceContext {
341                key: key.clone(),
342                globals: self.config.global.clone(),
343                enrichment_tables: enrichment_tables.clone(),
344                shutdown: shutdown_signal,
345                out: pipeline,
346                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
347                acknowledgements: source.sink_acknowledgements,
348                schema_definitions,
349                schema: self.config.schema,
350                extra_context: self.extra_context.clone(),
351            };
352            let source = source.inner.build(context).await;
353            let server = match source {
354                Err(error) => {
355                    self.errors.push(format!("Source \"{key}\": {error}"));
356                    continue;
357                }
358                Ok(server) => server,
359            };
360
361            // Build a wrapper future that drives the actual source future, but returns early if we've
362            // been signalled to forcefully shutdown, or if the source pump encounters an error.
363            //
364            // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully
365            // within the allotted time window. This can occur normally for certain sources, like stdin,
366            // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
367            // to shutdown unless some input is given.
368            let server = async move {
369                debug!("Source starting.");
370
371                let mut result = select! {
372                    biased;
373
374                    // We've been told that we must forcefully shut down.
375                    _ = force_shutdown_tripwire => Ok(()),
376
377                    // The source pump encountered an error, which we're now bubbling up here to stop
378                    // the source as well, since the source running makes no sense without the pump.
379                    //
380                    // We only match receiving a message, not the error of the sender being dropped,
381                    // just to keep things simpler.
382                    Ok(e) = &mut pump_error_rx => Err(e),
383
384                    // The source finished normally.
385                    result = server => result.map_err(|_| TaskError::Opaque),
386                };
387
388                // Even though we already tried to receive any pump task error above, we may have exited
389                // on the source itself returning an error due to task scheduling, where the pump task
390                // encountered an error, sent it over the oneshot, but we were polling the source
391                // already and hit an error trying to send to the now-shutdown pump task.
392                //
393                // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
394                // time to see if the pump task encountered an error, using _that_ instead if so, to
395                // propagate the true error that caused the source to have to stop.
396                if let Ok(e) = pump_error_rx.try_recv() {
397                    result = Err(e);
398                }
399
400                match result {
401                    Ok(()) => {
402                        debug!("Source finished normally.");
403                        Ok(TaskOutput::Source)
404                    }
405                    Err(e) => {
406                        debug!("Source finished with an error.");
407                        Err(e)
408                    }
409                }
410            };
411            let server = Task::new(key.clone(), typetag, server);
412
413            self.outputs.extend(controls);
414            self.tasks.insert(key.clone(), pump);
415            source_tasks.insert(key.clone(), server);
416        }
417
418        source_tasks
419    }
420
421    async fn build_transforms(
422        &mut self,
423        enrichment_tables: &vector_lib::enrichment::TableRegistry,
424    ) {
425        let mut definition_cache = HashMap::default();
426
427        for (key, transform) in self
428            .config
429            .transforms()
430            .filter(|(key, _)| self.diff.transforms.contains_new(key))
431        {
432            debug!(component = %key, "Building new transform.");
433
434            let input_definitions = match schema::input_definitions(
435                &transform.inputs,
436                self.config,
437                enrichment_tables.clone(),
438                &mut definition_cache,
439            ) {
440                Ok(definitions) => definitions,
441                Err(_) => {
442                    // We have received an error whilst retrieving the definitions,
443                    // there is no point in continuing.
444
445                    return;
446                }
447            };
448
449            let merged_definition: Definition = input_definitions
450                .iter()
451                .map(|(_output_id, definition)| definition.clone())
452                .reduce(Definition::merge)
453                // We may not have any definitions if all the inputs are from metrics sources.
454                .unwrap_or_else(Definition::any);
455
456            let span = error_span!(
457                "transform",
458                component_kind = "transform",
459                component_id = %key.id(),
460                component_type = %transform.inner.get_component_name(),
461            );
462
463            // Create a map of the outputs to the list of possible definitions from those outputs.
464            let schema_definitions = transform
465                .inner
466                .outputs(
467                    enrichment_tables.clone(),
468                    &input_definitions,
469                    self.config.schema.log_namespace(),
470                )
471                .into_iter()
472                .map(|output| {
473                    let definitions = output.schema_definitions(self.config.schema.enabled);
474                    (output.port, definitions)
475                })
476                .collect::<HashMap<_, _>>();
477
478            let context = TransformContext {
479                key: Some(key.clone()),
480                globals: self.config.global.clone(),
481                enrichment_tables: enrichment_tables.clone(),
482                schema_definitions,
483                merged_schema_definition: merged_definition.clone(),
484                schema: self.config.schema,
485                extra_context: self.extra_context.clone(),
486            };
487
488            let node = TransformNode::from_parts(
489                key.clone(),
490                enrichment_tables.clone(),
491                transform,
492                &input_definitions,
493                self.config.schema.log_namespace(),
494            );
495
496            let transform = match transform
497                .inner
498                .build(&context)
499                .instrument(span.clone())
500                .await
501            {
502                Err(error) => {
503                    self.errors.push(format!("Transform \"{key}\": {error}"));
504                    continue;
505                }
506                Ok(transform) => transform,
507            };
508
509            let (input_tx, input_rx) =
510                TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span)
511                    .await;
512
513            self.inputs
514                .insert(key.clone(), (input_tx, node.inputs.clone()));
515
516            let (transform_task, transform_outputs) = {
517                let _span = span.enter();
518                build_transform(transform, node, input_rx, &mut self.utilization_emitter)
519            };
520
521            self.outputs.extend(transform_outputs);
522            self.tasks.insert(key.clone(), transform_task);
523        }
524    }
525
526    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
527        let table_sinks = self
528            .config
529            .enrichment_tables
530            .iter()
531            .filter_map(|(key, table)| table.as_sink(key))
532            .collect::<Vec<_>>();
533        for (key, sink) in self
534            .config
535            .sinks()
536            .filter(|(key, _)| self.diff.sinks.contains_new(key))
537            .chain(
538                table_sinks
539                    .iter()
540                    .map(|(key, sink)| (key, sink))
541                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
542            )
543        {
544            debug!(component = %key, "Building new sink.");
545
546            let sink_inputs = &sink.inputs;
547            let healthcheck = sink.healthcheck();
548            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
549            let healthcheck_timeout = healthcheck.timeout;
550
551            let typetag = sink.inner.get_component_name();
552            let input_type = sink.inner.input().data_type();
553
554            let span = error_span!(
555                "sink",
556                component_kind = "sink",
557                component_id = %key.id(),
558                component_type = %sink.inner.get_component_name(),
559            );
560            let _entered_span = span.enter();
561
562            // At this point, we've validated that all transforms are valid, including any
563            // transform that mutates the schema provided by their sources. We can now validate the
564            // schema expectations of each individual sink.
565            if let Err(mut err) = schema::validate_sink_expectations(
566                key,
567                sink,
568                self.config,
569                enrichment_tables.clone(),
570            ) {
571                self.errors.append(&mut err);
572            };
573
574            let (tx, rx) = match self.buffers.remove(key) {
575                Some(buffer) => buffer,
576                _ => {
577                    let buffer_type =
578                        match sink.buffer.stages().first().expect("cant ever be empty") {
579                            BufferType::Memory { .. } => "memory",
580                            BufferType::DiskV2 { .. } => "disk",
581                        };
582                    let buffer_span = error_span!("sink", buffer_type);
583                    let buffer = sink
584                        .buffer
585                        .build(
586                            self.config.global.data_dir.clone(),
587                            key.to_string(),
588                            buffer_span,
589                        )
590                        .await;
591                    match buffer {
592                        Err(error) => {
593                            self.errors.push(format!("Sink \"{key}\": {error}"));
594                            continue;
595                        }
596                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
597                    }
598                }
599            };
600
601            let cx = SinkContext {
602                healthcheck,
603                globals: self.config.global.clone(),
604                enrichment_tables: enrichment_tables.clone(),
605                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
606                schema: self.config.schema,
607                app_name: crate::get_app_name().to_string(),
608                app_name_slug: crate::get_slugified_app_name(),
609                extra_context: self.extra_context.clone(),
610            };
611
612            let (sink, healthcheck) = match sink.inner.build(cx).await {
613                Err(error) => {
614                    self.errors.push(format!("Sink \"{key}\": {error}"));
615                    continue;
616                }
617                Ok(built) => built,
618            };
619
620            let (trigger, tripwire) = Tripwire::new();
621
622            let utilization_sender = self
623                .utilization_emitter
624                .add_component(key.clone(), gauge!("utilization"));
625            let component_key = key.clone();
626            let sink = async move {
627                debug!("Sink starting.");
628
629                // Why is this Arc<Mutex<Option<_>>> needed you ask.
630                // In case when this function build_pieces errors
631                // this future won't be run so this rx won't be taken
632                // which will enable us to reuse rx to rebuild
633                // old configuration by passing this Arc<Mutex<Option<_>>>
634                // yet again.
635                let rx = rx
636                    .lock()
637                    .unwrap()
638                    .take()
639                    .expect("Task started but input has been taken.");
640
641                let mut rx = wrap(utilization_sender, component_key.clone(), rx);
642
643                let events_received = register!(EventsReceived);
644                sink.run(
645                    rx.by_ref()
646                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
647                        .inspect(|events| {
648                            events_received.emit(CountByteSize(
649                                events.len(),
650                                events.estimated_json_encoded_size_of(),
651                            ))
652                        })
653                        .take_until_if(tripwire),
654                )
655                .await
656                .map(|_| {
657                    debug!("Sink finished normally.");
658                    TaskOutput::Sink(rx)
659                })
660                .map_err(|_| {
661                    debug!("Sink finished with an error.");
662                    TaskError::Opaque
663                })
664            };
665
666            let task = Task::new(key.clone(), typetag, sink);
667
668            let component_key = key.clone();
669            let healthcheck_task = async move {
670                if enable_healthcheck {
671                    timeout(healthcheck_timeout, healthcheck)
672                        .map(|result| match result {
673                            Ok(Ok(_)) => {
674                                info!("Healthcheck passed.");
675                                Ok(TaskOutput::Healthcheck)
676                            }
677                            Ok(Err(error)) => {
678                                error!(
679                                    msg = "Healthcheck failed.",
680                                    %error,
681                                    component_kind = "sink",
682                                    component_type = typetag,
683                                    component_id = %component_key.id(),
684                                );
685                                Err(TaskError::wrapped(error))
686                            }
687                            Err(e) => {
688                                error!(
689                                    msg = "Healthcheck timed out.",
690                                    component_kind = "sink",
691                                    component_type = typetag,
692                                    component_id = %component_key.id(),
693                                );
694                                Err(TaskError::wrapped(Box::new(e)))
695                            }
696                        })
697                        .await
698                } else {
699                    info!("Healthcheck disabled.");
700                    Ok(TaskOutput::Healthcheck)
701                }
702            };
703
704            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
705
706            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
707            self.healthchecks.insert(key.clone(), healthcheck_task);
708            self.tasks.insert(key.clone(), task);
709            self.detach_triggers.insert(key.clone(), trigger);
710        }
711    }
712}
713
714pub async fn reload_enrichment_tables(config: &Config) {
715    let mut enrichment_tables = HashMap::new();
716    // Build enrichment tables
717    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
718        let table_name = name.to_string();
719        if ENRICHMENT_TABLES.needs_reload(&table_name) {
720            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
721
722            let mut table = match table_outer.inner.build(&config.global).await {
723                Ok(table) => table,
724                Err(error) => {
725                    error!(
726                        internal_log_rate_limit = true,
727                        "Enrichment table \"{name}\" reload failed: {error}",
728                    );
729                    continue;
730                }
731            };
732
733            if let Some(indexes) = indexes {
734                for (case, index) in indexes {
735                    match table
736                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
737                    {
738                        Ok(_) => (),
739                        Err(error) => {
740                            // If there is an error adding an index we do not want to use the reloaded
741                            // data, the previously loaded data will still need to be used.
742                            // Just report the error and continue.
743                            error!(
744                                internal_log_rate_limit = true,
745                                message = "Unable to add index to reloaded enrichment table.",
746                                table = ?name.to_string(),
747                                %error
748                            );
749                            continue 'tables;
750                        }
751                    }
752                }
753            }
754
755            enrichment_tables.insert(table_name, table);
756        }
757    }
758
759    ENRICHMENT_TABLES.load(enrichment_tables);
760    ENRICHMENT_TABLES.finish_load();
761}
762
/// The runtime pieces produced by [`TopologyPieces::build`] for a config diff.
///
/// Field order determines drop order; keep it stable.
pub struct TopologyPieces {
    /// Input buffer sender for each component, paired with the inputs that component is
    /// configured to read from (sinks are registered here, for example).
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels for each component, keyed by output port name
    /// (`None` presumably being the default output — confirm).
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Runtime tasks keyed by component (sink tasks among them); source tasks are kept
    /// separately in `source_tasks`.
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Runtime tasks for sources, kept apart from `tasks`.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Healthcheck tasks keyed by component.
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Shutdown coordinator for the built sources. NOTE(review): role inferred from the type
    /// name; confirm against `SourceShutdownCoordinator`.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// `stream_cancel` triggers keyed by component (sinks register one). Firing one presumably
    /// detaches that component's input stream — confirm.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Utilization tracker for the built components, if one was created.
    pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}
773
774impl TopologyPieces {
775    pub async fn build_or_log_errors(
776        config: &Config,
777        diff: &ConfigDiff,
778        buffers: HashMap<ComponentKey, BuiltBuffer>,
779        extra_context: ExtraContext,
780    ) -> Option<Self> {
781        match TopologyPieces::build(config, diff, buffers, extra_context).await {
782            Err(errors) => {
783                for error in errors {
784                    error!(message = "Configuration error.", %error);
785                }
786                None
787            }
788            Ok(new_pieces) => Some(new_pieces),
789        }
790    }
791
792    /// Builds only the new pieces, and doesn't check their topology.
793    pub async fn build(
794        config: &super::Config,
795        diff: &ConfigDiff,
796        buffers: HashMap<ComponentKey, BuiltBuffer>,
797        extra_context: ExtraContext,
798    ) -> Result<Self, Vec<String>> {
799        Builder::new(config, diff, buffers, extra_context)
800            .build()
801            .await
802    }
803}
804
805const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
806    match events {
807        EventArray::Logs(_) => data_type.contains(DataType::Log),
808        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
809        EventArray::Traces(_) => data_type.contains(DataType::Trace),
810    }
811}
812
/// Description of a transform captured from its config by [`TransformNode::from_parts`]. It is
/// consumed when the transform's task is built.
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component type name, as returned by the config's `get_component_name()`.
    typetag: &'static str,
    /// Upstream outputs this transform reads from.
    inputs: Inputs<OutputId>,
    /// Accepted input description. Its `data_type()` is used to filter incoming event arrays.
    input_details: Input,
    /// Declared outputs (each with an optional port and its log schema definitions).
    outputs: Vec<TransformOutput>,
    /// Whether a synchronous transform should run via `Runner::run_concurrently` instead of
    /// `Runner::run_inline`.
    enable_concurrency: bool,
}
822
823impl TransformNode {
824    pub fn from_parts(
825        key: ComponentKey,
826        enrichment_tables: vector_lib::enrichment::TableRegistry,
827        transform: &TransformOuter<OutputId>,
828        schema_definition: &[(OutputId, Definition)],
829        global_log_namespace: LogNamespace,
830    ) -> Self {
831        Self {
832            key,
833            typetag: transform.inner.get_component_name(),
834            inputs: transform.inputs.clone(),
835            input_details: transform.inner.input(),
836            outputs: transform.inner.outputs(
837                enrichment_tables,
838                schema_definition,
839                global_log_namespace,
840            ),
841            enable_concurrency: transform.inner.enable_concurrency(),
842        }
843    }
844}
845
846fn build_transform(
847    transform: Transform,
848    node: TransformNode,
849    input_rx: BufferReceiver<EventArray>,
850    utilization_emitter: &mut UtilizationEmitter,
851) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
852    match transform {
853        // TODO: avoid the double boxing for function transforms here
854        Transform::Function(t) => {
855            build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
856        }
857        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
858        Transform::Task(t) => build_task_transform(
859            t,
860            input_rx,
861            node.input_details.data_type(),
862            node.typetag,
863            &node.key,
864            &node.outputs,
865            utilization_emitter,
866        ),
867    }
868}
869
870fn build_sync_transform(
871    t: Box<dyn SyncTransform>,
872    node: TransformNode,
873    input_rx: BufferReceiver<EventArray>,
874    utilization_emitter: &mut UtilizationEmitter,
875) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
876    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
877
878    let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
879    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
880    let transform = if node.enable_concurrency {
881        runner.run_concurrently().boxed()
882    } else {
883        runner.run_inline().boxed()
884    };
885
886    let transform = async move {
887        debug!("Synchronous transform starting.");
888
889        match transform.await {
890            Ok(v) => {
891                debug!("Synchronous transform finished normally.");
892                Ok(v)
893            }
894            Err(e) => {
895                debug!("Synchronous transform finished with an error.");
896                Err(e)
897            }
898        }
899    };
900
901    let mut output_controls = HashMap::new();
902    for (name, control) in controls {
903        let id = name
904            .map(|name| OutputId::from((&node.key, name)))
905            .unwrap_or_else(|| OutputId::from(&node.key));
906        output_controls.insert(id, control);
907    }
908
909    let task = Task::new(node.key.clone(), node.typetag, transform);
910
911    (task, output_controls)
912}
913
/// Drives a synchronous transform. It pulls event arrays from the input buffer, runs them
/// through the transform, and sends the results to the transform's outputs, signalling the
/// utilization timer along the way.
struct Runner {
    /// The transform being run. It is cloned per batch when running concurrently.
    transform: Box<dyn SyncTransform>,
    /// Input buffer receiver. It is taken (left as `None`) by the first `run_*` call.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Event types the transform accepts. Arrays of other types are filtered out of the input.
    input_type: DataType,
    /// Outputs that transformed events are sent to.
    outputs: TransformOutputs,
    /// Utilization timer handle, told when the runner starts and stops waiting.
    timer_tx: UtilizationComponentSender,
    /// Handle used to emit the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
922
impl Runner {
    /// Creates a runner for `transform` reading from `input_rx`.
    ///
    /// Only arrays matching `input_type` will be processed. The `EventsReceived` handle is
    /// registered here.
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    /// Bookkeeping for one received array: signals the utilization timer that waiting stopped
    /// and emits `EventsReceived` with the array's event count and estimated JSON size.
    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    /// Signals the utilization timer that waiting starts again, then sends `outputs_buf` to the
    /// transform's outputs.
    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

    /// Processes input arrays one at a time on the current task until the input stream ends.
    ///
    /// A single output buffer is reused for every array.
    ///
    /// # Panics
    ///
    /// Panics if the runner has already been run (the input receiver was taken).
    ///
    /// # Errors
    ///
    /// Returns a wrapped `TaskError` if sending to the outputs fails.
    async fn run_inline(mut self) -> TaskResult {
        // 128 is an arbitrary, smallish constant
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        // The `move` closure copies only `self.input_type` (a precise capture), so `self`
        // stays usable inside the loop below.
        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    /// Processes batches of ready input arrays on spawned tokio tasks, with up to
    /// `TRANSFORM_CONCURRENCY_LIMIT` batches in flight at once.
    ///
    /// Results are collected through a `FuturesOrdered`, so output batches are sent in the
    /// same order their inputs were taken.
    ///
    /// # Panics
    ///
    /// Panics if the runner has already been run, or if a spawned batch task fails to join
    /// (see the note on the `unreachable!` below).
    ///
    /// # Errors
    ///
    /// Returns a wrapped `TaskError` if sending to the outputs fails.
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Presumably yields groups of arrays that are already available, bounded by
        // READY_ARRAY_CAPACITY — see `ready_arrays::ReadyArrays`.
        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

        let mut in_flight = FuturesOrdered::new();
        // Set once the input stream is exhausted; no new batches are started after that.
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                // Poll branches in declaration order: finished batches are drained and sent
                // before more input is accepted.
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(outputs_buf)) => {
                            let mut outputs_buf: TransformOutputsBuf = outputs_buf;
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        // NOTE(review): a panic inside the spawned batch task surfaces here as
                        // a `JoinError`. It hits this `unreachable!` and re-panics with a less
                        // informative message; consider propagating the original panic instead.
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                // Accept more input only while under the concurrency limit and before the
                // input has ended.
                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            // Record receipt and size the output buffer for the whole batch.
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            // Each batch gets its own clone of the transform so it can run
                            // independently on the spawned task.
                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                // Reached only when both branches are disabled. Assuming the concurrency limit
                // is at least 1, that means nothing is in flight and the input has ended.
                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}
1047
1048fn build_task_transform(
1049    t: Box<dyn TaskTransform<EventArray>>,
1050    input_rx: BufferReceiver<EventArray>,
1051    input_type: DataType,
1052    typetag: &str,
1053    key: &ComponentKey,
1054    outputs: &[TransformOutput],
1055    utilization_emitter: &mut UtilizationEmitter,
1056) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
1057    let (mut fanout, control) = Fanout::new();
1058
1059    let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
1060    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());
1061
1062    let events_received = register!(EventsReceived);
1063    let filtered = input_rx
1064        .filter(move |events| ready(filter_events_type(events, input_type)))
1065        .inspect(move |events| {
1066            events_received.emit(CountByteSize(
1067                events.len(),
1068                events.estimated_json_encoded_size_of(),
1069            ))
1070        });
1071    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
1072    let output_id = Arc::new(OutputId {
1073        component: key.clone(),
1074        port: None,
1075    });
1076
1077    // Task transforms can only write to the default output, so only a single schema def map is needed
1078    let schema_definition_map = outputs
1079        .iter()
1080        .find(|x| x.port.is_none())
1081        .expect("output for default port required for task transforms")
1082        .log_schema_definitions
1083        .clone()
1084        .into_iter()
1085        .map(|(key, value)| (key, Arc::new(value)))
1086        .collect();
1087
1088    let stream = t
1089        .transform(Box::pin(filtered))
1090        .map(move |mut events| {
1091            for event in events.iter_events_mut() {
1092                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
1093            }
1094            (events, Instant::now())
1095        })
1096        .inspect(move |(events, _): &(EventArray, Instant)| {
1097            events_sent.emit(CountByteSize(
1098                events.len(),
1099                events.estimated_json_encoded_size_of(),
1100            ));
1101        });
1102    let transform = async move {
1103        debug!("Task transform starting.");
1104
1105        match fanout.send_stream(stream).await {
1106            Ok(()) => {
1107                debug!("Task transform finished normally.");
1108                Ok(TaskOutput::Transform)
1109            }
1110            Err(e) => {
1111                debug!("Task transform finished with an error.");
1112                Err(TaskError::wrapped(e))
1113            }
1114        }
1115    }
1116    .boxed();
1117
1118    let mut outputs = HashMap::new();
1119    outputs.insert(OutputId::from(key), control);
1120
1121    let task = Task::new(key.clone(), typetag, transform);
1122
1123    (task, outputs)
1124}