vector/api/schema/events/mod.rs

1pub mod encoding;
2pub mod log;
3pub mod metric;
4pub mod output;
5pub mod trace;
6
7use async_graphql::{Context, Subscription};
8use encoding::EventEncodingType;
9use futures::{stream, Stream, StreamExt};
10use output::{from_tap_payload_to_output_events, OutputEventsPayload};
11use rand::{rngs::SmallRng, Rng, SeedableRng};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::{select, sync::mpsc, time};
14use tokio_stream::wrappers::ReceiverStream;
15use vector_lib::tap::{
16    controller::{TapController, TapPatterns},
17    topology::WatchRx,
18};
19
/// Stateless root type on which the event-stream GraphQL subscription is implemented.
#[derive(Default, Debug)]
pub struct EventsSubscription;
22
23#[Subscription]
24impl EventsSubscription {
25    /// A stream of events emitted from matched component ID patterns
26    pub async fn output_events_by_component_id_patterns<'a>(
27        &'a self,
28        ctx: &'a Context<'a>,
29        outputs_patterns: Vec<String>,
30        inputs_patterns: Option<Vec<String>>,
31        #[graphql(default = 500)] interval: u32,
32        #[graphql(default = 100, validator(minimum = 1, maximum = 10_000))] limit: u32,
33    ) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
34        let watch_rx = ctx.data_unchecked::<WatchRx>().clone();
35
36        let patterns = TapPatterns {
37            for_outputs: outputs_patterns.into_iter().collect(),
38            for_inputs: inputs_patterns.unwrap_or_default().into_iter().collect(),
39        };
40        // Client input is confined to `u32` to provide sensible bounds.
41        create_events_stream(watch_rx, patterns, interval as u64, limit as usize)
42    }
43}
44
45/// Creates an events stream based on component ids, and a provided interval. Will emit
46/// control messages that bubble up the application if the sink goes away. The stream contains
47/// all matching events; filtering should be done at the caller level.
48pub(crate) fn create_events_stream(
49    watch_rx: WatchRx,
50    patterns: TapPatterns,
51    interval: u64,
52    limit: usize,
53) -> impl Stream<Item = Vec<OutputEventsPayload>> {
54    // Channel for receiving individual tap payloads. Since we can process at most `limit` per
55    // interval, this is capped to the same value.
56    let (tap_tx, tap_rx) = mpsc::channel(limit);
57    let mut tap_rx = ReceiverStream::new(tap_rx)
58        .flat_map(|payload| stream::iter(from_tap_payload_to_output_events(payload)));
59
60    // The resulting vector of `Event` sent to the client. Only one result set will be streamed
61    // back to the client at a time. This value is set higher than `1` to prevent blocking the event
62    // pipeline on slower client connections, but low enough to apply a modest cap on mem usage.
63    let (event_tx, event_rx) = mpsc::channel::<Vec<OutputEventsPayload>>(10);
64
65    tokio::spawn(async move {
66        // Create a tap controller. When this drops out of scope, clean up will be performed on the
67        // event handlers and topology observation that the tap controller provides.
68        let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
69
70        // A tick interval to represent when to 'cut' the results back to the client.
71        let mut interval = time::interval(time::Duration::from_millis(interval));
72
73        // Temporary structure to hold sortable values of `Event`.
74        struct SortableOutputEventsPayload {
75            batch: usize,
76            payload: OutputEventsPayload,
77        }
78
79        // Collect a vector of results, with a capacity of `limit`. As new `Event`s come in,
80        // they will be sampled and added to results.
81        let mut results = Vec::<SortableOutputEventsPayload>::with_capacity(limit);
82
83        // Random number generator to allow for sampling. Speed trumps cryptographic security here.
84        // The RNG must be Send + Sync to use with the `select!` loop below, hence `SmallRng`.
85        let seed = SystemTime::now()
86            .duration_since(UNIX_EPOCH)
87            .unwrap()
88            .as_nanos() as u64;
89        let mut rng = SmallRng::seed_from_u64(seed);
90
91        // Keep a count of the batch size, which will be used as a seed for random eviction
92        // per the sampling strategy used below.
93        let mut batch = 0;
94
95        loop {
96            select! {
97                // Process `TapPayload`s. A tap payload could contain log/metric events or a
98                // notification. Notifications are emitted immediately; events buffer until
99                // the next `interval`.
100                Some(payload) = tap_rx.next() => {
101                    // Emit notifications immediately; these don't count as a 'batch'.
102                    if let OutputEventsPayload::Notification(_) = payload {
103                        // If an error occurs when sending, the subscription has likely gone
104                        // away. Break the loop to terminate the thread.
105                        if let Err(err) = event_tx.send(vec![payload]).await {
106                            debug!(message = "Couldn't send notification.", error = ?err);
107                            break;
108                        }
109                    } else {
110                        // Wrap tap in a 'sortable' wrapper, using the batch as a key, to
111                        // re-sort after random eviction.
112                        let payload = SortableOutputEventsPayload { batch, payload };
113
114                        // A simple implementation of "Algorithm R" per
115                        // https://en.wikipedia.org/wiki/Reservoir_sampling. As we're unable to
116                        // pluck the nth result, this is chosen over the more optimal "Algorithm L"
117                        // since discarding results isn't an option.
118                        if limit > results.len() {
119                            results.push(payload);
120                        } else {
121                            let random_number = rng.random_range(0..batch);
122                            if random_number < results.len() {
123                                results[random_number] = payload;
124                            }
125                        }
126                        // Increment the batch count, to be used for the next Algo R loop.
127                        batch += 1;
128                    }
129                }
130                _ = interval.tick() => {
131                    // If there are any existing results after the interval tick, emit.
132                    if !results.is_empty() {
133                        // Reset the batch count, to adjust sampling probability for the next round.
134                        batch = 0;
135
136                        // Since events will appear out of order per the random sampling
137                        // strategy, drain the existing results and sort by timestamp.
138                        results.sort_by_key(|r| r.batch);
139                        let results = results.drain(..)
140                            .map(|r| r.payload)
141                            .collect();
142
143                        // If we get an error here, it likely means that the subscription has
144                        // gone has away. This is a valid/common situation.
145                        if let Err(err) = event_tx.send(results).await {
146                            debug!(message = "Couldn't send events.", error = ?err);
147                            break;
148                        }
149                    }
150                }
151            }
152        }
153    });
154
155    ReceiverStream::new(event_rx)
156}