vector/api/schema/events/mod.rs
1pub mod encoding;
2pub mod log;
3pub mod metric;
4pub mod output;
5pub mod trace;
6
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use async_graphql::{Context, Subscription};
10use encoding::EventEncodingType;
11use futures::{Stream, StreamExt, stream};
12use output::{OutputEventsPayload, from_tap_payload_to_output_events};
13use rand::{Rng, SeedableRng, rngs::SmallRng};
14use tokio::{select, sync::mpsc, time};
15use tokio_stream::wrappers::ReceiverStream;
16use vector_lib::tap::{
17 controller::{TapController, TapPatterns},
18 topology::WatchRx,
19};
20
/// GraphQL subscription root for streaming events tapped from running components.
///
/// This is a unit struct and carries no state of its own; everything a subscription needs
/// lives in the stream returned by `output_events_by_component_id_patterns`.
#[derive(Default, Debug)]
pub struct EventsSubscription;
23
24#[Subscription]
25impl EventsSubscription {
26 /// A stream of events emitted from matched component ID patterns
27 pub async fn output_events_by_component_id_patterns<'a>(
28 &'a self,
29 ctx: &'a Context<'a>,
30 outputs_patterns: Vec<String>,
31 inputs_patterns: Option<Vec<String>>,
32 #[graphql(default = 500)] interval: u32,
33 #[graphql(default = 100, validator(minimum = 1, maximum = 10_000))] limit: u32,
34 ) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
35 let watch_rx = ctx.data_unchecked::<WatchRx>().clone();
36
37 let patterns = TapPatterns {
38 for_outputs: outputs_patterns.into_iter().collect(),
39 for_inputs: inputs_patterns.unwrap_or_default().into_iter().collect(),
40 };
41 // Client input is confined to `u32` to provide sensible bounds.
42 create_events_stream(watch_rx, patterns, interval as u64, limit as usize)
43 }
44}
45
46/// Creates an events stream based on component ids, and a provided interval. Will emit
47/// control messages that bubble up the application if the sink goes away. The stream contains
48/// all matching events; filtering should be done at the caller level.
49pub(crate) fn create_events_stream(
50 watch_rx: WatchRx,
51 patterns: TapPatterns,
52 interval: u64,
53 limit: usize,
54) -> impl Stream<Item = Vec<OutputEventsPayload>> {
55 // Channel for receiving individual tap payloads. Since we can process at most `limit` per
56 // interval, this is capped to the same value.
57 let (tap_tx, tap_rx) = mpsc::channel(limit);
58 let mut tap_rx = ReceiverStream::new(tap_rx)
59 .flat_map(|payload| stream::iter(from_tap_payload_to_output_events(payload)));
60
61 // The resulting vector of `Event` sent to the client. Only one result set will be streamed
62 // back to the client at a time. This value is set higher than `1` to prevent blocking the event
63 // pipeline on slower client connections, but low enough to apply a modest cap on mem usage.
64 let (event_tx, event_rx) = mpsc::channel::<Vec<OutputEventsPayload>>(10);
65
66 tokio::spawn(async move {
67 // Create a tap controller. When this drops out of scope, clean up will be performed on the
68 // event handlers and topology observation that the tap controller provides.
69 let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
70
71 // A tick interval to represent when to 'cut' the results back to the client.
72 let mut interval = time::interval(time::Duration::from_millis(interval));
73
74 // Temporary structure to hold sortable values of `Event`.
75 struct SortableOutputEventsPayload {
76 batch: usize,
77 payload: OutputEventsPayload,
78 }
79
80 // Collect a vector of results, with a capacity of `limit`. As new `Event`s come in,
81 // they will be sampled and added to results.
82 let mut results = Vec::<SortableOutputEventsPayload>::with_capacity(limit);
83
84 // Random number generator to allow for sampling. Speed trumps cryptographic security here.
85 // The RNG must be Send + Sync to use with the `select!` loop below, hence `SmallRng`.
86 let seed = SystemTime::now()
87 .duration_since(UNIX_EPOCH)
88 .unwrap()
89 .as_nanos() as u64;
90 let mut rng = SmallRng::seed_from_u64(seed);
91
92 // Keep a count of the batch size, which will be used as a seed for random eviction
93 // per the sampling strategy used below.
94 let mut batch = 0;
95
96 loop {
97 select! {
98 // Process `TapPayload`s. A tap payload could contain log/metric events or a
99 // notification. Notifications are emitted immediately; events buffer until
100 // the next `interval`.
101 Some(payload) = tap_rx.next() => {
102 // Emit notifications immediately; these don't count as a 'batch'.
103 if let OutputEventsPayload::Notification(_) = payload {
104 // If an error occurs when sending, the subscription has likely gone
105 // away. Break the loop to terminate the thread.
106 if let Err(err) = event_tx.send(vec![payload]).await {
107 debug!(message = "Couldn't send notification.", error = ?err);
108 break;
109 }
110 } else {
111 // Wrap tap in a 'sortable' wrapper, using the batch as a key, to
112 // re-sort after random eviction.
113 let payload = SortableOutputEventsPayload { batch, payload };
114
115 // A simple implementation of "Algorithm R" per
116 // https://en.wikipedia.org/wiki/Reservoir_sampling. As we're unable to
117 // pluck the nth result, this is chosen over the more optimal "Algorithm L"
118 // since discarding results isn't an option.
119 if limit > results.len() {
120 results.push(payload);
121 } else {
122 let random_number = rng.random_range(0..batch);
123 if random_number < results.len() {
124 results[random_number] = payload;
125 }
126 }
127 // Increment the batch count, to be used for the next Algo R loop.
128 batch += 1;
129 }
130 }
131 _ = interval.tick() => {
132 // If there are any existing results after the interval tick, emit.
133 if !results.is_empty() {
134 // Reset the batch count, to adjust sampling probability for the next round.
135 batch = 0;
136
137 // Since events will appear out of order per the random sampling
138 // strategy, drain the existing results and sort by timestamp.
139 results.sort_by_key(|r| r.batch);
140 let results = results.drain(..)
141 .map(|r| r.payload)
142 .collect();
143
144 // If we get an error here, it likely means that the subscription has
145 // gone has away. This is a valid/common situation.
146 if let Err(err) = event_tx.send(results).await {
147 debug!(message = "Couldn't send events.", error = ?err);
148 break;
149 }
150 }
151 }
152 }
153 }
154 });
155
156 ReceiverStream::new(event_rx)
157}