vector/sources/dnstap/
mod.rs

1use std::path::PathBuf;
2
3use base64::prelude::{Engine as _, BASE64_STANDARD};
4use dnsmsg_parser::dns_message_parser::DnsParserOptions;
5use dnstap_parser::parser::DnstapParser;
6use dnstap_parser::schema::DnstapEventSchema;
7use vector_lib::event::{Event, LogEvent};
8use vector_lib::internal_event::{
9    ByteSize, BytesReceived, InternalEventHandle, Protocol, Registered,
10};
11use vector_lib::lookup::{owned_value_path, path};
12use vector_lib::{configurable::configurable_component, tls::MaybeTlsSettings};
13use vrl::path::{OwnedValuePath, PathPrefix};
14use vrl::value::{kind::Collection, Kind};
15
16use super::util::framestream::{
17    build_framestream_tcp_source, build_framestream_unix_source, FrameHandler,
18};
19use crate::internal_events::DnstapParseError;
20use crate::{
21    config::{log_schema, DataType, SourceConfig, SourceContext, SourceOutput},
22    Result,
23};
24use dnstap_parser::schema::DNSTAP_VALUE_PATHS;
25
26pub mod tcp;
27#[cfg(unix)]
28pub mod unix;
29use vector_lib::config::{LegacyKey, LogNamespace};
30use vector_lib::lookup::lookup_v2::OptionalValuePath;
31
/// Configuration for the `dnstap` source.
#[configurable_component(source("dnstap", "Collect DNS logs from a dnstap-compatible server."))]
#[derive(Clone, Debug)]
pub struct DnstapConfig {
    // Flattened: the `mode` tag and the options of the selected mode sit alongside
    // the other fields of this struct instead of under a nested key.
    #[serde(flatten)]
    pub mode: Mode,

    /// Maximum DNSTAP frame length that the source accepts.
    ///
    /// If any frame is longer than this, it is discarded.
    #[serde(default = "default_max_frame_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_frame_length: usize,

    /// Overrides the name of the log field used to add the source path to each event.
    ///
    /// The value is the socket path itself.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    pub host_key: Option<OptionalValuePath>,

    /// Whether or not to skip parsing or decoding of DNSTAP frames.
    ///
    /// If set to `true`, frames are not parsed or decoded. The raw frame data is set as a field on the event
    /// (called `rawData`) and encoded as a base64 string.
    // An unset value is resolved to `false` where the option is consumed.
    pub raw_data_only: Option<bool>,

    /// Whether or not to concurrently process DNSTAP frames.
    // An unset value is resolved to `false` by `CommonFrameHandler::new`.
    pub multithreaded: Option<bool>,

    /// Maximum number of frames that can be processed concurrently.
    // An unset value is resolved to `1000` by `CommonFrameHandler::new`.
    pub max_frame_handling_tasks: Option<usize>,

    /// Whether to downcase all DNSTAP hostnames received for consistency.
    #[serde(default = "crate::serde::default_false")]
    pub lowercase_hostnames: bool,

    /// The namespace to use for logs. This overrides the global settings.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
76
77fn default_max_frame_length() -> usize {
78    bytesize::kib(100u64) as usize
79}
80
/// Listening mode for the `dnstap` source.
// Serialized as an internally tagged enum: the `mode` key holds the snake_case
// variant name (`tcp` or `unix`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of dnstap socket to use."))]
#[allow(clippy::large_enum_variant)] // just used for configuration
pub enum Mode {
    /// Listen on TCP.
    Tcp(tcp::TcpConfig),

