vector_tap/
controller.rs

1use std::{
2    collections::{HashMap, HashSet},
3    num::NonZeroUsize,
4};
5
6use futures::{FutureExt, future::try_join_all};
7use tokio::sync::{
8    mpsc as tokio_mpsc,
9    mpsc::error::{SendError, TrySendError},
10    oneshot,
11};
12use tracing::{Instrument, Span};
13use uuid::Uuid;
14use vector_buffers::{WhenFull, topology::builder::TopologyBuilder};
15use vector_common::config::ComponentKey;
16use vector_core::{
17    event::{EventArray, LogArray, MetricArray, TraceArray},
18    fanout,
19};
20
21use crate::{
22    notification::{InvalidMatch, Matched, NotMatched, Notification},
23    topology::{TapOutput, TapResource, WatchRx},
24};
25
/// A tap sender is the channel used to surface tap payloads (tapped events and
/// notifications, see [`TapPayload`]) to a client.
type TapSender = tokio_mpsc::Sender<TapPayload>;

// Shutdown channel types. No value is ever sent explicitly in this file: the
// sending half is held by an owner and the receiving half is awaited by a
// spawned task (see `TapController` and `shutdown_trigger`).
type ShutdownTx = oneshot::Sender<()>;
type ShutdownRx = oneshot::Receiver<()>;

/// Size passed to `TopologyBuilder::standalone_memory` when building the
/// intermediate buffer for each tapped output (the buffer is created with
/// `WhenFull::DropNewest`).
const TAP_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
34
35/// Clients can supply glob patterns to find matched topology components.
36trait GlobMatcher<T> {
37    fn matches_glob(&self, rhs: T) -> bool;
38}
39
40impl GlobMatcher<&str> for String {
41    fn matches_glob(&self, rhs: &str) -> bool {
42        match glob::Pattern::new(self) {
43            Ok(pattern) => pattern.matches(rhs),
44            _ => false,
45        }
46    }
47}
48
49/// Distinguishing between pattern variants helps us preserve user-friendly tap
50/// notifications. Otherwise, after translating an input pattern into relevant
51/// output patterns, we'd be unable to send a [`TapPayload::Notification`] with
52/// the original user-specified input pattern.
53#[derive(Debug, Eq, PartialEq, Hash)]
54enum Pattern {
55    /// A pattern used to tap into outputs of components
56    OutputPattern(glob::Pattern),
57    /// A pattern used to tap into inputs of components.
58    ///
59    /// For a tap user, an input pattern is effectively a shortcut for specifying
60    /// one or more output patterns since a component's inputs are other
61    /// components' outputs. This variant captures the original user-supplied
62    /// pattern alongside the output patterns it's translated into.
63    InputPattern(String, Vec<glob::Pattern>),
64}
65
66impl GlobMatcher<&str> for Pattern {
67    fn matches_glob(&self, rhs: &str) -> bool {
68        match self {
69            Pattern::OutputPattern(pattern) => pattern.matches(rhs),
70            Pattern::InputPattern(_, patterns) => {
71                patterns.iter().any(|pattern| pattern.matches(rhs))
72            }
73        }
74    }
75}
76
/// Glob patterns supplied by a tap client, split by what they select: events
/// flowing out of matching components (`for_outputs`) or into them
/// (`for_inputs`).
#[derive(Debug)]
pub struct TapPatterns {
    /// Patterns selecting component outputs to tap.
    pub for_outputs: HashSet<String>,
    /// Patterns selecting components whose inputs should be tapped.
    pub for_inputs: HashSet<String>,
}

impl TapPatterns {
    /// Bundles the output and input pattern sets.
    pub const fn new(for_outputs: HashSet<String>, for_inputs: HashSet<String>) -> Self {
        Self {
            for_outputs,
            for_inputs,
        }
    }

