vector/sources/file_descriptors/
mod.rs1use std::io;
2
3use async_stream::stream;
4use bytes::Bytes;
5use chrono::Utc;
6use futures::{channel::mpsc, executor, SinkExt, StreamExt};
7use tokio_util::{codec::FramedRead, io::StreamReader};
8use vector_lib::codecs::{
9 decoding::{DeserializerConfig, FramingConfig},
10 StreamDecodingError,
11};
12use vector_lib::configurable::NamedComponent;
13use vector_lib::internal_event::{
14 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
15};
16use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
17use vector_lib::{
18 config::{LegacyKey, LogNamespace},
19 event::Event,
20 EstimatedJsonEncodedSizeOf,
21};
22use vrl::value::Kind;
23
24use crate::{
25 codecs::{Decoder, DecodingConfig},
26 config::{log_schema, SourceOutput},
27 internal_events::{EventsReceived, FileDescriptorReadError, StreamClosedError},
28 shutdown::ShutdownSignal,
29 SourceSender,
30};
31
32#[cfg(all(unix, feature = "sources-file_descriptor"))]
33pub mod file_descriptor;
34#[cfg(feature = "sources-stdin")]
35pub mod stdin;
36
37pub trait FileDescriptorConfig: NamedComponent {
38 fn host_key(&self) -> Option<OptionalValuePath>;
39 fn framing(&self) -> Option<FramingConfig>;
40 fn decoding(&self) -> DeserializerConfig;
41 fn description(&self) -> String;
42
43 fn source<R>(
44 &self,
45 reader: R,
46 shutdown: ShutdownSignal,
47 out: SourceSender,
48 log_namespace: LogNamespace,
49 ) -> crate::Result<crate::sources::Source>
50 where
51 R: Send + io::BufRead + 'static,
52 {
53 let host_key = self
54 .host_key()
55 .and_then(|k| k.path)
56 .or(log_schema().host_key().cloned());
57 let hostname = crate::get_hostname().ok();
58
59 let description = self.description();
60
61 let decoding = self.decoding();
62 let framing = self
63 .framing()
64 .unwrap_or_else(|| decoding.default_stream_framing());
65 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
66
67 let (sender, receiver) = mpsc::channel(1024);
68
69 std::thread::spawn(move || {
75 info!("Capturing {}.", description);
76 read_from_fd(reader, sender);
77 });
78
79 Ok(Box::pin(process_stream(
80 receiver,
81 decoder,
82 out,
83 shutdown,
84 host_key,
85 self.get_component_name(),
86 hostname,
87 log_namespace,
88 )))
89 }
90}
91
92type Sender = mpsc::Sender<Result<Bytes, io::Error>>;
93
94fn read_from_fd<R>(mut reader: R, mut sender: Sender)
95where
96 R: Send + io::BufRead + 'static,
97{
98 loop {
99 let (buffer, len) = match reader.fill_buf() {
100 Ok([]) => break, Ok(buffer) => (Ok(Bytes::copy_from_slice(buffer)), buffer.len()),
102 Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
103 Err(error) => (Err(error), 0),
104 };
105
106 reader.consume(len);
107
108 if executor::block_on(sender.send(buffer)).is_err() {
109 break;
111 }
112 }
113}
114
115type Receiver = mpsc::Receiver<Result<Bytes, io::Error>>;
116
117#[allow(clippy::too_many_arguments)]
118async fn process_stream(
119 receiver: Receiver,
120 decoder: Decoder,
121 mut out: SourceSender,
122 shutdown: ShutdownSignal,
123 host_key: Option<OwnedValuePath>,
124 source_type: &'static str,
125 hostname: Option<String>,
126 log_namespace: LogNamespace,
127) -> Result<(), ()> {
128 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
129 let events_received = register!(EventsReceived);
130 let stream = receiver.inspect(|result| {
131 if let Err(error) = result {
132 emit!(FileDescriptorReadError { error: &error });
133 }
134 });
135 let stream = StreamReader::new(stream);
136 let mut stream = FramedRead::new(stream, decoder).take_until(shutdown);
137 let mut stream = stream! {
138 while let Some(result) = stream.next().await {
139 match result {
140 Ok((events, byte_size)) => {
141 bytes_received.emit(ByteSize(byte_size));
142 events_received.emit(CountByteSize(
143 events.len(),
144 events.estimated_json_encoded_size_of(),
145 ));
146
147 let now = Utc::now();
148
149 for mut event in events {
150 match event{
151 Event::Log(_) => {
152 let log = event.as_mut_log();
153
154 log_namespace.insert_standard_vector_source_metadata(
155 log,
156 source_type,
157 now
158 );
159
160 if let Some(hostname) = &hostname {
161 log_namespace.insert_source_metadata(
162 source_type,
163 log,
164 host_key.as_ref().map(LegacyKey::InsertIfEmpty),
165 path!("host"),
166 hostname.clone()
167 );
168 }
169
170 yield event;
171 },
172 _ => {
173 yield event;
174 }
175 }
176 }
177 }
178 Err(error) => {
179 if !error.can_continue() {
182 break;
183 }
184 }
185 }
186 }
187 }
188 .boxed();
189
190 match out.send_event_stream(&mut stream).await {
191 Ok(()) => {
192 debug!("Finished sending.");
193 Ok(())
194 }
195 Err(_) => {
196 let (count, _) = stream.size_hint();
197 emit!(StreamClosedError { count });
198 Err(())
199 }
200 }
201}
202
203fn outputs(
206 log_namespace: LogNamespace,
207 host_key: &Option<OptionalValuePath>,
208 decoding: &DeserializerConfig,
209 source_name: &'static str,
210) -> Vec<SourceOutput> {
211 let schema_definition = decoding
212 .schema_definition(log_namespace)
213 .with_source_metadata(
214 source_name,
215 host_key
216 .clone()
217 .map_or(log_schema().host_key().cloned(), |key| key.path)
218 .map(LegacyKey::Overwrite),
219 &owned_value_path!("host"),
220 Kind::bytes(),
221 Some("host"),
222 )
223 .with_standard_vector_source_metadata();
224
225 vec![SourceOutput::new_maybe_logs(
226 decoding.output_type(),
227 schema_definition,
228 )]
229}