vector/sources/socket/
udp.rs

1use std::net::{Ipv4Addr, SocketAddr};
2
3use bytes::BytesMut;
4use chrono::Utc;
5use futures::StreamExt;
6use listenfd::ListenFd;
7use tokio_util::codec::FramedRead;
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    codecs::{
11        StreamDecodingError,
12        decoding::{DeserializerConfig, FramingConfig},
13    },
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
17    lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
18};
19
20use super::default_host_key;
21use crate::{
22    SourceSender,
23    codecs::Decoder,
24    event::Event,
25    internal_events::{
26        SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
27        SocketReceiveError, StreamClosedError,
28    },
29    net,
30    serde::default_decoding,
31    shutdown::ShutdownSignal,
32    sources::{
33        Source,
34        socket::SocketConfig,
35        util::net::{SocketListenAddr, try_bind_udp_socket},
36    },
37};
38
39/// UDP configuration for the `socket` source.
40#[configurable_component]
41#[serde(deny_unknown_fields)]
42#[derive(Clone, Debug)]
43pub struct UdpConfig {
44    #[configurable(derived)]
45    address: SocketListenAddr,
46
47    /// List of IPv4 multicast groups to join on socket's binding process.
48    ///
49    /// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
50    /// If any other address is used (such as `127.0.0.1` or an specific interface address), the
51    /// listening interface will filter out all multicast packets received,
52    /// as their target IP would be the one of the multicast group
53    /// and it will not match the socket's bound IP.
54    ///
55    /// Note that this setting will only work if the source's address
56    /// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
57    /// with multicast groups).
58    #[serde(default)]
59    #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
60    pub(super) multicast_groups: Vec<Ipv4Addr>,
61
62    /// The maximum buffer size of incoming messages.
63    ///
64    /// Messages larger than this are truncated.
65    #[serde(default = "default_max_length")]
66    #[configurable(metadata(docs::type_unit = "bytes"))]
67    pub(super) max_length: usize,
68
69    /// Overrides the name of the log field used to add the peer host to each event.
70    ///
71    /// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
72    ///
73    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
74    ///
75    /// Set to `""` to suppress this key.
76    ///
77    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
78    host_key: Option<OptionalValuePath>,
79
80    /// Overrides the name of the log field used to add the peer host's port to each event.
81    ///
82    /// The value will be the peer host's port i.e. `9000`.
83    ///
84    /// By default, `"port"` is used.
85    ///
86    /// Set to `""` to suppress this key.
87    #[serde(default = "default_port_key")]
88    port_key: OptionalValuePath,
89
90    /// The size of the receive buffer used for the listening socket.
91    #[configurable(metadata(docs::type_unit = "bytes"))]
92    receive_buffer_bytes: Option<usize>,
93
94    #[configurable(derived)]
95    pub(super) framing: Option<FramingConfig>,
96
97    #[configurable(derived)]
98    #[serde(default = "default_decoding")]
99    pub(super) decoding: DeserializerConfig,
100
101    /// The namespace to use for logs. This overrides the global setting.
102    #[serde(default)]
103    #[configurable(metadata(docs::hidden))]
104    pub log_namespace: Option<bool>,
105}
106
107fn default_port_key() -> OptionalValuePath {
108    OptionalValuePath::from(owned_value_path!("port"))
109}
110
111fn default_max_length() -> usize {
112    crate::serde::default_max_length()
113}
114
115impl UdpConfig {
116    pub(super) fn host_key(&self) -> OptionalValuePath {
117        self.host_key.clone().unwrap_or(default_host_key())
118    }
119
120    pub const fn port_key(&self) -> &OptionalValuePath {
121        &self.port_key
122    }
123
124    pub(super) const fn framing(&self) -> &Option<FramingConfig> {
125        &self.framing
126    }
127
128    pub(super) const fn decoding(&self) -> &DeserializerConfig {
129        &self.decoding
130    }
131
132    pub(super) const fn address(&self) -> SocketListenAddr {
133        self.address
134    }
135
136    pub fn from_address(address: SocketListenAddr) -> Self {
137        Self {
138            address,
139            multicast_groups: Vec::new(),
140            max_length: default_max_length(),
141            host_key: None,
142            port_key: default_port_key(),
143            receive_buffer_bytes: None,
144            framing: None,
145            decoding: default_decoding(),
146            log_namespace: None,
147        }
148    }
149
150    pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
151        self.log_namespace = val;
152        self
153    }
154}
155
156pub(super) fn udp(
157    config: UdpConfig,
158    decoder: Decoder,
159    mut shutdown: ShutdownSignal,
160    mut out: SourceSender,
161    log_namespace: LogNamespace,
162) -> Source {
163    Box::pin(async move {
164        let listenfd = ListenFd::from_env();
165        let socket = try_bind_udp_socket(config.address, listenfd)
166            .await
167            .map_err(|error| {
168                emit!(SocketBindError {
169                    mode: SocketMode::Udp,
170                    error,
171                })
172            })?;
173
174        if !config.multicast_groups.is_empty() {
175            socket.set_multicast_loop_v4(true).unwrap();
176            let listen_addr = match config.address() {
177                SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
178                SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
179                    // We could support Ipv6 multicast with the
180                    // https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
181                    // and specifying the interface index as `0`, in order to bind all interfaces.
182                    unimplemented!("IPv6 multicast is not supported")
183                }
184                SocketListenAddr::SystemdFd(_) => {
185                    unimplemented!("Multicast for systemd fd sockets is not supported")
186                }
187            };
188            for group_addr in config.multicast_groups {
189                let interface = *listen_addr.ip();
190                socket
191                    .join_multicast_v4(group_addr, interface)
192                    .map_err(|error| {
193                        emit!(SocketMulticastGroupJoinError {
194                            error,
195                            group_addr,
196                            interface,
197                        })
198                    })?;
199                info!(message = "Joined multicast group.", group = %group_addr);
200            }
201        }
202
203        if let Some(receive_buffer_bytes) = config.receive_buffer_bytes
204            && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
205        {
206            warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
207        }
208
209        let mut max_length = config.max_length;
210
211        if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
212            max_length = std::cmp::min(max_length, receive_buffer_bytes);
213        }
214
215        let bytes_received = register!(BytesReceived::from(Protocol::UDP));
216
217        info!(message = "Listening.", address = %config.address);
218        // We add 1 to the max_length in order to determine if the received data has been truncated.
219        let mut buf = BytesMut::with_capacity(max_length + 1);
220        loop {
221            buf.resize(max_length + 1, 0);
222            tokio::select! {
223                recv = socket.recv_from(&mut buf) => {
224                    let (byte_size, address) = match recv {
225                        Ok(res) => res,
226                        Err(error) => {
227                            #[cfg(windows)]
228                            if let Some(err) = error.raw_os_error() {
229                                if err == 10040 {
230                                    // 10040 is the Windows error that the Udp message has exceeded max_length
231                                    warn!(
232                                        message = "Discarding frame larger than max_length.",
233                                        max_length = max_length
234                                    );
235                                    continue;
236                                }
237                            }
238
239                            return Err(emit!(SocketReceiveError {
240                                mode: SocketMode::Udp,
241                                error
242                            }));
243                       }
244                    };
245
246                    bytes_received.emit(ByteSize(byte_size));
247                    let payload = buf.split_to(byte_size);
248                    let truncated = byte_size == max_length + 1;
249                    let mut stream = FramedRead::new(payload.as_ref(), decoder.clone()).peekable();
250
251                    while let Some(result) = stream.next().await {
252                        let last = Pin::new(&mut stream).peek().await.is_none();
253                        match result {
254                            Ok((mut events, _byte_size)) => {
255                                if last && truncated {
256                                    // The last event in this payload was truncated, so we want to drop it.
257                                    _ = events.pop();
258                                    warn!(
259                                        message = "Discarding frame larger than max_length.",
260                                        max_length = max_length
261                                    );
262                                }
263
264                                if events.is_empty() {
265                                    continue;
266                                }
267
268                                let count = events.len();
269                                emit!(SocketEventsReceived {
270                                    mode: SocketMode::Udp,
271                                    byte_size: events.estimated_json_encoded_size_of(),
272                                    count,
273                                });
274
275                                let now = Utc::now();
276
277                                for event in &mut events {
278                                    if let Event::Log(log) = event {
279                                        log_namespace.insert_standard_vector_source_metadata(
280                                            log,
281                                            SocketConfig::NAME,
282                                            now,
283                                        );
284
285                                        let legacy_host_key = config
286                                            .host_key
287                                            .clone()
288                                            .unwrap_or(default_host_key())
289                                            .path;
290
291                                        log_namespace.insert_source_metadata(
292                                            SocketConfig::NAME,
293                                            log,
294                                            legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
295                                            path!("host"),
296                                            address.ip().to_string()
297                                        );
298
299                                        let legacy_port_key = config.port_key.clone().path;
300
301                                        log_namespace.insert_source_metadata(
302                                            SocketConfig::NAME,
303                                            log,
304                                            legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
305                                            path!("port"),
306                                            address.port()
307                                        );
308                                    }
309                                }
310
311                                tokio::select!{
312                                    result = out.send_batch(events) => {
313                                        if result.is_err() {
314                                            emit!(StreamClosedError { count });
315                                            return Ok(())
316                                        }
317                                    }
318                                    _ = &mut shutdown => return Ok(()),
319                                }
320                            }
321                            Err(error) => {
322                                // Error is logged by `crate::codecs::Decoder`, no
323                                // further handling is needed here.
324                                if !error.can_continue() {
325                                    break;
326                                }
327                            }
328                        }
329                    }
330                }
331                _ = &mut shutdown => return Ok(()),
332            }
333        }
334    })
335}