    /// Listen on a Unix domain socket.
    // Only compiled on Unix targets, like the `unix` module that provides its config.
    #[cfg(unix)]
    Unix(unix::UnixConfig),
}
95
96impl DnstapConfig {
97    pub fn new(socket_path: PathBuf) -> Self {
98        Self {
99            mode: Mode::Unix(unix::UnixConfig::new(socket_path)),
100            ..Default::default()
101        }
102    }
103
104    fn log_namespace(&self) -> LogNamespace {
105        self.log_namespace.unwrap_or(false).into()
106    }
107
108    fn raw_data_only(&self) -> bool {
109        self.raw_data_only.unwrap_or(false)
110    }
111
112    pub fn schema_definition(&self, log_namespace: LogNamespace) -> vector_lib::schema::Definition {
113        let event_schema = DnstapEventSchema;
114
115        match self.log_namespace() {
116            LogNamespace::Legacy => {
117                let schema = vector_lib::schema::Definition::empty_legacy_namespace();
118
119                if self.raw_data_only() {
120                    if let Some(message_key) = log_schema().message_key() {
121                        return schema.with_event_field(
122                            message_key,
123                            Kind::bytes(),
124                            Some("message"),
125                        );
126                    }
127                }
128                event_schema.schema_definition(schema)
129            }
130            LogNamespace::Vector => {
131                let schema = vector_lib::schema::Definition::new_with_default_metadata(
132                    Kind::object(Collection::empty()),
133                    [log_namespace],
134                )
135                .with_standard_vector_source_metadata();
136
137                if self.raw_data_only() {
138                    schema.with_event_field(
139                        &owned_value_path!("message"),
140                        Kind::bytes(),
141                        Some("message"),
142                    )
143                } else {
144                    event_schema.schema_definition(schema)
145                }
146            }
147        }
148    }
149}
150
151impl Default for DnstapConfig {
152    fn default() -> Self {
153        Self {
154            #[cfg(unix)]
155            mode: Mode::Unix(unix::UnixConfig::default()),
156            #[cfg(not(unix))]
157            mode: Mode::Tcp(tcp::TcpConfig::from_address(std::net::SocketAddr::new(
158                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
159                9000,
160            ))),
161            max_frame_length: default_max_frame_length(),
162            host_key: None,
163            raw_data_only: None,
164            multithreaded: None,
165            max_frame_handling_tasks: None,
166            lowercase_hostnames: false,
167            log_namespace: None,
168        }
169    }
170}
171
172impl_generate_config_from_default!(DnstapConfig);
173
174#[async_trait::async_trait]
175#[typetag::serde(name = "dnstap")]
176impl SourceConfig for DnstapConfig {
177    async fn build(&self, cx: SourceContext) -> Result<super::Source> {
178        let log_namespace = cx.log_namespace(self.log_namespace);
179        let common_frame_handler = CommonFrameHandler::new(self, log_namespace);
180        match &self.mode {
181            Mode::Tcp(config) => {
182                let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
183
184                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
185                let frame_handler = tcp::DnstapFrameHandler::new(
186                    config.clone(),
187                    tls,
188                    common_frame_handler,
189                    log_namespace,
190                );
191
192                build_framestream_tcp_source(frame_handler, cx.shutdown, cx.out)
193            }
194            #[cfg(unix)]
195            Mode::Unix(config) => {
196                let frame_handler =
197                    unix::DnstapFrameHandler::new(config.clone(), common_frame_handler);
198                build_framestream_unix_source(frame_handler, cx.shutdown, cx.out)
199            }
200        }
201    }
202
203    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
204        let log_namespace = global_log_namespace.merge(Some(self.log_namespace()));
205        let schema_definition = self
206            .schema_definition(log_namespace)
207            .with_standard_vector_source_metadata();
208        vec![SourceOutput::new_maybe_logs(
209            DataType::Log,
210            schema_definition,
211        )]
212    }
213
214    fn can_acknowledge(&self) -> bool {
215        false
216    }
217}
218
/// Frame-handling state built once from a `DnstapConfig` and the resolved log namespace.
#[derive(Clone)]
struct CommonFrameHandler {
    /// Maximum accepted frame length, copied from the configuration.
    max_frame_length: usize,
    /// Content type string reported through `FrameHandler::content_type`.
    content_type: String,
    /// When `true`, frames are stored base64-encoded instead of being parsed.
    raw_data_only: bool,
    /// Value reported through `FrameHandler::multithreaded`.
    multithreaded: bool,
    /// Value reported through `FrameHandler::max_frame_handling_tasks`.
    max_frame_handling_tasks: usize,
    /// Legacy-namespace key under which the `received_from` value is stored, if any.
    host_key: Option<OwnedValuePath>,
    /// Timestamp key taken from the global log schema, if any.
    timestamp_key: Option<OwnedValuePath>,
    /// Source type key taken from the global log schema, if any.
    source_type_key: Option<OwnedValuePath>,
    /// Handle used to emit a `BytesReceived` event for every handled frame.
    bytes_received: Registered<BytesReceived>,
    /// Forwarded to the DNS parser options when frames are parsed.
    lowercase_hostnames: bool,
    /// Log namespace used when inserting metadata into events.
    log_namespace: LogNamespace,
}
233
234impl CommonFrameHandler {
235    pub fn new(config: &DnstapConfig, log_namespace: LogNamespace) -> Self {
236        let source_type_key = log_schema().source_type_key();
237        let timestamp_key = log_schema().timestamp_key();
238
239        let host_key = config
240            .host_key
241            .clone()
242            .map_or(log_schema().host_key().cloned(), |k| k.path);
243
244        Self {
245            max_frame_length: config.max_frame_length,
246            content_type: "protobuf:dnstap.Dnstap".to_string(),
247            raw_data_only: config.raw_data_only.unwrap_or(false),
248            multithreaded: config.multithreaded.unwrap_or(false),
249            max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000),
250            host_key,
251            timestamp_key: timestamp_key.cloned(),
252            source_type_key: source_type_key.cloned(),
253            bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
254            lowercase_hostnames: config.lowercase_hostnames,
255            log_namespace,
256        }
257    }
258}
259
260impl FrameHandler for CommonFrameHandler {
261    fn content_type(&self) -> String {
262        self.content_type.clone()
263    }
264
265    fn max_frame_length(&self) -> usize {
266        self.max_frame_length
267    }
268
269    fn handle_event(
270        &self,
271        received_from: Option<vrl::prelude::Bytes>,
272        frame: vrl::prelude::Bytes,
273    ) -> Option<vector_lib::event::Event> {
274        self.bytes_received.emit(ByteSize(frame.len()));
275
276        let mut log_event = LogEvent::default();
277
278        if let Some(host) = received_from {
279            self.log_namespace.insert_source_metadata(
280                DnstapConfig::NAME,
281                &mut log_event,
282                self.host_key.as_ref().map(LegacyKey::Overwrite),
283                path!("host"),
284                host,
285            );
286        }
287
288        if self.raw_data_only {
289            log_event.insert(
290                (PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data),
291                BASE64_STANDARD.encode(&frame),
292            );
293        } else if let Err(err) = DnstapParser::parse(
294            &mut log_event,
295            frame,
296            DnsParserOptions {
297                lowercase_hostnames: self.lowercase_hostnames,
298            },
299        ) {
300            emit!(DnstapParseError {
301                error: format!("Dnstap protobuf decode error {err:?}.")
302            });
303            return None;
304        }
305
306        if self.log_namespace == LogNamespace::Vector {
307            // The timestamp is inserted by the parser which caters for the Legacy namespace.
308            self.log_namespace.insert_vector_metadata(
309                &mut log_event,
310                self.timestamp_key(),
311                path!("ingest_timestamp"),
312                chrono::Utc::now(),
313            );
314        }
315
316        self.log_namespace.insert_vector_metadata(
317            &mut log_event,
318            self.source_type_key(),
319            path!("source_type"),
320            DnstapConfig::NAME,
321        );
322
323        Some(Event::from(log_event))
324    }
325
326    fn multithreaded(&self) -> bool {
327        self.multithreaded
328    }
329
330    fn max_frame_handling_tasks(&self) -> usize {
331        self.max_frame_handling_tasks
332    }
333
334    fn host_key(&self) -> &Option<vrl::path::OwnedValuePath> {
335        &self.host_key
336    }
337
338    fn timestamp_key(&self) -> Option<&vrl::path::OwnedValuePath> {
339        self.timestamp_key.as_ref()
340    }
341
342    fn source_type_key(&self) -> Option<&vrl::path::OwnedValuePath> {
343        self.source_type_key.as_ref()
344    }
345}
346
#[cfg(test)]
mod tests {
    use vector_lib::event::{Event, LogEvent};

