vector/topology/running.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{Arc, Mutex},
4};
5
6use futures::{Future, FutureExt, future};
7use snafu::Snafu;
8use stream_cancel::Trigger;
9use tokio::{
10    sync::{mpsc, watch},
11    time::{Duration, Instant, interval, sleep_until},
12};
13use tracing::Instrument;
14use vector_lib::{
15    buffers::topology::channel::BufferSender,
16    shutdown::ShutdownSignal,
17    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
18    trigger::DisabledTrigger,
19};
20
21use super::{
22    BuiltBuffer, TaskHandle,
23    builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
24    fanout::{ControlChannel, ControlMessage},
25    handle_errors, retain, take_healthchecks,
26    task::{Task, TaskOutput},
27};
28use crate::{
29    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
30    event::EventArray,
31    extra_context::ExtraContext,
32    shutdown::SourceShutdownCoordinator,
33    signal::ShutdownError,
34    spawn_named,
35    utilization::UtilizationRegistry,
36};
37
/// Receiving half of the unbounded channel that carries [`ShutdownError`]s.
///
/// The sending half of the same channel type is what [`RunningTopology::new`] accepts as
/// `abort_tx`.
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
39
/// Errors returned by [`RunningTopology::reload_config_and_respawn`].
#[derive(Debug, Snafu)]
pub enum ReloadError {
    /// The `global` options differ between the running and the new configuration. Global
    /// options cannot be changed by a reload. Carries the names of the fields that changed.
    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
    GlobalOptionsChanged { changed_fields: Vec<String> },
    /// The `global` options differ, but computing which fields changed failed.
    #[snafu(display("failed to compute global diff: {}", source))]
    GlobalDiffFailed { source: serde_json::Error },
    /// The new components could not be built or did not pass required healthchecks. The
    /// previous configuration was restored successfully.
    #[snafu(display("topology build failed"))]
    TopologyBuildFailed,
    /// Loading the new configuration failed, and restoring the previous one failed as well.
    #[snafu(display("failed to restore previous config"))]
    FailedToRestore,
}
51
/// A live topology: the spawned component tasks, plus the channels and bookkeeping needed to
/// reload or stop them.
// NOTE(review): `dead_code` is allowed without an explanation; presumably some fields are only
// read in certain builds/features. Confirm and document why, or narrow/drop the allow.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Input side (buffer sender) of each component that receives events.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Per-component input metadata kept for the tap API.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel for each component output.
    outputs: HashMap<OutputId, ControlChannel>,
    /// Per-component `(kind, typetag)` pairs kept for the tap API, e.g. `("source", typetag)`.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Typetag of each component, keyed by component.
    component_type_names: HashMap<ComponentKey, String>,
    /// The source task of each source. Each source also has a pump task in `tasks`.
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Component tasks other than the source tasks: source pumps, transforms and sinks.
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Used to signal shutdown to sources, individually on reload or all at once on stop.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-sink triggers. A sink's trigger is cancelled on reload when its buffer is reused.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// The configuration this topology is currently running.
    pub(crate) config: Config,
    /// Sender used to report [`ShutdownError`]s.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Watch channel through which topology changes are published to tap subscribers.
    watch: (WatchTx, WatchRx),
    /// Grace period given to components in `stop`. `None` means never force a shutdown.
    graceful_shutdown_duration: Option<Duration>,
    /// Optional utilization registry. It is passed to the topology builder on reload, and
    /// removed components are unregistered from it.
    utilization_registry: Option<UtilizationRegistry>,
    /// Handle of the utilization task, awaited by `stop`.
    utilization_task: Option<TaskHandle>,
    /// Trigger cancelled by `stop` to shut down the utilization task.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Handle of the metrics task, awaited by `stop`.
    metrics_task: Option<TaskHandle>,
    /// Trigger cancelled by `stop` to shut down the metrics task.
    metrics_task_shutdown_trigger: Option<Trigger>,
    /// Component keys passed to `ConfigDiff::new` as the set of components to reload on the next
    /// `reload_config_and_respawn`. Such components never reuse their old sink buffer.
    pending_reload: Option<HashSet<ComponentKey>>,
}
74
75impl RunningTopology {
76    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
77        Self {
78            inputs: HashMap::new(),
79            inputs_tap_metadata: HashMap::new(),
80            outputs: HashMap::new(),
81            outputs_tap_metadata: HashMap::new(),
82            component_type_names: HashMap::new(),
83            shutdown_coordinator: SourceShutdownCoordinator::default(),
84            detach_triggers: HashMap::new(),
85            source_tasks: HashMap::new(),
86            tasks: HashMap::new(),
87            abort_tx,
88            watch: watch::channel(TapResource::default()),
89            graceful_shutdown_duration: config.graceful_shutdown_duration,
90            config,
91            utilization_registry: None,
92            utilization_task: None,
93            utilization_task_shutdown_trigger: None,
94            metrics_task: None,
95            metrics_task_shutdown_trigger: None,
96            pending_reload: None,
97        }
98    }
99
    /// Gets the configuration that represents this running topology.
    ///
    /// It is replaced only after a reload succeeds. After a failed reload it is still the
    /// previous configuration.
    pub const fn config(&self) -> &Config {
        &self.config
    }
