vector_tap/
controller.rs

1use std::{
2    collections::{HashMap, HashSet},
3    num::NonZeroUsize,
4};
5
6use futures::{future::try_join_all, FutureExt};
7use tokio::sync::{
8    mpsc as tokio_mpsc,
9    mpsc::error::{SendError, TrySendError},
10    oneshot,
11};
12use tracing::{Instrument, Span};
13use uuid::Uuid;
14use vector_buffers::{topology::builder::TopologyBuilder, WhenFull};
15use vector_common::config::ComponentKey;
16use vector_core::event::{EventArray, LogArray, MetricArray, TraceArray};
17use vector_core::fanout;
18
19use crate::notification::{InvalidMatch, Matched, NotMatched, Notification};
20use crate::topology::{TapOutput, TapResource, WatchRx};
21
/// A tap sender is the control channel used to surface tap payloads to a client.
type TapSender = tokio_mpsc::Sender<TapPayload>;

// Shutdown channel types: the two halves of a oneshot channel carrying `()`.
/// Sending half of a shutdown channel.
type ShutdownTx = oneshot::Sender<()>;
/// Receiving half of a shutdown channel.
type ShutdownRx = oneshot::Receiver<()>;

/// Size handed to the standalone memory buffer that `tap_handler` builds for
/// every matched component output.
const TAP_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
30
/// Clients can supply glob patterns to find matched topology components.
///
/// Implementors play the role of the pattern; `rhs` is the candidate value
/// being tested against it.
trait GlobMatcher<T> {
    /// Returns `true` when `rhs` is matched by the pattern(s) held in `self`.
    fn matches_glob(&self, rhs: T) -> bool;
}
35
36impl GlobMatcher<&str> for String {
37    fn matches_glob(&self, rhs: &str) -> bool {
38        match glob::Pattern::new(self) {
39            Ok(pattern) => pattern.matches(rhs),
40            _ => false,
41        }
42    }
43}
44
/// Distinguishing between pattern variants helps us preserve user-friendly tap
/// notifications. Otherwise, after translating an input pattern into relevant
/// output patterns, we'd be unable to send a [`TapPayload::Notification`] with
/// the original user-specified input pattern.
///
/// `Eq` and `Hash` are derived because `tap_handler` collects these values
/// into a `HashSet`.
#[derive(Debug, Eq, PartialEq, Hash)]
enum Pattern {
    /// A pattern used to tap into outputs of components.
    OutputPattern(glob::Pattern),
    /// A pattern used to tap into inputs of components.
    ///
    /// For a tap user, an input pattern is effectively a shortcut for specifying
    /// one or more output patterns since a component's inputs are other
    /// components' outputs. This variant captures the original user-supplied
    /// pattern (first field) alongside the output patterns it's translated
    /// into (second field).
    InputPattern(String, Vec<glob::Pattern>),
}
61
62impl GlobMatcher<&str> for Pattern {
63    fn matches_glob(&self, rhs: &str) -> bool {
64        match self {
65            Pattern::OutputPattern(pattern) => pattern.matches(rhs),
66            Pattern::InputPattern(_, patterns) => {
67                patterns.iter().any(|pattern| pattern.matches(rhs))
68            }
69        }
70    }
71}
72
/// Glob patterns a tap client supplies to select components, split by whether
/// the client wants the events flowing into (`for_inputs`) or out of
/// (`for_outputs`) the matched components.
#[derive(Debug)]
pub struct TapPatterns {
    /// Patterns matched against component outputs.
    pub for_outputs: HashSet<String>,
    /// Patterns matched against component inputs.
    pub for_inputs: HashSet<String>,
}

impl TapPatterns {
    /// Bundles the output-side and input-side pattern sets.
    pub const fn new(for_outputs: HashSet<String>, for_inputs: HashSet<String>) -> Self {
        TapPatterns {
            for_outputs,
            for_inputs,
        }
    }

