vector/sources/dnstap/mod.rs

1use std::path::PathBuf;
2
3use base64::prelude::{BASE64_STANDARD, Engine as _};
4use dnsmsg_parser::dns_message_parser::DnsParserOptions;
5use dnstap_parser::{
6    parser::DnstapParser,
7    schema::{DNSTAP_VALUE_PATHS, DnstapEventSchema},
8};
9use vector_lib::{
10    configurable::configurable_component,
11    event::{Event, LogEvent},
12    internal_event::{ByteSize, BytesReceived, InternalEventHandle, Protocol, Registered},
13    lookup::{owned_value_path, path},
14    tls::MaybeTlsSettings,
15};
16use vrl::{
17    path::{OwnedValuePath, PathPrefix},
18    value::{Kind, kind::Collection},
19};
20
21use super::util::framestream::{
22    FrameHandler, build_framestream_tcp_source, build_framestream_unix_source,
23};
24use crate::{
25    Result,
26    config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
27    internal_events::DnstapParseError,
28};
29
30pub mod tcp;
31#[cfg(unix)]
32pub mod unix;
33use vector_lib::{
34    config::{LegacyKey, LogNamespace},
35    lookup::lookup_v2::OptionalValuePath,
36};
37
/// Configuration for the `dnstap` source.
#[configurable_component(source("dnstap", "Collect DNS logs from a dnstap-compatible server."))]
#[derive(Clone, Debug)]
pub struct DnstapConfig {
    // Flattened: the selected listener's settings (including the `mode` tag declared on
    // `Mode`) appear directly in this source's configuration table.
    #[serde(flatten)]
    pub mode: Mode,

    // When omitted, serde fills this from `default_max_frame_length` (100 KiB).
    /// Maximum DNSTAP frame length that the source accepts.
    ///
    /// If any frame is longer than this, it is discarded.
    #[serde(default = "default_max_frame_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_frame_length: usize,

    // When unset, `CommonFrameHandler::new` falls back to `log_schema().host_key()`.
    /// Overrides the name of the log field used to add the source path to each event.
    ///
    /// The value is the socket path itself.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    pub host_key: Option<OptionalValuePath>,

    // Unset is treated as `false`.
    /// Whether or not to skip parsing or decoding of DNSTAP frames.
    ///
    /// If set to `true`, frames are not parsed or decoded. The raw frame data is set as a field on the event
    /// (called `rawData`) and encoded as a base64 string.
    pub raw_data_only: Option<bool>,

    // Unset is treated as `false` by `CommonFrameHandler::new`.
    /// Whether or not to concurrently process DNSTAP frames.
    pub multithreaded: Option<bool>,

    // Unset is treated as 1000 by `CommonFrameHandler::new`.
    /// Maximum number of frames that can be processed concurrently.
    pub max_frame_handling_tasks: Option<usize>,

    // Forwarded to the DNS parser as `DnsParserOptions::lowercase_hostnames`.
    /// Whether to downcase all DNSTAP hostnames received for consistency
    #[serde(default = "crate::serde::default_false")]
    pub lowercase_hostnames: bool,

    // `build` resolves the effective namespace with `cx.log_namespace(self.log_namespace)`.
    /// The namespace to use for logs. This overrides the global settings.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
82
/// Default for [`DnstapConfig::max_frame_length`]: 100 KiB, i.e. 102 400 bytes.
fn default_max_frame_length() -> usize {
    const KIB: usize = 1024;
    100 * KIB
}
86
/// Listening mode for the `dnstap` source.
// Internally tagged on `mode` with snake_case names, so variants are selected with
// `mode = "tcp"` or `mode = "unix"`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of dnstap socket to use."))]
#[allow(clippy::large_enum_variant)] // only used for configuration, so variant size disparity is irrelevant
pub enum Mode {
    /// Listen on TCP.
    Tcp(tcp::TcpConfig),

