1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc, Mutex,
6 },
7};
8
9use super::{
10 builder::{self, reload_enrichment_tables, TopologyPieces},
11 fanout::{ControlChannel, ControlMessage},
12 handle_errors, retain, take_healthchecks,
13 task::{Task, TaskOutput},
14 BuiltBuffer, TaskHandle,
15};
16use crate::{
17 config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
18 event::EventArray,
19 extra_context::ExtraContext,
20 shutdown::SourceShutdownCoordinator,
21 signal::ShutdownError,
22 spawn_named,
23};
24use futures::{future, Future, FutureExt};
25use stream_cancel::Trigger;
26use tokio::{
27 sync::{mpsc, watch},
28 time::{interval, sleep_until, Duration, Instant},
29};
30use tracing::Instrument;
31use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
32use vector_lib::trigger::DisabledTrigger;
33use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};
34
/// Receiving half of the unbounded channel on which component [`ShutdownError`]s are reported.
///
/// `RunningTopology::start_validated` creates the channel and hands this end to the caller.
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
36
/// State of a topology whose component tasks have been spawned.
///
/// Holds the task handles, the input/output wiring between components, and the configuration
/// the wiring was built from, so that the topology can be reloaded or stopped later.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file — confirm it is
// still required.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Buffer sender for each component that accepts input (registered by `setup_inputs`).
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Inputs of each component, as published to tap subscribers through `watch`.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel for each component output (registered by `setup_outputs`).
    outputs: HashMap<OutputId, ControlChannel>,
    /// `(component kind, component type)` per component, e.g. `("source", <typetag>)`, used to
    /// build the tap resource.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Handles of the tasks taken from `TopologyPieces::source_tasks`, one per source.
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Handles of the tasks taken from `TopologyPieces::tasks` (sources, transforms and sinks).
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Coordinates shutdown of sources, individually or all at once.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-component detach triggers; cancelled in `shutdown_diff` when a sink's buffer is reused.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// Configuration the currently running components were built from.
    pub(crate) config: Config,
    /// Sender cloned into every spawned component so it can report a [`ShutdownError`].
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Sender/receiver pair of the watch channel carrying the current [`TapResource`].
    watch: (WatchTx, WatchRx),
    /// Starts as `true`; `stop` stores `false`.
    pub(crate) running: Arc<AtomicBool>,
    /// Grace period copied from the config at construction; `None` means `stop` has no deadline.
    graceful_shutdown_duration: Option<Duration>,
    /// Handle of the utilization heartbeat task, set by `start_validated`.
    utilization_task: Option<TaskHandle>,
    /// Trigger that `stop` cancels to signal the utilization task to shut down.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Components handed to `ConfigDiff::new` on the next reload (see `extend_reload_set`).
    pending_reload: Option<HashSet<ComponentKey>>,
}
56
57impl RunningTopology {
58 pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
59 Self {
60 inputs: HashMap::new(),
61 inputs_tap_metadata: HashMap::new(),
62 outputs: HashMap::new(),
63 outputs_tap_metadata: HashMap::new(),
64 shutdown_coordinator: SourceShutdownCoordinator::default(),
65 detach_triggers: HashMap::new(),
66 source_tasks: HashMap::new(),
67 tasks: HashMap::new(),
68 abort_tx,
69 watch: watch::channel(TapResource::default()),
70 running: Arc::new(AtomicBool::new(true)),
71 graceful_shutdown_duration: config.graceful_shutdown_duration,
72 config,
73 utilization_task: None,
74 utilization_task_shutdown_trigger: None,
75 pending_reload: None,
76 }
77 }
78
    /// Returns a reference to the configuration stored in this topology.
    pub const fn config(&self) -> &Config {
        &self.config
    }
83
84 pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
87 match &mut self.pending_reload {
88 None => self.pending_reload = Some(new_set.clone()),
89 Some(existing) => existing.extend(new_set),
90 }
91 }
92
    /// Returns a clone of the receiving end of the [`TapResource`] watch channel.
    pub fn watch(&self) -> watch::Receiver<TapResource> {
        self.watch.1.clone()
    }
