1use std::{
2 collections::{HashMap, HashSet},
3 sync::{Arc, Mutex},
4};
5
6use futures::{Future, FutureExt, future};
7use snafu::Snafu;
8use stream_cancel::Trigger;
9use tokio::{
10 sync::{mpsc, watch},
11 time::{Duration, Instant, interval, sleep_until},
12};
13use tracing::Instrument;
14use vector_lib::{
15 buffers::topology::channel::BufferSender,
16 shutdown::ShutdownSignal,
17 tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
18 trigger::DisabledTrigger,
19};
20
21use super::{
22 BuiltBuffer, TaskHandle,
23 builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
24 fanout::{ControlChannel, ControlMessage},
25 handle_errors, retain, take_healthchecks,
26 task::{Task, TaskOutput},
27};
28use crate::{
29 config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
30 event::EventArray,
31 extra_context::ExtraContext,
32 shutdown::SourceShutdownCoordinator,
33 signal::ShutdownError,
34 spawn_named,
35 utilization::UtilizationRegistry,
36};
37
38pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
39
40#[derive(Debug, Snafu)]
41pub enum ReloadError {
42 #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
43 GlobalOptionsChanged { changed_fields: Vec<String> },
44 #[snafu(display("failed to compute global diff: {}", source))]
45 GlobalDiffFailed { source: serde_json::Error },
46 #[snafu(display("topology build failed"))]
47 TopologyBuildFailed,
48 #[snafu(display("failed to restore previous config"))]
49 FailedToRestore,
50}
51
/// A topology whose component tasks have been spawned, together with the state needed
/// to reload it incrementally (`reload_config_and_respawn`) or stop it (`stop`).
// NOTE(review): `#[allow(dead_code)]` has no recorded rationale; presumably some fields
// are only read under certain features/callers — confirm and narrow it if possible.
// NOTE(review): fields are dropped in declaration order, and some (e.g. the `Trigger`s)
// may act when dropped — keep that in mind before reordering them.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Sender side of each component's input buffer, registered by `setup_inputs`.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Input list of each component, as published to tap subscribers.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel of every registered output, keyed by output id.
    outputs: HashMap<OutputId, ControlChannel>,
    /// `(component kind, type tag)` of components with outputs; used to build `TapOutput`s.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Type tag of each component, published to tap subscribers as `type_names`.
    component_type_names: HashMap<ComponentKey, String>,
    /// Handles of the source tasks themselves (taken from `TopologyPieces::source_tasks`).
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Handles of all other tasks: transforms, sinks, and the task that `spawn_source`
    /// spawns from `TopologyPieces::tasks` alongside each source.
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Drives source shutdown (`shutdown_source`, `shutdown_all`, `takeover_source`).
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-component triggers; a changed sink's trigger is cancelled when its buffer is reused.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// Configuration the running components were built from; replaced after a
    /// successful reload.
    pub(crate) config: Config,
    /// Cloned into every spawned task (via `handle_errors`) to report `ShutdownError`s.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Channel broadcasting `TapResource` updates; `watch()` hands out receivers.
    watch: (WatchTx, WatchRx),
    /// Grace period used by `stop`; `None` means no deadline.
    graceful_shutdown_duration: Option<Duration>,
    /// Passed to the pieces builder on reload; removed components are unregistered from it.
    utilization_registry: Option<UtilizationRegistry>,
    /// Utilization task awaited by `stop`.
    utilization_task: Option<TaskHandle>,
    /// Cancelled by `stop` to end the utilization task.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Metrics task awaited by `stop`.
    metrics_task: Option<TaskHandle>,
    /// Cancelled by `stop` to end the metrics task.
    metrics_task_shutdown_trigger: Option<Trigger>,
    /// Components accumulated by `extend_reload_set`; passed to `ConfigDiff::new` on reload.
    pending_reload: Option<HashSet<ComponentKey>>,
}
74
75impl RunningTopology {
76 pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
77 Self {
78 inputs: HashMap::new(),
79 inputs_tap_metadata: HashMap::new(),
80 outputs: HashMap::new(),
81 outputs_tap_metadata: HashMap::new(),
82 component_type_names: HashMap::new(),
83 shutdown_coordinator: SourceShutdownCoordinator::default(),
84 detach_triggers: HashMap::new(),
85 source_tasks: HashMap::new(),
86 tasks: HashMap::new(),
87 abort_tx,
88 watch: watch::channel(TapResource::default()),
89 graceful_shutdown_duration: config.graceful_shutdown_duration,
90 config,
91 utilization_registry: None,
92 utilization_task: None,
93 utilization_task_shutdown_trigger: None,
94 metrics_task: None,
95 metrics_task_shutdown_trigger: None,
96 pending_reload: None,
97 }
98 }
99
100 pub const fn config(&self) -> &Config {
102 &self.config
103 }
104
105 pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
108 match &mut self.pending_reload {
109 None => self.pending_reload = Some(new_set.clone()),
110 Some(existing) => existing.extend(new_set),
111 }
112 }
113
114 pub fn watch(&self) -> watch::Receiver<TapResource> {
118 self.watch.1.clone()
119 }
120
121 pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
129 self.shutdown_coordinator.shutdown_tripwire()
130 }
131
132 pub fn stop(self) -> impl Future<Output = ()> {
146 let mut wait_handles = Vec::new();
149 let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
152
153 let map_closure = |_result| ();
154
155 for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
158 let task = task.map(map_closure).shared();
159
160 wait_handles.push(task.clone());
161 check_handles.entry(key).or_default().push(task);
162 }
163
164 if let Some(utilization_task) = self.utilization_task {
165 wait_handles.push(utilization_task.map(map_closure).shared());
166 }
167
168 if let Some(metrics_task) = self.metrics_task {
169 wait_handles.push(metrics_task.map(map_closure).shared());
170 }
171
172 let deadline = self
174 .graceful_shutdown_duration
175 .map(|grace_period| Instant::now() + grace_period);
176
177 let timeout = if let Some(deadline) = deadline {
178 let mut check_handles2 = check_handles.clone();
182 Box::pin(async move {
183 sleep_until(deadline).await;
184 check_handles2.retain(|_key, handles| {
186 retain(handles, |handle| handle.peek().is_none());
187 !handles.is_empty()
188 });
189 let remaining_components = check_handles2
190 .keys()
191 .map(|item| item.to_string())
192 .collect::<Vec<_>>()
193 .join(", ");
194
195 error!(
196 components = ?remaining_components,
197 message = "Failed to gracefully shut down in time. Killing components.",
198 internal_log_rate_limit = false
199 );
200 }) as future::BoxFuture<'static, ()>
201 } else {
202 Box::pin(future::pending()) as future::BoxFuture<'static, ()>
203 };
204
205 let mut interval = interval(Duration::from_secs(5));
207 let reporter = async move {
208 loop {
209 interval.tick().await;
210
211 check_handles.retain(|_key, handles| {
213 retain(handles, |handle| handle.peek().is_none());
214 !handles.is_empty()
215 });
216 let remaining_components = check_handles
217 .keys()
218 .map(|item| item.to_string())
219 .collect::<Vec<_>>()
220 .join(", ");
221
222 let (deadline_passed, time_remaining) = match deadline {
223 Some(d) => match d.checked_duration_since(Instant::now()) {
224 Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
225 None => (true, "overdue".to_string()),
226 },
227 None => (false, "no time limit".to_string()),
228 };
229
230 info!(
231 remaining_components = ?remaining_components,
232 time_remaining = ?time_remaining,
233 "Shutting down... Waiting on running components."
234 );
235
236 let all_done = check_handles.is_empty();
237
238 if all_done {
239 info!("Shutdown reporter exiting: all components shut down.");
240 break;
241 } else if deadline_passed {
242 error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
243 break;
244 }
245 }
246 };
247
248 let success = futures::future::join_all(wait_handles).map(|_| ());
250
251 let shutdown_complete_future = future::select_all(vec![
253 Box::pin(timeout) as future::BoxFuture<'static, ()>,
254 Box::pin(reporter) as future::BoxFuture<'static, ()>,
255 Box::pin(success) as future::BoxFuture<'static, ()>,
256 ]);
257
258 let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
260 if let Some(trigger) = self.utilization_task_shutdown_trigger {
261 trigger.cancel();
262 }
263 if let Some(trigger) = self.metrics_task_shutdown_trigger {
264 trigger.cancel();
265 }
266
267 futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
268 }
269
270 pub async fn reload_config_and_respawn(
282 &mut self,
283 new_config: Config,
284 extra_context: ExtraContext,
285 ) -> Result<(), ReloadError> {
286 info!("Reloading running topology with new configuration.");
287
288 if self.config.global != new_config.global {
289 return match self.config.global.diff(&new_config.global) {
290 Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
291 Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
292 };
293 }
294
295 let diff = if let Some(components) = &self.pending_reload {
301 ConfigDiff::new(&self.config, &new_config, components.clone())
302 } else {
303 ConfigDiff::new(&self.config, &new_config, HashSet::new())
304 };
305 let buffers = self.shutdown_diff(&diff, &new_config).await;
306
307 if cfg!(windows) {
311 tokio::time::sleep(Duration::from_millis(200)).await;
313 }
314
315 if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
319 .with_buffers(buffers.clone())
320 .with_extra_context(extra_context.clone())
321 .with_utilization_registry(self.utilization_registry.clone())
322 .build_or_log_errors()
323 .await
324 {
325 if self
329 .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
330 .await
331 {
332 self.connect_diff(&diff, &mut new_pieces).await;
333 self.spawn_diff(&diff, new_pieces);
334 self.config = new_config;
335
336 info!("New configuration loaded successfully.");
337
338 return Ok(());
339 }
340 }
341
342 warn!("Failed to completely load new configuration. Restoring old configuration.");
346
347 let diff = diff.flip();
348 if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
349 .with_buffers(buffers)
350 .with_extra_context(extra_context.clone())
351 .with_utilization_registry(self.utilization_registry.clone())
352 .build_or_log_errors()
353 .await
354 && self
355 .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
356 .await
357 {
358 self.connect_diff(&diff, &mut new_pieces).await;
359 self.spawn_diff(&diff, new_pieces);
360
361 info!("Old configuration restored successfully.");
362
363 return Err(ReloadError::TopologyBuildFailed);
364 }
365
366 error!(
367 message = "Failed to restore old configuration.",
368 internal_log_rate_limit = false
369 );
370
371 Err(ReloadError::FailedToRestore)
372 }
373
374 pub(crate) async fn reload_enrichment_tables(&self) {
376 reload_enrichment_tables(&self.config).await;
377 }
378
379 pub(crate) async fn run_healthchecks(
380 &mut self,
381 diff: &ConfigDiff,
382 pieces: &mut TopologyPieces,
383 options: HealthcheckOptions,
384 ) -> bool {
385 if options.enabled {
386 let healthchecks = take_healthchecks(diff, pieces)
387 .into_iter()
388 .map(|(_, task)| task);
389 let healthchecks = future::try_join_all(healthchecks);
390
391 info!("Running healthchecks.");
392 if options.require_healthy {
393 let success = healthchecks.await;
394
395 if success.is_ok() {
396 info!("All healthchecks passed.");
397 true
398 } else {
399 error!(
400 message = "Sinks unhealthy.",
401 internal_log_rate_limit = false
402 );
403 false
404 }
405 } else {
406 tokio::spawn(healthchecks);
407 true
408 }
409 } else {
410 true
411 }
412 }
413
414 async fn shutdown_diff(
418 &mut self,
419 diff: &ConfigDiff,
420 new_config: &Config,
421 ) -> HashMap<ComponentKey, BuiltBuffer> {
422 if diff.sources.any_changed_or_removed() {
425 let timeout = Duration::from_secs(30);
426 let mut source_shutdown_handles = Vec::new();
427
428 let deadline = Instant::now() + timeout;
429 for key in &diff.sources.to_remove {
430 debug!(component_id = %key, "Removing source.");
431
432 let previous = self.tasks.remove(key).unwrap();
433 drop(previous); self.remove_outputs(key);
436 source_shutdown_handles
437 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
438 }
439
440 for key in &diff.sources.to_change {
441 debug!(component_id = %key, "Changing source.");
442
443 self.remove_outputs(key);
444 source_shutdown_handles
445 .push(self.shutdown_coordinator.shutdown_source(key, deadline));
446 }
447
448 debug!(
449 "Waiting for up to {} seconds for source(s) to finish shutting down.",
450 timeout.as_secs()
451 );
452 futures::future::join_all(source_shutdown_handles).await;
453
454 for key in diff.sources.removed_and_changed() {
456 if let Some(task) = self.source_tasks.remove(key) {
457 task.await.unwrap().unwrap();
458 }
459 }
460 }
461
462 for key in &diff.transforms.to_remove {
470 debug!(component_id = %key, "Removing transform.");
471
472 let previous = self.tasks.remove(key).unwrap();
473 drop(previous); self.remove_inputs(key, diff, new_config).await;
476 self.remove_outputs(key);
477
478 if let Some(registry) = self.utilization_registry.as_ref() {
479 registry.remove_component(key);
480 }
481 }
482
483 for key in &diff.transforms.to_change {
484 debug!(component_id = %key, "Changing transform.");
485
486 self.remove_inputs(key, diff, new_config).await;
487 self.remove_outputs(key);
488 }
489
490 let removed_table_sinks = diff
496 .enrichment_tables
497 .removed_and_changed()
498 .filter_map(|key| {
499 self.config
500 .enrichment_table(key)
501 .and_then(|t| t.as_sink(key))
502 .map(|(key, s)| (key.clone(), s.resources(&key)))
503 })
504 .collect::<Vec<_>>();
505 let remove_sink = diff
506 .sinks
507 .removed_and_changed()
508 .map(|key| {
509 (
510 key,
511 self.config
512 .sink(key)
513 .map(|s| s.resources(key))
514 .unwrap_or_default(),
515 )
516 })
517 .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
518 let add_source = diff
519 .sources
520 .changed_and_added()
521 .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
522 let added_table_sinks = diff
523 .enrichment_tables
524 .changed_and_added()
525 .filter_map(|key| {
526 self.config
527 .enrichment_table(key)
528 .and_then(|t| t.as_sink(key))
529 .map(|(key, s)| (key.clone(), s.resources(&key)))
530 })
531 .collect::<Vec<_>>();
532 let add_sink = diff
533 .sinks
534 .changed_and_added()
535 .map(|key| {
536 (
537 key,
538 new_config
539 .sink(key)
540 .map(|s| s.resources(key))
541 .unwrap_or_default(),
542 )
543 })
544 .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
545 let conflicts = Resource::conflicts(
546 remove_sink.map(|(key, value)| ((true, key), value)).chain(
547 add_sink
548 .chain(add_source)
549 .map(|(key, value)| ((false, key), value)),
550 ),
551 )
552 .into_iter()
553 .flat_map(|(_, components)| components)
554 .collect::<HashSet<_>>();
555 let conflicting_sinks = conflicts
557 .into_iter()
558 .filter(|&(existing_sink, _)| existing_sink)
559 .map(|(_, key)| key.clone());
560
561 let reuse_buffers = diff
563 .sinks
564 .to_change
565 .iter()
566 .filter(|&key| {
567 if diff.components_to_reload.contains(key) {
568 return false;
569 }
570 self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
571 self.config
572 .enrichment_table(key)
573 .and_then(|t| t.as_sink(key))
574 .map(|(_, s)| s.buffer)
575 }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
576 self.config
577 .enrichment_table(key)
578 .and_then(|t| t.as_sink(key))
579 .map(|(_, s)| s.buffer)
580 })
581 })
582 .cloned()
583 .collect::<HashSet<_>>();
584
585 let wait_for_sinks = conflicting_sinks
589 .chain(reuse_buffers.iter().cloned())
590 .collect::<HashSet<_>>();
591
592 let removed_sinks = diff
594 .sinks
595 .to_remove
596 .iter()
597 .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
598 self.config
599 .enrichment_table(key)
600 .and_then(|t| t.as_sink(key))
601 .is_some()
602 }))
603 .collect::<Vec<_>>();
604 for key in &removed_sinks {
605 debug!(component_id = %key, "Removing sink.");
606 self.remove_inputs(key, diff, new_config).await;
607
608 if let Some(registry) = self.utilization_registry.as_ref() {
609 registry.remove_component(key);
610 }
611 }
612
613 let mut buffer_tx = HashMap::new();
616
617 let sinks_to_change = diff
618 .sinks
619 .to_change
620 .iter()
621 .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
622 self.config
623 .enrichment_table(key)
624 .and_then(|t| t.as_sink(key))
625 .is_some()
626 }))
627 .collect::<Vec<_>>();
628
629 for key in &sinks_to_change {
630 debug!(component_id = %key, "Changing sink.");
631 if reuse_buffers.contains(key) {
632 self.detach_triggers
633 .remove(key)
634 .unwrap()
635 .into_inner()
636 .cancel();
637
638 buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
649 }
650 self.remove_inputs(key, diff, new_config).await;
651 }
652
653 for key in &removed_sinks {
660 let previous = self.tasks.remove(key).unwrap();
661 if wait_for_sinks.contains(key) {
662 debug!(message = "Waiting for sink to shutdown.", component_id = %key);
663 previous.await.unwrap().unwrap();
664 } else {
665 drop(previous); }
667 }
668
669 let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
670 for key in &sinks_to_change {
671 if wait_for_sinks.contains(key) {
672 let previous = self.tasks.remove(key).unwrap();
673 debug!(message = "Waiting for sink to shutdown.", component_id = %key);
674 let buffer = previous.await.unwrap().unwrap();
675
676 if reuse_buffers.contains(key) {
677 let tx = buffer_tx.remove(key).unwrap();
685 let rx = match buffer {
686 TaskOutput::Sink(rx) => rx.into_inner(),
687 _ => unreachable!(),
688 };
689
690 buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
691 }
692 }
693 }
694
695 buffers
696 }
697
698 pub(crate) async fn connect_diff(
700 &mut self,
701 diff: &ConfigDiff,
702 new_pieces: &mut TopologyPieces,
703 ) {
704 debug!("Connecting changed/added component(s).");
705
706 if !self.watch.0.is_closed() {
708 for key in &diff.sources.to_remove {
709 self.outputs_tap_metadata.remove(key);
711 self.component_type_names.remove(key);
712 }
713
714 for key in &diff.transforms.to_remove {
715 self.outputs_tap_metadata.remove(key);
717 self.inputs_tap_metadata.remove(key);
718 self.component_type_names.remove(key);
719 }
720
721 for key in &diff.sinks.to_remove {
722 self.inputs_tap_metadata.remove(key);
724 self.component_type_names.remove(key);
725 }
726
727 let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
728 self.config
729 .enrichment_table(key)
730 .and_then(|t| t.as_sink(key))
731 .is_some()
732 });
733 for key in removed_sinks {
734 self.inputs_tap_metadata.remove(key);
736 }
737
738 let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
739 self.config
740 .enrichment_table(key)
741 .and_then(|t| t.as_source(key).map(|(key, _)| key))
742 });
743 for key in removed_sources {
744 self.outputs_tap_metadata.remove(&key);
746 }
747
748 for key in diff.sources.changed_and_added() {
749 if let Some(task) = new_pieces.tasks.get(key) {
750 let typetag = task.typetag().to_string();
751 self.outputs_tap_metadata
752 .insert(key.clone(), ("source", typetag.clone()));
753 self.component_type_names.insert(key.clone(), typetag);
754 }
755 }
756
757 for key in diff
758 .enrichment_tables
759 .changed_and_added()
760 .filter_map(|key| {
761 self.config
762 .enrichment_table(key)
763 .and_then(|t| t.as_source(key).map(|(key, _)| key))
764 })
765 {
766 if let Some(task) = new_pieces.tasks.get(&key) {
767 self.outputs_tap_metadata
768 .insert(key.clone(), ("source", task.typetag().to_string()));
769 }
770 }
771
772 for key in diff.transforms.changed_and_added() {
773 if let Some(task) = new_pieces.tasks.get(key) {
774 let typetag = task.typetag().to_string();
775 self.outputs_tap_metadata
776 .insert(key.clone(), ("transform", typetag.clone()));
777 self.component_type_names.insert(key.clone(), typetag);
778 }
779 }
780
781 for key in diff.sinks.changed_and_added() {
782 if let Some(task) = new_pieces.tasks.get(key) {
783 self.component_type_names
784 .insert(key.clone(), task.typetag().to_string());
785 }
786 }
787
788 for (key, input) in &new_pieces.inputs {
789 self.inputs_tap_metadata
790 .insert(key.clone(), input.1.clone());
791 }
792 }
793
794 for key in diff.sources.changed_and_added() {
797 debug!(component_id = %key, "Configuring outputs for source.");
798 self.setup_outputs(key, new_pieces).await;
799 }
800
801 let added_changed_table_sources: Vec<&ComponentKey> = diff
802 .enrichment_tables
803 .changed_and_added()
804 .filter(|k| new_pieces.source_tasks.contains_key(k))
805 .collect();
806 for key in added_changed_table_sources.iter() {
807 debug!(component_id = %key, "Connecting outputs for enrichment table source.");
808 self.setup_outputs(key, new_pieces).await;
809 }
810
811 for key in diff.transforms.changed_and_added() {
814 debug!(component_id = %key, "Configuring outputs for transform.");
815 self.setup_outputs(key, new_pieces).await;
816 }
817
818 for key in diff.transforms.changed_and_added() {
821 debug!(component_id = %key, "Connecting inputs for transform.");
822 self.setup_inputs(key, diff, new_pieces).await;
823 }
824
825 for key in diff.sinks.changed_and_added() {
827 debug!(component_id = %key, "Connecting inputs for sink.");
828 self.setup_inputs(key, diff, new_pieces).await;
829 }
830 let added_changed_tables: Vec<&ComponentKey> = diff
831 .enrichment_tables
832 .changed_and_added()
833 .filter(|k| new_pieces.inputs.contains_key(k))
834 .collect();
835 for key in added_changed_tables.iter() {
836 debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
837 self.setup_inputs(key, diff, new_pieces).await;
838 }
839
840 self.reattach_severed_inputs(diff);
851
852 if !self.watch.0.is_closed() {
854 let outputs = self
855 .outputs
856 .clone()
857 .into_iter()
858 .flat_map(|(output_id, control_tx)| {
859 self.outputs_tap_metadata.get(&output_id.component).map(
860 |(component_kind, component_type)| {
861 (
862 TapOutput {
863 output_id,
864 component_kind,
865 component_type: component_type.clone(),
866 },
867 control_tx,
868 )
869 },
870 )
871 })
872 .collect::<HashMap<_, _>>();
873
874 let mut removals = diff.sources.to_remove.clone();
875 removals.extend(diff.transforms.to_remove.iter().cloned());
876 self.watch
877 .0
878 .send(TapResource {
879 outputs,
880 inputs: self.inputs_tap_metadata.clone(),
881 source_keys: diff
882 .sources
883 .changed_and_added()
884 .map(|key| key.to_string())
885 .chain(
886 added_changed_table_sources
887 .iter()
888 .map(|key| key.to_string()),
889 )
890 .collect(),
891 sink_keys: diff
892 .sinks
893 .changed_and_added()
894 .map(|key| key.to_string())
895 .chain(added_changed_tables.iter().map(|key| key.to_string()))
896 .collect(),
897 removals,
900 type_names: self
901 .component_type_names
902 .iter()
903 .map(|(k, v)| (k.to_string(), v.clone()))
904 .collect(),
905 })
906 .expect("Couldn't broadcast config changes.");
907 }
908 }
909
910 async fn setup_outputs(
911 &mut self,
912 key: &ComponentKey,
913 new_pieces: &mut builder::TopologyPieces,
914 ) {
915 let outputs = new_pieces.outputs.remove(key).unwrap();
916 for (port, output) in outputs {
917 debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
918
919 let id = OutputId {
920 component: key.clone(),
921 port,
922 };
923
924 self.outputs.insert(id, output);
925 }
926 }
927
928 async fn setup_inputs(
929 &mut self,
930 key: &ComponentKey,
931 diff: &ConfigDiff,
932 new_pieces: &mut builder::TopologyPieces,
933 ) {
934 let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
935
936 let old_inputs = self
937 .config
938 .inputs_for_node(key)
939 .into_iter()
940 .flatten()
941 .cloned()
942 .collect::<HashSet<_>>();
943
944 let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
945 let inputs_to_add = &new_inputs - &old_inputs;
946
947 for input in inputs {
948 let output = self.outputs.get_mut(&input).expect("unknown output");
949
950 if diff.contains(&input.component) || inputs_to_add.contains(&input) {
951 debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
955
956 _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
957 } else {
958 debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
963
964 _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
965 }
966 }
967
968 self.inputs.insert(key.clone(), tx);
969 new_pieces
970 .detach_triggers
971 .remove(key)
972 .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
973 }
974
975 fn remove_outputs(&mut self, key: &ComponentKey) {
976 self.outputs.retain(|id, _output| &id.component != key);
977 }
978
979 async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
980 self.inputs.remove(key);
981 self.detach_triggers.remove(key);
982
983 let old_inputs = self.config.inputs_for_node(key).expect("node exists");
984 let new_inputs = new_config
985 .inputs_for_node(key)
986 .unwrap_or_default()
987 .iter()
988 .collect::<HashSet<_>>();
989
990 for input in old_inputs {
991 if let Some(output) = self.outputs.get_mut(input) {
992 if diff.contains(&input.component)
993 || diff.is_removed(key)
994 || !new_inputs.contains(input)
995 {
996 debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
1007
1008 _ = output.send(ControlMessage::Remove(key.clone()));
1009 } else {
1010 debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
1014
1015 _ = output.send(ControlMessage::Pause(key.clone()));
1016 }
1017 }
1018 }
1019 }
1020
1021 fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
1022 let unchanged_transforms = self
1023 .config
1024 .transforms()
1025 .filter(|(key, _)| !diff.transforms.contains(key));
1026 for (transform_key, transform) in unchanged_transforms {
1027 let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
1028 for output_id in changed_outputs {
1029 debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1030
1031 let input = self.inputs.get(transform_key).cloned().unwrap();
1032 let output = self.outputs.get_mut(&output_id).unwrap();
1033 _ = output.send(ControlMessage::Add(transform_key.clone(), input));
1034 }
1035 }
1036
1037 let unchanged_sinks = self
1038 .config
1039 .sinks()
1040 .filter(|(key, _)| !diff.sinks.contains(key));
1041 for (sink_key, sink) in unchanged_sinks {
1042 let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
1043 for output_id in changed_outputs {
1044 debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1045
1046 let input = self.inputs.get(sink_key).cloned().unwrap();
1047 let output = self.outputs.get_mut(&output_id).unwrap();
1048 _ = output.send(ControlMessage::Add(sink_key.clone(), input));
1049 }
1050 }
1051 }
1052
1053 pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1055 for key in &diff.sources.to_change {
1056 debug!(message = "Spawning changed source.", component_id = %key);
1057 self.spawn_source(key, &mut new_pieces);
1058 }
1059
1060 for key in &diff.sources.to_add {
1061 debug!(message = "Spawning new source.", component_id = %key);
1062 self.spawn_source(key, &mut new_pieces);
1063 }
1064
1065 let changed_table_sources: Vec<&ComponentKey> = diff
1066 .enrichment_tables
1067 .to_change
1068 .iter()
1069 .filter(|k| new_pieces.source_tasks.contains_key(k))
1070 .collect();
1071
1072 let added_table_sources: Vec<&ComponentKey> = diff
1073 .enrichment_tables
1074 .to_add
1075 .iter()
1076 .filter(|k| new_pieces.source_tasks.contains_key(k))
1077 .collect();
1078
1079 for key in changed_table_sources {
1080 debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1081 self.spawn_source(key, &mut new_pieces);
1082 }
1083
1084 for key in added_table_sources {
1085 debug!(message = "Spawning new enrichment table source.", component_id = %key);
1086 self.spawn_source(key, &mut new_pieces);
1087 }
1088
1089 for key in &diff.transforms.to_change {
1090 debug!(message = "Spawning changed transform.", component_id = %key);
1091 self.spawn_transform(key, &mut new_pieces);
1092 }
1093
1094 for key in &diff.transforms.to_add {
1095 debug!(message = "Spawning new transform.", component_id = %key);
1096 self.spawn_transform(key, &mut new_pieces);
1097 }
1098
1099 for key in &diff.sinks.to_change {
1100 debug!(message = "Spawning changed sink.", component_id = %key);
1101 self.spawn_sink(key, &mut new_pieces);
1102 }
1103
1104 for key in &diff.sinks.to_add {
1105 trace!(message = "Spawning new sink.", component_id = %key);
1106 self.spawn_sink(key, &mut new_pieces);
1107 }
1108
1109 let changed_tables: Vec<&ComponentKey> = diff
1110 .enrichment_tables
1111 .to_change
1112 .iter()
1113 .filter(|k| {
1114 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1115 })
1116 .collect();
1117
1118 let added_tables: Vec<&ComponentKey> = diff
1119 .enrichment_tables
1120 .to_add
1121 .iter()
1122 .filter(|k| {
1123 new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1124 })
1125 .collect();
1126
1127 for key in changed_tables {
1128 debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1129 self.spawn_sink(key, &mut new_pieces);
1130 }
1131
1132 for key in added_tables {
1133 debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1134 self.spawn_sink(key, &mut new_pieces);
1135 }
1136 }
1137
1138 fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1139 let task = new_pieces.tasks.remove(key).unwrap();
1140 let span = error_span!(
1141 "sink",
1142 component_kind = "sink",
1143 component_id = %task.id(),
1144 component_type = %task.typetag(),
1145 );
1146
1147 let task_span = span.or_current();
1148 #[cfg(feature = "allocation-tracing")]
1149 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1150 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1151 task.id().to_string(),
1152 "sink".to_string(),
1153 task.typetag().to_string(),
1154 );
1155 debug!(
1156 component_kind = "sink",
1157 component_type = task.typetag(),
1158 component_id = task.id(),
1159 group_id = group_id.as_raw().to_string(),
1160 "Registered new allocation group."
1161 );
1162 group_id.attach_to_span(&task_span);
1163 }
1164
1165 let task_name = format!(">> {} ({})", task.typetag(), task.id());
1166 let task = {
1167 let key = key.clone();
1168 handle_errors(task, self.abort_tx.clone(), |error| {
1169 ShutdownError::SinkAborted { key, error }
1170 })
1171 }
1172 .instrument(task_span);
1173 let spawned = spawn_named(task, task_name.as_ref());
1174 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1175 drop(previous); }
1177 }
1178
1179 fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1180 let task = new_pieces.tasks.remove(key).unwrap();
1181 let span = error_span!(
1182 "transform",
1183 component_kind = "transform",
1184 component_id = %task.id(),
1185 component_type = %task.typetag(),
1186 );
1187
1188 let task_span = span.or_current();
1189 #[cfg(feature = "allocation-tracing")]
1190 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1191 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1192 task.id().to_string(),
1193 "transform".to_string(),
1194 task.typetag().to_string(),
1195 );
1196 debug!(
1197 component_kind = "transform",
1198 component_type = task.typetag(),
1199 component_id = task.id(),
1200 group_id = group_id.as_raw().to_string(),
1201 "Registered new allocation group."
1202 );
1203 group_id.attach_to_span(&task_span);
1204 }
1205
1206 let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1207 let task = {
1208 let key = key.clone();
1209 handle_errors(task, self.abort_tx.clone(), |error| {
1210 ShutdownError::TransformAborted { key, error }
1211 })
1212 }
1213 .instrument(task_span);
1214 let spawned = spawn_named(task, task_name.as_ref());
1215 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1216 drop(previous); }
1218 }
1219
1220 fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1221 let task = new_pieces.tasks.remove(key).unwrap();
1222 let span = error_span!(
1223 "source",
1224 component_kind = "source",
1225 component_id = %task.id(),
1226 component_type = %task.typetag(),
1227 );
1228
1229 let task_span = span.or_current();
1230 #[cfg(feature = "allocation-tracing")]
1231 if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1232 let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1233 task.id().to_string(),
1234 "source".to_string(),
1235 task.typetag().to_string(),
1236 );
1237
1238 debug!(
1239 component_kind = "source",
1240 component_type = task.typetag(),
1241 component_id = task.id(),
1242 group_id = group_id.as_raw().to_string(),
1243 "Registered new allocation group."
1244 );
1245 group_id.attach_to_span(&task_span);
1246 }
1247
1248 let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1249 let task = {
1250 let key = key.clone();
1251 handle_errors(task, self.abort_tx.clone(), |error| {
1252 ShutdownError::SourceAborted { key, error }
1253 })
1254 }
1255 .instrument(task_span.clone());
1256 let spawned = spawn_named(task, task_name.as_ref());
1257 if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1258 drop(previous); }
1260
1261 self.shutdown_coordinator
1262 .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1263
1264 let source_task = new_pieces.source_tasks.remove(key).unwrap();
1266 let source_task = {
1267 let key = key.clone();
1268 handle_errors(source_task, self.abort_tx.clone(), |error| {
1269 ShutdownError::SourceAborted { key, error }
1270 })
1271 }
1272 .instrument(task_span);
1273 self.source_tasks
1274 .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1275 }
1276
1277 pub async fn start_init_validated(
1278 config: Config,
1279 extra_context: ExtraContext,
1280 ) -> Option<(Self, ShutdownErrorReceiver)> {
1281 let diff = ConfigDiff::initial(&config);
1282 let pieces = TopologyPiecesBuilder::new(&config, &diff)
1283 .with_extra_context(extra_context)
1284 .build_or_log_errors()
1285 .await?;
1286 Self::start_validated(config, diff, pieces).await
1287 }
1288
1289 pub async fn start_validated(
1290 config: Config,
1291 diff: ConfigDiff,
1292 mut pieces: TopologyPieces,
1293 ) -> Option<(Self, ShutdownErrorReceiver)> {
1294 let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1295
1296 let expire_metrics = match (
1297 config.global.expire_metrics,
1298 config.global.expire_metrics_secs,
1299 ) {
1300 (Some(e), None) => {
1301 warn!(
1302 "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1303 );
1304 if e < Duration::from_secs(0) {
1305 None
1306 } else {
1307 Some(e.as_secs_f64())
1308 }
1309 }
1310 (Some(_), Some(_)) => {
1311 error!(
1312 message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1313 internal_log_rate_limit = false
1314 );
1315 return None;
1316 }
1317 (None, Some(e)) => {
1318 if e < 0f64 {
1319 None
1320 } else {
1321 Some(e)
1322 }
1323 }
1324 (None, None) => Some(300f64),
1325 };
1326
1327 if let Err(error) = crate::metrics::Controller::get()
1328 .expect("Metrics must be initialized")
1329 .set_expiry(
1330 expire_metrics,
1331 config
1332 .global
1333 .expire_metrics_per_metric_set
1334 .clone()
1335 .unwrap_or_default(),
1336 )
1337 {
1338 error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1339 return None;
1340 }
1341
1342 let (utilization_emitter, utilization_registry) = pieces
1343 .utilization
1344 .take()
1345 .expect("Topology is missing the utilization metric emitter!");
1346 let metrics_storage = pieces.metrics_storage.clone();
1347 let metrics_refresh_period = config
1348 .global
1349 .metrics_storage_refresh_period
1350 .map(Duration::from_secs_f64);
1351 let mut running_topology = Self::new(config, abort_tx);
1352
1353 if !running_topology
1354 .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1355 .await
1356 {
1357 return None;
1358 }
1359 running_topology.connect_diff(&diff, &mut pieces).await;
1360 running_topology.spawn_diff(&diff, pieces);
1361
1362 let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1363 ShutdownSignal::new_wired();
1364 running_topology.utilization_registry = Some(utilization_registry.clone());
1365 running_topology.utilization_task_shutdown_trigger =
1366 Some(utilization_task_shutdown_trigger);
1367 running_topology.utilization_task = Some(tokio::spawn(Task::new(
1368 "utilization_heartbeat".into(),
1369 "",
1370 async move {
1371 utilization_emitter
1372 .run_utilization(utilization_shutdown_signal)
1373 .await;
1374 Ok(TaskOutput::Healthcheck)
1375 },
1376 )));
1377 if let Some(metrics_refresh_period) = metrics_refresh_period {
1378 let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
1379 ShutdownSignal::new_wired();
1380 running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
1381 running_topology.metrics_task = Some(tokio::spawn(Task::new(
1382 "metrics_heartbeat".into(),
1383 "",
1384 async move {
1385 metrics_storage
1386 .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
1387 .await;
1388 Ok(TaskOutput::Healthcheck)
1389 },
1390 )));
1391 }
1392
1393 Some((running_topology, abort_rx))
1394 }
1395}
1396
1397fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1398 let mut changed_outputs = Vec::new();
1399
1400 for source_key in &diff.sources.to_change {
1401 changed_outputs.extend(
1402 output_ids
1403 .iter()
1404 .filter(|id| &id.component == source_key)
1405 .cloned(),
1406 );
1407 }
1408
1409 for transform_key in &diff.transforms.to_change {
1410 changed_outputs.extend(
1411 output_ids
1412 .iter()
1413 .filter(|id| &id.component == transform_key)
1414 .cloned(),
1415 );
1416 }
1417
1418 changed_outputs
1419}