vector/sinks/mezmo.rs

1use std::time::SystemTime;
2
3use bytes::Bytes;
4use futures::{FutureExt, SinkExt};
5use http::{Request, StatusCode, Uri};
6use serde_json::json;
7use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
8use vrl::{
9    event_path,
10    value::{Kind, Value},
11};
12
13use crate::{
14    codecs::Transformer,
15    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
16    event::Event,
17    http::{Auth, HttpClient},
18    schema,
19    sinks::util::{
20        BatchConfig, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer,
21        RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde,
22        http::{HttpEventEncoder, HttpSink, PartitionHttpSink},
23    },
24    template::{Template, TemplateRenderingError},
25};
26
/// Path appended to the configured endpoint (before the query string) to form the ingest URI.
const PATH: &str = "/logs/ingest";
28
/// Configuration for the `logdna` sink.
// Deprecated alias kept for backward compatibility: it wraps a `MezmoConfig`
// and every trait impl below delegates to the inner value.
#[configurable_component(sink("logdna", "Deliver log event data to LogDNA."))]
#[configurable(metadata(
    deprecated = "The `logdna` sink has been renamed. Please use `mezmo` instead."
))]
#[derive(Clone, Debug)]
pub struct LogdnaConfig(MezmoConfig);
36
37impl GenerateConfig for LogdnaConfig {
38    fn generate_config() -> toml::Value {
39        <MezmoConfig as GenerateConfig>::generate_config()
40    }
41}
42
43#[async_trait::async_trait]
44#[typetag::serde(name = "logdna")]
45impl SinkConfig for LogdnaConfig {
46    async fn build(
47        &self,
48        cx: SinkContext,
49    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
50        warn!("DEPRECATED: The `logdna` sink has been renamed. Please use `mezmo` instead.");
51        self.0.build(cx).await
52    }
53
54    fn input(&self) -> Input {
55        self.0.input()
56    }
57
58    fn acknowledgements(&self) -> &AcknowledgementsConfig {
59        self.0.acknowledgements()
60    }
61}
62
/// Configuration for the `mezmo` (formerly `logdna`) sink.
// The `///` comments on fields below double as user-facing config docs;
// implementation notes are kept in `//` comments so they stay out of the schema.
#[configurable_component(sink("mezmo", "Deliver log event data to Mezmo."))]
#[derive(Clone, Debug)]
pub struct MezmoConfig {
    /// The Ingestion API key.
    #[configurable(metadata(docs::examples = "${LOGDNA_API_KEY}"))]
    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
    // Sent as the HTTP basic-auth user, with an empty password, on ingest requests.
    api_key: SensitiveString,

    /// The HTTP endpoint to send logs to.
    ///
    /// Both IP address and hostname are accepted formats.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "http://example.com"))]
    // `host` is accepted as a legacy key; `PATH` and the query string are appended to it.
    endpoint: UriSerde,

    /// The hostname that is attached to each batch of events.
    #[configurable(metadata(docs::examples = "${HOSTNAME}"))]
    #[configurable(metadata(docs::examples = "my-local-machine"))]
    // Rendered per event; events are partitioned by the rendered value, which is
    // then sent as the `hostname` query parameter.
    hostname: Template,

    /// The MAC address that is attached to each batch of events.
    #[configurable(metadata(docs::examples = "my-mac-address"))]
    #[configurable(metadata(docs::human_name = "MAC Address"))]
    // Sent verbatim as the `mac` query parameter when set.
    mac: Option<String>,

    /// The IP address that is attached to each batch of events.
    #[configurable(metadata(docs::examples = "0.0.0.0"))]
    #[configurable(metadata(docs::human_name = "IP Address"))]
    // Sent verbatim as the `ip` query parameter when set.
    ip: Option<String>,

    /// The tags that are attached to each batch of events.
    #[configurable(metadata(docs::examples = "tag1"))]
    #[configurable(metadata(docs::examples = "tag2"))]
    // Each template is rendered per event; the rendered tags are part of the
    // partition key and are sent comma-joined as the `tags` query parameter.
    tags: Option<Vec<Template>>,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    // Applied to each event before it is converted into a Mezmo line.
    pub encoding: Transformer,

    /// The default app that is set for events that do not contain a `file` or `app` field.
    #[serde(default = "default_app")]
    #[configurable(metadata(docs::examples = "my-app"))]
    default_app: String,