    // Only compiled on Unix targets; non-Unix builds offer TCP alone.
    /// Listen on a Unix domain socket
    #[cfg(unix)]
    Unix(unix::UnixConfig),
}
101
102impl DnstapConfig {
103    pub fn new(socket_path: PathBuf) -> Self {
104        Self {
105            mode: Mode::Unix(unix::UnixConfig::new(socket_path)),
106            ..Default::default()
107        }
108    }
109
110    fn log_namespace(&self) -> LogNamespace {
111        self.log_namespace.unwrap_or(false).into()
112    }
113
114    fn raw_data_only(&self) -> bool {
115        self.raw_data_only.unwrap_or(false)
116    }
117
118    pub fn schema_definition(&self, log_namespace: LogNamespace) -> vector_lib::schema::Definition {
119        let event_schema = DnstapEventSchema;
120
121        match self.log_namespace() {
122            LogNamespace::Legacy => {
123                let schema = vector_lib::schema::Definition::empty_legacy_namespace();
124
125                if self.raw_data_only()
126                    && let Some(message_key) = log_schema().message_key()
127                {
128                    return schema.with_event_field(message_key, Kind::bytes(), Some("message"));
129                }
130                event_schema.schema_definition(schema)
131            }
132            LogNamespace::Vector => {
133                let schema = vector_lib::schema::Definition::new_with_default_metadata(
134                    Kind::object(Collection::empty()),
135                    [log_namespace],
136                )
137                .with_standard_vector_source_metadata();
138
139                if self.raw_data_only() {
140                    schema.with_event_field(
141                        &owned_value_path!("message"),
142                        Kind::bytes(),
143                        Some("message"),
144                    )
145                } else {
146                    event_schema.schema_definition(schema)
147                }
148            }
149        }
150    }
151}
152
153impl Default for DnstapConfig {
154    fn default() -> Self {
155        Self {
156            #[cfg(unix)]
157            mode: Mode::Unix(unix::UnixConfig::default()),
158            #[cfg(not(unix))]
159            mode: Mode::Tcp(tcp::TcpConfig::from_address(std::net::SocketAddr::new(
160                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
161                9000,
162            ))),
163            max_frame_length: default_max_frame_length(),
164            host_key: None,
165            raw_data_only: None,
166            multithreaded: None,
167            max_frame_handling_tasks: None,
168            lowercase_hostnames: false,
169            log_namespace: None,
170        }
171    }
172}
173
// Presumably derives the source's generated/example configuration from
// `DnstapConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(DnstapConfig);
175
176#[async_trait::async_trait]
177#[typetag::serde(name = "dnstap")]
178impl SourceConfig for DnstapConfig {
179    async fn build(&self, cx: SourceContext) -> Result<super::Source> {
180        let log_namespace = cx.log_namespace(self.log_namespace);
181        let common_frame_handler = CommonFrameHandler::new(self, log_namespace);
182        match &self.mode {
183            Mode::Tcp(config) => {
184                let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
185
186                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
187                let frame_handler = tcp::DnstapFrameHandler::new(
188                    config.clone(),
189                    tls,
190                    common_frame_handler,
191                    log_namespace,
192                );
193
194                build_framestream_tcp_source(frame_handler, cx.shutdown, cx.out)
195            }
196            #[cfg(unix)]
197            Mode::Unix(config) => {
198                let frame_handler =
199                    unix::DnstapFrameHandler::new(config.clone(), common_frame_handler);
200                build_framestream_unix_source(frame_handler, cx.shutdown, cx.out)
201            }
202        }
203    }
204
205    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
206        let log_namespace = global_log_namespace.merge(Some(self.log_namespace()));
207        let schema_definition = self
208            .schema_definition(log_namespace)
209            .with_standard_vector_source_metadata();
210        vec![SourceOutput::new_maybe_logs(
211            DataType::Log,
212            schema_definition,
213        )]
214    }
215
216    fn can_acknowledge(&self) -> bool {
217        false
218    }
219}
220
/// Frame-decoding state shared by the TCP and Unix listeners.
///
/// `SourceConfig::build` constructs one of these and hands it to either
/// `tcp::DnstapFrameHandler` or `unix::DnstapFrameHandler`.
#[derive(Clone)]
struct CommonFrameHandler {
    /// Largest accepted frame, reported through `FrameHandler::max_frame_length`.
    max_frame_length: usize,
    /// Framestream content type; always `protobuf:dnstap.Dnstap`.
    content_type: String,
    /// When true, frames are base64-encoded into `rawData` instead of being parsed.
    raw_data_only: bool,
    /// Reported through `FrameHandler::multithreaded`.
    multithreaded: bool,
    /// Reported through `FrameHandler::max_frame_handling_tasks`.
    max_frame_handling_tasks: usize,
    /// Legacy key under which the frame's `received_from` value is stored.
    host_key: Option<OwnedValuePath>,
    /// Timestamp field taken from the global log schema.
    timestamp_key: Option<OwnedValuePath>,
    /// Source-type field taken from the global log schema.
    source_type_key: Option<OwnedValuePath>,
    /// Registered `BytesReceived` handle; emitted with each frame's byte length.
    bytes_received: Registered<BytesReceived>,
    /// Forwarded to the DNS parser as `DnsParserOptions::lowercase_hostnames`.
    lowercase_hostnames: bool,
    /// Namespace in which events are built.
    log_namespace: LogNamespace,
}
235
236impl CommonFrameHandler {
237    pub fn new(config: &DnstapConfig, log_namespace: LogNamespace) -> Self {
238        let source_type_key = log_schema().source_type_key();
239        let timestamp_key = log_schema().timestamp_key();
240
241        let host_key = config
242            .host_key
243            .clone()
244            .map_or(log_schema().host_key().cloned(), |k| k.path);
245
246        Self {
247            max_frame_length: config.max_frame_length,
248            content_type: "protobuf:dnstap.Dnstap".to_string(),
249            raw_data_only: config.raw_data_only.unwrap_or(false),
250            multithreaded: config.multithreaded.unwrap_or(false),
251            max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000),
252            host_key,
253            timestamp_key: timestamp_key.cloned(),
254            source_type_key: source_type_key.cloned(),
255            bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
256            lowercase_hostnames: config.lowercase_hostnames,
257            log_namespace,
258        }
259    }
260}
261
262impl FrameHandler for CommonFrameHandler {
263    fn content_type(&self) -> String {
264        self.content_type.clone()
265    }
266
267    fn max_frame_length(&self) -> usize {
268        self.max_frame_length
269    }
270
271    fn handle_event(
272        &self,
273        received_from: Option<vrl::prelude::Bytes>,
274        frame: vrl::prelude::Bytes,
275    ) -> Option<vector_lib::event::Event> {
276        self.bytes_received.emit(ByteSize(frame.len()));
277
278        let mut log_event = LogEvent::default();
279
280        if let Some(host) = received_from {
281            self.log_namespace.insert_source_metadata(
282                DnstapConfig::NAME,
283                &mut log_event,
284                self.host_key.as_ref().map(LegacyKey::Overwrite),
285                path!("host"),
286                host,
287            );
288        }
289
290        if self.raw_data_only {
291            log_event.insert(
292                (PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data),
293                BASE64_STANDARD.encode(&frame),
294            );
295        } else if let Err(err) = DnstapParser::parse(
296            &mut log_event,
297            frame,
298            DnsParserOptions {
299                lowercase_hostnames: self.lowercase_hostnames,
300            },
301        ) {
302            emit!(DnstapParseError {
303                error: format!("Dnstap protobuf decode error {err:?}.")
304            });
305            return None;
306        }
307
308        if self.log_namespace == LogNamespace::Vector {
309            // The timestamp is inserted by the parser which caters for the Legacy namespace.
310            self.log_namespace.insert_vector_metadata(
311                &mut log_event,
312                self.timestamp_key(),
313                path!("ingest_timestamp"),
314                chrono::Utc::now(),
315            );
316        }
317
318        self.log_namespace.insert_vector_metadata(
319            &mut log_event,
320            self.source_type_key(),
321            path!("source_type"),
322            DnstapConfig::NAME,
323        );
324
325        Some(Event::from(log_event))
326    }
327
328    fn multithreaded(&self) -> bool {
329        self.multithreaded
330    }
331
332    fn max_frame_handling_tasks(&self) -> usize {
333        self.max_frame_handling_tasks
334    }
335
336    fn host_key(&self) -> &Option<vrl::path::OwnedValuePath> {
337        &self.host_key
338    }
339
340    fn timestamp_key(&self) -> Option<&vrl::path::OwnedValuePath> {
341        self.timestamp_key.as_ref()
342    }
343
344    fn source_type_key(&self) -> Option<&vrl::path::OwnedValuePath> {
345        self.source_type_key.as_ref()
346    }
347}
348
#[cfg(test)]
mod tests {
    use vector_lib::event::{Event, LogEvent};

