vector/sources/socket/
udp.rs

1use std::net::{Ipv4Addr, SocketAddr};
2
3use super::default_host_key;
4use bytes::BytesMut;
5use chrono::Utc;
6use futures::StreamExt;
7use listenfd::ListenFd;
8use tokio_util::codec::FramedRead;
9use vector_lib::codecs::{
10    decoding::{DeserializerConfig, FramingConfig},
11    StreamDecodingError,
12};
13use vector_lib::configurable::configurable_component;
14use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
15use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
16use vector_lib::{
17    config::{LegacyKey, LogNamespace},
18    EstimatedJsonEncodedSizeOf,
19};
20
21use crate::{
22    codecs::Decoder,
23    event::Event,
24    internal_events::{
25        SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
26        SocketReceiveError, StreamClosedError,
27    },
28    net,
29    serde::default_decoding,
30    shutdown::ShutdownSignal,
31    sources::{
32        socket::SocketConfig,
33        util::net::{try_bind_udp_socket, SocketListenAddr},
34        Source,
35    },
36    SourceSender,
37};
38
39/// UDP configuration for the `socket` source.
40#[configurable_component]
41#[serde(deny_unknown_fields)]
42#[derive(Clone, Debug)]
43pub struct UdpConfig {
44    #[configurable(derived)]
45    address: SocketListenAddr,
46
47    /// List of IPv4 multicast groups to join on socket's binding process.
48    ///
49    /// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
50    /// If any other address is used (such as `127.0.0.1` or an specific interface address), the
51    /// listening interface will filter out all multicast packets received,
52    /// as their target IP would be the one of the multicast group
53    /// and it will not match the socket's bound IP.
54    ///
55    /// Note that this setting will only work if the source's address
56    /// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
57    /// with multicast groups).
58    #[serde(default)]
59    #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
60    pub(super) multicast_groups: Vec<Ipv4Addr>,
61
62    /// The maximum buffer size of incoming messages.
63    ///
64    /// Messages larger than this are truncated.
65    #[serde(default = "default_max_length")]
66    #[configurable(metadata(docs::type_unit = "bytes"))]
67    pub(super) max_length: usize,
68
69    /// Overrides the name of the log field used to add the peer host to each event.
70    ///
71    /// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
72    ///
73    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
74    ///
75    /// Set to `""` to suppress this key.
76    ///
77    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
78    host_key: Option<OptionalValuePath>,
79
80    /// Overrides the name of the log field used to add the peer host's port to each event.
81    ///
82    /// The value will be the peer host's port i.e. `9000`.
83    ///
84    /// By default, `"port"` is used.
85    ///
86    /// Set to `""` to suppress this key.
87    #[serde(default = "default_port_key")]
88    port_key: OptionalValuePath,
89
90    /// The size of the receive buffer used for the listening socket.
91    #[configurable(metadata(docs::type_unit = "bytes"))]
92    receive_buffer_bytes: Option<usize>,
93
94    #[configurable(derived)]
95    pub(super) framing: Option<FramingConfig>,
96
97    #[configurable(derived)]
98    #[serde(default = "default_decoding")]
99    pub(super) decoding: DeserializerConfig,
100
101    /// The namespace to use for logs. This overrides the global setting.
102    #[serde(default)]
103    #[configurable(metadata(docs::hidden))]
104    pub log_namespace: Option<bool>,
105}
106
107fn default_port_key() -> OptionalValuePath {
108    OptionalValuePath::from(owned_value_path!("port"))
109}
110
111fn default_max_length() -> usize {
112    crate::serde::default_max_length()
113}
114
115impl UdpConfig {
116    pub(super) fn host_key(&self) -> OptionalValuePath {
117        self.host_key.clone().unwrap_or(default_host_key())
118    }
119
120    pub const fn port_key(&self) -> &OptionalValuePath {
121        &self.port_key
122    }
123
124    pub(super) const fn framing(&self) -> &Option<FramingConfig> {
125        &self.framing
126    }
127
128    pub(super) const fn decoding(&self) -> &DeserializerConfig {
129        &self.decoding
130    }
131
132    pub(super) const fn address(&self) -> SocketListenAddr {
133        self.address
134    }
135
136    pub fn from_address(address: SocketListenAddr) -> Self {
137        Self {
138            address,
139            multicast_groups: Vec::new(),
140            max_length: default_max_length(),
141            host_key: None,
142            port_key: default_port_key(),
143            receive_buffer_bytes: None,
144            framing: None,
145            decoding: default_decoding(),
146            log_namespace: None,
147        }
148    }
149
150    pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
151        self.log_namespace = val;
152        self
153    }
154}
155
156pub(super) fn udp(
157    config: UdpConfig,
158    decoder: Decoder,
159    mut shutdown: ShutdownSignal,
160    mut out: SourceSender,
161    log_namespace: LogNamespace,
162) -> Source {
163    Box::pin(async move {
164        let listenfd = ListenFd::from_env();
165        let socket = try_bind_udp_socket(config.address, listenfd)
166            .await
167            .map_err(|error| {
168                emit!(SocketBindError {
169                    mode: SocketMode::Udp,
170                    error,
171                })
172            })?;
173
174        if !config.multicast_groups.is_empty() {
175            socket.set_multicast_loop_v4(true).unwrap();
176            let listen_addr = match config.address() {
177                SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
178                SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
179                    // We could support Ipv6 multicast with the
180                    // https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
181                    // and specifying the interface index as `0`, in order to bind all interfaces.
182                    unimplemented!("IPv6 multicast is not supported")
183                }
184                SocketListenAddr::SystemdFd(_) => {
185                    unimplemented!("Multicast for systemd fd sockets is not supported")
186                }
187            };
188            for group_addr in config.multicast_groups {
189                let interface = *listen_addr.ip();
190                socket
191                    .join_multicast_v4(group_addr, interface)
192                    .map_err(|error| {
193                        emit!(SocketMulticastGroupJoinError {
194                            error,
195                            group_addr,
196                            interface,
197                        })
198                    })?;
199                info!(message = "Joined multicast group.", group = %group_addr);
200            }
201        }
202
203        if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
204            if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
205                warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
206            }
207        }
208
209        let mut max_length = config.max_length;
210
211        if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
212            max_length = std::cmp::min(max_length, receive_buffer_bytes);
213        }
214
215        let bytes_received = register!(BytesReceived::from(Protocol::UDP));
216
217        info!(message = "Listening.", address = %config.address);
218        // We add 1 to the max_length in order to determine if the received data has been truncated.
219        let mut buf = BytesMut::with_capacity(max_length + 1);
220        loop {
221            buf.resize(max_length + 1, 0);
222            tokio::select! {
223                recv = socket.recv_from(&mut buf) => {
224                    let (byte_size, address) = match recv {
225                        Ok(res) => res,
226                        Err(error) => {
227                            #[cfg(windows)]
228                            if let Some(err) = error.raw_os_error() {
229                                if err == 10040 {
230                                    // 10040 is the Windows error that the Udp message has exceeded max_length
231                                    warn!(
232                                        message = "Discarding frame larger than max_length.",
233                                        max_length = max_length,
234                                        internal_log_rate_limit = true
235                                    );
236                                    continue;
237                                }
238                            }
239
240                            return Err(emit!(SocketReceiveError {
241                                mode: SocketMode::Udp,
242                                error
243                            }));
244                       }
245                    };
246
247                    bytes_received.emit(ByteSize(byte_size));
248                    let payload = buf.split_to(byte_size);
249                    let truncated = byte_size == max_length + 1;
250                    let mut stream = FramedRead::new(payload.as_ref(), decoder.clone()).peekable();
251
252                    while let Some(result) = stream.next().await {
253                        let last = Pin::new(&mut stream).peek().await.is_none();
254                        match result {
255                            Ok((mut events, _byte_size)) => {
256                                if last && truncated {
257                                    // The last event in this payload was truncated, so we want to drop it.
258                                    _ = events.pop();
259                                    warn!(
260                                        message = "Discarding frame larger than max_length.",
261                                        max_length = max_length,
262                                        internal_log_rate_limit = true
263                                    );
264                                }
265
266                                if events.is_empty() {
267                                    continue;
268                                }
269
270                                let count = events.len();
271                                emit!(SocketEventsReceived {
272                                    mode: SocketMode::Udp,
273                                    byte_size: events.estimated_json_encoded_size_of(),
274                                    count,
275                                });
276
277                                let now = Utc::now();
278
279                                for event in &mut events {
280                                    if let Event::Log(log) = event {
281                                        log_namespace.insert_standard_vector_source_metadata(
282                                            log,
283                                            SocketConfig::NAME,
284                                            now,
285                                        );
286
287                                        let legacy_host_key = config
288                                            .host_key
289                                            .clone()
290                                            .unwrap_or(default_host_key())
291                                            .path;
292
293                                        log_namespace.insert_source_metadata(
294                                            SocketConfig::NAME,
295                                            log,
296                                            legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
297                                            path!("host"),
298                                            address.ip().to_string()
299                                        );
300
301                                        let legacy_port_key = config.port_key.clone().path;
302
303                                        log_namespace.insert_source_metadata(
304                                            SocketConfig::NAME,
305                                            log,
306                                            legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
307                                            path!("port"),
308                                            address.port()
309                                        );
310                                    }
311                                }
312
313                                tokio::select!{
314                                    result = out.send_batch(events) => {
315                                        if result.is_err() {
316                                            emit!(StreamClosedError { count });
317                                            return Ok(())
318                                        }
319                                    }
320                                    _ = &mut shutdown => return Ok(()),
321                                }
322                            }
323                            Err(error) => {
324                                // Error is logged by `crate::codecs::Decoder`, no
325                                // further handling is needed here.
326                                if !error.can_continue() {
327                                    break;
328                                }
329                            }
330                        }
331                    }
332                }
333                _ = &mut shutdown => return Ok(()),
334            }
335        }
336    })
337}