vector/sources/kubernetes_logs/
util.rs

1use std::{error::Error, future::Future, time::Duration};
2
3use futures::{
4    FutureExt, Sink,
5    future::{Either, select},
6    pin_mut,
7};
8use vector_lib::{
9    file_source::{
10        file_server::{FileServer, Line, Shutdown as FileServerShutdown},
11        paths_provider::PathsProvider,
12    },
13    file_source_common::{Checkpointer, FileSourceInternalEvents},
14};
15
16/// A tiny wrapper around a [`FileServer`] that runs it as a [`spawn_blocking`]
17/// task.
18pub async fn run_file_server<PP, E, C, S>(
19    file_server: FileServer<PP, E>,
20    chans: C,
21    shutdown: S,
22    checkpointer: Checkpointer,
23) -> Result<FileServerShutdown, tokio::task::JoinError>
24where
25    PP: PathsProvider + Send + Sync + 'static,
26    E: FileSourceInternalEvents,
27    C: Sink<Vec<Line>> + Unpin + Send + 'static,
28    <C as Sink<Vec<Line>>>::Error: Error + Send,
29    S: Future + Unpin + Send + 'static,
30    <S as Future>::Output: Clone + Send + Sync,
31    <<PP as PathsProvider>::IntoIter as IntoIterator>::IntoIter: Send,
32{
33    let span = info_span!("file_server");
34
35    // spawn_blocking shouldn't be needed: https://github.com/vectordotdev/vector/issues/23743
36    let join_handle = tokio::task::spawn_blocking(move || {
37        // These will need to be separated when this source is updated
38        // to support end-to-end acknowledgements.
39        let shutdown = shutdown.shared();
40        let shutdown2 = shutdown.clone();
41        let _enter = span.enter();
42
43        let rt = tokio::runtime::Handle::current();
44        let result = rt.block_on(file_server.run(chans, shutdown, shutdown2, checkpointer));
45        result.expect("file server exited with an error")
46    });
47    join_handle.await
48}
49
50pub async fn complete_with_deadline_on_signal<F, S>(
51    future: F,
52    signal: S,
53    deadline: Duration,
54) -> Result<<F as Future>::Output, tokio::time::error::Elapsed>
55where
56    F: Future,
57    S: Future<Output = ()>,
58{
59    pin_mut!(future);
60    pin_mut!(signal);
61    let future = match select(future, signal).await {
62        Either::Left((future_output, _)) => return Ok(future_output),
63        Either::Right(((), future)) => future,
64    };
65    pin_mut!(future);
66    tokio::time::timeout(deadline, future).await
67}