    use super::*;

    /// A decoded `ClientQuery` record (plus a timestamp) must satisfy the schema that
    /// `DnstapEventSchema` builds on top of the legacy namespace.
    #[test]
    fn simple_matches_schema() {
        let record = r#"{"dataType":"Message",
                         "dataTypeId":1,
                         "messageType":"ClientQuery",
                         "messageTypeId":5,
                         "requestData":{
                           "fullRcode":0,
                           "header":{
                             "aa":false,
                             "ad":true,
                             "anCount":0,
                             "arCount":1,
                             "cd":false,
                             "id":38339,
                             "nsCount":0,
                             "opcode":0,
                             "qdCount":1,
                             "qr":0,
                             "ra":false,
                             "rcode":0,
                             "rd":true,
                             "tc":false},
                           "opt":{
                             "do":false,
                             "ednsVersion":0,
                             "extendedRcode":0,
                             "options":[{"optCode":10,
                                         "optName":"Cookie",
                                         "optValue":"5JiWq4VYa7U="}],
                             "udpPayloadSize":1232},
                           "question":[{"class":"IN","domainName":"whoami.example.org.","questionType":"A","questionTypeId":1}],
                           "rcodeName":"NoError",
                           "time":1667909880863224758,
                           "timePrecision":"ns"},
                         "serverId":"stephenwakely-Precision-5570",
                         "serverVersion":"CoreDNS-1.10.0",
                         "socketFamily":"INET",
                         "socketProtocol":"UDP",
                         "sourceAddress":"0.0.0.0",
                         "sourcePort":54782,
                         "source_type":"dnstap",
                         "time":1667909880863224758,
                         "timePrecision":"ns"
                         }"#;

