vector/sources/file_descriptors/
mod.rs1use std::io;
2
3use async_stream::stream;
4use bytes::Bytes;
5use chrono::Utc;
6use futures::{SinkExt, StreamExt, channel::mpsc, executor};
7use tokio_util::{codec::FramedRead, io::StreamReader};
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 codecs::{
11 StreamDecodingError,
12 decoding::{DeserializerConfig, FramingConfig},
13 },
14 config::{LegacyKey, LogNamespace},
15 configurable::NamedComponent,
16 event::Event,
17 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
18 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
19};
20use vrl::value::Kind;
21
22use crate::{
23 SourceSender,
24 codecs::{Decoder, DecodingConfig},
25 config::{SourceOutput, log_schema},
26 internal_events::{EventsReceived, FileDescriptorReadError, StreamClosedError},
27 shutdown::ShutdownSignal,
28};
29
30#[cfg(all(unix, feature = "sources-file_descriptor"))]
31pub mod file_descriptor;
32#[cfg(feature = "sources-stdin")]
33pub mod stdin;
34
35pub trait FileDescriptorConfig: NamedComponent {
36 fn host_key(&self) -> Option<OptionalValuePath>;
37 fn framing(&self) -> Option<FramingConfig>;
38 fn decoding(&self) -> DeserializerConfig;
39 fn description(&self) -> String;
40
41 fn source<R>(
42 &self,
43 reader: R,
44 shutdown: ShutdownSignal,
45 out: SourceSender,
46 log_namespace: LogNamespace,
47 ) -> crate::Result<crate::sources::Source>
48 where
49 R: Send + io::BufRead + 'static,
50 {
51 let host_key = self
52 .host_key()
53 .and_then(|k| k.path)
54 .or(log_schema().host_key().cloned());
55 let hostname = crate::get_hostname().ok();
56
57 let description = self.description();
58
59 let decoding = self.decoding();
60 let framing = self
61 .framing()
62 .unwrap_or_else(|| decoding.default_stream_framing());
63 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
64
65 let (sender, receiver) = mpsc::channel(1024);
66
67 std::thread::spawn(move || {
73 info!("Capturing {}.", description);
74 read_from_fd(reader, sender);
75 });
76
77 Ok(Box::pin(process_stream(
78 receiver,
79 decoder,
80 out,
81 shutdown,
82 host_key,
83 self.get_component_name(),
84 hostname,
85 log_namespace,
86 )))
87 }
88}
89
90type Sender = mpsc::Sender<Result<Bytes, io::Error>>;
91
92fn read_from_fd<R>(mut reader: R, mut sender: Sender)
93where
94 R: Send + io::BufRead + 'static,
95{
96 loop {
97 let (buffer, len) = match reader.fill_buf() {
98 Ok([]) => break, Ok(buffer) => (Ok(Bytes::copy_from_slice(buffer)), buffer.len()),
100 Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
101 Err(error) => (Err(error), 0),
102 };
103
104 reader.consume(len);
105
106 if executor::block_on(sender.send(buffer)).is_err() {
107 break;
109 }
110 }
111}
112
113type Receiver = mpsc::Receiver<Result<Bytes, io::Error>>;
114
115#[allow(clippy::too_many_arguments)]
116async fn process_stream(
117 receiver: Receiver,
118 decoder: Decoder,
119 mut out: SourceSender,
120 shutdown: ShutdownSignal,
121 host_key: Option<OwnedValuePath>,
122 source_type: &'static str,
123 hostname: Option<String>,
124 log_namespace: LogNamespace,
125) -> Result<(), ()> {
126 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
127 let events_received = register!(EventsReceived);
128 let stream = receiver.inspect(|result| {
129 if let Err(error) = result {
130 emit!(FileDescriptorReadError { error: &error });
131 }
132 });
133 let stream = StreamReader::new(stream);
134 let mut stream = FramedRead::new(stream, decoder).take_until(shutdown);
135 let mut stream = stream! {
136 while let Some(result) = stream.next().await {
137 match result {
138 Ok((events, byte_size)) => {
139 bytes_received.emit(ByteSize(byte_size));
140 events_received.emit(CountByteSize(
141 events.len(),
142 events.estimated_json_encoded_size_of(),
143 ));
144
145 let now = Utc::now();
146
147 for mut event in events {
148 match event{
149 Event::Log(_) => {
150 let log = event.as_mut_log();
151
152 log_namespace.insert_standard_vector_source_metadata(
153 log,
154 source_type,
155 now
156 );
157
158 if let Some(hostname) = &hostname {
159 log_namespace.insert_source_metadata(
160 source_type,
161 log,
162 host_key.as_ref().map(LegacyKey::InsertIfEmpty),
163 path!("host"),
164 hostname.clone()
165 );
166 }
167
168 yield event;
169 },
170 _ => {
171 yield event;
172 }
173 }
174 }
175 }
176 Err(error) => {
177 if !error.can_continue() {
180 break;
181 }
182 }
183 }
184 }
185 }
186 .boxed();
187
188 match out.send_event_stream(&mut stream).await {
189 Ok(()) => {
190 debug!("Finished sending.");
191 Ok(())
192 }
193 Err(_) => {
194 let (count, _) = stream.size_hint();
195 emit!(StreamClosedError { count });
196 Err(())
197 }
198 }
199}
200
201fn outputs(
204 log_namespace: LogNamespace,
205 host_key: &Option<OptionalValuePath>,
206 decoding: &DeserializerConfig,
207 source_name: &'static str,
208) -> Vec<SourceOutput> {
209 let schema_definition = decoding
210 .schema_definition(log_namespace)
211 .with_source_metadata(
212 source_name,
213 host_key
214 .clone()
215 .map_or(log_schema().host_key().cloned(), |key| key.path)
216 .map(LegacyKey::Overwrite),
217 &owned_value_path!("host"),
218 Kind::bytes(),
219 Some("host"),
220 )
221 .with_standard_vector_source_metadata();
222
223 vec![SourceOutput::new_maybe_logs(
224 decoding.output_type(),
225 schema_definition,
226 )]
227}