1use std::{
2 collections::{HashMap, HashSet},
3 num::NonZeroUsize,
4};
5
6use futures::{future::try_join_all, FutureExt};
7use tokio::sync::{
8 mpsc as tokio_mpsc,
9 mpsc::error::{SendError, TrySendError},
10 oneshot,
11};
12use tracing::{Instrument, Span};
13use uuid::Uuid;
14use vector_buffers::{topology::builder::TopologyBuilder, WhenFull};
15use vector_common::config::ComponentKey;
16use vector_core::event::{EventArray, LogArray, MetricArray, TraceArray};
17use vector_core::fanout;
18
19use crate::notification::{InvalidMatch, Matched, NotMatched, Notification};
20use crate::topology::{TapOutput, TapResource, WatchRx};
21
22type TapSender = tokio_mpsc::Sender<TapPayload>;
24
25type ShutdownTx = oneshot::Sender<()>;
27type ShutdownRx = oneshot::Receiver<()>;
28
/// Capacity handed to the standalone in-memory buffer that is created for each tapped output.
const TAP_BUFFER_SIZE: NonZeroUsize = match NonZeroUsize::new(100) {
    Some(size) => size,
    None => panic!("TAP_BUFFER_SIZE must be non-zero"),
};
30
/// Something that can be used as a glob pattern and tested against a candidate `T`.
trait GlobMatcher<T> {
    /// Returns `true` when `candidate` matches the pattern represented by `self`.
    fn matches_glob(&self, candidate: T) -> bool;
}
35
36impl GlobMatcher<&str> for String {
37 fn matches_glob(&self, rhs: &str) -> bool {
38 match glob::Pattern::new(self) {
39 Ok(pattern) => pattern.matches(rhs),
40 _ => false,
41 }
42 }
43}
44
45#[derive(Debug, Eq, PartialEq, Hash)]
50enum Pattern {
51 OutputPattern(glob::Pattern),
53 InputPattern(String, Vec<glob::Pattern>),
60}
61
62impl GlobMatcher<&str> for Pattern {
63 fn matches_glob(&self, rhs: &str) -> bool {
64 match self {
65 Pattern::OutputPattern(pattern) => pattern.matches(rhs),
66 Pattern::InputPattern(_, patterns) => {
67 patterns.iter().any(|pattern| pattern.matches(rhs))
68 }
69 }
70 }
71}
72
/// The glob patterns supplied for a tap session.
#[derive(Debug)]
pub struct TapPatterns {
    /// Patterns matched against component output ids.
    pub for_outputs: HashSet<String>,
    /// Patterns matched against the components listed in the topology's inputs map.
    /// Each match is expanded to the outputs feeding that component.
    pub for_inputs: HashSet<String>,
}

impl TapPatterns {
    /// Bundles the output and input pattern sets.
    pub const fn new(for_outputs: HashSet<String>, for_inputs: HashSet<String>) -> Self {
        Self {
            for_outputs,
            for_inputs,
        }
    }

