vector/sources/socket/
udp.rs

1use std::net::{Ipv4Addr, SocketAddr};
2
3use bytes::BytesMut;
4use chrono::Utc;
5use futures::StreamExt;
6use listenfd::ListenFd;
7use vector_lib::{
8    EstimatedJsonEncodedSizeOf,
9    codecs::{
10        DecoderFramedRead, StreamDecodingError,
11        decoding::{DeserializerConfig, FramingConfig},
12    },
13    config::{LegacyKey, LogNamespace},
14    configurable::configurable_component,
15    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
16    lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
17};
18
19use super::default_host_key;
20use crate::{
21    SourceSender,
22    codecs::Decoder,
23    event::Event,
24    internal_events::{
25        SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
26        SocketReceiveError, StreamClosedError,
27    },
28    net,
29    serde::default_decoding,
30    shutdown::ShutdownSignal,
31    sources::{
32        Source,
33        socket::SocketConfig,
34        util::net::{SocketListenAddr, try_bind_udp_socket},
35    },
36};
37
38/// UDP configuration for the `socket` source.
39#[configurable_component]
40#[serde(deny_unknown_fields)]
41#[derive(Clone, Debug)]
42pub struct UdpConfig {
43    #[configurable(derived)]
44    address: SocketListenAddr,
45
46    /// List of IPv4 multicast groups to join on socket's binding process.
47    ///
48    /// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
49    /// If any other address is used (such as `127.0.0.1` or an specific interface address), the
50    /// listening interface will filter out all multicast packets received,
51    /// as their target IP would be the one of the multicast group
52    /// and it will not match the socket's bound IP.
53    ///
54    /// Note that this setting will only work if the source's address
55    /// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
56    /// with multicast groups).
57    #[serde(default)]
58    #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
59    pub(super) multicast_groups: Vec<Ipv4Addr>,
60
61    /// The maximum buffer size of incoming messages.
62    ///
63    /// Messages larger than this are truncated.
64    #[serde(default = "default_max_length")]
65    #[configurable(metadata(docs::type_unit = "bytes"))]
66    pub(super) max_length: usize,
67
68    /// Overrides the name of the log field used to add the peer host to each event.
69    ///
70    /// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
71    ///
72    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
73    ///
74    /// Set to `""` to suppress this key.
75    ///
76    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
77    host_key: Option<OptionalValuePath>,
78
79    /// Overrides the name of the log field used to add the peer host's port to each event.
80    ///
81    /// The value will be the peer host's port i.e. `9000`.
82    ///
83    /// By default, `"port"` is used.
84    ///
85    /// Set to `""` to suppress this key.
86    #[serde(default = "default_port_key")]
87    port_key: OptionalValuePath,
88
89    /// The size of the receive buffer used for the listening socket.
90    #[configurable(metadata(docs::type_unit = "bytes"))]
91    receive_buffer_bytes: Option<usize>,
92
93    #[configurable(derived)]
94    pub(super) framing: Option<FramingConfig>,
95
96    #[configurable(derived)]
97    #[serde(default = "default_decoding")]
98    pub(super) decoding: DeserializerConfig,
99
100    /// The namespace to use for logs. This overrides the global setting.
101    #[serde(default)]
102    #[configurable(metadata(docs::hidden))]
103    pub log_namespace: Option<bool>,
104}
105
106fn default_port_key() -> OptionalValuePath {
107    OptionalValuePath::from(owned_value_path!("port"))
108}
109
110fn default_max_length() -> usize {
111    crate::serde::default_max_length()
112}
113
114impl UdpConfig {
115    pub(super) fn host_key(&self) -> OptionalValuePath {
116        self.host_key.clone().unwrap_or(default_host_key())
117    }
118
119    pub const fn port_key(&self) -> &OptionalValuePath {
120        &self.port_key
121    }
122
123    pub(super) const fn framing(&self) -> &Option<FramingConfig> {
124        &self.framing
125    }
126
127    pub(super) const fn decoding(&self) -> &DeserializerConfig {
128        &self.decoding
129    }
130
131    pub(super) const fn address(&self) -> SocketListenAddr {
132        self.address
133    }
134
135    pub fn from_address(address: SocketListenAddr) -> Self {
136        Self {
137            address,
138            multicast_groups: Vec::new(),
139            max_length: default_max_length(),
140            host_key: None,
141            port_key: default_port_key(),
142            receive_buffer_bytes: None,
143            framing: None,
144            decoding: default_decoding(),
145            log_namespace: None,
146        }
147    }
148
149    pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
150        self.log_namespace = val;
151        self
152    }
153}
154
155pub(super) fn udp(
156    config: UdpConfig,
157    decoder: Decoder,
158    mut shutdown: ShutdownSignal,
159    mut out: SourceSender,
160    log_namespace: LogNamespace,
161) -> Source {
162    Box::pin(async move {
163        let listenfd = ListenFd::from_env();
164        let socket = try_bind_udp_socket(config.address, listenfd)
165            .await
166            .map_err(|error| {
167                emit!(SocketBindError {
168                    mode: SocketMode::Udp,
169                    error,
170                })
171            })?;
172
173        if !config.multicast_groups.is_empty() {
174            socket.set_multicast_loop_v4(true).unwrap();
175            let listen_addr = match config.address() {
176                SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
177                SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
178                    // We could support Ipv6 multicast with the
179                    // https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
180                    // and specifying the interface index as `0`, in order to bind all interfaces.
181                    unimplemented!("IPv6 multicast is not supported")
182                }
183                SocketListenAddr::SystemdFd(_) => {
184                    unimplemented!("Multicast for systemd fd sockets is not supported")
185                }
186            };
187            for group_addr in config.multicast_groups {
188                let interface = *listen_addr.ip();
189                socket
190                    .join_multicast_v4(group_addr, interface)
191                    .map_err(|error| {
192                        emit!(SocketMulticastGroupJoinError {
193                            error,
194                            group_addr,
195                            interface,
196                        })
197                    })?;
198                info!(message = "Joined multicast group.", group = %group_addr);
199            }
200        }
201
202        if let Some(receive_buffer_bytes) = config.receive_buffer_bytes
203            && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
204        {
205            warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
206        }
207
208        let mut max_length = config.max_length;
209
210        if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
211            max_length = std::cmp::min(max_length, receive_buffer_bytes);
212        }
213
214        let bytes_received = register!(BytesReceived::from(Protocol::UDP));
215
216        info!(message = "Listening.", address = %config.address);
217        // We add 1 to the max_length in order to determine if the received data has been truncated.
218        let mut buf = BytesMut::with_capacity(max_length + 1);
219        loop {
220            buf.resize(max_length + 1, 0);
221            tokio::select! {
222                recv = socket.recv_from(&mut buf) => {
223                    let (byte_size, address) = match recv {
224                        Ok(res) => res,
225                        Err(error) => {
226                            #[cfg(windows)]
227                            if let Some(err) = error.raw_os_error() {
228                                if err == 10040 {
229                                    // 10040 is the Windows error that the Udp message has exceeded max_length
230                                    warn!(
231                                        message = "Discarding frame larger than max_length.",
232                                        max_length = max_length
233                                    );
234                                    continue;
235                                }
236                            }
237
238                            return Err(emit!(SocketReceiveError {
239                                mode: SocketMode::Udp,
240                                error
241                            }));
242                       }
243                    };
244
245                    bytes_received.emit(ByteSize(byte_size));
246                    let payload = buf.split_to(byte_size);
247                    let truncated = byte_size == max_length + 1;
248                    let mut stream =
249                        DecoderFramedRead::new(payload.as_ref(), decoder.clone()).peekable();
250
251                    while let Some(result) = stream.next().await {
252                        let last = Pin::new(&mut stream).peek().await.is_none();
253                        match result {
254                            Ok((mut events, _byte_size)) => {
255                                if last && truncated {
256                                    // The last event in this payload was truncated, so we want to drop it.
257                                    _ = events.pop();
258                                    warn!(
259                                        message = "Discarding frame larger than max_length.",
260                                        max_length = max_length
261                                    );
262                                }
263
264                                if events.is_empty() {
265                                    continue;
266                                }
267
268                                let count = events.len();
269                                emit!(SocketEventsReceived {
270                                    mode: SocketMode::Udp,
271                                    byte_size: events.estimated_json_encoded_size_of(),
272                                    count,
273                                });
274
275                                let now = Utc::now();
276
277                                for event in &mut events {
278                                    if let Event::Log(log) = event {
279                                        log_namespace.insert_standard_vector_source_metadata(
280                                            log,
281                                            SocketConfig::NAME,
282                                            now,
283                                        );
284
285                                        let legacy_host_key = config
286                                            .host_key
287                                            .clone()
288                                            .unwrap_or(default_host_key())
289                                            .path;
290
291                                        log_namespace.insert_source_metadata(
292                                            SocketConfig::NAME,
293                                            log,
294                                            legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
295                                            path!("host"),
296                                            address.ip().to_string()
297                                        );
298
299                                        let legacy_port_key = config.port_key.clone().path;
300
301                                        log_namespace.insert_source_metadata(
302                                            SocketConfig::NAME,
303                                            log,
304                                            legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
305                                            path!("port"),
306                                            address.port()
307                                        );
308                                    }
309                                }
310
311                                tokio::select!{
312                                    result = out.send_batch(events) => {
313                                        if result.is_err() {
314                                            emit!(StreamClosedError { count });
315                                            return Ok(())
316                                        }
317                                    }
318                                    _ = &mut shutdown => return Ok(()),
319                                }
320                            }
321                            Err(error) => {
322                                // Error is logged by `vector_lib::codecs::Decoder`, no
323                                // further handling is needed here.
324                                if !error.can_continue() {
325                                    break;
326                                }
327                            }
328                        }
329                    }
330                }
331                _ = &mut shutdown => return Ok(()),
332            }
333        }
334    })
335}