// vector/sinks/websocket_server/buffering.rs

use std::{collections::VecDeque, num::NonZeroUsize};

use bytes::Bytes;
use tokio_tungstenite::tungstenite::{handshake::server::Request, Message};
use url::Url;
use uuid::Uuid;
use vector_config::configurable_component;
use vector_lib::{
    event::{Event, MaybeAsLogMut},
    lookup::lookup_v2::ConfigValuePath,
};

/// Configuration for message buffering, which enables replaying messages to clients that connect
/// later.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MessageBufferingConfig {
    /// Maximum number of events to hold in the buffer.
    ///
    /// The buffer is backed by a ring buffer, so the oldest messages are dropped once the size
    /// limit is reached.
    #[serde(default = "default_max_events")]
    pub max_events: NonZeroUsize,

    /// Message ID path.
    ///
    /// This must be set for the message ID to be included in the messages sent to clients. Using
    /// that ID (passed back in the `last_received` query parameter), clients can request a replay
    /// of the messages that follow the message ID of their choosing.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub message_id_path: Option<ConfigValuePath>,
}

/// Default value for `MessageBufferingConfig::max_events`: 1000 events.
const fn default_max_events() -> NonZeroUsize {
    // Evaluated in a const item, so a zero value would be a compile-time error rather than
    // undefined behaviour (as it would have been with `NonZeroUsize::new_unchecked`).
    const DEFAULT_MAX_EVENTS: NonZeroUsize = match NonZeroUsize::new(1000) {
        Some(max_events) => max_events,
        None => panic!("default max_events must be non-zero"),
    };
    DEFAULT_MAX_EVENTS
}

/// Name of the connection-request query parameter a client uses to ask for a buffer replay.
///
/// Its value is parsed as the UUID of the last message the client received.
const LAST_RECEIVED_QUERY_PARAM_NAME: &str = "last_received";

/// Describes which buffered messages should be replayed to a newly connected client.
///
/// `replay_from` is only consulted when `should_replay` is set; `None` means every buffered
/// message is replayed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BufferReplayRequest {
    /// Whether any buffered messages should be replayed at all.
    should_replay: bool,
    /// ID of the last message the client received; only messages with a greater ID are replayed.
    replay_from: Option<Uuid>,
}

impl BufferReplayRequest {
    /// Replay nothing.
    pub const NO_REPLAY: Self = Self {
        should_replay: false,
        replay_from: None,
    };
    /// Replay every buffered message.
    pub const REPLAY_ALL: Self = Self {
        should_replay: true,
        replay_from: None,
    };

    /// Replay only the buffered messages whose ID is strictly greater than `replay_from`.
    const fn with_replay_from(replay_from: Uuid) -> Self {
        Self {
            should_replay: true,
            replay_from: Some(replay_from),
        }
    }

    /// Calls `replay` for each message in `buffer` selected by this request, in buffer order.
    ///
    /// Does nothing when the request does not ask for a replay.
    pub fn replay_messages(
        &self,
        buffer: &VecDeque<(Uuid, Message)>,
        replay: impl FnMut(&(Uuid, Message)),
    ) {
        if !self.should_replay {
            return;
        }
        buffer
            .iter()
            // `Option` orders `None` below every `Some`, so with no `replay_from` every message
            // passes; otherwise only IDs strictly greater than `replay_from` are replayed.
            .filter(|(id, _)| Some(*id) > self.replay_from)
            .for_each(replay);
    }
}

/// Message buffering and replay operations of the websocket server sink, derived from its
/// buffering configuration.
pub trait WsMessageBufferConfig {
    /// Returns true if this configuration enables buffering.
    fn should_buffer(&self) -> bool;
    /// Returns the configured capacity of the buffer.
    fn buffer_capacity(&self) -> usize;
    /// Extracts a buffer replay request from the given connection request, based on the
    /// configuration.
    fn extract_message_replay_request(&self, request: &Request) -> BufferReplayRequest;
    /// Creates a message ID that clients can use to request a replay and adds it to the event.
    /// The created ID is returned so it can be stored in the buffer alongside the message.
    fn add_replay_message_id_to_event(&self, event: &mut Event) -> Uuid;
}

impl WsMessageBufferConfig for Option<MessageBufferingConfig> {
    /// Buffering is enabled whenever a buffering configuration is present.
    fn should_buffer(&self) -> bool {
        self.is_some()
    }

    /// The configured `max_events`, or 0 when buffering is disabled.
    fn buffer_capacity(&self) -> usize {
        self.as_ref().map_or(0, |mb| mb.max_events.get())
    }

    /// Reads the `last_received` query parameter from the connection request.
    ///
    /// - Buffering disabled, no query string, or no `last_received` parameter: no replay.
    /// - `last_received` holds a valid UUID: replay the messages after that ID.
    /// - The request URL or the UUID cannot be parsed: replay the whole buffer.
    fn extract_message_replay_request(&self, request: &Request) -> BufferReplayRequest {
        // Early return if buffering is disabled
        if self.is_none() {
            return BufferReplayRequest::NO_REPLAY;
        }

        // Early return if query params are missing
        let Some(query_params) = request.uri().query() else {
            return BufferReplayRequest::NO_REPLAY;
        };

        // Cheap pre-filter before parsing the URL. This is only a substring test, so the exact
        // key is verified again after parsing.
        if !query_params.contains(LAST_RECEIVED_QUERY_PARAM_NAME) {
            return BufferReplayRequest::NO_REPLAY;
        }

        // The request URI may be relative (path + query), so resolve it against a placeholder
        // base to get a parseable URL.
        let base_url = Url::parse("ws://localhost").ok();
        let url = match Url::options()
            .base_url(base_url.as_ref())
            .parse(request.uri().to_string().as_str())
        {
            Ok(url) => url,
            Err(err) => {
                warn!(message = "Parsing request URL for websocket connection request failed.", %err);
                // The client did ask for a replay; send everything we have buffered.
                return BufferReplayRequest::REPLAY_ALL;
            }
        };

        // The substring test above also accepts queries like `?not_last_received=1`. Without an
        // exact `last_received` key the client did not ask for a replay.
        let Some((_, last_received_param_value)) = url
            .query_pairs()
            .find(|(k, _)| k == LAST_RECEIVED_QUERY_PARAM_NAME)
        else {
            return BufferReplayRequest::NO_REPLAY;
        };

        match Uuid::parse_str(&last_received_param_value) {
            Ok(last_received_val) => BufferReplayRequest::with_replay_from(last_received_val),
            Err(err) => {
                warn!(message = "Parsing last received message UUID failed.", %err);
                // Even if we can't use the provided message ID, we should dump whatever we have
                // buffered so far, because the provided message ID might have expired by now
                BufferReplayRequest::REPLAY_ALL
            }
        }
    }

    /// Generates a new UUIDv7 message ID. When `message_id_path` is configured and the event
    /// can be accessed as a log, the lowercase hyphenated ID is written into the event at that
    /// path. The ID is returned either way so it can be stored in the buffer.
    fn add_replay_message_id_to_event(&self, event: &mut Event) -> Uuid {
        let message_id = Uuid::now_v7();
        if let Some(MessageBufferingConfig {
            message_id_path: Some(message_id_path),
            ..
        }) = self
        {
            if let Some(log) = event.maybe_as_log_mut() {
                // A hyphenated UUID is always exactly 36 ASCII characters.
                let mut buffer = [0; 36];
                let uuid = message_id.hyphenated().encode_lower(&mut buffer);
                log.value_mut()
                    .insert(message_id_path, Bytes::copy_from_slice(uuid.as_bytes()));
            }
        }
        message_id
    }
}