1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 Arc, Mutex,
5 atomic::{AtomicBool, Ordering},
6 },
7};
8
9use futures::{Future, FutureExt, future};
10use snafu::Snafu;
11use stream_cancel::Trigger;
12use tokio::{
13 sync::{mpsc, watch},
14 time::{Duration, Instant, interval, sleep_until},
15};
16use tracing::Instrument;
17use vector_lib::{
18 buffers::topology::channel::BufferSender,
19 shutdown::ShutdownSignal,
20 tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
21 trigger::DisabledTrigger,
22};
23
24use super::{
25 BuiltBuffer, TaskHandle,
26 builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
27 fanout::{ControlChannel, ControlMessage},
28 handle_errors, retain, take_healthchecks,
29 task::{Task, TaskOutput},
30};
31use crate::{
32 config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
33 event::EventArray,
34 extra_context::ExtraContext,
35 shutdown::SourceShutdownCoordinator,
36 signal::ShutdownError,
37 spawn_named,
38 utilization::UtilizationRegistry,
39};
40
/// Receiving half of the unbounded channel on which spawned components report a
/// [`ShutdownError`] (e.g. a source, transform, or sink task aborting). It is returned
/// alongside a freshly started [`RunningTopology`].
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
42
/// Reasons a call to [`RunningTopology::reload_config_and_respawn`] can fail.
#[derive(Debug, Snafu)]
pub enum ReloadError {
    /// The new config changes global options, and reloads refuse that.
    /// `changed_fields` lists the fields that differ.
    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
    GlobalOptionsChanged { changed_fields: Vec<String> },
    /// Global options differ, but computing the list of differing fields failed.
    #[snafu(display("failed to compute global diff: {}", source))]
    GlobalDiffFailed { source: serde_json::Error },
    /// Building or health-checking the new topology failed. The previous configuration
    /// was restored.
    #[snafu(display("topology build failed"))]
    TopologyBuildFailed,
    /// The new topology failed, and rebuilding the previous configuration also failed.
    #[snafu(display("failed to restore previous config"))]
    FailedToRestore,
}
54
/// A live topology: the spawned component tasks, the channels wiring them together, and the
/// [`Config`] they were built from.
// NOTE(review): `dead_code` is allowed presumably because some fields are only read on
// certain call paths or feature sets; confirm before removing the attribute.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Input buffer sender of every transform/sink, keyed by component.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Inputs of each component, published to tap subscribers as `TapResource::inputs`.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel of every component output.
    outputs: HashMap<OutputId, ControlChannel>,
    /// `(component kind, component type)` of each output-producing component, used for tap.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Second task spawned per source by `spawn_source`, taken from the built pieces'
    /// `source_tasks`.
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Main task of every component (sources, transforms, and sinks).
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Coordinates shutdown of sources.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Triggers moved in from built pieces. `shutdown_diff` cancels one before handing a
    /// sink's buffer to its replacement.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// Configuration the running components were built from.
    pub(crate) config: Config,
    /// Sender half of the channel on which aborted components report errors.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Broadcasts `TapResource` snapshots to tap consumers.
    watch: (WatchTx, WatchRx),
    /// `true` until `stop` is called.
    pub(crate) running: Arc<AtomicBool>,
    /// Copied from the config in `new`. `stop` derives its shutdown deadline from it.
    graceful_shutdown_duration: Option<Duration>,
    /// Registry shared with built pieces. Removed transforms/sinks are unregistered from it.
    utilization_registry: Option<UtilizationRegistry>,
    /// Utilization heartbeat task started by `start_validated`. `stop` waits on it.
    utilization_task: Option<TaskHandle>,
    /// Cancelled by `stop` to end the utilization heartbeat task.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Components to force-reload on the next `reload_config_and_respawn`, accumulated via
    /// `extend_reload_set`.
    pending_reload: Option<HashSet<ComponentKey>>,
}
75
76impl RunningTopology {
77 pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
78 Self {
79 inputs: HashMap::new(),
80 inputs_tap_metadata: HashMap::new(),
81 outputs: HashMap::new(),
82 outputs_tap_metadata: HashMap::new(),
83 shutdown_coordinator: SourceShutdownCoordinator::default(),
84 detach_triggers: HashMap::new(),
85 source_tasks: HashMap::new(),
86 tasks: HashMap::new(),
87 abort_tx,
88 watch: watch::channel(TapResource::default()),
89 running: Arc::new(AtomicBool::new(true)),
90 graceful_shutdown_duration: config.graceful_shutdown_duration,
91 config,
92 utilization_registry: None,
93 utilization_task: None,
94 utilization_task_shutdown_trigger: None,
95 pending_reload: None,
96 }
97 }
98
99 pub const fn config(&self) -> &Config {
101 &self.config
102 }
103
104 pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
107 match &mut self.pending_reload {
108 None => self.pending_reload = Some(new_set.clone()),
109 Some(existing) => existing.extend(new_set),
110 }
111 }
112
113 pub fn watch(&self) -> watch::Receiver<TapResource> {
117 self.watch.1.clone()
118 }
119
120 pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
128 self.shutdown_coordinator.shutdown_tripwire()
129 }
130
    /// Shuts down every component of this topology, consuming it.
    ///
    /// Marks the topology as no longer running. It then asks the shutdown coordinator to shut
    /// down all sources (bounded by the same deadline) and cancels the utilization task's
    /// trigger.
    ///
    /// The returned future completes once the source shutdown has completed AND the first of
    /// the following has finished:
    /// - every component task (and the utilization task) has finished;
    /// - the graceful-shutdown deadline, if configured, has passed (the components still
    ///   running are logged first);
    /// - the progress reporter exits, either because all components are done or because the
    ///   deadline passed.
    pub fn stop(self) -> impl Future<Output = ()> {
        // Flip the shared flag first so anything observing `running` sees the shutdown.
        self.running.store(false, Ordering::Relaxed);
        // Every task handle, awaited together to detect a complete shutdown.
        let mut wait_handles = Vec::new();
        // The same handles grouped per component, used to report what is still running.
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        // Only completion matters here, so each task's output is discarded.
        let map_closure = |_result| ();

        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
            // `shared()` lets the same task be polled from both collections.
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        // Absolute deadline derived from the grace period. `None` means no time limit.
        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        // Resolves at the deadline after logging the components that are still running.
        // Without a deadline it never resolves.
        let timeout = if let Some(deadline) = deadline {
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Drop finished handles (`peek()` is `Some` once a shared future completed),
                // then drop components with no unfinished handles left.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    message = "Failed to gracefully shut down in time. Killing components.",
                    internal_log_rate_limit = false
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Progress reporter. A tokio interval's first tick completes immediately, so it logs
        // once right away and then every 5 seconds. It exits once every component is done or
        // the deadline has passed.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Completes once every task, including the utilization task, has finished.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Whichever of timeout / reporter / success finishes first ends the wait.
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        // Start shutting down sources, bounded by the same deadline.
        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }
263
264 pub async fn reload_config_and_respawn(
276 &mut self,
277 new_config: Config,
278 extra_context: ExtraContext,
279 ) -> Result<(), ReloadError> {
280 info!("Reloading running topology with new configuration.");
281
282 if self.config.global != new_config.global {
283 return match self.config.global.diff(&new_config.global) {
284 Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
285 Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
286 };
287 }
288
289 let diff = if let Some(components) = &self.pending_reload {
295 ConfigDiff::new(&self.config, &new_config, components.clone())
296 } else {
297 ConfigDiff::new(&self.config, &new_config, HashSet::new())
298 };
299 let buffers = self.shutdown_diff(&diff, &new_config).await;
300
301 if cfg!(windows) {
305 tokio::time::sleep(Duration::from_millis(200)).await;
307 }
308
309 if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
313 .with_buffers(buffers.clone())
314 .with_extra_context(extra_context.clone())
315 .with_utilization_registry(self.utilization_registry.clone())
316 .build_or_log_errors()
317 .await
318 {
319 if self
323 .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
324 .await
325 {
326 self.connect_diff(&diff, &mut new_pieces).await;
327 self.spawn_diff(&diff, new_pieces);
328 self.config = new_config;
329
330 info!("New configuration loaded successfully.");
331
332 return Ok(());
333 }
334 }
335
336 warn!("Failed to completely load new configuration. Restoring old configuration.");
340
341 let diff = diff.flip();
342 if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
343 .with_buffers(buffers)
344 .with_extra_context(extra_context.clone())
345 .with_utilization_registry(self.utilization_registry.clone())
346 .build_or_log_errors()
347 .await
348 && self
349 .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
350 .await
351 {
352 self.connect_diff(&diff, &mut new_pieces).await;
353 self.spawn_diff(&diff, new_pieces);
354
355 info!("Old configuration restored successfully.");
356
357 return Err(ReloadError::TopologyBuildFailed);
358 }
359
360 error!(
361 message = "Failed to restore old configuration.",
362 internal_log_rate_limit = false
363 );
364
365 Err(ReloadError::FailedToRestore)
366 }
367
368 pub(crate) async fn reload_enrichment_tables(&self) {
370 reload_enrichment_tables(&self.config).await;
371 }
372
373 pub(crate) async fn run_healthchecks(
374 &mut self,
375 diff: &ConfigDiff,
376 pieces: &mut TopologyPieces,
377 options: HealthcheckOptions,
378 ) -> bool {
379 if options.enabled {
380 let healthchecks = take_healthchecks(diff, pieces)
381 .into_iter()
382 .map(|(_, task)| task);
383 let healthchecks = future::try_join_all(healthchecks);
384
385 info!("Running healthchecks.");
386 if options.require_healthy {
387 let success = healthchecks.await;
388
389 if success.is_ok() {
390 info!("All healthchecks passed.");
391 true
392 } else {
393 error!(
394 message = "Sinks unhealthy.",
395 internal_log_rate_limit = false
396 );
397 false
398 }
399 } else {
400 tokio::spawn(healthchecks);
401 true
402 }
403 } else {
404 true
405 }
406 }
407
408 async fn shutdown_diff(
412 &mut self,
413 diff: &ConfigDiff,
414 new_config: &Config,
415 ) -> HashMap<ComponentKey, BuiltBuffer> {
416 if diff.sources.any_changed_or_removed() {
419 let timeout = Duration::from_secs(30);
420 let mut source_shutdown_handles = Vec::new();
421
422 let deadline = Instant::now() + timeout;
423 for key in &diff.sources.to_remove {
424 debug!(component_id = %key, "Removing source.");
425
426 let previous = self.tasks.remove(key).unwrap();
427 drop(previous); self.remove_outputs(key);
430 source_shutdown_handles
431 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
432 }
433
434 for key in &diff.sources.to_change {
435 debug!(component_id = %key, "Changing source.");
436
437 self.remove_outputs(key);
438 source_shutdown_handles
439 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
440 }
441
442 debug!(
443 "Waiting for up to {} seconds for source(s) to finish shutting down.",
444 timeout.as_secs()
445 );
446 futures::future::join_all(source_shutdown_handles).await;
447
448 for key in diff.sources.removed_and_changed() {
450 if let Some(task) = self.source_tasks.remove(key) {
451 task.await.unwrap().unwrap();
452 }
453 }
454 }
455
456 for key in &diff.transforms.to_remove {
464 debug!(component_id = %key, "Removing transform.");
465
466 let previous = self.tasks.remove(key).unwrap();
467 drop(previous); self.remove_inputs(key, diff, new_config).await;
470 self.remove_outputs(key);
471
472 if let Some(registry) = self.utilization_registry.as_ref() {
473 registry.remove_component(key);
474 }
475 }
476
477 for key in &diff.transforms.to_change {
478 debug!(component_id = %key, "Changing transform.");
479
480 self.remove_inputs(key, diff, new_config).await;
481 self.remove_outputs(key);
482 }
483
484 let removed_table_sinks = diff
490 .enrichment_tables
491 .removed_and_changed()
492 .filter_map(|key| {
493 self.config
494 .enrichment_table(key)
495 .and_then(|t| t.as_sink(key))
496 .map(|(key, s)| (key.clone(), s.resources(&key)))
497 })
498 .collect::<Vec<_>>();
499 let remove_sink = diff
500 .sinks
501 .removed_and_changed()
502 .map(|key| {
503 (
504 key,
505 self.config
506 .sink(key)
507 .map(|s| s.resources(key))
508 .unwrap_or_default(),
509 )
510 })
511 .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
512 let add_source = diff
513 .sources
514 .changed_and_added()
515 .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
516 let added_table_sinks = diff
517 .enrichment_tables
518 .changed_and_added()
519 .filter_map(|key| {
520 self.config
521 .enrichment_table(key)
522 .and_then(|t| t.as_sink(key))
523 .map(|(key, s)| (key.clone(), s.resources(&key)))
524 })
525 .collect::<Vec<_>>();
526 let add_sink = diff
527 .sinks
528 .changed_and_added()
529 .map(|key| {
530 (
531 key,
532 new_config
533 .sink(key)
534 .map(|s| s.resources(key))
535 .unwrap_or_default(),
536 )
537 })
538 .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
539 let conflicts = Resource::conflicts(
540 remove_sink.map(|(key, value)| ((true, key), value)).chain(
541 add_sink
542 .chain(add_source)
543 .map(|(key, value)| ((false, key), value)),
544 ),
545 )
546 .into_iter()
547 .flat_map(|(_, components)| components)
548 .collect::<HashSet<_>>();
549 let conflicting_sinks = conflicts
551 .into_iter()
552 .filter(|&(existing_sink, _)| existing_sink)
553 .map(|(_, key)| key.clone());
554
555 let reuse_buffers = diff
557 .sinks
558 .to_change
559 .iter()
560 .filter(|&key| {
561 if diff.components_to_reload.contains(key) {
562 return false;
563 }
564 self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
565 self.config
566 .enrichment_table(key)
567 .and_then(|t| t.as_sink(key))
568 .map(|(_, s)| s.buffer)
569 }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
570 self.config
571 .enrichment_table(key)
572 .and_then(|t| t.as_sink(key))
573 .map(|(_, s)| s.buffer)
574 })
575 })
576 .cloned()
577 .collect::<HashSet<_>>();
578
579 let wait_for_sinks = conflicting_sinks
583 .chain(reuse_buffers.iter().cloned())
584 .collect::<HashSet<_>>();
585
586 let removed_sinks = diff
588 .sinks
589 .to_remove
590 .iter()
591 .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
592 self.config
593 .enrichment_table(key)
594 .and_then(|t| t.as_sink(key))
595 .is_some()
596 }))
597 .collect::<Vec<_>>();
598 for key in &removed_sinks {
599 debug!(component_id = %key, "Removing sink.");
600 self.remove_inputs(key, diff, new_config).await;
601
602 if let Some(registry) = self.utilization_registry.as_ref() {
603 registry.remove_component(key);
604 }
605 }
606
607 let mut buffer_tx = HashMap::new();
610
611 let sinks_to_change = diff
612 .sinks
613 .to_change
614 .iter()
615 .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
616 self.config
617 .enrichment_table(key)
618 .and_then(|t| t.as_sink(key))
619 .is_some()
620 }))
621 .collect::<Vec<_>>();
622
623 for key in &sinks_to_change {
624 debug!(component_id = %key, "Changing sink.");
625 if reuse_buffers.contains(key) {
626 self.detach_triggers
627 .remove(key)
628 .unwrap()
629 .into_inner()
630 .cancel();
631
632 buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
643 }
644 self.remove_inputs(key, diff, new_config).await;
645 }
646
647 for key in &removed_sinks {
654 let previous = self.tasks.remove(key).unwrap();
655 if wait_for_sinks.contains(key) {
656 debug!(message = "Waiting for sink to shutdown.", component_id = %key);
657 previous.await.unwrap().unwrap();
658 } else {
659 drop(previous); }
661 }
662
663 let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
664 for key in &sinks_to_change {
665 if wait_for_sinks.contains(key) {
666 let previous = self.tasks.remove(key).unwrap();
667 debug!(message = "Waiting for sink to shutdown.", component_id = %key);
668 let buffer = previous.await.unwrap().unwrap();
669
670 if reuse_buffers.contains(key) {
671 let tx = buffer_tx.remove(key).unwrap();
679 let rx = match buffer {
680 TaskOutput::Sink(rx) => rx.into_inner(),
681 _ => unreachable!(),
682 };
683
684 buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
685 }
686 }
687 }
688
689 buffers
690 }
691
    /// Wires the newly built pieces for `diff` into the running topology.
    ///
    /// Steps, in order:
    /// 1. Update the tap metadata, only while the tap watch channel is still open.
    /// 2. Register the outputs of changed/added sources, enrichment-table sources, and
    ///    transforms.
    /// 3. Connect the inputs of changed/added transforms, sinks, and enrichment-table sinks.
    /// 4. Re-attach unchanged transforms/sinks to the fanouts of changed upstream components.
    /// 5. Broadcast the updated [`TapResource`].
    ///
    /// Outputs are registered before any inputs are connected, because `setup_inputs` looks
    /// up upstream outputs in `self.outputs`.
    ///
    /// # Panics
    ///
    /// Panics if sending on the tap watch channel fails, and via `setup_outputs` /
    /// `setup_inputs` if the pieces for a component are missing.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Tap metadata is only maintained while tap updates can still be received.
        if !self.watch.0.is_closed() {
            // Forget tap metadata of removed components.
            for key in &diff.sources.to_remove {
                self.outputs_tap_metadata.remove(key);
            }

            for key in &diff.transforms.to_remove {
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
            }

            for key in &diff.sinks.to_remove {
                self.inputs_tap_metadata.remove(key);
            }

            // Removed enrichment tables may have acted as sinks and/or sources.
            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            });
            for key in removed_sources {
                self.outputs_tap_metadata.remove(&key);
            }

            // Record `(kind, type)` for components whose outputs are (re)created.
            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            // NOTE(review): during a reload, `self.config` is still the OLD config here. It is
            // replaced only after `spawn_diff`. Newly added enrichment tables may therefore not
            // be found, and would get no tap metadata. Confirm whether the new config should
            // be consulted.
            for key in diff
                .enrichment_tables
                .changed_and_added()
                .filter_map(|key| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_source(key).map(|(key, _)| key))
                })
            {
                if let Some(task) = new_pieces.tasks.get(&key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", task.typetag().to_string()));
                }
            }

            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // Register all new outputs before connecting any inputs.
        for key in diff.sources.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Enrichment tables that act as sources are the ones with a built source task.
        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources.iter() {
            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        for key in diff.sinks.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        // Enrichment tables that act as sinks are the ones with built inputs.
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables.iter() {
            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Changed upstream components got new outputs via `setup_outputs` above, so their
        // unchanged downstream components must be added back to those fanouts.
        self.reattach_severed_inputs(diff);

        // Publish the updated tap view.
        if !self.watch.0.is_closed() {
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(
                            added_changed_table_sources
                                .iter()
                                .map(|key| key.to_string()),
                        )
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
                        .collect(),
                    removals,
                })
                .expect("Couldn't broadcast config changes.");
        }
    }
