vector/topology/
running.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        Arc, Mutex,
5        atomic::{AtomicBool, Ordering},
6    },
7};
8
9use futures::{Future, FutureExt, future};
10use stream_cancel::Trigger;
11use tokio::{
12    sync::{mpsc, watch},
13    time::{Duration, Instant, interval, sleep_until},
14};
15use tracing::Instrument;
16use vector_lib::{
17    buffers::topology::channel::BufferSender,
18    shutdown::ShutdownSignal,
19    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
20    trigger::DisabledTrigger,
21};
22
23use super::{
24    BuiltBuffer, TaskHandle,
25    builder::{self, TopologyPieces, reload_enrichment_tables},
26    fanout::{ControlChannel, ControlMessage},
27    handle_errors, retain, take_healthchecks,
28    task::{Task, TaskOutput},
29};
30use crate::{
31    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
32    event::EventArray,
33    extra_context::ExtraContext,
34    shutdown::SourceShutdownCoordinator,
35    signal::ShutdownError,
36    spawn_named,
37};
38
39pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
40
41#[allow(dead_code)]
42pub struct RunningTopology {
43    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
44    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
45    outputs: HashMap<OutputId, ControlChannel>,
46    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
47    source_tasks: HashMap<ComponentKey, TaskHandle>,
48    tasks: HashMap<ComponentKey, TaskHandle>,
49    shutdown_coordinator: SourceShutdownCoordinator,
50    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
51    pub(crate) config: Config,
52    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
53    watch: (WatchTx, WatchRx),
54    pub(crate) running: Arc<AtomicBool>,
55    graceful_shutdown_duration: Option<Duration>,
56    utilization_task: Option<TaskHandle>,
57    utilization_task_shutdown_trigger: Option<Trigger>,
58    pending_reload: Option<HashSet<ComponentKey>>,
59}
60
61impl RunningTopology {
62    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
63        Self {
64            inputs: HashMap::new(),
65            inputs_tap_metadata: HashMap::new(),
66            outputs: HashMap::new(),
67            outputs_tap_metadata: HashMap::new(),
68            shutdown_coordinator: SourceShutdownCoordinator::default(),
69            detach_triggers: HashMap::new(),
70            source_tasks: HashMap::new(),
71            tasks: HashMap::new(),
72            abort_tx,
73            watch: watch::channel(TapResource::default()),
74            running: Arc::new(AtomicBool::new(true)),
75            graceful_shutdown_duration: config.graceful_shutdown_duration,
76            config,
77            utilization_task: None,
78            utilization_task_shutdown_trigger: None,
79            pending_reload: None,
80        }
81    }
82
83    /// Gets the configuration that represents this running topology.
84    pub const fn config(&self) -> &Config {
85        &self.config
86    }
87
88    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
89    /// initializes the pending reload set.
90    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
91        match &mut self.pending_reload {
92            None => self.pending_reload = Some(new_set.clone()),
93            Some(existing) => existing.extend(new_set),
94        }
95    }
96
97    /// Creates a subscription to topology changes.
98    ///
99    /// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
100    pub fn watch(&self) -> watch::Receiver<TapResource> {
101        self.watch.1.clone()
102    }
103
104    /// Signal that all sources in this topology are ended.
105    ///
106    /// The future returned by this function will finish once all the sources in
107    /// this topology have finished. This allows the caller to wait for or
108    /// detect that the sources in the topology are no longer
109    /// producing. [`Application`][crate::app::Application], as an example, uses this as a
110    /// shutdown signal.
111    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
112        self.shutdown_coordinator.shutdown_tripwire()
113    }
114
115    /// Shut down all topology components.
116    ///
117    /// This function sends the shutdown signal to all sources in this topology
118    /// and returns a future that resolves once all components (sources,
119    /// transforms, and sinks) have finished shutting down. Transforms and sinks
120    /// will shut down automatically once their input tasks finish.
121    ///
122    /// This function takes ownership of `self`, so once it returns everything
123    /// in the [`RunningTopology`] instance has been dropped except for the
124    /// `tasks` map. This map gets moved into the returned future and is used to
125    /// poll for when the tasks have completed. Once the returned future is
126    /// dropped then everything from this RunningTopology instance is fully
127    /// dropped.
128    pub fn stop(self) -> impl Future<Output = ()> {
129        // Update the API's health endpoint to signal shutdown
130        self.running.store(false, Ordering::Relaxed);
131        // Create handy handles collections of all tasks for the subsequent
132        // operations.
133        let mut wait_handles = Vec::new();
134        // We need a Vec here since source components have two tasks. One for
135        // pump in self.tasks, and the other for source in self.source_tasks.
136        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
137
138        let map_closure = |_result| ();
139
140        // We need to give some time to the sources to gracefully shutdown, so
141        // we will merge them with other tasks.
142        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
143            let task = task.map(map_closure).shared();
144
145            wait_handles.push(task.clone());
146            check_handles.entry(key).or_default().push(task);
147        }
148
149        if let Some(utilization_task) = self.utilization_task {
150            wait_handles.push(utilization_task.map(map_closure).shared());
151        }
152
153        // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
154        let deadline = self
155            .graceful_shutdown_duration
156            .map(|grace_period| Instant::now() + grace_period);
157
158        let timeout = if let Some(deadline) = deadline {
159            // If we reach the deadline, this future will print out which components
160            // won't gracefully shutdown since we will start to forcefully shutdown
161            // the sources.
162            let mut check_handles2 = check_handles.clone();
163            Box::pin(async move {
164                sleep_until(deadline).await;
165                // Remove all tasks that have shutdown.
166                check_handles2.retain(|_key, handles| {
167                    retain(handles, |handle| handle.peek().is_none());
168                    !handles.is_empty()
169                });
170                let remaining_components = check_handles2
171                    .keys()
172                    .map(|item| item.to_string())
173                    .collect::<Vec<_>>()
174                    .join(", ");
175
176                error!(
177                    components = ?remaining_components,
178                    "Failed to gracefully shut down in time. Killing components."
179                );
180            }) as future::BoxFuture<'static, ()>
181        } else {
182            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
183        };
184
185        // Reports in intervals which components are still running.
186        let mut interval = interval(Duration::from_secs(5));
187        let reporter = async move {
188            loop {
189                interval.tick().await;
190
191                // Remove all tasks that have shutdown.
192                check_handles.retain(|_key, handles| {
193                    retain(handles, |handle| handle.peek().is_none());
194                    !handles.is_empty()
195                });
196                let remaining_components = check_handles
197                    .keys()
198                    .map(|item| item.to_string())
199                    .collect::<Vec<_>>()
200                    .join(", ");
201
202                let (deadline_passed, time_remaining) = match deadline {
203                    Some(d) => match d.checked_duration_since(Instant::now()) {
204                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
205                        None => (true, "overdue".to_string()),
206                    },
207                    None => (false, "no time limit".to_string()),
208                };
209
210                info!(
211                    remaining_components = ?remaining_components,
212                    time_remaining = ?time_remaining,
213                    "Shutting down... Waiting on running components."
214                );
215
216                let all_done = check_handles.is_empty();
217
218                if all_done {
219                    info!("Shutdown reporter exiting: all components shut down.");
220                    break;
221                } else if deadline_passed {
222                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
223                    break;
224                }
225            }
226        };
227
228        // Finishes once all tasks have shutdown.
229        let success = futures::future::join_all(wait_handles).map(|_| ());
230
231        // Aggregate future that ends once anything detects that all tasks have shutdown.
232        let shutdown_complete_future = future::select_all(vec![
233            Box::pin(timeout) as future::BoxFuture<'static, ()>,
234            Box::pin(reporter) as future::BoxFuture<'static, ()>,
235            Box::pin(success) as future::BoxFuture<'static, ()>,
236        ]);
237
238        // Now kick off the shutdown process by shutting down the sources.
239        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
240        if let Some(trigger) = self.utilization_task_shutdown_trigger {
241            trigger.cancel();
242        }
243
244        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
245    }
246
247    /// Attempts to load a new configuration and update this running topology.
248    ///
249    /// If the new configuration was valid, and all changes were able to be made -- removing of
250    /// old components, changing of existing components, adding of new components -- then `Ok(true)`
251    /// is returned.
252    ///
253    /// If the new configuration is not valid, or not all of the changes in the new configuration
254    /// were able to be made, then this method will attempt to undo the changes made and bring the
255    /// topology back to its previous state.  If either of these scenarios occur, then `Ok(false)`
256    /// is returned.
257    ///
258    /// # Errors
259    ///
260    /// If all changes from the new configuration cannot be made, and the current configuration
261    /// cannot be fully restored, then `Err(())` is returned.
262    pub async fn reload_config_and_respawn(
263        &mut self,
264        new_config: Config,
265        extra_context: ExtraContext,
266    ) -> Result<bool, ()> {
267        info!("Reloading running topology with new configuration.");
268
269        if self.config.global != new_config.global {
270            match self.config.global.diff(&new_config.global) {
271                Ok(changed) => {
272                    error!(
273                        message = "Global options changed; reload aborted.",
274                        changed_fields = ?changed
275                    );
276                }
277                Err(err) => {
278                    error!(
279                        message = "Failed to compute config diff; reload aborted.",
280                        %err
281                    );
282                }
283            }
284            error!(
285                message = "Global options can't be changed while reloading config file; reload aborted. Please restart Vector to reload the configuration file."
286            );
287            return Ok(false);
288        }
289
290        // Calculate the change between the current configuration and the new configuration, and
291        // shutdown any components that are changing so that we can reclaim their buffers before
292        // spawning the new version of the component.
293        //
294        // We also shutdown any component that is simply being removed entirely.
295        let diff = if let Some(components) = &self.pending_reload {
296            ConfigDiff::new(&self.config, &new_config, components.clone())
297        } else {
298            ConfigDiff::new(&self.config, &new_config, HashSet::new())
299        };
300        let buffers = self.shutdown_diff(&diff, &new_config).await;
301
302        // Gives windows some time to make available any port
303        // released by shutdown components.
304        // Issue: https://github.com/vectordotdev/vector/issues/3035
305        if cfg!(windows) {
306            // This value is guess work.
307            tokio::time::sleep(Duration::from_millis(200)).await;
308        }
309
310        // Try to build all of the new components coming from the new configuration.  If we can
311        // successfully build them, we'll attempt to connect them up to the topology and spawn their
312        // respective component tasks.
313        if let Some(mut new_pieces) = TopologyPieces::build_or_log_errors(
314            &new_config,
315            &diff,
316            buffers.clone(),
317            extra_context.clone(),
318        )
319        .await
320        {
321            // If healthchecks are configured for any of the changing/new components, try running
322            // them before moving forward with connecting and spawning.  In some cases, healthchecks
323            // failing may be configured as a non-blocking issue and so we'll still continue on.
324            if self
325                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
326                .await
327            {
328                self.connect_diff(&diff, &mut new_pieces).await;
329                self.spawn_diff(&diff, new_pieces);
330                self.config = new_config;
331
332                info!("New configuration loaded successfully.");
333
334                return Ok(true);
335            }
336        }
337
338        // We failed to build, connect, and spawn all of the changed/new components, so we flip
339        // around the configuration differential to generate all the components that we need to
340        // bring back to restore the current configuration.
341        warn!("Failed to completely load new configuration. Restoring old configuration.");
342
343        let diff = diff.flip();
344        if let Some(mut new_pieces) =
345            TopologyPieces::build_or_log_errors(&self.config, &diff, buffers, extra_context.clone())
346                .await
347            && self
348                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
349                .await
350        {
351            self.connect_diff(&diff, &mut new_pieces).await;
352            self.spawn_diff(&diff, new_pieces);
353
354            info!("Old configuration restored successfully.");
355
356            return Ok(false);
357        }
358
359        error!("Failed to restore old configuration.");
360
361        Err(())
362    }
363
364    /// Attempts to reload enrichment tables.
365    pub(crate) async fn reload_enrichment_tables(&self) {
366        reload_enrichment_tables(&self.config).await;
367    }
368
369    pub(crate) async fn run_healthchecks(
370        &mut self,
371        diff: &ConfigDiff,
372        pieces: &mut TopologyPieces,
373        options: HealthcheckOptions,
374    ) -> bool {
375        if options.enabled {
376            let healthchecks = take_healthchecks(diff, pieces)
377                .into_iter()
378                .map(|(_, task)| task);
379            let healthchecks = future::try_join_all(healthchecks);
380
381            info!("Running healthchecks.");
382            if options.require_healthy {
383                let success = healthchecks.await;
384
385                if success.is_ok() {
386                    info!("All healthchecks passed.");
387                    true
388                } else {
389                    error!("Sinks unhealthy.");
390                    false
391                }
392            } else {
393                tokio::spawn(healthchecks);
394                true
395            }
396        } else {
397            true
398        }
399    }
400
401    /// Shuts down any changed/removed component in the given configuration diff.
402    ///
403    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
404    async fn shutdown_diff(
405        &mut self,
406        diff: &ConfigDiff,
407        new_config: &Config,
408    ) -> HashMap<ComponentKey, BuiltBuffer> {
409        // First, we shutdown any changed/removed sources. This ensures that we can allow downstream
410        // components to terminate naturally by virtue of the flow of events stopping.
411        if diff.sources.any_changed_or_removed() {
412            let timeout = Duration::from_secs(30);
413            let mut source_shutdown_handles = Vec::new();
414
415            let deadline = Instant::now() + timeout;
416            for key in &diff.sources.to_remove {
417                debug!(component = %key, "Removing source.");
418
419                let previous = self.tasks.remove(key).unwrap();
420                drop(previous); // detach and forget
421
422                self.remove_outputs(key);
423                source_shutdown_handles
424                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
425            }
426
427            for key in &diff.sources.to_change {
428                debug!(component = %key, "Changing source.");
429
430                self.remove_outputs(key);
431                source_shutdown_handles
432                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
433            }
434
435            debug!(
436                "Waiting for up to {} seconds for source(s) to finish shutting down.",
437                timeout.as_secs()
438            );
439            futures::future::join_all(source_shutdown_handles).await;
440
441            // Final cleanup pass now that all changed/removed sources have signalled as having shutdown.
442            for key in diff.sources.removed_and_changed() {
443                if let Some(task) = self.source_tasks.remove(key) {
444                    task.await.unwrap().unwrap();
445                }
446            }
447        }
448
449        // Next, we shutdown any changed/removed transforms.  Same as before: we want allow
450        // downstream components to terminate naturally by virtue of the flow of events stopping.
451        //
452        // Since transforms are entirely driven by the flow of events into them from upstream
453        // components, the shutdown of sources they depend on, or the shutdown of transforms they
454        // depend on, and thus the closing of their buffer, will naturally cause them to shutdown,
455        // which is why we don't do any manual triggering of shutdown here.
456        for key in &diff.transforms.to_remove {
457            debug!(component = %key, "Removing transform.");
458
459            let previous = self.tasks.remove(key).unwrap();
460            drop(previous); // detach and forget
461
462            self.remove_inputs(key, diff, new_config).await;
463            self.remove_outputs(key);
464        }
465
466        for key in &diff.transforms.to_change {
467            debug!(component = %key, "Changing transform.");
468
469            self.remove_inputs(key, diff, new_config).await;
470            self.remove_outputs(key);
471        }
472
473        // Now we'll process any changed/removed sinks.
474        //
475        // At this point both the old and the new config don't have conflicts in their resource
476        // usage. So if we combine their resources, all found conflicts are between to be removed
477        // and to be added components.
478        let removed_table_sinks = diff
479            .enrichment_tables
480            .removed_and_changed()
481            .filter_map(|key| {
482                self.config
483                    .enrichment_table(key)
484                    .and_then(|t| t.as_sink(key))
485                    .map(|(key, s)| (key.clone(), s.resources(&key)))
486            })
487            .collect::<Vec<_>>();
488        let remove_sink = diff
489            .sinks
490            .removed_and_changed()
491            .map(|key| {
492                (
493                    key,
494                    self.config
495                        .sink(key)
496                        .map(|s| s.resources(key))
497                        .unwrap_or_default(),
498                )
499            })
500            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
501        let add_source = diff
502            .sources
503            .changed_and_added()
504            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
505        let added_table_sinks = diff
506            .enrichment_tables
507            .changed_and_added()
508            .filter_map(|key| {
509                self.config
510                    .enrichment_table(key)
511                    .and_then(|t| t.as_sink(key))
512                    .map(|(key, s)| (key.clone(), s.resources(&key)))
513            })
514            .collect::<Vec<_>>();
515        let add_sink = diff
516            .sinks
517            .changed_and_added()
518            .map(|key| {
519                (
520                    key,
521                    new_config
522                        .sink(key)
523                        .map(|s| s.resources(key))
524                        .unwrap_or_default(),
525                )
526            })
527            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
528        let conflicts = Resource::conflicts(
529            remove_sink.map(|(key, value)| ((true, key), value)).chain(
530                add_sink
531                    .chain(add_source)
532                    .map(|(key, value)| ((false, key), value)),
533            ),
534        )
535        .into_iter()
536        .flat_map(|(_, components)| components)
537        .collect::<HashSet<_>>();
538        // Existing conflicting sinks
539        let conflicting_sinks = conflicts
540            .into_iter()
541            .filter(|&(existing_sink, _)| existing_sink)
542            .map(|(_, key)| key.clone());
543
544        // For any sink whose buffer configuration didn't change, we can reuse their buffer.
545        let reuse_buffers = diff
546            .sinks
547            .to_change
548            .iter()
549            .filter(|&key| {
550                if diff.components_to_reload.contains(key) {
551                    return false;
552                }
553                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
554                    self.config
555                        .enrichment_table(key)
556                        .and_then(|t| t.as_sink(key))
557                        .map(|(_, s)| s.buffer)
558                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
559                    self.config
560                        .enrichment_table(key)
561                        .and_then(|t| t.as_sink(key))
562                        .map(|(_, s)| s.buffer)
563                })
564            })
565            .cloned()
566            .collect::<HashSet<_>>();
567
568        // For any existing sink that has a conflicting resource dependency with a changed/added
569        // sink, or for any sink that we want to reuse their buffer, we need to explicit wait for
570        // them to finish processing so we can reclaim ownership of those resources/buffers.
571        let wait_for_sinks = conflicting_sinks
572            .chain(reuse_buffers.iter().cloned())
573            .collect::<HashSet<_>>();
574
575        // First, we remove any inputs to removed sinks so they can naturally shut down.
576        let removed_sinks = diff
577            .sinks
578            .to_remove
579            .iter()
580            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
581                self.config
582                    .enrichment_table(key)
583                    .and_then(|t| t.as_sink(key))
584                    .is_some()
585            }))
586            .collect::<Vec<_>>();
587        for key in &removed_sinks {
588            debug!(component = %key, "Removing sink.");
589            self.remove_inputs(key, diff, new_config).await;
590        }
591
592        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
593        // they can naturally shutdown and allow us to recover their buffers if possible.
594        let mut buffer_tx = HashMap::new();
595
596        let sinks_to_change = diff
597            .sinks
598            .to_change
599            .iter()
600            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
601                self.config
602                    .enrichment_table(key)
603                    .and_then(|t| t.as_sink(key))
604                    .is_some()
605            }))
606            .collect::<Vec<_>>();
607
608        for key in &sinks_to_change {
609            debug!(component = %key, "Changing sink.");
610            if reuse_buffers.contains(key) {
611                self.detach_triggers
612                    .remove(key)
613                    .unwrap()
614                    .into_inner()
615                    .cancel();
616
617                // We explicitly clone the input side of the buffer and store it so we don't lose
618                // it when we remove the inputs below.
619                //
620                // We clone instead of removing here because otherwise the input will be missing for
621                // the rest of the reload process, which violates the assumption that all previous
622                // inputs for components not being removed are still available. It's simpler to
623                // allow the "old" input to stick around and be replaced (even though that's
624                // basically a no-op since we're reusing the same buffer) than it is to pass around
625                // info about which sinks are having their buffers reused and treat them differently
626                // at other stages.
627                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
628            }
629            self.remove_inputs(key, diff, new_config).await;
630        }
631
632        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
633        // sinks, we can actually wait for them to shutdown before collecting any buffers that are
634        // marked for reuse.
635        //
636        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
637        // on, we don't bother waiting for it to shutdown.
638        for key in &removed_sinks {
639            let previous = self.tasks.remove(key).unwrap();
640            if wait_for_sinks.contains(key) {
641                debug!(message = "Waiting for sink to shutdown.", %key);
642                previous.await.unwrap().unwrap();
643            } else {
644                drop(previous); // detach and forget
645            }
646        }
647
648        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
649        for key in &sinks_to_change {
650            if wait_for_sinks.contains(key) {
651                let previous = self.tasks.remove(key).unwrap();
652                debug!(message = "Waiting for sink to shutdown.", %key);
653                let buffer = previous.await.unwrap().unwrap();
654
655                if reuse_buffers.contains(key) {
656                    // We clone instead of removing here because otherwise the input will be
657                    // missing for the rest of the reload process, which violates the assumption
658                    // that all previous inputs for components not being removed are still
659                    // available. It's simpler to allow the "old" input to stick around and be
660                    // replaced (even though that's basically a no-op since we're reusing the same
661                    // buffer) than it is to pass around info about which sinks are having their
662                    // buffers reused and treat them differently at other stages.
663                    let tx = buffer_tx.remove(key).unwrap();
664                    let rx = match buffer {
665                        TaskOutput::Sink(rx) => rx.into_inner(),
666                        _ => unreachable!(),
667                    };
668
669                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
670                }
671            }
672        }
673
674        buffers
675    }
676
677    /// Connects all changed/added components in the given configuration diff.
678    pub(crate) async fn connect_diff(
679        &mut self,
680        diff: &ConfigDiff,
681        new_pieces: &mut TopologyPieces,
682    ) {
683        debug!("Connecting changed/added component(s).");
684
685        // Update tap metadata
686        if !self.watch.0.is_closed() {
687            for key in &diff.sources.to_remove {
688                // Sources only have outputs
689                self.outputs_tap_metadata.remove(key);
690            }
691
692            for key in &diff.transforms.to_remove {
693                // Transforms can have both inputs and outputs
694                self.outputs_tap_metadata.remove(key);
695                self.inputs_tap_metadata.remove(key);
696            }
697
698            for key in &diff.sinks.to_remove {
699                // Sinks only have inputs
700                self.inputs_tap_metadata.remove(key);
701            }
702
703            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
704                self.config
705                    .enrichment_table(key)
706                    .and_then(|t| t.as_sink(key))
707                    .is_some()
708            });
709            for key in removed_sinks {
710                // Sinks only have inputs
711                self.inputs_tap_metadata.remove(key);
712            }
713
714            for key in diff.sources.changed_and_added() {
715                if let Some(task) = new_pieces.tasks.get(key) {
716                    self.outputs_tap_metadata
717                        .insert(key.clone(), ("source", task.typetag().to_string()));
718                }
719            }
720
721            for key in diff.transforms.changed_and_added() {
722                if let Some(task) = new_pieces.tasks.get(key) {
723                    self.outputs_tap_metadata
724                        .insert(key.clone(), ("transform", task.typetag().to_string()));
725                }
726            }
727
728            for (key, input) in &new_pieces.inputs {
729                self.inputs_tap_metadata
730                    .insert(key.clone(), input.1.clone());
731            }
732        }
733
734        // We configure the outputs of any changed/added sources first, so they're available to any
735        // transforms and sinks that come afterwards.
736        for key in diff.sources.changed_and_added() {
737            debug!(component = %key, "Configuring outputs for source.");
738            self.setup_outputs(key, new_pieces).await;
739        }
740
741        let added_changed_table_sources: Vec<&ComponentKey> = diff
742            .enrichment_tables
743            .changed_and_added()
744            .filter(|k| new_pieces.source_tasks.contains_key(k))
745            .collect();
746        for key in added_changed_table_sources {
747            debug!(component = %key, "Connecting outputs for enrichment table source.");
748            self.setup_outputs(key, new_pieces).await;
749        }
750
751        // We configure the outputs of any changed/added transforms next, for the same reason: we
752        // need them to be available to any transforms and sinks that come afterwards.
753        for key in diff.transforms.changed_and_added() {
754            debug!(component = %key, "Configuring outputs for transform.");
755            self.setup_outputs(key, new_pieces).await;
756        }
757
758        // Now that all possible outputs are configured, we can start wiring up inputs, starting
759        // with transforms.
760        for key in diff.transforms.changed_and_added() {
761            debug!(component = %key, "Connecting inputs for transform.");
762            self.setup_inputs(key, diff, new_pieces).await;
763        }
764
765        // Now that all sources and transforms are fully configured, we can wire up sinks.
766        for key in diff.sinks.changed_and_added() {
767            debug!(component = %key, "Connecting inputs for sink.");
768            self.setup_inputs(key, diff, new_pieces).await;
769        }
770        let added_changed_tables: Vec<&ComponentKey> = diff
771            .enrichment_tables
772            .changed_and_added()
773            .filter(|k| new_pieces.inputs.contains_key(k))
774            .collect();
775        for key in added_changed_tables {
776            debug!(component = %key, "Connecting inputs for enrichment table sink.");
777            self.setup_inputs(key, diff, new_pieces).await;
778        }
779
780        // We do a final pass here to reconnect unchanged components.
781        //
782        // Why would we reconnect unchanged components?  Well, as sources and transforms will
783        // recreate their fanouts every time they're changed, we can run into a situation where a
784        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
785        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
786        //
787        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
788        // connect components backwards i.e. transforms to sources/transforms, and sinks to
789        // sources/transforms, to ensure we're connecting components in order.
790        self.reattach_severed_inputs(diff);
791
792        // Broadcast any topology changes to subscribers.
793        if !self.watch.0.is_closed() {
794            let outputs = self
795                .outputs
796                .clone()
797                .into_iter()
798                .flat_map(|(output_id, control_tx)| {
799                    self.outputs_tap_metadata.get(&output_id.component).map(
800                        |(component_kind, component_type)| {
801                            (
802                                TapOutput {
803                                    output_id,
804                                    component_kind,
805                                    component_type: component_type.clone(),
806                                },
807                                control_tx,
808                            )
809                        },
810                    )
811                })
812                .collect::<HashMap<_, _>>();
813
814            let mut removals = diff.sources.to_remove.clone();
815            removals.extend(diff.transforms.to_remove.iter().cloned());
816            self.watch
817                .0
818                .send(TapResource {
819                    outputs,
820                    inputs: self.inputs_tap_metadata.clone(),
821                    source_keys: diff
822                        .sources
823                        .changed_and_added()
824                        .map(|key| key.to_string())
825                        .collect(),
826                    sink_keys: diff
827                        .sinks
828                        .changed_and_added()
829                        .map(|key| key.to_string())
830                        .collect(),
831                    // Note, only sources and transforms are relevant. Sinks do
832                    // not have outputs to tap.
833                    removals,
834                })
835                .expect("Couldn't broadcast config changes.");
836        }
837    }
838
839    async fn setup_outputs(
840        &mut self,
841        key: &ComponentKey,
842        new_pieces: &mut builder::TopologyPieces,
843    ) {
844        let outputs = new_pieces.outputs.remove(key).unwrap();
845        for (port, output) in outputs {
846            debug!(component = %key, output_id = ?port, "Configuring output for component.");
847
848            let id = OutputId {
849                component: key.clone(),
850                port,
851            };
852
853            self.outputs.insert(id, output);
854        }
855    }
856
857    async fn setup_inputs(
858        &mut self,
859        key: &ComponentKey,
860        diff: &ConfigDiff,
861        new_pieces: &mut builder::TopologyPieces,
862    ) {
863        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
864
865        let old_inputs = self
866            .config
867            .inputs_for_node(key)
868            .into_iter()
869            .flatten()
870            .cloned()
871            .collect::<HashSet<_>>();
872
873        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
874        let inputs_to_add = &new_inputs - &old_inputs;
875
876        for input in inputs {
877            let output = self.outputs.get_mut(&input).expect("unknown output");
878
879            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
880                // If the input we're connecting to is changing, that means its outputs will have been
881                // recreated, so instead of replacing a paused sink, we have to add it to this new
882                // output for the first time, since there's nothing to actually replace at this point.
883                debug!(component = %key, fanout_id = %input, "Adding component input to fanout.");
884
885                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
886            } else {
887                // We know that if this component is connected to a given input, and neither
888                // components were changed, then the output must still exist, which means we paused
889                // this component's connection to its output, so we have to replace that connection
890                // now:
891                debug!(component = %key, fanout_id = %input, "Replacing component input in fanout.");
892
893                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
894            }
895        }
896
897        self.inputs.insert(key.clone(), tx);
898        new_pieces
899            .detach_triggers
900            .remove(key)
901            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
902    }
903
904    fn remove_outputs(&mut self, key: &ComponentKey) {
905        self.outputs.retain(|id, _output| &id.component != key);
906    }
907
908    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
909        self.inputs.remove(key);
910        self.detach_triggers.remove(key);
911
912        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
913        let new_inputs = new_config
914            .inputs_for_node(key)
915            .unwrap_or_default()
916            .iter()
917            .collect::<HashSet<_>>();
918
919        for input in old_inputs {
920            if let Some(output) = self.outputs.get_mut(input) {
921                if diff.contains(&input.component)
922                    || diff.is_removed(key)
923                    || !new_inputs.contains(input)
924                {
925                    // 3 cases to remove the input:
926                    //
927                    // Case 1: If the input we're removing ourselves from is changing, that means its
928                    // outputs will be recreated, so instead of pausing the sink, we just delete it
929                    // outright to ensure things are clean.
930                    //
931                    // Case 2: If this component itself is being removed, then pausing makes no sense
932                    // because it isn't coming back.
933                    //
934                    // Case 3: This component is no longer connected to the input from new config.
935                    debug!(component = %key, fanout_id = %input, "Removing component input from fanout.");
936
937                    _ = output.send(ControlMessage::Remove(key.clone()));
938                } else {
939                    // We know that if this component is connected to a given input, and it isn't being
940                    // changed, then it will exist when we reconnect inputs, so we should pause it
941                    // now to pause further sends through that component until we reconnect:
942                    debug!(component = %key, fanout_id = %input, "Pausing component input in fanout.");
943
944                    _ = output.send(ControlMessage::Pause(key.clone()));
945                }
946            }
947        }
948    }
949
950    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
951        let unchanged_transforms = self
952            .config
953            .transforms()
954            .filter(|(key, _)| !diff.transforms.contains(key));
955        for (transform_key, transform) in unchanged_transforms {
956            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
957            for output_id in changed_outputs {
958                debug!(component = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
959
960                let input = self.inputs.get(transform_key).cloned().unwrap();
961                let output = self.outputs.get_mut(&output_id).unwrap();
962                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
963            }
964        }
965
966        let unchanged_sinks = self
967            .config
968            .sinks()
969            .filter(|(key, _)| !diff.sinks.contains(key));
970        for (sink_key, sink) in unchanged_sinks {
971            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
972            for output_id in changed_outputs {
973                debug!(component = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
974
975                let input = self.inputs.get(sink_key).cloned().unwrap();
976                let output = self.outputs.get_mut(&output_id).unwrap();
977                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
978            }
979        }
980    }
981
982    /// Starts any new or changed components in the given configuration diff.
983    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
984        for key in &diff.sources.to_change {
985            debug!(message = "Spawning changed source.", key = %key);
986            self.spawn_source(key, &mut new_pieces);
987        }
988
989        for key in &diff.sources.to_add {
990            debug!(message = "Spawning new source.", key = %key);
991            self.spawn_source(key, &mut new_pieces);
992        }
993
994        let changed_table_sources: Vec<&ComponentKey> = diff
995            .enrichment_tables
996            .to_change
997            .iter()
998            .filter(|k| new_pieces.source_tasks.contains_key(k))
999            .collect();
1000
1001        let added_table_sources: Vec<&ComponentKey> = diff
1002            .enrichment_tables
1003            .to_add
1004            .iter()
1005            .filter(|k| new_pieces.source_tasks.contains_key(k))
1006            .collect();
1007
1008        for key in changed_table_sources {
1009            debug!(message = "Spawning changed enrichment table source.", key = %key);
1010            self.spawn_source(key, &mut new_pieces);
1011        }
1012
1013        for key in added_table_sources {
1014            debug!(message = "Spawning new enrichment table source.", key = %key);
1015            self.spawn_source(key, &mut new_pieces);
1016        }
1017
1018        for key in &diff.transforms.to_change {
1019            debug!(message = "Spawning changed transform.", key = %key);
1020            self.spawn_transform(key, &mut new_pieces);
1021        }
1022
1023        for key in &diff.transforms.to_add {
1024            debug!(message = "Spawning new transform.", key = %key);
1025            self.spawn_transform(key, &mut new_pieces);
1026        }
1027
1028        for key in &diff.sinks.to_change {
1029            debug!(message = "Spawning changed sink.", key = %key);
1030            self.spawn_sink(key, &mut new_pieces);
1031        }
1032
1033        for key in &diff.sinks.to_add {
1034            trace!(message = "Spawning new sink.", key = %key);
1035            self.spawn_sink(key, &mut new_pieces);
1036        }
1037
1038        let changed_tables: Vec<&ComponentKey> = diff
1039            .enrichment_tables
1040            .to_change
1041            .iter()
1042            .filter(|k| {
1043                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1044            })
1045            .collect();
1046
1047        let added_tables: Vec<&ComponentKey> = diff
1048            .enrichment_tables
1049            .to_add
1050            .iter()
1051            .filter(|k| {
1052                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1053            })
1054            .collect();
1055
1056        for key in changed_tables {
1057            debug!(message = "Spawning changed enrichment table sink.", key = %key);
1058            self.spawn_sink(key, &mut new_pieces);
1059        }
1060
1061        for key in added_tables {
1062            debug!(message = "Spawning enrichment table new sink.", key = %key);
1063            self.spawn_sink(key, &mut new_pieces);
1064        }
1065    }
1066
1067    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1068        let task = new_pieces.tasks.remove(key).unwrap();
1069        let span = error_span!(
1070            "sink",
1071            component_kind = "sink",
1072            component_id = %task.id(),
1073            component_type = %task.typetag(),
1074        );
1075
1076        let task_span = span.or_current();
1077        #[cfg(feature = "allocation-tracing")]
1078        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1079            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1080                task.id().to_string(),
1081                "sink".to_string(),
1082                task.typetag().to_string(),
1083            );
1084            debug!(
1085                component_kind = "sink",
1086                component_type = task.typetag(),
1087                component_id = task.id(),
1088                group_id = group_id.as_raw().to_string(),
1089                "Registered new allocation group."
1090            );
1091            group_id.attach_to_span(&task_span);
1092        }
1093
1094        let task_name = format!(">> {} ({})", task.typetag(), task.id());
1095        let task = {
1096            let key = key.clone();
1097            handle_errors(task, self.abort_tx.clone(), |error| {
1098                ShutdownError::SinkAborted { key, error }
1099            })
1100        }
1101        .instrument(task_span);
1102        let spawned = spawn_named(task, task_name.as_ref());
1103        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1104            drop(previous); // detach and forget
1105        }
1106    }
1107
1108    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1109        let task = new_pieces.tasks.remove(key).unwrap();
1110        let span = error_span!(
1111            "transform",
1112            component_kind = "transform",
1113            component_id = %task.id(),
1114            component_type = %task.typetag(),
1115        );
1116
1117        let task_span = span.or_current();
1118        #[cfg(feature = "allocation-tracing")]
1119        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1120            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1121                task.id().to_string(),
1122                "transform".to_string(),
1123                task.typetag().to_string(),
1124            );
1125            debug!(
1126                component_kind = "transform",
1127                component_type = task.typetag(),
1128                component_id = task.id(),
1129                group_id = group_id.as_raw().to_string(),
1130                "Registered new allocation group."
1131            );
1132            group_id.attach_to_span(&task_span);
1133        }
1134
1135        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1136        let task = {
1137            let key = key.clone();
1138            handle_errors(task, self.abort_tx.clone(), |error| {
1139                ShutdownError::TransformAborted { key, error }
1140            })
1141        }
1142        .instrument(task_span);
1143        let spawned = spawn_named(task, task_name.as_ref());
1144        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1145            drop(previous); // detach and forget
1146        }
1147    }
1148
1149    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1150        let task = new_pieces.tasks.remove(key).unwrap();
1151        let span = error_span!(
1152            "source",
1153            component_kind = "source",
1154            component_id = %task.id(),
1155            component_type = %task.typetag(),
1156        );
1157
1158        let task_span = span.or_current();
1159        #[cfg(feature = "allocation-tracing")]
1160        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1161            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1162                task.id().to_string(),
1163                "source".to_string(),
1164                task.typetag().to_string(),
1165            );
1166
1167            debug!(
1168                component_kind = "source",
1169                component_type = task.typetag(),
1170                component_id = task.id(),
1171                group_id = group_id.as_raw().to_string(),
1172                "Registered new allocation group."
1173            );
1174            group_id.attach_to_span(&task_span);
1175        }
1176
1177        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1178        let task = {
1179            let key = key.clone();
1180            handle_errors(task, self.abort_tx.clone(), |error| {
1181                ShutdownError::SourceAborted { key, error }
1182            })
1183        }
1184        .instrument(task_span.clone());
1185        let spawned = spawn_named(task, task_name.as_ref());
1186        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1187            drop(previous); // detach and forget
1188        }
1189
1190        self.shutdown_coordinator
1191            .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1192
1193        // Now spawn the actual source task.
1194        let source_task = new_pieces.source_tasks.remove(key).unwrap();
1195        let source_task = {
1196            let key = key.clone();
1197            handle_errors(source_task, self.abort_tx.clone(), |error| {
1198                ShutdownError::SourceAborted { key, error }
1199            })
1200        }
1201        .instrument(task_span);
1202        self.source_tasks
1203            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1204    }
1205
1206    pub async fn start_init_validated(
1207        config: Config,
1208        extra_context: ExtraContext,
1209    ) -> Option<(Self, ShutdownErrorReceiver)> {
1210        let diff = ConfigDiff::initial(&config);
1211        let pieces =
1212            TopologyPieces::build_or_log_errors(&config, &diff, HashMap::new(), extra_context)
1213                .await?;
1214        Self::start_validated(config, diff, pieces).await
1215    }
1216
1217    pub async fn start_validated(
1218        config: Config,
1219        diff: ConfigDiff,
1220        mut pieces: TopologyPieces,
1221    ) -> Option<(Self, ShutdownErrorReceiver)> {
1222        let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1223
1224        let expire_metrics = match (
1225            config.global.expire_metrics,
1226            config.global.expire_metrics_secs,
1227        ) {
1228            (Some(e), None) => {
1229                warn!(
1230                    "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1231                );
1232                if e < Duration::from_secs(0) {
1233                    None
1234                } else {
1235                    Some(e.as_secs_f64())
1236                }
1237            }
1238            (Some(_), Some(_)) => {
1239                error!("Cannot set both `expire_metrics` and `expire_metrics_secs`.");
1240                return None;
1241            }
1242            (None, Some(e)) => {
1243                if e < 0f64 {
1244                    None
1245                } else {
1246                    Some(e)
1247                }
1248            }
1249            (None, None) => Some(300f64),
1250        };
1251
1252        if let Err(error) = crate::metrics::Controller::get()
1253            .expect("Metrics must be initialized")
1254            .set_expiry(
1255                expire_metrics,
1256                config
1257                    .global
1258                    .expire_metrics_per_metric_set
1259                    .clone()
1260                    .unwrap_or_default(),
1261            )
1262        {
1263            error!(message = "Invalid metrics expiry.", %error);
1264            return None;
1265        }
1266
1267        let mut utilization_emitter = pieces
1268            .utilization_emitter
1269            .take()
1270            .expect("Topology is missing the utilization metric emitter!");
1271        let mut running_topology = Self::new(config, abort_tx);
1272
1273        if !running_topology
1274            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1275            .await
1276        {
1277            return None;
1278        }
1279        running_topology.connect_diff(&diff, &mut pieces).await;
1280        running_topology.spawn_diff(&diff, pieces);
1281
1282        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1283            ShutdownSignal::new_wired();
1284        running_topology.utilization_task_shutdown_trigger =
1285            Some(utilization_task_shutdown_trigger);
1286        running_topology.utilization_task = Some(tokio::spawn(Task::new(
1287            "utilization_heartbeat".into(),
1288            "",
1289            async move {
1290                utilization_emitter
1291                    .run_utilization(utilization_shutdown_signal)
1292                    .await;
1293                // TODO: new task output type for this? Or handle this task in a completely
1294                // different way
1295                Ok(TaskOutput::Healthcheck)
1296            },
1297        )));
1298
1299        Some((running_topology, abort_rx))
1300    }
1301}
1302
1303fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1304    let mut changed_outputs = Vec::new();
1305
1306    for source_key in &diff.sources.to_change {
1307        changed_outputs.extend(
1308            output_ids
1309                .iter()
1310                .filter(|id| &id.component == source_key)
1311                .cloned(),
1312        );
1313    }
1314
1315    for transform_key in &diff.transforms.to_change {
1316        changed_outputs.extend(
1317            output_ids
1318                .iter()
1319                .filter(|id| &id.component == transform_key)
1320                .cloned(),
1321        );
1322    }
1323
1324    changed_outputs
1325}