vector/sources/kubernetes_logs/
util.rs1use std::{error::Error, future::Future, time::Duration};
2
3use futures::{
4 future::{select, Either},
5 pin_mut, FutureExt, Sink,
6};
7use tokio::task::spawn_blocking;
8use vector_lib::file_source::{
9 paths_provider::PathsProvider, Checkpointer, FileServer, FileServerShutdown,
10 FileSourceInternalEvents, Line,
11};
12
13pub async fn run_file_server<PP, E, C, S>(
16 file_server: FileServer<PP, E>,
17 chans: C,
18 shutdown: S,
19 checkpointer: Checkpointer,
20) -> Result<FileServerShutdown, tokio::task::JoinError>
21where
22 PP: PathsProvider + Send + 'static,
23 E: FileSourceInternalEvents,
24 C: Sink<Vec<Line>> + Unpin + Send + 'static,
25 <C as Sink<Vec<Line>>>::Error: Error + Send,
26 S: Future + Unpin + Send + 'static,
27 <S as Future>::Output: Clone + Send + Sync,
28{
29 let span = info_span!("file_server");
30 let join_handle = spawn_blocking(move || {
31 let shutdown = shutdown.shared();
34 let shutdown2 = shutdown.clone();
35 let _enter = span.enter();
36 let result = file_server.run(chans, shutdown, shutdown2, checkpointer);
37 result.expect("file server exited with an error")
38 });
39 join_handle.await
40}
41
42pub async fn complete_with_deadline_on_signal<F, S>(
43 future: F,
44 signal: S,
45 deadline: Duration,
46) -> Result<<F as Future>::Output, tokio::time::error::Elapsed>
47where
48 F: Future,
49 S: Future<Output = ()>,
50{
51 pin_mut!(future);
52 pin_mut!(signal);
53 let future = match select(future, signal).await {
54 Either::Left((future_output, _)) => return Ok(future_output),
55 Either::Right(((), future)) => future,
56 };
57 pin_mut!(future);
58 tokio::time::timeout(deadline, future).await
59}