vector/sources/kubernetes_logs/
util.rs1use std::{error::Error, future::Future, time::Duration};
2
3use futures::{
4 FutureExt, Sink,
5 future::{Either, select},
6 pin_mut,
7};
8use vector_lib::{
9 file_source::{
10 file_server::{FileServer, Line, Shutdown as FileServerShutdown},
11 paths_provider::PathsProvider,
12 },
13 file_source_common::{Checkpointer, FileSourceInternalEvents},
14};
15
16pub async fn run_file_server<PP, E, C, S>(
19 file_server: FileServer<PP, E>,
20 chans: C,
21 shutdown: S,
22 checkpointer: Checkpointer,
23) -> Result<FileServerShutdown, tokio::task::JoinError>
24where
25 PP: PathsProvider + Send + Sync + 'static,
26 E: FileSourceInternalEvents,
27 C: Sink<Vec<Line>> + Unpin + Send + 'static,
28 <C as Sink<Vec<Line>>>::Error: Error + Send,
29 S: Future + Unpin + Send + 'static,
30 <S as Future>::Output: Clone + Send + Sync,
31 <<PP as PathsProvider>::IntoIter as IntoIterator>::IntoIter: Send,
32{
33 let span = info_span!("file_server");
34
35 let join_handle = tokio::task::spawn_blocking(move || {
37 let shutdown = shutdown.shared();
40 let shutdown2 = shutdown.clone();
41 let _enter = span.enter();
42
43 let rt = tokio::runtime::Handle::current();
44 let result = rt.block_on(file_server.run(chans, shutdown, shutdown2, checkpointer));
45 result.expect("file server exited with an error")
46 });
47 join_handle.await
48}
49
50pub async fn complete_with_deadline_on_signal<F, S>(
51 future: F,
52 signal: S,
53 deadline: Duration,
54) -> Result<<F as Future>::Output, tokio::time::error::Elapsed>
55where
56 F: Future,
57 S: Future<Output = ()>,
58{
59 pin_mut!(future);
60 pin_mut!(signal);
61 let future = match select(future, signal).await {
62 Either::Left((future_output, _)) => return Ok(future_output),
63 Either::Right(((), future)) => future,
64 };
65 pin_mut!(future);
66 tokio::time::timeout(deadline, future).await
67}