vector/sources/file_descriptors/mod.rs

1use std::io;
2
3use async_stream::stream;
4use bytes::Bytes;
5use chrono::Utc;
6use futures::{channel::mpsc, executor, SinkExt, StreamExt};
7use tokio_util::{codec::FramedRead, io::StreamReader};
8use vector_lib::codecs::{
9    decoding::{DeserializerConfig, FramingConfig},
10    StreamDecodingError,
11};
12use vector_lib::configurable::NamedComponent;
13use vector_lib::internal_event::{
14    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
15};
16use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
17use vector_lib::{
18    config::{LegacyKey, LogNamespace},
19    event::Event,
20    EstimatedJsonEncodedSizeOf,
21};
22use vrl::value::Kind;
23
24use crate::{
25    codecs::{Decoder, DecodingConfig},
26    config::{log_schema, SourceOutput},
27    internal_events::{EventsReceived, FileDescriptorReadError, StreamClosedError},
28    shutdown::ShutdownSignal,
29    SourceSender,
30};
31
32#[cfg(all(unix, feature = "sources-file_descriptor"))]
33pub mod file_descriptor;
34#[cfg(feature = "sources-stdin")]
35pub mod stdin;
36
37pub trait FileDescriptorConfig: NamedComponent {
38    fn host_key(&self) -> Option<OptionalValuePath>;
39    fn framing(&self) -> Option<FramingConfig>;
40    fn decoding(&self) -> DeserializerConfig;
41    fn description(&self) -> String;
42
43    fn source<R>(
44        &self,
45        reader: R,
46        shutdown: ShutdownSignal,
47        out: SourceSender,
48        log_namespace: LogNamespace,
49    ) -> crate::Result<crate::sources::Source>
50    where
51        R: Send + io::BufRead + 'static,
52    {
53        let host_key = self
54            .host_key()
55            .and_then(|k| k.path)
56            .or(log_schema().host_key().cloned());
57        let hostname = crate::get_hostname().ok();
58
59        let description = self.description();
60
61        let decoding = self.decoding();
62        let framing = self
63            .framing()
64            .unwrap_or_else(|| decoding.default_stream_framing());
65        let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
66
67        let (sender, receiver) = mpsc::channel(1024);
68
69        // Spawn background thread with blocking I/O to process fd.
70        //
71        // This is recommended by Tokio, as otherwise the process will not shut down
72        // until another newline is entered. See
73        // https://github.com/tokio-rs/tokio/blob/a73428252b08bf1436f12e76287acbc4600ca0e5/tokio/src/io/stdin.rs#L33-L42
74        std::thread::spawn(move || {
75            info!("Capturing {}.", description);
76            read_from_fd(reader, sender);
77        });
78
79        Ok(Box::pin(process_stream(
80            receiver,
81            decoder,
82            out,
83            shutdown,
84            host_key,
85            self.get_component_name(),
86            hostname,
87            log_namespace,
88        )))
89    }
90}
91
92type Sender = mpsc::Sender<Result<Bytes, io::Error>>;
93
94fn read_from_fd<R>(mut reader: R, mut sender: Sender)
95where
96    R: Send + io::BufRead + 'static,
97{
98    loop {
99        let (buffer, len) = match reader.fill_buf() {
100            Ok([]) => break, // EOF.
101            Ok(buffer) => (Ok(Bytes::copy_from_slice(buffer)), buffer.len()),
102            Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
103            Err(error) => (Err(error), 0),
104        };
105
106        reader.consume(len);
107
108        if executor::block_on(sender.send(buffer)).is_err() {
109            // Receiver has closed so we should shutdown.
110            break;
111        }
112    }
113}
114
115type Receiver = mpsc::Receiver<Result<Bytes, io::Error>>;
116
117#[allow(clippy::too_many_arguments)]
118async fn process_stream(
119    receiver: Receiver,
120    decoder: Decoder,
121    mut out: SourceSender,
122    shutdown: ShutdownSignal,
123    host_key: Option<OwnedValuePath>,
124    source_type: &'static str,
125    hostname: Option<String>,
126    log_namespace: LogNamespace,
127) -> Result<(), ()> {
128    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
129    let events_received = register!(EventsReceived);
130    let stream = receiver.inspect(|result| {
131        if let Err(error) = result {
132            emit!(FileDescriptorReadError { error: &error });
133        }
134    });
135    let stream = StreamReader::new(stream);
136    let mut stream = FramedRead::new(stream, decoder).take_until(shutdown);
137    let mut stream = stream! {
138        while let Some(result) = stream.next().await {
139            match result {
140                Ok((events, byte_size)) => {
141                    bytes_received.emit(ByteSize(byte_size));
142                    events_received.emit(CountByteSize(
143                         events.len(),
144                         events.estimated_json_encoded_size_of(),
145                    ));
146
147                    let now = Utc::now();
148
149                    for mut event in events {
150                        match event{
151                            Event::Log(_) => {
152                                let log = event.as_mut_log();
153
154                                log_namespace.insert_standard_vector_source_metadata(
155                                    log,
156                                    source_type,
157                                    now
158                                );
159
160                                if let Some(hostname) = &hostname {
161                                    log_namespace.insert_source_metadata(
162                                        source_type,
163                                        log,
164                                        host_key.as_ref().map(LegacyKey::InsertIfEmpty),
165                                        path!("host"),
166                                        hostname.clone()
167                                    );
168                                }
169
170                                yield event;
171                            },
172                            _ => {
173                                yield event;
174                            }
175                        }
176                    }
177                }
178                Err(error) => {
179                    // Error is logged by `crate::codecs::Decoder`, no
180                    // further handling is needed here.
181                    if !error.can_continue() {
182                        break;
183                    }
184                }
185            }
186        }
187    }
188    .boxed();
189
190    match out.send_event_stream(&mut stream).await {
191        Ok(()) => {
192            debug!("Finished sending.");
193            Ok(())
194        }
195        Err(_) => {
196            let (count, _) = stream.size_hint();
197            emit!(StreamClosedError { count });
198            Err(())
199        }
200    }
201}
202
203/// Builds the `vector_lib::config::Outputs` for stdin and
204/// file_descriptor sources.
205fn outputs(
206    log_namespace: LogNamespace,
207    host_key: &Option<OptionalValuePath>,
208    decoding: &DeserializerConfig,
209    source_name: &'static str,
210) -> Vec<SourceOutput> {
211    let schema_definition = decoding
212        .schema_definition(log_namespace)
213        .with_source_metadata(
214            source_name,
215            host_key
216                .clone()
217                .map_or(log_schema().host_key().cloned(), |key| key.path)
218                .map(LegacyKey::Overwrite),
219            &owned_value_path!("host"),
220            Kind::bytes(),
221            Some("host"),
222        )
223        .with_standard_vector_source_metadata();
224
225    vec![SourceOutput::new_maybe_logs(
226        decoding.output_type(),
227        schema_definition,
228    )]
229}