    /// The default environment that is set for events that do not contain an `env` field.
    #[serde(default = "default_env")]
    #[configurable(metadata(docs::examples = "staging"))]
    default_env: String,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
131
132fn default_endpoint() -> UriSerde {
133    UriSerde {
134        uri: Uri::from_static("https://logs.mezmo.com"),
135        auth: None,
136    }
137}
138
/// Default `app` value for events that carry neither an `app` nor a `file` field.
fn default_app() -> String {
    String::from("vector")
}
142
/// Default `env` value for events that carry no `env` field.
fn default_env() -> String {
    "production".to_string()
}
146
147impl GenerateConfig for MezmoConfig {
148    fn generate_config() -> toml::Value {
149        toml::from_str(
150            r#"hostname = "hostname"
151            api_key = "${LOGDNA_API_KEY}""#,
152        )
153        .unwrap()
154    }
155}
156
157#[async_trait::async_trait]
158#[typetag::serde(name = "mezmo")]
159impl SinkConfig for MezmoConfig {
160    async fn build(
161        &self,
162        cx: SinkContext,
163    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
164        let request_settings = self.request.into_settings();
165        let batch_settings = self.batch.into_batch_settings()?;
166        let client = HttpClient::new(None, cx.proxy())?;
167
168        let sink = PartitionHttpSink::new(
169            self.clone(),
170            PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
171            request_settings,
172            batch_settings.timeout,
173            client.clone(),
174        )
175        .sink_map_err(|error| error!(message = "Fatal mezmo sink error.", %error));
176
177        let healthcheck = healthcheck(self.clone(), client).boxed();
178
179        #[allow(deprecated)]
180        Ok((super::VectorSink::from_event_sink(sink), healthcheck))
181    }
182
183    fn input(&self) -> Input {
184        let requirement = schema::Requirement::empty()
185            .optional_meaning("timestamp", Kind::timestamp())
186            .optional_meaning("message", Kind::bytes());
187
188        Input::log().with_schema_requirement(requirement)
189    }
190
191    fn acknowledgements(&self) -> &AcknowledgementsConfig {
192        &self.acknowledgements
193    }
194}
195
/// Key used to partition events into batches, so each ingest request carries a
/// single rendered hostname and tag set.
#[derive(Hash, Eq, PartialEq, Clone)]
pub struct PartitionKey {
    /// Rendered `hostname` template; sent as the `hostname` query parameter.
    hostname: String,
    /// Rendered tag templates (`None` when no tags are configured); sent comma-joined as `tags`.
    tags: Option<Vec<String>>,
}
201
/// Per-event encoder created by `MezmoConfig::build_encoder`.
pub struct MezmoEventEncoder {
    /// Template rendered into the partition key's hostname.
    hostname: Template,
    /// Optional tag templates rendered into the partition key.
    tags: Option<Vec<Template>>,
    /// Field transformations applied to each event before encoding.
    transformer: Transformer,
    /// `app` value used when an event has neither `app` nor `file`.
    default_app: String,
    /// `env` value used when an event has no `env`.
    default_env: String,
}
209
210impl MezmoEventEncoder {
211    fn render_key(
212        &self,
213        event: &Event,
214    ) -> Result<PartitionKey, (Option<&str>, TemplateRenderingError)> {
215        let hostname = self
216            .hostname
217            .render_string(event)
218            .map_err(|e| (Some("hostname"), e))?;
219        let tags = self
220            .tags
221            .as_ref()
222            .map(|tags| {
223                let mut vec = Vec::with_capacity(tags.len());
224                for tag in tags {
225                    vec.push(tag.render_string(event).map_err(|e| (None, e))?);
226                }
227                Ok(Some(vec))
228            })
229            .unwrap_or(Ok(None))?;
230        Ok(PartitionKey { hostname, tags })
231    }
232}
233
234impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for MezmoEventEncoder {
235    fn encode_event(
236        &mut self,
237        mut event: Event,
238    ) -> Option<PartitionInnerBuffer<serde_json::Value, PartitionKey>> {
239        let key = self
240            .render_key(&event)
241            .map_err(|(field, error)| {
242                emit!(crate::internal_events::TemplateRenderingError {
243                    error,
244                    field,
245                    drop_event: true,
246                });
247            })
248            .ok()?;
249
250        self.transformer.transform(&mut event);
251        let mut log = event.into_log();
252
253        let line = log
254            .message_path()
255            .cloned()
256            .as_ref()
257            .and_then(|path| log.remove(path))
258            .unwrap_or_else(|| String::from("").into());
259
260        let timestamp: Value = log
261            .timestamp_path()
262            .cloned()
263            .and_then(|path| log.remove(&path))
264            .unwrap_or_else(|| chrono::Utc::now().into());
265
266        let mut map = serde_json::map::Map::new();
267
268        map.insert("line".to_string(), json!(line));
269        map.insert("timestamp".to_string(), json!(timestamp));
270
271        if let Some(env) = log.remove(event_path!("env")) {
272            map.insert("env".to_string(), json!(env));
273        }
274
275        if let Some(app) = log.remove(event_path!("app")) {
276            map.insert("app".to_string(), json!(app));
277        }
278
279        if let Some(file) = log.remove(event_path!("file")) {
280            map.insert("file".to_string(), json!(file));
281        }
282
283        if !map.contains_key("env") {
284            map.insert("env".to_string(), json!(self.default_env));
285        }
286
287        if !map.contains_key("app") && !map.contains_key("file") {
288            map.insert("app".to_string(), json!(self.default_app.as_str()));
289        }
290
291        if !log.is_empty_object() {
292            map.insert("meta".into(), json!(&log));
293        }
294
295        Some(PartitionInnerBuffer::new(map.into(), key))
296    }
297}
298
299impl HttpSink for MezmoConfig {
300    type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
301    type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;
302    type Encoder = MezmoEventEncoder;
303
304    fn build_encoder(&self) -> Self::Encoder {
305        MezmoEventEncoder {
306            hostname: self.hostname.clone(),
307            tags: self.tags.clone(),
308            transformer: self.encoding.clone(),
309            default_app: self.default_app.clone(),
310            default_env: self.default_env.clone(),
311        }
312    }
313
314    async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Bytes>> {
315        let (events, key) = output.into_parts();
316        let mut query = url::form_urlencoded::Serializer::new(String::new());
317
318        let now = SystemTime::now()
319            .duration_since(SystemTime::UNIX_EPOCH)
320            .expect("Time can't drift behind the epoch!")
321            .as_millis();
322
323        query.append_pair("hostname", &key.hostname);
324        query.append_pair("now", &now.to_string());
325
326        if let Some(mac) = &self.mac {
327            query.append_pair("mac", mac);
328        }
329
330        if let Some(ip) = &self.ip {
331            query.append_pair("ip", ip);
332        }
333
334        if let Some(tags) = &key.tags {
335            let tags = tags.join(",");
336            query.append_pair("tags", &tags);
337        }
338
339        let query = query.finish();
340
341        let body = crate::serde::json::to_bytes(&json!({
342            "lines": events,
343        }))
344        .unwrap()
345        .freeze();
346
347        let uri = self.build_uri(&query);
348
349        let mut request = Request::builder()
350            .uri(uri)
351            .method("POST")
352            .header("Content-Type", "application/json")
353            .body(body)
354            .unwrap();
355
356        let auth = Auth::Basic {
357            user: self.api_key.inner().to_string(),
358            password: SensitiveString::default(),
359        };
360
361        auth.apply(&mut request);
362
363        Ok(request)
364    }
365}
366
367impl MezmoConfig {
368    fn build_uri(&self, query: &str) -> Uri {
369        let host = &self.endpoint.uri;
370
371        let uri = format!("{host}{PATH}?{query}");
372
373        uri.parse::<http::Uri>()
374            .expect("This should be a valid uri")
375    }
376}
377
378async fn healthcheck(config: MezmoConfig, client: HttpClient) -> crate::Result<()> {
379    let uri = config.build_uri("");
380
381    let req = Request::post(uri).body(hyper::Body::empty()).unwrap();
382
383    let res = client.send(req).await?;
384
385    if res.status().is_server_error() {
386        return Err("Server returned a server error".into());
387    }
388
389    if res.status() == StatusCode::FORBIDDEN {
390        return Err("Token is not valid, 403 returned.".into());
391    }
392
393    Ok(())
394}
395
#[cfg(test)]
mod tests {
    use futures::{StreamExt, channel::mpsc};
    use futures_util::stream;
    use http::{StatusCode, request::Parts};
    use serde_json::json;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server_status, load_sink},
        test_util::{
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            next_addr, random_lines,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MezmoConfig>();
    }

