1use std::path::PathBuf;
2
3use base64::prelude::{BASE64_STANDARD, Engine as _};
4use dnsmsg_parser::dns_message_parser::DnsParserOptions;
5use dnstap_parser::{
6 parser::DnstapParser,
7 schema::{DNSTAP_VALUE_PATHS, DnstapEventSchema},
8};
9use vector_lib::{
10 configurable::configurable_component,
11 event::{Event, LogEvent},
12 internal_event::{ByteSize, BytesReceived, InternalEventHandle, Protocol, Registered},
13 lookup::{owned_value_path, path},
14 tls::MaybeTlsSettings,
15};
16use vrl::{
17 path::{OwnedValuePath, PathPrefix},
18 value::{Kind, kind::Collection},
19};
20
21use super::util::framestream::{
22 FrameHandler, build_framestream_tcp_source, build_framestream_unix_source,
23};
24use crate::{
25 Result,
26 config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
27 internal_events::DnstapParseError,
28};
29
30pub mod tcp;
31#[cfg(unix)]
32pub mod unix;
33use vector_lib::{
34 config::{LegacyKey, LogNamespace},
35 lookup::lookup_v2::OptionalValuePath,
36};
37
/// Configuration for the `dnstap` source.
#[configurable_component(source("dnstap", "Collect DNS logs from a dnstap-compatible server."))]
#[derive(Clone, Debug)]
pub struct DnstapConfig {
    /// Socket mode to listen on, together with its mode-specific options.
    #[serde(flatten)]
    pub mode: Mode,

    /// Maximum dnstap frame length, in bytes.
    ///
    /// Defaults to 100 KiB.
    #[serde(default = "default_max_frame_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_frame_length: usize,

    /// Overrides the name of the log field that receives the sender reported for each frame.
    ///
    /// When unset, the host key of the global log schema is used.
    pub host_key: Option<OptionalValuePath>,

    /// Whether to skip parsing of dnstap frames.
    ///
    /// If set to `true`, frames are not decoded; the raw frame is stored on the event as a
    /// base64-encoded string instead. Treated as `false` when unset.
    pub raw_data_only: Option<bool>,

    /// Whether frames should be handled on multiple threads.
    ///
    /// Treated as `false` when unset.
    pub multithreaded: Option<bool>,

    /// Maximum number of frame handling tasks.
    ///
    /// Treated as `1000` when unset.
    pub max_frame_handling_tasks: Option<usize>,

    /// Whether hostnames found in parsed dnstap data are lowercased.
    #[serde(default = "crate::serde::default_false")]
    pub lowercase_hostnames: bool,

    /// Optional per-source log namespace setting.
    ///
    /// `build` passes this value to `SourceContext::log_namespace`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
82
83fn default_max_frame_length() -> usize {
84 bytesize::kib(100u64) as usize
85}
86
/// Listening mode for the `dnstap` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of dnstap socket to use."))]
// NOTE(review): the variant size difference is accepted here instead of boxing a variant.
#[allow(clippy::large_enum_variant)]
pub enum Mode {
    /// Listen on TCP.
    Tcp(tcp::TcpConfig),