884
885 async fn setup_outputs(
886 &mut self,
887 key: &ComponentKey,
888 new_pieces: &mut builder::TopologyPieces,
889 ) {
890 let outputs = new_pieces.outputs.remove(key).unwrap();
891 for (port, output) in outputs {
892 debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
893
894 let id = OutputId {
895 component: key.clone(),
896 port,
897 };
898
899 self.outputs.insert(id, output);
900 }
901 }
902
903 async fn setup_inputs(
904 &mut self,
905 key: &ComponentKey,
906 diff: &ConfigDiff,
907 new_pieces: &mut builder::TopologyPieces,
908 ) {
909 let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
910
911 let old_inputs = self
912 .config
913 .inputs_for_node(key)
914 .into_iter()
915 .flatten()
916 .cloned()
917 .collect::<HashSet<_>>();
918
919 let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
920 let inputs_to_add = &new_inputs - &old_inputs;
921
922 for input in inputs {
923 let output = self.outputs.get_mut(&input).expect("unknown output");
924
925 if diff.contains(&input.component) || inputs_to_add.contains(&input) {
926 debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
930
931 _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
932 } else {
933 debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
938
939 _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
940 }
941 }
942
943 self.inputs.insert(key.clone(), tx);
944 new_pieces
945 .detach_triggers
946 .remove(key)
947 .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
948 }
949
950 fn remove_outputs(&mut self, key: &ComponentKey) {
951 self.outputs.retain(|id, _output| &id.component != key);
952 }
953
954 async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
955 self.inputs.remove(key);
956 self.detach_triggers.remove(key);
957
958 let old_inputs = self.config.inputs_for_node(key).expect("node exists");
959 let new_inputs = new_config
960 .inputs_for_node(key)
961 .unwrap_or_default()
962 .iter()
963 .collect::<HashSet<_>>();
964
965 for input in old_inputs {
966 if let Some(output) = self.outputs.get_mut(input) {
967 if diff.contains(&input.component)
968 || diff.is_removed(key)
969 || !new_inputs.contains(input)
970 {
971 debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
982
983 _ = output.send(ControlMessage::Remove(key.clone()));
984 } else {
985 debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
989
990 _ = output.send(ControlMessage::Pause(key.clone()));
991 }
992 }
993 }
994 }
995
996 fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
997 let unchanged_transforms = self
998 .config
999 .transforms()
1000 .filter(|(key, _)| !diff.transforms.contains(key));
1001 for (transform_key, transform) in unchanged_transforms {
1002 let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
1003 for output_id in changed_outputs {
1004 debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1005
1006 let input = self.inputs.get(transform_key).cloned().unwrap();
1007 let output = self.outputs.get_mut(&output_id).unwrap();
1008 _ = output.send(ControlMessage::Add(transform_key.clone(), input));
1009 }
1010 }
1011
1012 let unchanged_sinks = self
1013 .config
1014 .sinks()
1015 .filter(|(key, _)| !diff.sinks.contains(key));
1016 for (sink_key, sink) in unchanged_sinks {
1017 let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
1018 for output_id in changed_outputs {
1019 debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1020
1021 let input = self.inputs.get(sink_key).cloned().unwrap();
1022 let output = self.outputs.get_mut(&output_id).unwrap();
1023 _ = output.send(ControlMessage::Add(sink_key.clone(), input));
1024 }
1025 }
1026 }
1027
1028 pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1030 for key in &diff.sources.to_change {
1031 debug!(message = "Spawning changed source.", component_id = %key);
1032 self.spawn_source(key, &mut new_pieces);
1033 }
1034
1035 for key in &diff.sources.to_add {
1036 debug!(message = "Spawning new source.", component_id = %key);
1037 self.spawn_source(key, &mut new_pieces);
1038 }
1039
1040 let changed_table_sources: Vec<&ComponentKey> = diff
1041 .enrichment_tables
1042 .to_change
1043 .iter()
1044 .filter(|k| new_pieces.source_tasks.contains_key(k))
1045 .collect();
1046
1047 let added_table_sources: Vec<&ComponentKey> = diff
1048 .enrichment_tables
1049 .to_add
1050 .iter()
1051 .filter(|k| new_pieces.source_tasks.contains_key(k))
1052 .collect();
1053
1054 for key in changed_table_sources {
1055 debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1056 self.spawn_source(key, &mut new_pieces);
1057 }
1058
1059 for key in added_table_sources {
1060 debug!(message = "Spawning new enrichment table source.", component_id = %key);
1061 self.spawn_source(key, &mut new_pieces);
1062 }
1063
1064 for key in &diff.transforms.to_change {
1065 debug!(message = "Spawning changed transform.", component_id = %key);
1066 self.spawn_transform(key, &mut new_pieces);
1067 }
1068
1069 for key in &diff.transforms.to_add {
1070 debug!(message = "Spawning new transform.", component_id = %key);
1071 self.spawn_transform(key, &mut new_pieces);
1072 }
1073
1074 for key in &diff.sinks.to_change {
1075 debug!(message = "Spawning changed sink.", component_id = %key);
1076 self.spawn_sink(key, &mut new_pieces);
1077 }
1078
1079 for key in &diff.sinks.to_add {
1080 trace!(message = "Spawning new sink.", component_id = %key);
1081 self.spawn_sink(key, &mut new_pieces);
1082 }
1083
1084 let changed_tables: Vec<&ComponentKey> = diff
1085 .enrichment_tables
1086 .to_change
1087 .iter()
1088 .filter(|k| {
1089 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1090 })
1091 .collect();
1092
1093 let added_tables: Vec<&ComponentKey> = diff
1094 .enrichment_tables
1095 .to_add
1096 .iter()
1097 .filter(|k| {
1098 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1099 })
1100 .collect();
1101
1102 for key in changed_tables {
1103 debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1104 self.spawn_sink(key, &mut new_pieces);
1105 }
1106
1107 for key in added_tables {
1108 debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1109 self.spawn_sink(key, &mut new_pieces);
1110 }
1111 }
1112
1113 fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1114 let task = new_pieces.tasks.remove(key).unwrap();
1115 let span = error_span!(
1116 "sink",
1117 component_kind = "sink",
1118 component_id = %task.id(),
1119 component_type = %task.typetag(),
1120 );
1121
1122 let task_span = span.or_current();
1123 #[cfg(feature = "allocation-tracing")]
1124 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1125 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1126 task.id().to_string(),
1127 "sink".to_string(),
1128 task.typetag().to_string(),
1129 );
1130 debug!(
1131 component_kind = "sink",
1132 component_type = task.typetag(),
1133 component_id = task.id(),
1134 group_id = group_id.as_raw().to_string(),
1135 "Registered new allocation group."
1136 );
1137 group_id.attach_to_span(&task_span);
1138 }
1139
1140 let task_name = format!(">> {} ({})", task.typetag(), task.id());
1141 let task = {
1142 let key = key.clone();
1143 handle_errors(task, self.abort_tx.clone(), |error| {
1144 ShutdownError::SinkAborted { key, error }
1145 })
1146 }
1147 .instrument(task_span);
1148 let spawned = spawn_named(task, task_name.as_ref());
1149 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1150 drop(previous); }
1152 }
1153
1154 fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1155 let task = new_pieces.tasks.remove(key).unwrap();
1156 let span = error_span!(
1157 "transform",
1158 component_kind = "transform",
1159 component_id = %task.id(),
1160 component_type = %task.typetag(),
1161 );
1162
1163 let task_span = span.or_current();
1164 #[cfg(feature = "allocation-tracing")]
1165 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1166 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1167 task.id().to_string(),
1168 "transform".to_string(),
1169 task.typetag().to_string(),
1170 );
1171 debug!(
1172 component_kind = "transform",
1173 component_type = task.typetag(),
1174 component_id = task.id(),
1175 group_id = group_id.as_raw().to_string(),
1176 "Registered new allocation group."
1177 );
1178 group_id.attach_to_span(&task_span);
1179 }
1180
1181 let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1182 let task = {
1183 let key = key.clone();
1184 handle_errors(task, self.abort_tx.clone(), |error| {
1185 ShutdownError::TransformAborted { key, error }
1186 })
1187 }
1188 .instrument(task_span);
1189 let spawned = spawn_named(task, task_name.as_ref());
1190 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1191 drop(previous); }
1193 }
1194
1195 fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1196 let task = new_pieces.tasks.remove(key).unwrap();
1197 let span = error_span!(
1198 "source",
1199 component_kind = "source",
1200 component_id = %task.id(),
1201 component_type = %task.typetag(),
1202 );
1203
1204 let task_span = span.or_current();
1205 #[cfg(feature = "allocation-tracing")]
1206 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1207 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1208 task.id().to_string(),
1209 "source".to_string(),
1210 task.typetag().to_string(),
1211 );
1212
1213 debug!(
1214 component_kind = "source",
1215 component_type = task.typetag(),
1216 component_id = task.id(),
1217 group_id = group_id.as_raw().to_string(),
1218 "Registered new allocation group."
1219 );
1220 group_id.attach_to_span(&task_span);
1221 }
1222
1223 let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1224 let task = {
1225 let key = key.clone();
1226 handle_errors(task, self.abort_tx.clone(), |error| {
1227 ShutdownError::SourceAborted { key, error }
1228 })
1229 }
1230 .instrument(task_span.clone());
1231 let spawned = spawn_named(task, task_name.as_ref());
1232 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1233 drop(previous); }
1235
1236 self.shutdown_coordinator
1237 .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1238
1239 let source_task = new_pieces.source_tasks.remove(key).unwrap();
1241 let source_task = {
1242 let key = key.clone();
1243 handle_errors(source_task, self.abort_tx.clone(), |error| {
1244 ShutdownError::SourceAborted { key, error }
1245 })
1246 }
1247 .instrument(task_span);
1248 self.source_tasks
1249 .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1250 }
1251
1252 pub async fn start_init_validated(
1253 config: Config,
1254 extra_context: ExtraContext,
1255 ) -> Option<(Self, ShutdownErrorReceiver)> {
1256 let diff = ConfigDiff::initial(&config);
1257 let pieces = TopologyPiecesBuilder::new(&config, &diff)
1258 .with_extra_context(extra_context)
1259 .build_or_log_errors()
1260 .await?;
1261 Self::start_validated(config, diff, pieces).await
1262 }
1263
1264 pub async fn start_validated(
1265 config: Config,
1266 diff: ConfigDiff,
1267 mut pieces: TopologyPieces,
1268 ) -> Option<(Self, ShutdownErrorReceiver)> {
1269 let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1270
1271 let expire_metrics = match (
1272 config.global.expire_metrics,
1273 config.global.expire_metrics_secs,
1274 ) {
1275 (Some(e), None) => {
1276 warn!(
1277 "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1278 );
1279 if e < Duration::from_secs(0) {
1280 None
1281 } else {
1282 Some(e.as_secs_f64())
1283 }
1284 }
1285 (Some(_), Some(_)) => {
1286 error!(
1287 message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1288 internal_log_rate_limit = false
1289 );
1290 return None;
1291 }
1292 (None, Some(e)) => {
1293 if e < 0f64 {
1294 None
1295 } else {
1296 Some(e)
1297 }
1298 }
1299 (None, None) => Some(300f64),
1300 };
1301
1302 if let Err(error) = crate::metrics::Controller::get()
1303 .expect("Metrics must be initialized")
1304 .set_expiry(
1305 expire_metrics,
1306 config
1307 .global
1308 .expire_metrics_per_metric_set
1309 .clone()
1310 .unwrap_or_default(),
1311 )
1312 {
1313 error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1314 return None;
1315 }
1316
1317 let (utilization_emitter, utilization_registry) = pieces
1318 .utilization
1319 .take()
1320 .expect("Topology is missing the utilization metric emitter!");
1321 let mut running_topology = Self::new(config, abort_tx);
1322
1323 if !running_topology
1324 .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1325 .await
1326 {
1327 return None;
1328 }
1329 running_topology.connect_diff(&diff, &mut pieces).await;
1330 running_topology.spawn_diff(&diff, pieces);
1331
1332 let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1333 ShutdownSignal::new_wired();
1334 running_topology.utilization_registry = Some(utilization_registry.clone());
1335 running_topology.utilization_task_shutdown_trigger =
1336 Some(utilization_task_shutdown_trigger);
1337 running_topology.utilization_task = Some(tokio::spawn(Task::new(
1338 "utilization_heartbeat".into(),
1339 "",
1340 async move {
1341 utilization_emitter
1342 .run_utilization(utilization_shutdown_signal)
1343 .await;
1344 Ok(TaskOutput::Healthcheck)
1347 },
1348 )));
1349
1350 Some((running_topology, abort_rx))
1351 }
1352}
1353
1354fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1355 let mut changed_outputs = Vec::new();
1356
1357 for source_key in &diff.sources.to_change {
1358 changed_outputs.extend(
1359 output_ids
1360 .iter()
1361 .filter(|id| &id.component == source_key)
1362 .cloned(),
1363 );
1364 }
1365
1366 for transform_key in &diff.transforms.to_change {
1367 changed_outputs.extend(
1368 output_ids
1369 .iter()
1370 .filter(|id| &id.component == transform_key)
1371 .cloned(),
1372 );
1373 }
1374
1375 changed_outputs
1376}