1use std::net::{Ipv4Addr, SocketAddr};
2
3use bytes::BytesMut;
4use chrono::Utc;
5use futures::StreamExt;
6use listenfd::ListenFd;
7use tokio_util::codec::FramedRead;
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 codecs::{
11 StreamDecodingError,
12 decoding::{DeserializerConfig, FramingConfig},
13 },
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
17 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
18};
19
20use super::default_host_key;
21use crate::{
22 SourceSender,
23 codecs::Decoder,
24 event::Event,
25 internal_events::{
26 SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
27 SocketReceiveError, StreamClosedError,
28 },
29 net,
30 serde::default_decoding,
31 shutdown::ShutdownSignal,
32 sources::{
33 Source,
34 socket::SocketConfig,
35 util::net::{SocketListenAddr, try_bind_udp_socket},
36 },
37};
38
39#[configurable_component]
41#[serde(deny_unknown_fields)]
42#[derive(Clone, Debug)]
43pub struct UdpConfig {
44 #[configurable(derived)]
45 address: SocketListenAddr,
46
47 #[serde(default)]
59 #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
60 pub(super) multicast_groups: Vec<Ipv4Addr>,
61
62 #[serde(default = "default_max_length")]
66 #[configurable(metadata(docs::type_unit = "bytes"))]
67 pub(super) max_length: usize,
68
69 host_key: Option<OptionalValuePath>,
79
80 #[serde(default = "default_port_key")]
88 port_key: OptionalValuePath,
89
90 #[configurable(metadata(docs::type_unit = "bytes"))]
92 receive_buffer_bytes: Option<usize>,
93
94 #[configurable(derived)]
95 pub(super) framing: Option<FramingConfig>,
96
97 #[configurable(derived)]
98 #[serde(default = "default_decoding")]
99 pub(super) decoding: DeserializerConfig,
100
101 #[serde(default)]
103 #[configurable(metadata(docs::hidden))]
104 pub log_namespace: Option<bool>,
105}
106
107fn default_port_key() -> OptionalValuePath {
108 OptionalValuePath::from(owned_value_path!("port"))
109}
110
111fn default_max_length() -> usize {
112 crate::serde::default_max_length()
113}
114
115impl UdpConfig {
116 pub(super) fn host_key(&self) -> OptionalValuePath {
117 self.host_key.clone().unwrap_or(default_host_key())
118 }
119
120 pub const fn port_key(&self) -> &OptionalValuePath {
121 &self.port_key
122 }
123
124 pub(super) const fn framing(&self) -> &Option<FramingConfig> {
125 &self.framing
126 }
127
128 pub(super) const fn decoding(&self) -> &DeserializerConfig {
129 &self.decoding
130 }
131
132 pub(super) const fn address(&self) -> SocketListenAddr {
133 self.address
134 }
135
136 pub fn from_address(address: SocketListenAddr) -> Self {
137 Self {
138 address,
139 multicast_groups: Vec::new(),
140 max_length: default_max_length(),
141 host_key: None,
142 port_key: default_port_key(),
143 receive_buffer_bytes: None,
144 framing: None,
145 decoding: default_decoding(),
146 log_namespace: None,
147 }
148 }
149
150 pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
151 self.log_namespace = val;
152 self
153 }
154}
155
156pub(super) fn udp(
157 config: UdpConfig,
158 decoder: Decoder,
159 mut shutdown: ShutdownSignal,
160 mut out: SourceSender,
161 log_namespace: LogNamespace,
162) -> Source {
163 Box::pin(async move {
164 let listenfd = ListenFd::from_env();
165 let socket = try_bind_udp_socket(config.address, listenfd)
166 .await
167 .map_err(|error| {
168 emit!(SocketBindError {
169 mode: SocketMode::Udp,
170 error,
171 })
172 })?;
173
174 if !config.multicast_groups.is_empty() {
175 socket.set_multicast_loop_v4(true).unwrap();
176 let listen_addr = match config.address() {
177 SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
178 SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
179 unimplemented!("IPv6 multicast is not supported")
183 }
184 SocketListenAddr::SystemdFd(_) => {
185 unimplemented!("Multicast for systemd fd sockets is not supported")
186 }
187 };
188 for group_addr in config.multicast_groups {
189 let interface = *listen_addr.ip();
190 socket
191 .join_multicast_v4(group_addr, interface)
192 .map_err(|error| {
193 emit!(SocketMulticastGroupJoinError {
194 error,
195 group_addr,
196 interface,
197 })
198 })?;
199 info!(message = "Joined multicast group.", group = %group_addr);
200 }
201 }
202
203 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes
204 && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
205 {
206 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
207 }
208
209 let mut max_length = config.max_length;
210
211 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
212 max_length = std::cmp::min(max_length, receive_buffer_bytes);
213 }
214
215 let bytes_received = register!(BytesReceived::from(Protocol::UDP));
216
217 info!(message = "Listening.", address = %config.address);
218 let mut buf = BytesMut::with_capacity(max_length + 1);
220 loop {
221 buf.resize(max_length + 1, 0);
222 tokio::select! {
223 recv = socket.recv_from(&mut buf) => {
224 let (byte_size, address) = match recv {
225 Ok(res) => res,
226 Err(error) => {
227 #[cfg(windows)]
228 if let Some(err) = error.raw_os_error() {
229 if err == 10040 {
230 warn!(
232 message = "Discarding frame larger than max_length.",
233 max_length = max_length
234 );
235 continue;
236 }
237 }
238
239 return Err(emit!(SocketReceiveError {
240 mode: SocketMode::Udp,
241 error
242 }));
243 }
244 };
245
246 bytes_received.emit(ByteSize(byte_size));
247 let payload = buf.split_to(byte_size);
248 let truncated = byte_size == max_length + 1;
249 let mut stream = FramedRead::new(payload.as_ref(), decoder.clone()).peekable();
250
251 while let Some(result) = stream.next().await {
252 let last = Pin::new(&mut stream).peek().await.is_none();
253 match result {
254 Ok((mut events, _byte_size)) => {
255 if last && truncated {
256 _ = events.pop();
258 warn!(
259 message = "Discarding frame larger than max_length.",
260 max_length = max_length
261 );
262 }
263
264 if events.is_empty() {
265 continue;
266 }
267
268 let count = events.len();
269 emit!(SocketEventsReceived {
270 mode: SocketMode::Udp,
271 byte_size: events.estimated_json_encoded_size_of(),
272 count,
273 });
274
275 let now = Utc::now();
276
277 for event in &mut events {
278 if let Event::Log(log) = event {
279 log_namespace.insert_standard_vector_source_metadata(
280 log,
281 SocketConfig::NAME,
282 now,
283 );
284
285 let legacy_host_key = config
286 .host_key
287 .clone()
288 .unwrap_or(default_host_key())
289 .path;
290
291 log_namespace.insert_source_metadata(
292 SocketConfig::NAME,
293 log,
294 legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
295 path!("host"),
296 address.ip().to_string()
297 );
298
299 let legacy_port_key = config.port_key.clone().path;
300
301 log_namespace.insert_source_metadata(
302 SocketConfig::NAME,
303 log,
304 legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
305 path!("port"),
306 address.port()
307 );
308 }
309 }
310
311 tokio::select!{
312 result = out.send_batch(events) => {
313 if result.is_err() {
314 emit!(StreamClosedError { count });
315 return Ok(())
316 }
317 }
318 _ = &mut shutdown => return Ok(()),
319 }
320 }
321 Err(error) => {
322 if !error.can_continue() {
325 break;
326 }
327 }
328 }
329 }
330 }
331 _ = &mut shutdown => return Ok(()),
332 }
333 }
334 })
335}