1use std::path::PathBuf;
2
3use base64::prelude::{Engine as _, BASE64_STANDARD};
4use dnsmsg_parser::dns_message_parser::DnsParserOptions;
5use dnstap_parser::parser::DnstapParser;
6use dnstap_parser::schema::DnstapEventSchema;
7use vector_lib::event::{Event, LogEvent};
8use vector_lib::internal_event::{
9 ByteSize, BytesReceived, InternalEventHandle, Protocol, Registered,
10};
11use vector_lib::lookup::{owned_value_path, path};
12use vector_lib::{configurable::configurable_component, tls::MaybeTlsSettings};
13use vrl::path::{OwnedValuePath, PathPrefix};
14use vrl::value::{kind::Collection, Kind};
15
16use super::util::framestream::{
17 build_framestream_tcp_source, build_framestream_unix_source, FrameHandler,
18};
19use crate::internal_events::DnstapParseError;
20use crate::{
21 config::{log_schema, DataType, SourceConfig, SourceContext, SourceOutput},
22 Result,
23};
24use dnstap_parser::schema::DNSTAP_VALUE_PATHS;
25
26pub mod tcp;
27#[cfg(unix)]
28pub mod unix;
29use vector_lib::config::{LegacyKey, LogNamespace};
30use vector_lib::lookup::lookup_v2::OptionalValuePath;
31
32#[configurable_component(source("dnstap", "Collect DNS logs from a dnstap-compatible server."))]
34#[derive(Clone, Debug)]
35pub struct DnstapConfig {
36 #[serde(flatten)]
37 pub mode: Mode,
38
39 #[serde(default = "default_max_frame_length")]
43 #[configurable(metadata(docs::type_unit = "bytes"))]
44 pub max_frame_length: usize,
45
46 pub host_key: Option<OptionalValuePath>,
54
55 pub raw_data_only: Option<bool>,
60
61 pub multithreaded: Option<bool>,
63
64 pub max_frame_handling_tasks: Option<usize>,
66
67 #[serde(default = "crate::serde::default_false")]
69 pub lowercase_hostnames: bool,
70
71 #[configurable(metadata(docs::hidden))]
73 #[serde(default)]
74 pub log_namespace: Option<bool>,
75}
76
/// Default for `max_frame_length`: 100 KiB (102 400 bytes).
fn default_max_frame_length() -> usize {
    const KIB: usize = 1024;
    100 * KIB
}
80
81#[configurable_component]
83#[derive(Clone, Debug)]
84#[serde(tag = "mode", rename_all = "snake_case")]
85#[configurable(metadata(docs::enum_tag_description = "The type of dnstap socket to use."))]
86#[allow(clippy::large_enum_variant)] pub enum Mode {
88 Tcp(tcp::TcpConfig),
90
91 #[cfg(unix)]
93 Unix(unix::UnixConfig),
94}
95
96impl DnstapConfig {
97 pub fn new(socket_path: PathBuf) -> Self {
98 Self {
99 mode: Mode::Unix(unix::UnixConfig::new(socket_path)),
100 ..Default::default()
101 }
102 }
103
104 fn log_namespace(&self) -> LogNamespace {
105 self.log_namespace.unwrap_or(false).into()
106 }
107
108 fn raw_data_only(&self) -> bool {
109 self.raw_data_only.unwrap_or(false)
110 }
111
112 pub fn schema_definition(&self, log_namespace: LogNamespace) -> vector_lib::schema::Definition {
113 let event_schema = DnstapEventSchema;
114
115 match self.log_namespace() {
116 LogNamespace::Legacy => {
117 let schema = vector_lib::schema::Definition::empty_legacy_namespace();
118
119 if self.raw_data_only() {
120 if let Some(message_key) = log_schema().message_key() {
121 return schema.with_event_field(
122 message_key,
123 Kind::bytes(),
124 Some("message"),
125 );
126 }
127 }
128 event_schema.schema_definition(schema)
129 }
130 LogNamespace::Vector => {
131 let schema = vector_lib::schema::Definition::new_with_default_metadata(
132 Kind::object(Collection::empty()),
133 [log_namespace],
134 )
135 .with_standard_vector_source_metadata();
136
137 if self.raw_data_only() {
138 schema.with_event_field(
139 &owned_value_path!("message"),
140 Kind::bytes(),
141 Some("message"),
142 )
143 } else {
144 event_schema.schema_definition(schema)
145 }
146 }
147 }
148 }
149}
150
151impl Default for DnstapConfig {
152 fn default() -> Self {
153 Self {
154 #[cfg(unix)]
155 mode: Mode::Unix(unix::UnixConfig::default()),
156 #[cfg(not(unix))]
157 mode: Mode::Tcp(tcp::TcpConfig::from_address(std::net::SocketAddr::new(
158 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
159 9000,
160 ))),
161 max_frame_length: default_max_frame_length(),
162 host_key: None,
163 raw_data_only: None,
164 multithreaded: None,
165 max_frame_handling_tasks: None,
166 lowercase_hostnames: false,
167 log_namespace: None,
168 }
169 }
170}
171
172impl_generate_config_from_default!(DnstapConfig);
173
174#[async_trait::async_trait]
175#[typetag::serde(name = "dnstap")]
176impl SourceConfig for DnstapConfig {
177 async fn build(&self, cx: SourceContext) -> Result<super::Source> {
178 let log_namespace = cx.log_namespace(self.log_namespace);
179 let common_frame_handler = CommonFrameHandler::new(self, log_namespace);
180 match &self.mode {
181 Mode::Tcp(config) => {
182 let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
183
184 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
185 let frame_handler = tcp::DnstapFrameHandler::new(
186 config.clone(),
187 tls,
188 common_frame_handler,
189 log_namespace,
190 );
191
192 build_framestream_tcp_source(frame_handler, cx.shutdown, cx.out)
193 }
194 #[cfg(unix)]
195 Mode::Unix(config) => {
196 let frame_handler =
197 unix::DnstapFrameHandler::new(config.clone(), common_frame_handler);
198 build_framestream_unix_source(frame_handler, cx.shutdown, cx.out)
199 }
200 }
201 }
202
203 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
204 let log_namespace = global_log_namespace.merge(Some(self.log_namespace()));
205 let schema_definition = self
206 .schema_definition(log_namespace)
207 .with_standard_vector_source_metadata();
208 vec![SourceOutput::new_maybe_logs(
209 DataType::Log,
210 schema_definition,
211 )]
212 }
213
214 fn can_acknowledge(&self) -> bool {
215 false
216 }
217}
218
219#[derive(Clone)]
220struct CommonFrameHandler {
221 max_frame_length: usize,
222 content_type: String,
223 raw_data_only: bool,
224 multithreaded: bool,
225 max_frame_handling_tasks: usize,
226 host_key: Option<OwnedValuePath>,
227 timestamp_key: Option<OwnedValuePath>,
228 source_type_key: Option<OwnedValuePath>,
229 bytes_received: Registered<BytesReceived>,
230 lowercase_hostnames: bool,
231 log_namespace: LogNamespace,
232}
233
234impl CommonFrameHandler {
235 pub fn new(config: &DnstapConfig, log_namespace: LogNamespace) -> Self {
236 let source_type_key = log_schema().source_type_key();
237 let timestamp_key = log_schema().timestamp_key();
238
239 let host_key = config
240 .host_key
241 .clone()
242 .map_or(log_schema().host_key().cloned(), |k| k.path);
243
244 Self {
245 max_frame_length: config.max_frame_length,
246 content_type: "protobuf:dnstap.Dnstap".to_string(),
247 raw_data_only: config.raw_data_only.unwrap_or(false),
248 multithreaded: config.multithreaded.unwrap_or(false),
249 max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000),
250 host_key,
251 timestamp_key: timestamp_key.cloned(),
252 source_type_key: source_type_key.cloned(),
253 bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
254 lowercase_hostnames: config.lowercase_hostnames,
255 log_namespace,
256 }
257 }
258}
259
260impl FrameHandler for CommonFrameHandler {
261 fn content_type(&self) -> String {
262 self.content_type.clone()
263 }
264
265 fn max_frame_length(&self) -> usize {
266 self.max_frame_length
267 }
268
269 fn handle_event(
270 &self,
271 received_from: Option<vrl::prelude::Bytes>,
272 frame: vrl::prelude::Bytes,
273 ) -> Option<vector_lib::event::Event> {
274 self.bytes_received.emit(ByteSize(frame.len()));
275
276 let mut log_event = LogEvent::default();
277
278 if let Some(host) = received_from {
279 self.log_namespace.insert_source_metadata(
280 DnstapConfig::NAME,
281 &mut log_event,
282 self.host_key.as_ref().map(LegacyKey::Overwrite),
283 path!("host"),
284 host,
285 );
286 }
287
288 if self.raw_data_only {
289 log_event.insert(
290 (PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data),
291 BASE64_STANDARD.encode(&frame),
292 );
293 } else if let Err(err) = DnstapParser::parse(
294 &mut log_event,
295 frame,
296 DnsParserOptions {
297 lowercase_hostnames: self.lowercase_hostnames,
298 },
299 ) {
300 emit!(DnstapParseError {
301 error: format!("Dnstap protobuf decode error {err:?}.")
302 });
303 return None;
304 }
305
306 if self.log_namespace == LogNamespace::Vector {
307 self.log_namespace.insert_vector_metadata(
309 &mut log_event,
310 self.timestamp_key(),
311 path!("ingest_timestamp"),
312 chrono::Utc::now(),
313 );
314 }
315
316 self.log_namespace.insert_vector_metadata(
317 &mut log_event,
318 self.source_type_key(),
319 path!("source_type"),
320 DnstapConfig::NAME,
321 );
322
323 Some(Event::from(log_event))
324 }
325
326 fn multithreaded(&self) -> bool {
327 self.multithreaded
328 }
329
330 fn max_frame_handling_tasks(&self) -> usize {
331 self.max_frame_handling_tasks
332 }
333
334 fn host_key(&self) -> &Option<vrl::path::OwnedValuePath> {
335 &self.host_key
336 }
337
338 fn timestamp_key(&self) -> Option<&vrl::path::OwnedValuePath> {
339 self.timestamp_key.as_ref()
340 }
341
342 fn source_type_key(&self) -> Option<&vrl::path::OwnedValuePath> {
343 self.source_type_key.as_ref()
344 }
345}
346
#[cfg(test)]
mod tests {
    use vector_lib::event::{Event, LogEvent};