    use super::*;

    /// A representative parsed `ClientQuery` event must satisfy the legacy-namespace
    /// DNSTAP schema definition.
    #[test]
    fn simple_matches_schema() {
        let record = r#"{"dataType":"Message",
                         "dataTypeId":1,
                         "messageType":"ClientQuery",
                         "messageTypeId":5,
                         "requestData":{
                           "fullRcode":0,
                           "header":{
                             "aa":false,
                             "ad":true,
                             "anCount":0,
                             "arCount":1,
                             "cd":false,
                             "id":38339,
                             "nsCount":0,
                             "opcode":0,
                             "qdCount":1,
                             "qr":0,
                             "ra":false,
                             "rcode":0,
                             "rd":true,
                             "tc":false},
                           "opt":{
                             "do":false,
                             "ednsVersion":0,
                             "extendedRcode":0,
                             "options":[{"optCode":10,
                                         "optName":"Cookie",
                                         "optValue":"5JiWq4VYa7U="}],
                             "udpPayloadSize":1232},
                           "question":[{"class":"IN","domainName":"whoami.example.org.","questionType":"A","questionTypeId":1}],
                           "rcodeName":"NoError",
                           "time":1667909880863224758,
                           "timePrecision":"ns"},
                         "serverId":"stephenwakely-Precision-5570",
                         "serverVersion":"CoreDNS-1.10.0",
                         "socketFamily":"INET",
                         "socketProtocol":"UDP",
                         "sourceAddress":"0.0.0.0",
                         "sourcePort":54782,
                         "source_type":"dnstap",
                         "time":1667909880863224758,
                         "timePrecision":"ns"
                         }"#;