    /// Checks the `app`/`file`/`env` defaulting rules, and that
    /// `encoding.except_fields` removes fields before they can reach `meta`.
    #[test]
    fn encode_event() {
        // The transformer lives under the `encoding` key; `codec.*` is not a field
        // of this config, so `except_fields` must be set under `encoding`.
        let (config, _cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            hostname = "vector"
            default_env = "acceptance"
            encoding.except_fields = ["magic"]
        "#,
        )
        .unwrap();
        let mut encoder = config.build_encoder();

        let mut event1 = Event::Log(LogEvent::from("hello world"));
        event1.as_mut_log().insert("app", "notvector");
        event1.as_mut_log().insert("magic", "vector");

        let mut event2 = Event::Log(LogEvent::from("hello world"));
        event2.as_mut_log().insert("file", "log.txt");

        let event3 = Event::Log(LogEvent::from("hello world"));

        let mut event4 = Event::Log(LogEvent::from("hello world"));
        event4.as_mut_log().insert("env", "staging");

        let event1_out = encoder.encode_event(event1).unwrap().into_parts().0;
        let event1_out = event1_out.as_object().unwrap();
        let event2_out = encoder.encode_event(event2).unwrap().into_parts().0;
        let event2_out = event2_out.as_object().unwrap();
        let event3_out = encoder.encode_event(event3).unwrap().into_parts().0;
        let event3_out = event3_out.as_object().unwrap();
        let event4_out = encoder.encode_event(event4).unwrap().into_parts().0;
        let event4_out = event4_out.as_object().unwrap();

        assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
        // `magic` is excluded by the transformer, so it must not leak into `meta`.
        assert!(
            event1_out
                .get("meta")
                .and_then(|meta| meta.get("magic"))
                .is_none()
        );
        assert_eq!(event2_out.get("file").unwrap(), &json!("log.txt"));
        assert_eq!(event3_out.get("app").unwrap(), &json!("vector"));
        assert_eq!(event3_out.get("env").unwrap(), &json!("acceptance"));
        assert_eq!(event4_out.get("env").unwrap(), &json!("staging"));
    }

