vector/api/schema/events/mod.rs

1pub mod encoding;
2pub mod log;
3pub mod metric;
4pub mod output;
5pub mod trace;
6
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use async_graphql::{Context, Subscription};
10use encoding::EventEncodingType;
11use futures::{Stream, StreamExt, stream};
12use output::{OutputEventsPayload, from_tap_payload_to_output_events};
13use rand::{Rng, SeedableRng, rngs::SmallRng};
14use tokio::{select, sync::mpsc, time};
15use tokio_stream::wrappers::ReceiverStream;
16use vector_lib::tap::{
17    controller::{TapController, TapPatterns},
18    topology::WatchRx,
19};
20
/// Root of the event-streaming GraphQL subscriptions; a unit struct carrying no state.
#[derive(Default, Debug)]
pub struct EventsSubscription;
23
24#[Subscription]
25impl EventsSubscription {
26    /// A stream of events emitted from matched component ID patterns
27    pub async fn output_events_by_component_id_patterns<'a>(
28        &'a self,
29        ctx: &'a Context<'a>,
30        outputs_patterns: Vec<String>,
31        inputs_patterns: Option<Vec<String>>,
32        #[graphql(default = 500)] interval: u32,
33        #[graphql(default = 100, validator(minimum = 1, maximum = 10_000))] limit: u32,
34    ) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
35        let watch_rx = ctx.data_unchecked::<WatchRx>().clone();
36
37        let patterns = TapPatterns {
38            for_outputs: outputs_patterns.into_iter().collect(),
39            for_inputs: inputs_patterns.unwrap_or_default().into_iter().collect(),
40        };
41        // Client input is confined to `u32` to provide sensible bounds.
42        create_events_stream(watch_rx, patterns, interval as u64, limit as usize)
43    }
44}
45
46/// Creates an events stream based on component ids, and a provided interval. Will emit
47/// control messages that bubble up the application if the sink goes away. The stream contains
48/// all matching events; filtering should be done at the caller level.
49pub(crate) fn create_events_stream(
50    watch_rx: WatchRx,
51    patterns: TapPatterns,
52    interval: u64,
53    limit: usize,
54) -> impl Stream<Item = Vec<OutputEventsPayload>> {
55    // Channel for receiving individual tap payloads. Since we can process at most `limit` per
56    // interval, this is capped to the same value.
57    let (tap_tx, tap_rx) = mpsc::channel(limit);
58    let mut tap_rx = ReceiverStream::new(tap_rx)
59        .flat_map(|payload| stream::iter(from_tap_payload_to_output_events(payload)));
60
61    // The resulting vector of `Event` sent to the client. Only one result set will be streamed
62    // back to the client at a time. This value is set higher than `1` to prevent blocking the event
63    // pipeline on slower client connections, but low enough to apply a modest cap on mem usage.
64    let (event_tx, event_rx) = mpsc::channel::<Vec<OutputEventsPayload>>(10);
65
66    tokio::spawn(async move {
67        // Create a tap controller. When this drops out of scope, clean up will be performed on the
68        // event handlers and topology observation that the tap controller provides.
69        let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
70
71        // A tick interval to represent when to 'cut' the results back to the client.
72        let mut interval = time::interval(time::Duration::from_millis(interval));
73
74        // Temporary structure to hold sortable values of `Event`.
75        struct SortableOutputEventsPayload {
76            batch: usize,
77            payload: OutputEventsPayload,
78        }
79
80        // Collect a vector of results, with a capacity of `limit`. As new `Event`s come in,
81        // they will be sampled and added to results.
82        let mut results = Vec::<SortableOutputEventsPayload>::with_capacity(limit);
83
84        // Random number generator to allow for sampling. Speed trumps cryptographic security here.
85        // The RNG must be Send + Sync to use with the `select!` loop below, hence `SmallRng`.
86        let seed = SystemTime::now()
87            .duration_since(UNIX_EPOCH)
88            .unwrap()
89            .as_nanos() as u64;
90        let mut rng = SmallRng::seed_from_u64(seed);
91
92        // Keep a count of the batch size, which will be used as a seed for random eviction
93        // per the sampling strategy used below.
94        let mut batch = 0;
95
96        loop {
97            select! {
98                // Process `TapPayload`s. A tap payload could contain log/metric events or a
99                // notification. Notifications are emitted immediately; events buffer until
100                // the next `interval`.
101                Some(payload) = tap_rx.next() => {
102                    // Emit notifications immediately; these don't count as a 'batch'.
103                    if let OutputEventsPayload::Notification(_) = payload {
104                        // If an error occurs when sending, the subscription has likely gone
105                        // away. Break the loop to terminate the thread.
106                        if let Err(err) = event_tx.send(vec![payload]).await {
107                            debug!(message = "Couldn't send notification.", error = ?err);
108                            break;
109                        }
110                    } else {
111                        // Wrap tap in a 'sortable' wrapper, using the batch as a key, to
112                        // re-sort after random eviction.
113                        let payload = SortableOutputEventsPayload { batch, payload };
114
115                        // A simple implementation of "Algorithm R" per
116                        // https://en.wikipedia.org/wiki/Reservoir_sampling. As we're unable to
117                        // pluck the nth result, this is chosen over the more optimal "Algorithm L"
118                        // since discarding results isn't an option.
119                        if limit > results.len() {
120                            results.push(payload);
121                        } else {
122                            let random_number = rng.random_range(0..batch);
123                            if random_number < results.len() {
124                                results[random_number] = payload;
125                            }
126                        }
127                        // Increment the batch count, to be used for the next Algo R loop.
128                        batch += 1;
129                    }
130                }
131                _ = interval.tick() => {
132                    // If there are any existing results after the interval tick, emit.
133                    if !results.is_empty() {
134                        // Reset the batch count, to adjust sampling probability for the next round.
135                        batch = 0;
136
137                        // Since events will appear out of order per the random sampling
138                        // strategy, drain the existing results and sort by timestamp.
139                        results.sort_by_key(|r| r.batch);
140                        let results = results.drain(..)
141                            .map(|r| r.payload)
142                            .collect();
143
144                        // If we get an error here, it likely means that the subscription has
145                        // gone has away. This is a valid/common situation.
146                        if let Err(err) = event_tx.send(results).await {
147                            debug!(message = "Couldn't send events.", error = ?err);
148                            break;
149                        }
150                    }
151                }
152            }
153        }
154    });
155
156    ReceiverStream::new(event_rx)
157}