vector/sources/windows_event_log/
mod.rs

1use async_trait::async_trait;
2use vector_lib::config::LogNamespace;
3use vrl::value::{Kind, kind::Collection};
4
5use vector_config::component::SourceDescription;
6
7use crate::config::{DataType, SourceConfig, SourceContext, SourceOutput};
8
9// Cross-platform: config types (pure serde structs, no Windows dependencies)
10mod config;
11pub use self::config::*;
12
13cfg_if::cfg_if! {
14    if #[cfg(windows)] {
15        mod bookmark;
16        mod checkpoint;
17        pub mod error;
18        mod metadata;
19        mod parser;
20        mod render;
21        mod sid_resolver;
22        mod subscription;
23        mod xml_parser;
24
25        use std::path::PathBuf;
26        use std::sync::Arc;
27
28        use futures::StreamExt;
29        use vector_lib::EstimatedJsonEncodedSizeOf;
30        use vector_lib::finalizer::OrderedFinalizer;
31        use vector_lib::internal_event::{
32            ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
33        };
34        use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE};
35        use windows::Win32::System::Threading::GetCurrentProcess;
36
37        use crate::{
38            SourceSender,
39            event::{BatchNotifier, BatchStatus, BatchStatusReceiver},
40            internal_events::{
41                EventsReceived, StreamClosedError, WindowsEventLogParseError, WindowsEventLogQueryError,
42            },
43            shutdown::ShutdownSignal,
44        };
45
46        use self::{
47            checkpoint::Checkpointer,
48            error::WindowsEventLogError,
49            parser::EventLogParser,
50            subscription::{EventLogSubscription, WaitResult},
51        };
52    }
53}
54
55#[cfg(all(test, windows))]
56mod tests;
57
58// Integration tests are feature-gated to avoid requiring Windows Event Log service.
59// To run integration tests on Windows: cargo test --features sources-windows_event_log-integration-tests
60#[cfg(all(test, windows, feature = "sources-windows_event_log-integration-tests"))]
61mod integration_tests;
62
63cfg_if::cfg_if! {
64if #[cfg(windows)] {
65
/// One unit of work handed to the acknowledgement finalizer.
///
/// An entry is created per batch of events sent downstream and carries the
/// bookmark of every channel that contributed to that batch, so that all of
/// them can be checkpointed together once the batch is finalized.
#[derive(Clone, Debug)]
struct FinalizerEntry {
    /// `(channel_name, bookmark_xml)` pairs, one per channel seen in the batch.
    bookmarks: Vec<(String, String)>,
}
75
76/// Shared checkpointer type for use with the finalizer
77type SharedCheckpointer = Arc<Checkpointer>;
78
79/// Finalizer for handling acknowledgments.
80/// Supports both synchronous (immediate checkpoint) and asynchronous (deferred checkpoint) modes.
81enum Finalizer {
82    /// Synchronous mode: checkpoints are updated immediately after reading events.
83    /// Used when acknowledgements are disabled.
84    Sync(SharedCheckpointer),
85    /// Asynchronous mode: checkpoints are updated only after downstream sinks acknowledge receipt.
86    /// Used when acknowledgements are enabled.
87    Async(OrderedFinalizer<FinalizerEntry>),
88}
89
90impl Finalizer {
91    /// Create a new finalizer based on acknowledgement configuration.
92    fn new(
93        acknowledgements: bool,
94        checkpointer: SharedCheckpointer,
95        shutdown: ShutdownSignal,
96    ) -> Self {
97        if acknowledgements {
98            let (finalizer, mut ack_stream) =
99                OrderedFinalizer::<FinalizerEntry>::new(Some(shutdown.clone()));
100
101            // Spawn background task to process acknowledgments and update checkpoints
102            tokio::spawn(async move {
103                while let Some((status, entry)) = ack_stream.next().await {
104                    if status == BatchStatus::Delivered {
105                        if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
106                            warn!(
107                                message = "Failed to update checkpoint after acknowledgement.",
108                                error = %e
109                            );
110                        } else {
111                            debug!(
112                                message = "Checkpoint updated after acknowledgement.",
113                                channels = entry.bookmarks.len()
114                            );
115                        }
116                    } else {
117                        debug!(
118                            message = "Events not delivered, checkpoint not updated.",
119                            status = ?status
120                        );
121                    }
122                }
123                debug!(message = "Acknowledgement stream completed.");
124            });
125
126            Self::Async(finalizer)
127        } else {
128            Self::Sync(checkpointer)
129        }
130    }
131
132    /// Finalize a batch of events.
133    /// In sync mode, immediately updates the checkpoint.
134    /// In async mode, registers the entry for deferred checkpoint update.
135    async fn finalize(&self, entry: FinalizerEntry, receiver: Option<BatchStatusReceiver>) {
136        match (self, receiver) {
137            (Self::Sync(checkpointer), None) => {
138                if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
139                    warn!(
140                        message = "Failed to update checkpoint.",
141                        error = %e
142                    );
143                }
144            }
145            (Self::Async(finalizer), Some(receiver)) => {
146                finalizer.add(entry, receiver);
147            }
148            (Self::Sync(_), Some(_)) => {
149                warn!(message = "Received acknowledgement receiver in sync mode, ignoring.");
150            }
151            (Self::Async(_), None) => {
152                warn!(
153                    message = "No acknowledgement receiver in async mode, checkpoint may be lost."
154                );
155            }
156        }
157    }
158}
159
160/// Windows Event Log source implementation
161pub struct WindowsEventLogSource {
162    config: WindowsEventLogConfig,
163    data_dir: PathBuf,
164    acknowledgements: bool,
165    log_namespace: LogNamespace,
166}
167
168impl WindowsEventLogSource {
169    pub fn new(
170        config: WindowsEventLogConfig,
171        data_dir: PathBuf,
172        acknowledgements: bool,
173        log_namespace: LogNamespace,
174    ) -> crate::Result<Self> {
175        config.validate()?;
176
177        Ok(Self {
178            config,
179            data_dir,
180            acknowledgements,
181            log_namespace,
182        })
183    }
184
185    async fn run_internal(
186        &mut self,
187        mut out: SourceSender,
188        shutdown: ShutdownSignal,
189    ) -> Result<(), WindowsEventLogError> {
190        let checkpointer = Arc::new(Checkpointer::new(&self.data_dir).await?);
191
192        let finalizer = Finalizer::new(
193            self.acknowledgements,
194            Arc::clone(&checkpointer),
195            shutdown.clone(),
196        );
197
198        let mut subscription = EventLogSubscription::new(
199            &self.config,
200            Arc::clone(&checkpointer),
201            self.acknowledgements,
202        )
203        .await?;
204        let parser = EventLogParser::new(&self.config, self.log_namespace);
205
206        let events_received = register!(EventsReceived);
207        let bytes_received = register!(BytesReceived::from(Protocol::from("windows_event_log")));
208
209        let timeout_ms = self.config.event_timeout_ms as u32;
210        let batch_size = self.config.batch_size as usize;
211        let acknowledgements = self.acknowledgements;
212
213        info!(
214            message = "Starting Windows Event Log source (pull mode).",
215            acknowledgements = acknowledgements,
216        );
217
218        // Spawn async shutdown watcher that signals the Windows shutdown event
219        // when the Vector shutdown signal fires. This wakes WaitForMultipleObjects
220        // while subscription is moved into spawn_blocking.
221        //
222        // We duplicate the handle so the watcher owns an independent kernel reference.
223        // This prevents use-after-close if the subscription panics and drops before
224        // the watcher fires — the duplicate remains valid until explicitly closed.
225        let (watcher_handle_raw, watcher_owns_handle): (isize, bool) = {
226            unsafe {
227                let src = HANDLE(subscription.shutdown_event_raw());
228                let process = GetCurrentProcess();
229                let mut dup = HANDLE::default();
230                if DuplicateHandle(
231                    process,
232                    src,
233                    process,
234                    &mut dup,
235                    0,
236                    false,
237                    DUPLICATE_SAME_ACCESS,
238                )
239                .is_ok()
240                {
241                    (dup.0 as isize, true)
242                } else {
243                    // Fallback: use the original handle without ownership.
244                    // The watcher will signal but NOT close — EventLogSubscription::drop
245                    // owns the handle and will close it.
246                    warn!(
247                        message = "Failed to duplicate shutdown event handle, falling back to shared handle."
248                    );
249                    (src.0 as isize, false)
250                }
251            }
252        };
253        let shutdown_watcher = shutdown.clone();
254        tokio::spawn(async move {
255            shutdown_watcher.await;
256            unsafe {
257                let handle =
258                    windows::Win32::Foundation::HANDLE(watcher_handle_raw as *mut std::ffi::c_void);
259                let _ = windows::Win32::System::Threading::SetEvent(handle);
260                if watcher_owns_handle {
261                    let _ = windows::Win32::Foundation::CloseHandle(handle);
262                }
263            }
264        });
265
266        // Track when we last flushed checkpoints
267        let mut last_checkpoint = std::time::Instant::now();
268        let checkpoint_interval =
269            std::time::Duration::from_secs(self.config.checkpoint_interval_secs);
270
271        // Exponential backoff on consecutive recoverable errors
272        let mut error_backoff = std::time::Duration::from_millis(100);
273        const MAX_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
274
275        // Health heartbeat: log every ~30s regardless of checkpoint interval
276        let mut timeout_count: u32 = 0;
277        let health_interval_timeouts = (30_000 / self.config.event_timeout_ms).max(1) as u32;
278
279        loop {
280            // Move subscription into blocking thread for WaitForMultipleObjects.
281            // Ownership transfer ensures no data races between the blocking thread
282            // and async code. The shutdown watcher uses a raw HANDLE value (just an
283            // integer) to signal shutdown without needing access to the subscription.
284            let (returned_sub, wait_result) = tokio::task::spawn_blocking({
285                let sub = subscription;
286                move || {
287                    let result = sub.wait_for_events_blocking(timeout_ms);
288                    (sub, result)
289                }
290            })
291            .await
292            .map_err(|e| WindowsEventLogError::ConfigError {
293                message: format!("Wait task panicked: {e}"),
294            })?;
295
296            subscription = returned_sub;
297
298            match wait_result {
299                WaitResult::EventsAvailable => {
300                    // Pull events via spawn_blocking (EvtNext/EvtRender are blocking APIs)
301                    let (returned_sub, events_result) = tokio::task::spawn_blocking({
302                        let mut sub = subscription;
303                        move || {
304                            let result = sub.pull_events(batch_size);
305                            (sub, result)
306                        }
307                    })
308                    .await
309                    .map_err(|e| WindowsEventLogError::ConfigError {
310                        message: format!("Pull task panicked: {e}"),
311                    })?;
312
313                    subscription = returned_sub;
314
315                    // Rate limiting between batches (async-compatible)
316                    if let Some(limiter) = subscription.rate_limiter() {
317                        limiter.until_ready().await;
318                    }
319
320                    match events_result {
321                        Ok(events) if events.is_empty() => {
322                            error_backoff = std::time::Duration::from_millis(100);
323                            continue;
324                        }
325                        Ok(events) => {
326                            error_backoff = std::time::Duration::from_millis(100);
327                            debug!(
328                                message = "Pulled Windows Event Log events.",
329                                event_count = events.len()
330                            );
331
332                            let (batch, receiver) =
333                                BatchNotifier::maybe_new_with_receiver(acknowledgements);
334
335                            let mut log_events = Vec::new();
336                            let mut total_byte_size = 0;
337                            let mut channels_in_batch = std::collections::HashSet::new();
338
339                            for event in events {
340                                let channel = event.channel.clone();
341                                channels_in_batch.insert(channel.clone());
342                                let event_id = event.event_id;
343                                match parser.parse_event(event) {
344                                    Ok(mut log_event) => {
345                                        let byte_size = log_event.estimated_json_encoded_size_of();
346                                        total_byte_size += byte_size.get();
347
348                                        if let Some(ref batch) = batch {
349                                            log_event = log_event.with_batch_notifier(batch);
350                                        }
351
352                                        log_events.push(log_event);
353                                    }
354                                    Err(e) => {
355                                        emit!(WindowsEventLogParseError {
356                                            error: e.to_string(),
357                                            channel,
358                                            event_id: Some(event_id),
359                                        });
360                                    }
361                                }
362                            }
363
364                            if !log_events.is_empty() {
365                                let count = log_events.len();
366                                events_received.emit(CountByteSize(count, total_byte_size.into()));
367                                bytes_received.emit(ByteSize(total_byte_size));
368
369                                // BACK PRESSURE: block here until the pipeline accepts
370                                // the batch. We don't call EvtNext again until this completes.
371                                if let Err(_error) = out.send_batch(log_events).await {
372                                    emit!(StreamClosedError { count });
373                                    break;
374                                }
375
376                                // Register checkpoint entry with finalizer
377                                let bookmarks: Vec<(String, String)> = channels_in_batch
378                                    .into_iter()
379                                    .filter_map(|channel| {
380                                        subscription
381                                            .get_bookmark_xml(&channel)
382                                            .map(|xml| (channel, xml))
383                                    })
384                                    .collect();
385
386                                if !bookmarks.is_empty() {
387                                    let entry = FinalizerEntry { bookmarks };
388                                    finalizer.finalize(entry, receiver).await;
389                                }
390                            }
391                        }
392                        Err(e) => {
393                            emit!(WindowsEventLogQueryError {
394                                channel: "all".to_string(),
395                                query: None,
396                                error: e.to_string(),
397                            });
398                            if !e.is_recoverable() {
399                                error!(
400                                    message = "Non-recoverable pull error, shutting down.",
401                                    error = %e
402                                );
403                                break;
404                            }
405                            // Exponential backoff on consecutive recoverable errors
406                            warn!(
407                                message = "Recoverable pull error, backing off.",
408                                backoff_ms = error_backoff.as_millis() as u64,
409                                error = %e
410                            );
411                            tokio::time::sleep(error_backoff).await;
412                            error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
413                        }
414                    }
415                }
416
417                WaitResult::Timeout => {
418                    // A full wait cycle without errors means the system is healthy;
419                    // reset backoff so the next transient error starts fresh.
420                    error_backoff = std::time::Duration::from_millis(100);
421
422                    // Periodic checkpoint flush (sync mode only)
423                    if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval {
424                        if let Err(e) = subscription.flush_bookmarks().await {
425                            warn!(
426                                message = "Failed to flush bookmarks during periodic checkpoint.",
427                                error = %e
428                            );
429                        }
430                        last_checkpoint = std::time::Instant::now();
431                    }
432
433                    // Health heartbeat on a separate ~30s cadence
434                    timeout_count += 1;
435                    if timeout_count >= health_interval_timeouts {
436                        timeout_count = 0;
437                        let (total, active) = subscription.channel_health_summary();
438                        if active < total {
439                            warn!(
440                                message = "Some channel subscriptions are inactive.",
441                                total_channels = total,
442                                active_channels = active,
443                            );
444                        } else {
445                            debug!(
446                                message = "All channel subscriptions healthy.",
447                                total_channels = total,
448                            );
449                        }
450                    }
451                }
452
453                WaitResult::Shutdown => {
454                    info!(message = "Windows Event Log wait received shutdown signal.");
455                    if !acknowledgements {
456                        info!(message = "Flushing bookmarks before shutdown.");
457                        if let Err(e) = subscription.flush_bookmarks().await {
458                            warn!(message = "Failed to flush bookmarks on shutdown.", error = %e);
459                        }
460                    }
461                    break;
462                }
463            }
464        }
465
466        Ok(())
467    }
468}
469
470} // if #[cfg(windows)]
471} // cfg_if!
472
473#[async_trait]
474#[typetag::serde(name = "windows_event_log")]
475impl SourceConfig for WindowsEventLogConfig {
476    async fn build(&self, _cx: SourceContext) -> crate::Result<super::Source> {
477        #[cfg(not(windows))]
478        {
479            Err("The windows_event_log source is only supported on Windows.".into())
480        }
481
482        #[cfg(windows)]
483        {
484            let data_dir = _cx
485                .globals
486                .resolve_and_make_data_subdir(self.data_dir.as_ref(), _cx.key.id())?;
487
488            let acknowledgements = _cx.do_acknowledgements(self.acknowledgements);
489
490            let log_namespace = _cx.log_namespace(self.log_namespace);
491            let source = WindowsEventLogSource::new(
492                self.clone(),
493                data_dir,
494                acknowledgements,
495                log_namespace,
496            )?;
497            Ok(Box::pin(async move {
498                let mut source = source;
499                if let Err(error) = source.run_internal(_cx.out, _cx.shutdown).await {
500                    error!(message = "Windows Event Log source failed.", %error);
501                }
502                Ok(())
503            }))
504        }
505    }
506
507    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
508        let log_namespace = self
509            .log_namespace
510            .map(|b| {
511                if b {
512                    LogNamespace::Vector
513                } else {
514                    LogNamespace::Legacy
515                }
516            })
517            .unwrap_or(global_log_namespace);
518
519        let schema_definition = match log_namespace {
520            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
521                Kind::object(std::collections::BTreeMap::from([
522                    ("timestamp".into(), Kind::timestamp().or_undefined()),
523                    ("message".into(), Kind::bytes().or_undefined()),
524                    ("level".into(), Kind::bytes().or_undefined()),
525                    ("source".into(), Kind::bytes().or_undefined()),
526                    ("event_id".into(), Kind::integer().or_undefined()),
527                    ("provider_name".into(), Kind::bytes().or_undefined()),
528                    ("computer".into(), Kind::bytes().or_undefined()),
529                    ("user_id".into(), Kind::bytes().or_undefined()),
530                    ("user_name".into(), Kind::bytes().or_undefined()),
531                    ("record_id".into(), Kind::integer().or_undefined()),
532                    ("activity_id".into(), Kind::bytes().or_undefined()),
533                    ("related_activity_id".into(), Kind::bytes().or_undefined()),
534                    ("process_id".into(), Kind::integer().or_undefined()),
535                    ("thread_id".into(), Kind::integer().or_undefined()),
536                    ("channel".into(), Kind::bytes().or_undefined()),
537                    ("opcode".into(), Kind::integer().or_undefined()),
538                    ("task".into(), Kind::integer().or_undefined()),
539                    ("keywords".into(), Kind::bytes().or_undefined()),
540                    ("level_value".into(), Kind::integer().or_undefined()),
541                    ("provider_guid".into(), Kind::bytes().or_undefined()),
542                    ("version".into(), Kind::integer().or_undefined()),
543                    ("qualifiers".into(), Kind::integer().or_undefined()),
544                    (
545                        "string_inserts".into(),
546                        Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
547                    ),
548                    (
549                        "event_data".into(),
550                        Kind::object(std::collections::BTreeMap::new()).or_undefined(),
551                    ),
552                    (
553                        "user_data".into(),
554                        Kind::object(std::collections::BTreeMap::new()).or_undefined(),
555                    ),
556                    ("task_name".into(), Kind::bytes().or_undefined()),
557                    ("opcode_name".into(), Kind::bytes().or_undefined()),
558                    (
559                        "keyword_names".into(),
560                        Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
561                    ),
562                ])),
563                [LogNamespace::Vector],
564            ),
565            LogNamespace::Legacy => vector_lib::schema::Definition::any(),
566        };
567
568        vec![SourceOutput::new_maybe_logs(
569            DataType::Log,
570            schema_definition,
571        )]
572    }
573
574    fn resources(&self) -> Vec<crate::config::Resource> {
575        self.channels
576            .iter()
577            .map(|channel| crate::config::Resource::DiskBuffer(channel.clone()))
578            .collect()
579    }
580
581    fn can_acknowledge(&self) -> bool {
582        true
583    }
584}
585
586inventory::submit! {
587    SourceDescription::new::<WindowsEventLogConfig>(
588        "windows_event_log",
589        "Collect logs from Windows Event Log channels",
590        "A Windows-specific source that subscribes to Windows Event Log channels and streams events in real-time using the Windows Event Log API.",
591        "https://vector.dev/docs/reference/configuration/sources/windows_event_log/"
592    )
593}