vector/sources/kubernetes_logs/
util.rs

1use std::{error::Error, future::Future, time::Duration};
2
3use futures::{
4    future::{select, Either},
5    pin_mut, FutureExt, Sink,
6};
7use tokio::task::spawn_blocking;
8use vector_lib::file_source::{
9    paths_provider::PathsProvider, Checkpointer, FileServer, FileServerShutdown,
10    FileSourceInternalEvents, Line,
11};
12
13/// A tiny wrapper around a [`FileServer`] that runs it as a [`spawn_blocking`]
14/// task.
15pub async fn run_file_server<PP, E, C, S>(
16    file_server: FileServer<PP, E>,
17    chans: C,
18    shutdown: S,
19    checkpointer: Checkpointer,
20) -> Result<FileServerShutdown, tokio::task::JoinError>
21where
22    PP: PathsProvider + Send + 'static,
23    E: FileSourceInternalEvents,
24    C: Sink<Vec<Line>> + Unpin + Send + 'static,
25    <C as Sink<Vec<Line>>>::Error: Error + Send,
26    S: Future + Unpin + Send + 'static,
27    <S as Future>::Output: Clone + Send + Sync,
28{
29    let span = info_span!("file_server");
30    let join_handle = spawn_blocking(move || {
31        // These will need to be separated when this source is updated
32        // to support end-to-end acknowledgements.
33        let shutdown = shutdown.shared();
34        let shutdown2 = shutdown.clone();
35        let _enter = span.enter();
36        let result = file_server.run(chans, shutdown, shutdown2, checkpointer);
37        result.expect("file server exited with an error")
38    });
39    join_handle.await
40}
41
42pub async fn complete_with_deadline_on_signal<F, S>(
43    future: F,
44    signal: S,
45    deadline: Duration,
46) -> Result<<F as Future>::Output, tokio::time::error::Elapsed>
47where
48    F: Future,
49    S: Future<Output = ()>,
50{
51    pin_mut!(future);
52    pin_mut!(signal);
53    let future = match select(future, signal).await {
54        Either::Left((future_output, _)) => return Ok(future_output),
55        Either::Right(((), future)) => future,
56    };
57    pin_mut!(future);
58    tokio::time::timeout(deadline, future).await
59}