vector/api/schema/events/mod.rs
1pub mod encoding;
2pub mod log;
3pub mod metric;
4pub mod output;
5pub mod trace;
6
7use async_graphql::{Context, Subscription};
8use encoding::EventEncodingType;
9use futures::{stream, Stream, StreamExt};
10use output::{from_tap_payload_to_output_events, OutputEventsPayload};
11use rand::{rngs::SmallRng, Rng, SeedableRng};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::{select, sync::mpsc, time};
14use tokio_stream::wrappers::ReceiverStream;
15use vector_lib::tap::{
16 controller::{TapController, TapPatterns},
17 topology::WatchRx,
18};
19
20#[derive(Debug, Default)]
21pub struct EventsSubscription;
22
23#[Subscription]
24impl EventsSubscription {
25 /// A stream of events emitted from matched component ID patterns
26 pub async fn output_events_by_component_id_patterns<'a>(
27 &'a self,
28 ctx: &'a Context<'a>,
29 outputs_patterns: Vec<String>,
30 inputs_patterns: Option<Vec<String>>,
31 #[graphql(default = 500)] interval: u32,
32 #[graphql(default = 100, validator(minimum = 1, maximum = 10_000))] limit: u32,
33 ) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
34 let watch_rx = ctx.data_unchecked::<WatchRx>().clone();
35
36 let patterns = TapPatterns {
37 for_outputs: outputs_patterns.into_iter().collect(),
38 for_inputs: inputs_patterns.unwrap_or_default().into_iter().collect(),
39 };
40 // Client input is confined to `u32` to provide sensible bounds.
41 create_events_stream(watch_rx, patterns, interval as u64, limit as usize)
42 }
43}
44
45/// Creates an events stream based on component ids, and a provided interval. Will emit
46/// control messages that bubble up the application if the sink goes away. The stream contains
47/// all matching events; filtering should be done at the caller level.
48pub(crate) fn create_events_stream(
49 watch_rx: WatchRx,
50 patterns: TapPatterns,
51 interval: u64,
52 limit: usize,
53) -> impl Stream<Item = Vec<OutputEventsPayload>> {
54 // Channel for receiving individual tap payloads. Since we can process at most `limit` per
55 // interval, this is capped to the same value.
56 let (tap_tx, tap_rx) = mpsc::channel(limit);
57 let mut tap_rx = ReceiverStream::new(tap_rx)
58 .flat_map(|payload| stream::iter(from_tap_payload_to_output_events(payload)));
59
60 // The resulting vector of `Event` sent to the client. Only one result set will be streamed
61 // back to the client at a time. This value is set higher than `1` to prevent blocking the event
62 // pipeline on slower client connections, but low enough to apply a modest cap on mem usage.
63 let (event_tx, event_rx) = mpsc::channel::<Vec<OutputEventsPayload>>(10);
64
65 tokio::spawn(async move {
66 // Create a tap controller. When this drops out of scope, clean up will be performed on the
67 // event handlers and topology observation that the tap controller provides.
68 let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
69
70 // A tick interval to represent when to 'cut' the results back to the client.
71 let mut interval = time::interval(time::Duration::from_millis(interval));
72
73 // Temporary structure to hold sortable values of `Event`.
74 struct SortableOutputEventsPayload {
75 batch: usize,
76 payload: OutputEventsPayload,
77 }
78
79 // Collect a vector of results, with a capacity of `limit`. As new `Event`s come in,
80 // they will be sampled and added to results.
81 let mut results = Vec::<SortableOutputEventsPayload>::with_capacity(limit);
82
83 // Random number generator to allow for sampling. Speed trumps cryptographic security here.
84 // The RNG must be Send + Sync to use with the `select!` loop below, hence `SmallRng`.
85 let seed = SystemTime::now()
86 .duration_since(UNIX_EPOCH)
87 .unwrap()
88 .as_nanos() as u64;
89 let mut rng = SmallRng::seed_from_u64(seed);
90
91 // Keep a count of the batch size, which will be used as a seed for random eviction
92 // per the sampling strategy used below.
93 let mut batch = 0;
94
95 loop {
96 select! {
97 // Process `TapPayload`s. A tap payload could contain log/metric events or a
98 // notification. Notifications are emitted immediately; events buffer until
99 // the next `interval`.
100 Some(payload) = tap_rx.next() => {
101 // Emit notifications immediately; these don't count as a 'batch'.
102 if let OutputEventsPayload::Notification(_) = payload {
103 // If an error occurs when sending, the subscription has likely gone
104 // away. Break the loop to terminate the thread.
105 if let Err(err) = event_tx.send(vec![payload]).await {
106 debug!(message = "Couldn't send notification.", error = ?err);
107 break;
108 }
109 } else {
110 // Wrap tap in a 'sortable' wrapper, using the batch as a key, to
111 // re-sort after random eviction.
112 let payload = SortableOutputEventsPayload { batch, payload };
113
114 // A simple implementation of "Algorithm R" per
115 // https://en.wikipedia.org/wiki/Reservoir_sampling. As we're unable to
116 // pluck the nth result, this is chosen over the more optimal "Algorithm L"
117 // since discarding results isn't an option.
118 if limit > results.len() {
119 results.push(payload);
120 } else {
121 let random_number = rng.random_range(0..batch);
122 if random_number < results.len() {
123 results[random_number] = payload;
124 }
125 }
126 // Increment the batch count, to be used for the next Algo R loop.
127 batch += 1;
128 }
129 }
130 _ = interval.tick() => {
131 // If there are any existing results after the interval tick, emit.
132 if !results.is_empty() {
133 // Reset the batch count, to adjust sampling probability for the next round.
134 batch = 0;
135
136 // Since events will appear out of order per the random sampling
137 // strategy, drain the existing results and sort by timestamp.
138 results.sort_by_key(|r| r.batch);
139 let results = results.drain(..)
140 .map(|r| r.payload)
141 .collect();
142
143 // If we get an error here, it likely means that the subscription has
144 // gone has away. This is a valid/common situation.
145 if let Err(err) = event_tx.send(results).await {
146 debug!(message = "Couldn't send events.", error = ?err);
147 break;
148 }
149 }
150 }
151 }
152 }
153 });
154
155 ReceiverStream::new(event_rx)
156}