vector/sinks/mezmo.rs

1use std::time::SystemTime;
2
3use bytes::Bytes;
4use futures::{FutureExt, SinkExt};
5use http::{Request, StatusCode, Uri};
6use serde_json::json;
7use vector_lib::configurable::configurable_component;
8use vector_lib::sensitive_string::SensitiveString;
9use vrl::event_path;
10use vrl::value::{Kind, Value};
11
12use crate::{
13    codecs::Transformer,
14    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15    event::Event,
16    http::{Auth, HttpClient},
17    schema,
18    sinks::util::{
19        http::{HttpEventEncoder, HttpSink, PartitionHttpSink},
20        BatchConfig, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer,
21        RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde,
22    },
23    template::{Template, TemplateRenderingError},
24};
25
/// Request path of the Mezmo ingestion API, appended to the configured `endpoint`.
const PATH: &str = "/logs/ingest";
27
28/// Configuration for the `logdna` sink.
29#[configurable_component(sink("logdna", "Deliver log event data to LogDNA."))]
30#[configurable(metadata(
31    deprecated = "The `logdna` sink has been renamed. Please use `mezmo` instead."
32))]
33#[derive(Clone, Debug)]
34pub struct LogdnaConfig(MezmoConfig);
35
36impl GenerateConfig for LogdnaConfig {
37    fn generate_config() -> toml::Value {
38        <MezmoConfig as GenerateConfig>::generate_config()
39    }
40}
41
42#[async_trait::async_trait]
43#[typetag::serde(name = "logdna")]
44impl SinkConfig for LogdnaConfig {
45    async fn build(
46        &self,
47        cx: SinkContext,
48    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
49        warn!("DEPRECATED: The `logdna` sink has been renamed. Please use `mezmo` instead.");
50        self.0.build(cx).await
51    }
52
53    fn input(&self) -> Input {
54        self.0.input()
55    }
56
57    fn acknowledgements(&self) -> &AcknowledgementsConfig {
58        self.0.acknowledgements()
59    }
60}
61
62/// Configuration for the `mezmo` (formerly `logdna`) sink.
63#[configurable_component(sink("mezmo", "Deliver log event data to Mezmo."))]
64#[derive(Clone, Debug)]
65pub struct MezmoConfig {
66    /// The Ingestion API key.
67    #[configurable(metadata(docs::examples = "${LOGDNA_API_KEY}"))]
68    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
69    api_key: SensitiveString,
70
71    /// The HTTP endpoint to send logs to.
72    ///
73    /// Both IP address and hostname are accepted formats.
74    #[serde(alias = "host")]
75    #[serde(default = "default_endpoint")]
76    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
77    #[configurable(metadata(docs::examples = "http://example.com"))]
78    endpoint: UriSerde,
79
80    /// The hostname that is attached to each batch of events.
81    #[configurable(metadata(docs::examples = "${HOSTNAME}"))]
82    #[configurable(metadata(docs::examples = "my-local-machine"))]
83    hostname: Template,
84
85    /// The MAC address that is attached to each batch of events.
86    #[configurable(metadata(docs::examples = "my-mac-address"))]
87    #[configurable(metadata(docs::human_name = "MAC Address"))]
88    mac: Option<String>,
89
90    /// The IP address that is attached to each batch of events.
91    #[configurable(metadata(docs::examples = "0.0.0.0"))]
92    #[configurable(metadata(docs::human_name = "IP Address"))]
93    ip: Option<String>,
94
95    /// The tags that are attached to each batch of events.
96    #[configurable(metadata(docs::examples = "tag1"))]
97    #[configurable(metadata(docs::examples = "tag2"))]
98    tags: Option<Vec<Template>>,
99
100    #[configurable(derived)]
101    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
102    pub encoding: Transformer,
103
104    /// The default app that is set for events that do not contain a `file` or `app` field.
105    #[serde(default = "default_app")]
106    #[configurable(metadata(docs::examples = "my-app"))]
107    default_app: String,
108
109    /// The default environment that is set for events that do not contain an `env` field.
110    #[serde(default = "default_env")]
111    #[configurable(metadata(docs::examples = "staging"))]
112    default_env: String,
113
114    #[configurable(derived)]
115    #[serde(default)]
116    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
117
118    #[configurable(derived)]
119    #[serde(default)]
120    request: TowerRequestConfig,
121
122    #[configurable(derived)]
123    #[serde(
124        default,
125        deserialize_with = "crate::serde::bool_or_struct",
126        skip_serializing_if = "crate::serde::is_default"
127    )]
128    acknowledgements: AcknowledgementsConfig,
129}
130
131fn default_endpoint() -> UriSerde {
132    UriSerde {
133        uri: Uri::from_static("https://logs.mezmo.com"),
134        auth: None,
135    }
136}
137
/// Fallback `app` value for events carrying neither an `app` nor a `file` field.
fn default_app() -> String {
    String::from("vector")
}
141
/// Fallback `env` value for events without an `env` field.
fn default_env() -> String {
    String::from("production")
}
145
146impl GenerateConfig for MezmoConfig {
147    fn generate_config() -> toml::Value {
148        toml::from_str(
149            r#"hostname = "hostname"
150            api_key = "${LOGDNA_API_KEY}""#,
151        )
152        .unwrap()
153    }
154}
155
156#[async_trait::async_trait]
157#[typetag::serde(name = "mezmo")]
158impl SinkConfig for MezmoConfig {
159    async fn build(
160        &self,
161        cx: SinkContext,
162    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
163        let request_settings = self.request.into_settings();
164        let batch_settings = self.batch.into_batch_settings()?;
165        let client = HttpClient::new(None, cx.proxy())?;
166
167        let sink = PartitionHttpSink::new(
168            self.clone(),
169            PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
170            request_settings,
171            batch_settings.timeout,
172            client.clone(),
173        )
174        .sink_map_err(|error| error!(message = "Fatal mezmo sink error.", %error));
175
176        let healthcheck = healthcheck(self.clone(), client).boxed();
177
178        #[allow(deprecated)]
179        Ok((super::VectorSink::from_event_sink(sink), healthcheck))
180    }
181
182    fn input(&self) -> Input {
183        let requirement = schema::Requirement::empty()
184            .optional_meaning("timestamp", Kind::timestamp())
185            .optional_meaning("message", Kind::bytes());
186
187        Input::log().with_schema_requirement(requirement)
188    }
189
190    fn acknowledgements(&self) -> &AcknowledgementsConfig {
191        &self.acknowledgements
192    }
193}
194
/// Key used to partition events into separate batches.
///
/// `build_request` sends the hostname and tags as query parameters of the request for a
/// batch, so events with different rendered values must end up in different batches.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct PartitionKey {
    /// The rendered `hostname` template.
    hostname: String,
    /// The rendered `tags` templates, or `None` when no tags are configured.
    tags: Option<Vec<String>>,
}
200
201pub struct MezmoEventEncoder {
202    hostname: Template,
203    tags: Option<Vec<Template>>,
204    transformer: Transformer,
205    default_app: String,
206    default_env: String,
207}
208
209impl MezmoEventEncoder {
210    fn render_key(
211        &self,
212        event: &Event,
213    ) -> Result<PartitionKey, (Option<&str>, TemplateRenderingError)> {
214        let hostname = self
215            .hostname
216            .render_string(event)
217            .map_err(|e| (Some("hostname"), e))?;
218        let tags = self
219            .tags
220            .as_ref()
221            .map(|tags| {
222                let mut vec = Vec::with_capacity(tags.len());
223                for tag in tags {
224                    vec.push(tag.render_string(event).map_err(|e| (None, e))?);
225                }
226                Ok(Some(vec))
227            })
228            .unwrap_or(Ok(None))?;
229        Ok(PartitionKey { hostname, tags })
230    }
231}
232
233impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for MezmoEventEncoder {
234    fn encode_event(
235        &mut self,
236        mut event: Event,
237    ) -> Option<PartitionInnerBuffer<serde_json::Value, PartitionKey>> {
238        let key = self
239            .render_key(&event)
240            .map_err(|(field, error)| {
241                emit!(crate::internal_events::TemplateRenderingError {
242                    error,
243                    field,
244                    drop_event: true,
245                });
246            })
247            .ok()?;
248
249        self.transformer.transform(&mut event);
250        let mut log = event.into_log();
251
252        let line = log
253            .message_path()
254            .cloned()
255            .as_ref()
256            .and_then(|path| log.remove(path))
257            .unwrap_or_else(|| String::from("").into());
258
259        let timestamp: Value = log
260            .timestamp_path()
261            .cloned()
262            .and_then(|path| log.remove(&path))
263            .unwrap_or_else(|| chrono::Utc::now().into());
264
265        let mut map = serde_json::map::Map::new();
266
267        map.insert("line".to_string(), json!(line));
268        map.insert("timestamp".to_string(), json!(timestamp));
269
270        if let Some(env) = log.remove(event_path!("env")) {
271            map.insert("env".to_string(), json!(env));
272        }
273
274        if let Some(app) = log.remove(event_path!("app")) {
275            map.insert("app".to_string(), json!(app));
276        }
277
278        if let Some(file) = log.remove(event_path!("file")) {
279            map.insert("file".to_string(), json!(file));
280        }
281
282        if !map.contains_key("env") {
283            map.insert("env".to_string(), json!(self.default_env));
284        }
285
286        if !map.contains_key("app") && !map.contains_key("file") {
287            map.insert("app".to_string(), json!(self.default_app.as_str()));
288        }
289
290        if !log.is_empty_object() {
291            map.insert("meta".into(), json!(&log));
292        }
293
294        Some(PartitionInnerBuffer::new(map.into(), key))
295    }
296}
297
298impl HttpSink for MezmoConfig {
299    type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
300    type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;
301    type Encoder = MezmoEventEncoder;
302
303    fn build_encoder(&self) -> Self::Encoder {
304        MezmoEventEncoder {
305            hostname: self.hostname.clone(),
306            tags: self.tags.clone(),
307            transformer: self.encoding.clone(),
308            default_app: self.default_app.clone(),
309            default_env: self.default_env.clone(),
310        }
311    }
312
313    async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Bytes>> {
314        let (events, key) = output.into_parts();
315        let mut query = url::form_urlencoded::Serializer::new(String::new());
316
317        let now = SystemTime::now()
318            .duration_since(SystemTime::UNIX_EPOCH)
319            .expect("Time can't drift behind the epoch!")
320            .as_millis();
321
322        query.append_pair("hostname", &key.hostname);
323        query.append_pair("now", &now.to_string());
324
325        if let Some(mac) = &self.mac {
326            query.append_pair("mac", mac);
327        }
328
329        if let Some(ip) = &self.ip {
330            query.append_pair("ip", ip);
331        }
332
333        if let Some(tags) = &key.tags {
334            let tags = tags.join(",");
335            query.append_pair("tags", &tags);
336        }
337
338        let query = query.finish();
339
340        let body = crate::serde::json::to_bytes(&json!({
341            "lines": events,
342        }))
343        .unwrap()
344        .freeze();
345
346        let uri = self.build_uri(&query);
347
348        let mut request = Request::builder()
349            .uri(uri)
350            .method("POST")
351            .header("Content-Type", "application/json")
352            .body(body)
353            .unwrap();
354
355        let auth = Auth::Basic {
356            user: self.api_key.inner().to_string(),
357            password: SensitiveString::default(),
358        };
359
360        auth.apply(&mut request);
361
362        Ok(request)
363    }
364}
365
366impl MezmoConfig {
367    fn build_uri(&self, query: &str) -> Uri {
368        let host = &self.endpoint.uri;
369
370        let uri = format!("{host}{PATH}?{query}");
371
372        uri.parse::<http::Uri>()
373            .expect("This should be a valid uri")
374    }
375}
376
377async fn healthcheck(config: MezmoConfig, client: HttpClient) -> crate::Result<()> {
378    let uri = config.build_uri("");
379
380    let req = Request::post(uri).body(hyper::Body::empty()).unwrap();
381
382    let res = client.send(req).await?;
383
384    if res.status().is_server_error() {
385        return Err("Server returned a server error".into());
386    }
387
388    if res.status() == StatusCode::FORBIDDEN {
389        return Err("Token is not valid, 403 returned.".into());
390    }
391
392    Ok(())
393}
394
#[cfg(test)]
mod tests {
    use futures::{channel::mpsc, StreamExt};
    use futures_util::stream;
    use http::{request::Parts, StatusCode};
    use serde_json::json;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server_status, load_sink},
        test_util::{
            components::{assert_sink_compliance, HTTP_SINK_TAGS},
            next_addr, random_lines,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MezmoConfig>();
    }

    #[test]
    fn encode_event() {
        // The transformer is configured through `encoding` (`MezmoConfig` has no `codec`
        // field), so this is the key that actually excludes `magic` before encoding.
        let (config, _cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            hostname = "vector"
            default_env = "acceptance"
            encoding.except_fields = ["magic"]
        "#,
        )
        .unwrap();
        let mut encoder = config.build_encoder();

        let mut event1 = Event::Log(LogEvent::from("hello world"));
        event1.as_mut_log().insert("app", "notvector");
        event1.as_mut_log().insert("magic", "vector");

        let mut event2 = Event::Log(LogEvent::from("hello world"));
        event2.as_mut_log().insert("file", "log.txt");

        let event3 = Event::Log(LogEvent::from("hello world"));

        let mut event4 = Event::Log(LogEvent::from("hello world"));
        event4.as_mut_log().insert("env", "staging");

        let event1_out = encoder.encode_event(event1).unwrap().into_parts().0;
        let event1_out = event1_out.as_object().unwrap();
        let event2_out = encoder.encode_event(event2).unwrap().into_parts().0;
        let event2_out = event2_out.as_object().unwrap();
        let event3_out = encoder.encode_event(event3).unwrap().into_parts().0;
        let event3_out = event3_out.as_object().unwrap();
        let event4_out = encoder.encode_event(event4).unwrap().into_parts().0;
        let event4_out = event4_out.as_object().unwrap();

        assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
        // The excluded field must not leak into the `meta` object.
        assert!(event1_out
            .get("meta")
            .and_then(|meta| meta.get("magic"))
            .is_none());
        assert_eq!(event2_out.get("file").unwrap(), &json!("log.txt"));
        assert_eq!(event3_out.get("app").unwrap(), &json!("vector"));
        assert_eq!(event3_out.get("env").unwrap(), &json!("acceptance"));
        assert_eq!(event4_out.get("env").unwrap(), &json!("staging"));
    }

    async fn smoke_start(
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> (
        Vec<&'static str>,
        Vec<Vec<String>>,
        mpsc::Receiver<(Parts, bytes::Bytes)>,
    ) {
        let (mut config, cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            ip = "127.0.0.1"
            mac = "some-mac-addr"
            hostname = "{{ hostname }}"
            tags = ["test","maybeanothertest"]
        "#,
        )
        .unwrap();

        // Make sure we can build the config
        _ = config.build(cx.clone()).await.unwrap();

        let addr = next_addr();
        // Swap out the host so we can force send it
        // to our local server
        let endpoint = UriSerde {
            uri: format!("http://{addr}").parse::<http::Uri>().unwrap(),
            auth: None,
        };
        config.endpoint = endpoint;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let lines = random_lines(100).take(10).collect::<Vec<_>>();
        let mut events = Vec::new();
        let hosts = vec!["host0", "host1"];

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut partitions = vec![Vec::new(), Vec::new()];
        // Create 10 events that alternate between the two hostnames, so each hostname
        // partition receives five lines. The `hostname` field is the only custom field
        // and is expected to end up under `meta`.
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.as_str()).with_batch_notifier(&batch);
            let p = i % 2;
            event.insert("hostname", hosts[p]);

            partitions[p].push(line.into());
            events.push(Event::Log(event));
        }
        drop(batch);

        let events = stream::iter(events).map(Into::into);
        sink.run(events).await.expect("Running sink failed");

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        (hosts, partitions, rx)
    }

    #[tokio::test]
    async fn smoke_fails() {
        let (_hosts, _partitions, mut rx) =
            smoke_start(StatusCode::FORBIDDEN, BatchStatus::Rejected).await;
        assert!(matches!(rx.try_next(), Err(mpsc::TryRecvError { .. })));
    }

    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (hosts, partitions, mut rx) =
                smoke_start(StatusCode::OK, BatchStatus::Delivered).await;

            for _ in 0..partitions.len() {
                let output = rx.next().await.unwrap();

                let request = &output.0;
                let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

                let query = request.uri.query().unwrap();

                let (p, host) = hosts
                    .iter()
                    .enumerate()
                    .find(|(_, host)| query.contains(&format!("hostname={host}")))
                    .expect("invalid hostname");
                let lines = &partitions[p];

                assert!(query.contains("ip=127.0.0.1"));
                assert!(query.contains("mac=some-mac-addr"));
                assert!(query.contains("tags=test%2Cmaybeanothertest"));

                let output = body
                    .as_object()
                    .unwrap()
                    .get("lines")
                    .unwrap()
                    .as_array()
                    .unwrap();

                for (i, line) in output.iter().enumerate() {
                    // All lines are json objects
                    let line = line.as_object().unwrap();

                    assert_eq!(line.get("app").unwrap(), &json!("vector"));
                    assert_eq!(line.get("env").unwrap(), &json!("production"));
                    assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

                    assert_eq!(
                        line.get("meta").unwrap(),
                        &json!({
                            "hostname": host,
                        })
                    );
                }
            }
        })
        .await;
    }
}
580}