    async fn smoke_start(
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> (
        Vec<&'static str>,
        Vec<Vec<String>>,
        mpsc::Receiver<(Parts, bytes::Bytes)>,
    ) {
        let (mut config, cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            ip = "127.0.0.1"
            mac = "some-mac-addr"
            hostname = "{{ hostname }}"
            tags = ["test","maybeanothertest"]
        "#,
        )
        .unwrap();

        // Make sure we can build the config
        _ = config.build(cx.clone()).await.unwrap();

        let addr = next_addr();
        // Swap out the host so we can force send it
        // to our local server
        let endpoint = UriSerde {
            uri: format!("http://{addr}").parse::<http::Uri>().unwrap(),
            auth: None,
        };
        config.endpoint = endpoint;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let lines = random_lines(100).take(10).collect::<Vec<_>>();
        let mut events = Vec::new();
        let hosts = vec!["host0", "host1"];

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut partitions = vec![Vec::new(), Vec::new()];
        // Create 10 events that alternate between the two hosts, so each
        // `hostname` partition receives every other line.
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.as_str()).with_batch_notifier(&batch);
            let p = i % 2;
            event.insert("hostname", hosts[p]);

            partitions[p].push(line.into());
            events.push(Event::Log(event));
        }
        drop(batch);

        let events = stream::iter(events).map(Into::into);
        sink.run(events).await.expect("Running sink failed");

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        (hosts, partitions, rx)
    }

    #[tokio::test]
    async fn smoke_fails() {
        let (_hosts, _partitions, mut rx) =
            smoke_start(StatusCode::FORBIDDEN, BatchStatus::Rejected).await;
        assert!(matches!(rx.try_next(), Err(mpsc::TryRecvError { .. })));
    }

    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (hosts, partitions, mut rx) =
                smoke_start(StatusCode::OK, BatchStatus::Delivered).await;

            for _ in 0..partitions.len() {
                let output = rx.next().await.unwrap();

                let request = &output.0;
                let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

                let query = request.uri.query().unwrap();

                let (p, host) = hosts
                    .iter()
                    .enumerate()
                    .find(|(_, host)| query.contains(&format!("hostname={host}")))
                    .expect("invalid hostname");
                let lines = &partitions[p];

                assert!(query.contains("ip=127.0.0.1"));
                assert!(query.contains("mac=some-mac-addr"));
                assert!(query.contains("tags=test%2Cmaybeanothertest"));

                let output = body
                    .as_object()
                    .unwrap()
                    .get("lines")
                    .unwrap()
                    .as_array()
                    .unwrap();

                for (i, line) in output.iter().enumerate() {
                    // All lines are json objects
                    let line = line.as_object().unwrap();

                    assert_eq!(line.get("app").unwrap(), &json!("vector"));
                    assert_eq!(line.get("env").unwrap(), &json!("production"));
                    assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

                    assert_eq!(
                        line.get("meta").unwrap(),
                        &json!({
                            "hostname": host,
                        })
                    );
                }
            }
        })
        .await;
    }
}