vector/topology/running.rs

use std::{
    collections::{HashMap, HashSet},
    sync::{
        Arc, Mutex,
        atomic::{AtomicBool, Ordering},
    },
};

use futures::{Future, FutureExt, future};
use snafu::Snafu;
use stream_cancel::Trigger;
use tokio::{
    sync::{mpsc, watch},
    time::{Duration, Instant, interval, sleep_until},
};
use tracing::Instrument;
use vector_lib::{
    buffers::topology::channel::BufferSender,
    shutdown::ShutdownSignal,
    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
    trigger::DisabledTrigger,
};

use super::{
    BuiltBuffer, TaskHandle,
    builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
    fanout::{ControlChannel, ControlMessage},
    handle_errors, retain, take_healthchecks,
    task::{Task, TaskOutput},
};
use crate::{
    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
    event::EventArray,
    extra_context::ExtraContext,
    shutdown::SourceShutdownCoordinator,
    signal::ShutdownError,
    spawn_named,
    utilization::UtilizationRegistry,
};

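/// Receiving half of the channel on which component tasks report fatal errors
/// that should shut down the topology.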
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

#[derive(Debug, Snafu)]
pub enum ReloadError {
    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
    GlobalOptionsChanged { changed_fields: Vec<String> },
    #[snafu(display("failed to compute global diff: {}", source))]
    GlobalDiffFailed { source: serde_json::Error },
    #[snafu(display("topology build failed"))]
    TopologyBuildFailed,
    #[snafu(display("failed to restore previous config"))]
    FailedToRestore,
}

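/// A running topology: the spawned tasks, buffers, and fanout control channels
/// for every component built from a [`Config`], plus the state needed to
/// reload the configuration or shut everything down.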
#[allow(dead_code)]
pub struct RunningTopology {
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    outputs: HashMap<OutputId, ControlChannel>,
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    tasks: HashMap<ComponentKey, TaskHandle>,
    shutdown_coordinator: SourceShutdownCoordinator,
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    pub(crate) config: Config,
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    watch: (WatchTx, WatchRx),
    pub(crate) running: Arc<AtomicBool>,
    graceful_shutdown_duration: Option<Duration>,
    utilization_registry: Option<UtilizationRegistry>,
    utilization_task: Option<TaskHandle>,
    utilization_task_shutdown_trigger: Option<Trigger>,
    metrics_task: Option<TaskHandle>,
    metrics_task_shutdown_trigger: Option<Trigger>,
    pending_reload: Option<HashSet<ComponentKey>>,
}

impl RunningTopology {
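    /// Creates an empty `RunningTopology` for the given configuration.
    ///
    /// No component tasks are spawned here; callers are expected to connect and
    /// spawn topology pieces afterwards, as [`Self::start_validated`] does.
    ///
    /// Illustrative sketch (assumes a built `Config` is already in hand):
    ///
    /// ```ignore
    /// let (abort_tx, _abort_rx) = tokio::sync::mpsc::unbounded_channel();
    /// let topology = RunningTopology::new(config, abort_tx);
    /// ```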
    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
        Self {
            inputs: HashMap::new(),
            inputs_tap_metadata: HashMap::new(),
            outputs: HashMap::new(),
            outputs_tap_metadata: HashMap::new(),
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            detach_triggers: HashMap::new(),
            source_tasks: HashMap::new(),
            tasks: HashMap::new(),
            abort_tx,
            watch: watch::channel(TapResource::default()),
            running: Arc::new(AtomicBool::new(true)),
            graceful_shutdown_duration: config.graceful_shutdown_duration,
            config,
            utilization_registry: None,
            utilization_task: None,
            utilization_task_shutdown_trigger: None,
            metrics_task: None,
            metrics_task_shutdown_trigger: None,
            pending_reload: None,
        }
    }

    /// Gets the configuration that represents this running topology.
    pub const fn config(&self) -> &Config {
        &self.config
    }

    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
    /// initializes the pending reload set.
    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
        match &mut self.pending_reload {
            None => self.pending_reload = Some(new_set.clone()),
            Some(existing) => existing.extend(new_set),
        }
    }

    /// Creates a subscription to topology changes.
    ///
    /// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
    pub fn watch(&self) -> watch::Receiver<TapResource> {
        self.watch.1.clone()
    }
    /// Signal that all sources in this topology have ended.
    ///
    /// The future returned by this function will finish once all the sources in
    /// this topology have finished. This allows the caller to wait for or
    /// detect that the sources in the topology are no longer
    /// producing. [`Application`][crate::app::Application], as an example, uses this as a
    /// shutdown signal.
    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
        self.shutdown_coordinator.shutdown_tripwire()
    }

    /// Shut down all topology components.
    ///
    /// This function sends the shutdown signal to all sources in this topology
    /// and returns a future that resolves once all components (sources,
    /// transforms, and sinks) have finished shutting down. Transforms and sinks
    /// will shut down automatically once their input tasks finish.
    ///
    /// This function takes ownership of `self`, so once it returns everything
    /// in the [`RunningTopology`] instance has been dropped except for the
    /// `tasks` map. This map gets moved into the returned future and is used to
    /// poll for when the tasks have completed. Once the returned future is
    /// dropped then everything from this RunningTopology instance is fully
    /// dropped.
    pub fn stop(self) -> impl Future<Output = ()> {
        // Update the API's health endpoint to signal shutdown
        self.running.store(false, Ordering::Relaxed);
        // Create handle collections of all tasks for the subsequent
        // operations.
        let mut wait_handles = Vec::new();
        // We need a Vec here since source components have two tasks: the pump
        // task in `self.tasks`, and the source task in `self.source_tasks`.
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        let map_closure = |_result| ();

        // We need to give the sources some time to gracefully shut down, so
        // we will merge them with the other tasks.
        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        if let Some(metrics_task) = self.metrics_task {
            wait_handles.push(metrics_task.map(map_closure).shared());
        }

        // Once we reach this deadline, we will forcefully shut down the sources. If `None`, we will never force a shutdown.
        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        let timeout = if let Some(deadline) = deadline {
            // If we reach the deadline, this future will print out which components
            // won't gracefully shut down, since we will start to forcefully shut down
            // the sources.
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Remove all tasks that have shut down.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    message = "Failed to gracefully shut down in time. Killing components.",
                    internal_log_rate_limit = false
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Reports at intervals which components are still running.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                // Remove all tasks that have shut down.
                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Finishes once all tasks have shut down.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Aggregate future that ends once anything detects that all tasks have shut down.
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        // Now kick off the shutdown process by shutting down the sources.
        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }
        if let Some(trigger) = self.metrics_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }

    /// Attempts to load a new configuration and update this running topology.
    ///
    /// If the new configuration was valid, and all changes were able to be made -- removing
    /// old components, changing existing components, adding new components -- then
    /// `Ok(())` is returned.
    ///
    /// If the new configuration is not valid, or not all of the changes in the new configuration
    /// were able to be made, then this method will attempt to undo the changes made and bring the
    /// topology back to its previous state, returning the appropriate error.
    ///
    /// If the restore also fails, `ReloadError::FailedToRestore` is returned.
    pub async fn reload_config_and_respawn(
        &mut self,
        new_config: Config,
        extra_context: ExtraContext,
    ) -> Result<(), ReloadError> {
        info!("Reloading running topology with new configuration.");

        if self.config.global != new_config.global {
            return match self.config.global.diff(&new_config.global) {
                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
            };
        }

        // Calculate the change between the current configuration and the new configuration, and
        // shut down any components that are changing so that we can reclaim their buffers before
        // spawning the new version of the component.
        //
        // We also shut down any component that is simply being removed entirely.
        let diff = if let Some(components) = &self.pending_reload {
            ConfigDiff::new(&self.config, &new_config, components.clone())
        } else {
            ConfigDiff::new(&self.config, &new_config, HashSet::new())
        };
        let buffers = self.shutdown_diff(&diff, &new_config).await;

        // Gives Windows some time to make available any ports
        // released by components that have shut down.
        // Issue: https://github.com/vectordotdev/vector/issues/3035
        if cfg!(windows) {
            // This value is guesswork.
            tokio::time::sleep(Duration::from_millis(200)).await;
        }

        // Try to build all of the new components coming from the new configuration.  If we can
        // successfully build them, we'll attempt to connect them up to the topology and spawn their
        // respective component tasks.
        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
            .with_buffers(buffers.clone())
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
        {
            // If healthchecks are configured for any of the changing/new components, try running
            // them before moving forward with connecting and spawning.  In some cases, healthchecks
            // failing may be configured as a non-blocking issue and so we'll still continue on.
            if self
                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
                .await
            {
                self.connect_diff(&diff, &mut new_pieces).await;
                self.spawn_diff(&diff, new_pieces);
                self.config = new_config;

                info!("New configuration loaded successfully.");

                return Ok(());
            }
        }

        // We failed to build, connect, and spawn all of the changed/new components, so we flip
        // around the configuration differential to generate all the components that we need to
        // bring back to restore the current configuration.
        warn!("Failed to completely load new configuration. Restoring old configuration.");

        let diff = diff.flip();
        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
            && self
                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
                .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);

            info!("Old configuration restored successfully.");

            return Err(ReloadError::TopologyBuildFailed);
        }

        error!(
            message = "Failed to restore old configuration.",
            internal_log_rate_limit = false
        );

        Err(ReloadError::FailedToRestore)
    }

    /// Attempts to reload enrichment tables.
    pub(crate) async fn reload_enrichment_tables(&self) {
        reload_enrichment_tables(&self.config).await;
    }

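    /// Runs any healthchecks built for the changed/added components in the given diff.
    ///
    /// Returns `true` if healthchecks are disabled, not required to pass, or all passed;
    /// returns `false` only when `require_healthy` is set and at least one check failed.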
    pub(crate) async fn run_healthchecks(
        &mut self,
        diff: &ConfigDiff,
        pieces: &mut TopologyPieces,
        options: HealthcheckOptions,
    ) -> bool {
        if options.enabled {
            let healthchecks = take_healthchecks(diff, pieces)
                .into_iter()
                .map(|(_, task)| task);
            let healthchecks = future::try_join_all(healthchecks);

            info!("Running healthchecks.");
            if options.require_healthy {
                let success = healthchecks.await;

                if success.is_ok() {
                    info!("All healthchecks passed.");
                    true
                } else {
                    error!(
                        message = "Sinks unhealthy.",
                        internal_log_rate_limit = false
                    );
                    false
                }
            } else {
                tokio::spawn(healthchecks);
                true
            }
        } else {
            true
        }
    }

    /// Shuts down any changed/removed component in the given configuration diff.
    ///
    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
    async fn shutdown_diff(
        &mut self,
        diff: &ConfigDiff,
        new_config: &Config,
    ) -> HashMap<ComponentKey, BuiltBuffer> {
        // First, we shut down any changed/removed sources. This ensures that we can allow downstream
        // components to terminate naturally by virtue of the flow of events stopping.
        if diff.sources.any_changed_or_removed() {
            let timeout = Duration::from_secs(30);
            let mut source_shutdown_handles = Vec::new();

            let deadline = Instant::now() + timeout;
            for key in &diff.sources.to_remove {
                debug!(component_id = %key, "Removing source.");

                let previous = self.tasks.remove(key).unwrap();
                drop(previous); // detach and forget

                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            for key in &diff.sources.to_change {
                debug!(component_id = %key, "Changing source.");

                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            debug!(
                "Waiting for up to {} seconds for source(s) to finish shutting down.",
                timeout.as_secs()
            );
            futures::future::join_all(source_shutdown_handles).await;

            // Final cleanup pass now that all changed/removed sources have signaled that they have shut down.
            for key in diff.sources.removed_and_changed() {
                if let Some(task) = self.source_tasks.remove(key) {
                    task.await.unwrap().unwrap();
                }
            }
        }

        // Next, we shut down any changed/removed transforms.  Same as before: we want to allow
        // downstream components to terminate naturally by virtue of the flow of events stopping.
        //
        // Since transforms are entirely driven by the flow of events into them from upstream
        // components, the shutdown of sources they depend on, or the shutdown of transforms they
        // depend on, and thus the closing of their buffer, will naturally cause them to shut down,
        // which is why we don't do any manual triggering of shutdown here.
        for key in &diff.transforms.to_remove {
            debug!(component_id = %key, "Removing transform.");

            let previous = self.tasks.remove(key).unwrap();
            drop(previous); // detach and forget

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        for key in &diff.transforms.to_change {
            debug!(component_id = %key, "Changing transform.");

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);
        }

        // Now we'll process any changed/removed sinks.
        //
        // At this point, neither the old nor the new config has conflicts in its resource usage,
        // so if we combine their resources, any conflicts found are between components being
        // removed and components being added.
        let removed_table_sinks = diff
            .enrichment_tables
            .removed_and_changed()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let remove_sink = diff
            .sinks
            .removed_and_changed()
            .map(|key| {
                (
                    key,
                    self.config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let add_source = diff
            .sources
            .changed_and_added()
            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
        let added_table_sinks = diff
            .enrichment_tables
            .changed_and_added()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let add_sink = diff
            .sinks
            .changed_and_added()
            .map(|key| {
                (
                    key,
                    new_config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let conflicts = Resource::conflicts(
            remove_sink.map(|(key, value)| ((true, key), value)).chain(
                add_sink
                    .chain(add_source)
                    .map(|(key, value)| ((false, key), value)),
            ),
        )
        .into_iter()
        .flat_map(|(_, components)| components)
        .collect::<HashSet<_>>();
        // Existing conflicting sinks
        let conflicting_sinks = conflicts
            .into_iter()
            .filter(|&(existing_sink, _)| existing_sink)
            .map(|(_, key)| key.clone());

        // For any sink whose buffer configuration didn't change, we can reuse its buffer.
        let reuse_buffers = diff
            .sinks
            .to_change
            .iter()
            .filter(|&key| {
                if diff.components_to_reload.contains(key) {
                    return false;
                }
                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                })
            })
            .cloned()
            .collect::<HashSet<_>>();

        // For any existing sink that has a conflicting resource dependency with a changed/added
        // sink, or for any sink whose buffer we want to reuse, we need to explicitly wait for
        // them to finish processing so we can reclaim ownership of those resources/buffers.
        let wait_for_sinks = conflicting_sinks
            .chain(reuse_buffers.iter().cloned())
            .collect::<HashSet<_>>();

        // First, we remove any inputs to removed sinks so they can naturally shut down.
        let removed_sinks = diff
            .sinks
            .to_remove
            .iter()
            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();
        for key in &removed_sinks {
            debug!(component_id = %key, "Removing sink.");
            self.remove_inputs(key, diff, new_config).await;

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
        // they can naturally shut down and allow us to recover their buffers if possible.
        let mut buffer_tx = HashMap::new();

        let sinks_to_change = diff
            .sinks
            .to_change
            .iter()
            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();

        for key in &sinks_to_change {
            debug!(component_id = %key, "Changing sink.");
            if reuse_buffers.contains(key) {
                self.detach_triggers
                    .remove(key)
                    .unwrap()
                    .into_inner()
                    .cancel();

                // We explicitly clone the input side of the buffer and store it so we don't lose
                // it when we remove the inputs below.
                //
                // We clone instead of removing here because otherwise the input will be missing for
                // the rest of the reload process, which violates the assumption that all previous
                // inputs for components not being removed are still available. It's simpler to
                // allow the "old" input to stick around and be replaced (even though that's
                // basically a no-op since we're reusing the same buffer) than it is to pass around
                // info about which sinks are having their buffers reused and treat them differently
                // at other stages.
                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
            }
            self.remove_inputs(key, diff, new_config).await;
        }

        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
        // sinks, we can actually wait for them to shut down before collecting any buffers that are
        // marked for reuse.
        //
        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
        // on, we don't bother waiting for it to shut down.
        for key in &removed_sinks {
            let previous = self.tasks.remove(key).unwrap();
            if wait_for_sinks.contains(key) {
                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
                previous.await.unwrap().unwrap();
            } else {
                drop(previous); // detach and forget
            }
        }

        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
        for key in &sinks_to_change {
            if wait_for_sinks.contains(key) {
                let previous = self.tasks.remove(key).unwrap();
                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
                let buffer = previous.await.unwrap().unwrap();

                if reuse_buffers.contains(key) {
                    // We clone instead of removing here because otherwise the input will be
                    // missing for the rest of the reload process, which violates the assumption
                    // that all previous inputs for components not being removed are still
                    // available. It's simpler to allow the "old" input to stick around and be
                    // replaced (even though that's basically a no-op since we're reusing the same
                    // buffer) than it is to pass around info about which sinks are having their
                    // buffers reused and treat them differently at other stages.
                    let tx = buffer_tx.remove(key).unwrap();
                    let rx = match buffer {
                        TaskOutput::Sink(rx) => rx.into_inner(),
                        _ => unreachable!(),
                    };

                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
                }
            }
        }

        buffers
    }

    /// Connects all changed/added components in the given configuration diff.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Update tap metadata
        if !self.watch.0.is_closed() {
            for key in &diff.sources.to_remove {
                // Sources only have outputs
                self.outputs_tap_metadata.remove(key);
            }

            for key in &diff.transforms.to_remove {
                // Transforms can have both inputs and outputs
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
            }

            for key in &diff.sinks.to_remove {
                // Sinks only have inputs
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                // Sinks only have inputs
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            });
            for key in removed_sources {
                // Sources only have outputs
                self.outputs_tap_metadata.remove(&key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff
                .enrichment_tables
                .changed_and_added()
                .filter_map(|key| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_source(key).map(|(key, _)| key))
                })
            {
                if let Some(task) = new_pieces.tasks.get(&key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", task.typetag().to_string()));
                }
            }

            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // We configure the outputs of any changed/added sources first, so they're available to any
        // transforms and sinks that come afterwards.
        for key in diff.sources.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources.iter() {
            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // We configure the outputs of any changed/added transforms next, for the same reason: we
        // need them to be available to any transforms and sinks that come afterwards.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Now that all possible outputs are configured, we can start wiring up inputs, starting
        // with transforms.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Now that all sources and transforms are fully configured, we can wire up sinks.
        for key in diff.sinks.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables.iter() {
            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // We do a final pass here to reconnect unchanged components.
        //
        // Why would we reconnect unchanged components?  Well, as sources and transforms will
        // recreate their fanouts every time they're changed, we can run into a situation where a
        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
        //
        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
        // connect components backwards i.e. transforms to sources/transforms, and sinks to
        // sources/transforms, to ensure we're connecting components in order.
        self.reattach_severed_inputs(diff);

        // Broadcast any topology changes to subscribers.
        if !self.watch.0.is_closed() {
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(
                            added_changed_table_sources
                                .iter()
                                .map(|key| key.to_string()),
                        )
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
                        .collect(),
                    // Note, only sources and transforms are relevant. Sinks do
                    // not have outputs to tap.
                    removals,
                })
                .expect("Couldn't broadcast config changes.");
        }
    }

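    /// Registers the fanout control channels for all outputs of the given component.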
    async fn setup_outputs(
        &mut self,
        key: &ComponentKey,
        new_pieces: &mut builder::TopologyPieces,
    ) {
        let outputs = new_pieces.outputs.remove(key).unwrap();
        for (port, output) in outputs {
            debug!(component_id = %key, output_id = ?port, "Configuring output for component.");

            let id = OutputId {
                component: key.clone(),
                port,
            };

            self.outputs.insert(id, output);
        }
    }

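    /// Wires the given component's input channel into the fanout of every output it consumes,
    /// adding it as a new input when the upstream component changed (or the edge is new), and
    /// replacing the previously paused connection otherwise.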
    async fn setup_inputs(
        &mut self,
        key: &ComponentKey,
        diff: &ConfigDiff,
        new_pieces: &mut builder::TopologyPieces,
    ) {
        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();

        let old_inputs = self
            .config
            .inputs_for_node(key)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<HashSet<_>>();

        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
        let inputs_to_add = &new_inputs - &old_inputs;

        for input in inputs {
            let output = self.outputs.get_mut(&input).expect("unknown output");

            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
                // If the input we're connecting to is changing, that means its outputs will have been
                // recreated, so instead of replacing a paused sink, we have to add it to this new
                // output for the first time, since there's nothing to actually replace at this point.
                debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");

                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
            } else {
                // We know that if this component is connected to a given input, and neither
                // component was changed, then the output must still exist, which means we paused
                // this component's connection to its output, so we have to replace that connection
                // now:
                debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");

                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
            }
        }

        self.inputs.insert(key.clone(), tx);
        new_pieces
            .detach_triggers
            .remove(key)
            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
    }

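    /// Removes all registered fanout outputs for the given component.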
    fn remove_outputs(&mut self, key: &ComponentKey) {
        self.outputs.retain(|id, _output| &id.component != key);
    }

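    /// Disconnects the given component from the outputs it consumes: the connection is removed
    /// outright when the upstream component is changing, this component is being removed, or the
    /// edge no longer exists in the new config; otherwise it is paused so it can be replaced later.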
    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
        self.inputs.remove(key);
        self.detach_triggers.remove(key);

        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
        let new_inputs = new_config
            .inputs_for_node(key)
            .unwrap_or_default()
            .iter()
            .collect::<HashSet<_>>();

        for input in old_inputs {
            if let Some(output) = self.outputs.get_mut(input) {
                if diff.contains(&input.component)
                    || diff.is_removed(key)
                    || !new_inputs.contains(input)
                {
                    // 3 cases to remove the input:
                    //
                    // Case 1: If the input we're removing ourselves from is changing, that means its
                    // outputs will be recreated, so instead of pausing the sink, we just delete it
                    // outright to ensure things are clean.
                    //
                    // Case 2: If this component itself is being removed, then pausing makes no sense
                    // because it isn't coming back.
                    //
                    // Case 3: This component is no longer connected to the input in the new config.
                    debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");

                    _ = output.send(ControlMessage::Remove(key.clone()));
                } else {
                    // We know that if this component is connected to a given input, and it isn't being
                    // changed, then it will exist when we reconnect inputs, so we should pause it
                    // now to pause further sends through that component until we reconnect:
                    debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");

                    _ = output.send(ControlMessage::Pause(key.clone()));
                }
            }
        }
    }

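    /// Reconnects unchanged transforms and sinks to any upstream components whose fanouts were
    /// recreated as part of this diff.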
    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
        let unchanged_transforms = self
            .config
            .transforms()
            .filter(|(key, _)| !diff.transforms.contains(key));
        for (transform_key, transform) in unchanged_transforms {
            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(transform_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
            }
        }

        let unchanged_sinks = self
            .config
            .sinks()
            .filter(|(key, _)| !diff.sinks.contains(key));
        for (sink_key, sink) in unchanged_sinks {
            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(sink_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
            }
        }
    }

    /// Starts any new or changed components in the given configuration diff.
    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
        for key in &diff.sources.to_change {
            debug!(message = "Spawning changed source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.sources.to_add {
            debug!(message = "Spawning new source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        let changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        let added_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        for key in changed_table_sources {
            debug!(message = "Spawning changed enrichment table source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in added_table_sources {
            debug!(message = "Spawning new enrichment table source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_change {
            debug!(message = "Spawning changed transform.", component_id = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_add {
            debug!(message = "Spawning new transform.", component_id = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_change {
            debug!(message = "Spawning changed sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_add {
            debug!(message = "Spawning new sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        let changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        let added_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        for key in changed_tables {
            debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in added_tables {
            debug!(message = "Spawning new enrichment table sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }
    }

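    /// Spawns the task for a new or changed sink, replacing and dropping any previous task
    /// registered under the same key.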
    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "sink",
            component_kind = "sink",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "sink".to_string(),
                task.typetag().to_string(),
            );
            debug!(
                component_kind = "sink",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!(">> {} ({})", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::SinkAborted { key, error }
            })
        }
        .instrument(task_span);
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach and forget
        }
    }

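    /// Spawns the task for a new or changed transform, replacing and dropping any previous task
    /// registered under the same key.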
    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "transform",
            component_kind = "transform",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "transform".to_string(),
                task.typetag().to_string(),
            );
            debug!(
                component_kind = "transform",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::TransformAborted { key, error }
            })
        }
        .instrument(task_span);
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach and forget
        }
    }

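    /// Spawns the pump and source tasks for a new or changed source and hands the source over to
    /// the shutdown coordinator.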
    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "source".to_string(),
                task.typetag().to_string(),
            );

            debug!(
                component_kind = "source",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::SourceAborted { key, error }
            })
        }
        .instrument(task_span.clone());
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach and forget
        }

        self.shutdown_coordinator
            .takeover_source(key, &mut new_pieces.shutdown_coordinator);

        // Now spawn the actual source task.
        let source_task = new_pieces.source_tasks.remove(key).unwrap();
        let source_task = {
            let key = key.clone();
            handle_errors(source_task, self.abort_tx.clone(), |error| {
                ShutdownError::SourceAborted { key, error }
            })
        }
        .instrument(task_span);
        self.source_tasks
            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
    }

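    /// Builds the topology pieces for an initial, already-validated configuration and starts them.
    ///
    /// Returns `None` if the pieces could not be built.
    ///
    /// Illustrative usage sketch (assumes a validated `Config` is in hand):
    ///
    /// ```ignore
    /// let (topology, mut crash_rx) =
    ///     RunningTopology::start_init_validated(config, ExtraContext::default())
    ///         .await
    ///         .expect("failed to start topology");
    /// ```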
    pub async fn start_init_validated(
        config: Config,
        extra_context: ExtraContext,
    ) -> Option<(Self, ShutdownErrorReceiver)> {
        let diff = ConfigDiff::initial(&config);
        let pieces = TopologyPiecesBuilder::new(&config, &diff)
            .with_extra_context(extra_context)
            .build_or_log_errors()
            .await?;
        Self::start_validated(config, diff, pieces).await
    }

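    /// Starts a topology from pre-built pieces: runs healthchecks, connects and spawns the diff,
    /// and starts the utilization and (optionally) metrics housekeeping tasks.
    ///
    /// Returns `None` if the metrics expiry settings are invalid or a required healthcheck fails.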
    pub async fn start_validated(
        config: Config,
        diff: ConfigDiff,
        mut pieces: TopologyPieces,
    ) -> Option<(Self, ShutdownErrorReceiver)> {
        let (abort_tx, abort_rx) = mpsc::unbounded_channel();

        let expire_metrics = match (
            config.global.expire_metrics,
            config.global.expire_metrics_secs,
        ) {
            (Some(e), None) => {
                warn!(
                    "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
                );
                if e < Duration::from_secs(0) {
                    None
                } else {
                    Some(e.as_secs_f64())
                }
            }
            (Some(_), Some(_)) => {
                error!(
                    message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
                    internal_log_rate_limit = false
                );
                return None;
            }
            (None, Some(e)) => {
                if e < 0f64 {
                    None
                } else {
                    Some(e)
                }
            }
            (None, None) => Some(300f64),
        };

        if let Err(error) = crate::metrics::Controller::get()
            .expect("Metrics must be initialized")
            .set_expiry(
                expire_metrics,
                config
                    .global
                    .expire_metrics_per_metric_set
                    .clone()
                    .unwrap_or_default(),
            )
        {
            error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
            return None;
        }

        let (utilization_emitter, utilization_registry) = pieces
            .utilization
            .take()
            .expect("Topology is missing the utilization metric emitter!");
        let metrics_storage = pieces.metrics_storage.clone();
        let metrics_refresh_period = config
            .global
            .metrics_storage_refresh_period
            .map(Duration::from_secs_f64);
        let mut running_topology = Self::new(config, abort_tx);

        if !running_topology
            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
            .await
        {
            return None;
        }
        running_topology.connect_diff(&diff, &mut pieces).await;
        running_topology.spawn_diff(&diff, pieces);

        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
            ShutdownSignal::new_wired();
        running_topology.utilization_registry = Some(utilization_registry.clone());
        running_topology.utilization_task_shutdown_trigger =
            Some(utilization_task_shutdown_trigger);
        running_topology.utilization_task = Some(tokio::spawn(Task::new(
            "utilization_heartbeat".into(),
            "",
            async move {
                utilization_emitter
                    .run_utilization(utilization_shutdown_signal)
                    .await;
                Ok(TaskOutput::Healthcheck)
            },
        )));
        if let Some(metrics_refresh_period) = metrics_refresh_period {
            let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
                ShutdownSignal::new_wired();
            running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
            running_topology.metrics_task = Some(tokio::spawn(Task::new(
                "metrics_heartbeat".into(),
                "",
                async move {
                    metrics_storage
                        .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
                        .await;
                    Ok(TaskOutput::Healthcheck)
                },
            )));
        }

        Some((running_topology, abort_rx))
    }
}

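/// Returns the subset of `output_ids` that belong to sources or transforms marked as changed in
/// the given diff.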
fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
    let mut changed_outputs = Vec::new();

    for source_key in &diff.sources.to_change {
        changed_outputs.extend(
            output_ids
                .iter()
                .filter(|id| &id.component == source_key)
                .cloned(),
        );
    }

    for transform_key in &diff.transforms.to_change {
        changed_outputs.extend(
            output_ids
                .iter()
                .filter(|id| &id.component == transform_key)
                .cloned(),
        );
    }

    changed_outputs
}