vector/sinks/websocket_server/
buffering.rsuse std::{collections::VecDeque, num::NonZeroUsize};
use bytes::Bytes;
use tokio_tungstenite::tungstenite::{handshake::server::Request, Message};
use url::Url;
use uuid::Uuid;
use vector_config::configurable_component;
use vector_lib::{
event::{Event, MaybeAsLogMut},
lookup::lookup_v2::ConfigValuePath,
};
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MessageBufferingConfig {
#[serde(default = "default_max_events")]
pub max_events: NonZeroUsize,
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub message_id_path: Option<ConfigValuePath>,
}
const fn default_max_events() -> NonZeroUsize {
unsafe { NonZeroUsize::new_unchecked(1000) }
}
const LAST_RECEIVED_QUERY_PARAM_NAME: &str = "last_received";
pub struct BufferReplayRequest {
should_replay: bool,
replay_from: Option<Uuid>,
}
impl BufferReplayRequest {
pub const NO_REPLAY: Self = Self {
should_replay: false,
replay_from: None,
};
pub const REPLAY_ALL: Self = Self {
should_replay: true,
replay_from: None,
};
const fn with_replay_from(replay_from: Uuid) -> Self {
Self {
should_replay: true,
replay_from: Some(replay_from),
}
}
pub fn replay_messages(
&self,
buffer: &VecDeque<(Uuid, Message)>,
replay: impl FnMut(&(Uuid, Message)),
) {
if self.should_replay {
buffer
.iter()
.filter(|(id, _)| Some(*id) > self.replay_from)
.for_each(replay);
}
}
}
pub trait WsMessageBufferConfig {
fn should_buffer(&self) -> bool;
fn buffer_capacity(&self) -> usize;
fn extract_message_replay_request(&self, request: &Request) -> BufferReplayRequest;
fn add_replay_message_id_to_event(&self, event: &mut Event) -> Uuid;
}
impl WsMessageBufferConfig for Option<MessageBufferingConfig> {
fn should_buffer(&self) -> bool {
self.is_some()
}
fn buffer_capacity(&self) -> usize {
self.as_ref().map_or(0, |mb| mb.max_events.get())
}
fn extract_message_replay_request(&self, request: &Request) -> BufferReplayRequest {
if self.is_none() {
return BufferReplayRequest::NO_REPLAY;
}
let Some(query_params) = request.uri().query() else {
return BufferReplayRequest::NO_REPLAY;
};
if !query_params.contains(LAST_RECEIVED_QUERY_PARAM_NAME) {
return BufferReplayRequest::NO_REPLAY;
}
let base_url = Url::parse("ws://localhost").ok();
match Url::options()
.base_url(base_url.as_ref())
.parse(request.uri().to_string().as_str())
{
Ok(url) => {
if let Some((_, last_received_param_value)) = url
.query_pairs()
.find(|(k, _)| k == LAST_RECEIVED_QUERY_PARAM_NAME)
{
match Uuid::parse_str(&last_received_param_value) {
Ok(last_received_val) => {
return BufferReplayRequest::with_replay_from(last_received_val)
}
Err(err) => {
warn!(message = "Parsing last received message UUID failed.", %err)
}
}
}
}
Err(err) => {
warn!(message = "Parsing request URL for websocket connection request failed.", %err)
}
}
BufferReplayRequest::REPLAY_ALL
}
fn add_replay_message_id_to_event(&self, event: &mut Event) -> Uuid {
let message_id = Uuid::now_v7();
if let Some(MessageBufferingConfig {
message_id_path: Some(ref message_id_path),
..
}) = self
{
if let Some(log) = event.maybe_as_log_mut() {
let mut buffer = [0; 36];
let uuid = message_id.hyphenated().encode_lower(&mut buffer);
log.value_mut()
.insert(message_id_path, Bytes::copy_from_slice(uuid.as_bytes()));
}
}
message_id
}
}