1use async_trait::async_trait;
2use vector_lib::config::LogNamespace;
3use vrl::value::{Kind, kind::Collection};
4
5use vector_config::component::SourceDescription;
6
7use crate::config::{DataType, SourceConfig, SourceContext, SourceOutput};
8
9mod config;
11pub use self::config::*;
12
13cfg_if::cfg_if! {
14 if #[cfg(windows)] {
15 mod bookmark;
16 mod checkpoint;
17 pub mod error;
18 mod metadata;
19 mod parser;
20 mod render;
21 mod sid_resolver;
22 mod subscription;
23 mod xml_parser;
24
25 use std::path::PathBuf;
26 use std::sync::Arc;
27
28 use futures::StreamExt;
29 use vector_lib::EstimatedJsonEncodedSizeOf;
30 use vector_lib::finalizer::OrderedFinalizer;
31 use vector_lib::internal_event::{
32 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
33 };
34 use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE};
35 use windows::Win32::System::Threading::GetCurrentProcess;
36
37 use crate::{
38 SourceSender,
39 event::{BatchNotifier, BatchStatus, BatchStatusReceiver},
40 internal_events::{
41 EventsReceived, StreamClosedError, WindowsEventLogParseError, WindowsEventLogQueryError,
42 },
43 shutdown::ShutdownSignal,
44 };
45
46 use self::{
47 checkpoint::Checkpointer,
48 error::WindowsEventLogError,
49 parser::EventLogParser,
50 subscription::{EventLogSubscription, WaitResult},
51 };
52 }
53}
54
55#[cfg(all(test, windows))]
56mod tests;
57
58#[cfg(all(test, windows, feature = "sources-windows_event_log-integration-tests"))]
61mod integration_tests;
62
63cfg_if::cfg_if! {
64if #[cfg(windows)] {
65
66#[derive(Debug, Clone)]
71struct FinalizerEntry {
72 bookmarks: Vec<(String, String)>,
74}
75
76type SharedCheckpointer = Arc<Checkpointer>;
78
79enum Finalizer {
82 Sync(SharedCheckpointer),
85 Async(OrderedFinalizer<FinalizerEntry>),
88}
89
90impl Finalizer {
91 fn new(
93 acknowledgements: bool,
94 checkpointer: SharedCheckpointer,
95 shutdown: ShutdownSignal,
96 ) -> Self {
97 if acknowledgements {
98 let (finalizer, mut ack_stream) =
99 OrderedFinalizer::<FinalizerEntry>::new(Some(shutdown.clone()));
100
101 tokio::spawn(async move {
103 while let Some((status, entry)) = ack_stream.next().await {
104 if status == BatchStatus::Delivered {
105 if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
106 warn!(
107 message = "Failed to update checkpoint after acknowledgement.",
108 error = %e
109 );
110 } else {
111 debug!(
112 message = "Checkpoint updated after acknowledgement.",
113 channels = entry.bookmarks.len()
114 );
115 }
116 } else {
117 debug!(
118 message = "Events not delivered, checkpoint not updated.",
119 status = ?status
120 );
121 }
122 }
123 debug!(message = "Acknowledgement stream completed.");
124 });
125
126 Self::Async(finalizer)
127 } else {
128 Self::Sync(checkpointer)
129 }
130 }
131
132 async fn finalize(&self, entry: FinalizerEntry, receiver: Option<BatchStatusReceiver>) {
136 match (self, receiver) {
137 (Self::Sync(checkpointer), None) => {
138 if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
139 warn!(
140 message = "Failed to update checkpoint.",
141 error = %e
142 );
143 }
144 }
145 (Self::Async(finalizer), Some(receiver)) => {
146 finalizer.add(entry, receiver);
147 }
148 (Self::Sync(_), Some(_)) => {
149 warn!(message = "Received acknowledgement receiver in sync mode, ignoring.");
150 }
151 (Self::Async(_), None) => {
152 warn!(
153 message = "No acknowledgement receiver in async mode, checkpoint may be lost."
154 );
155 }
156 }
157 }
158}
159
160pub struct WindowsEventLogSource {
162 config: WindowsEventLogConfig,
163 data_dir: PathBuf,
164 acknowledgements: bool,
165 log_namespace: LogNamespace,
166}
167
168impl WindowsEventLogSource {
169 pub fn new(
170 config: WindowsEventLogConfig,
171 data_dir: PathBuf,
172 acknowledgements: bool,
173 log_namespace: LogNamespace,
174 ) -> crate::Result<Self> {
175 config.validate()?;
176
177 Ok(Self {
178 config,
179 data_dir,
180 acknowledgements,
181 log_namespace,
182 })
183 }
184
185 async fn run_internal(
186 &mut self,
187 mut out: SourceSender,
188 shutdown: ShutdownSignal,
189 ) -> Result<(), WindowsEventLogError> {
190 let checkpointer = Arc::new(Checkpointer::new(&self.data_dir).await?);
191
192 let finalizer = Finalizer::new(
193 self.acknowledgements,
194 Arc::clone(&checkpointer),
195 shutdown.clone(),
196 );
197
198 let mut subscription = EventLogSubscription::new(
199 &self.config,
200 Arc::clone(&checkpointer),
201 self.acknowledgements,
202 )
203 .await?;
204 let parser = EventLogParser::new(&self.config, self.log_namespace);
205
206 let events_received = register!(EventsReceived);
207 let bytes_received = register!(BytesReceived::from(Protocol::from("windows_event_log")));
208
209 let timeout_ms = self.config.event_timeout_ms as u32;
210 let batch_size = self.config.batch_size as usize;
211 let acknowledgements = self.acknowledgements;
212
213 info!(
214 message = "Starting Windows Event Log source (pull mode).",
215 acknowledgements = acknowledgements,
216 );
217
218 let (watcher_handle_raw, watcher_owns_handle): (isize, bool) = {
226 unsafe {
227 let src = HANDLE(subscription.shutdown_event_raw());
228 let process = GetCurrentProcess();
229 let mut dup = HANDLE::default();
230 if DuplicateHandle(
231 process,
232 src,
233 process,
234 &mut dup,
235 0,
236 false,
237 DUPLICATE_SAME_ACCESS,
238 )
239 .is_ok()
240 {
241 (dup.0 as isize, true)
242 } else {
243 warn!(
247 message = "Failed to duplicate shutdown event handle, falling back to shared handle."
248 );
249 (src.0 as isize, false)
250 }
251 }
252 };
253 let shutdown_watcher = shutdown.clone();
254 tokio::spawn(async move {
255 shutdown_watcher.await;
256 unsafe {
257 let handle =
258 windows::Win32::Foundation::HANDLE(watcher_handle_raw as *mut std::ffi::c_void);
259 let _ = windows::Win32::System::Threading::SetEvent(handle);
260 if watcher_owns_handle {
261 let _ = windows::Win32::Foundation::CloseHandle(handle);
262 }
263 }
264 });
265
266 let mut last_checkpoint = std::time::Instant::now();
268 let checkpoint_interval =
269 std::time::Duration::from_secs(self.config.checkpoint_interval_secs);
270
271 let mut error_backoff = std::time::Duration::from_millis(100);
273 const MAX_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
274
275 let mut timeout_count: u32 = 0;
277 let health_interval_timeouts = (30_000 / self.config.event_timeout_ms).max(1) as u32;
278
279 loop {
280 let (returned_sub, wait_result) = tokio::task::spawn_blocking({
285 let sub = subscription;
286 move || {
287 let result = sub.wait_for_events_blocking(timeout_ms);
288 (sub, result)
289 }
290 })
291 .await
292 .map_err(|e| WindowsEventLogError::ConfigError {
293 message: format!("Wait task panicked: {e}"),
294 })?;
295
296 subscription = returned_sub;
297
298 match wait_result {
299 WaitResult::EventsAvailable => {
300 let (returned_sub, events_result) = tokio::task::spawn_blocking({
302 let mut sub = subscription;
303 move || {
304 let result = sub.pull_events(batch_size);
305 (sub, result)
306 }
307 })
308 .await
309 .map_err(|e| WindowsEventLogError::ConfigError {
310 message: format!("Pull task panicked: {e}"),
311 })?;
312
313 subscription = returned_sub;
314
315 if let Some(limiter) = subscription.rate_limiter() {
317 limiter.until_ready().await;
318 }
319
320 match events_result {
321 Ok(events) if events.is_empty() => {
322 error_backoff = std::time::Duration::from_millis(100);
323 continue;
324 }
325 Ok(events) => {
326 error_backoff = std::time::Duration::from_millis(100);
327 debug!(
328 message = "Pulled Windows Event Log events.",
329 event_count = events.len()
330 );
331
332 let (batch, receiver) =
333 BatchNotifier::maybe_new_with_receiver(acknowledgements);
334
335 let mut log_events = Vec::new();
336 let mut total_byte_size = 0;
337 let mut channels_in_batch = std::collections::HashSet::new();
338
339 for event in events {
340 let channel = event.channel.clone();
341 channels_in_batch.insert(channel.clone());
342 let event_id = event.event_id;
343 match parser.parse_event(event) {
344 Ok(mut log_event) => {
345 let byte_size = log_event.estimated_json_encoded_size_of();
346 total_byte_size += byte_size.get();
347
348 if let Some(ref batch) = batch {
349 log_event = log_event.with_batch_notifier(batch);
350 }
351
352 log_events.push(log_event);
353 }
354 Err(e) => {
355 emit!(WindowsEventLogParseError {
356 error: e.to_string(),
357 channel,
358 event_id: Some(event_id),
359 });
360 }
361 }
362 }
363
364 if !log_events.is_empty() {
365 let count = log_events.len();
366 events_received.emit(CountByteSize(count, total_byte_size.into()));
367 bytes_received.emit(ByteSize(total_byte_size));
368
369 if let Err(_error) = out.send_batch(log_events).await {
372 emit!(StreamClosedError { count });
373 break;
374 }
375
376 let bookmarks: Vec<(String, String)> = channels_in_batch
378 .into_iter()
379 .filter_map(|channel| {
380 subscription
381 .get_bookmark_xml(&channel)
382 .map(|xml| (channel, xml))
383 })
384 .collect();
385
386 if !bookmarks.is_empty() {
387 let entry = FinalizerEntry { bookmarks };
388 finalizer.finalize(entry, receiver).await;
389 }
390 }
391 }
392 Err(e) => {
393 emit!(WindowsEventLogQueryError {
394 channel: "all".to_string(),
395 query: None,
396 error: e.to_string(),
397 });
398 if !e.is_recoverable() {
399 error!(
400 message = "Non-recoverable pull error, shutting down.",
401 error = %e
402 );
403 break;
404 }
405 warn!(
407 message = "Recoverable pull error, backing off.",
408 backoff_ms = error_backoff.as_millis() as u64,
409 error = %e
410 );
411 tokio::time::sleep(error_backoff).await;
412 error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
413 }
414 }
415 }
416
417 WaitResult::Timeout => {
418 error_backoff = std::time::Duration::from_millis(100);
421
422 if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval {
424 if let Err(e) = subscription.flush_bookmarks().await {
425 warn!(
426 message = "Failed to flush bookmarks during periodic checkpoint.",
427 error = %e
428 );
429 }
430 last_checkpoint = std::time::Instant::now();
431 }
432
433 timeout_count += 1;
435 if timeout_count >= health_interval_timeouts {
436 timeout_count = 0;
437 let (total, active) = subscription.channel_health_summary();
438 if active < total {
439 warn!(
440 message = "Some channel subscriptions are inactive.",
441 total_channels = total,
442 active_channels = active,
443 );
444 } else {
445 debug!(
446 message = "All channel subscriptions healthy.",
447 total_channels = total,
448 );
449 }
450 }
451 }
452
453 WaitResult::Shutdown => {
454 info!(message = "Windows Event Log wait received shutdown signal.");
455 if !acknowledgements {
456 info!(message = "Flushing bookmarks before shutdown.");
457 if let Err(e) = subscription.flush_bookmarks().await {
458 warn!(message = "Failed to flush bookmarks on shutdown.", error = %e);
459 }
460 }
461 break;
462 }
463 }
464 }
465
466 Ok(())
467 }
468}
469
470} } #[async_trait]
474#[typetag::serde(name = "windows_event_log")]
475impl SourceConfig for WindowsEventLogConfig {
476 async fn build(&self, _cx: SourceContext) -> crate::Result<super::Source> {
477 #[cfg(not(windows))]
478 {
479 Err("The windows_event_log source is only supported on Windows.".into())
480 }
481
482 #[cfg(windows)]
483 {
484 let data_dir = _cx
485 .globals
486 .resolve_and_make_data_subdir(self.data_dir.as_ref(), _cx.key.id())?;
487
488 let acknowledgements = _cx.do_acknowledgements(self.acknowledgements);
489
490 let log_namespace = _cx.log_namespace(self.log_namespace);
491 let source = WindowsEventLogSource::new(
492 self.clone(),
493 data_dir,
494 acknowledgements,
495 log_namespace,
496 )?;
497 Ok(Box::pin(async move {
498 let mut source = source;
499 if let Err(error) = source.run_internal(_cx.out, _cx.shutdown).await {
500 error!(message = "Windows Event Log source failed.", %error);
501 }
502 Ok(())
503 }))
504 }
505 }
506
507 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
508 let log_namespace = self
509 .log_namespace
510 .map(|b| {
511 if b {
512 LogNamespace::Vector
513 } else {
514 LogNamespace::Legacy
515 }
516 })
517 .unwrap_or(global_log_namespace);
518
519 let schema_definition = match log_namespace {
520 LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
521 Kind::object(std::collections::BTreeMap::from([
522 ("timestamp".into(), Kind::timestamp().or_undefined()),
523 ("message".into(), Kind::bytes().or_undefined()),
524 ("level".into(), Kind::bytes().or_undefined()),
525 ("source".into(), Kind::bytes().or_undefined()),
526 ("event_id".into(), Kind::integer().or_undefined()),
527 ("provider_name".into(), Kind::bytes().or_undefined()),
528 ("computer".into(), Kind::bytes().or_undefined()),
529 ("user_id".into(), Kind::bytes().or_undefined()),
530 ("user_name".into(), Kind::bytes().or_undefined()),
531 ("record_id".into(), Kind::integer().or_undefined()),
532 ("activity_id".into(), Kind::bytes().or_undefined()),
533 ("related_activity_id".into(), Kind::bytes().or_undefined()),
534 ("process_id".into(), Kind::integer().or_undefined()),
535 ("thread_id".into(), Kind::integer().or_undefined()),
536 ("channel".into(), Kind::bytes().or_undefined()),
537 ("opcode".into(), Kind::integer().or_undefined()),
538 ("task".into(), Kind::integer().or_undefined()),
539 ("keywords".into(), Kind::bytes().or_undefined()),
540 ("level_value".into(), Kind::integer().or_undefined()),
541 ("provider_guid".into(), Kind::bytes().or_undefined()),
542 ("version".into(), Kind::integer().or_undefined()),
543 ("qualifiers".into(), Kind::integer().or_undefined()),
544 (
545 "string_inserts".into(),
546 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
547 ),
548 (
549 "event_data".into(),
550 Kind::object(std::collections::BTreeMap::new()).or_undefined(),
551 ),
552 (
553 "user_data".into(),
554 Kind::object(std::collections::BTreeMap::new()).or_undefined(),
555 ),
556 ("task_name".into(), Kind::bytes().or_undefined()),
557 ("opcode_name".into(), Kind::bytes().or_undefined()),
558 (
559 "keyword_names".into(),
560 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
561 ),
562 ])),
563 [LogNamespace::Vector],
564 ),
565 LogNamespace::Legacy => vector_lib::schema::Definition::any(),
566 };
567
568 vec![SourceOutput::new_maybe_logs(
569 DataType::Log,
570 schema_definition,
571 )]
572 }
573
574 fn resources(&self) -> Vec<crate::config::Resource> {
575 self.channels
576 .iter()
577 .map(|channel| crate::config::Resource::DiskBuffer(channel.clone()))
578 .collect()
579 }
580
581 fn can_acknowledge(&self) -> bool {
582 true
583 }
584}
585
586inventory::submit! {
587 SourceDescription::new::<WindowsEventLogConfig>(
588 "windows_event_log",
589 "Collect logs from Windows Event Log channels",
590 "A Windows-specific source that subscribes to Windows Event Log channels and streams events in real-time using the Windows Event Log API.",
591 "https://vector.dev/docs/reference/configuration/sources/windows_event_log/"
592 )
593}