1use std::{
2 cmp,
3 collections::{BTreeMap, HashMap},
4 path::PathBuf,
5 sync::Arc,
6 time::{self, Duration},
7};
8
9use bytes::Bytes;
10use chrono::{DateTime, Utc};
11use file_source_common::{
12 FileFingerprint, FileSourceInternalEvents, Fingerprinter, ReadFrom,
13 checkpointer::{Checkpointer, CheckpointsView},
14};
15use futures::{
16 Future, Sink, SinkExt,
17 future::{Either, select},
18};
19use futures_util::future::join_all;
20use indexmap::IndexMap;
21use tokio::{
22 fs::{self, remove_file},
23 task::{Id, JoinSet},
24 time::sleep,
25};
26
27use tracing::{debug, error, info, trace};
28
29use crate::{
30 file_watcher::{FileWatcher, RawLineResult},
31 paths_provider::PathsProvider,
32};
33
34pub struct FileServer<PP, E: FileSourceInternalEvents>
44where
45 PP: PathsProvider,
46{
47 pub paths_provider: PP,
48 pub max_read_bytes: usize,
49 pub ignore_checkpoints: bool,
50 pub read_from: ReadFrom,
51 pub ignore_before: Option<DateTime<Utc>>,
52 pub max_line_bytes: usize,
53 pub line_delimiter: Bytes,
54 pub data_dir: PathBuf,
55 pub glob_minimum_cooldown: Duration,
56 pub fingerprinter: Fingerprinter,
57 pub oldest_first: bool,
58 pub remove_after: Option<Duration>,
59 pub emitter: E,
60 pub rotate_wait: Duration,
61}
62
63impl<PP, E> FileServer<PP, E>
77where
78 PP: PathsProvider,
79 E: FileSourceInternalEvents,
80{
81 pub async fn run<C, S1, S2>(
87 mut self,
88 mut chans: C,
89 mut shutdown_data: S1,
90 shutdown_checkpointer: S2,
91 mut checkpointer: Checkpointer,
92 ) -> Result<Shutdown, <C as Sink<Vec<Line>>>::Error>
93 where
94 C: Sink<Vec<Line>> + Unpin,
95 <C as Sink<Vec<Line>>>::Error: std::error::Error,
96 S1: Future + Unpin + Send + 'static,
97 S2: Future + Unpin + Send + 'static,
98 {
99 let mut fp_map: IndexMap<FileFingerprint, FileWatcher> = Default::default();
100
101 let mut backoff_cap: usize = 1;
102 let mut lines = Vec::new();
103
104 checkpointer.read_checkpoints(self.ignore_before).await;
105
106 let mut known_small_files = HashMap::new();
107
108 let mut existing_files = Vec::new();
109 for path in self.paths_provider.paths().into_iter() {
110 if let Some(file_id) = self
111 .fingerprinter
112 .fingerprint_or_emit(&path, &mut known_small_files, &self.emitter)
113 .await
114 {
115 existing_files.push((path, file_id));
116 }
117 }
118
119 let metadata = join_all(
120 existing_files
121 .iter()
122 .map(|(path, _file_id)| fs::metadata(path)),
123 )
124 .await;
125
126 let created = metadata.into_iter().map(|m| {
127 m.and_then(|m| m.created())
128 .map(DateTime::<Utc>::from)
129 .unwrap_or_else(|_| Utc::now())
130 });
131
132 let mut existing_files: Vec<(DateTime<Utc>, PathBuf, FileFingerprint)> = existing_files
133 .into_iter()
134 .zip(created)
135 .map(|((path, file_id), key)| (key, path, file_id))
136 .collect();
137
138 existing_files.sort_by_key(|(key, _, _)| *key);
139
140 let checkpoints = checkpointer.view();
141
142 for (_key, path, file_id) in existing_files {
143 self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, true)
144 .await;
145 }
146 self.emitter.emit_files_open(fp_map.len());
147
148 let mut stats = TimingStats::default();
149
150 let checkpoint_task_handle = tokio::spawn(checkpoint_writer(
152 checkpointer,
153 self.glob_minimum_cooldown,
154 shutdown_checkpointer,
155 self.emitter.clone(),
156 ));
157
158 let mut next_glob_time = time::Instant::now();
168 loop {
169 let now_time = time::Instant::now();
171 if next_glob_time <= now_time {
172 next_glob_time = now_time.checked_add(self.glob_minimum_cooldown).unwrap();
174
175 if stats.started_at.elapsed() > Duration::from_secs(1) {
176 stats.report();
177 }
178
179 if stats.started_at.elapsed() > Duration::from_secs(10) {
180 stats = TimingStats::default();
181 }
182
183 let start = time::Instant::now();
185 for (_file_id, watcher) in &mut fp_map {
186 watcher.set_file_findable(false); }
188 for path in self.paths_provider.paths().into_iter() {
189 if let Some(file_id) = self
190 .fingerprinter
191 .fingerprint_or_emit(&path, &mut known_small_files, &self.emitter)
192 .await
193 {
194 if let Some(watcher) = fp_map.get_mut(&file_id) {
195 let was_found_this_cycle = watcher.file_findable();
197 watcher.set_file_findable(true);
198 if watcher.path == path {
199 trace!(
200 message = "Continue watching file.",
201 path = ?path,
202 );
203 } else if !was_found_this_cycle {
204 info!(
206 message = "Watched file has been renamed.",
207 path = ?path,
208 old_path = ?watcher.path
209 );
210 watcher.update_path(path).await.ok(); } else {
212 info!(
213 message = "More than one file has the same fingerprint.",
214 path = ?path,
215 old_path = ?watcher.path
216 );
217 let (old_path, new_path) = (&watcher.path, &path);
218 if let (Ok(old_modified_time), Ok(new_modified_time)) = (
219 fs::metadata(old_path).await.and_then(|m| m.modified()),
220 fs::metadata(new_path).await.and_then(|m| m.modified()),
221 ) && old_modified_time < new_modified_time
222 {
223 info!(
224 message = "Switching to watch most recently modified file.",
225 new_modified_time = ?new_modified_time,
226 old_modified_time = ?old_modified_time,
227 );
228 watcher.update_path(path).await.ok(); }
230 }
231 } else {
232 self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, false)
234 .await;
235 self.emitter.emit_files_open(fp_map.len());
236 }
237 }
238 }
239 stats.record("discovery", start.elapsed());
240 }
241
242 if let Some(grace_period) = self.remove_after {
244 let mut set = JoinSet::new();
245
246 let remove_file_tasks: HashMap<Id, PathBuf> = known_small_files
247 .iter()
248 .filter(|&(_path, last_time_open)| last_time_open.elapsed() >= grace_period)
249 .map(|(path, _last_time_open)| path.clone())
250 .map(|path| {
251 let path_ = path.clone();
252 let abort_handle =
253 set.spawn(async move { (path_.clone(), remove_file(&path_).await) });
254 (abort_handle.id(), path)
255 })
256 .collect();
257
258 while let Some(res) = set.join_next().await {
259 match res {
260 Ok((path, Ok(()))) => {
261 let removed = known_small_files.remove(&path);
262
263 if removed.is_some() {
264 self.emitter.emit_file_deleted(&path);
265 }
266 }
267 Ok((path, Err(err))) => {
268 self.emitter.emit_file_delete_error(&path, err);
269 }
270 Err(join_err) => {
271 self.emitter.emit_file_delete_error(
272 remove_file_tasks
273 .get(&join_err.id())
274 .expect("panicked/cancelled task id not in task id pool"),
275 std::io::Error::other(join_err),
276 );
277 }
278 }
279 }
280 }
281
282 let mut global_bytes_read: usize = 0;
284 let mut maxed_out_reading_single_file = false;
285 for (&file_id, watcher) in &mut fp_map {
286 if !watcher.should_read() {
287 continue;
288 }
289
290 let start = time::Instant::now();
291 let mut bytes_read: usize = 0;
292 while let Ok(RawLineResult {
293 raw_line: Some(line),
294 discarded_for_size_and_truncated,
295 }) = watcher.read_line().await
296 {
297 discarded_for_size_and_truncated.iter().for_each(|buf| {
298 self.emitter.emit_file_line_too_long(
299 &buf.clone(),
300 self.max_line_bytes,
301 buf.len(),
302 )
303 });
304
305 let sz = line.bytes.len();
306 trace!(
307 message = "Read bytes.",
308 path = ?watcher.path,
309 bytes = ?sz
310 );
311 stats.record_bytes(sz);
312
313 bytes_read += sz;
314
315 lines.push(Line {
316 text: line.bytes,
317 filename: watcher.path.to_str().expect("not a valid path").to_owned(),
318 file_id,
319 start_offset: line.offset,
320 end_offset: watcher.get_file_position(),
321 });
322
323 if bytes_read > self.max_read_bytes {
324 maxed_out_reading_single_file = true;
325 break;
326 }
327 }
328 stats.record("reading", start.elapsed());
329
330 if bytes_read > 0 {
331 global_bytes_read = global_bytes_read.saturating_add(bytes_read);
332 } else {
333 if let Some(grace_period) = self.remove_after
335 && watcher.last_read_success().elapsed() >= grace_period
336 {
337 match remove_file(&watcher.path).await {
339 Ok(()) => {
340 self.emitter.emit_file_deleted(&watcher.path);
341 watcher.set_dead();
342 }
343 Err(error) => {
344 self.emitter.emit_file_delete_error(&watcher.path, error);
346 }
347 }
348 }
349 }
350
351 if self.oldest_first && maxed_out_reading_single_file {
353 break;
354 }
355 }
356
357 for (_, watcher) in &mut fp_map {
358 if !watcher.file_findable() && watcher.last_seen().elapsed() > self.rotate_wait {
359 watcher.set_dead();
360 }
361 }
362
363 fp_map.retain(|file_id, watcher| {
366 if watcher.dead() {
367 self.emitter
368 .emit_file_unwatched(&watcher.path, watcher.reached_eof());
369 checkpoints.set_dead(*file_id);
370 false
371 } else {
372 true
373 }
374 });
375 self.emitter.emit_files_open(fp_map.len());
376
377 let start = time::Instant::now();
378 let to_send = std::mem::take(&mut lines);
379
380 let result = chans.send(to_send).await;
381 match result {
382 Ok(()) => {}
383 Err(error) => {
384 error!(message = "Output channel closed.", %error);
385 return Err(error);
386 }
387 }
388 stats.record("sending", start.elapsed());
389
390 let start = time::Instant::now();
391 backoff_cap = if global_bytes_read == 0 {
396 cmp::min(2_048, backoff_cap.saturating_mul(2))
397 } else {
398 1
399 };
400 let backoff = backoff_cap.saturating_sub(global_bytes_read);
401
402 let sleep = async move {
408 if backoff > 0 {
409 sleep(Duration::from_millis(backoff as u64)).await;
410 }
411 };
412 futures::pin_mut!(sleep);
413 match select(shutdown_data, sleep).await {
414 Either::Left((_, _)) => {
415 chans
416 .close()
417 .await
418 .expect("error closing file_server data channel.");
419 let checkpointer = checkpoint_task_handle
420 .await
421 .expect("checkpoint task has panicked");
422 if let Err(error) = checkpointer.write_checkpoints().await {
423 error!(?error, "Error writing checkpoints before shutdown");
424 }
425 return Ok(Shutdown);
426 }
427 Either::Right((_, future)) => shutdown_data = future,
428 }
429 stats.record("sleeping", start.elapsed());
430 }
431 }
432
433 async fn watch_new_file(
434 &self,
435 path: PathBuf,
436 file_id: FileFingerprint,
437 fp_map: &mut IndexMap<FileFingerprint, FileWatcher>,
438 checkpoints: &CheckpointsView,
439 startup: bool,
440 ) {
441 let fallback = if startup {
445 self.read_from
446 } else {
447 ReadFrom::Beginning
451 };
452
453 let read_from = if !self.ignore_checkpoints {
459 checkpoints
460 .get(file_id)
461 .map(ReadFrom::Checkpoint)
462 .unwrap_or(fallback)
463 } else {
464 fallback
465 };
466
467 match FileWatcher::new(
468 path.clone(),
469 read_from,
470 self.ignore_before,
471 self.max_line_bytes,
472 self.line_delimiter.clone(),
473 )
474 .await
475 {
476 Ok(mut watcher) => {
477 if let ReadFrom::Checkpoint(file_position) = read_from {
478 self.emitter.emit_file_resumed(&path, file_position);
479 } else {
480 self.emitter.emit_file_added(&path);
481 }
482 watcher.set_file_findable(true);
483 fp_map.insert(file_id, watcher);
484 }
485 Err(error) => self.emitter.emit_file_watch_error(&path, error),
486 };
487 }
488}
489
490async fn checkpoint_writer(
491 checkpointer: Checkpointer,
492 sleep_duration: Duration,
493 mut shutdown: impl Future + Unpin,
494 emitter: impl FileSourceInternalEvents,
495) -> Arc<Checkpointer> {
496 let checkpointer = Arc::new(checkpointer);
497 loop {
498 let sleep = sleep(sleep_duration);
499 tokio::select! {
500 _ = &mut shutdown => break,
501 _ = sleep => {},
502 }
503
504 let emitter = emitter.clone();
505 let checkpointer = Arc::clone(&checkpointer);
506 let start = time::Instant::now();
507 match checkpointer.write_checkpoints().await {
508 Ok(count) => emitter.emit_file_checkpointed(count, start.elapsed()),
509 Err(error) => emitter.emit_file_checkpoint_write_error(error),
510 };
511 }
512 checkpointer
513}
514
515pub fn calculate_ignore_before(ignore_older_secs: Option<u64>) -> Option<DateTime<Utc>> {
516 ignore_older_secs.map(|secs| Utc::now() - chrono::Duration::seconds(secs as i64))
517}
518
/// Marker value returned by `FileServer::run` when it stopped because the
/// data shutdown signal resolved (rather than failing on the output sink).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Shutdown;
526
/// Rolling timing and throughput counters for the file server's main loop.
struct TimingStats {
    /// Start of the current measurement window.
    started_at: time::Instant,
    /// Number of lines read in this window (one per `record_bytes` call).
    events: usize,
    /// Total size in bytes of those lines.
    bytes: usize,
    /// Accumulated wall time per named loop phase (e.g. "discovery").
    segments: BTreeMap<&'static str, Duration>,
}
533
534impl TimingStats {
535 fn record(&mut self, key: &'static str, duration: Duration) {
536 let segment = self.segments.entry(key).or_default();
537 *segment += duration;
538 }
539
540 fn record_bytes(&mut self, bytes: usize) {
541 self.events += 1;
542 self.bytes += bytes;
543 }
544
545 fn report(&self) {
546 if !tracing::level_enabled!(tracing::Level::DEBUG) {
547 return;
548 }
549 let total = self.started_at.elapsed();
550 let counted: Duration = self.segments.values().sum();
551 let other: Duration = total.saturating_sub(counted);
552 let mut ratios = self
553 .segments
554 .iter()
555 .map(|(k, v)| (*k, v.as_secs_f32() / total.as_secs_f32()))
556 .collect::<BTreeMap<_, _>>();
557 ratios.insert("other", other.as_secs_f32() / total.as_secs_f32());
558 let (event_throughput, bytes_throughput) = if total.as_secs() > 0 {
559 (
560 self.events as u64 / total.as_secs(),
561 self.bytes as u64 / total.as_secs(),
562 )
563 } else {
564 (0, 0)
565 };
566 debug!(event_throughput = %scale(event_throughput), bytes_throughput = %scale(bytes_throughput), ?ratios);
567 }
568}
569
/// Formats a per-second rate with a decimal SI-style suffix, for example
/// `1500` becomes `"1.500k/sec"`.
///
/// The value is divided by 1000 while it exceeds 1000, up to the largest
/// available unit (`g`). Larger values stay expressed in `g` instead of
/// indexing past the unit table. The original `i <= 3` bound allowed
/// `units[4]` and panicked for values of 1e12 or more.
fn scale(value: u64) -> String {
    const UNITS: [&str; 4] = ["", "k", "m", "g"];
    let mut scaled = value as f32;
    let mut unit = 0;
    while scaled > 1000.0 && unit < UNITS.len() - 1 {
        scaled /= 1000.0;
        unit += 1;
    }
    format!("{:.3}{}/sec", scaled, UNITS[unit])
}
580
581impl Default for TimingStats {
582 fn default() -> Self {
583 Self {
584 started_at: time::Instant::now(),
585 segments: Default::default(),
586 events: Default::default(),
587 bytes: Default::default(),
588 }
589 }
590}
591
/// One line read from a watched file, together with where it came from.
#[derive(Debug)]
pub struct Line {
    /// The line's bytes, as returned by the file watcher.
    pub text: Bytes,
    /// The watched file's path at the time the line was read, as a string.
    pub filename: String,
    /// Fingerprint identifying the file the line was read from.
    pub file_id: FileFingerprint,
    /// Offset reported by the watcher for this line (presumably where the
    /// line starts in the file — confirm against `FileWatcher`).
    pub start_offset: u64,
    /// The watcher's file position immediately after this line was read.
    pub end_offset: u64,
}