1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 Arc, Mutex,
5 atomic::{AtomicBool, Ordering},
6 },
7};
8
9use futures::{Future, FutureExt, future};
10use stream_cancel::Trigger;
11use tokio::{
12 sync::{mpsc, watch},
13 time::{Duration, Instant, interval, sleep_until},
14};
15use tracing::Instrument;
16use vector_lib::{
17 buffers::topology::channel::BufferSender,
18 shutdown::ShutdownSignal,
19 tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
20 trigger::DisabledTrigger,
21};
22
23use super::{
24 BuiltBuffer, TaskHandle,
25 builder::{self, TopologyPieces, reload_enrichment_tables},
26 fanout::{ControlChannel, ControlMessage},
27 handle_errors, retain, take_healthchecks,
28 task::{Task, TaskOutput},
29};
30use crate::{
31 config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
32 event::EventArray,
33 extra_context::ExtraContext,
34 shutdown::SourceShutdownCoordinator,
35 signal::ShutdownError,
36 spawn_named,
37};
38
/// Receiving end of the channel on which spawned component tasks report fatal
/// errors; returned alongside the topology by `start_validated`.
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

/// A topology whose components have been built, connected, and spawned.
///
/// Owns every component task handle plus the channels used to rewire components
/// when the configuration is reloaded.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Input buffer sender of each transform/sink, registered by `setup_inputs`.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Inputs of each component, published to tap subscribers.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Control channel of the fanout behind each component output.
    outputs: HashMap<OutputId, ControlChannel>,
    /// `(component kind, component type)` of each source/transform, published to
    /// tap subscribers.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Second task of each source (spawned from `new_pieces.source_tasks`).
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Main task of every component (sources, transforms, and sinks).
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Coordinates source shutdown for reloads and `stop`.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Detach triggers taken from built pieces; a sink's trigger is cancelled in
    /// `shutdown_diff` before its buffer is reused.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// The configuration currently running.
    pub(crate) config: Config,
    /// Sender handed to every spawned component's error handler.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Tap metadata channel: `connect_diff` sends on `.0`, `watch()` clones `.1`.
    watch: (WatchTx, WatchRx),
    /// Starts as `true`; set to `false` by `stop`.
    pub(crate) running: Arc<AtomicBool>,
    /// Grace period used by `stop`; `None` means no deadline.
    graceful_shutdown_duration: Option<Duration>,
    /// Background task running the utilization emitter.
    utilization_task: Option<TaskHandle>,
    /// Cancelled by `stop` to end `utilization_task`.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Components passed to `ConfigDiff::new` on the next reload
    /// (see `extend_reload_set`).
    pending_reload: Option<HashSet<ComponentKey>>,
}
60
61impl RunningTopology {
62 pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
63 Self {
64 inputs: HashMap::new(),
65 inputs_tap_metadata: HashMap::new(),
66 outputs: HashMap::new(),
67 outputs_tap_metadata: HashMap::new(),
68 shutdown_coordinator: SourceShutdownCoordinator::default(),
69 detach_triggers: HashMap::new(),
70 source_tasks: HashMap::new(),
71 tasks: HashMap::new(),
72 abort_tx,
73 watch: watch::channel(TapResource::default()),
74 running: Arc::new(AtomicBool::new(true)),
75 graceful_shutdown_duration: config.graceful_shutdown_duration,
76 config,
77 utilization_task: None,
78 utilization_task_shutdown_trigger: None,
79 pending_reload: None,
80 }
81 }
82
83 pub const fn config(&self) -> &Config {
85 &self.config
86 }
87
88 pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
91 match &mut self.pending_reload {
92 None => self.pending_reload = Some(new_set.clone()),
93 Some(existing) => existing.extend(new_set),
94 }
95 }
96
97 pub fn watch(&self) -> watch::Receiver<TapResource> {
101 self.watch.1.clone()
102 }
103
104 pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
112 self.shutdown_coordinator.shutdown_tripwire()
113 }
114
    /// Starts shutting down the whole topology and returns a future that resolves
    /// once shutdown is over.
    ///
    /// The returned future joins two things:
    /// - the future from `shutdown_coordinator.shutdown_all(deadline)`, and
    /// - the first of: every component task finishing, the graceful-shutdown
    ///   deadline elapsing (if one is configured), or the progress reporter exiting.
    ///
    /// The shared `running` flag is set to `false` immediately, and the utilization
    /// task's shutdown trigger is cancelled before the future is returned.
    pub fn stop(self) -> impl Future<Output = ()> {
        self.running.store(false, Ordering::Relaxed);
        // Every task handle, awaited together to detect that everything finished.
        let mut wait_handles = Vec::new();
        // The same handles grouped by component. A `Vec` is needed because a source
        // owns two tasks: one in `self.tasks` and one in `self.source_tasks`.
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        let map_closure = |_result| ();

        // `shared()` lets one task future be awaited (through `wait_handles`) and
        // checked for completion with `peek` (through `check_handles`).
        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        // The utilization task is waited on but not reported by name.
        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        // With a deadline: sleep until it, then log which components are still
        // running. Without one: never resolves.
        let timeout = if let Some(deadline) = deadline {
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Keep only components that still have an unfinished task.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    "Failed to gracefully shut down in time. Killing components."
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Every 5 seconds (tokio's first tick completes immediately), log what is
        // still running. Exits once nothing is left or the deadline has passed.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Resolves once every task (including the utilization task) has finished.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Resolves as soon as any one of the three futures resolves.
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        // Shut down the sources and tell the utilization task to stop.
        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }
246
247 pub async fn reload_config_and_respawn(
263 &mut self,
264 new_config: Config,
265 extra_context: ExtraContext,
266 ) -> Result<bool, ()> {
267 info!("Reloading running topology with new configuration.");
268
269 if self.config.global != new_config.global {
270 match self.config.global.diff(&new_config.global) {
271 Ok(changed) => {
272 error!(
273 message = "Global options changed; reload aborted.",
274 changed_fields = ?changed
275 );
276 }
277 Err(err) => {
278 error!(
279 message = "Failed to compute config diff; reload aborted.",
280 %err
281 );
282 }
283 }
284 error!(
285 message = "Global options can't be changed while reloading config file; reload aborted. Please restart Vector to reload the configuration file."
286 );
287 return Ok(false);
288 }
289
290 let diff = if let Some(components) = &self.pending_reload {
296 ConfigDiff::new(&self.config, &new_config, components.clone())
297 } else {
298 ConfigDiff::new(&self.config, &new_config, HashSet::new())
299 };
300 let buffers = self.shutdown_diff(&diff, &new_config).await;
301
302 if cfg!(windows) {
306 tokio::time::sleep(Duration::from_millis(200)).await;
308 }
309
310 if let Some(mut new_pieces) = TopologyPieces::build_or_log_errors(
314 &new_config,
315 &diff,
316 buffers.clone(),
317 extra_context.clone(),
318 )
319 .await
320 {
321 if self
325 .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
326 .await
327 {
328 self.connect_diff(&diff, &mut new_pieces).await;
329 self.spawn_diff(&diff, new_pieces);
330 self.config = new_config;
331
332 info!("New configuration loaded successfully.");
333
334 return Ok(true);
335 }
336 }
337
338 warn!("Failed to completely load new configuration. Restoring old configuration.");
342
343 let diff = diff.flip();
344 if let Some(mut new_pieces) =
345 TopologyPieces::build_or_log_errors(&self.config, &diff, buffers, extra_context.clone())
346 .await
347 && self
348 .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
349 .await
350 {
351 self.connect_diff(&diff, &mut new_pieces).await;
352 self.spawn_diff(&diff, new_pieces);
353
354 info!("Old configuration restored successfully.");
355
356 return Ok(false);
357 }
358
359 error!("Failed to restore old configuration.");
360
361 Err(())
362 }
363
364 pub(crate) async fn reload_enrichment_tables(&self) {
366 reload_enrichment_tables(&self.config).await;
367 }
368
369 pub(crate) async fn run_healthchecks(
370 &mut self,
371 diff: &ConfigDiff,
372 pieces: &mut TopologyPieces,
373 options: HealthcheckOptions,
374 ) -> bool {
375 if options.enabled {
376 let healthchecks = take_healthchecks(diff, pieces)
377 .into_iter()
378 .map(|(_, task)| task);
379 let healthchecks = future::try_join_all(healthchecks);
380
381 info!("Running healthchecks.");
382 if options.require_healthy {
383 let success = healthchecks.await;
384
385 if success.is_ok() {
386 info!("All healthchecks passed.");
387 true
388 } else {
389 error!("Sinks unhealthy.");
390 false
391 }
392 } else {
393 tokio::spawn(healthchecks);
394 true
395 }
396 } else {
397 true
398 }
399 }
400
401 async fn shutdown_diff(
405 &mut self,
406 diff: &ConfigDiff,
407 new_config: &Config,
408 ) -> HashMap<ComponentKey, BuiltBuffer> {
409 if diff.sources.any_changed_or_removed() {
412 let timeout = Duration::from_secs(30);
413 let mut source_shutdown_handles = Vec::new();
414
415 let deadline = Instant::now() + timeout;
416 for key in &diff.sources.to_remove {
417 debug!(component = %key, "Removing source.");
418
419 let previous = self.tasks.remove(key).unwrap();
420 drop(previous); self.remove_outputs(key);
423 source_shutdown_handles
424 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
425 }
426
427 for key in &diff.sources.to_change {
428 debug!(component = %key, "Changing source.");
429
430 self.remove_outputs(key);
431 source_shutdown_handles
432 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
433 }
434
435 debug!(
436 "Waiting for up to {} seconds for source(s) to finish shutting down.",
437 timeout.as_secs()
438 );
439 futures::future::join_all(source_shutdown_handles).await;
440
441 for key in diff.sources.removed_and_changed() {
443 if let Some(task) = self.source_tasks.remove(key) {
444 task.await.unwrap().unwrap();
445 }
446 }
447 }
448
449 for key in &diff.transforms.to_remove {
457 debug!(component = %key, "Removing transform.");
458
459 let previous = self.tasks.remove(key).unwrap();
460 drop(previous); self.remove_inputs(key, diff, new_config).await;
463 self.remove_outputs(key);
464 }
465
466 for key in &diff.transforms.to_change {
467 debug!(component = %key, "Changing transform.");
468
469 self.remove_inputs(key, diff, new_config).await;
470 self.remove_outputs(key);
471 }
472
473 let removed_table_sinks = diff
479 .enrichment_tables
480 .removed_and_changed()
481 .filter_map(|key| {
482 self.config
483 .enrichment_table(key)
484 .and_then(|t| t.as_sink(key))
485 .map(|(key, s)| (key.clone(), s.resources(&key)))
486 })
487 .collect::<Vec<_>>();
488 let remove_sink = diff
489 .sinks
490 .removed_and_changed()
491 .map(|key| {
492 (
493 key,
494 self.config
495 .sink(key)
496 .map(|s| s.resources(key))
497 .unwrap_or_default(),
498 )
499 })
500 .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
501 let add_source = diff
502 .sources
503 .changed_and_added()
504 .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
505 let added_table_sinks = diff
506 .enrichment_tables
507 .changed_and_added()
508 .filter_map(|key| {
509 self.config
510 .enrichment_table(key)
511 .and_then(|t| t.as_sink(key))
512 .map(|(key, s)| (key.clone(), s.resources(&key)))
513 })
514 .collect::<Vec<_>>();
515 let add_sink = diff
516 .sinks
517 .changed_and_added()
518 .map(|key| {
519 (
520 key,
521 new_config
522 .sink(key)
523 .map(|s| s.resources(key))
524 .unwrap_or_default(),
525 )
526 })
527 .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
528 let conflicts = Resource::conflicts(
529 remove_sink.map(|(key, value)| ((true, key), value)).chain(
530 add_sink
531 .chain(add_source)
532 .map(|(key, value)| ((false, key), value)),
533 ),
534 )
535 .into_iter()
536 .flat_map(|(_, components)| components)
537 .collect::<HashSet<_>>();
538 let conflicting_sinks = conflicts
540 .into_iter()
541 .filter(|&(existing_sink, _)| existing_sink)
542 .map(|(_, key)| key.clone());
543
544 let reuse_buffers = diff
546 .sinks
547 .to_change
548 .iter()
549 .filter(|&key| {
550 if diff.components_to_reload.contains(key) {
551 return false;
552 }
553 self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
554 self.config
555 .enrichment_table(key)
556 .and_then(|t| t.as_sink(key))
557 .map(|(_, s)| s.buffer)
558 }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
559 self.config
560 .enrichment_table(key)
561 .and_then(|t| t.as_sink(key))
562 .map(|(_, s)| s.buffer)
563 })
564 })
565 .cloned()
566 .collect::<HashSet<_>>();
567
568 let wait_for_sinks = conflicting_sinks
572 .chain(reuse_buffers.iter().cloned())
573 .collect::<HashSet<_>>();
574
575 let removed_sinks = diff
577 .sinks
578 .to_remove
579 .iter()
580 .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
581 self.config
582 .enrichment_table(key)
583 .and_then(|t| t.as_sink(key))
584 .is_some()
585 }))
586 .collect::<Vec<_>>();
587 for key in &removed_sinks {
588 debug!(component = %key, "Removing sink.");
589 self.remove_inputs(key, diff, new_config).await;
590 }
591
592 let mut buffer_tx = HashMap::new();
595
596 let sinks_to_change = diff
597 .sinks
598 .to_change
599 .iter()
600 .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
601 self.config
602 .enrichment_table(key)
603 .and_then(|t| t.as_sink(key))
604 .is_some()
605 }))
606 .collect::<Vec<_>>();
607
608 for key in &sinks_to_change {
609 debug!(component = %key, "Changing sink.");
610 if reuse_buffers.contains(key) {
611 self.detach_triggers
612 .remove(key)
613 .unwrap()
614 .into_inner()
615 .cancel();
616
617 buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
628 }
629 self.remove_inputs(key, diff, new_config).await;
630 }
631
632 for key in &removed_sinks {
639 let previous = self.tasks.remove(key).unwrap();
640 if wait_for_sinks.contains(key) {
641 debug!(message = "Waiting for sink to shutdown.", %key);
642 previous.await.unwrap().unwrap();
643 } else {
644 drop(previous); }
646 }
647
648 let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
649 for key in &sinks_to_change {
650 if wait_for_sinks.contains(key) {
651 let previous = self.tasks.remove(key).unwrap();
652 debug!(message = "Waiting for sink to shutdown.", %key);
653 let buffer = previous.await.unwrap().unwrap();
654
655 if reuse_buffers.contains(key) {
656 let tx = buffer_tx.remove(key).unwrap();
664 let rx = match buffer {
665 TaskOutput::Sink(rx) => rx.into_inner(),
666 _ => unreachable!(),
667 };
668
669 buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
670 }
671 }
672 }
673
674 buffers
675 }
676
    /// Wires the changed/added components in `new_pieces` into the running
    /// topology, then publishes the updated tap metadata.
    ///
    /// Outputs of sources (including enrichment-table sources) and transforms are
    /// registered first. Inputs of transforms, sinks, and enrichment-table sinks are
    /// connected next. Finally, unchanged components are re-attached to any upstream
    /// outputs that were rebuilt.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Tap metadata bookkeeping is skipped when the watch channel is closed.
        if !self.watch.0.is_closed() {
            // Sources only have outputs.
            for key in &diff.sources.to_remove {
                self.outputs_tap_metadata.remove(key);
            }

            // Transforms have both inputs and outputs.
            for key in &diff.transforms.to_remove {
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
            }

            // Sinks only have inputs.
            for key in &diff.sinks.to_remove {
                self.inputs_tap_metadata.remove(key);
            }

            // Removed enrichment tables that acted as sinks.
            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                self.inputs_tap_metadata.remove(key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", task.typetag().to_string()));
                }
            }

            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // Register outputs before any inputs are wired, since `setup_inputs` expects
        // every referenced output to exist. Sources come first...
        for key in diff.sources.changed_and_added() {
            debug!(component = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // ...then enrichment tables that built a source task...
        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources {
            debug!(component = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // ...then transforms.
        for key in diff.transforms.changed_and_added() {
            debug!(component = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        // With all outputs in place, connect inputs: transforms, then sinks.
        for key in diff.transforms.changed_and_added() {
            debug!(component = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        for key in diff.sinks.changed_and_added() {
            debug!(component = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        // Enrichment tables that built an input act as sinks.
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables {
            debug!(component = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Changed sources/transforms received brand-new output fanouts above, so
        // unchanged transforms/sinks that consume those outputs are added back.
        self.reattach_severed_inputs(diff);

        // Broadcast the updated tap view to subscribers.
        if !self.watch.0.is_closed() {
            // Only outputs with known tap metadata are published.
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            // Removals cover sources and transforms only (the components with outputs).
            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .collect(),
                    removals,
                })
                .expect("Couldn't broadcast config changes.");
        }
    }
838
839 async fn setup_outputs(
840 &mut self,
841 key: &ComponentKey,
842 new_pieces: &mut builder::TopologyPieces,
843 ) {
844 let outputs = new_pieces.outputs.remove(key).unwrap();
845 for (port, output) in outputs {
846 debug!(component = %key, output_id = ?port, "Configuring output for component.");
847
848 let id = OutputId {
849 component: key.clone(),
850 port,
851 };
852
853 self.outputs.insert(id, output);
854 }
855 }
856
857 async fn setup_inputs(
858 &mut self,
859 key: &ComponentKey,
860 diff: &ConfigDiff,
861 new_pieces: &mut builder::TopologyPieces,
862 ) {
863 let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
864
865 let old_inputs = self
866 .config
867 .inputs_for_node(key)
868 .into_iter()
869 .flatten()
870 .cloned()
871 .collect::<HashSet<_>>();
872
873 let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
874 let inputs_to_add = &new_inputs - &old_inputs;
875
876 for input in inputs {
877 let output = self.outputs.get_mut(&input).expect("unknown output");
878
879 if diff.contains(&input.component) || inputs_to_add.contains(&input) {
880 debug!(component = %key, fanout_id = %input, "Adding component input to fanout.");
884
885 _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
886 } else {
887 debug!(component = %key, fanout_id = %input, "Replacing component input in fanout.");
892
893 _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
894 }
895 }
896
897 self.inputs.insert(key.clone(), tx);
898 new_pieces
899 .detach_triggers
900 .remove(key)
901 .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
902 }
903
904 fn remove_outputs(&mut self, key: &ComponentKey) {
905 self.outputs.retain(|id, _output| &id.component != key);
906 }
907
908 async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
909 self.inputs.remove(key);
910 self.detach_triggers.remove(key);
911
912 let old_inputs = self.config.inputs_for_node(key).expect("node exists");
913 let new_inputs = new_config
914 .inputs_for_node(key)
915 .unwrap_or_default()
916 .iter()
917 .collect::<HashSet<_>>();
918
919 for input in old_inputs {
920 if let Some(output) = self.outputs.get_mut(input) {
921 if diff.contains(&input.component)
922 || diff.is_removed(key)
923 || !new_inputs.contains(input)
924 {
925 debug!(component = %key, fanout_id = %input, "Removing component input from fanout.");
936
937 _ = output.send(ControlMessage::Remove(key.clone()));
938 } else {
939 debug!(component = %key, fanout_id = %input, "Pausing component input in fanout.");
943
944 _ = output.send(ControlMessage::Pause(key.clone()));
945 }
946 }
947 }
948 }
949
950 fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
951 let unchanged_transforms = self
952 .config
953 .transforms()
954 .filter(|(key, _)| !diff.transforms.contains(key));
955 for (transform_key, transform) in unchanged_transforms {
956 let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
957 for output_id in changed_outputs {
958 debug!(component = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
959
960 let input = self.inputs.get(transform_key).cloned().unwrap();
961 let output = self.outputs.get_mut(&output_id).unwrap();
962 _ = output.send(ControlMessage::Add(transform_key.clone(), input));
963 }
964 }
965
966 let unchanged_sinks = self
967 .config
968 .sinks()
969 .filter(|(key, _)| !diff.sinks.contains(key));
970 for (sink_key, sink) in unchanged_sinks {
971 let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
972 for output_id in changed_outputs {
973 debug!(component = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
974
975 let input = self.inputs.get(sink_key).cloned().unwrap();
976 let output = self.outputs.get_mut(&output_id).unwrap();
977 _ = output.send(ControlMessage::Add(sink_key.clone(), input));
978 }
979 }
980 }
981
    /// Spawns every component that `diff` changes or adds, consuming the built
    /// tasks in `new_pieces`.
    ///
    /// Order: sources, enrichment-table sources, transforms, sinks, then
    /// enrichment-table sinks. Within each group, changed components are spawned
    /// before added ones.
    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
        for key in &diff.sources.to_change {
            debug!(message = "Spawning changed source.", key = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.sources.to_add {
            debug!(message = "Spawning new source.", key = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        // Enrichment tables that built a source task are spawned as sources. Both
        // lists are collected before either is spawned.
        let changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        let added_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        for key in changed_table_sources {
            debug!(message = "Spawning changed enrichment table source.", key = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in added_table_sources {
            debug!(message = "Spawning new enrichment table source.", key = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_change {
            debug!(message = "Spawning changed transform.", key = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_add {
            debug!(message = "Spawning new transform.", key = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_change {
            debug!(message = "Spawning changed sink.", key = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        // NOTE(review): logged at trace level, unlike the other spawn messages.
        for key in &diff.sinks.to_add {
            trace!(message = "Spawning new sink.", key = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        // Enrichment tables with a remaining task but no source task are spawned as
        // sinks.
        let changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        let added_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        for key in changed_tables {
            debug!(message = "Spawning changed enrichment table sink.", key = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in added_tables {
            debug!(message = "Spawning enrichment table new sink.", key = %key);
            self.spawn_sink(key, &mut new_pieces);
        }
    }
1066
1067 fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1068 let task = new_pieces.tasks.remove(key).unwrap();
1069 let span = error_span!(
1070 "sink",
1071 component_kind = "sink",
1072 component_id = %task.id(),
1073 component_type = %task.typetag(),
1074 );
1075
1076 let task_span = span.or_current();
1077 #[cfg(feature = "allocation-tracing")]
1078 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1079 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1080 task.id().to_string(),
1081 "sink".to_string(),
1082 task.typetag().to_string(),
1083 );
1084 debug!(
1085 component_kind = "sink",
1086 component_type = task.typetag(),
1087 component_id = task.id(),
1088 group_id = group_id.as_raw().to_string(),
1089 "Registered new allocation group."
1090 );
1091 group_id.attach_to_span(&task_span);
1092 }
1093
1094 let task_name = format!(">> {} ({})", task.typetag(), task.id());
1095 let task = {
1096 let key = key.clone();
1097 handle_errors(task, self.abort_tx.clone(), |error| {
1098 ShutdownError::SinkAborted { key, error }
1099 })
1100 }
1101 .instrument(task_span);
1102 let spawned = spawn_named(task, task_name.as_ref());
1103 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1104 drop(previous); }
1106 }
1107
1108 fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1109 let task = new_pieces.tasks.remove(key).unwrap();
1110 let span = error_span!(
1111 "transform",
1112 component_kind = "transform",
1113 component_id = %task.id(),
1114 component_type = %task.typetag(),
1115 );
1116
1117 let task_span = span.or_current();
1118 #[cfg(feature = "allocation-tracing")]
1119 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1120 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1121 task.id().to_string(),
1122 "transform".to_string(),
1123 task.typetag().to_string(),
1124 );
1125 debug!(
1126 component_kind = "transform",
1127 component_type = task.typetag(),
1128 component_id = task.id(),
1129 group_id = group_id.as_raw().to_string(),
1130 "Registered new allocation group."
1131 );
1132 group_id.attach_to_span(&task_span);
1133 }
1134
1135 let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1136 let task = {
1137 let key = key.clone();
1138 handle_errors(task, self.abort_tx.clone(), |error| {
1139 ShutdownError::TransformAborted { key, error }
1140 })
1141 }
1142 .instrument(task_span);
1143 let spawned = spawn_named(task, task_name.as_ref());
1144 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1145 drop(previous); }
1147 }
1148
1149 fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1150 let task = new_pieces.tasks.remove(key).unwrap();
1151 let span = error_span!(
1152 "source",
1153 component_kind = "source",
1154 component_id = %task.id(),
1155 component_type = %task.typetag(),
1156 );
1157
1158 let task_span = span.or_current();
1159 #[cfg(feature = "allocation-tracing")]
1160 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1161 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1162 task.id().to_string(),
1163 "source".to_string(),
1164 task.typetag().to_string(),
1165 );
1166
1167 debug!(
1168 component_kind = "source",
1169 component_type = task.typetag(),
1170 component_id = task.id(),
1171 group_id = group_id.as_raw().to_string(),
1172 "Registered new allocation group."
1173 );
1174 group_id.attach_to_span(&task_span);
1175 }
1176
1177 let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1178 let task = {
1179 let key = key.clone();
1180 handle_errors(task, self.abort_tx.clone(), |error| {
1181 ShutdownError::SourceAborted { key, error }
1182 })
1183 }
1184 .instrument(task_span.clone());
1185 let spawned = spawn_named(task, task_name.as_ref());
1186 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1187 drop(previous); }
1189
1190 self.shutdown_coordinator
1191 .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1192
1193 let source_task = new_pieces.source_tasks.remove(key).unwrap();
1195 let source_task = {
1196 let key = key.clone();
1197 handle_errors(source_task, self.abort_tx.clone(), |error| {
1198 ShutdownError::SourceAborted { key, error }
1199 })
1200 }
1201 .instrument(task_span);
1202 self.source_tasks
1203 .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1204 }
1205
1206 pub async fn start_init_validated(
1207 config: Config,
1208 extra_context: ExtraContext,
1209 ) -> Option<(Self, ShutdownErrorReceiver)> {
1210 let diff = ConfigDiff::initial(&config);
1211 let pieces =
1212 TopologyPieces::build_or_log_errors(&config, &diff, HashMap::new(), extra_context)
1213 .await?;
1214 Self::start_validated(config, diff, pieces).await
1215 }
1216
1217 pub async fn start_validated(
1218 config: Config,
1219 diff: ConfigDiff,
1220 mut pieces: TopologyPieces,
1221 ) -> Option<(Self, ShutdownErrorReceiver)> {
1222 let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1223
1224 let expire_metrics = match (
1225 config.global.expire_metrics,
1226 config.global.expire_metrics_secs,
1227 ) {
1228 (Some(e), None) => {
1229 warn!(
1230 "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1231 );
1232 if e < Duration::from_secs(0) {
1233 None
1234 } else {
1235 Some(e.as_secs_f64())
1236 }
1237 }
1238 (Some(_), Some(_)) => {
1239 error!("Cannot set both `expire_metrics` and `expire_metrics_secs`.");
1240 return None;
1241 }
1242 (None, Some(e)) => {
1243 if e < 0f64 {
1244 None
1245 } else {
1246 Some(e)
1247 }
1248 }
1249 (None, None) => Some(300f64),
1250 };
1251
1252 if let Err(error) = crate::metrics::Controller::get()
1253 .expect("Metrics must be initialized")
1254 .set_expiry(
1255 expire_metrics,
1256 config
1257 .global
1258 .expire_metrics_per_metric_set
1259 .clone()
1260 .unwrap_or_default(),
1261 )
1262 {
1263 error!(message = "Invalid metrics expiry.", %error);
1264 return None;
1265 }
1266
1267 let mut utilization_emitter = pieces
1268 .utilization_emitter
1269 .take()
1270 .expect("Topology is missing the utilization metric emitter!");
1271 let mut running_topology = Self::new(config, abort_tx);
1272
1273 if !running_topology
1274 .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1275 .await
1276 {
1277 return None;
1278 }
1279 running_topology.connect_diff(&diff, &mut pieces).await;
1280 running_topology.spawn_diff(&diff, pieces);
1281
1282 let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1283 ShutdownSignal::new_wired();
1284 running_topology.utilization_task_shutdown_trigger =
1285 Some(utilization_task_shutdown_trigger);
1286 running_topology.utilization_task = Some(tokio::spawn(Task::new(
1287 "utilization_heartbeat".into(),
1288 "",
1289 async move {
1290 utilization_emitter
1291 .run_utilization(utilization_shutdown_signal)
1292 .await;
1293 Ok(TaskOutput::Healthcheck)
1296 },
1297 )));
1298
1299 Some((running_topology, abort_rx))
1300 }
1301}
1302
1303fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1304 let mut changed_outputs = Vec::new();
1305
1306 for source_key in &diff.sources.to_change {
1307 changed_outputs.extend(
1308 output_ids
1309 .iter()
1310 .filter(|id| &id.component == source_key)
1311 .cloned(),
1312 );
1313 }
1314
1315 for transform_key in &diff.transforms.to_change {
1316 changed_outputs.extend(
1317 output_ids
1318 .iter()
1319 .filter(|id| &id.component == transform_key)
1320 .cloned(),
1321 );
1322 }
1323
1324 changed_outputs
1325}