    use super::*;

    /// A representative parsed `ClientQuery` event, plus a `timestamp`, must satisfy the legacy
    /// dnstap event schema extended with the standard Vector source metadata.
    #[test]
    fn simple_matches_schema() {
        const CLIENT_QUERY_JSON: &str = r#"{"dataType":"Message",
            "dataTypeId":1,
            "messageType":"ClientQuery",
            "messageTypeId":5,
            "requestData":{
                "fullRcode":0,
                "header":{
                    "aa":false,
                    "ad":true,
                    "anCount":0,
                    "arCount":1,
                    "cd":false,
                    "id":38339,
                    "nsCount":0,
                    "opcode":0,
                    "qdCount":1,
                    "qr":0,
                    "ra":false,
                    "rcode":0,
                    "rd":true,
                    "tc":false},
                "opt":{
                    "do":false,
                    "ednsVersion":0,
                    "extendedRcode":0,
                    "options":[{"optCode":10,
                        "optName":"Cookie",
                        "optValue":"5JiWq4VYa7U="}],
                    "udpPayloadSize":1232},
                "question":[{"class":"IN","domainName":"whoami.example.org.","questionType":"A","questionTypeId":1}],
                "rcodeName":"NoError",
                "time":1667909880863224758,
                "timePrecision":"ns"},
            "serverId":"stephenwakely-Precision-5570",
            "serverVersion":"CoreDNS-1.10.0",
            "socketFamily":"INET",
            "socketProtocol":"UDP",
            "sourceAddress":"0.0.0.0",
            "sourcePort":54782,
            "source_type":"dnstap",
            "time":1667909880863224758,
            "timePrecision":"ns"
            }"#;

