1use std::net::{Ipv4Addr, SocketAddr};
2
3use super::default_host_key;
4use bytes::BytesMut;
5use chrono::Utc;
6use futures::StreamExt;
7use listenfd::ListenFd;
8use tokio_util::codec::FramedRead;
9use vector_lib::codecs::{
10 decoding::{DeserializerConfig, FramingConfig},
11 StreamDecodingError,
12};
13use vector_lib::configurable::configurable_component;
14use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
15use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
16use vector_lib::{
17 config::{LegacyKey, LogNamespace},
18 EstimatedJsonEncodedSizeOf,
19};
20
21use crate::{
22 codecs::Decoder,
23 event::Event,
24 internal_events::{
25 SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
26 SocketReceiveError, StreamClosedError,
27 },
28 net,
29 serde::default_decoding,
30 shutdown::ShutdownSignal,
31 sources::{
32 socket::SocketConfig,
33 util::net::{try_bind_udp_socket, SocketListenAddr},
34 Source,
35 },
36 SourceSender,
37};
38
39#[configurable_component]
41#[serde(deny_unknown_fields)]
42#[derive(Clone, Debug)]
43pub struct UdpConfig {
44 #[configurable(derived)]
45 address: SocketListenAddr,
46
47 #[serde(default)]
59 #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
60 pub(super) multicast_groups: Vec<Ipv4Addr>,
61
62 #[serde(default = "default_max_length")]
66 #[configurable(metadata(docs::type_unit = "bytes"))]
67 pub(super) max_length: usize,
68
69 host_key: Option<OptionalValuePath>,
79
80 #[serde(default = "default_port_key")]
88 port_key: OptionalValuePath,
89
90 #[configurable(metadata(docs::type_unit = "bytes"))]
92 receive_buffer_bytes: Option<usize>,
93
94 #[configurable(derived)]
95 pub(super) framing: Option<FramingConfig>,
96
97 #[configurable(derived)]
98 #[serde(default = "default_decoding")]
99 pub(super) decoding: DeserializerConfig,
100
101 #[serde(default)]
103 #[configurable(metadata(docs::hidden))]
104 pub log_namespace: Option<bool>,
105}
106
107fn default_port_key() -> OptionalValuePath {
108 OptionalValuePath::from(owned_value_path!("port"))
109}
110
111fn default_max_length() -> usize {
112 crate::serde::default_max_length()
113}
114
115impl UdpConfig {
116 pub(super) fn host_key(&self) -> OptionalValuePath {
117 self.host_key.clone().unwrap_or(default_host_key())
118 }
119
120 pub const fn port_key(&self) -> &OptionalValuePath {
121 &self.port_key
122 }
123
124 pub(super) const fn framing(&self) -> &Option<FramingConfig> {
125 &self.framing
126 }
127
128 pub(super) const fn decoding(&self) -> &DeserializerConfig {
129 &self.decoding
130 }
131
132 pub(super) const fn address(&self) -> SocketListenAddr {
133 self.address
134 }
135
136 pub fn from_address(address: SocketListenAddr) -> Self {
137 Self {
138 address,
139 multicast_groups: Vec::new(),
140 max_length: default_max_length(),
141 host_key: None,
142 port_key: default_port_key(),
143 receive_buffer_bytes: None,
144 framing: None,
145 decoding: default_decoding(),
146 log_namespace: None,
147 }
148 }
149
150 pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
151 self.log_namespace = val;
152 self
153 }
154}
155
156pub(super) fn udp(
157 config: UdpConfig,
158 decoder: Decoder,
159 mut shutdown: ShutdownSignal,
160 mut out: SourceSender,
161 log_namespace: LogNamespace,
162) -> Source {
163 Box::pin(async move {
164 let listenfd = ListenFd::from_env();
165 let socket = try_bind_udp_socket(config.address, listenfd)
166 .await
167 .map_err(|error| {
168 emit!(SocketBindError {
169 mode: SocketMode::Udp,
170 error,
171 })
172 })?;
173
174 if !config.multicast_groups.is_empty() {
175 socket.set_multicast_loop_v4(true).unwrap();
176 let listen_addr = match config.address() {
177 SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
178 SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
179 unimplemented!("IPv6 multicast is not supported")
183 }
184 SocketListenAddr::SystemdFd(_) => {
185 unimplemented!("Multicast for systemd fd sockets is not supported")
186 }
187 };
188 for group_addr in config.multicast_groups {
189 let interface = *listen_addr.ip();
190 socket
191 .join_multicast_v4(group_addr, interface)
192 .map_err(|error| {
193 emit!(SocketMulticastGroupJoinError {
194 error,
195 group_addr,
196 interface,
197 })
198 })?;
199 info!(message = "Joined multicast group.", group = %group_addr);
200 }
201 }
202
203 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
204 if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
205 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
206 }
207 }
208
209 let mut max_length = config.max_length;
210
211 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
212 max_length = std::cmp::min(max_length, receive_buffer_bytes);
213 }
214
215 let bytes_received = register!(BytesReceived::from(Protocol::UDP));
216
217 info!(message = "Listening.", address = %config.address);
218 let mut buf = BytesMut::with_capacity(max_length + 1);
220 loop {
221 buf.resize(max_length + 1, 0);
222 tokio::select! {
223 recv = socket.recv_from(&mut buf) => {
224 let (byte_size, address) = match recv {
225 Ok(res) => res,
226 Err(error) => {
227 #[cfg(windows)]
228 if let Some(err) = error.raw_os_error() {
229 if err == 10040 {
230 warn!(
232 message = "Discarding frame larger than max_length.",
233 max_length = max_length,
234 internal_log_rate_limit = true
235 );
236 continue;
237 }
238 }
239
240 return Err(emit!(SocketReceiveError {
241 mode: SocketMode::Udp,
242 error
243 }));
244 }
245 };
246
247 bytes_received.emit(ByteSize(byte_size));
248 let payload = buf.split_to(byte_size);
249 let truncated = byte_size == max_length + 1;
250 let mut stream = FramedRead::new(payload.as_ref(), decoder.clone()).peekable();
251
252 while let Some(result) = stream.next().await {
253 let last = Pin::new(&mut stream).peek().await.is_none();
254 match result {
255 Ok((mut events, _byte_size)) => {
256 if last && truncated {
257 _ = events.pop();
259 warn!(
260 message = "Discarding frame larger than max_length.",
261 max_length = max_length,
262 internal_log_rate_limit = true
263 );
264 }
265
266 if events.is_empty() {
267 continue;
268 }
269
270 let count = events.len();
271 emit!(SocketEventsReceived {
272 mode: SocketMode::Udp,
273 byte_size: events.estimated_json_encoded_size_of(),
274 count,
275 });
276
277 let now = Utc::now();
278
279 for event in &mut events {
280 if let Event::Log(log) = event {
281 log_namespace.insert_standard_vector_source_metadata(
282 log,
283 SocketConfig::NAME,
284 now,
285 );
286
287 let legacy_host_key = config
288 .host_key
289 .clone()
290 .unwrap_or(default_host_key())
291 .path;
292
293 log_namespace.insert_source_metadata(
294 SocketConfig::NAME,
295 log,
296 legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
297 path!("host"),
298 address.ip().to_string()
299 );
300
301 let legacy_port_key = config.port_key.clone().path;
302
303 log_namespace.insert_source_metadata(
304 SocketConfig::NAME,
305 log,
306 legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
307 path!("port"),
308 address.port()
309 );
310 }
311 }
312
313 tokio::select!{
314 result = out.send_batch(events) => {
315 if result.is_err() {
316 emit!(StreamClosedError { count });
317 return Ok(())
318 }
319 }
320 _ = &mut shutdown => return Ok(()),
321 }
322 }
323 Err(error) => {
324 if !error.can_continue() {
327 break;
328 }
329 }
330 }
331 }
332 }
333 _ = &mut shutdown => return Ok(()),
334 }
335 }
336 })
337}