// vector/topology/running.rs
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use super::{
builder,
builder::TopologyPieces,
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::TaskOutput,
BuiltBuffer, TaskHandle,
};
use crate::{
config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
event::EventArray,
extra_context::ExtraContext,
shutdown::SourceShutdownCoordinator,
signal::ShutdownError,
spawn_named,
};
use futures::{future, Future, FutureExt};
use tokio::{
sync::{mpsc, watch},
time::{interval, sleep_until, Duration, Instant},
};
use tracing::Instrument;
use vector_lib::buffers::topology::channel::BufferSender;
use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
use vector_lib::trigger::DisabledTrigger;
/// Receiving half of an unbounded channel of [`ShutdownError`]s — presumably the counterpart
/// of [`RunningTopology::abort_tx`]; confirm against the caller that creates the channel.
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
/// A live topology: the spawned component tasks plus the channels that connect them.
// NOTE(review): the reason for `allow(dead_code)` is not visible here — presumably some
// fields are only read in certain builds/features; confirm and narrow it to those fields.
#[allow(dead_code)]
pub struct RunningTopology {
/// Input (sending) side of each connected component's buffer, keyed by component.
inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
/// Inputs of each component, published to tap subscribers via `watch`.
inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
/// Fanout control channel for every registered component output.
outputs: HashMap<OutputId, ControlChannel>,
/// Component kind (`"source"` / `"transform"`) and type tag, published to tap subscribers.
outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
/// Source tasks; each source's pump task is tracked separately in `tasks`.
source_tasks: HashMap<ComponentKey, TaskHandle>,
/// All other component tasks (transforms, sinks, and the source pump tasks).
tasks: HashMap<ComponentKey, TaskHandle>,
/// Used to signal sources to shut down, individually or all at once.
shutdown_coordinator: SourceShutdownCoordinator,
/// Per-component detach triggers taken from the built pieces; a changed sink's trigger is
/// cancelled when its buffer is reused during a reload.
detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
/// The configuration this topology is currently running.
pub(crate) config: Config,
/// Channel on which component failures (e.g. `ShutdownError::SinkAborted`) are reported.
pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
/// Sender/receiver pair used to broadcast `TapResource` updates to tap subscribers.
watch: (WatchTx, WatchRx),
/// `true` until `stop` is called; `stop` clears it to signal shutdown to the API's health
/// endpoint.
pub(crate) running: Arc<AtomicBool>,
/// Grace period `stop` allows before reporting components as overdue; `None` waits forever.
graceful_shutdown_duration: Option<Duration>,
}
impl RunningTopology {
pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
    // Copy the grace period out before `config` is moved into the struct.
    let graceful_shutdown_duration = config.graceful_shutdown_duration;