99
    /// Returns the future produced by the shutdown coordinator's `shutdown_tripwire()`.
    // NOTE(review): presumably resolves once all sources have finished — confirm against
    // `SourceShutdownCoordinator::shutdown_tripwire`.
    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
        self.shutdown_coordinator.shutdown_tripwire()
    }
110
111 pub fn stop(self) -> impl Future<Output = ()> {
125 self.running.store(false, Ordering::Relaxed);
127 let mut wait_handles = Vec::new();
130 let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
133
134 let map_closure = |_result| ();
135
136 for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
139 let task = task.map(map_closure).shared();
140
141 wait_handles.push(task.clone());
142 check_handles.entry(key).or_default().push(task);
143 }
144
145 if let Some(utilization_task) = self.utilization_task {
146 wait_handles.push(utilization_task.map(map_closure).shared());
147 }
148
149 let deadline = self
151 .graceful_shutdown_duration
152 .map(|grace_period| Instant::now() + grace_period);
153
154 let timeout = if let Some(deadline) = deadline {
155 let mut check_handles2 = check_handles.clone();
159 Box::pin(async move {
160 sleep_until(deadline).await;
161 check_handles2.retain(|_key, handles| {
163 retain(handles, |handle| handle.peek().is_none());
164 !handles.is_empty()
165 });
166 let remaining_components = check_handles2
167 .keys()
168 .map(|item| item.to_string())
169 .collect::<Vec<_>>()
170 .join(", ");
171
172 error!(
173 components = ?remaining_components,
174 "Failed to gracefully shut down in time. Killing components."
175 );
176 }) as future::BoxFuture<'static, ()>
177 } else {
178 Box::pin(future::pending()) as future::BoxFuture<'static, ()>
179 };
180
181 let mut interval = interval(Duration::from_secs(5));
183 let reporter = async move {
184 loop {
185 interval.tick().await;
186
187 check_handles.retain(|_key, handles| {
189 retain(handles, |handle| handle.peek().is_none());
190 !handles.is_empty()
191 });
192 let remaining_components = check_handles
193 .keys()
194 .map(|item| item.to_string())
195 .collect::<Vec<_>>()
196 .join(", ");
197
198 let time_remaining = deadline
199 .map(|d| match d.checked_duration_since(Instant::now()) {
200 Some(remaining) => format!("{} seconds left", remaining.as_secs()),
201 None => "overdue".to_string(),
202 })
203 .unwrap_or("no time limit".to_string());
204
205 info!(
206 remaining_components = ?remaining_components,
207 time_remaining = ?time_remaining,
208 "Shutting down... Waiting on running components."
209 );
210 }
211 };
212
213 let success = futures::future::join_all(wait_handles).map(|_| ());
215
216 let shutdown_complete_future = future::select_all(vec![
218 Box::pin(timeout) as future::BoxFuture<'static, ()>,
219 Box::pin(reporter) as future::BoxFuture<'static, ()>,
220 Box::pin(success) as future::BoxFuture<'static, ()>,
221 ]);
222
223 let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
225 if let Some(trigger) = self.utilization_task_shutdown_trigger {
226 trigger.cancel();
227 }
228
229 futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
230 }
231
232 pub async fn reload_config_and_respawn(
248 &mut self,
249 new_config: Config,
250 extra_context: ExtraContext,
251 ) -> Result<bool, ()> {
252 info!("Reloading running topology with new configuration.");
253
254 if self.config.global != new_config.global {
255 error!(
256 message =
257 "Global options can't be changed while reloading config file; reload aborted. Please restart Vector to reload the configuration file."
258 );
259 return Ok(false);
260 }
261
262 let diff = if let Some(components) = &self.pending_reload {
268 ConfigDiff::new(&self.config, &new_config, components.clone())
269 } else {
270 ConfigDiff::new(&self.config, &new_config, HashSet::new())
271 };
272 let buffers = self.shutdown_diff(&diff, &new_config).await;
273
274 if cfg!(windows) {
278 tokio::time::sleep(Duration::from_millis(200)).await;
280 }
281
282 if let Some(mut new_pieces) = TopologyPieces::build_or_log_errors(
286 &new_config,
287 &diff,
288 buffers.clone(),
289 extra_context.clone(),
290 )
291 .await
292 {
293 if self
297 .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
298 .await
299 {
300 self.connect_diff(&diff, &mut new_pieces).await;
301 self.spawn_diff(&diff, new_pieces);
302 self.config = new_config;
303
304 info!("New configuration loaded successfully.");
305
306 return Ok(true);
307 }
308 }
309
310 warn!("Failed to completely load new configuration. Restoring old configuration.");
314
315 let diff = diff.flip();
316 if let Some(mut new_pieces) =
317 TopologyPieces::build_or_log_errors(&self.config, &diff, buffers, extra_context.clone())
318 .await
319 {
320 if self
321 .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
322 .await
323 {
324 self.connect_diff(&diff, &mut new_pieces).await;
325 self.spawn_diff(&diff, new_pieces);
326
327 info!("Old configuration restored successfully.");
328
329 return Ok(false);
330 }
331 }
332
333 error!("Failed to restore old configuration.");
334
335 Err(())
336 }
337
    /// Runs the builder's `reload_enrichment_tables` against the current configuration.
    pub(crate) async fn reload_enrichment_tables(&self) {
        reload_enrichment_tables(&self.config).await;
    }
