vector/topology/builder.rs

use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    sync::{Arc, LazyLock, Mutex},
    time::Instant,
};

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
    select,
    sync::{mpsc::UnboundedSender, oneshot},
    time::timeout,
};
use tracing::Instrument;
use vector_lib::config::LogNamespace;
use vector_lib::internal_event::{
    self, CountByteSize, EventsSent, InternalEventHandle as _, Registered,
};
use vector_lib::transform::update_runtime_schema_definition;
use vector_lib::{
    buffers::{
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
        BufferType, WhenFull,
    },
    schema::Definition,
    EstimatedJsonEncodedSizeOf,
};

use super::{
    fanout::{self, Fanout},
    schema,
    task::{Task, TaskOutput, TaskResult},
    BuiltBuffer, ConfigDiff,
};
use crate::{
    config::{
        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
        ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
    },
    event::{EventArray, EventContainer},
    extra_context::ExtraContext,
    internal_events::EventsReceived,
    shutdown::SourceShutdownCoordinator,
    source_sender::{SourceSenderItem, CHUNK_SIZE},
    spawn_named,
    topology::task::TaskError,
    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
    utilization::{wrap, UtilizationComponentSender, UtilizationEmitter},
    SourceSender,
};

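// Process-wide registry of enrichment tables. It outlives any single topology so
// that table data and indexes can be carried across config reloads.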
static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
    LazyLock::new(vector_lib::enrichment::TableRegistry::default);

pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
    LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);

const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

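// Upper bound on concurrently processed transform batches: the configured number
// of worker threads, falling back to the detected number of threads.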
static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    crate::app::worker_threads()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or_else(crate::num_threads)
});

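// Sources that report on Vector itself. They are flagged as such when registered
// with the shutdown coordinator in `build_sources`.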
const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

struct Builder<'a> {
    config: &'a super::Config,
    diff: &'a ConfigDiff,
    shutdown_coordinator: SourceShutdownCoordinator,
    errors: Vec<String>,
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    tasks: HashMap<ComponentKey, Task>,
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    healthchecks: HashMap<ComponentKey, Task>,
    detach_triggers: HashMap<ComponentKey, Trigger>,
    extra_context: ExtraContext,
    utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
    fn new(
        config: &'a super::Config,
        diff: &'a ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Self {
        Self {
            config,
            diff,
            buffers,
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            errors: vec![],
            outputs: HashMap::new(),
            tasks: HashMap::new(),
            inputs: HashMap::new(),
            healthchecks: HashMap::new(),
            detach_triggers: HashMap::new(),
            extra_context,
            utilization_emitter: UtilizationEmitter::new(),
        }
    }

    /// Builds the new pieces of the topology found in `self.diff`.
    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
        let enrichment_tables = self.load_enrichment_tables().await;
        let source_tasks = self.build_sources(enrichment_tables).await;
        self.build_transforms(enrichment_tables).await;
        self.build_sinks(enrichment_tables).await;

        // All the enrichment table data should be loaded by now, so switch the
        // tables over to read-only.
        enrichment_tables.finish_load();

        if self.errors.is_empty() {
            Ok(TopologyPieces {
                inputs: self.inputs,
                outputs: Self::finalize_outputs(self.outputs),
                tasks: self.tasks,
                source_tasks,
                healthchecks: self.healthchecks,
                shutdown_coordinator: self.shutdown_coordinator,
                detach_triggers: self.detach_triggers,
                utilization_emitter: Some(self.utilization_emitter),
            })
        } else {
            Err(self.errors)
        }
    }

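    /// Regroups the flat map of fanout control channels into a per-component map,
    /// keyed within each component by output port name.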
    fn finalize_outputs(
        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
    {
        let mut finalized_outputs = HashMap::new();
        for (id, output) in outputs {
            let entry = finalized_outputs
                .entry(id.component)
                .or_insert_with(HashMap::new);
            entry.insert(id.port, output);
        }

        finalized_outputs
    }

    /// Loads, or reloads, the enrichment tables.
    /// The tables are stored in the `ENRICHMENT_TABLES` global registry.
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables = HashMap::new();

        // Build the enrichment tables.
        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name) {
                let indexes = if !self.diff.enrichment_tables.is_added(name) {
                    // This is an existing enrichment table, so store its indexes so
                    // they can be reapplied after the reload.
                    Some(ENRICHMENT_TABLES.index_fields(&table_name))
                } else {
                    None
                };

                let mut table = match table_outer.inner.build(&self.config.global).await {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                if let Some(indexes) = indexes {
                    for (case, index) in indexes {
                        match table
                            .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                        {
                            Ok(_) => (),
                            Err(error) => {
                                // If adding an index fails, we do not want to use the
                                // reloaded data; the previously loaded data must
                                // remain in use. Just report the error and continue.
                                error!(message = "Unable to add index to reloaded enrichment table.",
                                    table = ?name.to_string(),
                                    %error,
                                    internal_log_rate_limit = true);
                                continue 'tables;
                            }
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }

    async fn build_sources(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> HashMap<ComponentKey, Task> {
        let mut source_tasks = HashMap::new();

        let table_sources = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_source(key))
            .collect::<Vec<_>>();
        for (key, source) in self
            .config
            .sources()
            .filter(|(key, _)| self.diff.sources.contains_new(key))
            .chain(
                table_sources
                    .iter()
                    .map(|(key, source)| (key, source))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component = %key, "Building new source.");

            let typetag = source.inner.get_component_name();
            let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

            let span = error_span!(
                "source",
                component_kind = "source",
                component_id = %key.id(),
                component_type = %source.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            let task_name = format!(
                ">> {} ({}, pump) >>",
                source.inner.get_component_name(),
                key.id()
            );

            let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE);
            let mut pumps = Vec::new();
            let mut controls = HashMap::new();
            let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

            for output in source_outputs.into_iter() {
                let mut rx = builder.add_source_output(output.clone(), key.clone());

                let (mut fanout, control) = Fanout::new();
                let source_type = source.inner.get_component_name();
                let source = Arc::new(key.clone());

                let pump = async move {
                    debug!("Source pump starting.");

                    while let Some(SourceSenderItem {
                        events: mut array,
                        send_reference,
                    }) = rx.next().await
                    {
                        array.set_output_id(&source);
                        array.set_source_type(source_type);
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }

                    debug!("Source pump finished normally.");
                    Ok(TaskOutput::Source)
                };

                pumps.push(pump.instrument(span.clone()));
                controls.insert(
                    OutputId {
                        component: key.clone(),
                        port: output.port.clone(),
                    },
                    control,
                );

                let port = output.port.clone();
                if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                    schema_definitions.insert(port, definition);
                }
            }

            let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
            let pump = async move {
                debug!("Source pump supervisor starting.");

                // Spawn all of the per-output pumps and then await their completion.
                //
                // If any of the pumps complete with an error, or panic/are cancelled, we return
                // immediately.
                let mut handles = FuturesUnordered::new();
                for pump in pumps {
                    handles.push(spawn_named(pump, task_name.as_ref()));
                }

                let mut had_pump_error = false;
                while let Some(output) = handles.try_next().await? {
                    if let Err(e) = output {
                        // Immediately send the error to the source's wrapper future, but ignore any
                        // errors during the send, since nested errors wouldn't make any sense here.
                        _ = pump_error_tx.send(e);
                        had_pump_error = true;
                        break;
                    }
                }

                if had_pump_error {
                    debug!("Source pump supervisor task finished with an error.");
                } else {
                    debug!("Source pump supervisor task finished normally.");
                }
                Ok(TaskOutput::Source)
            };
            let pump = Task::new(key.clone(), typetag, pump);

            let pipeline = builder.build();

            let (shutdown_signal, force_shutdown_tripwire) = self
                .shutdown_coordinator
                .register_source(key, INTERNAL_SOURCES.contains(&typetag));

            let context = SourceContext {
                key: key.clone(),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                shutdown: shutdown_signal,
                out: pipeline,
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
                acknowledgements: source.sink_acknowledgements,
                schema_definitions,
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };
            let source = source.inner.build(context).await;
            let server = match source {
                Err(error) => {
                    self.errors.push(format!("Source \"{key}\": {error}"));
                    continue;
                }
                Ok(server) => server,
            };

            // Build a wrapper future that drives the actual source future, but returns early if we've
            // been signalled to forcefully shut down, or if the source pump encounters an error.
            //
            // The forceful shutdown will only resolve if the source itself doesn't shut down gracefully
            // within the allotted time window. This can occur normally for certain sources, like stdin,
            // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
            // to shut down unless some input is given.
            let server = async move {
                debug!("Source starting.");

                let mut result = select! {
                    biased;

                    // We've been told that we must forcefully shut down.
                    _ = force_shutdown_tripwire => Ok(()),

                    // The source pump encountered an error, which we're now bubbling up here to stop
                    // the source as well, since running the source makes no sense without the pump.
                    //
                    // We only match receiving a message, not the error of the sender being dropped,
                    // just to keep things simpler.
                    Ok(e) = &mut pump_error_rx => Err(e),

                    // The source finished normally.
                    result = server => result.map_err(|_| TaskError::Opaque),
                };

                // Even though we already tried to receive any pump task error above, we may have exited
                // on the source itself returning an error due to task scheduling, where the pump task
                // encountered an error, sent it over the oneshot, but we were polling the source
                // already and hit an error trying to send to the now-shutdown pump task.
                //
                // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
                // time to see if the pump task encountered an error, using _that_ instead if so, to
                // propagate the true error that caused the source to have to stop.
                if let Ok(e) = pump_error_rx.try_recv() {
                    result = Err(e);
                }

                match result {
                    Ok(()) => {
                        debug!("Source finished normally.");
                        Ok(TaskOutput::Source)
                    }
                    Err(e) => {
                        debug!("Source finished with an error.");
                        Err(e)
                    }
                }
            };
            let server = Task::new(key.clone(), typetag, server);

            self.outputs.extend(controls);
            self.tasks.insert(key.clone(), pump);
            source_tasks.insert(key.clone(), server);
        }

        source_tasks
    }

    async fn build_transforms(
        &mut self,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) {
        let mut definition_cache = HashMap::default();

        for (key, transform) in self
            .config
            .transforms()
            .filter(|(key, _)| self.diff.transforms.contains_new(key))
        {
            debug!(component = %key, "Building new transform.");

            let input_definitions = match schema::input_definitions(
                &transform.inputs,
                self.config,
                enrichment_tables.clone(),
                &mut definition_cache,
            ) {
                Ok(definitions) => definitions,
                Err(_) => {
                    // We received an error while retrieving the definitions; there
                    // is no point in continuing.
                    return;
                }
            };

            let merged_definition: Definition = input_definitions
                .iter()
                .map(|(_output_id, definition)| definition.clone())
                .reduce(Definition::merge)
                // We may not have any definitions if all the inputs are from metrics sources.
                .unwrap_or_else(Definition::any);

            let span = error_span!(
                "transform",
                component_kind = "transform",
                component_id = %key.id(),
                component_type = %transform.inner.get_component_name(),
            );

            // Create a map of the outputs to the list of possible definitions from those outputs.
            let schema_definitions = transform
                .inner
                .outputs(
                    enrichment_tables.clone(),
                    &input_definitions,
                    self.config.schema.log_namespace(),
                )
                .into_iter()
                .map(|output| {
                    let definitions = output.schema_definitions(self.config.schema.enabled);
                    (output.port, definitions)
                })
                .collect::<HashMap<_, _>>();

            let context = TransformContext {
                key: Some(key.clone()),
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                schema_definitions,
                merged_schema_definition: merged_definition.clone(),
                schema: self.config.schema,
                extra_context: self.extra_context.clone(),
            };

            let node = TransformNode::from_parts(
                key.clone(),
                enrichment_tables.clone(),
                transform,
                &input_definitions,
                self.config.schema.log_namespace(),
            );

            let transform = match transform
                .inner
                .build(&context)
                .instrument(span.clone())
                .await
            {
                Err(error) => {
                    self.errors.push(format!("Transform \"{key}\": {error}"));
                    continue;
                }
                Ok(transform) => transform,
            };

            let (input_tx, input_rx) =
                TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block, &span)
                    .await;

            self.inputs
                .insert(key.clone(), (input_tx, node.inputs.clone()));

            let (transform_task, transform_outputs) = {
                let _span = span.enter();
                build_transform(transform, node, input_rx, &mut self.utilization_emitter)
            };

            self.outputs.extend(transform_outputs);
            self.tasks.insert(key.clone(), transform_task);
        }
    }

    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
        let table_sinks = self
            .config
            .enrichment_tables
            .iter()
            .filter_map(|(key, table)| table.as_sink(key))
            .collect::<Vec<_>>();
        for (key, sink) in self
            .config
            .sinks()
            .filter(|(key, _)| self.diff.sinks.contains_new(key))
            .chain(
                table_sinks
                    .iter()
                    .map(|(key, sink)| (key, sink))
                    .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
            )
        {
            debug!(component = %key, "Building new sink.");

            let sink_inputs = &sink.inputs;
            let healthcheck = sink.healthcheck();
            let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
            let healthcheck_timeout = healthcheck.timeout;

            let typetag = sink.inner.get_component_name();
            let input_type = sink.inner.input().data_type();

            let span = error_span!(
                "sink",
                component_kind = "sink",
                component_id = %key.id(),
                component_type = %sink.inner.get_component_name(),
            );
            let _entered_span = span.enter();

            // At this point, all transforms have been validated, including any that
            // mutate the schema provided by their sources. We can now validate the
            // schema expectations of each individual sink.
            if let Err(mut err) = schema::validate_sink_expectations(
                key,
                sink,
                self.config,
                enrichment_tables.clone(),
            ) {
                self.errors.append(&mut err);
            };

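            // Reuse the buffer carried over from a previous topology (e.g. across a
            // reload) when one exists for this sink; otherwise build a fresh buffer
            // from the sink's configuration.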
            let (tx, rx) = match self.buffers.remove(key) {
                Some(buffer) => buffer,
                _ => {
                    let buffer_type =
                        match sink.buffer.stages().first().expect("can't ever be empty") {
                            BufferType::Memory { .. } => "memory",
                            BufferType::DiskV2 { .. } => "disk",
                        };
                    let buffer_span = error_span!("sink", buffer_type);
                    let buffer = sink
                        .buffer
                        .build(
                            self.config.global.data_dir.clone(),
                            key.to_string(),
                            buffer_span,
                        )
                        .await;
                    match buffer {
                        Err(error) => {
                            self.errors.push(format!("Sink \"{key}\": {error}"));
                            continue;
                        }
                        Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
                    }
                }
            };

            let cx = SinkContext {
                healthcheck,
                globals: self.config.global.clone(),
                enrichment_tables: enrichment_tables.clone(),
                proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
                schema: self.config.schema,
                app_name: crate::get_app_name().to_string(),
                app_name_slug: crate::get_slugified_app_name(),
                extra_context: self.extra_context.clone(),
            };

            let (sink, healthcheck) = match sink.inner.build(cx).await {
                Err(error) => {
                    self.errors.push(format!("Sink \"{key}\": {error}"));
                    continue;
                }
                Ok(built) => built,
            };

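            // The `trigger`/`tripwire` pair lets the topology detach this sink from
            // its input early: the trigger is stored in `detach_triggers`, and the
            // tripwire terminates the input stream via `take_until_if` below.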
            let (trigger, tripwire) = Tripwire::new();

            let utilization_sender = self
                .utilization_emitter
                .add_component(key.clone(), gauge!("utilization"));
            let component_key = key.clone();
            let sink = async move {
                debug!("Sink starting.");

                // Why is this `Arc<Mutex<Option<_>>>` needed, you ask? If building
                // the pieces errors, this future is never run, so this `rx` is never
                // taken, which lets us reuse `rx` to rebuild the old configuration
                // by passing this `Arc<Mutex<Option<_>>>` in again.
                let rx = rx
                    .lock()
                    .unwrap()
                    .take()
                    .expect("Task started but input has been taken.");

                let mut rx = wrap(utilization_sender, component_key.clone(), rx);

                let events_received = register!(EventsReceived);
                sink.run(
                    rx.by_ref()
                        .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                        .inspect(|events| {
                            events_received.emit(CountByteSize(
                                events.len(),
                                events.estimated_json_encoded_size_of(),
                            ))
                        })
                        .take_until_if(tripwire),
                )
                .await
                .map(|_| {
                    debug!("Sink finished normally.");
                    TaskOutput::Sink(rx)
                })
                .map_err(|_| {
                    debug!("Sink finished with an error.");
                    TaskError::Opaque
                })
            };

            let task = Task::new(key.clone(), typetag, sink);

            let component_key = key.clone();
            let healthcheck_task = async move {
                if enable_healthcheck {
                    timeout(healthcheck_timeout, healthcheck)
                        .map(|result| match result {
                            Ok(Ok(_)) => {
                                info!("Healthcheck passed.");
                                Ok(TaskOutput::Healthcheck)
                            }
                            Ok(Err(error)) => {
                                error!(
                                    msg = "Healthcheck failed.",
                                    %error,
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(error))
                            }
                            Err(e) => {
                                error!(
                                    msg = "Healthcheck timed out.",
                                    component_kind = "sink",
                                    component_type = typetag,
                                    component_id = %component_key.id(),
                                );
                                Err(TaskError::wrapped(Box::new(e)))
                            }
                        })
                        .await
                } else {
                    info!("Healthcheck disabled.");
                    Ok(TaskOutput::Healthcheck)
                }
            };

            let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);

            self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
            self.healthchecks.insert(key.clone(), healthcheck_task);
            self.tasks.insert(key.clone(), task);
            self.detach_triggers.insert(key.clone(), trigger);
        }
    }
}

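/// Rebuilds any enrichment tables that report needing a reload, mirroring
/// `Builder::load_enrichment_tables`, then immediately finishes the load so the
/// tables switch back to read-only.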
pub async fn reload_enrichment_tables(config: &Config) {
    let mut enrichment_tables = HashMap::new();
    // Build the enrichment tables.
    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
        let table_name = name.to_string();
        if ENRICHMENT_TABLES.needs_reload(&table_name) {
            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));

            let mut table = match table_outer.inner.build(&config.global).await {
                Ok(table) => table,
                Err(error) => {
                    error!(
                        internal_log_rate_limit = true,
                        "Enrichment table \"{name}\" reload failed: {error}",
                    );
                    continue;
                }
            };

            if let Some(indexes) = indexes {
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            // If adding an index fails, we do not want to use the
                            // reloaded data; the previously loaded data must remain
                            // in use. Just report the error and continue.
                            error!(
                                internal_log_rate_limit = true,
                                message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error
                            );
                            continue 'tables;
                        }
                    }
                }
            }

            enrichment_tables.insert(table_name, table);
        }
    }

    ENRICHMENT_TABLES.load(enrichment_tables);
    ENRICHMENT_TABLES.finish_load();
}

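/// The built pieces of a topology: component tasks plus the channels, triggers, and
/// coordinators needed to wire them together and shut them down.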
pub struct TopologyPieces {
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    pub(super) tasks: HashMap<ComponentKey, Task>,
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
    pub async fn build_or_log_errors(
        config: &Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Option<Self> {
        match TopologyPieces::build(config, diff, buffers, extra_context).await {
            Err(errors) => {
                for error in errors {
                    error!(message = "Configuration error.", %error);
                }
                None
            }
            Ok(new_pieces) => Some(new_pieces),
        }
    }

    /// Builds only the new pieces, and doesn't check their topology.
    pub async fn build(
        config: &super::Config,
        diff: &ConfigDiff,
        buffers: HashMap<ComponentKey, BuiltBuffer>,
        extra_context: ExtraContext,
    ) -> Result<Self, Vec<String>> {
        Builder::new(config, diff, buffers, extra_context)
            .build()
            .await
    }
}

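/// Tests whether an event array matches the `DataType` accepted by a component, so
/// mismatched event kinds can be filtered out of its input stream.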
const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
    match events {
        EventArray::Logs(_) => data_type.contains(DataType::Log),
        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
        EventArray::Traces(_) => data_type.contains(DataType::Trace),
    }
}

#[derive(Debug, Clone)]
struct TransformNode {
    key: ComponentKey,
    typetag: &'static str,
    inputs: Inputs<OutputId>,
    input_details: Input,
    outputs: Vec<TransformOutput>,
    enable_concurrency: bool,
}

impl TransformNode {
    pub fn from_parts(
        key: ComponentKey,
        enrichment_tables: vector_lib::enrichment::TableRegistry,
        transform: &TransformOuter<OutputId>,
        schema_definition: &[(OutputId, Definition)],
        global_log_namespace: LogNamespace,
    ) -> Self {
        Self {
            key,
            typetag: transform.inner.get_component_name(),
            inputs: transform.inputs.clone(),
            input_details: transform.inner.input(),
            outputs: transform.inner.outputs(
                enrichment_tables,
                schema_definition,
                global_log_namespace,
            ),
            enable_concurrency: transform.inner.enable_concurrency(),
        }
    }
}

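/// Builds the task and fanout control channels for a transform, dispatching on
/// whether it is a function, synchronous, or task transform.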
fn build_transform(
    transform: Transform,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    match transform {
        // TODO: avoid the double boxing for function transforms here
        Transform::Function(t) => {
            build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
        }
        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
        Transform::Task(t) => build_task_transform(
            t,
            input_rx,
            node.input_details.data_type(),
            node.typetag,
            &node.key,
            &node.outputs,
            utilization_emitter,
        ),
    }
}

fn build_sync_transform(
    t: Box<dyn SyncTransform>,
    node: TransformNode,
    input_rx: BufferReceiver<EventArray>,
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

    let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
    let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
    let transform = if node.enable_concurrency {
        runner.run_concurrently().boxed()
    } else {
        runner.run_inline().boxed()
    };

    let transform = async move {
        debug!("Synchronous transform starting.");

        match transform.await {
            Ok(v) => {
                debug!("Synchronous transform finished normally.");
                Ok(v)
            }
            Err(e) => {
                debug!("Synchronous transform finished with an error.");
                Err(e)
            }
        }
    };

    let mut output_controls = HashMap::new();
    for (name, control) in controls {
        let id = name
            .map(|name| OutputId::from((&node.key, name)))
            .unwrap_or_else(|| OutputId::from(&node.key));
        output_controls.insert(id, control);
    }

    let task = Task::new(node.key.clone(), node.typetag, transform);

    (task, output_controls)
}

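/// Drives a `SyncTransform` over its input stream, either inline on the current
/// task or concurrently across spawned tasks bounded by `TRANSFORM_CONCURRENCY_LIMIT`.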
struct Runner {
    transform: Box<dyn SyncTransform>,
    input_rx: Option<BufferReceiver<EventArray>>,
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
    events_received: Registered<EventsReceived>,
}

impl Runner {
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            events_received: register!(EventsReceived),
        }
    }

    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        self.outputs.send(outputs_buf).await
    }

    async fn run_inline(mut self) -> TaskResult {
        // 128 is an arbitrary, smallish constant
        const INLINE_BATCH_SIZE: usize = 128;

        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);

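        // In-flight transform batches. `FuturesOrdered` yields results in the order
        // the batches were pushed, so outputs are forwarded in input order even
        // though batches are processed concurrently.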
        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(outputs_buf)) => {
                            let mut outputs_buf: TransformOutputsBuf = outputs_buf;
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            let task = tokio::spawn(async move {
                                for events in input_arrays {
                                    t.transform_all(events, &mut outputs_buf);
                                }
                                outputs_buf
                            }.in_current_span());
                            in_flight.push_back(task);
                        }
                        None => {
                            shutting_down = true;
                            continue
                        }
                    }
                }

                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}

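/// Builds the task and fanout control channel for a `TaskTransform`, which owns its
/// input stream and writes only to the default output port.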
fn build_task_transform(
    t: Box<dyn TaskTransform<EventArray>>,
    input_rx: BufferReceiver<EventArray>,
    input_type: DataType,
    typetag: &str,
    key: &ComponentKey,
    outputs: &[TransformOutput],
    utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
    let (mut fanout, control) = Fanout::new();

    let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
    let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

    let events_received = register!(EventsReceived);
    let filtered = input_rx
        .filter(move |events| ready(filter_events_type(events, input_type)))
        .inspect(move |events| {
            events_received.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ))
        });
    let events_sent = register!(EventsSent::from(internal_event::Output(None)));
    let output_id = Arc::new(OutputId {
        component: key.clone(),
        port: None,
    });

    // Task transforms can only write to the default output, so only a single schema def map is needed.
    let schema_definition_map = outputs
        .iter()
        .find(|x| x.port.is_none())
        .expect("output for default port required for task transforms")
        .log_schema_definitions
        .clone()
        .into_iter()
        .map(|(key, value)| (key, Arc::new(value)))
        .collect();

    let stream = t
        .transform(Box::pin(filtered))
        .map(move |mut events| {
            for event in events.iter_events_mut() {
                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
            }
            (events, Instant::now())
        })
        .inspect(move |(events, _): &(EventArray, Instant)| {
            events_sent.emit(CountByteSize(
                events.len(),
                events.estimated_json_encoded_size_of(),
            ));
        });
    let transform = async move {
        debug!("Task transform starting.");

        match fanout.send_stream(stream).await {
            Ok(()) => {
                debug!("Task transform finished normally.");
                Ok(TaskOutput::Transform)
            }
            Err(e) => {
                debug!("Task transform finished with an error.");
                Err(TaskError::wrapped(e))
            }
        }
    }
    .boxed();

    let mut outputs = HashMap::new();
    outputs.insert(OutputId::from(key), control);

    let task = Task::new(key.clone(), typetag, transform);

    (task, outputs)
}