vector/sources/file_descriptors/
mod.rs

1use std::io;
2
3use async_stream::stream;
4use bytes::Bytes;
5use chrono::Utc;
6use futures::{SinkExt, StreamExt, channel::mpsc, executor};
7use tokio_util::{codec::FramedRead, io::StreamReader};
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    codecs::{
11        StreamDecodingError,
12        decoding::{DeserializerConfig, FramingConfig},
13    },
14    config::{LegacyKey, LogNamespace},
15    configurable::NamedComponent,
16    event::Event,
17    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
18    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
19};
20use vrl::value::Kind;
21
22use crate::{
23    SourceSender,
24    codecs::{Decoder, DecodingConfig},
25    config::{SourceOutput, log_schema},
26    internal_events::{EventsReceived, FileDescriptorReadError, StreamClosedError},
27    shutdown::ShutdownSignal,
28};
29
30#[cfg(all(unix, feature = "sources-file_descriptor"))]
31pub mod file_descriptor;
32#[cfg(feature = "sources-stdin")]
33pub mod stdin;
34
35pub trait FileDescriptorConfig: NamedComponent {
36    fn host_key(&self) -> Option<OptionalValuePath>;
37    fn framing(&self) -> Option<FramingConfig>;
38    fn decoding(&self) -> DeserializerConfig;
39    fn description(&self) -> String;
40
41    fn source<R>(
42        &self,
43        reader: R,
44        shutdown: ShutdownSignal,
45        out: SourceSender,
46        log_namespace: LogNamespace,
47    ) -> crate::Result<crate::sources::Source>
48    where
49        R: Send + io::BufRead + 'static,
50    {
51        let host_key = self
52            .host_key()
53            .and_then(|k| k.path)
54            .or(log_schema().host_key().cloned());
55        let hostname = crate::get_hostname().ok();
56
57        let description = self.description();
58
59        let decoding = self.decoding();
60        let framing = self
61            .framing()
62            .unwrap_or_else(|| decoding.default_stream_framing());
63        let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
64
65        let (sender, receiver) = mpsc::channel(1024);
66
67        // Spawn background thread with blocking I/O to process fd.
68        //
69        // This is recommended by Tokio, as otherwise the process will not shut down
70        // until another newline is entered. See
71        // https://github.com/tokio-rs/tokio/blob/a73428252b08bf1436f12e76287acbc4600ca0e5/tokio/src/io/stdin.rs#L33-L42
72        std::thread::spawn(move || {
73            info!("Capturing {}.", description);
74            read_from_fd(reader, sender);
75        });
76
77        Ok(Box::pin(process_stream(
78            receiver,
79            decoder,
80            out,
81            shutdown,
82            host_key,
83            self.get_component_name(),
84            hostname,
85            log_namespace,
86        )))
87    }
88}
89
90type Sender = mpsc::Sender<Result<Bytes, io::Error>>;
91
92fn read_from_fd<R>(mut reader: R, mut sender: Sender)
93where
94    R: Send + io::BufRead + 'static,
95{
96    loop {
97        let (buffer, len) = match reader.fill_buf() {
98            Ok([]) => break, // EOF.
99            Ok(buffer) => (Ok(Bytes::copy_from_slice(buffer)), buffer.len()),
100            Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
101            Err(error) => (Err(error), 0),
102        };
103
104        reader.consume(len);
105
106        if executor::block_on(sender.send(buffer)).is_err() {
107            // Receiver has closed so we should shutdown.
108            break;
109        }
110    }
111}
112
113type Receiver = mpsc::Receiver<Result<Bytes, io::Error>>;
114
115#[allow(clippy::too_many_arguments)]
116async fn process_stream(
117    receiver: Receiver,
118    decoder: Decoder,
119    mut out: SourceSender,
120    shutdown: ShutdownSignal,
121    host_key: Option<OwnedValuePath>,
122    source_type: &'static str,
123    hostname: Option<String>,
124    log_namespace: LogNamespace,
125) -> Result<(), ()> {
126    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
127    let events_received = register!(EventsReceived);
128    let stream = receiver.inspect(|result| {
129        if let Err(error) = result {
130            emit!(FileDescriptorReadError { error: &error });
131        }
132    });
133    let stream = StreamReader::new(stream);
134    let mut stream = FramedRead::new(stream, decoder).take_until(shutdown);
135    let mut stream = stream! {
136        while let Some(result) = stream.next().await {
137            match result {
138                Ok((events, byte_size)) => {
139                    bytes_received.emit(ByteSize(byte_size));
140                    events_received.emit(CountByteSize(
141                         events.len(),
142                         events.estimated_json_encoded_size_of(),
143                    ));
144
145                    let now = Utc::now();
146
147                    for mut event in events {
148                        match event{
149                            Event::Log(_) => {
150                                let log = event.as_mut_log();
151
152                                log_namespace.insert_standard_vector_source_metadata(
153                                    log,
154                                    source_type,
155                                    now
156                                );
157
158                                if let Some(hostname) = &hostname {
159                                    log_namespace.insert_source_metadata(
160                                        source_type,
161                                        log,
162                                        host_key.as_ref().map(LegacyKey::InsertIfEmpty),
163                                        path!("host"),
164                                        hostname.clone()
165                                    );
166                                }
167
168                                yield event;
169                            },
170                            _ => {
171                                yield event;
172                            }
173                        }
174                    }
175                }
176                Err(error) => {
177                    // Error is logged by `crate::codecs::Decoder`, no
178                    // further handling is needed here.
179                    if !error.can_continue() {
180                        break;
181                    }
182                }
183            }
184        }
185    }
186    .boxed();
187
188    match out.send_event_stream(&mut stream).await {
189        Ok(()) => {
190            debug!("Finished sending.");
191            Ok(())
192        }
193        Err(_) => {
194            let (count, _) = stream.size_hint();
195            emit!(StreamClosedError { count });
196            Err(())
197        }
198    }
199}
200
201/// Builds the `vector_lib::config::Outputs` for stdin and
202/// file_descriptor sources.
203fn outputs(
204    log_namespace: LogNamespace,
205    host_key: &Option<OptionalValuePath>,
206    decoding: &DeserializerConfig,
207    source_name: &'static str,
208) -> Vec<SourceOutput> {
209    let schema_definition = decoding
210        .schema_definition(log_namespace)
211        .with_source_metadata(
212            source_name,
213            host_key
214                .clone()
215                .map_or(log_schema().host_key().cloned(), |key| key.path)
216                .map(LegacyKey::Overwrite),
217            &owned_value_path!("host"),
218            Kind::bytes(),
219            Some("host"),
220        )
221        .with_standard_vector_source_metadata();
222
223    vec![SourceOutput::new_maybe_logs(
224        decoding.output_type(),
225        schema_definition,
226    )]
227}