    /// Listen on a Unix domain socket (only available on Unix targets).
    #[cfg(unix)]
    Unix(unix::UnixConfig),
}
101
102impl DnstapConfig {
103 pub fn new(socket_path: PathBuf) -> Self {
104 Self {
105 mode: Mode::Unix(unix::UnixConfig::new(socket_path)),
106 ..Default::default()
107 }
108 }
109
110 fn log_namespace(&self) -> LogNamespace {
111 self.log_namespace.unwrap_or(false).into()
112 }
113
114 fn raw_data_only(&self) -> bool {
115 self.raw_data_only.unwrap_or(false)
116 }
117
118 pub fn schema_definition(&self, log_namespace: LogNamespace) -> vector_lib::schema::Definition {
119 let event_schema = DnstapEventSchema;
120
121 match self.log_namespace() {
122 LogNamespace::Legacy => {
123 let schema = vector_lib::schema::Definition::empty_legacy_namespace();
124
125 if self.raw_data_only()
126 && let Some(message_key) = log_schema().message_key()
127 {
128 return schema.with_event_field(message_key, Kind::bytes(), Some("message"));
129 }
130 event_schema.schema_definition(schema)
131 }
132 LogNamespace::Vector => {
133 let schema = vector_lib::schema::Definition::new_with_default_metadata(
134 Kind::object(Collection::empty()),
135 [log_namespace],
136 )
137 .with_standard_vector_source_metadata();
138
139 if self.raw_data_only() {
140 schema.with_event_field(
141 &owned_value_path!("message"),
142 Kind::bytes(),
143 Some("message"),
144 )
145 } else {
146 event_schema.schema_definition(schema)
147 }
148 }
149 }
150 }
151}
152
153impl Default for DnstapConfig {
154 fn default() -> Self {
155 Self {
156 #[cfg(unix)]
157 mode: Mode::Unix(unix::UnixConfig::default()),
158 #[cfg(not(unix))]
159 mode: Mode::Tcp(tcp::TcpConfig::from_address(std::net::SocketAddr::new(
160 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
161 9000,
162 ))),
163 max_frame_length: default_max_frame_length(),
164 host_key: None,
165 raw_data_only: None,
166 multithreaded: None,
167 max_frame_handling_tasks: None,
168 lowercase_hostnames: false,
169 log_namespace: None,
170 }
171 }
172}
173
174impl_generate_config_from_default!(DnstapConfig);
175
176#[async_trait::async_trait]
177#[typetag::serde(name = "dnstap")]
178impl SourceConfig for DnstapConfig {
179 async fn build(&self, cx: SourceContext) -> Result<super::Source> {
180 let log_namespace = cx.log_namespace(self.log_namespace);
181 let common_frame_handler = CommonFrameHandler::new(self, log_namespace);
182 match &self.mode {
183 Mode::Tcp(config) => {
184 let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
185
186 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
187 let frame_handler = tcp::DnstapFrameHandler::new(
188 config.clone(),
189 tls,
190 common_frame_handler,
191 log_namespace,
192 );
193
194 build_framestream_tcp_source(frame_handler, cx.shutdown, cx.out)
195 }
196 #[cfg(unix)]
197 Mode::Unix(config) => {
198 let frame_handler =
199 unix::DnstapFrameHandler::new(config.clone(), common_frame_handler);
200 build_framestream_unix_source(frame_handler, cx.shutdown, cx.out)
201 }
202 }
203 }
204
205 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
206 let log_namespace = global_log_namespace.merge(Some(self.log_namespace()));
207 let schema_definition = self
208 .schema_definition(log_namespace)
209 .with_standard_vector_source_metadata();
210 vec![SourceOutput::new_maybe_logs(
211 DataType::Log,
212 schema_definition,
213 )]
214 }
215
216 fn can_acknowledge(&self) -> bool {
217 false
218 }
219}
220
/// Frame-handling state shared by the TCP and Unix flavours of the source.
#[derive(Clone)]
struct CommonFrameHandler {
    /// Maximum frame length, copied from the source configuration.
    max_frame_length: usize,
    /// Content type string returned from `FrameHandler::content_type`.
    content_type: String,
    /// When `true`, frames are stored base64-encoded instead of being parsed.
    raw_data_only: bool,
    /// Value returned from `FrameHandler::multithreaded`.
    multithreaded: bool,
    /// Value returned from `FrameHandler::max_frame_handling_tasks`.
    max_frame_handling_tasks: usize,
    /// Legacy-namespace field for the frame sender; `None` when no key is configured.
    host_key: Option<OwnedValuePath>,
    /// Timestamp key taken from the global log schema.
    timestamp_key: Option<OwnedValuePath>,
    /// Source-type key taken from the global log schema.
    source_type_key: Option<OwnedValuePath>,
    /// Handle used to emit the bytes-received internal event for each frame.
    bytes_received: Registered<BytesReceived>,
    /// Forwarded to the DNS parser options.
    lowercase_hostnames: bool,
    /// Namespace used when inserting source and vector metadata.
    log_namespace: LogNamespace,
}
235
236impl CommonFrameHandler {
237 pub fn new(config: &DnstapConfig, log_namespace: LogNamespace) -> Self {
238 let source_type_key = log_schema().source_type_key();
239 let timestamp_key = log_schema().timestamp_key();
240
241 let host_key = config
242 .host_key
243 .clone()
244 .map_or(log_schema().host_key().cloned(), |k| k.path);
245
246 Self {
247 max_frame_length: config.max_frame_length,
248 content_type: "protobuf:dnstap.Dnstap".to_string(),
249 raw_data_only: config.raw_data_only.unwrap_or(false),
250 multithreaded: config.multithreaded.unwrap_or(false),
251 max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000),
252 host_key,
253 timestamp_key: timestamp_key.cloned(),
254 source_type_key: source_type_key.cloned(),
255 bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
256 lowercase_hostnames: config.lowercase_hostnames,
257 log_namespace,
258 }
259 }
260}
261
262impl FrameHandler for CommonFrameHandler {
263 fn content_type(&self) -> String {
264 self.content_type.clone()
265 }
266
267 fn max_frame_length(&self) -> usize {
268 self.max_frame_length
269 }
270
271 fn handle_event(
272 &self,
273 received_from: Option<vrl::prelude::Bytes>,
274 frame: vrl::prelude::Bytes,
275 ) -> Option<vector_lib::event::Event> {
276 self.bytes_received.emit(ByteSize(frame.len()));
277
278 let mut log_event = LogEvent::default();
279
280 if let Some(host) = received_from {
281 self.log_namespace.insert_source_metadata(
282 DnstapConfig::NAME,
283 &mut log_event,
284 self.host_key.as_ref().map(LegacyKey::Overwrite),
285 path!("host"),
286 host,
287 );
288 }
289
290 if self.raw_data_only {
291 log_event.insert(
292 (PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data),
293 BASE64_STANDARD.encode(&frame),
294 );
295 } else if let Err(err) = DnstapParser::parse(
296 &mut log_event,
297 frame,
298 DnsParserOptions {
299 lowercase_hostnames: self.lowercase_hostnames,
300 },
301 ) {
302 emit!(DnstapParseError {
303 error: format!("Dnstap protobuf decode error {err:?}.")
304 });
305 return None;
306 }
307
308 if self.log_namespace == LogNamespace::Vector {
309 self.log_namespace.insert_vector_metadata(
311 &mut log_event,
312 self.timestamp_key(),
313 path!("ingest_timestamp"),
314 chrono::Utc::now(),
315 );
316 }
317
318 self.log_namespace.insert_vector_metadata(
319 &mut log_event,
320 self.source_type_key(),
321 path!("source_type"),
322 DnstapConfig::NAME,
323 );
324
325 Some(Event::from(log_event))
326 }
327
328 fn multithreaded(&self) -> bool {
329 self.multithreaded
330 }
331
332 fn max_frame_handling_tasks(&self) -> usize {
333 self.max_frame_handling_tasks
334 }
335
336 fn host_key(&self) -> &Option<vrl::path::OwnedValuePath> {
337 &self.host_key
338 }
339
340 fn timestamp_key(&self) -> Option<&vrl::path::OwnedValuePath> {
341 self.timestamp_key.as_ref()
342 }
343
344 fn source_type_key(&self) -> Option<&vrl::path::OwnedValuePath> {
345 self.source_type_key.as_ref()
346 }
347}
348
#[cfg(test)]
mod tests {
    use vector_lib::event::{Event, LogEvent};

