use std::{
    collections::{HashMap, HashSet},
    sync::{
        Arc, Mutex,
        atomic::{AtomicBool, Ordering},
    },
};

use futures::{Future, FutureExt, future};
use snafu::Snafu;
use stream_cancel::Trigger;
use tokio::{
    sync::{mpsc, watch},
    time::{Duration, Instant, interval, sleep_until},
};
use tracing::Instrument;
use vector_lib::{
    buffers::topology::channel::BufferSender,
    shutdown::ShutdownSignal,
    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
    trigger::DisabledTrigger,
};

use super::{
    BuiltBuffer, TaskHandle,
    builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
    fanout::{ControlChannel, ControlMessage},
    handle_errors, retain, take_healthchecks,
    task::{Task, TaskOutput},
};
use crate::{
    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
    event::EventArray,
    extra_context::ExtraContext,
    shutdown::SourceShutdownCoordinator,
    signal::ShutdownError,
    spawn_named,
    utilization::UtilizationRegistry,
};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

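/// Errors that can occur while reloading a running topology's configuration.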
#[derive(Debug, Snafu)]
pub enum ReloadError {
    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
    GlobalOptionsChanged { changed_fields: Vec<String> },
    #[snafu(display("failed to compute global diff: {}", source))]
    GlobalDiffFailed { source: serde_json::Error },
    #[snafu(display("topology build failed"))]
    TopologyBuildFailed,
    #[snafu(display("failed to restore previous config"))]
    FailedToRestore,
}

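/// A running Vector topology: the spawned tasks for every source, transform,
/// and sink, plus the control channels needed to rewire, reload, and shut
/// them down.
///
/// A minimal usage sketch (assuming a fully built `Config`, a Tokio runtime,
/// and a default `ExtraContext`; error handling elided):
///
/// ```rust,ignore
/// let (topology, mut crash_rx) =
///     RunningTopology::start_init_validated(config, ExtraContext::default())
///         .await
///         .expect("failed to start topology");
///
/// // ... run until shutdown is requested, then:
/// topology.stop().await;
/// ```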
#[allow(dead_code)]
pub struct RunningTopology {
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    outputs: HashMap<OutputId, ControlChannel>,
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    tasks: HashMap<ComponentKey, TaskHandle>,
    shutdown_coordinator: SourceShutdownCoordinator,
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    pub(crate) config: Config,
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    watch: (WatchTx, WatchRx),
    pub(crate) running: Arc<AtomicBool>,
    graceful_shutdown_duration: Option<Duration>,
    utilization_registry: Option<UtilizationRegistry>,
    utilization_task: Option<TaskHandle>,
    utilization_task_shutdown_trigger: Option<Trigger>,
    metrics_task: Option<TaskHandle>,
    metrics_task_shutdown_trigger: Option<Trigger>,
    pending_reload: Option<HashSet<ComponentKey>>,
}

impl RunningTopology {
    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
        Self {
            inputs: HashMap::new(),
            inputs_tap_metadata: HashMap::new(),
            outputs: HashMap::new(),
            outputs_tap_metadata: HashMap::new(),
            shutdown_coordinator: SourceShutdownCoordinator::default(),
            detach_triggers: HashMap::new(),
            source_tasks: HashMap::new(),
            tasks: HashMap::new(),
            abort_tx,
            watch: watch::channel(TapResource::default()),
            running: Arc::new(AtomicBool::new(true)),
            graceful_shutdown_duration: config.graceful_shutdown_duration,
            config,
            utilization_registry: None,
            utilization_task: None,
            utilization_task_shutdown_trigger: None,
            metrics_task: None,
            metrics_task_shutdown_trigger: None,
            pending_reload: None,
        }
    }

    pub const fn config(&self) -> &Config {
        &self.config
    }

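    /// Adds the given component keys to the set of components that will be
    /// force-rebuilt on the next reload, even if their configuration has not
    /// changed.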
    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
        match &mut self.pending_reload {
            None => self.pending_reload = Some(new_set),
            Some(existing) => existing.extend(new_set),
        }
    }

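    /// Returns a receiver on which tap subscribers are notified of topology
    /// changes via [`TapResource`] snapshots.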
    pub fn watch(&self) -> watch::Receiver<TapResource> {
        self.watch.1.clone()
    }

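    /// Returns a future that resolves once all sources have finished shutting
    /// down.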
    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
        self.shutdown_coordinator.shutdown_tripwire()
    }

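    /// Shuts the topology down, resolving once all component tasks have
    /// stopped. Sources are signalled to shut down immediately, while other
    /// components get until `graceful_shutdown_duration` (if set) to finish
    /// before being killed; progress is logged every five seconds while
    /// waiting.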
    pub fn stop(self) -> impl Future<Output = ()> {
        // Mark the topology as no longer running so observers can see the shutdown.
        self.running.store(false, Ordering::Relaxed);

        // Collect handles for all tasks, both for awaiting completion and for
        // reporting which components are still running.
        let mut wait_handles = Vec::new();
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        let map_closure = |_result| ();

        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        if let Some(metrics_task) = self.metrics_task {
            wait_handles.push(metrics_task.map(map_closure).shared());
        }

        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        // If a grace period is configured, kill whatever is still running once
        // it elapses.
        let timeout = if let Some(deadline) = deadline {
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Drop any tasks that have already finished.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    message = "Failed to gracefully shut down in time. Killing components.",
                    internal_log_rate_limit = false
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Periodically report which components are still shutting down.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                // Drop any tasks that have already finished.
                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Resolves once every component task has finished.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Finish when either all components shut down cleanly or the timeout fires.
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }
        if let Some(trigger) = self.metrics_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }

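    /// Attempts to replace the running configuration with `new_config`,
    /// shutting down removed or changed components and spawning their
    /// replacements.
    ///
    /// Global options cannot change across a reload. On failure, the previous
    /// configuration is rebuilt and restored; if even that fails,
    /// [`ReloadError::FailedToRestore`] is returned and the topology may be
    /// left partially running.
    ///
    /// A sketch of a typical reload driver (`load_new_config` is a
    /// hypothetical helper, not part of this module):
    ///
    /// ```rust,ignore
    /// if let Some(new_config) = load_new_config() {
    ///     match topology
    ///         .reload_config_and_respawn(new_config, ExtraContext::default())
    ///         .await
    ///     {
    ///         Ok(()) => info!("Reload succeeded."),
    ///         Err(ReloadError::FailedToRestore) => panic!("Topology is in an unusable state."),
    ///         Err(error) => warn!(%error, "Reload failed; old configuration still running."),
    ///     }
    /// }
    /// ```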
    pub async fn reload_config_and_respawn(
        &mut self,
        new_config: Config,
        extra_context: ExtraContext,
    ) -> Result<(), ReloadError> {
        info!("Reloading running topology with new configuration.");

        // Global options cannot be changed while the topology is running.
        if self.config.global != new_config.global {
            return match self.config.global.diff(&new_config.global) {
                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
            };
        }

        // Components marked for a forced reload are rebuilt even when their
        // configuration is unchanged.
        let diff = if let Some(components) = &self.pending_reload {
            ConfigDiff::new(&self.config, &new_config, components.clone())
        } else {
            ConfigDiff::new(&self.config, &new_config, HashSet::new())
        };
        let buffers = self.shutdown_diff(&diff, &new_config).await;

        // Give Windows some time to release resources (such as ports) held by
        // the components that were just shut down.
        if cfg!(windows) {
            tokio::time::sleep(Duration::from_millis(200)).await;
        }

        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
            .with_buffers(buffers.clone())
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
        {
            if self
                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
                .await
            {
                self.connect_diff(&diff, &mut new_pieces).await;
                self.spawn_diff(&diff, new_pieces);
                self.config = new_config;

                info!("New configuration loaded successfully.");

                return Ok(());
            }
        }

        // Building the new topology failed: flip the diff and rebuild the
        // pieces of the old configuration that were shut down above.
        warn!("Failed to completely load new configuration. Restoring old configuration.");

        let diff = diff.flip();
        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
            .with_buffers(buffers)
            .with_extra_context(extra_context.clone())
            .with_utilization_registry(self.utilization_registry.clone())
            .build_or_log_errors()
            .await
            && self
                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
                .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);

            info!("Old configuration restored successfully.");

            return Err(ReloadError::TopologyBuildFailed);
        }

        error!(
            message = "Failed to restore old configuration.",
            internal_log_rate_limit = false
        );

        Err(ReloadError::FailedToRestore)
    }

    pub(crate) async fn reload_enrichment_tables(&self) {
        reload_enrichment_tables(&self.config).await;
    }

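    /// Runs the healthchecks of all sinks touched by `diff`, returning whether
    /// the topology may proceed: either healthchecks are disabled, they are
    /// not required to pass (and run in the background), or they all passed.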
    pub(crate) async fn run_healthchecks(
        &mut self,
        diff: &ConfigDiff,
        pieces: &mut TopologyPieces,
        options: HealthcheckOptions,
    ) -> bool {
        if options.enabled {
            let healthchecks = take_healthchecks(diff, pieces)
                .into_iter()
                .map(|(_, task)| task);
            let healthchecks = future::try_join_all(healthchecks);

            info!("Running healthchecks.");
            if options.require_healthy {
                let success = healthchecks.await;

                if success.is_ok() {
                    info!("All healthchecks passed.");
                    true
                } else {
                    error!(
                        message = "Sinks unhealthy.",
                        internal_log_rate_limit = false
                    );
                    false
                }
            } else {
                tokio::spawn(healthchecks);
                true
            }
        } else {
            true
        }
    }

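    /// Shuts down all components that `diff` marks as removed or changed, in
    /// topological order: sources first, then transforms, then sinks.
    ///
    /// Returns the buffers of changed sinks whose buffer configuration is
    /// unchanged so that `spawn_diff` can hand them to the rebuilt sinks
    /// instead of dropping buffered events.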
    async fn shutdown_diff(
        &mut self,
        diff: &ConfigDiff,
        new_config: &Config,
    ) -> HashMap<ComponentKey, BuiltBuffer> {
        // First, shut down any changed or removed sources, so they stop
        // producing events before anything downstream is touched.
        if diff.sources.any_changed_or_removed() {
            let timeout = Duration::from_secs(30);
            let mut source_shutdown_handles = Vec::new();

            let deadline = Instant::now() + timeout;
            for key in &diff.sources.to_remove {
                debug!(component_id = %key, "Removing source.");

                let previous = self.tasks.remove(key).unwrap();
                drop(previous); // detach the old task from the topology
                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            for key in &diff.sources.to_change {
                debug!(component_id = %key, "Changing source.");

                self.remove_outputs(key);
                source_shutdown_handles
                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
            }

            debug!(
                "Waiting for up to {} seconds for source(s) to finish shutting down.",
                timeout.as_secs()
            );
            futures::future::join_all(source_shutdown_handles).await;

            // Wait for the removed and changed sources' tasks to complete.
            for key in diff.sources.removed_and_changed() {
                if let Some(task) = self.source_tasks.remove(key) {
                    task.await.unwrap().unwrap();
                }
            }
        }

        // Next, disconnect any changed or removed transforms.
        for key in &diff.transforms.to_remove {
            debug!(component_id = %key, "Removing transform.");

            let previous = self.tasks.remove(key).unwrap();
            drop(previous); // detach the old task from the topology
            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        for key in &diff.transforms.to_change {
            debug!(component_id = %key, "Changing transform.");

            self.remove_inputs(key, diff, new_config).await;
            self.remove_outputs(key);
        }

        // Collect the resources of sinks being torn down and of sources/sinks
        // being brought up (including enrichment tables acting as sinks), so
        // we can detect resource conflicts between them.
        let removed_table_sinks = diff
            .enrichment_tables
            .removed_and_changed()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let remove_sink = diff
            .sinks
            .removed_and_changed()
            .map(|key| {
                (
                    key,
                    self.config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let add_source = diff
            .sources
            .changed_and_added()
            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
        let added_table_sinks = diff
            .enrichment_tables
            .changed_and_added()
            .filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .map(|(key, s)| (key.clone(), s.resources(&key)))
            })
            .collect::<Vec<_>>();
        let add_sink = diff
            .sinks
            .changed_and_added()
            .map(|key| {
                (
                    key,
                    new_config
                        .sink(key)
                        .map(|s| s.resources(key))
                        .unwrap_or_default(),
                )
            })
            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
        let conflicts = Resource::conflicts(
            remove_sink.map(|(key, value)| ((true, key), value)).chain(
                add_sink
                    .chain(add_source)
                    .map(|(key, value)| ((false, key), value)),
            ),
        )
        .into_iter()
        .flat_map(|(_, components)| components)
        .collect::<HashSet<_>>();
        // Existing sinks that conflict with new components must fully shut
        // down before the new components can start.
        let conflicting_sinks = conflicts
            .into_iter()
            .filter(|&(existing_sink, _)| existing_sink)
            .map(|(_, key)| key.clone());

        // A changed sink's buffer is reused when its buffer configuration is
        // unchanged, unless the sink was explicitly marked for reload.
        let reuse_buffers = diff
            .sinks
            .to_change
            .iter()
            .filter(|&key| {
                if diff.components_to_reload.contains(key) {
                    return false;
                }
                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_sink(key))
                        .map(|(_, s)| s.buffer)
                })
            })
            .cloned()
            .collect::<HashSet<_>>();

        let wait_for_sinks = conflicting_sinks
            .chain(reuse_buffers.iter().cloned())
            .collect::<HashSet<_>>();

        // Finally, disconnect any changed or removed sinks, including
        // enrichment tables that act as sinks.
        let removed_sinks = diff
            .sinks
            .to_remove
            .iter()
            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();
        for key in &removed_sinks {
            debug!(component_id = %key, "Removing sink.");
            self.remove_inputs(key, diff, new_config).await;

            if let Some(registry) = self.utilization_registry.as_ref() {
                registry.remove_component(key);
            }
        }

        // Keep the buffer senders of sinks whose buffers will be reused.
        let mut buffer_tx = HashMap::new();

        let sinks_to_change = diff
            .sinks
            .to_change
            .iter()
            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            }))
            .collect::<Vec<_>>();

        for key in &sinks_to_change {
            debug!(component_id = %key, "Changing sink.");
            if reuse_buffers.contains(key) {
                self.detach_triggers
                    .remove(key)
                    .unwrap()
                    .into_inner()
                    .cancel();

                buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
            }
            self.remove_inputs(key, diff, new_config).await;
        }

        // Wait for removed sinks to shut down when required; otherwise just
        // drop their task handles.
        for key in &removed_sinks {
            let previous = self.tasks.remove(key).unwrap();
            if wait_for_sinks.contains(key) {
                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
                previous.await.unwrap().unwrap();
            } else {
                drop(previous);
            }
        }

        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
        for key in &sinks_to_change {
            if wait_for_sinks.contains(key) {
                let previous = self.tasks.remove(key).unwrap();
                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
                let buffer = previous.await.unwrap().unwrap();

                if reuse_buffers.contains(key) {
                    let tx = buffer_tx.remove(key).unwrap();
                    // A finished sink task hands back the receiving end of its buffer.
                    let rx = match buffer {
                        TaskOutput::Sink(rx) => rx.into_inner(),
                        _ => unreachable!(),
                    };

                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
                }
            }
        }

        buffers
    }

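    /// Rewires the fanouts of all components that `diff` marks as changed or
    /// added: outputs are registered first, then inputs are attached, and any
    /// tap subscribers are notified of the topology's new shape.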
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Update tap metadata only when there is at least one tap subscriber.
        if !self.watch.0.is_closed() {
            for key in &diff.sources.to_remove {
                self.outputs_tap_metadata.remove(key);
            }

            for key in &diff.transforms.to_remove {
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
            }

            for key in &diff.sinks.to_remove {
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                self.inputs_tap_metadata.remove(key);
            }

            let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            });
            for key in removed_sources {
                self.outputs_tap_metadata.remove(&key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff
                .enrichment_tables
                .changed_and_added()
                .filter_map(|key| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_source(key).map(|(key, _)| key))
                })
            {
                if let Some(task) = new_pieces.tasks.get(&key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", task.typetag().to_string()));
                }
            }

            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // Wire up outputs before inputs, so every fanout exists before
        // anything tries to attach to it.
        for key in diff.sources.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources.iter() {
            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        for key in diff.sinks.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables.iter() {
            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Unchanged components downstream of rebuilt components lost their
        // fanout registrations above, so reattach them.
        self.reattach_severed_inputs(diff);

        // Broadcast the updated topology to any tap subscribers.
        if !self.watch.0.is_closed() {
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(
                            added_changed_table_sources
                                .iter()
                                .map(|key| key.to_string()),
                        )
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
                        .collect(),
                    removals,
                })
                .expect("Couldn't broadcast config changes.");
        }
    }

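    /// Registers the fanout control channel of every output of the given
    /// component.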
    async fn setup_outputs(
        &mut self,
        key: &ComponentKey,
        new_pieces: &mut builder::TopologyPieces,
    ) {
        let outputs = new_pieces.outputs.remove(key).unwrap();
        for (port, output) in outputs {
            debug!(component_id = %key, output_id = ?port, "Configuring output for component.");

            let id = OutputId {
                component: key.clone(),
                port,
            };

            self.outputs.insert(id, output);
        }
    }

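    /// Attaches the given component's input channel to the fanout of every
    /// upstream component it reads from.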
    async fn setup_inputs(
        &mut self,
        key: &ComponentKey,
        diff: &ConfigDiff,
        new_pieces: &mut builder::TopologyPieces,
    ) {
        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();

        let old_inputs = self
            .config
            .inputs_for_node(key)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<HashSet<_>>();

        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
        let inputs_to_add = &new_inputs - &old_inputs;

        for input in inputs {
            let output = self.outputs.get_mut(&input).expect("unknown output");

            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
                // Either the upstream component was rebuilt (fresh fanout) or
                // this edge is new, so register it with an `Add`.
                debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");

                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
            } else {
                // The upstream fanout survived the reload and still holds an
                // entry for this component, so swap it with a `Replace`.
                debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");

                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
            }
        }

        self.inputs.insert(key.clone(), tx);
        new_pieces
            .detach_triggers
            .remove(key)
            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
    }

    fn remove_outputs(&mut self, key: &ComponentKey) {
        self.outputs.retain(|id, _output| &id.component != key);
    }

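    /// Detaches the given component from the fanouts of its upstream
    /// components, removing the registration outright or merely pausing it
    /// when the edge survives the reload.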
    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
        self.inputs.remove(key);
        self.detach_triggers.remove(key);

        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
        let new_inputs = new_config
            .inputs_for_node(key)
            .unwrap_or_default()
            .iter()
            .collect::<HashSet<_>>();

        for input in old_inputs {
            if let Some(output) = self.outputs.get_mut(input) {
                if diff.contains(&input.component)
                    || diff.is_removed(key)
                    || !new_inputs.contains(input)
                {
                    // The upstream component is being rebuilt, or this
                    // component (or the edge between them) is going away.
                    debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");

                    _ = output.send(ControlMessage::Remove(key.clone()));
                } else {
                    // The edge survives the reload; pause it so the rebuilt
                    // component can take it over again.
                    debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");

                    _ = output.send(ControlMessage::Pause(key.clone()));
                }
            }
        }
    }

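    /// Reattaches unchanged transforms and sinks to the rebuilt fanouts of
    /// their changed upstream components. Rebuilding a component replaces its
    /// fanout, which severs any inputs that were previously registered with
    /// it.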
    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
        let unchanged_transforms = self
            .config
            .transforms()
            .filter(|(key, _)| !diff.transforms.contains(key));
        for (transform_key, transform) in unchanged_transforms {
            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(transform_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
            }
        }

        let unchanged_sinks = self
            .config
            .sinks()
            .filter(|(key, _)| !diff.sinks.contains(key));
        for (sink_key, sink) in unchanged_sinks {
            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
            for output_id in changed_outputs {
                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

                let input = self.inputs.get(sink_key).cloned().unwrap();
                let output = self.outputs.get_mut(&output_id).unwrap();
                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
            }
        }
    }

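    /// Spawns tasks for all components that `diff` marks as changed or added,
    /// replacing the old tasks of changed components.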
    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
        for key in &diff.sources.to_change {
            debug!(message = "Spawning changed source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.sources.to_add {
            debug!(message = "Spawning new source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        let changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        let added_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();

        for key in changed_table_sources {
            debug!(message = "Spawning changed enrichment table source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in added_table_sources {
            debug!(message = "Spawning new enrichment table source.", component_id = %key);
            self.spawn_source(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_change {
            debug!(message = "Spawning changed transform.", component_id = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.transforms.to_add {
            debug!(message = "Spawning new transform.", component_id = %key);
            self.spawn_transform(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_change {
            debug!(message = "Spawning changed sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in &diff.sinks.to_add {
            debug!(message = "Spawning new sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        // Enrichment tables that act as sinks have a task but no source task.
        let changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_change
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        let added_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .to_add
            .iter()
            .filter(|k| {
                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
            })
            .collect();

        for key in changed_tables {
            debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }

        for key in added_tables {
            debug!(message = "Spawning new enrichment table sink.", component_id = %key);
            self.spawn_sink(key, &mut new_pieces);
        }
    }

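    /// Spawns the task for a single sink, routing any fatal error it produces
    /// to the abort channel. The same pattern is used for transforms and
    /// sources below.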
    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "sink",
            component_kind = "sink",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "sink".to_string(),
                task.typetag().to_string(),
            );
            debug!(
                component_kind = "sink",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!(">> {} ({})", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::SinkAborted { key, error }
            })
        }
        .instrument(task_span);
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach any old task for this key
        }
    }

    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "transform",
            component_kind = "transform",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "transform".to_string(),
                task.typetag().to_string(),
            );
            debug!(
                component_kind = "transform",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::TransformAborted { key, error }
            })
        }
        .instrument(task_span);
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach any old task for this key
        }
    }

    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
        let task = new_pieces.tasks.remove(key).unwrap();
        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = %task.id(),
            component_type = %task.typetag(),
        );

        let task_span = span.or_current();
        #[cfg(feature = "allocation-tracing")]
        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
                task.id().to_string(),
                "source".to_string(),
                task.typetag().to_string(),
            );

            debug!(
                component_kind = "source",
                component_type = task.typetag(),
                component_id = task.id(),
                group_id = group_id.as_raw().to_string(),
                "Registered new allocation group."
            );
            group_id.attach_to_span(&task_span);
        }

        // Spawn the task that drives the source's output into the topology.
        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
        let task = {
            let key = key.clone();
            handle_errors(task, self.abort_tx.clone(), |error| {
                ShutdownError::SourceAborted { key, error }
            })
        }
        .instrument(task_span.clone());
        let spawned = spawn_named(task, task_name.as_ref());
        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
            drop(previous); // detach any old task for this key
        }

        self.shutdown_coordinator
            .takeover_source(key, &mut new_pieces.shutdown_coordinator);

        // Spawn the source's own task, which produces the events.
        let source_task = new_pieces.source_tasks.remove(key).unwrap();
        let source_task = {
            let key = key.clone();
            handle_errors(source_task, self.abort_tx.clone(), |error| {
                ShutdownError::SourceAborted { key, error }
            })
        }
        .instrument(task_span);
        self.source_tasks
            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
    }

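    /// Builds and starts a topology from a validated config, computing the
    /// initial diff and building all pieces before delegating to
    /// [`Self::start_validated`].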
    pub async fn start_init_validated(
        config: Config,
        extra_context: ExtraContext,
    ) -> Option<(Self, ShutdownErrorReceiver)> {
        let diff = ConfigDiff::initial(&config);
        let pieces = TopologyPiecesBuilder::new(&config, &diff)
            .with_extra_context(extra_context)
            .build_or_log_errors()
            .await?;
        Self::start_validated(config, diff, pieces).await
    }

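    /// Starts a topology from pre-built pieces: configures metrics expiry,
    /// runs healthchecks, connects and spawns all components, and starts the
    /// utilization and metrics background tasks.
    ///
    /// Returns the running topology and a receiver for fatal component
    /// errors, or `None` if startup failed.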
    pub async fn start_validated(
        config: Config,
        diff: ConfigDiff,
        mut pieces: TopologyPieces,
    ) -> Option<(Self, ShutdownErrorReceiver)> {
        let (abort_tx, abort_rx) = mpsc::unbounded_channel();

        let expire_metrics = match (
            config.global.expire_metrics,
            config.global.expire_metrics_secs,
        ) {
            (Some(e), None) => {
                warn!(
                    "DEPRECATED: The `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
                );
                // A `Duration` is never negative, so the configured expiry is
                // always used.
                Some(e.as_secs_f64())
            }
            (Some(_), Some(_)) => {
                error!(
                    message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
                    internal_log_rate_limit = false
                );
                return None;
            }
            // A negative value disables metrics expiry.
            (None, Some(e)) => {
                if e < 0f64 {
                    None
                } else {
                    Some(e)
                }
            }
            (None, None) => Some(300f64),
        };

        if let Err(error) = crate::metrics::Controller::get()
            .expect("Metrics must be initialized")
            .set_expiry(
                expire_metrics,
                config
                    .global
                    .expire_metrics_per_metric_set
                    .clone()
                    .unwrap_or_default(),
            )
        {
            error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
            return None;
        }

        let (utilization_emitter, utilization_registry) = pieces
            .utilization
            .take()
            .expect("Topology is missing the utilization metric emitter!");
        let metrics_storage = pieces.metrics_storage.clone();
        let metrics_refresh_period = config
            .global
            .metrics_storage_refresh_period
            .map(Duration::from_secs_f64);
        let mut running_topology = Self::new(config, abort_tx);

        if !running_topology
            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
            .await
        {
            return None;
        }
        running_topology.connect_diff(&diff, &mut pieces).await;
        running_topology.spawn_diff(&diff, pieces);

        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
            ShutdownSignal::new_wired();
        running_topology.utilization_registry = Some(utilization_registry.clone());
        running_topology.utilization_task_shutdown_trigger =
            Some(utilization_task_shutdown_trigger);
        running_topology.utilization_task = Some(tokio::spawn(Task::new(
            "utilization_heartbeat".into(),
            "",
            async move {
                utilization_emitter
                    .run_utilization(utilization_shutdown_signal)
                    .await;
                Ok(TaskOutput::Healthcheck)
            },
        )));
        if let Some(metrics_refresh_period) = metrics_refresh_period {
            let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
                ShutdownSignal::new_wired();
            running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
            running_topology.metrics_task = Some(tokio::spawn(Task::new(
                "metrics_heartbeat".into(),
                "",
                async move {
                    metrics_storage
                        .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
                        .await;
                    Ok(TaskOutput::Healthcheck)
                },
            )));
        }

        Some((running_topology, abort_rx))
    }
}

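/// Returns the subset of `output_ids` that belong to sources or transforms
/// marked as changed in `diff`.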
fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
    let mut changed_outputs = Vec::new();

    for source_key in &diff.sources.to_change {
        changed_outputs.extend(
            output_ids
                .iter()
                .filter(|id| &id.component == source_key)
                .cloned(),
        );
    }

    for transform_key in &diff.transforms.to_change {
        changed_outputs.extend(
            output_ids
                .iter()
                .filter(|id| &id.component == transform_key)
                .cloned(),
        );
    }

    changed_outputs
}