vector/topology/running.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc, Mutex,
6    },
7};
8
9use super::{
10    builder::{self, reload_enrichment_tables, TopologyPieces},
11    fanout::{ControlChannel, ControlMessage},
12    handle_errors, retain, take_healthchecks,
13    task::{Task, TaskOutput},
14    BuiltBuffer, TaskHandle,
15};
16use crate::{
17    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
18    event::EventArray,
19    extra_context::ExtraContext,
20    shutdown::SourceShutdownCoordinator,
21    signal::ShutdownError,
22    spawn_named,
23};
24use futures::{future, Future, FutureExt};
25use stream_cancel::Trigger;
26use tokio::{
27    sync::{mpsc, watch},
28    time::{interval, sleep_until, Duration, Instant},
29};
30use tracing::Instrument;
31use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
32use vector_lib::trigger::DisabledTrigger;
33use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};
34
35pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
36
37#[allow(dead_code)]
38pub struct RunningTopology {
39    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
40    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
41    outputs: HashMap<OutputId, ControlChannel>,
42    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
43    source_tasks: HashMap<ComponentKey, TaskHandle>,
44    tasks: HashMap<ComponentKey, TaskHandle>,
45    shutdown_coordinator: SourceShutdownCoordinator,
46    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
47    pub(crate) config: Config,
48    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
49    watch: (WatchTx, WatchRx),
50    pub(crate) running: Arc<AtomicBool>,
51    graceful_shutdown_duration: Option<Duration>,
52    utilization_task: Option<TaskHandle>,
53    utilization_task_shutdown_trigger: Option<Trigger>,
54    pending_reload: Option<HashSet<ComponentKey>>,
55}
56
57impl RunningTopology {
58    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
59        Self {
60            inputs: HashMap::new(),
61            inputs_tap_metadata: HashMap::new(),
62            outputs: HashMap::new(),
63            outputs_tap_metadata: HashMap::new(),
64            shutdown_coordinator: SourceShutdownCoordinator::default(),
65            detach_triggers: HashMap::new(),
66            source_tasks: HashMap::new(),
67            tasks: HashMap::new(),
68            abort_tx,
69            watch: watch::channel(TapResource::default()),
70            running: Arc::new(AtomicBool::new(true)),
71            graceful_shutdown_duration: config.graceful_shutdown_duration,
72            config,
73            utilization_task: None,
74            utilization_task_shutdown_trigger: None,
75            pending_reload: None,
76        }
77    }
78
79    /// Gets the configuration that represents this running topology.
80    pub const fn config(&self) -> &Config {
81        &self.config
82    }
83
84    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
85    /// initializes the pending reload set.
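    ///
    /// # Example
    ///
    /// An illustrative sketch (not from this crate's documentation); the component name is
    /// hypothetical and a `From<&str>` impl for `ComponentKey` is assumed:
    ///
    /// ```ignore
    /// use std::collections::HashSet;
    ///
    /// let mut keys = HashSet::new();
    /// keys.insert(ComponentKey::from("my_transform"));
    /// // Mark these components so the next call to `reload_config_and_respawn` treats them as
    /// // components to reload even if their configuration is unchanged.
    /// running_topology.extend_reload_set(keys);
    /// ```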
86    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
87        match &mut self.pending_reload {
88            None => self.pending_reload = Some(new_set),
89            Some(existing) => existing.extend(new_set),
90        }
91    }
92
93    /// Creates a subscription to topology changes.
94    ///
95    /// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
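    ///
    /// # Example
    ///
    /// A minimal sketch of how a subscriber might observe changes, assuming it runs inside a
    /// Tokio task with access to the topology; this is not taken from the tap implementation:
    ///
    /// ```ignore
    /// let mut rx = running_topology.watch();
    /// // `changed()` resolves whenever `connect_diff` broadcasts a new `TapResource`.
    /// while rx.changed().await.is_ok() {
    ///     let n_outputs = rx.borrow().outputs.len();
    ///     println!("topology now exposes {n_outputs} tappable outputs");
    /// }
    /// ```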
96    pub fn watch(&self) -> watch::Receiver<TapResource> {
97        self.watch.1.clone()
98    }
99
100    /// Returns a future that resolves once all the sources in this topology have ended.
101    ///
102    /// The future returned by this function will finish once all the sources in
103    /// this topology have finished. This allows the caller to wait for or
104    /// detect that the sources in the topology are no longer
105    /// producing. [`Application`][crate::app::Application], as an example, uses this as a
106    /// shutdown signal.
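    ///
    /// # Example
    ///
    /// A hedged sketch of using this as a shutdown signal, assuming the caller owns the topology
    /// and runs on a Tokio runtime:
    ///
    /// ```ignore
    /// // Wait until every source has stopped producing events...
    /// running_topology.sources_finished().await;
    /// // ...then tear down the remaining transforms and sinks.
    /// running_topology.stop().await;
    /// ```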
107    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
108        self.shutdown_coordinator.shutdown_tripwire()
109    }
110
111    /// Shut down all topology components.
112    ///
113    /// This function sends the shutdown signal to all sources in this topology
114    /// and returns a future that resolves once all components (sources,
115    /// transforms, and sinks) have finished shutting down. Transforms and sinks
116    /// will shut down automatically once their input tasks finish.
117    ///
118    /// This function takes ownership of `self`, so once it returns everything
119    /// in the [`RunningTopology`] instance has been dropped except for the
120    /// `tasks` map. This map gets moved into the returned future and is used to
121    /// poll for when the tasks have completed. Once the returned future is
122    /// dropped, everything from this [`RunningTopology`] instance is fully
123    /// dropped.
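    ///
    /// # Example
    ///
    /// An illustrative sketch (not from this crate's documentation) of a graceful shutdown,
    /// optionally bounded by an external timeout:
    ///
    /// ```ignore
    /// use tokio::time::{timeout, Duration};
    ///
    /// // `stop` consumes the topology; the returned future resolves once all component tasks end.
    /// if timeout(Duration::from_secs(60), running_topology.stop()).await.is_err() {
    ///     eprintln!("topology did not shut down within 60 seconds");
    /// }
    /// ```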
124    pub fn stop(self) -> impl Future<Output = ()> {
125        // Update the API's health endpoint to signal shutdown
126        self.running.store(false, Ordering::Relaxed);
127        // Create handy handles collections of all tasks for the subsequent
128        // operations.
129        let mut wait_handles = Vec::new();
130        // We need a Vec here since source components have two tasks: one for the
131        // pump in `self.tasks`, and the other for the source in `self.source_tasks`.
132        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
133
134        let map_closure = |_result| ();
135
136        // We need to give the sources some time to shut down gracefully, so we
137        // merge them with the other tasks.
138        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
139            let task = task.map(map_closure).shared();
140
141            wait_handles.push(task.clone());
142            check_handles.entry(key).or_default().push(task);
143        }
144
145        if let Some(utilization_task) = self.utilization_task {
146            wait_handles.push(utilization_task.map(map_closure).shared());
147        }
148
149        // If we reach this deadline, we forcefully shut down the sources. If `None`, we never force a shutdown.
150        let deadline = self
151            .graceful_shutdown_duration
152            .map(|grace_period| Instant::now() + grace_period);
153
154        let timeout = if let Some(deadline) = deadline {
155            // If we reach the deadline, this future will print out which components
156            // won't have shut down gracefully by the time we start to forcefully shut down
157            // the sources.
158            let mut check_handles2 = check_handles.clone();
159            Box::pin(async move {
160                sleep_until(deadline).await;
161                // Remove all tasks that have shutdown.
162                check_handles2.retain(|_key, handles| {
163                    retain(handles, |handle| handle.peek().is_none());
164                    !handles.is_empty()
165                });
166                let remaining_components = check_handles2
167                    .keys()
168                    .map(|item| item.to_string())
169                    .collect::<Vec<_>>()
170                    .join(", ");
171
172                error!(
173                    components = ?remaining_components,
174                    "Failed to gracefully shut down in time. Killing components."
175                );
176            }) as future::BoxFuture<'static, ()>
177        } else {
178            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
179        };
180
181        // Periodically reports which components are still running.
182        let mut interval = interval(Duration::from_secs(5));
183        let reporter = async move {
184            loop {
185                interval.tick().await;
186
187                // Remove all tasks that have shutdown.
188                check_handles.retain(|_key, handles| {
189                    retain(handles, |handle| handle.peek().is_none());
190                    !handles.is_empty()
191                });
192                let remaining_components = check_handles
193                    .keys()
194                    .map(|item| item.to_string())
195                    .collect::<Vec<_>>()
196                    .join(", ");
197
198                let time_remaining = deadline
199                    .map(|d| match d.checked_duration_since(Instant::now()) {
200                        Some(remaining) => format!("{} seconds left", remaining.as_secs()),
201                        None => "overdue".to_string(),
202                    })
203                    .unwrap_or("no time limit".to_string());
204
205                info!(
206                    remaining_components = ?remaining_components,
207                    time_remaining = ?time_remaining,
208                    "Shutting down... Waiting on running components."
209                );
210            }
211        };
212
213        // Finishes once all tasks have shut down.
214        let success = futures::future::join_all(wait_handles).map(|_| ());
215
216        // Aggregate future that ends once either the forceful-shutdown timeout fires or all tasks have shut down.
217        let shutdown_complete_future = future::select_all(vec![
218            Box::pin(timeout) as future::BoxFuture<'static, ()>,
219            Box::pin(reporter) as future::BoxFuture<'static, ()>,
220            Box::pin(success) as future::BoxFuture<'static, ()>,
221        ]);
222
223        // Now kick off the shutdown process by shutting down the sources.
224        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
225        if let Some(trigger) = self.utilization_task_shutdown_trigger {
226            trigger.cancel();
227        }
228
229        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
230    }
231
232    /// Attempts to load a new configuration and update this running topology.
233    ///
234    /// If the new configuration was valid, and all changes were able to be made -- removing of
235    /// old components, changing of existing components, adding of new components -- then `Ok(true)`
236    /// is returned.
237    ///
238    /// If the new configuration is not valid, or not all of the changes in the new configuration
239    /// were able to be made, then this method will attempt to undo the changes made and bring the
240    /// topology back to its previous state.  If either of these scenarios occurs, then `Ok(false)`
241    /// is returned.
242    ///
243    /// # Errors
244    ///
245    /// If all changes from the new configuration cannot be made, and the current configuration
246    /// cannot be fully restored, then `Err(())` is returned.
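    ///
    /// # Example
    ///
    /// A sketch of handling the three possible outcomes, assuming `new_config` and
    /// `extra_context` have already been constructed by the caller:
    ///
    /// ```ignore
    /// match running_topology.reload_config_and_respawn(new_config, extra_context).await {
    ///     Ok(true) => info!("New configuration applied."),
    ///     Ok(false) => warn!("Reload rejected or rolled back; old configuration still running."),
    ///     Err(()) => error!("Reload failed and the old configuration could not be restored."),
    /// }
    /// ```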
247    pub async fn reload_config_and_respawn(
248        &mut self,
249        new_config: Config,
250        extra_context: ExtraContext,
251    ) -> Result<bool, ()> {
252        info!("Reloading running topology with new configuration.");
253
254        if self.config.global != new_config.global {
255            error!(
256                message =
257                "Global options can't be changed while reloading config file; reload aborted. Please restart Vector to reload the configuration file."
258            );
259            return Ok(false);
260        }
261
262        // Calculate the change between the current configuration and the new configuration, and
263        // shutdown any components that are changing so that we can reclaim their buffers before
264        // spawning the new version of the component.
265        //
266        // We also shutdown any component that is simply being removed entirely.
267        let diff = if let Some(components) = &self.pending_reload {
268            ConfigDiff::new(&self.config, &new_config, components.clone())
269        } else {
270            ConfigDiff::new(&self.config, &new_config, HashSet::new())
271        };
272        let buffers = self.shutdown_diff(&diff, &new_config).await;
273
274        // Gives Windows some time to make available any ports
275        // released by shut-down components.
276        // Issue: https://github.com/vectordotdev/vector/issues/3035
277        if cfg!(windows) {
278            // This value is guesswork.
279            tokio::time::sleep(Duration::from_millis(200)).await;
280        }
281
282        // Try to build all of the new components coming from the new configuration.  If we can
283        // successfully build them, we'll attempt to connect them up to the topology and spawn their
284        // respective component tasks.
285        if let Some(mut new_pieces) = TopologyPieces::build_or_log_errors(
286            &new_config,
287            &diff,
288            buffers.clone(),
289            extra_context.clone(),
290        )
291        .await
292        {
293            // If healthchecks are configured for any of the changing/new components, try running
294            // them before moving forward with connecting and spawning.  In some cases, healthchecks
295            // failing may be configured as a non-blocking issue and so we'll still continue on.
296            if self
297                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
298                .await
299            {
300                self.connect_diff(&diff, &mut new_pieces).await;
301                self.spawn_diff(&diff, new_pieces);
302                self.config = new_config;
303
304                info!("New configuration loaded successfully.");
305
306                return Ok(true);
307            }
308        }
309
310        // We failed to build, connect, and spawn all of the changed/new components, so we flip
311        // around the configuration differential to generate all the components that we need to
312        // bring back to restore the current configuration.
313        warn!("Failed to completely load new configuration. Restoring old configuration.");
314
315        let diff = diff.flip();
316        if let Some(mut new_pieces) =
317            TopologyPieces::build_or_log_errors(&self.config, &diff, buffers, extra_context.clone())
318                .await
319        {
320            if self
321                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
322                .await
323            {
324                self.connect_diff(&diff, &mut new_pieces).await;
325                self.spawn_diff(&diff, new_pieces);
326
327                info!("Old configuration restored successfully.");
328
329                return Ok(false);
330            }
331        }
332
333        error!("Failed to restore old configuration.");
334
335        Err(())
336    }
337
338    /// Attempts to reload enrichment tables.
339    pub(crate) async fn reload_enrichment_tables(&self) {
340        reload_enrichment_tables(&self.config).await;
341    }
342
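    /// Runs healthchecks for the changed/added components in `diff`, honoring the given
    /// [`HealthcheckOptions`]: this returns `false` only when healthchecks are enabled, required
    /// to pass (`require_healthy`), and at least one of them fails; otherwise any spawned checks
    /// run in the background and `true` is returned.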
343    pub(crate) async fn run_healthchecks(
344        &mut self,
345        diff: &ConfigDiff,
346        pieces: &mut TopologyPieces,
347        options: HealthcheckOptions,
348    ) -> bool {
349        if options.enabled {
350            let healthchecks = take_healthchecks(diff, pieces)
351                .into_iter()
352                .map(|(_, task)| task);
353            let healthchecks = future::try_join_all(healthchecks);
354
355            info!("Running healthchecks.");
356            if options.require_healthy {
357                let success = healthchecks.await;
358
359                if success.is_ok() {
360                    info!("All healthchecks passed.");
361                    true
362                } else {
363                    error!("Sinks unhealthy.");
364                    false
365                }
366            } else {
367                tokio::spawn(healthchecks);
368                true
369            }
370        } else {
371            true
372        }
373    }
374
375    /// Shuts down any changed/removed component in the given configuration diff.
376    ///
377    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
378    async fn shutdown_diff(
379        &mut self,
380        diff: &ConfigDiff,
381        new_config: &Config,
382    ) -> HashMap<ComponentKey, BuiltBuffer> {
383        // First, we shut down any changed/removed sources. This allows downstream
384        // components to terminate naturally by virtue of the flow of events stopping.
385        if diff.sources.any_changed_or_removed() {
386            let timeout = Duration::from_secs(30);
387            let mut source_shutdown_handles = Vec::new();
388
389            let deadline = Instant::now() + timeout;
390            for key in &diff.sources.to_remove {
391                debug!(component = %key, "Removing source.");
392
393                let previous = self.tasks.remove(key).unwrap();
394                drop(previous); // detach and forget
395
396                self.remove_outputs(key);
397                source_shutdown_handles
398                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
399            }
400
401            for key in &diff.sources.to_change {
402                debug!(component = %key, "Changing source.");
403
404                self.remove_outputs(key);
405                source_shutdown_handles
406                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
407            }
408
409            debug!(
410                "Waiting for up to {} seconds for source(s) to finish shutting down.",
411                timeout.as_secs()
412            );
413            futures::future::join_all(source_shutdown_handles).await;
414
415            // Final cleanup pass now that all changed/removed sources have signalled that they have shut down.
416            for key in diff.sources.removed_and_changed() {
417                if let Some(task) = self.source_tasks.remove(key) {
418                    task.await.unwrap().unwrap();
419                }
420            }
421        }
422
423        // Next, we shut down any changed/removed transforms. Same as before: we want to allow
424        // downstream components to terminate naturally by virtue of the flow of events stopping.
425        //
426        // Since transforms are entirely driven by the flow of events into them from upstream
427        // components, the shutdown of the sources or transforms they depend on, and thus the
428        // closing of their buffers, will naturally cause them to shut down, which is why we
429        // don't do any manual triggering of shutdown here.
430        for key in &diff.transforms.to_remove {
431            debug!(component = %key, "Removing transform.");
432
433            let previous = self.tasks.remove(key).unwrap();
434            drop(previous); // detach and forget
435
436            self.remove_inputs(key, diff, new_config).await;
437            self.remove_outputs(key);
438        }
439
440        for key in &diff.transforms.to_change {
441            debug!(component = %key, "Changing transform.");
442
443            self.remove_inputs(key, diff, new_config).await;
444            self.remove_outputs(key);
445        }
446
447        // Now we'll process any changed/removed sinks.
448        //
449        // At this point, neither the old nor the new config has conflicts within its own resource
450        // usage, so if we combine their resources, any conflicts found are between components being
451        // removed and components being added.
452        let removed_table_sinks = diff
453            .enrichment_tables
454            .removed_and_changed()
455            .filter_map(|key| {
456                self.config
457                    .enrichment_table(key)
458                    .and_then(|t| t.as_sink(key))
459                    .map(|(key, s)| (key.clone(), s.resources(&key)))
460            })
461            .collect::<Vec<_>>();
462        let remove_sink = diff
463            .sinks
464            .removed_and_changed()
465            .map(|key| {
466                (
467                    key,
468                    self.config
469                        .sink(key)
470                        .map(|s| s.resources(key))
471                        .unwrap_or_default(),
472                )
473            })
474            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
475        let add_source = diff
476            .sources
477            .changed_and_added()
478            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
479        let added_table_sinks = diff
480            .enrichment_tables
481            .changed_and_added()
482            .filter_map(|key| {
483                self.config
484                    .enrichment_table(key)
485                    .and_then(|t| t.as_sink(key))
486                    .map(|(key, s)| (key.clone(), s.resources(&key)))
487            })
488            .collect::<Vec<_>>();
489        let add_sink = diff
490            .sinks
491            .changed_and_added()
492            .map(|key| {
493                (
494                    key,
495                    new_config
496                        .sink(key)
497                        .map(|s| s.resources(key))
498                        .unwrap_or_default(),
499                )
500            })
501            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
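        // Tag each resource set with a boolean marking whether it belongs to a component that is
        // going away (`true` for removed/changed sinks) or arriving (`false` for added/changed
        // sinks and sources), so that after computing conflicts we can keep only the existing
        // sinks whose resources a new component needs.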
502        let conflicts = Resource::conflicts(
503            remove_sink.map(|(key, value)| ((true, key), value)).chain(
504                add_sink
505                    .chain(add_source)
506                    .map(|(key, value)| ((false, key), value)),
507            ),
508        )
509        .into_iter()
510        .flat_map(|(_, components)| components)
511        .collect::<HashSet<_>>();
512        // Existing conflicting sinks
513        let conflicting_sinks = conflicts
514            .into_iter()
515            .filter(|&(existing_sink, _)| existing_sink)
516            .map(|(_, key)| key.clone());
517
518        // For any sink whose buffer configuration didn't change, we can reuse its buffer.
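        // Sinks explicitly marked for reload (via `components_to_reload`) are excluded, since we
        // want them rebuilt from scratch rather than handed their old buffer.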
519        let reuse_buffers = diff
520            .sinks
521            .to_change
522            .iter()
523            .filter(|&key| {
524                if diff.components_to_reload.contains(key) {
525                    return false;
526                }
527                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
528                    self.config
529                        .enrichment_table(key)
530                        .and_then(|t| t.as_sink(key))
531                        .map(|(_, s)| s.buffer)
532                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
533                    self.config
534                        .enrichment_table(key)
535                        .and_then(|t| t.as_sink(key))
536                        .map(|(_, s)| s.buffer)
537                })
538            })
539            .cloned()
540            .collect::<HashSet<_>>();
541
542        // For any existing sink that has a conflicting resource dependency with a changed/added
543        // sink, or any sink whose buffer we want to reuse, we need to explicitly wait for
544        // it to finish processing so we can reclaim ownership of those resources/buffers.
545        let wait_for_sinks = conflicting_sinks
546            .chain(reuse_buffers.iter().cloned())
547            .collect::<HashSet<_>>();
548
549        // First, we remove any inputs to removed sinks so they can naturally shut down.
550        let removed_sinks = diff
551            .sinks
552            .to_remove
553            .iter()
554            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
555                self.config
556                    .enrichment_table(key)
557                    .and_then(|t| t.as_sink(key))
558                    .is_some()
559            }))
560            .collect::<Vec<_>>();
561        for key in &removed_sinks {
562            debug!(component = %key, "Removing sink.");
563            self.remove_inputs(key, diff, new_config).await;
564        }
565
566        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
567        // they can shut down naturally and allow us to recover their buffers if possible.
568        let mut buffer_tx = HashMap::new();
569
570        let sinks_to_change = diff
571            .sinks
572            .to_change
573            .iter()
574            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
575                self.config
576                    .enrichment_table(key)
577                    .and_then(|t| t.as_sink(key))
578                    .is_some()
579            }))
580            .collect::<Vec<_>>();
581
582        for key in &sinks_to_change {
583            debug!(component = %key, "Changing sink.");
584            if reuse_buffers.contains(key) {
585                self.detach_triggers
586                    .remove(key)
587                    .unwrap()
588                    .into_inner()
589                    .cancel();
590
591                // We explicitly clone the input side of the buffer and store it so we don't lose
592                // it when we remove the inputs below.
593                //
594                // We clone instead of removing here because otherwise the input will be missing for
595                // the rest of the reload process, which violates the assumption that all previous
596                // inputs for components not being removed are still available. It's simpler to
597                // allow the "old" input to stick around and be replaced (even though that's
598                // basically a no-op since we're reusing the same buffer) than it is to pass around
599                // info about which sinks are having their buffers reused and treat them differently
600                // at other stages.
601                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
602            }
603            self.remove_inputs(key, diff, new_config).await;
604        }
605
606        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
607        // sinks, we can actually wait for them to shut down before collecting any buffers that are
608        // marked for reuse.
609        //
610        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
611        // on, we don't bother waiting for it to shut down.
612        for key in &removed_sinks {
613            let previous = self.tasks.remove(key).unwrap();
614            if wait_for_sinks.contains(key) {
615                debug!(message = "Waiting for sink to shutdown.", %key);
616                previous.await.unwrap().unwrap();
617            } else {
618                drop(previous); // detach and forget
619            }
620        }
621
622        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
623        for key in &sinks_to_change {
624            if wait_for_sinks.contains(key) {
625                let previous = self.tasks.remove(key).unwrap();
626                debug!(message = "Waiting for sink to shutdown.", %key);
627                let buffer = previous.await.unwrap().unwrap();
628
629                if reuse_buffers.contains(key) {
630                    // We clone instead of removing here because otherwise the input will be
631                    // missing for the rest of the reload process, which violates the assumption
632                    // that all previous inputs for components not being removed are still
633                    // available. It's simpler to allow the "old" input to stick around and be
634                    // replaced (even though that's basically a no-op since we're reusing the same
635                    // buffer) than it is to pass around info about which sinks are having their
636                    // buffers reused and treat them differently at other stages.
637                    let tx = buffer_tx.remove(key).unwrap();
638                    let rx = match buffer {
639                        TaskOutput::Sink(rx) => rx.into_inner(),
640                        _ => unreachable!(),
641                    };
642
643                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
644                }
645            }
646        }
647
648        buffers
649    }
650
651    /// Connects all changed/added components in the given configuration diff.
652    pub(crate) async fn connect_diff(
653        &mut self,
654        diff: &ConfigDiff,
655        new_pieces: &mut TopologyPieces,
656    ) {
657        debug!("Connecting changed/added component(s).");
658
659        // Update tap metadata
660        if !self.watch.0.is_closed() {
661            for key in &diff.sources.to_remove {
662                // Sources only have outputs
663                self.outputs_tap_metadata.remove(key);
664            }
665
666            for key in &diff.transforms.to_remove {
667                // Transforms can have both inputs and outputs
668                self.outputs_tap_metadata.remove(key);
669                self.inputs_tap_metadata.remove(key);
670            }
671
672            for key in &diff.sinks.to_remove {
673                // Sinks only have inputs
674                self.inputs_tap_metadata.remove(key);
675            }
676
677            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
678                self.config
679                    .enrichment_table(key)
680                    .and_then(|t| t.as_sink(key))
681                    .is_some()
682            });
683            for key in removed_sinks {
684                // Sinks only have inputs
685                self.inputs_tap_metadata.remove(key);
686            }
687
688            for key in diff.sources.changed_and_added() {
689                if let Some(task) = new_pieces.tasks.get(key) {
690                    self.outputs_tap_metadata
691                        .insert(key.clone(), ("source", task.typetag().to_string()));
692                }
693            }
694
695            for key in diff.transforms.changed_and_added() {
696                if let Some(task) = new_pieces.tasks.get(key) {
697                    self.outputs_tap_metadata
698                        .insert(key.clone(), ("transform", task.typetag().to_string()));
699                }
700            }
701
702            for (key, input) in &new_pieces.inputs {
703                self.inputs_tap_metadata
704                    .insert(key.clone(), input.1.clone());
705            }
706        }
707
708        // We configure the outputs of any changed/added sources first, so they're available to any
709        // transforms and sinks that come afterwards.
710        for key in diff.sources.changed_and_added() {
711            debug!(component = %key, "Configuring outputs for source.");
712            self.setup_outputs(key, new_pieces).await;
713        }
714
715        let added_changed_table_sources: Vec<&ComponentKey> = diff
716            .enrichment_tables
717            .changed_and_added()
718            .filter(|k| new_pieces.source_tasks.contains_key(k))
719            .collect();
720        for key in added_changed_table_sources {
721            debug!(component = %key, "Connecting outputs for enrichment table source.");
722            self.setup_outputs(key, new_pieces).await;
723        }
724
725        // We configure the outputs of any changed/added transforms next, for the same reason: we
726        // need them to be available to any transforms and sinks that come afterwards.
727        for key in diff.transforms.changed_and_added() {
728            debug!(component = %key, "Configuring outputs for transform.");
729            self.setup_outputs(key, new_pieces).await;
730        }
731
732        // Now that all possible outputs are configured, we can start wiring up inputs, starting
733        // with transforms.
734        for key in diff.transforms.changed_and_added() {
735            debug!(component = %key, "Connecting inputs for transform.");
736            self.setup_inputs(key, diff, new_pieces).await;
737        }
738
739        // Now that all sources and transforms are fully configured, we can wire up sinks.
740        for key in diff.sinks.changed_and_added() {
741            debug!(component = %key, "Connecting inputs for sink.");
742            self.setup_inputs(key, diff, new_pieces).await;
743        }
744        let added_changed_tables: Vec<&ComponentKey> = diff
745            .enrichment_tables
746            .changed_and_added()
747            .filter(|k| new_pieces.inputs.contains_key(k))
748            .collect();
749        for key in added_changed_tables {
750            debug!(component = %key, "Connecting inputs for enrichment table sink.");
751            self.setup_inputs(key, diff, new_pieces).await;
752        }
753
754        // We do a final pass here to reconnect unchanged components.
755        //
756        // Why would we reconnect unchanged components?  Well, as sources and transforms will
757        // recreate their fanouts every time they're changed, we can run into a situation where a
758        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
759        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
760        //
761        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
762        // connect components backwards, i.e. transforms to sources/transforms, and sinks to
763        // sources/transforms, to ensure we're connecting components in order.
764        self.reattach_severed_inputs(diff);
765
766        // Broadcast any topology changes to subscribers.
767        if !self.watch.0.is_closed() {
768            let outputs = self
769                .outputs
770                .clone()
771                .into_iter()
772                .flat_map(|(output_id, control_tx)| {
773                    self.outputs_tap_metadata.get(&output_id.component).map(
774                        |(component_kind, component_type)| {
775                            (
776                                TapOutput {
777                                    output_id,
778                                    component_kind,
779                                    component_type: component_type.clone(),
780                                },
781                                control_tx,
782                            )
783                        },
784                    )
785                })
786                .collect::<HashMap<_, _>>();
787
788            let mut removals = diff.sources.to_remove.clone();
789            removals.extend(diff.transforms.to_remove.iter().cloned());
790            self.watch
791                .0
792                .send(TapResource {
793                    outputs,
794                    inputs: self.inputs_tap_metadata.clone(),
795                    source_keys: diff
796                        .sources
797                        .changed_and_added()
798                        .map(|key| key.to_string())
799                        .collect(),
800                    sink_keys: diff
801                        .sinks
802                        .changed_and_added()
803                        .map(|key| key.to_string())
804                        .collect(),
805                    // Note, only sources and transforms are relevant. Sinks do
806                    // not have outputs to tap.
807                    removals,
808                })
809                .expect("Couldn't broadcast config changes.");
810        }
811    }
812
813    async fn setup_outputs(
814        &mut self,
815        key: &ComponentKey,
816        new_pieces: &mut builder::TopologyPieces,
817    ) {
818        let outputs = new_pieces.outputs.remove(key).unwrap();
819        for (port, output) in outputs {
820            debug!(component = %key, output_id = ?port, "Configuring output for component.");
821
822            let id = OutputId {
823                component: key.clone(),
824                port,
825            };
826
827            self.outputs.insert(id, output);
828        }
829    }
830
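    /// Wires up the input channel for `key`, registering it with the fanout of every upstream
    /// component it reads from: brand-new connections are added, while connections that were
    /// paused during `remove_inputs` are replaced in place.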
831    async fn setup_inputs(
832        &mut self,
833        key: &ComponentKey,
834        diff: &ConfigDiff,
835        new_pieces: &mut builder::TopologyPieces,
836    ) {
837        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
838
839        let old_inputs = self
840            .config
841            .inputs_for_node(key)
842            .into_iter()
843            .flatten()
844            .cloned()
845            .collect::<HashSet<_>>();
846
847        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
848        let inputs_to_add = &new_inputs - &old_inputs;
849
850        for input in inputs {
851            let output = self.outputs.get_mut(&input).expect("unknown output");
852
853            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
854                // If the input we're connecting to is changing, that means its outputs will have been
855                // recreated, so instead of replacing a paused sink, we have to add it to this new
856                // output for the first time, since there's nothing to actually replace at this point.
857                debug!(component = %key, fanout_id = %input, "Adding component input to fanout.");
858
859                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
860            } else {
861                // We know that if this component is connected to a given input, and neither
862                // component was changed, then the output must still exist, which means we paused
863                // this component's connection to its output, so we have to replace that connection
864                // now:
865                debug!(component = %key, fanout_id = %input, "Replacing component input in fanout.");
866
867                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
868            }
869        }
870
871        self.inputs.insert(key.clone(), tx);
872        new_pieces
873            .detach_triggers
874            .remove(key)
875            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
876    }
877
878    fn remove_outputs(&mut self, key: &ComponentKey) {
879        self.outputs.retain(|id, _output| &id.component != key);
880    }
881
882    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
883        self.inputs.remove(key);
884        self.detach_triggers.remove(key);
885
886        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
887        let new_inputs = new_config
888            .inputs_for_node(key)
889            .unwrap_or_default()
890            .iter()
891            .collect::<HashSet<_>>();
892
893        for input in old_inputs {
894            if let Some(output) = self.outputs.get_mut(input) {
895                if diff.contains(&input.component)
896                    || diff.is_removed(key)
897                    || !new_inputs.contains(input)
898                {
899                    // 3 cases to remove the input:
900                    //
901                    // Case 1: If the input we're removing ourselves from is changing, that means its
902                    // outputs will be recreated, so instead of pausing the sink, we just delete it
903                    // outright to ensure things are clean.
904                    //
905                    // Case 2: If this component itself is being removed, then pausing makes no sense
906                    // because it isn't coming back.
907                    //
908                    // Case 3: This component is no longer connected to the input in the new config.
909                    debug!(component = %key, fanout_id = %input, "Removing component input from fanout.");
910
911                    _ = output.send(ControlMessage::Remove(key.clone()));
912                } else {
913                    // We know that if this component is connected to a given input, and it isn't being
914                    // changed, then it will exist when we reconnect inputs, so we pause it now
915                    // to stop further sends through that component until we reconnect:
916                    debug!(component = %key, fanout_id = %input, "Pausing component input in fanout.");
917
918                    _ = output.send(ControlMessage::Pause(key.clone()));
919                }
920            }
921        }
922    }
923
924    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
925        let unchanged_transforms = self
926            .config
927            .transforms()
928            .filter(|(key, _)| !diff.transforms.contains(key));
929        for (transform_key, transform) in unchanged_transforms {
930            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
931            for output_id in changed_outputs {
932                debug!(component = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
933
934                let input = self.inputs.get(transform_key).cloned().unwrap();
935                let output = self.outputs.get_mut(&output_id).unwrap();
936                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
937            }
938        }
939
940        let unchanged_sinks = self
941            .config
942            .sinks()
943            .filter(|(key, _)| !diff.sinks.contains(key));
944        for (sink_key, sink) in unchanged_sinks {
945            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
946            for output_id in changed_outputs {
947                debug!(component = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
948
949                let input = self.inputs.get(sink_key).cloned().unwrap();
950                let output = self.outputs.get_mut(&output_id).unwrap();
951                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
952            }
953        }
954    }
955
956    /// Starts any new or changed components in the given configuration diff.
957    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
958        for key in &diff.sources.to_change {
959            debug!(message = "Spawning changed source.", key = %key);
960            self.spawn_source(key, &mut new_pieces);
961        }
962
963        for key in &diff.sources.to_add {
964            debug!(message = "Spawning new source.", key = %key);
965            self.spawn_source(key, &mut new_pieces);
966        }
967
968        let changed_table_sources: Vec<&ComponentKey> = diff
969            .enrichment_tables
970            .to_change
971            .iter()
972            .filter(|k| new_pieces.source_tasks.contains_key(k))
973            .collect();
974
975        let added_table_sources: Vec<&ComponentKey> = diff
976            .enrichment_tables
977            .to_add
978            .iter()
979            .filter(|k| new_pieces.source_tasks.contains_key(k))
980            .collect();
981
982        for key in changed_table_sources {
983            debug!(message = "Spawning changed enrichment table source.", key = %key);
984            self.spawn_source(key, &mut new_pieces);
985        }
986
987        for key in added_table_sources {
988            debug!(message = "Spawning new enrichment table source.", key = %key);
989            self.spawn_source(key, &mut new_pieces);
990        }
991
992        for key in &diff.transforms.to_change {
993            debug!(message = "Spawning changed transform.", key = %key);
994            self.spawn_transform(key, &mut new_pieces);
995        }
996
997        for key in &diff.transforms.to_add {
998            debug!(message = "Spawning new transform.", key = %key);
999            self.spawn_transform(key, &mut new_pieces);
1000        }
1001
1002        for key in &diff.sinks.to_change {
1003            debug!(message = "Spawning changed sink.", key = %key);
1004            self.spawn_sink(key, &mut new_pieces);
1005        }
1006
1007        for key in &diff.sinks.to_add {
1008            debug!(message = "Spawning new sink.", key = %key);
1009            self.spawn_sink(key, &mut new_pieces);
1010        }
1011
1012        let changed_tables: Vec<&ComponentKey> = diff
1013            .enrichment_tables
1014            .to_change
1015            .iter()
1016            .filter(|k| {
1017                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1018            })
1019            .collect();
1020
1021        let added_tables: Vec<&ComponentKey> = diff
1022            .enrichment_tables
1023            .to_add
1024            .iter()
1025            .filter(|k| {
1026                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1027            })
1028            .collect();
1029
1030        for key in changed_tables {
1031            debug!(message = "Spawning changed enrichment table sink.", key = %key);
1032            self.spawn_sink(key, &mut new_pieces);
1033        }
1034
1035        for key in added_tables {
1036            debug!(message = "Spawning enrichment table new sink.", key = %key);
1037            self.spawn_sink(key, &mut new_pieces);
1038        }
1039    }
1040
1041    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1042        let task = new_pieces.tasks.remove(key).unwrap();
1043        let span = error_span!(
1044            "sink",
1045            component_kind = "sink",
1046            component_id = %task.id(),
1047            component_type = %task.typetag(),
1048        );
1049
1050        let task_span = span.or_current();
1051        #[cfg(feature = "allocation-tracing")]
1052        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1053            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1054                task.id().to_string(),
1055                "sink".to_string(),
1056                task.typetag().to_string(),
1057            );
1058            debug!(
1059                component_kind = "sink",
1060                component_type = task.typetag(),
1061                component_id = task.id(),
1062                group_id = group_id.as_raw().to_string(),
1063                "Registered new allocation group."
1064            );
1065            group_id.attach_to_span(&task_span);
1066        }
1067
1068        let task_name = format!(">> {} ({})", task.typetag(), task.id());
1069        let task = {
1070            let key = key.clone();
1071            handle_errors(task, self.abort_tx.clone(), |error| {
1072                ShutdownError::SinkAborted { key, error }
1073            })
1074        }
1075        .instrument(task_span);
1076        let spawned = spawn_named(task, task_name.as_ref());
1077        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1078            drop(previous); // detach and forget
1079        }
1080    }
1081
1082    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1083        let task = new_pieces.tasks.remove(key).unwrap();
1084        let span = error_span!(
1085            "transform",
1086            component_kind = "transform",
1087            component_id = %task.id(),
1088            component_type = %task.typetag(),
1089        );
1090
1091        let task_span = span.or_current();
1092        #[cfg(feature = "allocation-tracing")]
1093        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1094            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1095                task.id().to_string(),
1096                "transform".to_string(),
1097                task.typetag().to_string(),
1098            );
1099            debug!(
1100                component_kind = "transform",
1101                component_type = task.typetag(),
1102                component_id = task.id(),
1103                group_id = group_id.as_raw().to_string(),
1104                "Registered new allocation group."
1105            );
1106            group_id.attach_to_span(&task_span);
1107        }
1108
1109        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1110        let task = {
1111            let key = key.clone();
1112            handle_errors(task, self.abort_tx.clone(), |error| {
1113                ShutdownError::TransformAborted { key, error }
1114            })
1115        }
1116        .instrument(task_span);
1117        let spawned = spawn_named(task, task_name.as_ref());
1118        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1119            drop(previous); // detach and forget
1120        }
1121    }
1122
1123    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1124        let task = new_pieces.tasks.remove(key).unwrap();
1125        let span = error_span!(
1126            "source",
1127            component_kind = "source",
1128            component_id = %task.id(),
1129            component_type = %task.typetag(),
1130        );
1131
1132        let task_span = span.or_current();
1133        #[cfg(feature = "allocation-tracing")]
1134        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1135            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1136                task.id().to_string(),
1137                "source".to_string(),
1138                task.typetag().to_string(),
1139            );
1140
1141            debug!(
1142                component_kind = "source",
1143                component_type = task.typetag(),
1144                component_id = task.id(),
1145                group_id = group_id.as_raw().to_string(),
1146                "Registered new allocation group."
1147            );
1148            group_id.attach_to_span(&task_span);
1149        }
1150
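        // This first task is the source's output pump; it lives in `self.tasks` alongside
        // transform and sink tasks, while the actual source task spawned further down goes into
        // `self.source_tasks` (see the two-task note in `stop`).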
1151        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1152        let task = {
1153            let key = key.clone();
1154            handle_errors(task, self.abort_tx.clone(), |error| {
1155                ShutdownError::SourceAborted { key, error }
1156            })
1157        }
1158        .instrument(task_span.clone());
1159        let spawned = spawn_named(task, task_name.as_ref());
1160        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1161            drop(previous); // detach and forget
1162        }
1163
1164        self.shutdown_coordinator
1165            .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1166
1167        // Now spawn the actual source task.
1168        let source_task = new_pieces.source_tasks.remove(key).unwrap();
1169        let source_task = {
1170            let key = key.clone();
1171            handle_errors(source_task, self.abort_tx.clone(), |error| {
1172                ShutdownError::SourceAborted { key, error }
1173            })
1174        }
1175        .instrument(task_span);
1176        self.source_tasks
1177            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1178    }
1179
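    /// Builds and starts a topology from a validated `config`, treating every component as newly
    /// added. Returns the running topology plus a receiver for component shutdown errors, or
    /// `None` if the pieces cannot be built or the topology fails to start.
    ///
    /// A minimal, illustrative sketch (assuming `config` is already validated and that
    /// `ExtraContext` implements `Default`):
    ///
    /// ```ignore
    /// let (topology, mut crash_rx) =
    ///     RunningTopology::start_init_validated(config, ExtraContext::default())
    ///         .await
    ///         .expect("failed to start topology");
    /// ```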
1180    pub async fn start_init_validated(
1181        config: Config,
1182        extra_context: ExtraContext,
1183    ) -> Option<(Self, ShutdownErrorReceiver)> {
1184        let diff = ConfigDiff::initial(&config);
1185        let pieces =
1186            TopologyPieces::build_or_log_errors(&config, &diff, HashMap::new(), extra_context)
1187                .await?;
1188        Self::start_validated(config, diff, pieces).await
1189    }
1190
1191    pub async fn start_validated(
1192        config: Config,
1193        diff: ConfigDiff,
1194        mut pieces: TopologyPieces,
1195    ) -> Option<(Self, ShutdownErrorReceiver)> {
1196        let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1197
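        // Resolve the metric-expiry interval from the deprecated `expire_metrics` option and the
        // newer `expire_metrics_secs`: setting both is rejected, negative values are mapped to
        // `None`, and leaving both unset falls back to a 300-second default.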
1198        let expire_metrics = match (
1199            config.global.expire_metrics,
1200            config.global.expire_metrics_secs,
1201        ) {
1202            (Some(e), None) => {
1203                warn!(
1204                "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1205            );
1206                if e < Duration::from_secs(0) {
1207                    None
1208                } else {
1209                    Some(e.as_secs_f64())
1210                }
1211            }
1212            (Some(_), Some(_)) => {
1213                error!("Cannot set both `expire_metrics` and `expire_metrics_secs`.");
1214                return None;
1215            }
1216            (None, Some(e)) => {
1217                if e < 0f64 {
1218                    None
1219                } else {
1220                    Some(e)
1221                }
1222            }
1223            (None, None) => Some(300f64),
1224        };
1225
1226        if let Err(error) = crate::metrics::Controller::get()
1227            .expect("Metrics must be initialized")
1228            .set_expiry(
1229                expire_metrics,
1230                config
1231                    .global
1232                    .expire_metrics_per_metric_set
1233                    .clone()
1234                    .unwrap_or_default(),
1235            )
1236        {
1237            error!(message = "Invalid metrics expiry.", %error);
1238            return None;
1239        }
1240
1241        let mut utilization_emitter = pieces
1242            .utilization_emitter
1243            .take()
1244            .expect("Topology is missing the utilization metric emitter!");
1245        let mut running_topology = Self::new(config, abort_tx);
1246
1247        if !running_topology
1248            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1249            .await
1250        {
1251            return None;
1252        }
1253        running_topology.connect_diff(&diff, &mut pieces).await;
1254        running_topology.spawn_diff(&diff, pieces);
1255
1256        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1257            ShutdownSignal::new_wired();
1258        running_topology.utilization_task_shutdown_trigger =
1259            Some(utilization_task_shutdown_trigger);
1260        running_topology.utilization_task = Some(tokio::spawn(Task::new(
1261            "utilization_heartbeat".into(),
1262            "",
1263            async move {
1264                utilization_emitter
1265                    .run_utilization(utilization_shutdown_signal)
1266                    .await;
1267                // TODO: new task output type for this? Or handle this task in a completely
1268                // different way
1269                Ok(TaskOutput::Healthcheck)
1270            },
1271        )));
1272
1273        Some((running_topology, abort_rx))
1274    }
1275}
1276
1277fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1278    let mut changed_outputs = Vec::new();
1279
1280    for source_key in &diff.sources.to_change {
1281        changed_outputs.extend(
1282            output_ids
1283                .iter()
1284                .filter(|id| &id.component == source_key)
1285                .cloned(),
1286        );
1287    }
1288
1289    for transform_key in &diff.transforms.to_change {
1290        changed_outputs.extend(
1291            output_ids
1292                .iter()
1293                .filter(|id| &id.component == transform_key)
1294                .cloned(),
1295        );
1296    }
1297
1298    changed_outputs
1299}