    use super::*;

    /// A representative parsed dnstap record must validate against the legacy-namespace schema
    /// produced by `DnstapEventSchema`.
    #[test]
    fn simple_matches_schema() {
        let record = r#"{"dataType":"Message",
                         "dataTypeId":1,
                         "messageType":"ClientQuery",
                         "messageTypeId":5,
                         "requestData":{
                           "fullRcode":0,
                           "header":{
                             "aa":false,
                             "ad":true,
                             "anCount":0,
                             "arCount":1,
                             "cd":false,
                             "id":38339,
                             "nsCount":0,
                             "opcode":0,
                             "qdCount":1,
                             "qr":0,
                             "ra":false,
                             "rcode":0,
                             "rd":true,
                             "tc":false},
                           "opt":{
                             "do":false,
                             "ednsVersion":0,
                             "extendedRcode":0,
                             "options":[{"optCode":10,
                                         "optName":"Cookie",
                                         "optValue":"5JiWq4VYa7U="}],
                             "udpPayloadSize":1232},
                           "question":[{"class":"IN","domainName":"whoami.example.org.","questionType":"A","questionTypeId":1}],
                           "rcodeName":"NoError",
                           "time":1667909880863224758,
                           "timePrecision":"ns"},
                         "serverId":"stephenwakely-Precision-5570",
                         "serverVersion":"CoreDNS-1.10.0",
                         "socketFamily":"INET",
                         "socketProtocol":"UDP",
                         "sourceAddress":"0.0.0.0",
                         "sourcePort":54782,
                         "source_type":"dnstap",
                         "time":1667909880863224758,
                         "timePrecision":"ns"
                         }"#;

        // Build the event from the JSON record and add the timestamp the schema expects.
        let parsed: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut log = LogEvent::from(vrl::value::Value::from(parsed));
        log.insert("timestamp", chrono::Utc::now());
        let event = Event::from(log);

