1use std::{
2 collections::{HashMap, HashSet},
3 num::NonZeroUsize,
4};
5
6use futures::{FutureExt, future::try_join_all};
7use tokio::sync::{
8 mpsc as tokio_mpsc,
9 mpsc::error::{SendError, TrySendError},
10 oneshot,
11};
12use tracing::{Instrument, Span};
13use uuid::Uuid;
14use vector_buffers::{WhenFull, topology::builder::TopologyBuilder};
15use vector_common::config::ComponentKey;
16use vector_core::{
17 event::{EventArray, LogArray, MetricArray, TraceArray},
18 fanout,
19};
20
21use crate::{
22 notification::{InvalidMatch, Matched, NotMatched, Notification},
23 topology::{TapOutput, TapResource, WatchRx},
24};
25
/// Sending half of the channel on which tap payloads (tapped events and notifications) are
/// delivered to whoever consumes the tap.
type TapSender = tokio_mpsc::Sender<TapPayload>;

/// Shutdown signal sender; the paired receiver resolves when a value is sent or when this
/// sender is dropped.
type ShutdownTx = oneshot::Sender<()>;

/// Receiving half of a shutdown signal created together with a [`ShutdownTx`].
type ShutdownRx = oneshot::Receiver<()>;
32
/// Capacity handed to `TopologyBuilder::standalone_memory` for the buffer created per tapped
/// output (that buffer is configured with `WhenFull::DropNewest`).
const TAP_BUFFER_SIZE: NonZeroUsize = match NonZeroUsize::new(100) {
    Some(size) => size,
    None => panic!("tap buffer size must be non-zero"),
};
34
35trait GlobMatcher<T> {
37 fn matches_glob(&self, rhs: T) -> bool;
38}
39
40impl GlobMatcher<&str> for String {
41 fn matches_glob(&self, rhs: &str) -> bool {
42 match glob::Pattern::new(self) {
43 Ok(pattern) => pattern.matches(rhs),
44 _ => false,
45 }
46 }
47}
48
49#[derive(Debug, Eq, PartialEq, Hash)]
54enum Pattern {
55 OutputPattern(glob::Pattern),
57 InputPattern(String, Vec<glob::Pattern>),
64}
65
66impl GlobMatcher<&str> for Pattern {
67 fn matches_glob(&self, rhs: &str) -> bool {
68 match self {
69 Pattern::OutputPattern(pattern) => pattern.matches(rhs),
70 Pattern::InputPattern(_, patterns) => {
71 patterns.iter().any(|pattern| pattern.matches(rhs))
72 }
73 }
74 }
75}
76
/// The user-provided glob patterns of a tap, split by what they select.
#[derive(Debug)]
pub struct TapPatterns {
    /// Patterns matched directly against component output ids.
    pub for_outputs: HashSet<String>,
    /// Patterns selecting components whose *inputs* should be tapped.
    pub for_inputs: HashSet<String>,
}

impl TapPatterns {
    /// Bundles the output and input pattern sets.
    pub const fn new(for_outputs: HashSet<String>, for_inputs: HashSet<String>) -> Self {
        Self {
            for_outputs,
            for_inputs,
        }
    }