        let parsed: serde_json::Value = serde_json::from_str(CLIENT_QUERY_JSON).unwrap();
        let mut log = LogEvent::from(vrl::value::Value::from(parsed));
        log.insert("timestamp", chrono::Utc::now());
        let event = Event::from(log);

        let base_schema = vector_lib::schema::Definition::empty_legacy_namespace()
            .with_standard_vector_source_metadata();

        DnstapEventSchema
            .schema_definition(base_schema)
            .assert_valid_for_event(&event);
    }
}
412
#[cfg(all(test, feature = "dnstap-integration-tests"))]
mod integration_tests {
    // The tests print how the event stream ended while draining it.
    #![allow(clippy::print_stdout)]

    use bollard::exec::{CreateExecOptions, StartExecOptions};
    use bollard::Docker;
    use futures::StreamExt;
    use serde_json::json;
    use tokio::time;
    use vector_lib::event::Event;
    use vector_lib::lookup::lookup_v2::OptionalValuePath;

    use self::unix::UnixConfig;

    use super::*;
    use crate::{
        event::Value,
        test_util::{
            components::{assert_source_compliance, SOURCE_TAGS},
            wait_for,
        },
        SourceSender,
    };

    /// Runs the source against the BIND container for one scenario and checks the events it
    /// produces.
    async fn test_dnstap(raw_data: bool, query_type: &'static str) {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (sender, mut recv) = SourceSender::new_test();

            tokio::spawn(async move {
                let socket = get_socket(raw_data, query_type);

                DnstapConfig {
                    mode: Mode::Unix(UnixConfig {
                        socket_path: socket,
                        socket_file_mode: Some(511),
                        socket_receive_buffer_size: Some(10485760),
                        socket_send_buffer_size: Some(10485760),
                    }),
                    max_frame_length: 102400,
                    host_key: Some(OptionalValuePath::from(owned_value_path!("key"))),
                    raw_data_only: Some(raw_data),
                    multithreaded: Some(false),
                    max_frame_handling_tasks: Some(100000),
                    lowercase_hostnames: false,
                    log_namespace: None,
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });

            send_query(raw_data, query_type);

            let event = time::timeout(time::Duration::from_secs(10), recv.next())
                .await
                .expect("fetch dnstap source event timeout")
                .expect("failed to get dnstap source event from a stream");
            let mut events = vec![event];
            loop {
                match time::timeout(time::Duration::from_secs(1), recv.next()).await {
                    Ok(Some(event)) => events.push(event),
                    Ok(None) => {
                        println!("None: No event");
                        break;
                    }
                    Err(e) => {
                        println!("Error: {e}");
                        break;
                    }
                }
            }

            verify_events(raw_data, query_type, &events);
        })
        .await;
    }

    /// Once the source's socket exists, makes BIND reopen its dnstap socket and issues the
    /// scenario's DNS traffic.
    fn send_query(raw_data: bool, query_type: &'static str) {
        tokio::spawn(async move {
            let socket_path = get_socket(raw_data, query_type);
            let (query_port, control_port) = get_bind_ports(raw_data, query_type);

            wait_for(move || {
                let path = socket_path.clone();
                async move { path.exists() }
            })
            .await;

            reload_bind_dnstap_socket(control_port).await;

            match query_type {
                "query" => {
                    nslookup(query_port).await;
                }
                "update" => {
                    nsupdate().await;
                }
                _ => (),
            }
        });
    }

    /// Asserts that at least one event in `events` has the given `messageType`, naming the
    /// missing type in the failure message.
    fn assert_has_message_type(events: &[Event], message_type: &str) {
        let expected = Value::from(message_type);
        assert!(
            events
                .iter()
                .any(|event| event.as_log().get("messageType") == Some(&expected)),
            "No {message_type} event!"
        );
    }

    fn verify_events(raw_data: bool, query_event: &'static str, events: &[Event]) {
        if raw_data {
            assert_eq!(events.len(), 2);
            assert!(
                events.iter().all(|v| v.as_log().get("rawData").is_some()),
                "No rawData field!"
            );
        } else if query_event == "query" {
            assert_eq!(events.len(), 2);
            assert_has_message_type(events, "ClientQuery");
            assert_has_message_type(events, "ClientResponse");
        } else if query_event == "update" {
            assert_eq!(events.len(), 4);
            assert_has_message_type(events, "UpdateQuery");
            assert_has_message_type(events, "UpdateResponse");
            assert_has_message_type(events, "AuthQuery");
            assert_has_message_type(events, "AuthResponse");
        }

        for event in events {
            let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
            match query_event {
                "query" => {
                    if json["messageType"] == json!("ClientQuery") {
                        assert_eq!(
                            json["requestData.question[0].domainName"],
                            json!("h1.example.com.")
                        );
                        assert_eq!(json["requestData.rcodeName"], json!("NoError"));
                    } else if json["messageType"] == json!("ClientResponse") {
                        assert_eq!(
                            json["responseData.answers[0].domainName"],
                            json!("h1.example.com.")
                        );
                        assert_eq!(json["responseData.answers[0].rData"], json!("10.0.0.11"));
                        assert_eq!(json["responseData.rcodeName"], json!("NoError"));
                    }
                }
                "update" => {
                    if json["messageType"] == json!("UpdateQuery") {
                        assert_eq!(
                            json["requestData.update[0].domainName"],
                            json!("dh1.example.com.")
                        );
                        assert_eq!(json["requestData.update[0].rData"], json!("10.0.0.21"));
                        assert_eq!(json["requestData.rcodeName"], json!("NoError"));
                    } else if json["messageType"] == json!("UpdateResponse") {
                        assert_eq!(json["responseData.rcodeName"], json!("NoError"));
                    }
                }
                _ => (),
            }
        }
    }

    fn get_container() -> String {
        std::env::var("CONTAINER_NAME").unwrap_or_else(|_| "vector_dnstap".into())
    }

    fn get_socket(raw_data: bool, query_type: &'static str) -> PathBuf {
        let socket_folder = std::env::var("BIND_SOCKET")
            .map(PathBuf::from)
            .expect("BIND socket directory must be specified via BIND_SOCKET");

        match query_type {
            "query" if raw_data => socket_folder.join("dnstap.sock1"),
            "query" => socket_folder.join("dnstap.sock2"),
            "update" => socket_folder.join("dnstap.sock3"),
            _ => unreachable!("no other test variants should exist"),
        }
    }

    /// Returns `(query_port, control_port)` of the BIND instance used by each scenario.
    fn get_bind_ports(raw_data: bool, query_type: &'static str) -> (&'static str, &'static str) {
        match query_type {
            "query" if raw_data => ("8001", "9001"),
            "query" => ("8002", "9002"),
            "update" => ("8003", "9003"),
            _ => unreachable!("no other test variants should exist"),
        }
    }

    /// Runs `cmd` inside the BIND test container.
    async fn dnstap_exec(cmd: Vec<&str>) {
        let docker = Docker::connect_with_defaults().expect("failed binding to docker socket");
        let config = CreateExecOptions {
            cmd: Some(cmd),
            attach_stdout: Some(true),
            attach_stderr: Some(true),
            ..Default::default()
        };
        let result = docker
            .create_exec(get_container().as_str(), config)
            .await
            .expect("failed to execute command");
        docker
            .start_exec(&result.id, None::<StartExecOptions>)
            .await
            .expect("failed to execute command");
    }

    async fn reload_bind_dnstap_socket(control_port: &str) {
        dnstap_exec(vec![
            "/usr/sbin/rndc",
            "-p",
            control_port,
            "dnstap",
            "-reopen",
        ])
        .await
    }

    async fn nslookup(port: &str) {
        dnstap_exec(vec![
            "nslookup",
            "-type=A",
            format!("-port={port}").as_str(),
            "h1.example.com",
            "localhost",
        ])
        .await
    }

    async fn nsupdate() {
        dnstap_exec(vec!["nsupdate", "-v", "/bind3/etc/bind/nsupdate.txt"]).await
    }

    #[tokio::test]
    async fn test_dnstap_raw_event() {
        test_dnstap(true, "query").await;
    }

    #[tokio::test]
    async fn test_dnstap_query_event() {
        test_dnstap(false, "query").await;
    }

    #[tokio::test]
    async fn test_dnstap_update_event() {
        test_dnstap(false, "update").await;
    }
}
692}