        let base_schema = vector_lib::schema::Definition::empty_legacy_namespace()
            .with_standard_vector_source_metadata();

        DnstapEventSchema
            .schema_definition(base_schema)
            .assert_valid_for_event(&event)
    }
}
414
415#[cfg(all(test, feature = "dnstap-integration-tests"))]
416mod integration_tests {
417 #![allow(clippy::print_stdout)] use bollard::{
420 Docker,
421 exec::{CreateExecOptions, StartExecOptions},
422 };
423 use futures::StreamExt;
424 use serde_json::json;
425 use tokio::time;
426 use vector_lib::{event::Event, lookup::lookup_v2::OptionalValuePath};
427
428 use self::unix::UnixConfig;
429 use super::*;
430 use crate::{
431 SourceSender,
432 event::Value,
433 test_util::{
434 components::{SOURCE_TAGS, assert_source_compliance},
435 wait_for,
436 },
437 };
438
439 async fn test_dnstap(raw_data: bool, query_type: &'static str) {
440 assert_source_compliance(&SOURCE_TAGS, async {
441 let (sender, mut recv) = SourceSender::new_test();
442
443 tokio::spawn(async move {
444 let socket = get_socket(raw_data, query_type);
445
446 DnstapConfig {
447 mode: Mode::Unix(UnixConfig {
448 socket_path: socket,
449 socket_file_mode: Some(511),
450 socket_receive_buffer_size: Some(10485760),
451 socket_send_buffer_size: Some(10485760),
452 }),
453 max_frame_length: 102400,
454 host_key: Some(OptionalValuePath::from(owned_value_path!("key"))),
455 raw_data_only: Some(raw_data),
456 multithreaded: Some(false),
457 max_frame_handling_tasks: Some(100000),
458 lowercase_hostnames: false,
459 log_namespace: None,
460 }
461 .build(SourceContext::new_test(sender, None))
462 .await
463 .unwrap()
464 .await
465 .unwrap()
466 });
467
468 send_query(raw_data, query_type);
469
470 let event = time::timeout(time::Duration::from_secs(10), recv.next())
471 .await
472 .expect("fetch dnstap source event timeout")
473 .expect("failed to get dnstap source event from a stream");
474 let mut events = vec![event];
475 loop {
476 match time::timeout(time::Duration::from_secs(1), recv.next()).await {
477 Ok(Some(event)) => events.push(event),
478 Ok(None) => {
479 println!("None: No event");
480 break;
481 }
482 Err(e) => {
483 println!("Error: {e}");
484 break;
485 }
486 }
487 }
488
489 verify_events(raw_data, query_type, &events);
490 })
491 .await;
492 }
493
494 fn send_query(raw_data: bool, query_type: &'static str) {
495 tokio::spawn(async move {
496 let socket_path = get_socket(raw_data, query_type);
497 let (query_port, control_port) = get_bind_ports(raw_data, query_type);
498
499 wait_for(move || {
502 let path = socket_path.clone();
503 async move { path.exists() }
504 })
505 .await;
506
507 reload_bind_dnstap_socket(control_port).await;
509
510 match query_type {
511 "query" => {
512 nslookup(query_port).await;
513 }
514 "update" => {
515 nsupdate().await;
516 }
517 _ => (),
518 }
519 });
520 }
521
522 fn verify_events(raw_data: bool, query_event: &'static str, events: &[Event]) {
523 if raw_data {
524 assert_eq!(events.len(), 2);
525 assert!(
526 events.iter().all(|v| v.as_log().get("rawData").is_some()),
527 "No rawData field!"
528 );
529 } else if query_event == "query" {
530 assert_eq!(events.len(), 2);
531 assert!(
532 events
533 .iter()
534 .any(|v| v.as_log().get("messageType")
535 == Some(&Value::Bytes("ClientQuery".into()))),
536 "No ClientQuery event!"
537 );
538 assert!(
539 events.iter().any(|v| v.as_log().get("messageType")
540 == Some(&Value::Bytes("ClientResponse".into()))),
541 "No ClientResponse event!"
542 );
543 } else if query_event == "update" {
544 assert_eq!(events.len(), 4);
545 assert!(
546 events
547 .iter()
548 .any(|v| v.as_log().get("messageType")
549 == Some(&Value::Bytes("UpdateQuery".into()))),
550 "No UpdateQuery event!"
551 );
552 assert!(
553 events.iter().any(|v| v.as_log().get("messageType")
554 == Some(&Value::Bytes("UpdateResponse".into()))),
555 "No UpdateResponse event!"
556 );
557 assert!(
558 events
559 .iter()
560 .any(|v| v.as_log().get("messageType")
561 == Some(&Value::Bytes("AuthQuery".into()))),
562 "No UpdateQuery event!"
563 );
564 assert!(
565 events
566 .iter()
567 .any(|v| v.as_log().get("messageType")
568 == Some(&Value::Bytes("AuthResponse".into()))),
569 "No UpdateResponse event!"
570 );
571 }
572
573 for event in events {
574 let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
575 match query_event {
576 "query" => {
577 if json["messageType"] == json!("ClientQuery") {
578 assert_eq!(
579 json["requestData.question[0].domainName"],
580 json!("h1.example.com.")
581 );
582 assert_eq!(json["requestData.rcodeName"], json!("NoError"));
583 } else if json["messageType"] == json!("ClientResponse") {
584 assert_eq!(
585 json["responseData.answers[0].domainName"],
586 json!("h1.example.com.")
587 );
588 assert_eq!(json["responseData.answers[0].rData"], json!("10.0.0.11"));
589 assert_eq!(json["responseData.rcodeName"], json!("NoError"));
590 }
591 }
592 "update" => {
593 if json["messageType"] == json!("UpdateQuery") {
594 assert_eq!(
595 json["requestData.update[0].domainName"],
596 json!("dh1.example.com.")
597 );
598 assert_eq!(json["requestData.update[0].rData"], json!("10.0.0.21"));
599 assert_eq!(json["requestData.rcodeName"], json!("NoError"));
600 } else if json["messageType"] == json!("UpdateResponse") {
601 assert_eq!(json["responseData.rcodeName"], json!("NoError"));
602 }
603 }
604 _ => (),
605 }
606 }
607 }
608
609 fn get_container() -> String {
610 std::env::var("CONTAINER_NAME").unwrap_or_else(|_| "vector_dnstap".into())
611 }
612
613 fn get_socket(raw_data: bool, query_type: &'static str) -> PathBuf {
614 let socket_folder = std::env::var("BIND_SOCKET")
615 .map(PathBuf::from)
616 .expect("BIND socket directory must be specified via BIND_SOCKET");
617
618 match query_type {
619 "query" if raw_data => socket_folder.join("dnstap.sock1"),
620 "query" => socket_folder.join("dnstap.sock2"),
621 "update" => socket_folder.join("dnstap.sock3"),
622 _ => unreachable!("no other test variants should exist"),
623 }
624 }
625
626 fn get_bind_ports(raw_data: bool, query_type: &'static str) -> (&'static str, &'static str) {
627 match query_type {
629 "query" if raw_data => ("8001", "9001"),
630 "query" => ("8002", "9002"),
631 "update" => ("8003", "9003"),
632 _ => unreachable!("no other test variants should exist"),
633 }
634 }
635
636 async fn dnstap_exec(cmd: Vec<&str>) {
637 let docker = Docker::connect_with_defaults().expect("failed binding to docker socket");
638 let config = CreateExecOptions {
639 cmd: Some(cmd),
640 attach_stdout: Some(true),
641 attach_stderr: Some(true),
642 ..Default::default()
643 };
644 let result = docker
645 .create_exec(get_container().as_str(), config)
646 .await
647 .expect("failed to execute command");
648 docker
649 .start_exec(&result.id, None::<StartExecOptions>)
650 .await
651 .expect("failed to execute command");
652 }
653
654 async fn reload_bind_dnstap_socket(control_port: &str) {
655 dnstap_exec(vec![
656 "/usr/sbin/rndc",
657 "-p",
658 control_port,
659 "dnstap",
660 "-reopen",
661 ])
662 .await
663 }
664
665 async fn nslookup(port: &str) {
666 dnstap_exec(vec![
667 "nslookup",
668 "-type=A",
669 format!("-port={port}").as_str(),
670 "h1.example.com",
671 "localhost",
672 ])
673 .await
674 }
675
676 async fn nsupdate() {
677 dnstap_exec(vec!["nsupdate", "-v", "/bind3/etc/bind/nsupdate.txt"]).await
678 }
679
680 #[tokio::test]
681 async fn test_dnstap_raw_event() {
682 test_dnstap(true, "query").await;
683 }
684
685 #[tokio::test]
686 async fn test_dnstap_query_event() {
687 test_dnstap(false, "query").await;
688 }
689
690 #[tokio::test]
691 async fn test_dnstap_update_event() {
692 test_dnstap(false, "update").await;
693 }
694}