    /// Returns every pattern from both sets; a pattern present in both appears once.
    pub fn all_patterns(&self) -> HashSet<String> {
        self.for_outputs.union(&self.for_inputs).cloned().collect()
    }
}
102
103#[derive(Debug)]
106pub enum TapPayload {
107 Log(TapOutput, LogArray),
108 Metric(TapOutput, MetricArray),
109 Trace(TapOutput, TraceArray),
110 Notification(Notification),
111}
112
113impl TapPayload {
114 pub fn matched<T: Into<String>>(pattern: T) -> Self {
116 Self::Notification(Notification::Matched(Matched::new(pattern.into())))
117 }
118
119 pub fn not_matched<T: Into<String>>(pattern: T) -> Self {
121 Self::Notification(Notification::NotMatched(NotMatched::new(pattern.into())))
122 }
123
124 pub fn invalid_input_pattern_match<T: Into<String>>(
126 pattern: T,
127 invalid_matches: Vec<String>,
128 ) -> Self {
129 let pattern = pattern.into();
130 let message = format!(
131 "[tap] Warning: source inputs cannot be tapped. Input pattern '{pattern}' matches sources {invalid_matches:?}"
132 );
133 Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
134 message,
135 pattern,
136 invalid_matches,
137 )))
138 }
139
140 pub fn invalid_output_pattern_match<T: Into<String>>(
142 pattern: T,
143 invalid_matches: Vec<String>,
144 ) -> Self {
145 let pattern = pattern.into();
146 let message = format!(
147 "[tap] Warning: sink outputs cannot be tapped. Output pattern '{pattern}' matches sinks {invalid_matches:?}"
148 );
149 Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
150 message,
151 pattern,
152 invalid_matches,
153 )))
154 }
155}
156
157#[derive(Clone)]
159pub struct TapTransformer {
160 tap_tx: TapSender,
161 output: TapOutput,
162}
163
164impl TapTransformer {
165 pub const fn new(tap_tx: TapSender, output: TapOutput) -> Self {
166 Self { tap_tx, output }
167 }
168
169 pub fn try_send(&mut self, events: EventArray) {
170 let payload = match events {
171 EventArray::Logs(logs) => TapPayload::Log(self.output.clone(), logs),
172 EventArray::Metrics(metrics) => TapPayload::Metric(self.output.clone(), metrics),
173 EventArray::Traces(traces) => TapPayload::Trace(self.output.clone(), traces),
174 };
175
176 if let Err(TrySendError::Closed(payload)) = self.tap_tx.try_send(payload) {
177 debug!(
178 message = "Couldn't send event.",
179 payload = ?payload,
180 component_id = ?self.output.output_id,
181 );
182 }
183 }
184}
185
186#[derive(Debug)]
189pub struct TapController {
190 _shutdown: ShutdownTx,
191}
192
193impl TapController {
194 pub fn new(watch_rx: WatchRx, tap_tx: TapSender, patterns: TapPatterns) -> Self {
198 let (_shutdown, shutdown_rx) = oneshot::channel();
199
200 tokio::spawn(
201 tap_handler(patterns, tap_tx, watch_rx, shutdown_rx).instrument(error_span!(
202 "tap_handler",
203 component_kind = "sink",
204 component_id = "_tap", component_type = "tap",
206 )),
207 );
208
209 Self { _shutdown }
210 }
211}
212
213fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
215 let (shutdown_tx, shutdown_rx) = oneshot::channel();
216
217 tokio::spawn(async move {
218 _ = shutdown_rx.await;
219 if control_tx
220 .send(fanout::ControlMessage::Remove(sink_id.clone()))
221 .is_err()
222 {
223 debug!(message = "Couldn't disconnect sink.", ?sink_id);
224 } else {
225 debug!(message = "Disconnected sink.", ?sink_id);
226 }
227 });
228
229 shutdown_tx
230}
231
232async fn send_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
234 debug!(message = "Sending matched notification.", pattern = ?pattern);
235 tx.send(TapPayload::matched(pattern)).await
236}
237
238async fn send_not_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
240 debug!(message = "Sending not matched notification.", pattern = ?pattern);
241 tx.send(TapPayload::not_matched(pattern)).await
242}
243
244async fn send_invalid_input_pattern_match(
246 tx: TapSender,
247 pattern: String,
248 invalid_matches: Vec<String>,
249) -> Result<(), SendError<TapPayload>> {
250 debug!(message = "Sending invalid input pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
251 tx.send(TapPayload::invalid_input_pattern_match(
252 pattern,
253 invalid_matches,
254 ))
255 .await
256}
257
258async fn send_invalid_output_pattern_match(
260 tx: TapSender,
261 pattern: String,
262 invalid_matches: Vec<String>,
263) -> Result<(), SendError<TapPayload>> {
264 debug!(message = "Sending invalid output pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
265 tx.send(TapPayload::invalid_output_pattern_match(
266 pattern,
267 invalid_matches,
268 ))
269 .await
270}
271
/// Body of the task spawned by [`TapController::new`].
///
/// Each time `watch_rx` reports a topology change, this:
/// 1. drops the tap sinks of components that were removed or no longer expose outputs,
/// 2. attaches a new tap sink to every output matched by an output pattern or by the
///    expansion of an input pattern, forwarding its events to `tx`,
/// 3. sends matched / not-matched / invalid-match notifications on `tx`.
///
/// It exits when `shutdown_rx` resolves (the owning `TapController` was dropped) or when a
/// notification cannot be delivered on `tx`.
async fn tap_handler(
    patterns: TapPatterns,
    tx: TapSender,
    mut watch_rx: WatchRx,
    mut shutdown_rx: ShutdownRx,
) {
    debug!(message = "Started tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);

    // Shutdown triggers of the tap sinks registered by this task, keyed by the component whose
    // output they are attached to. Removing an entry drops its triggers, which makes the tasks
    // spawned by `shutdown_trigger` send `ControlMessage::Remove` for those sinks.
    let mut sinks: HashMap<ComponentKey, _> = HashMap::new();

    // All user-provided patterns (output and input); used to find the ones that matched nothing.
    let user_provided_patterns = patterns.all_patterns();

    // Patterns that matched on the previous change. Only patterns missing from this set get a
    // new "matched" notification.
    let mut last_matches = HashSet::new();

    loop {
        tokio::select! {
            // Resolves when the `TapController` holding the sender is dropped.
            _ = &mut shutdown_rx => break,
            // If `changed()` errors (watch sender gone), this pattern does not match. The branch
            // is then disabled and the task only waits for shutdown.
            Ok(_) = watch_rx.changed() => {
                // Patterns (as the user wrote them) that matched at least one output this round.
                let mut matched = HashSet::new();

                // Clone the latest resources so the watch borrow guard is released right away
                // instead of being held across the `.await`s below.
                let TapResource {
                    outputs,
                    inputs,
                    source_keys,
                    sink_keys,
                    removals,
                } = watch_rx.borrow().clone();

                // Forget, and thereby disconnect, sinks on components that were removed or that no
                // longer appear among the tappable outputs.
                let output_keys = outputs.keys().map(|output| output.output_id.component.clone()).collect::<HashSet<_>>();
                sinks.retain(|key, _| {
                    !removals.contains(key) && output_keys.contains(key) || {
                        debug!(message = "Removing component.", component_id = %key);
                        false
                    }
                });

                // Output patterns are matched against output ids directly. Strings that are not
                // valid globs are skipped.
                let mut component_id_patterns = patterns.for_outputs.iter()
                    .filter_map(|p| glob::Pattern::new(p).ok())
                    .map(Pattern::OutputPattern).collect::<HashSet<_>>();

                // An input pattern is expanded into the ids that `inputs` lists for every component
                // it matches. Presumably these are the upstream outputs feeding that component —
                // confirm against `TapResource`.
                for pattern in patterns.for_inputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        match inputs.iter().filter(|(key, _)|
                            glob.matches(&key.to_string())
                        ).flat_map(|(_, related_inputs)| related_inputs.iter().map(|id| id.to_string()).collect::<Vec<_>>()).collect::<HashSet<_>>() {
                            found if !found.is_empty() => {
                                // NOTE(review): each id is compiled as a glob. An id containing `*`,
                                // `?` or `[` would match more broadly, or be dropped if it is not a
                                // valid glob. Consider `glob::Pattern::escape` if ids may contain these.
                                component_id_patterns.insert(Pattern::InputPattern(pattern.clone(), found.into_iter()
                                    .filter_map(|p| glob::Pattern::new(&p).ok()).collect::<Vec<_>>()));
                            }
                            _ => {
                                debug!(message="Input pattern not expanded: no matching components.", ?pattern);
                            }
                        }
                    }
                }

                for (output, control_tx) in outputs.iter() {
                    match component_id_patterns
                        .iter()
                        .filter(|pattern| pattern.matches_glob(&output.output_id.to_string()))
                        .collect::<Vec<_>>()
                    {
                        found if !found.is_empty() => {
                            debug!(
                                message="Component matched.",
                                ?output.output_id, ?component_id_patterns, matched = ?found
                            );

                            // Per-output in-memory buffer. With `WhenFull::DropNewest`, new events
                            // are dropped while it is full.
                            let (tap_buffer_tx, mut tap_buffer_rx) = TopologyBuilder::standalone_memory(TAP_BUFFER_SIZE, WhenFull::DropNewest, &Span::current()).await;
                            let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone());

                            // Drain the buffer into the tap channel until it yields `None`.
                            tokio::spawn(async move {
                                while let Some(events) = tap_buffer_rx.next().await {
                                    tap_transformer.try_send(events);
                                }
                            });

                            // A fresh random id per registration, so every change adds a distinct
                            // fanout sink.
                            // NOTE(review): an output that keeps matching across changes gets another
                            // sink each time, unless its earlier one was removed by `retain` above.
                            // Confirm this cannot duplicate tapped events.
                            let sink_id = Uuid::new_v4().to_string();
                            match control_tx
                                .send(fanout::ControlMessage::Add(ComponentKey::from(sink_id.as_str()), tap_buffer_tx))
                            {
                                Ok(_) => {
                                    debug!(
                                        message = "Sink connected.", ?sink_id, ?output.output_id,
                                    );

                                    // Keep a trigger that removes this sink from the fanout once the
                                    // entry is dropped from `sinks`.
                                    sinks.entry(output.output_id.component.clone()).or_insert_with(Vec::new).push(
                                        shutdown_trigger(control_tx.clone(), ComponentKey::from(sink_id.as_str()))
                                    );
                                }
                                Err(error) => {
                                    error!(
                                        message = "Couldn't connect sink.",
                                        ?error,
                                        ?output.output_id,
                                        ?sink_id,
                                    );
                                }
                            }

                            // Record the user-facing pattern strings that matched this output.
                            // NOTE(review): recorded even if connecting the sink failed above.
                            matched.extend(found.iter().map(|pattern| {
                                match pattern {
                                    Pattern::OutputPattern(p) => p.to_string(),
                                    Pattern::InputPattern(p, _) => p.to_owned(),
                                }
                            }));
                        }
                        _ => {
                            debug!(
                                message="Component not matched.", ?output.output_id, ?component_id_patterns
                            );
                        }
                    }
                }

                // The notification futures have different types, so they are boxed to share one `Vec`.
                let mut notifications = Vec::new();

                // Patterns that match now but did not on the previous change.
                for pattern in matched.difference(&last_matches) {
                    notifications.push(send_matched(tx.clone(), pattern.clone()).boxed());
                }

                // Patterns that matched nothing; these are reported on every change.
                for pattern in user_provided_patterns.difference(&matched) {
                    notifications.push(send_not_matched(tx.clone(), pattern.clone()).boxed());
                }

                // Input patterns that match sources: source inputs cannot be tapped.
                for pattern in patterns.for_inputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        let invalid_matches = source_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
                        if !invalid_matches.is_empty() {
                            notifications.push(send_invalid_input_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
                        }
                    }
                }
                // Output patterns that match sinks: sink outputs cannot be tapped.
                for pattern in patterns.for_outputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        let invalid_matches = sink_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
                        if !invalid_matches.is_empty() {
                            notifications.push(send_invalid_output_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
                        }
                    }
                }

                last_matches = matched;

                // A send error means the receiving end of `tx` was dropped, so stop tapping.
                if try_join_all(notifications).await.is_err() {
                    debug!("Couldn't send notification(s); tap gone away.");
                    break;
                }
            }
        }
    }

    debug!(message = "Stopped tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);
}
458
// Unit tests for glob matching. `#[cfg(test)]` keeps this module and its imports out of
// non-test builds.
#[cfg(test)]
mod tests {
    use super::{GlobMatcher, Pattern};