104
105    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
106    /// initializes the pending reload set.
107    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
108        match &mut self.pending_reload {
109            None => self.pending_reload = Some(new_set.clone()),
110            Some(existing) => existing.extend(new_set),
111        }
112    }
113
114    /// Creates a subscription to topology changes.
115    ///
116    /// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
117    pub fn watch(&self) -> watch::Receiver<TapResource> {
118        self.watch.1.clone()
119    }
120
    /// Signal that all sources in this topology have ended.
    ///
    /// The returned future finishes once all the sources in this topology have finished. This
    /// lets the caller wait for, or detect, the point at which the sources stop producing.
    /// [`Application`][crate::app::Application], for example, uses this as a shutdown signal.
    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
        self.shutdown_coordinator.shutdown_tripwire()
    }
131
132    /// Shut down all topology components.
133    ///
134    /// This function sends the shutdown signal to all sources in this topology
135    /// and returns a future that resolves once all components (sources,
136    /// transforms, and sinks) have finished shutting down. Transforms and sinks
137    /// will shut down automatically once their input tasks finish.
138    ///
139    /// This function takes ownership of `self`, so once it returns everything
140    /// in the [`RunningTopology`] instance has been dropped except for the
141    /// `tasks` map. This map gets moved into the returned future and is used to
142    /// poll for when the tasks have completed. Once the returned future is
143    /// dropped then everything from this RunningTopology instance is fully
144    /// dropped.
145    pub fn stop(self) -> impl Future<Output = ()> {
146        // Create handy handles collections of all tasks for the subsequent
147        // operations.
148        let mut wait_handles = Vec::new();
149        // We need a Vec here since source components have two tasks. One for
150        // pump in self.tasks, and the other for source in self.source_tasks.
151        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
152
153        let map_closure = |_result| ();
154
155        // We need to give some time to the sources to gracefully shutdown, so
156        // we will merge them with other tasks.
157        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
158            let task = task.map(map_closure).shared();
159
160            wait_handles.push(task.clone());
161            check_handles.entry(key).or_default().push(task);
162        }
163
164        if let Some(utilization_task) = self.utilization_task {
165            wait_handles.push(utilization_task.map(map_closure).shared());
166        }
167
168        if let Some(metrics_task) = self.metrics_task {
169            wait_handles.push(metrics_task.map(map_closure).shared());
170        }
171
172        // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
173        let deadline = self
174            .graceful_shutdown_duration
175            .map(|grace_period| Instant::now() + grace_period);
176
177        let timeout = if let Some(deadline) = deadline {
178            // If we reach the deadline, this future will print out which components
179            // won't gracefully shutdown since we will start to forcefully shutdown
180            // the sources.
181            let mut check_handles2 = check_handles.clone();
182            Box::pin(async move {
183                sleep_until(deadline).await;
184                // Remove all tasks that have shutdown.
185                check_handles2.retain(|_key, handles| {
186                    retain(handles, |handle| handle.peek().is_none());
187                    !handles.is_empty()
188                });
189                let remaining_components = check_handles2
190                    .keys()
191                    .map(|item| item.to_string())
192                    .collect::<Vec<_>>()
193                    .join(", ");
194
195                error!(
196                    components = ?remaining_components,
197                    message = "Failed to gracefully shut down in time. Killing components.",
198                    internal_log_rate_limit = false
199                );
200            }) as future::BoxFuture<'static, ()>
201        } else {
202            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
203        };
204
205        // Reports in intervals which components are still running.
206        let mut interval = interval(Duration::from_secs(5));
207        let reporter = async move {
208            loop {
209                interval.tick().await;
210
211                // Remove all tasks that have shutdown.
212                check_handles.retain(|_key, handles| {
213                    retain(handles, |handle| handle.peek().is_none());
214                    !handles.is_empty()
215                });
216                let remaining_components = check_handles
217                    .keys()
218                    .map(|item| item.to_string())
219                    .collect::<Vec<_>>()
220                    .join(", ");
221
222                let (deadline_passed, time_remaining) = match deadline {
223                    Some(d) => match d.checked_duration_since(Instant::now()) {
224                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
225                        None => (true, "overdue".to_string()),
226                    },
227                    None => (false, "no time limit".to_string()),
228                };
229
230                info!(
231                    remaining_components = ?remaining_components,
232                    time_remaining = ?time_remaining,
233                    "Shutting down... Waiting on running components."
234                );
235
236                let all_done = check_handles.is_empty();
237
238                if all_done {
239                    info!("Shutdown reporter exiting: all components shut down.");
240                    break;
241                } else if deadline_passed {
242                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
243                    break;
244                }
245            }
246        };
247
248        // Finishes once all tasks have shutdown.
249        let success = futures::future::join_all(wait_handles).map(|_| ());
250
251        // Aggregate future that ends once anything detects that all tasks have shutdown.
252        let shutdown_complete_future = future::select_all(vec![
253            Box::pin(timeout) as future::BoxFuture<'static, ()>,
254            Box::pin(reporter) as future::BoxFuture<'static, ()>,
255            Box::pin(success) as future::BoxFuture<'static, ()>,
256        ]);
257
258        // Now kick off the shutdown process by shutting down the sources.
259        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
260        if let Some(trigger) = self.utilization_task_shutdown_trigger {
261            trigger.cancel();
262        }
263        if let Some(trigger) = self.metrics_task_shutdown_trigger {
264            trigger.cancel();
265        }
266
267        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
268    }
269
270    /// Attempts to load a new configuration and update this running topology.
271    ///
272    /// If the new configuration was valid, and all changes were able to be made -- removing of
273    /// old components, changing of existing components, adding of new components -- then
274    /// `Ok(())` is returned.
275    ///
276    /// If the new configuration is not valid, or not all of the changes in the new configuration
277    /// were able to be made, then this method will attempt to undo the changes made and bring the
278    /// topology back to its previous state, returning the appropriate error.
279    ///
280    /// If the restore also fails, `ReloadError::FailedToRestore` is returned.
281    pub async fn reload_config_and_respawn(
282        &mut self,
283        new_config: Config,
284        extra_context: ExtraContext,
285    ) -> Result<(), ReloadError> {
286        info!("Reloading running topology with new configuration.");
287
288        if self.config.global != new_config.global {
289            return match self.config.global.diff(&new_config.global) {
290                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
291                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
292            };
293        }
294
295        // Calculate the change between the current configuration and the new configuration, and
296        // shutdown any components that are changing so that we can reclaim their buffers before
297        // spawning the new version of the component.
298        //
299        // We also shutdown any component that is simply being removed entirely.
300        let diff = if let Some(components) = &self.pending_reload {
301            ConfigDiff::new(&self.config, &new_config, components.clone())
302        } else {
303            ConfigDiff::new(&self.config, &new_config, HashSet::new())
304        };
305        let buffers = self.shutdown_diff(&diff, &new_config).await;
306
307        // Gives windows some time to make available any port
308        // released by shutdown components.
309        // Issue: https://github.com/vectordotdev/vector/issues/3035
310        if cfg!(windows) {
311            // This value is guess work.
312            tokio::time::sleep(Duration::from_millis(200)).await;
313        }
314
315        // Try to build all of the new components coming from the new configuration.  If we can
316        // successfully build them, we'll attempt to connect them up to the topology and spawn their
317        // respective component tasks.
318        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
319            .with_buffers(buffers.clone())
320            .with_extra_context(extra_context.clone())
321            .with_utilization_registry(self.utilization_registry.clone())
322            .build_or_log_errors()
323            .await
324        {
325            // If healthchecks are configured for any of the changing/new components, try running
326            // them before moving forward with connecting and spawning.  In some cases, healthchecks
327            // failing may be configured as a non-blocking issue and so we'll still continue on.
328            if self
329                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
330                .await
331            {
332                self.connect_diff(&diff, &mut new_pieces).await;
333                self.spawn_diff(&diff, new_pieces);
334                self.config = new_config;
335
336                info!("New configuration loaded successfully.");
337
338                return Ok(());
339            }
340        }
341
342        // We failed to build, connect, and spawn all of the changed/new components, so we flip
343        // around the configuration differential to generate all the components that we need to
344        // bring back to restore the current configuration.
345        warn!("Failed to completely load new configuration. Restoring old configuration.");
346
347        let diff = diff.flip();
348        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
349            .with_buffers(buffers)
350            .with_extra_context(extra_context.clone())
351            .with_utilization_registry(self.utilization_registry.clone())
352            .build_or_log_errors()
353            .await
354            && self
355                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
356                .await
357        {
358            self.connect_diff(&diff, &mut new_pieces).await;
359            self.spawn_diff(&diff, new_pieces);
360
361            info!("Old configuration restored successfully.");
362
363            return Err(ReloadError::TopologyBuildFailed);
364        }
365
366        error!(
367            message = "Failed to restore old configuration.",
368            internal_log_rate_limit = false
369        );
370
371        Err(ReloadError::FailedToRestore)
372    }
373
374    /// Attempts to reload enrichment tables.
375    pub(crate) async fn reload_enrichment_tables(&self) {
376        reload_enrichment_tables(&self.config).await;
377    }
378
379    pub(crate) async fn run_healthchecks(
380        &mut self,
381        diff: &ConfigDiff,
382        pieces: &mut TopologyPieces,
383        options: HealthcheckOptions,
384    ) -> bool {
385        if options.enabled {
386            let healthchecks = take_healthchecks(diff, pieces)
387                .into_iter()
388                .map(|(_, task)| task);
389            let healthchecks = future::try_join_all(healthchecks);
390
391            info!("Running healthchecks.");
392            if options.require_healthy {
393                let success = healthchecks.await;
394
395                if success.is_ok() {
396                    info!("All healthchecks passed.");
397                    true
398                } else {
399                    error!(
400                        message = "Sinks unhealthy.",
401                        internal_log_rate_limit = false
402                    );
403                    false
404                }
405            } else {
406                tokio::spawn(healthchecks);
407                true
408            }
409        } else {
410            true
411        }
412    }
413
414    /// Shuts down any changed/removed component in the given configuration diff.
415    ///
416    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
417    async fn shutdown_diff(
418        &mut self,
419        diff: &ConfigDiff,
420        new_config: &Config,
421    ) -> HashMap<ComponentKey, BuiltBuffer> {
422        // First, we shutdown any changed/removed sources. This ensures that we can allow downstream
423        // components to terminate naturally by virtue of the flow of events stopping.
424        if diff.sources.any_changed_or_removed() {
425            let timeout = Duration::from_secs(30);
426            let mut source_shutdown_handles = Vec::new();
427
428            let deadline = Instant::now() + timeout;
429            for key in &diff.sources.to_remove {
430                debug!(component_id = %key, "Removing source.");
431
432                let previous = self.tasks.remove(key).unwrap();
433                drop(previous); // detach and forget
434
435                self.remove_outputs(key);
436                source_shutdown_handles
437                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
438            }
439
440            for key in &diff.sources.to_change {
441                debug!(component_id = %key, "Changing source.");
442
443                self.remove_outputs(key);
444                source_shutdown_handles
445                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
446            }
447
448            debug!(
449                "Waiting for up to {} seconds for source(s) to finish shutting down.",
450                timeout.as_secs()
451            );
452            futures::future::join_all(source_shutdown_handles).await;
453
454            // Final cleanup pass now that all changed/removed sources have signalled as having shutdown.
455            for key in diff.sources.removed_and_changed() {
456                if let Some(task) = self.source_tasks.remove(key) {
457                    task.await.unwrap().unwrap();
458                }
459            }
460        }
461
462        // Next, we shutdown any changed/removed transforms.  Same as before: we want allow
463        // downstream components to terminate naturally by virtue of the flow of events stopping.
464        //
465        // Since transforms are entirely driven by the flow of events into them from upstream
466        // components, the shutdown of sources they depend on, or the shutdown of transforms they
467        // depend on, and thus the closing of their buffer, will naturally cause them to shutdown,
468        // which is why we don't do any manual triggering of shutdown here.
469        for key in &diff.transforms.to_remove {
470            debug!(component_id = %key, "Removing transform.");
471
472            let previous = self.tasks.remove(key).unwrap();
473            drop(previous); // detach and forget
474
475            self.remove_inputs(key, diff, new_config).await;
476            self.remove_outputs(key);
477
478            if let Some(registry) = self.utilization_registry.as_ref() {
479                registry.remove_component(key);
480            }
481        }
482
483        for key in &diff.transforms.to_change {
484            debug!(component_id = %key, "Changing transform.");
485
486            self.remove_inputs(key, diff, new_config).await;
487            self.remove_outputs(key);
488        }
489
490        // Now we'll process any changed/removed sinks.
491        //
492        // At this point both the old and the new config don't have conflicts in their resource
493        // usage. So if we combine their resources, all found conflicts are between to be removed
494        // and to be added components.
495        let removed_table_sinks = diff
496            .enrichment_tables
497            .removed_and_changed()
498            .filter_map(|key| {
499                self.config
500                    .enrichment_table(key)
501                    .and_then(|t| t.as_sink(key))
502                    .map(|(key, s)| (key.clone(), s.resources(&key)))
503            })
504            .collect::<Vec<_>>();
505        let remove_sink = diff
506            .sinks
507            .removed_and_changed()
508            .map(|key| {
509                (
510                    key,
511                    self.config
512                        .sink(key)
513                        .map(|s| s.resources(key))
514                        .unwrap_or_default(),
515                )
516            })
517            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
518        let add_source = diff
519            .sources
520            .changed_and_added()
521            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
522        let added_table_sinks = diff
523            .enrichment_tables
524            .changed_and_added()
525            .filter_map(|key| {
526                self.config
527                    .enrichment_table(key)
528                    .and_then(|t| t.as_sink(key))
529                    .map(|(key, s)| (key.clone(), s.resources(&key)))
530            })
531            .collect::<Vec<_>>();
532        let add_sink = diff
533            .sinks
534            .changed_and_added()
535            .map(|key| {
536                (
537                    key,
538                    new_config
539                        .sink(key)
540                        .map(|s| s.resources(key))
541                        .unwrap_or_default(),
542                )
543            })
544            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
545        let conflicts = Resource::conflicts(
546            remove_sink.map(|(key, value)| ((true, key), value)).chain(
547                add_sink
548                    .chain(add_source)
549                    .map(|(key, value)| ((false, key), value)),
550            ),
551        )
552        .into_iter()
553        .flat_map(|(_, components)| components)
554        .collect::<HashSet<_>>();
555        // Existing conflicting sinks
556        let conflicting_sinks = conflicts
557            .into_iter()
558            .filter(|&(existing_sink, _)| existing_sink)
559            .map(|(_, key)| key.clone());
560
561        // For any sink whose buffer configuration didn't change, we can reuse their buffer.
562        let reuse_buffers = diff
563            .sinks
564            .to_change
565            .iter()
566            .filter(|&key| {
567                if diff.components_to_reload.contains(key) {
568                    return false;
569                }
570                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
571                    self.config
572                        .enrichment_table(key)
573                        .and_then(|t| t.as_sink(key))
574                        .map(|(_, s)| s.buffer)
575                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
576                    self.config
577                        .enrichment_table(key)
578                        .and_then(|t| t.as_sink(key))
579                        .map(|(_, s)| s.buffer)
580                })
581            })
582            .cloned()
583            .collect::<HashSet<_>>();
584
585        // For any existing sink that has a conflicting resource dependency with a changed/added
586        // sink, or for any sink that we want to reuse their buffer, we need to explicit wait for
587        // them to finish processing so we can reclaim ownership of those resources/buffers.
588        let wait_for_sinks = conflicting_sinks
589            .chain(reuse_buffers.iter().cloned())
590            .collect::<HashSet<_>>();
591
592        // First, we remove any inputs to removed sinks so they can naturally shut down.
593        let removed_sinks = diff
594            .sinks
595            .to_remove
596            .iter()
597            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
598                self.config
599                    .enrichment_table(key)
600                    .and_then(|t| t.as_sink(key))
601                    .is_some()
602            }))
603            .collect::<Vec<_>>();
604        for key in &removed_sinks {
605            debug!(component_id = %key, "Removing sink.");
606            self.remove_inputs(key, diff, new_config).await;
607
608            if let Some(registry) = self.utilization_registry.as_ref() {
609                registry.remove_component(key);
610            }
611        }
612
613        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
614        // they can naturally shutdown and allow us to recover their buffers if possible.
615        let mut buffer_tx = HashMap::new();
616
617        let sinks_to_change = diff
618            .sinks
619            .to_change
620            .iter()
621            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
622                self.config
623                    .enrichment_table(key)
624                    .and_then(|t| t.as_sink(key))
625                    .is_some()
626            }))
627            .collect::<Vec<_>>();
628
629        for key in &sinks_to_change {
630            debug!(component_id = %key, "Changing sink.");
631            if reuse_buffers.contains(key) {
632                self.detach_triggers
633                    .remove(key)
634                    .unwrap()
635                    .into_inner()
636                    .cancel();
637
638                // We explicitly clone the input side of the buffer and store it so we don't lose
639                // it when we remove the inputs below.
640                //
641                // We clone instead of removing here because otherwise the input will be missing for
642                // the rest of the reload process, which violates the assumption that all previous
643                // inputs for components not being removed are still available. It's simpler to
644                // allow the "old" input to stick around and be replaced (even though that's
645                // basically a no-op since we're reusing the same buffer) than it is to pass around
646                // info about which sinks are having their buffers reused and treat them differently
647                // at other stages.
648                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
649            }
650            self.remove_inputs(key, diff, new_config).await;
651        }
652
653        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
654        // sinks, we can actually wait for them to shutdown before collecting any buffers that are
655        // marked for reuse.
656        //
657        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
658        // on, we don't bother waiting for it to shutdown.
659        for key in &removed_sinks {
660            let previous = self.tasks.remove(key).unwrap();
661            if wait_for_sinks.contains(key) {
662                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
663                previous.await.unwrap().unwrap();
664            } else {
665                drop(previous); // detach and forget
666            }
667        }
668
669        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
670        for key in &sinks_to_change {
671            if wait_for_sinks.contains(key) {
672                let previous = self.tasks.remove(key).unwrap();
673                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
674                let buffer = previous.await.unwrap().unwrap();
675
676                if reuse_buffers.contains(key) {
677                    // We clone instead of removing here because otherwise the input will be
678                    // missing for the rest of the reload process, which violates the assumption
679                    // that all previous inputs for components not being removed are still
680                    // available. It's simpler to allow the "old" input to stick around and be
681                    // replaced (even though that's basically a no-op since we're reusing the same
682                    // buffer) than it is to pass around info about which sinks are having their
683                    // buffers reused and treat them differently at other stages.
684                    let tx = buffer_tx.remove(key).unwrap();
685                    let rx = match buffer {
686                        TaskOutput::Sink(rx) => rx.into_inner(),
687                        _ => unreachable!(),
688                    };
689
690                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
691                }
692            }
693        }
694
695        buffers
696    }
697
    /// Connects all changed/added components in the given configuration diff.
    ///
    /// The work happens in a fixed order:
    ///
    /// 1. While the tap watch channel (`self.watch.0`) is open, refresh the tap metadata maps
    ///    (`outputs_tap_metadata`, `inputs_tap_metadata`, `component_type_names`) for removed
    ///    and changed/added components.
    /// 2. Register the outputs of changed/added sources, enrichment table sources and
    ///    transforms, so every fanout exists before anything is wired to it.
    /// 3. Wire the inputs of changed/added transforms, sinks and enrichment table sinks, then
    ///    reattach unchanged components whose upstream fanout was recreated.
    /// 4. While the watch channel is still open, broadcast the resulting `TapResource`.
    ///
    /// # Panics
    ///
    /// Panics if broadcasting on the watch channel fails, or (inside `setup_outputs` /
    /// `setup_inputs`) if `new_pieces` lacks the outputs or inputs of a component being
    /// connected.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Update tap metadata
        if !self.watch.0.is_closed() {
            for key in &diff.sources.to_remove {
                // Sources only have outputs
                self.outputs_tap_metadata.remove(key);
                self.component_type_names.remove(key);
            }

            for key in &diff.transforms.to_remove {
                // Transforms can have both inputs and outputs
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
                self.component_type_names.remove(key);
            }

            for key in &diff.sinks.to_remove {
                // Sinks only have inputs
                self.inputs_tap_metadata.remove(key);
                self.component_type_names.remove(key);
            }

            // Enrichment tables can expose a sink side and/or a source side; which side applies
            // is determined by looking the table up in `self.config`.
            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                // Sinks only have inputs
                self.inputs_tap_metadata.remove(key);
            }

            // The source side is keyed by whatever key `as_source` returns, which is the key
            // removed here.
            let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            });
            for key in removed_sources {
                // Sources only have outputs
                self.outputs_tap_metadata.remove(&key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    let typetag = task.typetag().to_string();
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", typetag.clone()));
                    self.component_type_names.insert(key.clone(), typetag);
                }
            }

            // NOTE(review): this lookup also goes through `self.config`. `setup_inputs` and
            // `remove_inputs` treat `self.config` as the config in effect before this diff is
            // applied, so on a reload a table in `to_add` may not be found here and would get
            // no tap metadata (nor a `component_type_names` entry) — confirm against the reload
            // path.
            for key in diff
                .enrichment_tables
                .changed_and_added()
                .filter_map(|key| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_source(key).map(|(key, _)| key))
                })
            {
                if let Some(task) = new_pieces.tasks.get(&key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    let typetag = task.typetag().to_string();
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", typetag.clone()));
                    self.component_type_names.insert(key.clone(), typetag);
                }
            }

            for key in diff.sinks.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.component_type_names
                        .insert(key.clone(), task.typetag().to_string());
                }
            }

            // Each pending input is a `(sender, inputs)` pair; `input.1` is the list of upstream
            // outputs that component consumes.
            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // We configure the outputs of any changed/added sources first, so they're available to any
        // transforms and sinks that come afterwards.
        for key in diff.sources.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Enrichment tables that built a source task also expose outputs. The keys are collected
        // because they are reused in the broadcast below (`source_keys`).
        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources.iter() {
            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // We configure the outputs of any changed/added transforms next, for the same reason: we
        // need them to be available to any transforms and sinks that come afterwards.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Now that all possible outputs are configured, we can start wiring up inputs, starting
        // with transforms.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Now that all sources and transforms are fully configured, we can wire up sinks.
        for key in diff.sinks.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        // Enrichment tables that built an input buffer are wired like sinks. The keys are also
        // reused in the broadcast below (`sink_keys`).
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables.iter() {
            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // We do a final pass here to reconnect unchanged components.
        //
        // Why would we reconnect unchanged components?  Well, as sources and transforms will
        // recreate their fanouts every time they're changed, we can run into a situation where a
        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
        //
        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
        // connect components backwards i.e. transforms to sources/transforms, and sinks to
        // sources/transforms, to ensure we're connecting components in order.
        self.reattach_severed_inputs(diff);

        // Broadcast any topology changes to subscribers.
        if !self.watch.0.is_closed() {
            // Only outputs that have tap metadata are published: the lookup yields an `Option`,
            // and `flat_map` drops the `None`s.
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(
                            added_changed_table_sources
                                .iter()
                                .map(|key| key.to_string()),
                        )
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
                        .collect(),
                    // Note, only sources and transforms are relevant. Sinks do
                    // not have outputs to tap.
                    removals,
                    type_names: self
                        .component_type_names
                        .iter()
                        .map(|(k, v)| (k.to_string(), v.clone()))
                        .collect(),
                })
                .expect("Couldn't broadcast config changes.");
        }
    }