    /// Returns every user-specified pattern, output and input alike, as one
    /// de-duplicated set of owned strings.
    pub fn all_patterns(&self) -> HashSet<String> {
        self.for_outputs.union(&self.for_inputs).cloned().collect()
    }
}
98
/// A tap payload contains events or notifications that alert users about the
/// status of the tap request.
///
/// The event variants pair the [`TapOutput`] the events were observed on with
/// the event array itself.
#[derive(Debug)]
pub enum TapPayload {
    /// Log events observed on the given output.
    Log(TapOutput, LogArray),
    /// Metric events observed on the given output.
    Metric(TapOutput, MetricArray),
    /// Trace events observed on the given output.
    Trace(TapOutput, TraceArray),
    /// A status message about the tap itself rather than tapped data.
    Notification(Notification),
}
108
109impl TapPayload {
110    /// Raise a `matched` event against the provided pattern.
111    pub fn matched<T: Into<String>>(pattern: T) -> Self {
112        Self::Notification(Notification::Matched(Matched::new(pattern.into())))
113    }
114
115    /// Raise a `not_matched` event against the provided pattern.
116    pub fn not_matched<T: Into<String>>(pattern: T) -> Self {
117        Self::Notification(Notification::NotMatched(NotMatched::new(pattern.into())))
118    }
119
120    /// Raise an `invalid_match` event against the provided input pattern.
121    pub fn invalid_input_pattern_match<T: Into<String>>(
122        pattern: T,
123        invalid_matches: Vec<String>,
124    ) -> Self {
125        let pattern = pattern.into();
126        let message = format!("[tap] Warning: source inputs cannot be tapped. Input pattern '{pattern}' matches sources {invalid_matches:?}");
127        Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
128            message,
129            pattern,
130            invalid_matches,
131        )))
132    }
133
134    /// Raise an `invalid_match` event against the provided output pattern.
135    pub fn invalid_output_pattern_match<T: Into<String>>(
136        pattern: T,
137        invalid_matches: Vec<String>,
138    ) -> Self {
139        let pattern = pattern.into();
140        let message = format!(
141            "[tap] Warning: sink outputs cannot be tapped. Output pattern '{pattern}' matches sinks {invalid_matches:?}"
142        );
143        Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
144            message,
145            pattern,
146            invalid_matches,
147        )))
148    }
149}
150
/// A `TapTransformer` transforms raw events and ships them to the global tap receiver.
///
/// Each transformer is bound to a single output; every payload it emits is
/// tagged with a clone of that output.
#[derive(Clone)]
pub struct TapTransformer {
    /// Channel on which the wrapped payloads are sent.
    tap_tx: TapSender,
    /// The output whose events this transformer wraps.
    output: TapOutput,
}
157
158impl TapTransformer {
159    pub const fn new(tap_tx: TapSender, output: TapOutput) -> Self {
160        Self { tap_tx, output }
161    }
162
163    pub fn try_send(&mut self, events: EventArray) {
164        let payload = match events {
165            EventArray::Logs(logs) => TapPayload::Log(self.output.clone(), logs),
166            EventArray::Metrics(metrics) => TapPayload::Metric(self.output.clone(), metrics),
167            EventArray::Traces(traces) => TapPayload::Trace(self.output.clone(), traces),
168        };
169
170        if let Err(TrySendError::Closed(payload)) = self.tap_tx.try_send(payload) {
171            debug!(
172                message = "Couldn't send event.",
173                payload = ?payload,
174                component_id = ?self.output.output_id,
175            );
176        }
177    }
178}
179
/// A tap controller spawns a task that listens for topology changes. If topology
/// changes, sinks are rewired to accommodate matched/unmatched patterns.
#[derive(Debug)]
pub struct TapController {
    /// Never read; held so that dropping the controller drops this sender,
    /// which resolves the shutdown receiver handed to `tap_handler`.
    _shutdown: ShutdownTx,
}
186
187impl TapController {
188    /// Creates a new tap sink, and spawns a handler for watching for topology changes
189    /// and a separate inner handler for events. Uses a oneshot channel to trigger shutdown
190    /// of handlers when the `TapSink` drops out of scope.
191    pub fn new(watch_rx: WatchRx, tap_tx: TapSender, patterns: TapPatterns) -> Self {
192        let (_shutdown, shutdown_rx) = oneshot::channel();
193
194        tokio::spawn(
195            tap_handler(patterns, tap_tx, watch_rx, shutdown_rx).instrument(error_span!(
196                "tap_handler",
197                component_kind = "sink",
198                component_id = "_tap", // It isn't clear what the component_id should be here other than "_tap"
199                component_type = "tap",
200            )),
201        );
202
203        Self { _shutdown }
204    }
205}
206
207/// Provides a `ShutdownTx` that disconnects a component sink when it drops out of scope.
208fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
209    let (shutdown_tx, shutdown_rx) = oneshot::channel();
210
211    tokio::spawn(async move {
212        _ = shutdown_rx.await;
213        if control_tx
214            .send(fanout::ControlMessage::Remove(sink_id.clone()))
215            .is_err()
216        {
217            debug!(message = "Couldn't disconnect sink.", ?sink_id);
218        } else {
219            debug!(message = "Disconnected sink.", ?sink_id);
220        }
221    });
222
223    shutdown_tx
224}
225
226/// Sends a 'matched' tap payload.
227async fn send_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
228    debug!(message = "Sending matched notification.", pattern = ?pattern);
229    tx.send(TapPayload::matched(pattern)).await
230}
231
232/// Sends a 'not matched' tap payload.
233async fn send_not_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
234    debug!(message = "Sending not matched notification.", pattern = ?pattern);
235    tx.send(TapPayload::not_matched(pattern)).await
236}
237
238/// Sends an 'invalid input pattern match' tap payload.
239async fn send_invalid_input_pattern_match(
240    tx: TapSender,
241    pattern: String,
242    invalid_matches: Vec<String>,
243) -> Result<(), SendError<TapPayload>> {
244    debug!(message = "Sending invalid input pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
245    tx.send(TapPayload::invalid_input_pattern_match(
246        pattern,
247        invalid_matches,
248    ))
249    .await
250}
251
252/// Sends an 'invalid output pattern match' tap payload.
253async fn send_invalid_output_pattern_match(
254    tx: TapSender,
255    pattern: String,
256    invalid_matches: Vec<String>,
257) -> Result<(), SendError<TapPayload>> {
258    debug!(message = "Sending invalid output pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
259    tx.send(TapPayload::invalid_output_pattern_match(
260        pattern,
261        invalid_matches,
262    ))
263    .await
264}
265
/// Returns a tap handler that listens for topology changes, and connects sinks to observe
/// events when a component matches one or more of the provided patterns.
///
/// On every change reported by `watch_rx` the handler:
/// 1. drops the bookkeeping for components that were removed or no longer expose outputs,
/// 2. compiles the output patterns and expands each input pattern into output patterns,
/// 3. attaches a fresh tap sink to every output matched by at least one pattern, and
/// 4. sends `matched` / `not_matched` / `invalid_match` notifications over `tx`.
///
/// The loop ends when `shutdown_rx` resolves or when a notification cannot be delivered.
async fn tap_handler(
    patterns: TapPatterns,
    tx: TapSender,
    mut watch_rx: WatchRx,
    mut shutdown_rx: ShutdownRx,
) {
    debug!(message = "Started tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);

    // Sinks register for the current tap. Contains the id of the matched component, and
    // a shutdown trigger for sending a remove control message when matching sinks change.
    // Each value is a list of triggers, one per sink connected for that component.
    let mut sinks: HashMap<ComponentKey, _> = HashMap::new();

    // Recording user-provided patterns for later use in sending notifications
    // (determining patterns which did not match)
    let user_provided_patterns = patterns.all_patterns();

    // The patterns that matched on the last iteration, to compare with the latest
    // round of matches when sending notifications.
    let mut last_matches = HashSet::new();

    loop {
        tokio::select! {
            // The receiver resolving in any way (value sent or sender dropped) stops the tap.
            _ = &mut shutdown_rx => break,
            // The body only runs when `changed()` yields `Ok`.
            Ok(_) = watch_rx.changed() => {
                // Cache of matched patterns. A `HashSet` is used here to ignore repetition.
                let mut matched = HashSet::new();

                // Borrow and clone the latest resources to register sinks. Since this blocks the
                // watch channel and the returned ref isn't `Send`, this requires a clone.
                let TapResource {
                    outputs,
                    inputs,
                    source_keys,
                    sink_keys,
                    removals,
                } = watch_rx.borrow().clone();

                // Remove tap sinks from components that have gone away/can no longer match.
                // An entry survives only if its component is not in `removals` and still owns
                // an output; otherwise the right-hand block logs and yields `false`. Dropping
                // the entry drops its stored shutdown triggers.
                let output_keys = outputs.keys().map(|output| output.output_id.component.clone()).collect::<HashSet<_>>();
                sinks.retain(|key, _| {
                    !removals.contains(key) && output_keys.contains(key) || {
                        debug!(message = "Removing component.", component_id = %key);
                        false
                    }
                });

                // Output patterns are compiled on every pass; ones that fail to compile are
                // skipped.
                let mut component_id_patterns = patterns.for_outputs.iter()
                                                                    .filter_map(|p| glob::Pattern::new(p).ok())
                                                                    .map(Pattern::OutputPattern).collect::<HashSet<_>>();

                // Matching an input pattern is equivalent to matching the outputs of the component's inputs
                // For each input pattern, gather the ids feeding every component whose key
                // matches, then turn each id into its own glob.
                for pattern in patterns.for_inputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        match inputs.iter().filter(|(key, _)|
                            glob.matches(&key.to_string())
                        ).flat_map(|(_, related_inputs)| related_inputs.iter().map(|id| id.to_string()).collect::<Vec<_>>()).collect::<HashSet<_>>() {
                            found if !found.is_empty() => {
                                component_id_patterns.insert(Pattern::InputPattern(pattern.clone(), found.into_iter()
                                                                                                         .filter_map(|p| glob::Pattern::new(&p).ok()).collect::<Vec<_>>()));
                            }
                            _ => {
                                debug!(message="Input pattern not expanded: no matching components.", ?pattern);
                            }
                        }
                    }
                }

                // Loop over all outputs, and connect sinks for the components that match one
                // or more patterns.
                for (output, control_tx) in outputs.iter() {
                    match component_id_patterns
                        .iter()
                        .filter(|pattern| pattern.matches_glob(&output.output_id.to_string()))
                        .collect::<Vec<_>>()
                    {
                        found if !found.is_empty() => {
                            debug!(
                                message="Component matched.",
                                ?output.output_id, ?component_id_patterns, matched = ?found
                            );

                            // Build a new intermediate buffer pair that we can insert as a sink
                            // target for the component, and spawn our transformer task which will
                            // wrap each event payload with the necessary metadata before forwarding
                            // it to our global tap receiver.
                            let (tap_buffer_tx, mut tap_buffer_rx) = TopologyBuilder::standalone_memory(TAP_BUFFER_SIZE, WhenFull::DropNewest, &Span::current()).await;
                            let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone());

                            // The forwarding task ends once the buffer receiver yields `None`.
                            tokio::spawn(async move {
                                while let Some(events) = tap_buffer_rx.next().await {
                                    tap_transformer.try_send(events);
                                }
                            });

                            // Attempt to connect the sink.
                            //
                            // This is necessary because a sink may be reconfigured with the same id
                            // as a previous, and we are not getting involved in config diffing at
                            // this point.
                            //
                            // NOTE(review): a sink with a fresh id is added for every matching
                            // output on every pass, and triggers pushed on earlier passes stay in
                            // `sinks` unless the `retain` above removed the component. Confirm that
                            // the `removals` semantics upstream prevent duplicate taps.
                            let sink_id = Uuid::new_v4().to_string();
                            match control_tx
                                .send(fanout::ControlMessage::Add(ComponentKey::from(sink_id.as_str()), tap_buffer_tx))
                            {
                                Ok(_) => {
                                    debug!(
                                        message = "Sink connected.", ?sink_id, ?output.output_id,
                                    );

                                    // Create a sink shutdown trigger to remove the sink
                                    // when matched components change.
                                    sinks.entry(output.output_id.component.clone()).or_insert_with(Vec::new).push(
                                        shutdown_trigger(control_tx.clone(), ComponentKey::from(sink_id.as_str()))
                                    );
                                }
                                Err(error) => {
                                    error!(
                                        message = "Couldn't connect sink.",
                                        ?error,
                                        ?output.output_id,
                                        ?sink_id,
                                    );
                                }
                            }

                            // Record the user-facing form of each matching pattern: the glob text
                            // for output patterns, the original input string for input patterns.
                            // This happens whether or not the sink connection above succeeded.
                            matched.extend(found.iter().map(|pattern| {
                                match pattern {
                                    Pattern::OutputPattern(p) => p.to_string(),
                                    Pattern::InputPattern(p, _) => p.to_owned(),
                                }
                            }));
                        }
                        _ => {
                            debug!(
                                message="Component not matched.", ?output.output_id, ?component_id_patterns
                            );
                        }
                    }
                }

                // Notifications to send to the client.
                let mut notifications = Vec::new();

                // Matched notifications.
                // Only patterns that did not match on the previous pass are announced.
                for pattern in matched.difference(&last_matches) {
                    notifications.push(send_matched(tx.clone(), pattern.clone()).boxed());
                }

                // Not matched notifications.
                // Unlike the above, these are sent on every pass for every unmatched pattern.
                for pattern in user_provided_patterns.difference(&matched) {
                    notifications.push(send_not_matched(tx.clone(), pattern.clone()).boxed());
                }

                // Warnings on invalid matches.
                // Input patterns are checked against source keys, output patterns against
                // sink keys.
                for pattern in patterns.for_inputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        let invalid_matches = source_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
                        if !invalid_matches.is_empty() {
                            notifications.push(send_invalid_input_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
                        }
                    }
                }
                for pattern in patterns.for_outputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        let invalid_matches = sink_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
                        if !invalid_matches.is_empty() {
                            notifications.push(send_invalid_output_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
                        }
                    }
                }

                last_matches = matched;

                // Send all events. If any event returns an error, this means the client
                // channel has gone away, so we can break the loop.
                if try_join_all(notifications).await.is_err() {
                    debug!("Couldn't send notification(s); tap gone away.");
                    break;
                }
            }
        }
    }

    debug!(message = "Stopped tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);
}
452
// Gated on `cfg(test)` so the module and its import only exist in test builds.
#[cfg(test)]
mod tests {
    use super::GlobMatcher;

    /// Patterns should accept globbing.
    #[test]
    fn matches() {
        // Convert once up front; `GlobMatcher` is implemented for `String`.
        let patterns = ["ab*", "12?", "xy?"].map(String::from);
        let matches_any = |id: &str| patterns.iter().any(|pattern| pattern.matches_glob(id));

        // Should find.
        for id in ["abc", "123", "xyz"] {
            assert!(matches_any(id), "expected `{id}` to match a pattern");
        }

        // Should not find.
        for id in ["xzy", "ad*", "1234"] {
            assert!(!matches_any(id), "expected `{id}` to match no pattern");
        }
    }
}