    /// A `String` is compiled as a glob: `*` matches any run of characters, `?` exactly one.
    #[test]
    fn matches() {
        let patterns = ["ab*", "12?", "xy?"];

        for id in &["abc", "123", "xyz"] {
            assert!(patterns.iter().any(|p| p.to_string().matches_glob(id)));
        }

        for id in &["xzy", "ad*", "1234"] {
            assert!(!patterns.iter().any(|p| p.to_string().matches_glob(id)));
        }
    }

    /// An output pattern matches through its single glob. An input pattern matches through any
    /// of its expanded globs, never through its own verbatim string.
    #[test]
    fn pattern_variants_match() {
        let output = Pattern::OutputPattern(glob::Pattern::new("in_*").unwrap());
        assert!(output.matches_glob("in_1"));
        assert!(!output.matches_glob("out_1"));

        let input = Pattern::InputPattern(
            "transform_*".to_string(),
            vec![
                glob::Pattern::new("source_a").unwrap(),
                glob::Pattern::new("source_b.route").unwrap(),
            ],
        );
        assert!(input.matches_glob("source_a"));
        assert!(input.matches_glob("source_b.route"));
        assert!(!input.matches_glob("transform_x"));

        let expanded_to_nothing = Pattern::InputPattern("anything".to_string(), Vec::new());
        assert!(!expanded_to_nothing.matches_glob("anything"));
    }
}