    /// Returns every user-specified pattern, output and input alike. A pattern
    /// present in both sets appears once.
    pub fn all_patterns(&self) -> HashSet<String> {
        self.for_outputs.union(&self.for_inputs).cloned().collect()
    }
}
102
103/// A tap payload contains events or notifications that alert users about the
104/// status of the tap request.
105#[derive(Debug)]
106pub enum TapPayload {
107    Log(TapOutput, LogArray),
108    Metric(TapOutput, MetricArray),
109    Trace(TapOutput, TraceArray),
110    Notification(Notification),
111}
112
113impl TapPayload {
114    /// Raise a `matched` event against the provided pattern.
115    pub fn matched<T: Into<String>>(pattern: T) -> Self {
116        Self::Notification(Notification::Matched(Matched::new(pattern.into())))
117    }
118
119    /// Raise a `not_matched` event against the provided pattern.
120    pub fn not_matched<T: Into<String>>(pattern: T) -> Self {
121        Self::Notification(Notification::NotMatched(NotMatched::new(pattern.into())))
122    }
123
124    /// Raise an `invalid_match` event against the provided input pattern.
125    pub fn invalid_input_pattern_match<T: Into<String>>(
126        pattern: T,
127        invalid_matches: Vec<String>,
128    ) -> Self {
129        let pattern = pattern.into();
130        let message = format!(
131            "[tap] Warning: source inputs cannot be tapped. Input pattern '{pattern}' matches sources {invalid_matches:?}"
132        );
133        Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
134            message,
135            pattern,
136            invalid_matches,
137        )))
138    }
139
140    /// Raise an `invalid_match` event against the provided output pattern.
141    pub fn invalid_output_pattern_match<T: Into<String>>(
142        pattern: T,
143        invalid_matches: Vec<String>,
144    ) -> Self {
145        let pattern = pattern.into();
146        let message = format!(
147            "[tap] Warning: sink outputs cannot be tapped. Output pattern '{pattern}' matches sinks {invalid_matches:?}"
148        );
149        Self::Notification(Notification::InvalidMatch(InvalidMatch::new(
150            message,
151            pattern,
152            invalid_matches,
153        )))
154    }
155}
156
157/// A `TapTransformer` transforms raw events and ships them to the global tap receiver.
158#[derive(Clone)]
159pub struct TapTransformer {
160    tap_tx: TapSender,
161    output: TapOutput,
162}
163
164impl TapTransformer {
165    pub const fn new(tap_tx: TapSender, output: TapOutput) -> Self {
166        Self { tap_tx, output }
167    }
168
169    pub fn try_send(&mut self, events: EventArray) {
170        let payload = match events {
171            EventArray::Logs(logs) => TapPayload::Log(self.output.clone(), logs),
172            EventArray::Metrics(metrics) => TapPayload::Metric(self.output.clone(), metrics),
173            EventArray::Traces(traces) => TapPayload::Trace(self.output.clone(), traces),
174        };
175
176        if let Err(TrySendError::Closed(payload)) = self.tap_tx.try_send(payload) {
177            debug!(
178                message = "Couldn't send event.",
179                payload = ?payload,
180                component_id = ?self.output.output_id,
181            );
182        }
183    }
184}
185
186/// A tap sink spawns a process for listening for topology changes. If topology changes,
187/// sinks are rewired to accommodate matched/unmatched patterns.
188#[derive(Debug)]
189pub struct TapController {
190    _shutdown: ShutdownTx,
191}
192
193impl TapController {
194    /// Creates a new tap sink, and spawns a handler for watching for topology changes
195    /// and a separate inner handler for events. Uses a oneshot channel to trigger shutdown
196    /// of handlers when the `TapSink` drops out of scope.
197    pub fn new(watch_rx: WatchRx, tap_tx: TapSender, patterns: TapPatterns) -> Self {
198        let (_shutdown, shutdown_rx) = oneshot::channel();
199
200        tokio::spawn(
201            tap_handler(patterns, tap_tx, watch_rx, shutdown_rx).instrument(error_span!(
202                "tap_handler",
203                component_kind = "sink",
204                component_id = "_tap", // It isn't clear what the component_id should be here other than "_tap"
205                component_type = "tap",
206            )),
207        );
208
209        Self { _shutdown }
210    }
211}
212
213/// Provides a `ShutdownTx` that disconnects a component sink when it drops out of scope.
214fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
215    let (shutdown_tx, shutdown_rx) = oneshot::channel();
216
217    tokio::spawn(async move {
218        _ = shutdown_rx.await;
219        if control_tx
220            .send(fanout::ControlMessage::Remove(sink_id.clone()))
221            .is_err()
222        {
223            debug!(message = "Couldn't disconnect sink.", ?sink_id);
224        } else {
225            debug!(message = "Disconnected sink.", ?sink_id);
226        }
227    });
228
229    shutdown_tx
230}
231
232/// Sends a 'matched' tap payload.
233async fn send_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
234    debug!(message = "Sending matched notification.", pattern = ?pattern);
235    tx.send(TapPayload::matched(pattern)).await
236}
237
238/// Sends a 'not matched' tap payload.
239async fn send_not_matched(tx: TapSender, pattern: String) -> Result<(), SendError<TapPayload>> {
240    debug!(message = "Sending not matched notification.", pattern = ?pattern);
241    tx.send(TapPayload::not_matched(pattern)).await
242}
243
244/// Sends an 'invalid input pattern match' tap payload.
245async fn send_invalid_input_pattern_match(
246    tx: TapSender,
247    pattern: String,
248    invalid_matches: Vec<String>,
249) -> Result<(), SendError<TapPayload>> {
250    debug!(message = "Sending invalid input pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
251    tx.send(TapPayload::invalid_input_pattern_match(
252        pattern,
253        invalid_matches,
254    ))
255    .await
256}
257
258/// Sends an 'invalid output pattern match' tap payload.
259async fn send_invalid_output_pattern_match(
260    tx: TapSender,
261    pattern: String,
262    invalid_matches: Vec<String>,
263) -> Result<(), SendError<TapPayload>> {
264    debug!(message = "Sending invalid output pattern match notification.", pattern = ?pattern, invalid_matches = ?invalid_matches);
265    tx.send(TapPayload::invalid_output_pattern_match(
266        pattern,
267        invalid_matches,
268    ))
269    .await
270}
271
/// Runs a tap session: listens for topology changes and connects sinks to observe the
/// events of every component output that matches one or more of the provided patterns.
///
/// On each topology update the handler:
///
/// 1. drops the registered sinks of components that were removed or no longer have an
///    output,
/// 2. translates input patterns into patterns over the outputs feeding the matching
///    components,
/// 3. connects a tap sink to every output matching at least one pattern, forwarding its
///    events to `tx` through a [`TapTransformer`], and
/// 4. sends `matched`, `not_matched` and `invalid_match` notifications over `tx`.
///
/// The loop ends when `shutdown_rx` resolves, or when a notification can't be sent
/// because the client side of `tx` has gone away.
async fn tap_handler(
    patterns: TapPatterns,
    tx: TapSender,
    mut watch_rx: WatchRx,
    mut shutdown_rx: ShutdownRx,
) {
    debug!(message = "Started tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);

    // Sinks registered for the current tap, keyed by the id of the matched component. Each
    // entry holds the shutdown triggers of the sinks connected to that component's outputs;
    // dropping a trigger lets the task spawned by `shutdown_trigger` send the remove
    // control message.
    //
    // NOTE(review): a new sink is connected and its trigger pushed for every matching output
    // on every topology update, so entries that survive the `retain` below appear to
    // accumulate sinks across updates -- confirm this is intended.
    let mut sinks: HashMap<ComponentKey, _> = HashMap::new();

    // Recording user-provided patterns for later use in sending notifications
    // (determining patterns which did not match)
    let user_provided_patterns = patterns.all_patterns();

    // The patterns that matched on the last iteration, to compare with the latest
    // round of matches when sending notifications.
    let mut last_matches = HashSet::new();

    loop {
        tokio::select! {
            // `_` accepts both outcomes of the oneshot receiver, so either one stops the tap.
            _ = &mut shutdown_rx => break,
            // Only a successful change notification runs this branch; an `Err` from
            // `changed()` does not match the pattern.
            Ok(_) = watch_rx.changed() => {
                // Cache of matched patterns. A `HashSet` is used here to ignore repetition.
                let mut matched = HashSet::new();

                // Borrow and clone the latest resources to register sinks. Since this blocks the
                // watch channel and the returned ref isn't `Send`, this requires a clone.
                let TapResource {
                    outputs,
                    inputs,
                    source_keys,
                    sink_keys,
                    removals,
                } = watch_rx.borrow().clone();

                // Remove tap sinks from components that have gone away/can no longer match.
                // An entry is kept only if its component isn't listed in `removals` and still
                // has an output; otherwise the right-hand block logs the removal and yields
                // `false`, dropping the entry together with its shutdown triggers.
                let output_keys = outputs.keys().map(|output| output.output_id.component.clone()).collect::<HashSet<_>>();
                sinks.retain(|key, _| {
                    !removals.contains(key) && output_keys.contains(key) || {
                        debug!(message = "Removing component.", component_id = %key);
                        false
                    }
                });

                // Output patterns are used as they are; strings that aren't valid globs are
                // skipped here.
                let mut component_id_patterns = patterns.for_outputs.iter()
                                                                    .filter_map(|p| glob::Pattern::new(p).ok())
                                                                    .map(Pattern::OutputPattern).collect::<HashSet<_>>();

                // Matching an input pattern is equivalent to matching the outputs of the component's inputs
                for pattern in patterns.for_inputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        // Collect the ids feeding every component whose key matches the input
                        // pattern, de-duplicated through the `HashSet`.
                        match inputs.iter().filter(|(key, _)|
                            glob.matches(&key.to_string())
                        ).flat_map(|(_, related_inputs)| related_inputs.iter().map(|id| id.to_string()).collect::<Vec<_>>()).collect::<HashSet<_>>() {
                            found if !found.is_empty() => {
                                // Keep the user's original pattern next to the translated output
                                // patterns so notifications can refer to what the user typed.
                                component_id_patterns.insert(Pattern::InputPattern(pattern.clone(), found.into_iter()
                                                                                                         .filter_map(|p| glob::Pattern::new(&p).ok()).collect::<Vec<_>>()));
                            }
                            _ => {
                                debug!(message="Input pattern not expanded: no matching components.", ?pattern);
                            }
                        }
                    }
                }

                // Loop over all outputs, and connect sinks for the components that match one
                // or more patterns.
                for (output, control_tx) in outputs.iter() {
                    match component_id_patterns
                        .iter()
                        .filter(|pattern| pattern.matches_glob(&output.output_id.to_string()))
                        .collect::<Vec<_>>()
                    {
                        found if !found.is_empty() => {
                            debug!(
                                message="Component matched.",
                                ?output.output_id, ?component_id_patterns, matched = ?found
                            );

                            // Build a new intermediate buffer pair that we can insert as a sink
                            // target for the component, and spawn our transformer task which will
                            // wrap each event payload with the necessary metadata before forwarding
                            // it to our global tap receiver.
                            let (tap_buffer_tx, mut tap_buffer_rx) = TopologyBuilder::standalone_memory(TAP_BUFFER_SIZE, WhenFull::DropNewest, &Span::current()).await;
                            let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone());

                            // The forwarding task ends once the buffer receiver yields `None`.
                            tokio::spawn(async move {
                                while let Some(events) = tap_buffer_rx.next().await {
                                    tap_transformer.try_send(events);
                                }
                            });

                            // Attempt to connect the sink under a freshly generated UUID.
                            //
                            // This is necessary because a sink may be reconfigured with the same id
                            // as a previous, and we are not getting involved in config diffing at
                            // this point.
                            let sink_id = Uuid::new_v4().to_string();
                            match control_tx
                                .send(fanout::ControlMessage::Add(ComponentKey::from(sink_id.as_str()), tap_buffer_tx))
                            {
                                Ok(_) => {
                                    debug!(
                                        message = "Sink connected.", ?sink_id, ?output.output_id,
                                    );

                                    // Create a sink shutdown trigger to remove the sink
                                    // when matched components change.
                                    sinks.entry(output.output_id.component.clone()).or_insert_with(Vec::new).push(
                                        shutdown_trigger(control_tx.clone(), ComponentKey::from(sink_id.as_str()))
                                    );
                                }
                                Err(error) => {
                                    error!(
                                        message = "Couldn't connect sink.",
                                        ?error,
                                        ?output.output_id,
                                        ?sink_id,
                                    );
                                }
                            }

                            // Record the patterns as the user wrote them: input patterns report
                            // the original string rather than the output patterns they expand to.
                            // This runs whether or not the sink could be connected.
                            matched.extend(found.iter().map(|pattern| {
                                match pattern {
                                    Pattern::OutputPattern(p) => p.to_string(),
                                    Pattern::InputPattern(p, _) => p.to_owned(),
                                }
                            }));
                        }
                        _ => {
                            debug!(
                                message="Component not matched.", ?output.output_id, ?component_id_patterns
                            );
                        }
                    }
                }

                // Notifications to send to the client.
                let mut notifications = Vec::new();

                // Matched notifications. Only patterns that were not already matched on the
                // previous update are announced.
                for pattern in matched.difference(&last_matches) {
                    notifications.push(send_matched(tx.clone(), pattern.clone()).boxed());
                }

                // Not matched notifications. These are sent on every update for each
                // user-provided pattern that is not in this round's matches.
                for pattern in user_provided_patterns.difference(&matched) {
                    notifications.push(send_not_matched(tx.clone(), pattern.clone()).boxed());
                }

                // Warnings on invalid matches: input patterns that match sources, and output
                // patterns that match sinks.
                for pattern in patterns.for_inputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        let invalid_matches = source_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
                        if !invalid_matches.is_empty() {
                            notifications.push(send_invalid_input_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
                        }
                    }
                }
                for pattern in patterns.for_outputs.iter() {
                    if let Ok(glob) = glob::Pattern::new(pattern) {
                        let invalid_matches = sink_keys.iter().filter(|key| glob.matches(key)).cloned().collect::<Vec<_>>();
                        if !invalid_matches.is_empty() {
                            notifications.push(send_invalid_output_pattern_match(tx.clone(), pattern.clone(), invalid_matches).boxed())
                        }
                    }
                }

                last_matches = matched;

                // Send all events. If any event returns an error, this means the client
                // channel has gone away, so we can break the loop.
                if try_join_all(notifications).await.is_err() {
                    debug!("Couldn't send notification(s); tap gone away.");
                    break;
                }
            }
        }
    }

    debug!(message = "Stopped tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);
}
458
// Gated on `cfg(test)` so the module is only compiled for test builds.
#[cfg(test)]
mod tests {
    /// Patterns should accept globbing.
    #[test]
    fn matches() {
        use super::GlobMatcher;

        let patterns = ["ab*", "12?", "xy?"];

        // Should find.
        for id in &["abc", "123", "xyz"] {
            assert!(patterns.iter().any(|p| p.to_string().matches_glob(id)));
        }

        // Should not find.
        for id in &["xzy", "ad*", "1234"] {
            assert!(!patterns.iter().any(|p| p.to_string().matches_glob(id)));
        }
    }
}
477}