vector/topology/running.rs

use std::{
    collections::{HashMap, HashSet},
    sync::{
        Arc, Mutex,
        atomic::{AtomicBool, Ordering},
    },
};

use futures::{Future, FutureExt, future};
use snafu::Snafu;
use stream_cancel::Trigger;
use tokio::{
    sync::{mpsc, watch},
    time::{Duration, Instant, interval, sleep_until},
};
use tracing::Instrument;
use vector_lib::{
    buffers::topology::channel::BufferSender,
    shutdown::ShutdownSignal,
    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
    trigger::DisabledTrigger,
};

use super::{
    BuiltBuffer, TaskHandle,
    builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
    fanout::{ControlChannel, ControlMessage},
    handle_errors, retain, take_healthchecks,
    task::{Task, TaskOutput},
};
use crate::{
    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
    event::EventArray,
    extra_context::ExtraContext,
    shutdown::SourceShutdownCoordinator,
    signal::ShutdownError,
    spawn_named,
    utilization::UtilizationRegistry,
};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

#[derive(Debug, Snafu)]
pub enum ReloadError {
    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
    GlobalOptionsChanged { changed_fields: Vec<String> },
    #[snafu(display("failed to compute global diff: {}", source))]
    GlobalDiffFailed { source: serde_json::Error },
    #[snafu(display("topology build failed"))]
    TopologyBuildFailed,
    #[snafu(display("failed to restore previous config"))]
    FailedToRestore,
}

#[allow(dead_code)]
pub struct RunningTopology {
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    outputs: HashMap<OutputId, ControlChannel>,
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    tasks: HashMap<ComponentKey, TaskHandle>,
    shutdown_coordinator: SourceShutdownCoordinator,
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    pub(crate) config: Config,
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    watch: (WatchTx, WatchRx),
    pub(crate) running: Arc<AtomicBool>,
    graceful_shutdown_duration: Option<Duration>,
    utilization_registry: Option<UtilizationRegistry>,
    utilization_task: Option<TaskHandle>,
    utilization_task_shutdown_trigger: Option<Trigger>,
    pending_reload: Option<HashSet<ComponentKey>>,
}

impl RunningTopology {
    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
        Self {
            inputs: HashMap::new(),
            inputs_tap_metadata: HashMap::new(),
            outputs: HashMap::new(),
            outputs_tap_metadata: HashMap::new(),
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            detach_triggers: HashMap::new(),
            source_tasks: HashMap::new(),
            tasks: HashMap::new(),
            abort_tx,
            watch: watch::channel(TapResource::default()),
            running: Arc::new(AtomicBool::new(true)),
            graceful_shutdown_duration: config.graceful_shutdown_duration,
            config,
            utilization_registry: None,
            utilization_task: None,
            utilization_task_shutdown_trigger: None,
            pending_reload: None,
        }
    }

    /// Gets the configuration that represents this running topology.
    pub const fn config(&self) -> &Config {
        &self.config
    }

    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
    /// initializes the pending reload set.
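    ///
    /// A minimal sketch of use (assuming a `ComponentKey` can be built from a component id string):
    ///
    /// ```ignore
    /// topology.extend_reload_set(HashSet::from([ComponentKey::from("my_sink")]));
    /// ```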
    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
        match &mut self.pending_reload {
            None => self.pending_reload = Some(new_set.clone()),
            Some(existing) => existing.extend(new_set),
        }
    }

    /// Creates a subscription to topology changes.
    ///
    /// This is used by the tap API to observe configuration changes and re-wire tap sinks.
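    ///
    /// A usage sketch (hypothetical caller code; the receiver follows the tokio `watch` API):
    ///
    /// ```ignore
    /// let mut rx = topology.watch();
    /// tokio::spawn(async move {
    ///     while rx.changed().await.is_ok() {
    ///         let output_count = rx.borrow().outputs.len();
    ///         debug!(outputs = output_count, "Topology changed; re-wiring tap sinks.");
    ///     }
    /// });
    /// ```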
    pub fn watch(&self) -> watch::Receiver<TapResource> {
        self.watch.1.clone()
    }

    /// Signals when all sources in this topology have ended.
    ///
    /// The future returned by this function will finish once all the sources in
    /// this topology have finished. This allows the caller to wait for or
    /// detect that the sources in the topology are no longer
    /// producing. [`Application`][crate::app::Application], as an example, uses this as a
    /// shutdown signal.
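    ///
    /// A usage sketch (hypothetical caller code; `other_shutdown_signal` is a placeholder):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     _ = topology.sources_finished() => info!("All sources have finished."),
    ///     _ = other_shutdown_signal() => info!("Shutdown requested externally."),
    /// }
    /// ```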
    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
        self.shutdown_coordinator.shutdown_tripwire()
    }

    /// Shut down all topology components.
    ///
    /// This function sends the shutdown signal to all sources in this topology
    /// and returns a future that resolves once all components (sources,
    /// transforms, and sinks) have finished shutting down. Transforms and sinks
    /// will shut down automatically once their input tasks finish.
    ///
    /// This function takes ownership of `self`, so once it returns everything
    /// in the [`RunningTopology`] instance has been dropped except for the
    /// `tasks` map. This map gets moved into the returned future and is used to
    /// poll for when the tasks have completed. Once the returned future is
    /// dropped then everything from this [`RunningTopology`] instance is fully
    /// dropped.
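    ///
    /// A minimal sketch of use (hypothetical caller code):
    ///
    /// ```ignore
    /// // `stop` consumes the topology; awaiting the returned future waits for
    /// // every component task to finish (or be killed after the grace period).
    /// topology.stop().await;
    /// ```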
    pub fn stop(self) -> impl Future<Output = ()> {
        // Update the API's health endpoint to signal shutdown.
        self.running.store(false, Ordering::Relaxed);
        // Create handle collections for all tasks, for use in the subsequent
        // operations.
        let mut wait_handles = Vec::new();
        // We need a Vec here since source components have two tasks: the pump
        // task in `self.tasks` and the source task in `self.source_tasks`.
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        let map_closure = |_result| ();

        // We need to give the sources some time to shut down gracefully, so
        // we merge them with the other tasks.
        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        // The deadline after which we forcefully shut down the sources. If `None`, we never force a shutdown.
        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        let timeout = if let Some(deadline) = deadline {
            // If we reach the deadline, this future logs which components
            // haven't shut down gracefully before we start to forcefully shut
            // down the sources.
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Remove all tasks that have shut down.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    message = "Failed to gracefully shut down in time. Killing components.",
                    internal_log_rate_limit = false
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Periodically reports which components are still running.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                // Remove all tasks that have shut down.
                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Finishes once all tasks have shut down.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Aggregate future that ends once any of the above finishes: the forced-shutdown
        // timeout, the reporter, or the successful join of all tasks.
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        // Now kick off the shutdown process by shutting down the sources.
        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }

    /// Attempts to load a new configuration and update this running topology.
    ///
    /// If the new configuration is valid and all changes can be made -- removing old
    /// components, changing existing components, adding new components -- then
    /// `Ok(())` is returned.
    ///
    /// If the new configuration is not valid, or not all of the changes in the new configuration
    /// can be made, then this method will attempt to undo the changes made and bring the
    /// topology back to its previous state, returning the appropriate error.
    ///
    /// If the restore also fails, `ReloadError::FailedToRestore` is returned.
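    ///
    /// A sketch of how a caller might drive a reload (hypothetical caller code):
    ///
    /// ```ignore
    /// match topology.reload_config_and_respawn(new_config, ExtraContext::default()).await {
    ///     Ok(()) => info!("Reload succeeded."),
    ///     Err(ReloadError::GlobalOptionsChanged { changed_fields }) => {
    ///         warn!(?changed_fields, "Global options changed; a full restart is required.");
    ///     }
    ///     Err(error) => error!(%error, "Reload failed."),
    /// }
    /// ```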
    pub async fn reload_config_and_respawn(
        &mut self,
        new_config: Config,
        extra_context: ExtraContext,
    ) -> Result<(), ReloadError> {
        info!("Reloading running topology with new configuration.");

        if self.config.global != new_config.global {
            return match self.config.global.diff(&new_config.global) {
                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
            };
        }

        // Calculate the change between the current configuration and the new configuration, and
        // shut down any components that are changing so that we can reclaim their buffers before
        // spawning the new version of the component.
        //
        // We also shut down any component that is simply being removed entirely.
        let diff = if let Some(components) = &self.pending_reload {
            ConfigDiff::new(&self.config, &new_config, components.clone())
        } else {
            ConfigDiff::new(&self.config, &new_config, HashSet::new())
        };
        let buffers = self.shutdown_diff(&diff, &new_config).await;

        // Give Windows some time to make available any port
        // released by shut-down components.
        // Issue: https://github.com/vectordotdev/vector/issues/3035
        if cfg!(windows) {
            // This value is guesswork.
            tokio::time::sleep(Duration::from_millis(200)).await;
        }

        // Try to build all of the new components coming from the new configuration. If we can
        // successfully build them, we'll attempt to connect them up to the topology and spawn their
        // respective component tasks.
        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
            .with_buffers(buffers.clone())
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
        {
            // If healthchecks are configured for any of the changing/new components, try running
            // them before moving forward with connecting and spawning. In some cases, failing
            // healthchecks may be configured as a non-blocking issue, and so we'll still continue on.
            if self
                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
                .await
            {
                self.connect_diff(&diff, &mut new_pieces).await;
                self.spawn_diff(&diff, new_pieces);
                self.config = new_config;

                info!("New configuration loaded successfully.");

                return Ok(());
            }
        }

        // We failed to build, connect, and spawn all of the changed/new components, so we flip
        // the configuration differential around to generate all the components that we need to
        // bring back in order to restore the current configuration.
        warn!("Failed to completely load new configuration. Restoring old configuration.");

        let diff = diff.flip();
        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
            && self
                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
                .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);

            info!("Old configuration restored successfully.");

            return Err(ReloadError::TopologyBuildFailed);
        }

        error!(
            message = "Failed to restore old configuration.",
            internal_log_rate_limit = false
        );

        Err(ReloadError::FailedToRestore)
    }

    /// Attempts to reload enrichment tables.
    pub(crate) async fn reload_enrichment_tables(&self) {
        reload_enrichment_tables(&self.config).await;
    }

    pub(crate) async fn run_healthchecks(
        &mut self,
        diff: &ConfigDiff,
        pieces: &mut TopologyPieces,
        options: HealthcheckOptions,
    ) -> bool {
        if options.enabled {
            let healthchecks = take_healthchecks(diff, pieces)
                .into_iter()
                .map(|(_, task)| task);
            let healthchecks = future::try_join_all(healthchecks);

            info!("Running healthchecks.");
            if options.require_healthy {
                let success = healthchecks.await;

                if success.is_ok() {
                    info!("All healthchecks passed.");
                    true
                } else {
                    error!(
                        message = "Sinks unhealthy.",
                        internal_log_rate_limit = false
                    );
                    false
                }
            } else {
                tokio::spawn(healthchecks);
                true
            }
        } else {
            true
        }
    }

    /// Shuts down any changed/removed component in the given configuration diff.
    ///
    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
    async fn shutdown_diff(
        &mut self,
        diff: &ConfigDiff,
        new_config: &Config,
    ) -> HashMap<ComponentKey, BuiltBuffer> {
        // First, we shut down any changed/removed sources. This ensures that we can allow downstream
        // components to terminate naturally by virtue of the flow of events stopping.
        if diff.sources.any_changed_or_removed() {
            let timeout = Duration::from_secs(30);
            let mut source_shutdown_handles = Vec::new();

            let deadline = Instant::now() + timeout;
            for key in &diff.sources.to_remove {
                debug!(component_id = %key, "Removing source.");

                let previous = self.tasks.remove(key).unwrap();
                drop(previous); // detach and forget

                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            for key in &diff.sources.to_change {
                debug!(component_id = %key, "Changing source.");

                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            debug!(
                "Waiting for up to {} seconds for source(s) to finish shutting down.",
                timeout.as_secs()
            );
            futures::future::join_all(source_shutdown_handles).await;

            // Final cleanup pass now that all changed/removed sources have signalled that they
            // have shut down.
            for key in diff.sources.removed_and_changed() {
                if let Some(task) = self.source_tasks.remove(key) {
                    task.await.unwrap().unwrap();
                }
            }
        }

        // Next, we shut down any changed/removed transforms. Same as before: we want to allow
        // downstream components to terminate naturally by virtue of the flow of events stopping.
        //
        // Since transforms are entirely driven by the flow of events into them from upstream
        // components, the shutdown of sources they depend on, or the shutdown of transforms they
        // depend on, and thus the closing of their buffer, will naturally cause them to shut down,
        // which is why we don't do any manual triggering of shutdown here.
        for key in &diff.transforms.to_remove {
            debug!(component_id = %key, "Removing transform.");

            let previous = self.tasks.remove(key).unwrap();
            drop(previous); // detach and forget

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        for key in &diff.transforms.to_change {
            debug!(component_id = %key, "Changing transform.");

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);
        }

        // Now we'll process any changed/removed sinks.
        //
        // At this point, neither the old nor the new config has internal resource conflicts, so if
        // we combine their resources, any conflicts found are between components being removed and
        // components being added.
        let removed_table_sinks = diff
            .enrichment_tables
            .removed_and_changed()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let remove_sink = diff
            .sinks
            .removed_and_changed()
            .map(|key| {
                (
                    key,
                    self.config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let add_source = diff
            .sources
            .changed_and_added()
            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
        let added_table_sinks = diff
            .enrichment_tables
            .changed_and_added()
            .filter_map(|key| {
                new_config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let add_sink = diff
            .sinks
            .changed_and_added()
            .map(|key| {
                (
                    key,
                    new_config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let conflicts = Resource::conflicts(
            remove_sink.map(|(key, value)| ((true, key), value)).chain(
                add_sink
                    .chain(add_source)
                    .map(|(key, value)| ((false, key), value)),
            ),
        )
        .into_iter()
        .flat_map(|(_, components)| components)
        .collect::<HashSet<_>>();
        // Existing conflicting sinks.
        let conflicting_sinks = conflicts
            .into_iter()
            .filter(|&(existing_sink, _)| existing_sink)
            .map(|(_, key)| key.clone());

        // For any sink whose buffer configuration didn't change, we can reuse its buffer.
        let reuse_buffers = diff
            .sinks
            .to_change
            .iter()
            .filter(|&key| {
                if diff.components_to_reload.contains(key) {
                    return false;
                }
                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    new_config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                })
            })
            .cloned()
            .collect::<HashSet<_>>();

        // For any existing sink that has a conflicting resource dependency with a changed/added
        // sink, or for any sink whose buffer we want to reuse, we need to explicitly wait for
        // them to finish processing so we can reclaim ownership of those resources/buffers.
        let wait_for_sinks = conflicting_sinks
            .chain(reuse_buffers.iter().cloned())
            .collect::<HashSet<_>>();

        // First, we remove any inputs to removed sinks so they can naturally shut down.
        let removed_sinks = diff
            .sinks
            .to_remove
            .iter()
            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();
        for key in &removed_sinks {
            debug!(component_id = %key, "Removing sink.");
            self.remove_inputs(key, diff, new_config).await;

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
        // they can naturally shut down and allow us to recover their buffers if possible.
        let mut buffer_tx = HashMap::new();

        let sinks_to_change = diff
            .sinks
            .to_change
            .iter()
            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();

        for key in &sinks_to_change {
            debug!(component_id = %key, "Changing sink.");
            if reuse_buffers.contains(key) {
                self.detach_triggers
                    .remove(key)
                    .unwrap()
                    .into_inner()
                    .cancel();

                // We explicitly clone the input side of the buffer and store it so we don't lose
                // it when we remove the inputs below.
                //
                // We clone instead of removing here because otherwise the input will be missing for
                // the rest of the reload process, which violates the assumption that all previous
                // inputs for components not being removed are still available. It's simpler to
                // allow the "old" input to stick around and be replaced (even though that's
                // basically a no-op since we're reusing the same buffer) than it is to pass around
                // info about which sinks are having their buffers reused and treat them differently
                // at other stages.
                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
            }
            self.remove_inputs(key, diff, new_config).await;
        }

        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
        // sinks, we can actually wait for them to shut down before collecting any buffers that are
        // marked for reuse.
        //
        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
        // on, we don't bother waiting for it to shut down.
        for key in &removed_sinks {
            let previous = self.tasks.remove(key).unwrap();
            if wait_for_sinks.contains(key) {
                debug!(message = "Waiting for sink to shut down.", component_id = %key);
                previous.await.unwrap().unwrap();
            } else {
                drop(previous); // detach and forget
            }
        }

        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
        for key in &sinks_to_change {
            if wait_for_sinks.contains(key) {
                let previous = self.tasks.remove(key).unwrap();
                debug!(message = "Waiting for sink to shut down.", component_id = %key);
                let buffer = previous.await.unwrap().unwrap();

                if reuse_buffers.contains(key) {
                    // We clone instead of removing here because otherwise the input will be
                    // missing for the rest of the reload process, which violates the assumption
                    // that all previous inputs for components not being removed are still
                    // available. It's simpler to allow the "old" input to stick around and be
                    // replaced (even though that's basically a no-op since we're reusing the same
                    // buffer) than it is to pass around info about which sinks are having their
                    // buffers reused and treat them differently at other stages.
                    let tx = buffer_tx.remove(key).unwrap();
                    let rx = match buffer {
                        TaskOutput::Sink(rx) => rx.into_inner(),
                        _ => unreachable!(),
                    };

                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
                }
            }
        }

        buffers
    }

    /// Connects all changed/added components in the given configuration diff.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Update tap metadata.
        if !self.watch.0.is_closed() {
            for key in &diff.sources.to_remove {
                // Sources only have outputs.
                self.outputs_tap_metadata.remove(key);
            }

            for key in &diff.transforms.to_remove {
                // Transforms can have both inputs and outputs.
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
            }

            for key in &diff.sinks.to_remove {
                // Sinks only have inputs.
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                // Sinks only have inputs.
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            });
            for key in removed_sources {
                // Sources only have outputs.
                self.outputs_tap_metadata.remove(&key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff
                .enrichment_tables
                .changed_and_added()
                .filter_map(|key| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_source(key).map(|(key, _)| key))
                })
            {
                if let Some(task) = new_pieces.tasks.get(&key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", task.typetag().to_string()));
                }
            }

            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // We configure the outputs of any changed/added sources first, so they're available to any
        // transforms and sinks that come afterwards.
        for key in diff.sources.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources.iter() {
            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // We configure the outputs of any changed/added transforms next, for the same reason: we
        // need them to be available to any transforms and sinks that come afterwards.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Now that all possible outputs are configured, we can start wiring up inputs, starting
        // with transforms.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Now that all sources and transforms are fully configured, we can wire up sinks.
        for key in diff.sinks.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables.iter() {
            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // We do a final pass here to reconnect unchanged components.
        //
        // Why would we reconnect unchanged components? Well, as sources and transforms recreate
        // their fanouts every time they're changed, we can run into a situation where a
        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
        //
        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
        // connect components backwards, i.e. transforms to sources/transforms, and sinks to
        // sources/transforms, to ensure we're connecting components in order.
        self.reattach_severed_inputs(diff);

        // Broadcast any topology changes to subscribers.
        if !self.watch.0.is_closed() {
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(
                            added_changed_table_sources
                                .iter()
                                .map(|key| key.to_string()),
                        )
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
                        .collect(),
                    // Note: only sources and transforms are relevant here, since sinks do
                    // not have outputs to tap.
                    removals,
                })
                .expect("Couldn't broadcast config changes.");
        }
    }

    async fn setup_outputs(
        &mut self,
        key: &ComponentKey,
        new_pieces: &mut builder::TopologyPieces,
    ) {
        let outputs = new_pieces.outputs.remove(key).unwrap();
        for (port, output) in outputs {
            debug!(component_id = %key, output_id = ?port, "Configuring output for component.");

            let id = OutputId {
                component: key.clone(),
                port,
            };

            self.outputs.insert(id, output);
        }
    }

    async fn setup_inputs(
        &mut self,
        key: &ComponentKey,
        diff: &ConfigDiff,
        new_pieces: &mut builder::TopologyPieces,
    ) {
        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();

        let old_inputs = self
            .config
            .inputs_for_node(key)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<HashSet<_>>();

        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
        let inputs_to_add = &new_inputs - &old_inputs;

        for input in inputs {
            let output = self.outputs.get_mut(&input).expect("unknown output");

            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
                // If the input we're connecting to is changing, that means its outputs will have been
                // recreated, so instead of replacing a paused sink, we have to add it to this new
                // output for the first time, since there's nothing to actually replace at this point.
                debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");

                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
            } else {
                // We know that if this component is connected to a given input, and neither
                // component was changed, then the output must still exist, which means we paused
                // this component's connection to its output, so we have to replace that connection
                // now:
                debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");

                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
            }
        }

        self.inputs.insert(key.clone(), tx);
        new_pieces
            .detach_triggers
            .remove(key)
            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
    }

    fn remove_outputs(&mut self, key: &ComponentKey) {
        self.outputs.retain(|id, _output| &id.component != key);
    }

    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
        self.inputs.remove(key);
        self.detach_triggers.remove(key);

        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
        let new_inputs = new_config
            .inputs_for_node(key)
            .unwrap_or_default()
            .iter()
            .collect::<HashSet<_>>();

        for input in old_inputs {
            if let Some(output) = self.outputs.get_mut(input) {
                if diff.contains(&input.component)
                    || diff.is_removed(key)
                    || !new_inputs.contains(input)
                {
                    // There are three cases in which we remove the input:
                    //
                    // Case 1: If the input we're removing ourselves from is changing, that means its
                    // outputs will be recreated, so instead of pausing the sink, we just delete it
                    // outright to ensure things are clean.
                    //
                    // Case 2: If this component itself is being removed, then pausing makes no sense
                    // because it isn't coming back.
                    //
                    // Case 3: This component is no longer connected to the input in the new config.
                    debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");

                    _ = output.send(ControlMessage::Remove(key.clone()));
                } else {
                    // We know that if this component is connected to a given input, and it isn't being
                    // changed, then it will exist when we reconnect inputs, so we should pause it
                    // now to pause further sends through that component until we reconnect:
                    debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");

                    _ = output.send(ControlMessage::Pause(key.clone()));
                }
            }
        }
    }

    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
        let unchanged_transforms = self
            .config
            .transforms()
            .filter(|(key, _)| !diff.transforms.contains(key));
        for (transform_key, transform) in unchanged_transforms {
            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(transform_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
            }
        }

        let unchanged_sinks = self
            .config
            .sinks()
            .filter(|(key, _)| !diff.sinks.contains(key));
        for (sink_key, sink) in unchanged_sinks {
            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(sink_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
            }
        }
    }

    /// Starts any new or changed components in the given configuration diff.
    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
        for key in &diff.sources.to_change {
            debug!(message = "Spawning changed source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.sources.to_add {
            debug!(message = "Spawning new source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        let changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        let added_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        for key in changed_table_sources {
            debug!(message = "Spawning changed enrichment table source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in added_table_sources {
            debug!(message = "Spawning new enrichment table source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_change {
            debug!(message = "Spawning changed transform.", component_id = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_add {
            debug!(message = "Spawning new transform.", component_id = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_change {
            debug!(message = "Spawning changed sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_add {
            debug!(message = "Spawning new sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        let changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        let added_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        for key in changed_tables {
            debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in added_tables {
            debug!(message = "Spawning new enrichment table sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }
    }

    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "sink",
            component_kind = "sink",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "sink".to_string(),
                task.typetag().to_string(),
            );
            debug!(
                component_kind = "sink",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!(">> {} ({})", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::SinkAborted { key, error }
            })
        }
        .instrument(task_span);
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach and forget
        }
    }

    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "transform",
            component_kind = "transform",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "transform".to_string(),
                task.typetag().to_string(),
            );
            debug!(
                component_kind = "transform",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::TransformAborted { key, error }
            })
        }
        .instrument(task_span);
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach and forget
        }
    }

    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "source".to_string(),
                task.typetag().to_string(),
            );

            debug!(
                component_kind = "source",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::SourceAborted { key, error }
            })
        }
        .instrument(task_span.clone());
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach and forget
        }

        self.shutdown_coordinator
            .takeover_source(key, &mut new_pieces.shutdown_coordinator);

        // Now spawn the actual source task.
        let source_task = new_pieces.source_tasks.remove(key).unwrap();
        let source_task = {
            let key = key.clone();
            handle_errors(source_task, self.abort_tx.clone(), |error| {
                ShutdownError::SourceAborted { key, error }
            })
        }
        .instrument(task_span);
        self.source_tasks
            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
    }

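    /// Builds the topology pieces for `config` and starts the topology, returning the running
    /// topology and a receiver for component shutdown errors. Returns `None` if the topology
    /// pieces cannot be built or started (errors are logged rather than returned).
    ///
    /// A minimal sketch of how an embedder might start a topology (hypothetical caller code):
    ///
    /// ```ignore
    /// let (topology, mut crash_rx) =
    ///     RunningTopology::start_init_validated(config, ExtraContext::default())
    ///         .await
    ///         .expect("failed to start topology");
    /// ```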
    pub async fn start_init_validated(
        config: Config,
        extra_context: ExtraContext,
    ) -> Option<(Self, ShutdownErrorReceiver)> {
        let diff = ConfigDiff::initial(&config);
        let pieces = TopologyPiecesBuilder::new(&config, &diff)
            .with_extra_context(extra_context)
            .build_or_log_errors()
            .await?;
        Self::start_validated(config, diff, pieces).await
    }

    pub async fn start_validated(
        config: Config,
        diff: ConfigDiff,
        mut pieces: TopologyPieces,
    ) -> Option<(Self, ShutdownErrorReceiver)> {
        let (abort_tx, abort_rx) = mpsc::unbounded_channel();

        let expire_metrics = match (
            config.global.expire_metrics,
            config.global.expire_metrics_secs,
        ) {
            (Some(e), None) => {
                warn!(
                    "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
                );
                if e < Duration::from_secs(0) {
                    None
                } else {
                    Some(e.as_secs_f64())
                }
            }
            (Some(_), Some(_)) => {
                error!(
                    message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
                    internal_log_rate_limit = false
                );
                return None;
            }
            (None, Some(e)) => {
                if e < 0f64 {
                    None
                } else {
                    Some(e)
                }
            }
            (None, None) => Some(300f64),
        };

        if let Err(error) = crate::metrics::Controller::get()
            .expect("Metrics must be initialized")
            .set_expiry(
                expire_metrics,
                config
                    .global
                    .expire_metrics_per_metric_set
                    .clone()
                    .unwrap_or_default(),
            )
        {
            error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
            return None;
        }

        let (utilization_emitter, utilization_registry) = pieces
            .utilization
            .take()
            .expect("Topology is missing the utilization metric emitter!");
        let mut running_topology = Self::new(config, abort_tx);

        if !running_topology
            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
            .await
        {
            return None;
        }
        running_topology.connect_diff(&diff, &mut pieces).await;
        running_topology.spawn_diff(&diff, pieces);

        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
            ShutdownSignal::new_wired();
        running_topology.utilization_registry = Some(utilization_registry.clone());
        running_topology.utilization_task_shutdown_trigger =
            Some(utilization_task_shutdown_trigger);
        running_topology.utilization_task = Some(tokio::spawn(Task::new(
            "utilization_heartbeat".into(),
            "",
            async move {
                utilization_emitter
                    .run_utilization(utilization_shutdown_signal)
                    .await;
                // TODO: new task output type for this? Or handle this task in a completely
                // different way
                Ok(TaskOutput::Healthcheck)
            },
        )));

        Some((running_topology, abort_rx))
    }
}

fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
    let mut changed_outputs = Vec::new();

    for source_key in &diff.sources.to_change {
        changed_outputs.extend(
            output_ids
                .iter()
                .filter(|id| &id.component == source_key)
                .cloned(),
        );
    }

    for transform_key in &diff.transforms.to_change {
        changed_outputs.extend(
            output_ids
                .iter()
                .filter(|id| &id.component == transform_key)
                .cloned(),
        );
    }

    changed_outputs
}