    Self {
        config,
        abort_tx,
        graceful_shutdown_duration,
        // The topology starts out running; `stop` flips this to `false`.
        running: Arc::new(AtomicBool::new(true)),
        watch: watch::channel(TapResource::default()),
        shutdown_coordinator: SourceShutdownCoordinator::default(),
        // No components are wired up yet; they are added by `connect_diff`/`spawn_diff`.
        inputs: HashMap::new(),
        inputs_tap_metadata: HashMap::new(),
        outputs: HashMap::new(),
        outputs_tap_metadata: HashMap::new(),
        detach_triggers: HashMap::new(),
        source_tasks: HashMap::new(),
        tasks: HashMap::new(),
    }
}
/// Gets the configuration that represents this running topology.
pub const fn config(&self) -> &Config {
&self.config
}
/// Creates a subscription to topology changes.
///
/// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
pub fn watch(&self) -> watch::Receiver<TapResource> {
self.watch.1.clone()
}
/// Signal that all sources in this topology are ended.
///
/// The future returned by this function will finish once all the sources in
/// this topology have finished. This allows the caller to wait for or
/// detect that the sources in the topology are no longer
/// producing. [`Application`][crate::app::Application], as an example, uses this as a
/// shutdown signal.
pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
self.shutdown_coordinator.shutdown_tripwire()
}
/// Shut down all topology components.
///
/// This function sends the shutdown signal to all sources in this topology
/// and returns a future that resolves once all components (sources,
/// transforms, and sinks) have finished shutting down. Transforms and sinks
/// will shut down automatically once their input tasks finish.
///
/// If a graceful shutdown duration is configured, that deadline is handed to the source
/// shutdown coordinator. Once the deadline passes, an error listing the components that are
/// still running is logged and the returned future stops waiting for them. Without a
/// configured duration it waits indefinitely. While waiting, the still-running components
/// are logged every 5 seconds (the first report fires immediately).
///
/// This function takes ownership of `self`, so once it returns everything
/// in the [`RunningTopology`] instance has been dropped except for the
/// `tasks` map. This map gets moved into the returned future and is used to
/// poll for when the tasks have completed. Once the returned future is
/// dropped then everything from this RunningTopology instance is fully
/// dropped.
pub fn stop(self) -> impl Future<Output = ()> {
    /// Forgets every task handle that has already completed, drops components left with no
    /// running handles, and renders the remaining component keys as a `, `-separated list.
    fn prune_finished<F>(
        handles: &mut HashMap<ComponentKey, Vec<futures::future::Shared<F>>>,
    ) -> String
    where
        F: Future,
        F::Output: Clone,
    {
        handles.retain(|_key, component_handles| {
            retain(component_handles, |handle| handle.peek().is_none());
            !component_handles.is_empty()
        });
        handles
            .keys()
            .map(|key| key.to_string())
            .collect::<Vec<_>>()
            .join(", ")
    }

    // Update the API's health endpoint to signal shutdown
    self.running.store(false, Ordering::Relaxed);

    // Create handy handles collections of all tasks for the subsequent operations.
    let mut wait_handles = Vec::new();
    // We need a Vec here since source components have two tasks. One for
    // pump in self.tasks, and the other for source in self.source_tasks.
    let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
    // We need to give some time to the sources to gracefully shutdown, so
    // we will merge them with other tasks.
    for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
        let task = task.map(|_result| ()).shared();
        wait_handles.push(task.clone());
        check_handles.entry(key).or_default().push(task);
    }

    // If we reach this, we will forcefully shutdown the sources. If None, we will never
    // force shutdown.
    let deadline = self
        .graceful_shutdown_duration
        .map(|grace_period| Instant::now() + grace_period);

    let timeout = if let Some(deadline) = deadline {
        // If we reach the deadline, this future will print out which components
        // won't gracefully shutdown since we will start to forcefully shutdown
        // the sources.
        let mut check_handles2 = check_handles.clone();
        Box::pin(async move {
            sleep_until(deadline).await;
            let remaining_components = prune_finished(&mut check_handles2);
            error!(
                components = ?remaining_components,
                "Failed to gracefully shut down in time. Killing components."
            );
        }) as future::BoxFuture<'static, ()>
    } else {
        Box::pin(future::pending()) as future::BoxFuture<'static, ()>
    };

    // Reports in intervals which components are still running.
    let mut interval = interval(Duration::from_secs(5));
    let reporter = async move {
        loop {
            interval.tick().await;
            let remaining_components = prune_finished(&mut check_handles);
            let time_remaining = deadline
                .map(|d| match d.checked_duration_since(Instant::now()) {
                    Some(remaining) => format!("{} seconds left", remaining.as_secs()),
                    None => "overdue".to_string(),
                })
                // Only allocate the fallback text when there really is no deadline.
                .unwrap_or_else(|| "no time limit".to_string());
            info!(
                remaining_components = ?remaining_components,
                time_remaining = ?time_remaining,
                "Shutting down... Waiting on running components."
            );
        }
    };

    // Finishes once all tasks have shutdown.
    let success = futures::future::join_all(wait_handles).map(|_| ());

    // Aggregate future that ends once anything detects that all tasks have shutdown.
    // `timeout` is already a `BoxFuture`, so it needs no further boxing.
    let shutdown_complete_future = future::select_all(vec![
        timeout,
        Box::pin(reporter) as future::BoxFuture<'static, ()>,
        Box::pin(success) as future::BoxFuture<'static, ()>,
    ]);

    // Now kick off the shutdown process by shutting down the sources.
    let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);

    futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
}
/// Attempts to load a new configuration and update this running topology.
///
/// If the new configuration was valid, and all changes were able to be made -- removing of
/// old components, changing of existing components, adding of new components -- then `Ok(true)`
/// is returned.
///
/// If the new configuration is not valid, or not all of the changes in the new configuration
/// were able to be made, then this method will attempt to undo the changes made and bring the
/// topology back to its previous state. If that restoration succeeds, `Ok(false)` is returned.
///
/// `Ok(false)` is also returned, without touching the running topology, when the new
/// configuration changes global options, since those cannot be reloaded.
///
/// # Errors
///
/// If all changes from the new configuration cannot be made, and the current configuration
/// cannot be fully restored, then `Err(())` is returned.
pub async fn reload_config_and_respawn(
    &mut self,
    new_config: Config,
    extra_context: ExtraContext,
) -> Result<bool, ()> {
    info!("Reloading running topology with new configuration.");

    if self.config.global != new_config.global {
        error!(
            message =
            "Global options can't be changed while reloading config file; reload aborted. Please restart Vector to reload the configuration file."
        );
        return Ok(false);
    }

    // Calculate the change between the current configuration and the new configuration, and
    // shutdown any components that are changing so that we can reclaim their buffers before
    // spawning the new version of the component.
    //
    // We also shutdown any component that is simply being removed entirely.
    let diff = ConfigDiff::new(&self.config, &new_config);
    let buffers = self.shutdown_diff(&diff, &new_config).await;

    // Gives windows some time to make available any port
    // released by shutdown components.
    // Issue: https://github.com/vectordotdev/vector/issues/3035
    if cfg!(windows) {
        // This value is guess work.
        tokio::time::sleep(Duration::from_millis(200)).await;
    }

    // Try to build all of the new components coming from the new configuration. If we can
    // successfully build them, we'll attempt to connect them up to the topology and spawn their
    // respective component tasks.
    if let Some(mut new_pieces) = TopologyPieces::build_or_log_errors(
        &new_config,
        &diff,
        buffers.clone(),
        extra_context.clone(),
    )
    .await
    {
        // If healthchecks are configured for any of the changing/new components, try running
        // them before moving forward with connecting and spawning. In some cases, healthchecks
        // failing may be configured as a non-blocking issue and so we'll still continue on.
        if self
            .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
            .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);
            self.config = new_config;

            info!("New configuration loaded successfully.");

            return Ok(true);
        }
    }

    // We failed to build, connect, and spawn all of the changed/new components, so we flip
    // around the configuration differential to generate all the components that we need to
    // bring back to restore the current configuration.
    warn!("Failed to completely load new configuration. Restoring old configuration.");

    let diff = diff.flip();
    // This is the last use of `buffers` and `extra_context`, so both are moved, not cloned.
    if let Some(mut new_pieces) =
        TopologyPieces::build_or_log_errors(&self.config, &diff, buffers, extra_context).await
    {
        if self
            .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
            .await
        {
            self.connect_diff(&diff, &mut new_pieces).await;
            self.spawn_diff(&diff, new_pieces);

            info!("Old configuration restored successfully.");

            return Ok(false);
        }
    }

    error!("Failed to restore old configuration.");

    Err(())
}
/// Runs the healthchecks of the components touched by `diff`, as configured by `options`.
///
/// Returns `true` when healthchecks are disabled. When healthy components are not required,
/// the healthchecks are spawned in the background and `true` is returned right away.
/// Otherwise this waits for all of them and returns `false` if any failed.
pub(crate) async fn run_healthchecks(
    &mut self,
    diff: &ConfigDiff,
    pieces: &mut TopologyPieces,
    options: HealthcheckOptions,
) -> bool {
    if !options.enabled {
        return true;
    }

    let checks = future::try_join_all(
        take_healthchecks(diff, pieces)
            .into_iter()
            .map(|(_, healthcheck)| healthcheck),
    );
    info!("Running healthchecks.");

    if !options.require_healthy {
        // Failures are non-blocking in this mode, so let the checks run detached.
        tokio::spawn(checks);
        return true;
    }

    match checks.await {
        Ok(_) => {
            info!("All healthchecks passed.");
            true
        }
        Err(_) => {
            error!("Sinks unhealthy.");
            false
        }
    }
}
/// Shuts down any changed/removed component in the given configuration diff.
///
/// If buffers for any of the changed/removed components can be recovered, they'll be returned.
///
/// The work happens in stages:
/// 1. Changed/removed sources are told to shut down with a 30 second deadline, their outputs
///    are unregistered, and their source tasks are awaited. A removed source's entry in
///    `self.tasks` is detached.
/// 2. Changed/removed transforms are disconnected from their inputs and their outputs are
///    unregistered. Removed transforms' tasks are detached rather than awaited.
/// 3. Changed/removed sinks are disconnected from their inputs. A sink is awaited if it holds
///    a resource that conflicts with a changed/added source or sink, or if it is changing
///    with an unchanged buffer configuration. Other removed sinks are detached.
///
/// The returned map holds one entry per changed sink whose buffer configuration is unchanged.
/// Each entry pairs the sink's existing buffer sender with its recovered receiver.
///
/// # Panics
///
/// Panics (via `unwrap`/`unreachable!`) if an expected task, component config, detach trigger,
/// or input sender is missing, or if an awaited source/sink task does not complete
/// successfully.
async fn shutdown_diff(
&mut self,
diff: &ConfigDiff,
new_config: &Config,
) -> HashMap<ComponentKey, BuiltBuffer> {
// First, we shutdown any changed/removed sources. This ensures that we can allow downstream
// components to terminate naturally by virtue of the flow of events stopping.
if diff.sources.any_changed_or_removed() {
let timeout = Duration::from_secs(30);
let mut source_shutdown_handles = Vec::new();
let deadline = Instant::now() + timeout;
for key in &diff.sources.to_remove {
debug!(component = %key, "Removing source.");
// Only the source's `self.tasks` entry is detached here; its `source_tasks` entry is
// awaited in the cleanup pass below.
let previous = self.tasks.remove(key).unwrap();
drop(previous); // detach and forget
self.remove_outputs(key);
source_shutdown_handles
.push(self.shutdown_coordinator.shutdown_source(key, deadline));
}
for key in &diff.sources.to_change {
debug!(component = %key, "Changing source.");
self.remove_outputs(key);
source_shutdown_handles
.push(self.shutdown_coordinator.shutdown_source(key, deadline));
}
debug!(
"Waiting for up to {} seconds for source(s) to finish shutting down.",
timeout.as_secs()
);
futures::future::join_all(source_shutdown_handles).await;
// Final cleanup pass now that all changed/removed sources have signalled as having shutdown.
for key in diff.sources.removed_and_changed() {
if let Some(task) = self.source_tasks.remove(key) {
task.await.unwrap().unwrap();
}
}
}
// Next, we shutdown any changed/removed transforms. Same as before: we want allow
// downstream components to terminate naturally by virtue of the flow of events stopping.
//
// Since transforms are entirely driven by the flow of events into them from upstream
// components, the shutdown of sources they depend on, or the shutdown of transforms they
// depend on, and thus the closing of their buffer, will naturally cause them to shutdown,
// which is why we don't do any manual triggering of shutdown here.
for key in &diff.transforms.to_remove {
debug!(component = %key, "Removing transform.");
let previous = self.tasks.remove(key).unwrap();
drop(previous); // detach and forget
self.remove_inputs(key, diff, new_config).await;
self.remove_outputs(key);
}
for key in &diff.transforms.to_change {
debug!(component = %key, "Changing transform.");
self.remove_inputs(key, diff, new_config).await;
self.remove_outputs(key);
}
// Now we'll process any changed/removed sinks.
//
// At this point both the old and the new config don't have conflicts in their resource
// usage. So if we combine their resources, all found conflicts are between to be removed
// and to be added components.
let remove_sink = diff
.sinks
.removed_and_changed()
.map(|key| (key, self.config.sink(key).unwrap().resources(key)));
let add_source = diff
.sources
.changed_and_added()
.map(|key| (key, new_config.source(key).unwrap().inner.resources()));
let add_sink = diff
.sinks
.changed_and_added()
.map(|key| (key, new_config.sink(key).unwrap().resources(key)));
// Each component is tagged with a flag: `true` marks an existing (removed/changed) sink
// from the old config, `false` marks a changed/added sink or source from the new config.
let conflicts = Resource::conflicts(
remove_sink.map(|(key, value)| ((true, key), value)).chain(
add_sink
.chain(add_source)
.map(|(key, value)| ((false, key), value)),
),
)
.into_iter()
.flat_map(|(_, components)| components)
.collect::<HashSet<_>>();
// Existing conflicting sinks
let conflicting_sinks = conflicts
.into_iter()
.filter(|&(existing_sink, _)| existing_sink)
.map(|(_, key)| key.clone());
// For any sink whose buffer configuration didn't change, we can reuse their buffer.
let reuse_buffers = diff
.sinks
.to_change
.iter()
.filter(|&key| {
self.config.sink(key).unwrap().buffer == new_config.sink(key).unwrap().buffer
})
.cloned()
.collect::<HashSet<_>>();
// For any existing sink that has a conflicting resource dependency with a changed/added
// sink, or for any sink that we want to reuse their buffer, we need to explicit wait for
// them to finish processing so we can reclaim ownership of those resources/buffers.
let wait_for_sinks = conflicting_sinks
.chain(reuse_buffers.iter().cloned())
.collect::<HashSet<_>>();
// First, we remove any inputs to removed sinks so they can naturally shut down.
for key in &diff.sinks.to_remove {
debug!(component = %key, "Removing sink.");
self.remove_inputs(key, diff, new_config).await;
}
// After that, for any changed sinks, we temporarily detach their inputs (not remove) so
// they can naturally shutdown and allow us to recover their buffers if possible.
let mut buffer_tx = HashMap::new();
for key in &diff.sinks.to_change {
debug!(component = %key, "Changing sink.");
if reuse_buffers.contains(key) {
self.detach_triggers
.remove(key)
.unwrap()
.into_inner()
.cancel();
// We explicitly clone the input side of the buffer and store it so we don't lose
// it when we remove the inputs below.
//
// We clone instead of removing here because otherwise the input will be missing for
// the rest of the reload process, which violates the assumption that all previous
// inputs for components not being removed are still available. It's simpler to
// allow the "old" input to stick around and be replaced (even though that's
// basically a no-op since we're reusing the same buffer) than it is to pass around
// info about which sinks are having their buffers reused and treat them differently
// at other stages.
buffer_tx.insert(key.clone(), self.inputs.get(key).unwrap().clone());
}
self.remove_inputs(key, diff, new_config).await;
}
// Now that we've disconnected or temporarily detached the inputs to all changed/removed
// sinks, we can actually wait for them to shutdown before collecting any buffers that are
// marked for reuse.
//
// If a sink we're removing isn't tying up any resource that a changed/added sink depends
// on, we don't bother waiting for it to shutdown.
for key in &diff.sinks.to_remove {
let previous = self.tasks.remove(key).unwrap();
if wait_for_sinks.contains(key) {
debug!(message = "Waiting for sink to shutdown.", %key);
previous.await.unwrap().unwrap();
} else {
drop(previous); // detach and forget
}
}
let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
for key in &diff.sinks.to_change {
if wait_for_sinks.contains(key) {
let previous = self.tasks.remove(key).unwrap();
debug!(message = "Waiting for sink to shutdown.", %key);
let buffer = previous.await.unwrap().unwrap();
if reuse_buffers.contains(key) {
// We clone instead of removing here because otherwise the input will be
// missing for the rest of the reload process, which violates the assumption
// that all previous inputs for components not being removed are still
// available. It's simpler to allow the "old" input to stick around and be
// replaced (even though that's basically a no-op since we're reusing the same
// buffer) than it is to pass around info about which sinks are having their
// buffers reused and treat them differently at other stages.
let tx = buffer_tx.remove(key).unwrap();
// A finished sink task is expected to yield `TaskOutput::Sink` carrying its
// buffer receiver; any other variant is treated as impossible.
let rx = match buffer {
TaskOutput::Sink(rx) => rx.into_inner(),
_ => unreachable!(),
};
buffers.insert(key.clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
}
}
}
buffers
}
/// Connects all changed/added components in the given configuration diff.
///
/// Steps, in order:
/// 1. Update the tap metadata for removed, changed, and added components.
/// 2. Register the outputs of changed/added sources, then of changed/added transforms.
/// 3. Wire the inputs of changed/added transforms, then of changed/added sinks.
/// 4. Re-attach unchanged transforms/sinks whose upstream outputs were recreated.
/// 5. Broadcast the resulting `TapResource` to tap subscribers.
///
/// # Panics
///
/// Panics if broadcasting the `TapResource` fails, and also if any of the helpers called
/// here panic.
pub(crate) async fn connect_diff(
&mut self,
diff: &ConfigDiff,
new_pieces: &mut TopologyPieces,
) {
debug!("Connecting changed/added component(s).");
// Update tap metadata
// NOTE(review): `self.watch.1` keeps a receiver alive for as long as `self` exists. Per
// tokio's `watch::Sender::is_closed` docs, that suggests this check (and the one before the
// broadcast below) is always false — confirm whether it is meant to skip work when no tap
// clients exist.
if !self.watch.0.is_closed() {
for key in &diff.sources.to_remove {
// Sources only have outputs
self.outputs_tap_metadata.remove(key);
}
for key in &diff.transforms.to_remove {
// Transforms can have both inputs and outputs
self.outputs_tap_metadata.remove(key);
self.inputs_tap_metadata.remove(key);
}
for key in &diff.sinks.to_remove {
// Sinks only have inputs
self.inputs_tap_metadata.remove(key);
}
for key in diff.sources.changed_and_added() {
if let Some(task) = new_pieces.tasks.get(key) {
self.outputs_tap_metadata
.insert(key.clone(), ("source", task.typetag().to_string()));
}
}
for key in diff.transforms.changed_and_added() {
if let Some(task) = new_pieces.tasks.get(key) {
self.outputs_tap_metadata
.insert(key.clone(), ("transform", task.typetag().to_string()));
}
}
// Record the new input list of every component that was built for this diff.
for (key, input) in &new_pieces.inputs {
self.inputs_tap_metadata
.insert(key.clone(), input.1.clone());
}
}
// We configure the outputs of any changed/added sources first, so they're available to any
// transforms and sinks that come afterwards.
for key in diff.sources.changed_and_added() {
debug!(component = %key, "Configuring outputs for source.");
self.setup_outputs(key, new_pieces).await;
}
// We configure the outputs of any changed/added transforms next, for the same reason: we
// need them to be available to any transforms and sinks that come afterwards.
for key in diff.transforms.changed_and_added() {
debug!(component = %key, "Configuring outputs for transform.");
self.setup_outputs(key, new_pieces).await;
}
// Now that all possible outputs are configured, we can start wiring up inputs, starting
// with transforms.
for key in diff.transforms.changed_and_added() {
debug!(component = %key, "Connecting inputs for transform.");
self.setup_inputs(key, diff, new_pieces).await;
}
// Now that all sources and transforms are fully configured, we can wire up sinks.
for key in diff.sinks.changed_and_added() {
debug!(component = %key, "Connecting inputs for sink.");
self.setup_inputs(key, diff, new_pieces).await;
}
// We do a final pass here to reconnect unchanged components.
//
// Why would we reconnect unchanged components? Well, as sources and transforms will
// recreate their fanouts every time they're changed, we can run into a situation where a
// transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
// we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
//
// Instead of propagating connections forward -- B reconnecting A forcefully -- we only
// connect components backwards i.e. transforms to sources/transforms, and sinks to
// sources/transforms, to ensure we're connecting components in order.
self.reattach_severed_inputs(diff);
// Broadcast any topology changes to subscribers.
if !self.watch.0.is_closed() {
// Only outputs whose component has tap metadata (sources and transforms) are published.
let outputs = self
.outputs
.clone()
.into_iter()
.flat_map(|(output_id, control_tx)| {
self.outputs_tap_metadata.get(&output_id.component).map(
|(component_kind, component_type)| {
(
TapOutput {
output_id,
component_kind,
component_type: component_type.clone(),
},
control_tx,
)
},
)
})
.collect::<HashMap<_, _>>();
let mut removals = diff.sources.to_remove.clone();
removals.extend(diff.transforms.to_remove.iter().cloned());
self.watch
.0
.send(TapResource {
outputs,
inputs: self.inputs_tap_metadata.clone(),
source_keys: diff
.sources
.changed_and_added()
.map(|key| key.to_string())
.collect(),
sink_keys: diff
.sinks
.changed_and_added()
.map(|key| key.to_string())
.collect(),
// Note, only sources and transforms are relevant. Sinks do
// not have outputs to tap.
removals,
})
.expect("Couldn't broadcast config changes.");
}
}
/// Registers every output built for component `key` as a live fanout control channel.
///
/// # Panics
///
/// Panics if `new_pieces` has no outputs for `key`.
async fn setup_outputs(
    &mut self,
    key: &ComponentKey,
    new_pieces: &mut builder::TopologyPieces,
) {
    let component_outputs = new_pieces.outputs.remove(key).unwrap();

    for (output_port, control_channel) in component_outputs.into_iter() {
        debug!(component = %key, output_id = ?output_port, "Configuring output for component.");

        let output_id = OutputId {
            component: key.clone(),
            port: output_port,
        };
        self.outputs.insert(output_id, control_channel);
    }
}
/// Wires component `key` into the fanout of every input it was built with.
///
/// For each input, the upstream fanout is sent `Add` when the upstream component is part of
/// `diff` (its fanout was recreated) or when the input is new relative to the current config.
/// Otherwise it is sent `Replace`, swapping in the new sender for the connection that was
/// paused earlier in the reload. The component's input sender and detach trigger (if any)
/// are then stored on the running topology.
///
/// # Panics
///
/// Panics if `new_pieces` has no inputs for `key`, or if an input names an output that has
/// not been registered.
async fn setup_inputs(
    &mut self,
    key: &ComponentKey,
    diff: &ConfigDiff,
    new_pieces: &mut builder::TopologyPieces,
) {
    let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();

    let old_inputs = self
        .config
        .inputs_for_node(key)
        .into_iter()
        .flatten()
        .cloned()
        .collect::<HashSet<_>>();

    let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
    let inputs_to_add = &new_inputs - &old_inputs;

    for input in inputs {
        let output = self.outputs.get_mut(&input).expect("unknown output");

        if diff.contains(&input.component) || inputs_to_add.contains(&input) {
            // If the input we're connecting to is changing, that means its outputs will have been
            // recreated, so instead of replacing a paused sink, we have to add it to this new
            // output for the first time, since there's nothing to actually replace at this point.
            debug!(component = %key, fanout_id = %input, "Adding component input to fanout.");

            _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
        } else {
            // We know that if this component is connected to a given input, and neither
            // components were changed, then the output must still exist, which means we paused
            // this component's connection to its output, so we have to replace that connection
            // now:
            debug!(component = %key, fanout_id = %input, "Replacing component input in fanout.");

            _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
        }
    }

    self.inputs.insert(key.clone(), tx);

    // Use `if let` rather than `Option::map` purely for its side effect.
    if let Some(trigger) = new_pieces.detach_triggers.remove(key) {
        self.detach_triggers.insert(key.clone(), trigger.into());
    }
}
/// Unregisters every output (on any port) that belongs to component `key`.
fn remove_outputs(&mut self, key: &ComponentKey) {
    self.outputs
        .retain(|output_id, _control| output_id.component != *key);
}
/// Disconnects component `key` from the fanouts of the inputs it has in the current config.
///
/// The component's stored input sender and detach trigger are forgotten first. Then each
/// upstream fanout that still exists is sent either `Remove` (drop the connection for good)
/// or `Pause` (hold it until the component is reconnected).
///
/// # Panics
///
/// Panics if the current config has no node for `key`.
async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
    self.inputs.remove(key);
    self.detach_triggers.remove(key);

    let previous_inputs = self.config.inputs_for_node(key).expect("node exists");
    let next_inputs: HashSet<_> = new_config
        .inputs_for_node(key)
        .unwrap_or_default()
        .iter()
        .collect();

    for input in previous_inputs {
        let Some(fanout) = self.outputs.get_mut(input) else {
            continue;
        };

        // The connection is removed outright, rather than paused, when:
        // 1. the upstream component is changing, so its outputs will be recreated anyway;
        // 2. this component is being removed, so pausing is pointless; or
        // 3. the new config no longer connects this component to that input.
        let sever = diff.contains(&input.component)
            || diff.is_removed(key)
            || !next_inputs.contains(input);

        if sever {
            debug!(component = %key, fanout_id = %input, "Removing component input from fanout.");
            _ = fanout.send(ControlMessage::Remove(key.clone()));
        } else {
            // The component will still be connected to this input afterwards, so just pause
            // sends until its inputs are wired up again.
            debug!(component = %key, fanout_id = %input, "Pausing component input in fanout.");
            _ = fanout.send(ControlMessage::Pause(key.clone()));
        }
    }
}
/// Re-adds unchanged transforms and sinks to upstream fanouts that were recreated.
///
/// All unchanged transforms are processed first, then all unchanged sinks. For each one, every
/// input reported by `get_changed_outputs` gets an `Add` carrying the component's stored input
/// sender.
///
/// # Panics
///
/// Panics if a component needing reattachment has no stored input sender, or if the changed
/// output is not registered.
fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
    let stable_transforms = self
        .config
        .transforms()
        .filter(|(key, _)| !diff.transforms.contains(key))
        .map(|(key, transform)| (key, transform.inputs.clone()));
    let stable_sinks = self
        .config
        .sinks()
        .filter(|(key, _)| !diff.sinks.contains(key))
        .map(|(key, sink)| (key, sink.inputs.clone()));

    for (component_key, component_inputs) in stable_transforms.chain(stable_sinks) {
        for output_id in get_changed_outputs(diff, component_inputs) {
            debug!(component = %component_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");

            let sender = self.inputs.get(component_key).cloned().unwrap();
            let fanout = self.outputs.get_mut(&output_id).unwrap();
            _ = fanout.send(ControlMessage::Add(component_key.clone(), sender));
        }
    }
}
/// Starts any new or changed components in the given configuration diff.
///
/// Tasks are taken out of `new_pieces` and spawned in the order sources, transforms, sinks.
/// Within each kind, changed components are spawned before newly added ones.
pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
    for key in &diff.sources.to_change {
        debug!(message = "Spawning changed source.", key = %key);
        self.spawn_source(key, &mut new_pieces);
    }

    for key in &diff.sources.to_add {
        debug!(message = "Spawning new source.", key = %key);
        self.spawn_source(key, &mut new_pieces);
    }

    for key in &diff.transforms.to_change {
        debug!(message = "Spawning changed transform.", key = %key);
        self.spawn_transform(key, &mut new_pieces);
    }

    for key in &diff.transforms.to_add {
        debug!(message = "Spawning new transform.", key = %key);
        self.spawn_transform(key, &mut new_pieces);
    }

    for key in &diff.sinks.to_change {
        debug!(message = "Spawning changed sink.", key = %key);
        self.spawn_sink(key, &mut new_pieces);
    }

    for key in &diff.sinks.to_add {
        // Logged at `debug` like every other spawn above (was inconsistently `trace`).
        debug!(message = "Spawning new sink.", key = %key);
        self.spawn_sink(key, &mut new_pieces);
    }
}
fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
let task = new_pieces.tasks.remove(key).unwrap();
let span = error_span!(
"sink",
component_kind = "sink",
component_id = %task.id(),
component_type = %task.typetag(),
);
let task_span = span.or_current();
#[cfg(feature = "allocation-tracing")]
if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
task.id().to_string(),
"sink".to_string(),
task.typetag().to_string(),
);
debug!(
component_kind = "sink",
component_type = task.typetag(),
component_id = task.id(),
group_id = group_id.as_raw().to_string(),
"Registered new allocation group."
);
group_id.attach_to_span(&task_span);
}
let task_name = format!(">> {} ({})", task.typetag(), task.id());
let task = {
let key = key.clone();
handle_errors(task, self.abort_tx.clone(), |error| {
ShutdownError::SinkAborted { key, error }
})
}
.instrument(task_span);
let spawned = spawn_named(task, task_name.as_ref());
if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
drop(previous); // detach and forget
}
}
fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
let task = new_pieces.tasks.remove(key).unwrap();
let span = error_span!(
"transform",
component_kind = "transform",
component_id = %task.id(),
component_type = %task.typetag(),
);
let task_span = span.or_current();
#[cfg(feature = "allocation-tracing")]
if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
task.id().to_string(),
"transform".to_string(),
task.typetag().to_string(),
);
debug!(
component_kind = "transform",
component_type = task.typetag(),
component_id = task.id(),
group_id = group_id.as_raw().to_string(),
"Registered new allocation group."
);
group_id.attach_to_span(&task_span);
}
let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
let task = {
let key = key.clone();
handle_errors(task, self.abort_tx.clone(), |error| {
ShutdownError::TransformAborted { key, error }
})
}
.instrument(task_span);
let spawned = spawn_named(task, task_name.as_ref());
if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
drop(previous); // detach and forget
}
}
fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
let task = new_pieces.tasks.remove(key).unwrap();
let span = error_span!(
"source",
component_kind = "source",
component_id = %task.id(),
component_type = %task.typetag(),
);
let task_span = span.or_current();
#[cfg(feature = "allocation-tracing")]
if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
task.id().to_string(),
"source".to_string(),
task.typetag().to_string(),
);
debug!(
component_kind = "source",
component_type = task.typetag(),
component_id = task.id(),
group_id = group_id.as_raw().to_string(),
"Registered new allocation group."
);
group_id.attach_to_span(&task_span);
}
let task_name = format!("{} ({}) >>", task.typetag(), task.id());
let task = {
let key = key.clone();
handle_errors(task, self.abort_tx.clone(), |error| {
ShutdownError::SourceAborted { key, error }
})
}
.instrument(task_span.clone());
let spawned = spawn_named(task, task_name.as_ref());
if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
drop(previous); // detach and forget
}
self.shutdown_coordinator
.takeover_source(key, &mut new_pieces.shutdown_coordinator);
// Now spawn the actual source task.
let source_task = new_pieces.source_tasks.remove(key).unwrap();
let source_task = {
let key = key.clone();
handle_errors(source_task, self.abort_tx.clone(), |error| {
ShutdownError::SourceAborted { key, error }
})
}
.instrument(task_span);
self.source_tasks
.insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
}
/// Builds and starts a topology for `config` from scratch.
///
/// Every component in `config` is treated as new (`ConfigDiff::initial`). Its pieces are built
/// with an empty `HashMap` as the third argument, and the result is handed to
/// `start_validated`. Returns `None` if building yields no pieces (errors are presumably logged
/// by `build_or_log_errors`) or if `start_validated` returns `None`.
pub async fn start_init_validated(
    config: Config,
    extra_context: ExtraContext,
) -> Option<(Self, ShutdownErrorReceiver)> {
    let initial_diff = ConfigDiff::initial(&config);
    let Some(built) =
        TopologyPieces::build_or_log_errors(&config, &initial_diff, HashMap::new(), extra_context)
            .await
    else {
        return None;
    };
    Self::start_validated(config, initial_diff, built).await
}
pub async fn start_validated(
config: Config,
diff: ConfigDiff,
mut pieces: TopologyPieces,
) -> Option<(Self, ShutdownErrorReceiver)> {
let (abort_tx, abort_rx) = mpsc::unbounded_channel();
let expire_metrics = match (
config.global.expire_metrics,
config.global.expire_metrics_secs,
) {
(Some(e), None) => {
warn!(
"DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
);
if e < Duration::from_secs(0) {
None
} else {
Some(e.as_secs_f64())
}
}
(Some(_), Some(_)) => {
error!("Cannot set both `expire_metrics` and `expire_metrics_secs`.");
return None;
}
(None, Some(e)) => {
if e < 0f64 {
None
} else {
Some(e)
}
}
(None, None) => Some(300f64),
};
if let Err(error) = crate::metrics::Controller::get()
.expect("Metrics must be initialized")
.set_expiry(expire_metrics)
{
error!(message = "Invalid metrics expiry.", %error);
return None;
}
let mut running_topology = Self::new(config, abort_tx);
if !running_topology
.run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
.await
{
return None;
}
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);
Some((running_topology, abort_rx))
}
}
/// Returns the entries of `output_ids` whose component is a source or transform listed as
/// changed (`to_change`) in `diff`.
///
/// The result is grouped by changed component. Changed sources come first, then changed
/// transforms, each in the iteration order of the corresponding `to_change` collection. Within
/// each group, entries keep their order from `output_ids`.
fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
    let changed_components = diff
        .sources
        .to_change
        .iter()
        .chain(diff.transforms.to_change.iter());

    changed_components
        .flat_map(|component| {
            output_ids
                .iter()
                .filter(move |output| &output.component == component)
                .cloned()
        })
        .collect()
}