        let parsed: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut event = Event::from(LogEvent::from(vrl::value::Value::from(parsed)));
        event.as_mut_log().insert("timestamp", chrono::Utc::now());

        let legacy_base = vector_lib::schema::Definition::empty_legacy_namespace()
            .with_standard_vector_source_metadata();
        let expected = DnstapEventSchema.schema_definition(legacy_base);

        expected.assert_valid_for_event(&event)
    }
}
414
#[cfg(all(test, feature = "dnstap-integration-tests"))]
mod integration_tests {
    #![allow(clippy::print_stdout)] // tests

    use bollard::{
        Docker,
        exec::{CreateExecOptions, StartExecOptions},
    };
    use futures::StreamExt;
    use serde_json::json;
    use tokio::time;
    use vector_lib::{event::Event, lookup::lookup_v2::OptionalValuePath};

    use self::unix::UnixConfig;
    use super::*;
    use crate::{
        SourceSender,
        event::Value,
        test_util::{
            components::{SOURCE_TAGS, assert_source_compliance},
            wait_for,
        },
    };

    /// Runs the source against the BIND instance for `(raw_data, query_type)`, triggers a
    /// query, collects every event that arrives, and checks them with `verify_events`.
    async fn test_dnstap(raw_data: bool, query_type: &'static str) {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (sender, mut recv) = SourceSender::new_test();

            tokio::spawn(async move {
                let socket = get_socket(raw_data, query_type);

                DnstapConfig {
                    mode: Mode::Unix(UnixConfig {
                        socket_path: socket,
                        socket_file_mode: Some(511),
                        socket_receive_buffer_size: Some(10485760),
                        socket_send_buffer_size: Some(10485760),
                    }),
                    max_frame_length: 102400,
                    host_key: Some(OptionalValuePath::from(owned_value_path!("key"))),
                    raw_data_only: Some(raw_data),
                    multithreaded: Some(false),
                    max_frame_handling_tasks: Some(100000),
                    lowercase_hostnames: false,
                    log_namespace: None,
                }
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
            });

            send_query(raw_data, query_type);

            let event = time::timeout(time::Duration::from_secs(10), recv.next())
                .await
                .expect("fetch dnstap source event timeout")
                .expect("failed to get dnstap source event from a stream");
            let mut events = vec![event];
            // Drain whatever else arrives; a 1s gap (or closed stream) ends collection.
            loop {
                match time::timeout(time::Duration::from_secs(1), recv.next()).await {
                    Ok(Some(event)) => events.push(event),
                    Ok(None) => {
                        println!("None: No event");
                        break;
                    }
                    Err(e) => {
                        println!("Error: {e}");
                        break;
                    }
                }
            }

            verify_events(raw_data, query_type, &events);
        })
        .await;
    }

    /// Spawns a task that waits for the source socket, makes BIND reopen it, then issues
    /// the DNS query or update corresponding to `query_type`.
    fn send_query(raw_data: bool, query_type: &'static str) {
        tokio::spawn(async move {
            let socket_path = get_socket(raw_data, query_type);
            let (query_port, control_port) = get_bind_ports(raw_data, query_type);

            // Wait for the source to create its respective socket before telling BIND to reload, causing it to open
            // that new socket file.
            wait_for(move || {
                let path = socket_path.clone();
                async move { path.exists() }
            })
            .await;

            // Now instruct BIND to reopen its DNSTAP socket file and execute the given query.
            reload_bind_dnstap_socket(control_port).await;

            match query_type {
                "query" => {
                    nslookup(query_port).await;
                }
                "update" => {
                    nsupdate().await;
                }
                _ => (),
            }
        });
    }

    /// Asserts that at least one event carries `messageType == message_type`; the failure
    /// message names the missing type so it cannot drift from the check.
    fn assert_has_message_type(events: &[Event], message_type: &'static str) {
        assert!(
            events.iter().any(|v| v.as_log().get("messageType")
                == Some(&Value::Bytes(message_type.into()))),
            "No {message_type} event!"
        );
    }

    /// Checks the event count, the expected message types, and key decoded fields.
    fn verify_events(raw_data: bool, query_event: &'static str, events: &[Event]) {
        if raw_data {
            assert_eq!(events.len(), 2);
            assert!(
                events.iter().all(|v| v.as_log().get("rawData").is_some()),
                "No rawData field!"
            );
        } else if query_event == "query" {
            assert_eq!(events.len(), 2);
            assert_has_message_type(events, "ClientQuery");
            assert_has_message_type(events, "ClientResponse");
        } else if query_event == "update" {
            assert_eq!(events.len(), 4);
            for message_type in ["UpdateQuery", "UpdateResponse", "AuthQuery", "AuthResponse"] {
                assert_has_message_type(events, message_type);
            }
        }

        for event in events {
            let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
            match query_event {
                "query" => {
                    if json["messageType"] == json!("ClientQuery") {
                        assert_eq!(
                            json["requestData.question[0].domainName"],
                            json!("h1.example.com.")
                        );
                        assert_eq!(json["requestData.rcodeName"], json!("NoError"));
                    } else if json["messageType"] == json!("ClientResponse") {
                        assert_eq!(
                            json["responseData.answers[0].domainName"],
                            json!("h1.example.com.")
                        );
                        assert_eq!(json["responseData.answers[0].rData"], json!("10.0.0.11"));
                        assert_eq!(json["responseData.rcodeName"], json!("NoError"));
                    }
                }
                "update" => {
                    if json["messageType"] == json!("UpdateQuery") {
                        assert_eq!(
                            json["requestData.update[0].domainName"],
                            json!("dh1.example.com.")
                        );
                        assert_eq!(json["requestData.update[0].rData"], json!("10.0.0.21"));
                        assert_eq!(json["requestData.rcodeName"], json!("NoError"));
                    } else if json["messageType"] == json!("UpdateResponse") {
                        assert_eq!(json["responseData.rcodeName"], json!("NoError"));
                    }
                }
                _ => (),
            }
        }
    }

    /// Name of the BIND container, overridable via `CONTAINER_NAME`.
    fn get_container() -> String {
        std::env::var("CONTAINER_NAME").unwrap_or_else(|_| "vector_dnstap".into())
    }

    /// Socket path used by the BIND instance serving this test variant.
    fn get_socket(raw_data: bool, query_type: &'static str) -> PathBuf {
        let socket_folder = std::env::var("BIND_SOCKET")
            .map(PathBuf::from)
            .expect("BIND socket directory must be specified via BIND_SOCKET");

        match query_type {
            "query" if raw_data => socket_folder.join("dnstap.sock1"),
            "query" => socket_folder.join("dnstap.sock2"),
            "update" => socket_folder.join("dnstap.sock3"),
            _ => unreachable!("no other test variants should exist"),
        }
    }

    fn get_bind_ports(raw_data: bool, query_type: &'static str) -> (&'static str, &'static str) {
        // Returns the query port and control port, respectively, for the given BIND instance.
        match query_type {
            "query" if raw_data => ("8001", "9001"),
            "query" => ("8002", "9002"),
            "update" => ("8003", "9003"),
            _ => unreachable!("no other test variants should exist"),
        }
    }

    /// Runs `cmd` inside the BIND container via the Docker exec API.
    async fn dnstap_exec(cmd: Vec<&str>) {
        let docker = Docker::connect_with_defaults().expect("failed binding to docker socket");
        let config = CreateExecOptions {
            cmd: Some(cmd),
            attach_stdout: Some(true),
            attach_stderr: Some(true),
            ..Default::default()
        };
        let result = docker
            .create_exec(get_container().as_str(), config)
            .await
            .expect("failed to execute command");
        docker
            .start_exec(&result.id, None::<StartExecOptions>)
            .await
            .expect("failed to execute command");
    }

    async fn reload_bind_dnstap_socket(control_port: &str) {
        dnstap_exec(vec![
            "/usr/sbin/rndc",
            "-p",
            control_port,
            "dnstap",
            "-reopen",
        ])
        .await
    }

    async fn nslookup(port: &str) {
        dnstap_exec(vec![
            "nslookup",
            "-type=A",
            format!("-port={port}").as_str(),
            "h1.example.com",
            "localhost",
        ])
        .await
    }

    async fn nsupdate() {
        dnstap_exec(vec!["nsupdate", "-v", "/bind3/etc/bind/nsupdate.txt"]).await
    }

    #[tokio::test]
    async fn test_dnstap_raw_event() {
        test_dnstap(true, "query").await;
    }

    #[tokio::test]
    async fn test_dnstap_query_event() {
        test_dnstap(false, "query").await;
    }

    #[tokio::test]
    async fn test_dnstap_update_event() {
        test_dnstap(false, "update").await;
    }
}