1use std::net::{Ipv4Addr, SocketAddr};
2
3use bytes::BytesMut;
4use chrono::Utc;
5use futures::StreamExt;
6use listenfd::ListenFd;
7use vector_lib::{
8 EstimatedJsonEncodedSizeOf,
9 codecs::{
10 DecoderFramedRead, StreamDecodingError,
11 decoding::{DeserializerConfig, FramingConfig},
12 },
13 config::{LegacyKey, LogNamespace},
14 configurable::configurable_component,
15 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
16 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
17};
18
19use super::default_host_key;
20use crate::{
21 SourceSender,
22 codecs::Decoder,
23 event::Event,
24 internal_events::{
25 SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
26 SocketReceiveError, StreamClosedError,
27 },
28 net,
29 serde::default_decoding,
30 shutdown::ShutdownSignal,
31 sources::{
32 Source,
33 socket::SocketConfig,
34 util::net::{SocketListenAddr, try_bind_udp_socket},
35 },
36};
37
/// Configuration for the UDP listener built by [`udp`].
#[configurable_component]
#[serde(deny_unknown_fields)]
#[derive(Clone, Debug)]
pub struct UdpConfig {
    // Address (or systemd file descriptor) the UDP socket is bound to.
    #[configurable(derived)]
    address: SocketListenAddr,

    /// IPv4 multicast groups to join once the socket is bound.
    ///
    /// Groups are joined on the interface given by the listen address. Only an
    /// IPv4 socket listen address is supported when this list is non-empty.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
    pub(super) multicast_groups: Vec<Ipv4Addr>,

    /// Maximum size, in bytes, of a received datagram.
    ///
    /// The last frame of a datagram larger than this is discarded. The effective
    /// limit is additionally capped by `receive_buffer_bytes` when that is set.
    #[serde(default = "default_max_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub(super) max_length: usize,

    /// Legacy log field that receives the sender's IP address.
    ///
    /// When unset, the value returned by `default_host_key()` is used.
    host_key: Option<OptionalValuePath>,

    /// Legacy log field that receives the sender's port.
    ///
    /// Defaults to `port`.
    #[serde(default = "default_port_key")]
    port_key: OptionalValuePath,

    /// Requested size, in bytes, of the socket's receive buffer.
    ///
    /// Failing to apply it only logs a warning.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    receive_buffer_bytes: Option<usize>,

    // Optional framing configuration, exposed through `framing()`.
    #[configurable(derived)]
    pub(super) framing: Option<FramingConfig>,

    // Deserializer configuration, exposed through `decoding()`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub(super) decoding: DeserializerConfig,