342
343 pub(crate) async fn run_healthchecks(
344 &mut self,
345 diff: &ConfigDiff,
346 pieces: &mut TopologyPieces,
347 options: HealthcheckOptions,
348 ) -> bool {
349 if options.enabled {
350 let healthchecks = take_healthchecks(diff, pieces)
351 .into_iter()
352 .map(|(_, task)| task);
353 let healthchecks = future::try_join_all(healthchecks);
354
355 info!("Running healthchecks.");
356 if options.require_healthy {
357 let success = healthchecks.await;
358
359 if success.is_ok() {
360 info!("All healthchecks passed.");
361 true
362 } else {
363 error!("Sinks unhealthy.");
364 false
365 }
366 } else {
367 tokio::spawn(healthchecks);
368 true
369 }
370 } else {
371 true
372 }
373 }
374
    /// Shuts down every component that `diff` marks as removed or changed, and disconnects it
    /// from the components that keep running.
    ///
    /// Order of operations: sources, then transforms, then sinks (including enrichment tables
    /// that act as sinks).
    ///
    /// Returns the buffers of changed sinks that can be handed to their replacements, keyed by
    /// component: the cloned sender plus the receiver reclaimed from the finished sink task.
    ///
    /// # Panics
    ///
    /// Panics if a component named in `diff` has no task, input or detach trigger registered,
    /// if an awaited task failed, or if a new source is missing from `new_config`.
    async fn shutdown_diff(
        &mut self,
        diff: &ConfigDiff,
        new_config: &Config,
    ) -> HashMap<ComponentKey, BuiltBuffer> {
        // Sources: signal shutdown with a shared 30 second deadline and wait for all of them.
        if diff.sources.any_changed_or_removed() {
            let timeout = Duration::from_secs(30);
            let mut source_shutdown_handles = Vec::new();

            let deadline = Instant::now() + timeout;
            for key in &diff.sources.to_remove {
                debug!(component = %key, "Removing source.");

                // The handle in `tasks` is dropped without being awaited.
                let previous = self.tasks.remove(key).unwrap();
                drop(previous);
                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            for key in &diff.sources.to_change {
                debug!(component = %key, "Changing source.");

                // The entry in `tasks` is left in place here; `spawn_source` replaces it later.
                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            debug!(
                "Waiting for up to {} seconds for source(s) to finish shutting down.",
                timeout.as_secs()
            );
            futures::future::join_all(source_shutdown_handles).await;

            // The coordinator has reported; now wait for the source tasks themselves.
            for key in diff.sources.removed_and_changed() {
                if let Some(task) = self.source_tasks.remove(key) {
                    task.await.unwrap().unwrap();
                }
            }
        }

        // Transforms: disconnect them; removed ones also give up their task handle.
        for key in &diff.transforms.to_remove {
            debug!(component = %key, "Removing transform.");

            let previous = self.tasks.remove(key).unwrap();
            drop(previous);
            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);
        }

        for key in &diff.transforms.to_change {
            debug!(component = %key, "Changing transform.");

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);
        }

        // Resource conflict detection: collect the resources held by sinks that are going away
        // (from the current config) and those wanted by sinks and sources that are coming up.
        let removed_table_sinks = diff
            .enrichment_tables
            .removed_and_changed()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let remove_sink = diff
            .sinks
            .removed_and_changed()
            .map(|key| {
                (
                    key,
                    self.config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let add_source = diff
            .sources
            .changed_and_added()
            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
        // NOTE(review): this looks tables up in `self.config` although they are "changed and
        // added"; `add_sink` below uses `new_config`. Added tables would not be found in the old
        // config — confirm whether `new_config` was intended.
        let added_table_sinks = diff
            .enrichment_tables
            .changed_and_added()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let add_sink = diff
            .sinks
            .changed_and_added()
            .map(|key| {
                (
                    key,
                    new_config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        // Each component is tagged `true` when it is an existing sink going away and `false`
        // when it is a component coming up.
        let conflicts = Resource::conflicts(
            remove_sink.map(|(key, value)| ((true, key), value)).chain(
                add_sink
                    .chain(add_source)
                    .map(|(key, value)| ((false, key), value)),
            ),
        )
        .into_iter()
        .flat_map(|(_, components)| components)
        .collect::<HashSet<_>>();
        // Only the existing sinks among the conflicting components have to be waited for.
        let conflicting_sinks = conflicts
            .into_iter()
            .filter(|&(existing_sink, _)| existing_sink)
            .map(|(_, key)| key.clone());

        // Changed sinks whose buffer settings compare equal between the old and new config keep
        // their buffer, unless the component was explicitly queued for reload.
        // NOTE(review): the enrichment-table fallback on the `new_config` side also reads
        // `self.config` — confirm that is intended.
        let reuse_buffers = diff
            .sinks
            .to_change
            .iter()
            .filter(|&key| {
                if diff.components_to_reload.contains(key) {
                    return false;
                }
                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                })
            })
            .cloned()
            .collect::<HashSet<_>>();

        // Sinks whose task must run to completion before we continue: those holding a
        // conflicting resource and those whose buffer will be reused.
        let wait_for_sinks = conflicting_sinks
            .chain(reuse_buffers.iter().cloned())
            .collect::<HashSet<_>>();

        // Removed sinks, plus removed enrichment tables that act as sinks in the current config.
        let removed_sinks = diff
            .sinks
            .to_remove
            .iter()
            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();
        for key in &removed_sinks {
            debug!(component = %key, "Removing sink.");
            self.remove_inputs(key, diff, new_config).await;
        }

        // Senders of the buffers that will be reused, saved before `remove_inputs` drops them
        // from `self.inputs`.
        let mut buffer_tx = HashMap::new();

        let sinks_to_change = diff
            .sinks
            .to_change
            .iter()
            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();

        for key in &sinks_to_change {
            debug!(component = %key, "Changing sink.");
            if reuse_buffers.contains(key) {
                self.detach_triggers
                    .remove(key)
                    .unwrap()
                    .into_inner()
                    .cancel();

                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
            }
            self.remove_inputs(key, diff, new_config).await;
        }

        // Now that all inputs are disconnected, wait for (or let go of) the removed sinks.
        for key in &removed_sinks {
            let previous = self.tasks.remove(key).unwrap();
            if wait_for_sinks.contains(key) {
                debug!(message = "Waiting for sink to shutdown.", %key);
                previous.await.unwrap().unwrap();
            } else {
                drop(previous);
            }
        }

        // Changed sinks outside `wait_for_sinks` keep their entry in `tasks`; `spawn_sink`
        // replaces it later.
        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
        for key in &sinks_to_change {
            if wait_for_sinks.contains(key) {
                let previous = self.tasks.remove(key).unwrap();
                debug!(message = "Waiting for sink to shutdown.", %key);
                let buffer = previous.await.unwrap().unwrap();

                if reuse_buffers.contains(key) {
                    let tx = buffer_tx.remove(key).unwrap();
                    // A finished sink task yields the receiving half of its buffer.
                    let rx = match buffer {
                        TaskOutput::Sink(rx) => rx.into_inner(),
                        _ => unreachable!(),
                    };

                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
                }
            }
        }

        buffers
    }
650
    /// Wires the freshly built components in `new_pieces` into the running topology.
    ///
    /// Outputs are registered before inputs because `setup_inputs` looks up the outputs it
    /// attaches to and panics on an unknown one. Afterwards, unchanged components that lost a
    /// connection to a changed upstream are reattached, and the updated [`TapResource`] is
    /// broadcast if the watch channel is still open.
    ///
    /// # Panics
    ///
    /// Panics if `new_pieces` lacks the outputs or inputs of a component the diff lists, or if
    /// broadcasting on the watch channel fails.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Tap metadata is only maintained while the watch channel is open.
        if !self.watch.0.is_closed() {
            for key in &diff.sources.to_remove {
                self.outputs_tap_metadata.remove(key);
            }

            for key in &diff.transforms.to_remove {
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
            }

            for key in &diff.sinks.to_remove {
                self.inputs_tap_metadata.remove(key);
            }

            // Removed enrichment tables that act as sinks in the current config.
            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                self.inputs_tap_metadata.remove(key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", task.typetag().to_string()));
                }
            }

            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // Register outputs: sources, enrichment tables with a source task, then transforms.
        for key in diff.sources.changed_and_added() {
            debug!(component = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources {
            debug!(component = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        for key in diff.transforms.changed_and_added() {
            debug!(component = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Attach inputs: transforms, sinks, then enrichment tables that have an input.
        for key in diff.transforms.changed_and_added() {
            debug!(component = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        for key in diff.sinks.changed_and_added() {
            debug!(component = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables {
            debug!(component = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Unchanged transforms and sinks fed by a changed component need re-adding to its new
        // fanout.
        self.reattach_severed_inputs(diff);

        // Publish the new wiring to tap subscribers.
        if !self.watch.0.is_closed() {
            // Only outputs with known tap metadata are published.
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .collect(),
                    removals,
                })
                .expect("Couldn't broadcast config changes.");
        }
    }
812
813 async fn setup_outputs(
814 &mut self,
815 key: &ComponentKey,
816 new_pieces: &mut builder::TopologyPieces,
817 ) {
818 let outputs = new_pieces.outputs.remove(key).unwrap();
819 for (port, output) in outputs {
820 debug!(component = %key, output_id = ?port, "Configuring output for component.");
821
822 let id = OutputId {
823 component: key.clone(),
824 port,
825 };
826
827 self.outputs.insert(id, output);
828 }
829 }
830
831 async fn setup_inputs(
832 &mut self,
833 key: &ComponentKey,
834 diff: &ConfigDiff,
835 new_pieces: &mut builder::TopologyPieces,
836 ) {
837 let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
838
839 let old_inputs = self
840 .config
841 .inputs_for_node(key)
842 .into_iter()
843 .flatten()
844 .cloned()
845 .collect::<HashSet<_>>();
846
847 let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
848 let inputs_to_add = &new_inputs - &old_inputs;
849
850 for input in inputs {
851 let output = self.outputs.get_mut(&input).expect("unknown output");
852
853 if diff.contains(&input.component) || inputs_to_add.contains(&input) {
854 debug!(component = %key, fanout_id = %input, "Adding component input to fanout.");
858
859 _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
860 } else {
861 debug!(component = %key, fanout_id = %input, "Replacing component input in fanout.");
866
867 _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
868 }
869 }
870
871 self.inputs.insert(key.clone(), tx);
872 new_pieces
873 .detach_triggers
874 .remove(key)
875 .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
876 }
877
878 fn remove_outputs(&mut self, key: &ComponentKey) {
879 self.outputs.retain(|id, _output| &id.component != key);
880 }
881
882 async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
883 self.inputs.remove(key);
884 self.detach_triggers.remove(key);
885
886 let old_inputs = self.config.inputs_for_node(key).expect("node exists");
887 let new_inputs = new_config
888 .inputs_for_node(key)
889 .unwrap_or_default()
890 .iter()
891 .collect::<HashSet<_>>();
892
893 for input in old_inputs {
894 if let Some(output) = self.outputs.get_mut(input) {
895 if diff.contains(&input.component)
896 || diff.is_removed(key)
897 || !new_inputs.contains(input)
898 {
899 debug!(component = %key, fanout_id = %input, "Removing component input from fanout.");
910
911 _ = output.send(ControlMessage::Remove(key.clone()));
912 } else {
913 debug!(component = %key, fanout_id = %input, "Pausing component input in fanout.");
917
918 _ = output.send(ControlMessage::Pause(key.clone()));
919 }
920 }
921 }
922 }
923
924 fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
925 let unchanged_transforms = self
926 .config
927 .transforms()
928 .filter(|(key, _)| !diff.transforms.contains(key));
929 for (transform_key, transform) in unchanged_transforms {
930 let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
931 for output_id in changed_outputs {
932 debug!(component = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
933
934 let input = self.inputs.get(transform_key).cloned().unwrap();
935 let output = self.outputs.get_mut(&output_id).unwrap();
936 _ = output.send(ControlMessage::Add(transform_key.clone(), input));
937 }
938 }
939
940 let unchanged_sinks = self
941 .config
942 .sinks()
943 .filter(|(key, _)| !diff.sinks.contains(key));
944 for (sink_key, sink) in unchanged_sinks {
945 let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
946 for output_id in changed_outputs {
947 debug!(component = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
948
949 let input = self.inputs.get(sink_key).cloned().unwrap();
950 let output = self.outputs.get_mut(&output_id).unwrap();
951 _ = output.send(ControlMessage::Add(sink_key.clone(), input));
952 }
953 }
954 }
955
956 pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
958 for key in &diff.sources.to_change {
959 debug!(message = "Spawning changed source.", key = %key);
960 self.spawn_source(key, &mut new_pieces);
961 }
962
963 for key in &diff.sources.to_add {
964 debug!(message = "Spawning new source.", key = %key);
965 self.spawn_source(key, &mut new_pieces);
966 }
967
968 let changed_table_sources: Vec<&ComponentKey> = diff
969 .enrichment_tables
970 .to_change
971 .iter()
972 .filter(|k| new_pieces.source_tasks.contains_key(k))
973 .collect();
974
975 let added_table_sources: Vec<&ComponentKey> = diff
976 .enrichment_tables
977 .to_add
978 .iter()
979 .filter(|k| new_pieces.source_tasks.contains_key(k))
980 .collect();
981
982 for key in changed_table_sources {
983 debug!(message = "Spawning changed enrichment table source.", key = %key);
984 self.spawn_source(key, &mut new_pieces);
985 }
986
987 for key in added_table_sources {
988 debug!(message = "Spawning new enrichment table source.", key = %key);
989 self.spawn_source(key, &mut new_pieces);
990 }
991
992 for key in &diff.transforms.to_change {
993 debug!(message = "Spawning changed transform.", key = %key);
994 self.spawn_transform(key, &mut new_pieces);
995 }
996
997 for key in &diff.transforms.to_add {
998 debug!(message = "Spawning new transform.", key = %key);
999 self.spawn_transform(key, &mut new_pieces);
1000 }
1001
1002 for key in &diff.sinks.to_change {
1003 debug!(message = "Spawning changed sink.", key = %key);
1004 self.spawn_sink(key, &mut new_pieces);
1005 }
1006
1007 for key in &diff.sinks.to_add {
1008 trace!(message = "Spawning new sink.", key = %key);
1009 self.spawn_sink(key, &mut new_pieces);
1010 }
1011
1012 let changed_tables: Vec<&ComponentKey> = diff
1013 .enrichment_tables
1014 .to_change
1015 .iter()
1016 .filter(|k| {
1017 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1018 })
1019 .collect();
1020
1021 let added_tables: Vec<&ComponentKey> = diff
1022 .enrichment_tables
1023 .to_add
1024 .iter()
1025 .filter(|k| {
1026 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1027 })
1028 .collect();
1029
1030 for key in changed_tables {
1031 debug!(message = "Spawning changed enrichment table sink.", key = %key);
1032 self.spawn_sink(key, &mut new_pieces);
1033 }
1034
1035 for key in added_tables {
1036 debug!(message = "Spawning enrichment table new sink.", key = %key);
1037 self.spawn_sink(key, &mut new_pieces);
1038 }
1039 }
1040
1041 fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1042 let task = new_pieces.tasks.remove(key).unwrap();
1043 let span = error_span!(
1044 "sink",
1045 component_kind = "sink",
1046 component_id = %task.id(),
1047 component_type = %task.typetag(),
1048 );
1049
1050 let task_span = span.or_current();
1051 #[cfg(feature = "allocation-tracing")]
1052 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1053 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1054 task.id().to_string(),
1055 "sink".to_string(),
1056 task.typetag().to_string(),
1057 );
1058 debug!(
1059 component_kind = "sink",
1060 component_type = task.typetag(),
1061 component_id = task.id(),
1062 group_id = group_id.as_raw().to_string(),
1063 "Registered new allocation group."
1064 );
1065 group_id.attach_to_span(&task_span);
1066 }
1067
1068 let task_name = format!(">> {} ({})", task.typetag(), task.id());
1069 let task = {
1070 let key = key.clone();
1071 handle_errors(task, self.abort_tx.clone(), |error| {
1072 ShutdownError::SinkAborted { key, error }
1073 })
1074 }
1075 .instrument(task_span);
1076 let spawned = spawn_named(task, task_name.as_ref());
1077 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1078 drop(previous); }
1080 }
1081
1082 fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1083 let task = new_pieces.tasks.remove(key).unwrap();
1084 let span = error_span!(
1085 "transform",
1086 component_kind = "transform",
1087 component_id = %task.id(),
1088 component_type = %task.typetag(),
1089 );
1090
1091 let task_span = span.or_current();
1092 #[cfg(feature = "allocation-tracing")]
1093 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1094 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1095 task.id().to_string(),
1096 "transform".to_string(),
1097 task.typetag().to_string(),
1098 );
1099 debug!(
1100 component_kind = "transform",
1101 component_type = task.typetag(),
1102 component_id = task.id(),
1103 group_id = group_id.as_raw().to_string(),
1104 "Registered new allocation group."
1105 );
1106 group_id.attach_to_span(&task_span);
1107 }
1108
1109 let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1110 let task = {
1111 let key = key.clone();
1112 handle_errors(task, self.abort_tx.clone(), |error| {
1113 ShutdownError::TransformAborted { key, error }
1114 })
1115 }
1116 .instrument(task_span);
1117 let spawned = spawn_named(task, task_name.as_ref());
1118 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1119 drop(previous); }
1121 }
1122
1123 fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1124 let task = new_pieces.tasks.remove(key).unwrap();
1125 let span = error_span!(
1126 "source",
1127 component_kind = "source",
1128 component_id = %task.id(),
1129 component_type = %task.typetag(),
1130 );
1131
1132 let task_span = span.or_current();
1133 #[cfg(feature = "allocation-tracing")]
1134 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1135 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1136 task.id().to_string(),
1137 "source".to_string(),
1138 task.typetag().to_string(),
1139 );
1140
1141 debug!(
1142 component_kind = "source",
1143 component_type = task.typetag(),
1144 component_id = task.id(),
1145 group_id = group_id.as_raw().to_string(),
1146 "Registered new allocation group."
1147 );
1148 group_id.attach_to_span(&task_span);
1149 }
1150
1151 let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1152 let task = {
1153 let key = key.clone();
1154 handle_errors(task, self.abort_tx.clone(), |error| {
1155 ShutdownError::SourceAborted { key, error }
1156 })
1157 }
1158 .instrument(task_span.clone());
1159 let spawned = spawn_named(task, task_name.as_ref());
1160 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1161 drop(previous); }
1163
1164 self.shutdown_coordinator
1165 .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1166
1167 let source_task = new_pieces.source_tasks.remove(key).unwrap();
1169 let source_task = {
1170 let key = key.clone();
1171 handle_errors(source_task, self.abort_tx.clone(), |error| {
1172 ShutdownError::SourceAborted { key, error }
1173 })
1174 }
1175 .instrument(task_span);
1176 self.source_tasks
1177 .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1178 }
1179
1180 pub async fn start_init_validated(
1181 config: Config,
1182 extra_context: ExtraContext,
1183 ) -> Option<(Self, ShutdownErrorReceiver)> {
1184 let diff = ConfigDiff::initial(&config);
1185 let pieces =
1186 TopologyPieces::build_or_log_errors(&config, &diff, HashMap::new(), extra_context)
1187 .await?;
1188 Self::start_validated(config, diff, pieces).await
1189 }
1190
1191 pub async fn start_validated(
1192 config: Config,
1193 diff: ConfigDiff,
1194 mut pieces: TopologyPieces,
1195 ) -> Option<(Self, ShutdownErrorReceiver)> {
1196 let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1197
1198 let expire_metrics = match (
1199 config.global.expire_metrics,
1200 config.global.expire_metrics_secs,
1201 ) {
1202 (Some(e), None) => {
1203 warn!(
1204 "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1205 );
1206 if e < Duration::from_secs(0) {
1207 None
1208 } else {
1209 Some(e.as_secs_f64())
1210 }
1211 }
1212 (Some(_), Some(_)) => {
1213 error!("Cannot set both `expire_metrics` and `expire_metrics_secs`.");
1214 return None;
1215 }
1216 (None, Some(e)) => {
1217 if e < 0f64 {
1218 None
1219 } else {
1220 Some(e)
1221 }
1222 }
1223 (None, None) => Some(300f64),
1224 };
1225
1226 if let Err(error) = crate::metrics::Controller::get()
1227 .expect("Metrics must be initialized")
1228 .set_expiry(
1229 expire_metrics,
1230 config
1231 .global
1232 .expire_metrics_per_metric_set
1233 .clone()
1234 .unwrap_or_default(),
1235 )
1236 {
1237 error!(message = "Invalid metrics expiry.", %error);
1238 return None;
1239 }
1240
1241 let mut utilization_emitter = pieces
1242 .utilization_emitter
1243 .take()
1244 .expect("Topology is missing the utilization metric emitter!");
1245 let mut running_topology = Self::new(config, abort_tx);
1246
1247 if !running_topology
1248 .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1249 .await
1250 {
1251 return None;
1252 }
1253 running_topology.connect_diff(&diff, &mut pieces).await;
1254 running_topology.spawn_diff(&diff, pieces);
1255
1256 let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1257 ShutdownSignal::new_wired();
1258 running_topology.utilization_task_shutdown_trigger =
1259 Some(utilization_task_shutdown_trigger);
1260 running_topology.utilization_task = Some(tokio::spawn(Task::new(
1261 "utilization_heartbeat".into(),
1262 "",
1263 async move {
1264 utilization_emitter
1265 .run_utilization(utilization_shutdown_signal)
1266 .await;
1267 Ok(TaskOutput::Healthcheck)
1270 },
1271 )));
1272
1273 Some((running_topology, abort_rx))
1274 }
1275}
1276
1277fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1278 let mut changed_outputs = Vec::new();
1279
1280 for source_key in &diff.sources.to_change {
1281 changed_outputs.extend(
1282 output_ids
1283 .iter()
1284 .filter(|id| &id.component == source_key)
1285 .cloned(),
1286 );
1287 }
1288
1289 for transform_key in &diff.transforms.to_change {
1290 changed_outputs.extend(
1291 output_ids
1292 .iter()
1293 .filter(|id| &id.component == transform_key)
1294 .cloned(),
1295 );
1296 }
1297
1298 changed_outputs
1299}