1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
pub mod encoding;
pub mod log;
pub mod metric;
pub mod output;
pub mod trace;
use async_graphql::{Context, Subscription};
use encoding::EventEncodingType;
use futures::{stream, Stream, StreamExt};
use output::{from_tap_payload_to_output_events, OutputEventsPayload};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use tokio::{select, sync::mpsc, time};
use tokio_stream::wrappers::ReceiverStream;
use vector_lib::tap::{
controller::{TapController, TapPatterns},
topology::WatchRx,
};
/// Root type for the event-tapping GraphQL subscription.
///
/// This is a stateless unit struct; the subscription field itself
/// (`output_events_by_component_id_patterns`) is defined in the `#[Subscription]` impl for
/// this type.
#[derive(Debug, Default)]
pub struct EventsSubscription;
#[Subscription]
impl EventsSubscription {
    /// A stream of events emitted from matched component ID patterns
    pub async fn output_events_by_component_id_patterns<'a>(
        &'a self,
        ctx: &'a Context<'a>,
        outputs_patterns: Vec<String>,
        inputs_patterns: Option<Vec<String>>,
        #[graphql(default = 500)] interval: u32,
        #[graphql(default = 100, validator(minimum = 1, maximum = 10_000))] limit: u32,
    ) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
        // The topology watch receiver is registered as GraphQL context data; each
        // subscription works on its own clone.
        let topology_rx = ctx.data_unchecked::<WatchRx>().clone();

        // Input patterns are optional on the wire; an absent list is treated as empty.
        let for_outputs = outputs_patterns.into_iter().collect();
        let for_inputs = inputs_patterns
            .unwrap_or_default()
            .into_iter()
            .collect();

        // The GraphQL arguments are deliberately `u32` so that clients get sensible bounds;
        // widen them here to the types the stream builder expects.
        let interval_ms = u64::from(interval);
        let max_events = limit as usize;

        create_events_stream(
            topology_rx,
            TapPatterns {
                for_outputs,
                for_inputs,
            },
            interval_ms,
            max_events,
        )
    }
}
/// Creates an events stream based on component ID patterns and a provided interval.
///
/// A background task is spawned that owns a [`TapController`] built from `watch_rx` and
/// `patterns`. Tap payloads are handled as follows:
///
/// * Notifications are forwarded to the returned stream immediately, one per emitted vector.
/// * Events are buffered and emitted as a single vector on every `interval` tick. At most
///   `limit` events are emitted per tick; if more arrive, a uniform random sample of `limit`
///   events is kept (reservoir sampling, "Algorithm R") and emitted in arrival order.
///
/// The task terminates, dropping the tap controller, as soon as the returned stream is
/// dropped.
///
/// # Arguments
///
/// * `watch_rx` - topology watch receiver handed to the tap controller.
/// * `patterns` - component ID patterns handed to the tap controller.
/// * `interval` - time between emitted batches, in milliseconds. A value of `0` is treated
///   as `1`, since `tokio::time::interval` panics on a zero period.
/// * `limit` - maximum number of events per emitted batch; also used as the capacity of the
///   internal tap channel.
///
/// # Panics
///
/// Panics if `limit` is `0` (a Tokio `mpsc` channel cannot have zero capacity) or if called
/// outside of a Tokio runtime.
pub(crate) fn create_events_stream(
    watch_rx: WatchRx,
    patterns: TapPatterns,
    interval: u64,
    limit: usize,
) -> impl Stream<Item = Vec<OutputEventsPayload>> {
    // Channel for receiving individual tap payloads. Since we can process at most `limit` per
    // interval, this is capped to the same value.
    let (tap_tx, tap_rx) = mpsc::channel(limit);
    let mut tap_rx = ReceiverStream::new(tap_rx)
        .flat_map(|payload| stream::iter(from_tap_payload_to_output_events(payload)));

    // The resulting vector of `Event` sent to the client. Only one result set will be streamed
    // back to the client at a time. This value is set higher than `1` to prevent blocking the
    // event pipeline on slower client connections, but low enough to apply a modest cap on mem
    // usage.
    let (event_tx, event_rx) = mpsc::channel::<Vec<OutputEventsPayload>>(10);

    tokio::spawn(async move {
        // Create a tap controller. When this drops out of scope, clean up will be performed on
        // the event handlers and topology observation that the tap controller provides.
        let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);

        // A tick interval to represent when to 'cut' the results back to the client. The period
        // is clamped to at least 1ms because `time::interval` panics when given a zero period.
        let mut interval = time::interval(time::Duration::from_millis(interval.max(1)));

        // Temporary structure to hold sortable values of `Event`.
        struct SortableOutputEventsPayload {
            batch: usize,
            payload: OutputEventsPayload,
        }

        // Collect a vector of results, with a capacity of `limit`. As new `Event`s come in,
        // they will be sampled and added to results.
        let mut results = Vec::<SortableOutputEventsPayload>::with_capacity(limit);

        // Random number generator to allow for sampling. Speed trumps cryptographic security
        // here. The RNG must be Send + Sync to use with the `select!` loop below, hence
        // `SmallRng`.
        let mut rng = SmallRng::from_entropy();

        // Number of events seen since the last emitted batch. It doubles as the arrival index
        // of the next event and drives the eviction probability of the sampling strategy below.
        let mut batch = 0;

        loop {
            select! {
                // Process `TapPayload`s. A tap payload could contain log/metric events or a
                // notification. Notifications are emitted immediately; events buffer until
                // the next `interval`.
                Some(payload) = tap_rx.next() => {
                    // Emit notifications immediately; these don't count as a 'batch'.
                    if let OutputEventsPayload::Notification(_) = payload {
                        // If an error occurs when sending, the subscription has likely gone
                        // away. Break the loop to terminate the task.
                        if let Err(err) = event_tx.send(vec![payload]).await {
                            debug!(message = "Couldn't send notification.", error = ?err);
                            break;
                        }
                    } else {
                        // Wrap tap in a 'sortable' wrapper, using the batch as a key, to
                        // re-sort after random eviction.
                        let payload = SortableOutputEventsPayload { batch, payload };

                        // A simple implementation of "Algorithm R" per
                        // https://en.wikipedia.org/wiki/Reservoir_sampling. As we're unable to
                        // pluck the nth result, this is chosen over the more optimal
                        // "Algorithm L" since discarding results isn't an option.
                        if limit > results.len() {
                            results.push(payload);
                        } else {
                            // `batch` is the zero-based index of this event, so `batch + 1`
                            // events have been seen including it. Drawing from the inclusive
                            // range keeps the event with probability `limit / (batch + 1)`,
                            // which is what makes the sample uniform.
                            let random_number = rng.gen_range(0..=batch);
                            if random_number < results.len() {
                                results[random_number] = payload;
                            }
                        }

                        // Increment the batch count, to be used for the next Algo R loop.
                        batch += 1;
                    }
                }
                _ = interval.tick() => {
                    // If there are any existing results after the interval tick, emit.
                    if !results.is_empty() {
                        // Reset the batch count, to adjust sampling probability for the next
                        // round.
                        batch = 0;

                        // Since events will appear out of order per the random sampling
                        // strategy, restore arrival order by sorting on the batch index, then
                        // drain the buffer. Indices are unique within a batch, so an unstable
                        // sort yields the same order as a stable one.
                        results.sort_unstable_by_key(|r| r.batch);
                        let results = results.drain(..)
                            .map(|r| r.payload)
                            .collect();

                        // If we get an error here, it likely means that the subscription has
                        // gone away. This is a valid/common situation.
                        if let Err(err) = event_tx.send(results).await {
                            debug!(message = "Couldn't send events.", error = ?err);
                            break;
                        }
                    }
                }
                // The receiving half was dropped, i.e. the subscription has gone away. Stop
                // right away rather than waiting for a failed send, so that the tap controller
                // is cleaned up even when no events are flowing.
                _ = event_tx.closed() => break,
            }
        }
    });

    ReceiverStream::new(event_rx)
}