    /// Optional log namespace toggle; hidden from the generated documentation.
    // NOTE(review): presumably overrides the global log namespace setting; the
    // consumer of this field is outside this file, confirm against the caller.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
105
106fn default_port_key() -> OptionalValuePath {
107 OptionalValuePath::from(owned_value_path!("port"))
108}
109
110fn default_max_length() -> usize {
111 crate::serde::default_max_length()
112}
113
114impl UdpConfig {
115 pub(super) fn host_key(&self) -> OptionalValuePath {
116 self.host_key.clone().unwrap_or(default_host_key())
117 }
118
119 pub const fn port_key(&self) -> &OptionalValuePath {
120 &self.port_key
121 }
122
123 pub(super) const fn framing(&self) -> &Option<FramingConfig> {
124 &self.framing
125 }
126
127 pub(super) const fn decoding(&self) -> &DeserializerConfig {
128 &self.decoding
129 }
130
131 pub(super) const fn address(&self) -> SocketListenAddr {
132 self.address
133 }
134
135 pub fn from_address(address: SocketListenAddr) -> Self {
136 Self {
137 address,
138 multicast_groups: Vec::new(),
139 max_length: default_max_length(),
140 host_key: None,
141 port_key: default_port_key(),
142 receive_buffer_bytes: None,
143 framing: None,
144 decoding: default_decoding(),
145 log_namespace: None,
146 }
147 }
148
149 pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
150 self.log_namespace = val;
151 self
152 }
153}
154
155pub(super) fn udp(
156 config: UdpConfig,
157 decoder: Decoder,
158 mut shutdown: ShutdownSignal,
159 mut out: SourceSender,
160 log_namespace: LogNamespace,
161) -> Source {
162 Box::pin(async move {
163 let listenfd = ListenFd::from_env();
164 let socket = try_bind_udp_socket(config.address, listenfd)
165 .await
166 .map_err(|error| {
167 emit!(SocketBindError {
168 mode: SocketMode::Udp,
169 error,
170 })
171 })?;
172
173 if !config.multicast_groups.is_empty() {
174 socket.set_multicast_loop_v4(true).unwrap();
175 let listen_addr = match config.address() {
176 SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
177 SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
178 unimplemented!("IPv6 multicast is not supported")
182 }
183 SocketListenAddr::SystemdFd(_) => {
184 unimplemented!("Multicast for systemd fd sockets is not supported")
185 }
186 };
187 for group_addr in config.multicast_groups {
188 let interface = *listen_addr.ip();
189 socket
190 .join_multicast_v4(group_addr, interface)
191 .map_err(|error| {
192 emit!(SocketMulticastGroupJoinError {
193 error,
194 group_addr,
195 interface,
196 })
197 })?;
198 info!(message = "Joined multicast group.", group = %group_addr);
199 }
200 }
201
202 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes
203 && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
204 {
205 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
206 }
207
208 let mut max_length = config.max_length;
209
210 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
211 max_length = std::cmp::min(max_length, receive_buffer_bytes);
212 }
213
214 let bytes_received = register!(BytesReceived::from(Protocol::UDP));
215
216 info!(message = "Listening.", address = %config.address);
217 let mut buf = BytesMut::with_capacity(max_length + 1);
219 loop {
220 buf.resize(max_length + 1, 0);
221 tokio::select! {
222 recv = socket.recv_from(&mut buf) => {
223 let (byte_size, address) = match recv {
224 Ok(res) => res,
225 Err(error) => {
226 #[cfg(windows)]
227 if let Some(err) = error.raw_os_error() {
228 if err == 10040 {
229 warn!(
231 message = "Discarding frame larger than max_length.",
232 max_length = max_length
233 );
234 continue;
235 }
236 }
237
238 return Err(emit!(SocketReceiveError {
239 mode: SocketMode::Udp,
240 error
241 }));
242 }
243 };
244
245 bytes_received.emit(ByteSize(byte_size));
246 let payload = buf.split_to(byte_size);
247 let truncated = byte_size == max_length + 1;
248 let mut stream =
249 DecoderFramedRead::new(payload.as_ref(), decoder.clone()).peekable();
250
251 while let Some(result) = stream.next().await {
252 let last = Pin::new(&mut stream).peek().await.is_none();
253 match result {
254 Ok((mut events, _byte_size)) => {
255 if last && truncated {
256 _ = events.pop();
258 warn!(
259 message = "Discarding frame larger than max_length.",
260 max_length = max_length
261 );
262 }
263
264 if events.is_empty() {
265 continue;
266 }
267
268 let count = events.len();
269 emit!(SocketEventsReceived {
270 mode: SocketMode::Udp,
271 byte_size: events.estimated_json_encoded_size_of(),
272 count,
273 });
274
275 let now = Utc::now();
276
277 for event in &mut events {
278 if let Event::Log(log) = event {
279 log_namespace.insert_standard_vector_source_metadata(
280 log,
281 SocketConfig::NAME,
282 now,
283 );
284
285 let legacy_host_key = config
286 .host_key
287 .clone()
288 .unwrap_or(default_host_key())
289 .path;
290
291 log_namespace.insert_source_metadata(
292 SocketConfig::NAME,
293 log,
294 legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
295 path!("host"),
296 address.ip().to_string()
297 );
298
299 let legacy_port_key = config.port_key.clone().path;
300
301 log_namespace.insert_source_metadata(
302 SocketConfig::NAME,
303 log,
304 legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
305 path!("port"),
306 address.port()
307 );
308 }
309 }
310
311 tokio::select!{
312 result = out.send_batch(events) => {
313 if result.is_err() {
314 emit!(StreamClosedError { count });
315 return Ok(())
316 }
317 }
318 _ = &mut shutdown => return Ok(()),
319 }
320 }
321 Err(error) => {
322 if !error.can_continue() {
325 break;
326 }
327 }
328 }
329 }
330 }
331 _ = &mut shutdown => return Ok(()),
332 }
333 }
334 })
335}