909
910    async fn setup_outputs(
911        &mut self,
912        key: &ComponentKey,
913        new_pieces: &mut builder::TopologyPieces,
914    ) {
915        let outputs = new_pieces.outputs.remove(key).unwrap();
916        for (port, output) in outputs {
917            debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
918
919            let id = OutputId {
920                component: key.clone(),
921                port,
922            };
923
924            self.outputs.insert(id, output);
925        }
926    }
927
928    async fn setup_inputs(
929        &mut self,
930        key: &ComponentKey,
931        diff: &ConfigDiff,
932        new_pieces: &mut builder::TopologyPieces,
933    ) {
934        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
935
936        let old_inputs = self
937            .config
938            .inputs_for_node(key)
939            .into_iter()
940            .flatten()
941            .cloned()
942            .collect::<HashSet<_>>();
943
944        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
945        let inputs_to_add = &new_inputs - &old_inputs;
946
947        for input in inputs {
948            let output = self.outputs.get_mut(&input).expect("unknown output");
949
950            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
951                // If the input we're connecting to is changing, that means its outputs will have been
952                // recreated, so instead of replacing a paused sink, we have to add it to this new
953                // output for the first time, since there's nothing to actually replace at this point.
954                debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
955
956                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
957            } else {
958                // We know that if this component is connected to a given input, and neither
959                // components were changed, then the output must still exist, which means we paused
960                // this component's connection to its output, so we have to replace that connection
961                // now:
962                debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
963
964                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
965            }
966        }
967
968        self.inputs.insert(key.clone(), tx);
969        new_pieces
970            .detach_triggers
971            .remove(key)
972            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
973    }
974
975    fn remove_outputs(&mut self, key: &ComponentKey) {
976        self.outputs.retain(|id, _output| &id.component != key);
977    }
978
979    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
980        self.inputs.remove(key);
981        self.detach_triggers.remove(key);
982
983        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
984        let new_inputs = new_config
985            .inputs_for_node(key)
986            .unwrap_or_default()
987            .iter()
988            .collect::<HashSet<_>>();
989
990        for input in old_inputs {
991            if let Some(output) = self.outputs.get_mut(input) {
992                if diff.contains(&input.component)
993                    || diff.is_removed(key)
994                    || !new_inputs.contains(input)
995                {
996                    // 3 cases to remove the input:
997                    //
998                    // Case 1: If the input we're removing ourselves from is changing, that means its
999                    // outputs will be recreated, so instead of pausing the sink, we just delete it
1000                    // outright to ensure things are clean.
1001                    //
1002                    // Case 2: If this component itself is being removed, then pausing makes no sense
1003                    // because it isn't coming back.
1004                    //
1005                    // Case 3: This component is no longer connected to the input from new config.
1006                    debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
1007
1008                    _ = output.send(ControlMessage::Remove(key.clone()));
1009                } else {
1010                    // We know that if this component is connected to a given input, and it isn't being
1011                    // changed, then it will exist when we reconnect inputs, so we should pause it
1012                    // now to pause further sends through that component until we reconnect:
1013                    debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
1014
1015                    _ = output.send(ControlMessage::Pause(key.clone()));
1016                }
1017            }
1018        }
1019    }
1020
1021    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
1022        let unchanged_transforms = self
1023            .config
1024            .transforms()
1025            .filter(|(key, _)| !diff.transforms.contains(key));
1026        for (transform_key, transform) in unchanged_transforms {
1027            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
1028            for output_id in changed_outputs {
1029                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1030
1031                let input = self.inputs.get(transform_key).cloned().unwrap();
1032                let output = self.outputs.get_mut(&output_id).unwrap();
1033                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
1034            }
1035        }
1036
1037        let unchanged_sinks = self
1038            .config
1039            .sinks()
1040            .filter(|(key, _)| !diff.sinks.contains(key));
1041        for (sink_key, sink) in unchanged_sinks {
1042            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
1043            for output_id in changed_outputs {
1044                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1045
1046                let input = self.inputs.get(sink_key).cloned().unwrap();
1047                let output = self.outputs.get_mut(&output_id).unwrap();
1048                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
1049            }
1050        }
1051    }
1052
1053    /// Starts any new or changed components in the given configuration diff.
1054    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1055        for key in &diff.sources.to_change {
1056            debug!(message = "Spawning changed source.", component_id = %key);
1057            self.spawn_source(key, &mut new_pieces);
1058        }
1059
1060        for key in &diff.sources.to_add {
1061            debug!(message = "Spawning new source.", component_id = %key);
1062            self.spawn_source(key, &mut new_pieces);
1063        }
1064
1065        let changed_table_sources: Vec<&ComponentKey> = diff
1066            .enrichment_tables
1067            .to_change
1068            .iter()
1069            .filter(|k| new_pieces.source_tasks.contains_key(k))
1070            .collect();
1071
1072        let added_table_sources: Vec<&ComponentKey> = diff
1073            .enrichment_tables
1074            .to_add
1075            .iter()
1076            .filter(|k| new_pieces.source_tasks.contains_key(k))
1077            .collect();
1078
1079        for key in changed_table_sources {
1080            debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1081            self.spawn_source(key, &mut new_pieces);
1082        }
1083
1084        for key in added_table_sources {
1085            debug!(message = "Spawning new enrichment table source.", component_id = %key);
1086            self.spawn_source(key, &mut new_pieces);
1087        }
1088
1089        for key in &diff.transforms.to_change {
1090            debug!(message = "Spawning changed transform.", component_id = %key);
1091            self.spawn_transform(key, &mut new_pieces);
1092        }
1093
1094        for key in &diff.transforms.to_add {
1095            debug!(message = "Spawning new transform.", component_id = %key);
1096            self.spawn_transform(key, &mut new_pieces);
1097        }
1098
1099        for key in &diff.sinks.to_change {
1100            debug!(message = "Spawning changed sink.", component_id = %key);
1101            self.spawn_sink(key, &mut new_pieces);
1102        }
1103
1104        for key in &diff.sinks.to_add {
1105            trace!(message = "Spawning new sink.", component_id = %key);
1106            self.spawn_sink(key, &mut new_pieces);
1107        }
1108
1109        let changed_tables: Vec<&ComponentKey> = diff
1110            .enrichment_tables
1111            .to_change
1112            .iter()
1113            .filter(|k| {
1114                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1115            })
1116            .collect();
1117
1118        let added_tables: Vec<&ComponentKey> = diff
1119            .enrichment_tables
1120            .to_add
1121            .iter()
1122            .filter(|k| {
1123                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1124            })
1125            .collect();
1126
1127        for key in changed_tables {
1128            debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1129            self.spawn_sink(key, &mut new_pieces);
1130        }
1131
1132        for key in added_tables {
1133            debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1134            self.spawn_sink(key, &mut new_pieces);
1135        }
1136    }
1137
1138    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1139        let task = new_pieces.tasks.remove(key).unwrap();
1140        let span = error_span!(
1141            "sink",
1142            component_kind = "sink",
1143            component_id = %task.id(),
1144            component_type = %task.typetag(),
1145        );
1146
1147        let task_span = span.or_current();
1148        #[cfg(feature = "allocation-tracing")]
1149        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1150            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1151                task.id().to_string(),
1152                "sink".to_string(),
1153                task.typetag().to_string(),
1154            );
1155            debug!(
1156                component_kind = "sink",
1157                component_type = task.typetag(),
1158                component_id = task.id(),
1159                group_id = group_id.as_raw().to_string(),
1160                "Registered new allocation group."
1161            );
1162            group_id.attach_to_span(&task_span);
1163        }
1164
1165        let task_name = format!(">> {} ({})", task.typetag(), task.id());
1166        let task = {
1167            let key = key.clone();
1168            handle_errors(task, self.abort_tx.clone(), |error| {
1169                ShutdownError::SinkAborted { key, error }
1170            })
1171        }
1172        .instrument(task_span);
1173        let spawned = spawn_named(task, task_name.as_ref());
1174        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1175            drop(previous); // detach and forget
1176        }
1177    }
1178
1179    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1180        let task = new_pieces.tasks.remove(key).unwrap();
1181        let span = error_span!(
1182            "transform",
1183            component_kind = "transform",
1184            component_id = %task.id(),
1185            component_type = %task.typetag(),
1186        );
1187
1188        let task_span = span.or_current();
1189        #[cfg(feature = "allocation-tracing")]
1190        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1191            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1192                task.id().to_string(),
1193                "transform".to_string(),
1194                task.typetag().to_string(),
1195            );
1196            debug!(
1197                component_kind = "transform",
1198                component_type = task.typetag(),
1199                component_id = task.id(),
1200                group_id = group_id.as_raw().to_string(),
1201                "Registered new allocation group."
1202            );
1203            group_id.attach_to_span(&task_span);
1204        }
1205
1206        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1207        let task = {
1208            let key = key.clone();
1209            handle_errors(task, self.abort_tx.clone(), |error| {
1210                ShutdownError::TransformAborted { key, error }
1211            })
1212        }
1213        .instrument(task_span);
1214        let spawned = spawn_named(task, task_name.as_ref());
1215        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1216            drop(previous); // detach and forget
1217        }
1218    }
1219
1220    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1221        let task = new_pieces.tasks.remove(key).unwrap();
1222        let span = error_span!(
1223            "source",
1224            component_kind = "source",
1225            component_id = %task.id(),
1226            component_type = %task.typetag(),
1227        );
1228
1229        let task_span = span.or_current();
1230        #[cfg(feature = "allocation-tracing")]
1231        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1232            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1233                task.id().to_string(),
1234                "source".to_string(),
1235                task.typetag().to_string(),
1236            );
1237
1238            debug!(
1239                component_kind = "source",
1240                component_type = task.typetag(),
1241                component_id = task.id(),
1242                group_id = group_id.as_raw().to_string(),
1243                "Registered new allocation group."
1244            );
1245            group_id.attach_to_span(&task_span);
1246        }
1247
1248        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1249        let task = {
1250            let key = key.clone();
1251            handle_errors(task, self.abort_tx.clone(), |error| {
1252                ShutdownError::SourceAborted { key, error }
1253            })
1254        }
1255        .instrument(task_span.clone());
1256        let spawned = spawn_named(task, task_name.as_ref());
1257        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1258            drop(previous); // detach and forget
1259        }
1260
1261        self.shutdown_coordinator
1262            .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1263
1264        // Now spawn the actual source task.
1265        let source_task = new_pieces.source_tasks.remove(key).unwrap();
1266        let source_task = {
1267            let key = key.clone();
1268            handle_errors(source_task, self.abort_tx.clone(), |error| {
1269                ShutdownError::SourceAborted { key, error }
1270            })
1271        }
1272        .instrument(task_span);
1273        self.source_tasks
1274            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1275    }
1276
1277    pub async fn start_init_validated(
1278        config: Config,
1279        extra_context: ExtraContext,
1280    ) -> Option<(Self, ShutdownErrorReceiver)> {
1281        let diff = ConfigDiff::initial(&config);
1282        let pieces = TopologyPiecesBuilder::new(&config, &diff)
1283            .with_extra_context(extra_context)
1284            .build_or_log_errors()
1285            .await?;
1286        Self::start_validated(config, diff, pieces).await
1287    }
1288
1289    pub async fn start_validated(
1290        config: Config,
1291        diff: ConfigDiff,
1292        mut pieces: TopologyPieces,
1293    ) -> Option<(Self, ShutdownErrorReceiver)> {
1294        let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1295
1296        let expire_metrics = match (
1297            config.global.expire_metrics,
1298            config.global.expire_metrics_secs,
1299        ) {
1300            (Some(e), None) => {
1301                warn!(
1302                    "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1303                );
1304                if e < Duration::from_secs(0) {
1305                    None
1306                } else {
1307                    Some(e.as_secs_f64())
1308                }
1309            }
1310            (Some(_), Some(_)) => {
1311                error!(
1312                    message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1313                    internal_log_rate_limit = false
1314                );
1315                return None;
1316            }
1317            (None, Some(e)) => {
1318                if e < 0f64 {
1319                    None
1320                } else {
1321                    Some(e)
1322                }
1323            }
1324            (None, None) => Some(300f64),
1325        };
1326
1327        if let Err(error) = crate::metrics::Controller::get()
1328            .expect("Metrics must be initialized")
1329            .set_expiry(
1330                expire_metrics,
1331                config
1332                    .global
1333                    .expire_metrics_per_metric_set
1334                    .clone()
1335                    .unwrap_or_default(),
1336            )
1337        {
1338            error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1339            return None;
1340        }
1341
1342        let (utilization_emitter, utilization_registry) = pieces
1343            .utilization
1344            .take()
1345            .expect("Topology is missing the utilization metric emitter!");
1346        let metrics_storage = pieces.metrics_storage.clone();
1347        let metrics_refresh_period = config
1348            .global
1349            .metrics_storage_refresh_period
1350            .map(Duration::from_secs_f64);
1351        let mut running_topology = Self::new(config, abort_tx);
1352
1353        if !running_topology
1354            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1355            .await
1356        {
1357            return None;
1358        }
1359        running_topology.connect_diff(&diff, &mut pieces).await;
1360        running_topology.spawn_diff(&diff, pieces);
1361
1362        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1363            ShutdownSignal::new_wired();
1364        running_topology.utilization_registry = Some(utilization_registry.clone());
1365        running_topology.utilization_task_shutdown_trigger =
1366            Some(utilization_task_shutdown_trigger);
1367        running_topology.utilization_task = Some(tokio::spawn(Task::new(
1368            "utilization_heartbeat".into(),
1369            "",
1370            async move {
1371                utilization_emitter
1372                    .run_utilization(utilization_shutdown_signal)
1373                    .await;
1374                Ok(TaskOutput::Healthcheck)
1375            },
1376        )));
1377        if let Some(metrics_refresh_period) = metrics_refresh_period {
1378            let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
1379                ShutdownSignal::new_wired();
1380            running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
1381            running_topology.metrics_task = Some(tokio::spawn(Task::new(
1382                "metrics_heartbeat".into(),
1383                "",
1384                async move {
1385                    metrics_storage
1386                        .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
1387                        .await;
1388                    Ok(TaskOutput::Healthcheck)
1389                },
1390            )));
1391        }
1392
1393        Some((running_topology, abort_rx))
1394    }
1395}
1396
1397fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1398    let mut changed_outputs = Vec::new();
1399
1400    for source_key in &diff.sources.to_change {
1401        changed_outputs.extend(
1402            output_ids
1403                .iter()
1404                .filter(|id| &id.component == source_key)
1405                .cloned(),
1406        );
1407    }
1408
1409    for transform_key in &diff.transforms.to_change {
1410        changed_outputs.extend(
1411            output_ids
1412                .iter()
1413                .filter(|id| &id.component == transform_key)
1414                .cloned(),
1415        );
1416    }
1417
1418    changed_outputs
1419}