    /// Returns every pattern, output and input alike, with duplicates collapsed.
    pub fn all_patterns(&self) -> HashSet<String> {
        self.for_outputs
            .union(&self.for_inputs)
            .cloned()
            .collect()
    }
}
98
99#[derive(Debug)]
102pub enum TapPayload {
103 Log(TapOutput, LogArray),
104 Metric(TapOutput, MetricArray),
105 Trace(TapOutput, TraceArray),
106 Notification(Notification),
107}
108
109impl TapPayload {
110 pub fn matched<T: Into<String>>(pattern: T) -> Self {
112 Self::Notification(Notification::Matched(Matched::new(pattern.into())))
113 }
114
115 pub fn not_matched<T: Into<String>>(pattern: T) -> Self {
117 Self::Notification(Notification::NotMatched(NotMatched::new(pattern.into())))
118 }
119
120 pub fn invalid_input_pattern_match<T: Into<String>>(
122 pattern: T,
123 invalid_matches: Vec<String>,
124 ) -> Self {
125 let pattern = pattern.into();
126 let message = format!("[tap] Warning: source inputs cannot be tapped. Input pattern '{pattern}' matches sources {invalid_matches:?}");
127 Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
128 message,
129 pattern,
130 invalid_matches,
131 )))
132 }
133
134 pub fn invalid_output_pattern_match<T: Into<String>>(
136 pattern: T,
137 invalid_matches: Vec<String>,
138 ) -> Self {
139 let pattern = pattern.into();
140 let message = format!(
141 "[tap] Warning: sink outputs cannot be tapped. Output pattern '{pattern}' matches sinks {invalid_matches:?}"
142 );
143 Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
144 message,
145 pattern,
146 invalid_matches,
147 )))
148 }
149}
150
151#[derive(Clone)]
153pub struct TapTransformer {
154 tap_tx: TapSender,
155 output: TapOutput,
156}
157
158impl TapTransformer {
159 pub const fn new(tap_tx: TapSender, output: TapOutput) -> Self {
160 Self { tap_tx, output }
161 }
162
163 pub fn try_send(&mut self, events: EventArray) {
164 let payload = match events {
165 EventArray::Logs(logs) => TapPayload::Log(self.output.clone(), logs),
166 EventArray::Metrics(metrics) => TapPayload::Metric(self.output.clone(), metrics),
167 EventArray::Traces(traces) => TapPayload::Trace(self.output.clone(), traces),
168 };
169
170 if let Err(TrySendError::Closed(payload)) = self.tap_tx.try_send(payload) {
171 debug!(
172 message = "Couldn't send event.",
173 payload = ?payload,
174 component_id = ?self.output.output_id,
175 );
176 }
177 }
178}
179
180#[derive(Debug)]
183pub struct TapController {
184 _shutdown: ShutdownTx,
185}
186
187impl TapController {
188 pub fn new(watch_rx: WatchRx, tap_tx: TapSender, patterns: TapPatterns) -> Self {
192 let (_shutdown, shutdown_rx) = oneshot::channel();
193
194 tokio::spawn(
195 tap_handler(patterns, tap_tx, watch_rx, shutdown_rx).instrument(error_span!(
196 "tap_handler",
197 component_kind = "sink",
198 component_id = "_tap", component_type = "tap",
200 )),
201 );
202
203 Self { _shutdown }
204 }
205}
206
207fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
209 let (shutdown_tx, shutdown_rx) = oneshot::channel();
210
211 tokio::spawn(async move {
212 _ = shutdown_rx.await;
213 if control_tx
214 .send(fanout::ControlMessage::Remove(sink_id.clone()))
215 .is_err()
216 {
217 debug!(message = "Couldn't disconnect sink.", ?sink_id);
218 } else {
219 debug!(message = "Disconnected sink.", ?sink_id);
220 }
221 });
222
223 shutdown_tx
224}
225
226async fn send_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
228 debug!(message = "Sending matched notification.", pattern = ?pattern);
229 tx.send(TapPayload::matched(pattern)).await
230}
231
232async fn send_not_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
234 debug!(message = "Sending not matched notification.", pattern = ?pattern);
235 tx.send(TapPayload::not_matched(pattern)).await
236}
237
238async fn send_invalid_input_pattern_match(
240 tx: TapSender,
241 pattern: String,
242 invalid_matches: Vec<String>,
243) -> Result<(), SendError<TapPayload>> {
244 debug!(message = "Sending invalid input pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
245 tx.send(TapPayload::invalid_input_pattern_match(
246 pattern,
247 invalid_matches,
248 ))
249 .await
250}
251
252async fn send_invalid_output_pattern_match(
254 tx: TapSender,
255 pattern: String,
256 invalid_matches: Vec<String>,
257) -> Result<(), SendError<TapPayload>> {
258 debug!(message = "Sending invalid output pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
259 tx.send(TapPayload::invalid_output_pattern_match(
260 pattern,
261 invalid_matches,
262 ))
263 .await
264}
265
266async fn tap_handler(
269 patterns: TapPatterns,
270 tx: TapSender,
271 mut watch_rx: WatchRx,
272 mut shutdown_rx: ShutdownRx,
273) {
274 debug!(message = "Started tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);
275
276 let mut sinks: HashMap<ComponentKey, _> = HashMap::new();
279
280 let user_provided_patterns = patterns.all_patterns();
283
284 let mut last_matches = HashSet::new();
287
288 loop {
289 tokio::select! {
290 _ = &mut shutdown_rx => break,
291 Ok(_) = watch_rx.changed() => {
292 let mut matched = HashSet::new();
294
295 let TapResource {
298 outputs,
299 inputs,
300 source_keys,
301 sink_keys,
302 removals,
303 } = watch_rx.borrow().clone();
304
305 let output_keys = outputs.keys().map(|output| output.output_id.component.clone()).collect::<HashSet<_>>();
307 sinks.retain(|key, _| {
308 !removals.contains(key) && output_keys.contains(key) || {
309 debug!(message = "Removing component.", component_id = %key);
310 false
311 }
312 });
313
314 let mut component_id_patterns = patterns.for_outputs.iter()
315 .filter_map(|p| glob::Pattern::new(p).ok())
316 .map(Pattern::OutputPattern).collect::<HashSet<_>>();
317
318 for pattern in patterns.for_inputs.iter() {
320 if let Ok(glob) = glob::Pattern::new(pattern) {
321 match inputs.iter().filter(|(key, _)|
322 glob.matches(&key.to_string())
323 ).flat_map(|(_, related_inputs)| related_inputs.iter().map(|id| id.to_string()).collect::<Vec<_>>()).collect::<HashSet<_>>() {
324 found if !found.is_empty() => {
325 component_id_patterns.insert(Pattern::InputPattern(pattern.clone(), found.into_iter()
326 .filter_map(|p| glob::Pattern::new(&p).ok()).collect::<Vec<_>>()));
327 }
328 _ => {
329 debug!(message="Input pattern not expanded: no matching components.", ?pattern);
330 }
331 }
332 }
333 }
334
335 for (output, control_tx) in outputs.iter() {
338 match component_id_patterns
339 .iter()
340 .filter(|pattern| pattern.matches_glob(&output.output_id.to_string()))
341 .collect::<Vec<_>>()
342 {
343 found if !found.is_empty() => {
344 debug!(
345 message="Component matched.",
346 ?output.output_id, ?component_id_patterns, matched = ?found
347 );
348
349 let (tap_buffer_tx, mut tap_buffer_rx) = TopologyBuilder::standalone_memory(TAP_BUFFER_SIZE, WhenFull::DropNewest, &Span::current()).await;
354 let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone());
355
356 tokio::spawn(async move {
357 while let Some(events) = tap_buffer_rx.next().await {
358 tap_transformer.try_send(events);
359 }
360 });
361
362 let sink_id = Uuid::new_v4().to_string();
368 match control_tx
369 .send(fanout::ControlMessage::Add(ComponentKey::from(sink_id.as_str()), tap_buffer_tx))
370 {
371 Ok(_) => {
372 debug!(
373 message = "Sink connected.", ?sink_id, ?output.output_id,
374 );
375
376 sinks.entry(output.output_id.component.clone()).or_insert_with(Vec::new).push(
379 shutdown_trigger(control_tx.clone(), ComponentKey::from(sink_id.as_str()))
380 );
381 }
382 Err(error) => {
383 error!(
384 message = "Couldn't connect sink.",
385 ?error,
386 ?output.output_id,
387 ?sink_id,
388 );
389 }
390 }
391
392 matched.extend(found.iter().map(|pattern| {
393 match pattern {
394 Pattern::OutputPattern(p) => p.to_string(),
395 Pattern::InputPattern(p, _) => p.to_owned(),
396 }
397 }));
398 }
399 _ => {
400 debug!(
401 message="Component not matched.", ?output.output_id, ?component_id_patterns
402 );
403 }
404 }
405 }
406
407 let mut notifications = Vec::new();
409
410 for pattern in matched.difference(&last_matches) {
412 notifications.push(send_matched(tx.clone(), pattern.clone()).boxed());
413 }
414
415 for pattern in user_provided_patterns.difference(&matched) {
417 notifications.push(send_not_matched(tx.clone(), pattern.clone()).boxed());
418 }
419
420 for pattern in patterns.for_inputs.iter() {
422 if let Ok(glob) = glob::Pattern::new(pattern) {
423 let invalid_matches = source_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
424 if !invalid_matches.is_empty() {
425 notifications.push(send_invalid_input_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
426 }
427 }
428 }
429 for pattern in patterns.for_outputs.iter() {
430 if let Ok(glob) = glob::Pattern::new(pattern) {
431 let invalid_matches = sink_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
432 if !invalid_matches.is_empty() {
433 notifications.push(send_invalid_output_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
434 }
435 }
436 }
437
438 last_matches = matched;
439
440 if try_join_all(notifications).await.is_err() {
443 debug!("Couldn't send notification(s); tap gone away.");
444 break;
445 }
446 }
447 }
448 }
449
450 debug!(message = "Stopped tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);
451}
452
/// Unit tests for the glob matching used by tap patterns. The module is compiled only
/// for test builds.
#[cfg(test)]
mod tests {
    use super::GlobMatcher;

    #[test]
    fn matches() {
        let patterns = ["ab*", "12?", "xy?"];

        for id in &["abc", "123", "xyz"] {
            assert!(patterns.iter().any(|p| p.to_string().matches_glob(id)));
        }

        for id in &["xzy", "ad*", "1234"] {
            assert!(!patterns.iter().any(|p| p.to_string().matches_glob(id)));
        }
    }

    #[test]
    fn invalid_pattern_never_matches() {
        // An unterminated character class is not a valid glob. `matches_glob` treats an
        // unparsable pattern as matching nothing, even the identical literal string.
        assert!(!"[".to_string().matches_glob("["));
    }
}