        // Turn the JSON fixture into a log event and stamp it with the current time.
        let parsed: serde_json::Value = serde_json::from_str(record).unwrap();
        let mut log = LogEvent::from(vrl::value::Value::from(parsed));
        log.insert("timestamp", chrono::Utc::now());
        let event = Event::from(log);

        let base_schema = vector_lib::schema::Definition::empty_legacy_namespace()
            .with_standard_vector_source_metadata();

        DnstapEventSchema
            .schema_definition(base_schema)
            .assert_valid_for_event(&event)
    }
}
412
413#[cfg(all(test, feature = "dnstap-integration-tests"))]
414mod integration_tests {
415    #![allow(clippy::print_stdout)] // tests
416
417    use bollard::exec::{CreateExecOptions, StartExecOptions};
418    use bollard::Docker;
419    use futures::StreamExt;
420    use serde_json::json;
421    use tokio::time;
422    use vector_lib::event::Event;
423    use vector_lib::lookup::lookup_v2::OptionalValuePath;
424
425    use self::unix::UnixConfig;
426
427    use super::*;
428    use crate::{
429        event::Value,
430        test_util::{
431            components::{assert_source_compliance, SOURCE_TAGS},
432            wait_for,
433        },
434        SourceSender,
435    };
436
437    async fn test_dnstap(raw_data: bool, query_type: &'static str) {
438        assert_source_compliance(&SOURCE_TAGS, async {
439            let (sender, mut recv) = SourceSender::new_test();
440
441            tokio::spawn(async move {
442                let socket = get_socket(raw_data, query_type);
443
444                DnstapConfig {
445                    mode: Mode::Unix(UnixConfig {
446                        socket_path: socket,
447                        socket_file_mode: Some(511),
448                        socket_receive_buffer_size: Some(10485760),
449                        socket_send_buffer_size: Some(10485760),
450                    }),
451                    max_frame_length: 102400,
452                    host_key: Some(OptionalValuePath::from(owned_value_path!("key"))),
453                    raw_data_only: Some(raw_data),
454                    multithreaded: Some(false),
455                    max_frame_handling_tasks: Some(100000),
456                    lowercase_hostnames: false,
457                    log_namespace: None,
458                }
459                .build(SourceContext::new_test(sender, None))
460                .await
461                .unwrap()
462                .await
463                .unwrap()
464            });
465
466            send_query(raw_data, query_type);
467
468            let event = time::timeout(time::Duration::from_secs(10), recv.next())
469                .await
470                .expect("fetch dnstap source event timeout")
471                .expect("failed to get dnstap source event from a stream");
472            let mut events = vec![event];
473            loop {
474                match time::timeout(time::Duration::from_secs(1), recv.next()).await {
475                    Ok(Some(event)) => events.push(event),
476                    Ok(None) => {
477                        println!("None: No event");
478                        break;
479                    }
480                    Err(e) => {
481                        println!("Error: {e}");
482                        break;
483                    }
484                }
485            }
486
487            verify_events(raw_data, query_type, &events);
488        })
489        .await;
490    }
491
492    fn send_query(raw_data: bool, query_type: &'static str) {
493        tokio::spawn(async move {
494            let socket_path = get_socket(raw_data, query_type);
495            let (query_port, control_port) = get_bind_ports(raw_data, query_type);
496
497            // Wait for the source to create its respective socket before telling BIND to reload, causing it to open
498            // that new socket file.
499            wait_for(move || {
500                let path = socket_path.clone();
501                async move { path.exists() }
502            })
503            .await;
504
505            // Now instruct BIND to reopen its DNSTAP socket file and execute the given query.
506            reload_bind_dnstap_socket(control_port).await;
507
508            match query_type {
509                "query" => {
510                    nslookup(query_port).await;
511                }
512                "update" => {
513                    nsupdate().await;
514                }
515                _ => (),
516            }
517        });
518    }
519
520    fn verify_events(raw_data: bool, query_event: &'static str, events: &[Event]) {
521        if raw_data {
522            assert_eq!(events.len(), 2);
523            assert!(
524                events.iter().all(|v| v.as_log().get("rawData").is_some()),
525                "No rawData field!"
526            );
527        } else if query_event == "query" {
528            assert_eq!(events.len(), 2);
529            assert!(
530                events
531                    .iter()
532                    .any(|v| v.as_log().get("messageType")
533                        == Some(&Value::Bytes("ClientQuery".into()))),
534                "No ClientQuery event!"
535            );
536            assert!(
537                events.iter().any(|v| v.as_log().get("messageType")
538                    == Some(&Value::Bytes("ClientResponse".into()))),
539                "No ClientResponse event!"
540            );
541        } else if query_event == "update" {
542            assert_eq!(events.len(), 4);
543            assert!(
544                events
545                    .iter()
546                    .any(|v| v.as_log().get("messageType")
547                        == Some(&Value::Bytes("UpdateQuery".into()))),
548                "No UpdateQuery event!"
549            );
550            assert!(
551                events.iter().any(|v| v.as_log().get("messageType")
552                    == Some(&Value::Bytes("UpdateResponse".into()))),
553                "No UpdateResponse event!"
554            );
555            assert!(
556                events
557                    .iter()
558                    .any(|v| v.as_log().get("messageType")
559                        == Some(&Value::Bytes("AuthQuery".into()))),
560                "No UpdateQuery event!"
561            );
562            assert!(
563                events
564                    .iter()
565                    .any(|v| v.as_log().get("messageType")
566                        == Some(&Value::Bytes("AuthResponse".into()))),
567                "No UpdateResponse event!"
568            );
569        }
570
571        for event in events {
572            let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
573            match query_event {
574                "query" => {
575                    if json["messageType"] == json!("ClientQuery") {
576                        assert_eq!(
577                            json["requestData.question[0].domainName"],
578                            json!("h1.example.com.")
579                        );
580                        assert_eq!(json["requestData.rcodeName"], json!("NoError"));
581                    } else if json["messageType"] == json!("ClientResponse") {
582                        assert_eq!(
583                            json["responseData.answers[0].domainName"],
584                            json!("h1.example.com.")
585                        );
586                        assert_eq!(json["responseData.answers[0].rData"], json!("10.0.0.11"));
587                        assert_eq!(json["responseData.rcodeName"], json!("NoError"));
588                    }
589                }
590                "update" => {
591                    if json["messageType"] == json!("UpdateQuery") {
592                        assert_eq!(
593                            json["requestData.update[0].domainName"],
594                            json!("dh1.example.com.")
595                        );
596                        assert_eq!(json["requestData.update[0].rData"], json!("10.0.0.21"));
597                        assert_eq!(json["requestData.rcodeName"], json!("NoError"));
598                    } else if json["messageType"] == json!("UpdateResponse") {
599                        assert_eq!(json["responseData.rcodeName"], json!("NoError"));
600                    }
601                }
602                _ => (),
603            }
604        }
605    }
606
607    fn get_container() -> String {
608        std::env::var("CONTAINER_NAME").unwrap_or_else(|_| "vector_dnstap".into())
609    }
610
611    fn get_socket(raw_data: bool, query_type: &'static str) -> PathBuf {
612        let socket_folder = std::env::var("BIND_SOCKET")
613            .map(PathBuf::from)
614            .expect("BIND socket directory must be specified via BIND_SOCKET");
615
616        match query_type {
617            "query" if raw_data => socket_folder.join("dnstap.sock1"),
618            "query" => socket_folder.join("dnstap.sock2"),
619            "update" => socket_folder.join("dnstap.sock3"),
620            _ => unreachable!("no other test variants should exist"),
621        }
622    }
623
624    fn get_bind_ports(raw_data: bool, query_type: &'static str) -> (&'static str, &'static str) {
625        // Returns the query port and control port, respectively, for the given BIND instance.
626        match query_type {
627            "query" if raw_data => ("8001", "9001"),
628            "query" => ("8002", "9002"),
629            "update" => ("8003", "9003"),
630            _ => unreachable!("no other test variants should exist"),
631        }
632    }
633
634    async fn dnstap_exec(cmd: Vec<&str>) {
635        let docker = Docker::connect_with_defaults().expect("failed binding to docker socket");
636        let config = CreateExecOptions {
637            cmd: Some(cmd),
638            attach_stdout: Some(true),
639            attach_stderr: Some(true),
640            ..Default::default()
641        };
642        let result = docker
643            .create_exec(get_container().as_str(), config)
644            .await
645            .expect("failed to execute command");
646        docker
647            .start_exec(&result.id, None::<StartExecOptions>)
648            .await
649            .expect("failed to execute command");
650    }
651
652    async fn reload_bind_dnstap_socket(control_port: &str) {
653        dnstap_exec(vec![
654            "/usr/sbin/rndc",
655            "-p",
656            control_port,
657            "dnstap",
658            "-reopen",
659        ])
660        .await
661    }
662
663    async fn nslookup(port: &str) {
664        dnstap_exec(vec![
665            "nslookup",
666            "-type=A",
667            format!("-port={port}").as_str(),
668            "h1.example.com",
669            "localhost",
670        ])
671        .await
672    }
673
674    async fn nsupdate() {
675        dnstap_exec(vec!["nsupdate", "-v", "/bind3/etc/bind/nsupdate.txt"]).await
676    }
677
678    #[tokio::test]
679    async fn test_dnstap_raw_event() {
680        test_dnstap(true, "query").await;
681    }
682
683    #[tokio::test]
684    async fn test_dnstap_query_event() {
685        test_dnstap(false, "query").await;
686    }
687
688    #[tokio::test]
689    async fn test_dnstap_update_event() {
690        test_dnstap(false, "update").await;
691    }
692}