1use std::time::SystemTime;
2
3use bytes::Bytes;
4use futures::{FutureExt, SinkExt};
5use http::{Request, StatusCode, Uri};
6use serde_json::json;
7use vector_lib::configurable::configurable_component;
8use vector_lib::sensitive_string::SensitiveString;
9use vrl::event_path;
10use vrl::value::{Kind, Value};
11
12use crate::{
13 codecs::Transformer,
14 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15 event::Event,
16 http::{Auth, HttpClient},
17 schema,
18 sinks::util::{
19 http::{HttpEventEncoder, HttpSink, PartitionHttpSink},
20 BatchConfig, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer,
21 RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde,
22 },
23 template::{Template, TemplateRenderingError},
24};
25
/// Ingest path appended to the configured endpoint when building request URIs
/// (see `MezmoConfig::build_uri`).
const PATH: &str = "/logs/ingest";
27
#[configurable_component(sink("logdna", "Deliver log event data to LogDNA."))]
/// Configuration for the `logdna` sink.
///
/// Deprecated alias: it simply wraps a [`MezmoConfig`] and delegates to it.
/// Use the `mezmo` sink instead.
#[configurable(metadata(
    deprecated = "The `logdna` sink has been renamed. Please use `mezmo` instead."
))]
#[derive(Clone, Debug)]
pub struct LogdnaConfig(MezmoConfig);
35
36impl GenerateConfig for LogdnaConfig {
37 fn generate_config() -> toml::Value {
38 <MezmoConfig as GenerateConfig>::generate_config()
39 }
40}
41
42#[async_trait::async_trait]
43#[typetag::serde(name = "logdna")]
44impl SinkConfig for LogdnaConfig {
45 async fn build(
46 &self,
47 cx: SinkContext,
48 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
49 warn!("DEPRECATED: The `logdna` sink has been renamed. Please use `mezmo` instead.");
50 self.0.build(cx).await
51 }
52
53 fn input(&self) -> Input {
54 self.0.input()
55 }
56
57 fn acknowledgements(&self) -> &AcknowledgementsConfig {
58 self.0.acknowledgements()
59 }
60}
61
#[configurable_component(sink("mezmo", "Deliver log event data to Mezmo."))]
/// Configuration for the `mezmo` sink.
#[derive(Clone, Debug)]
pub struct MezmoConfig {
    /// API key, sent as the HTTP Basic auth username (with an empty password) on ingest requests.
    #[configurable(metadata(docs::examples = "${LOGDNA_API_KEY}"))]
    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
    api_key: SensitiveString,

    /// Base URI of the Mezmo endpoint; `/logs/ingest` is appended to it for every request.
    ///
    /// Defaults to `https://logs.mezmo.com`. May also be configured via the `host` alias.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "http://example.com"))]
    endpoint: UriSerde,

    /// Hostname template, rendered per event and sent as the `hostname` query parameter.
    ///
    /// Events are batched separately for each distinct rendered hostname (and tag set).
    #[configurable(metadata(docs::examples = "${HOSTNAME}"))]
    #[configurable(metadata(docs::examples = "my-local-machine"))]
    hostname: Template,

    /// Optional MAC address, sent as the `mac` query parameter.
    #[configurable(metadata(docs::examples = "my-mac-address"))]
    #[configurable(metadata(docs::human_name = "MAC Address"))]
    mac: Option<String>,

    /// Optional IP address, sent as the `ip` query parameter.
    #[configurable(metadata(docs::examples = "0.0.0.0"))]
    #[configurable(metadata(docs::human_name = "IP Address"))]
    ip: Option<String>,

    /// Optional tag templates, rendered per event and sent comma-joined as the `tags` query parameter.
    #[configurable(metadata(docs::examples = "tag1"))]
    #[configurable(metadata(docs::examples = "tag2"))]
    tags: Option<Vec<Template>>,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// Value used for a line's `app` field when the event has neither an `app` nor a `file` field.
    ///
    /// Defaults to `vector`.
    #[serde(default = "default_app")]
    #[configurable(metadata(docs::examples = "my-app"))]
    default_app: String,

    /// Value used for a line's `env` field when the event has no `env` field.
    ///
    /// Defaults to `production`.
    #[serde(default = "default_env")]
    #[configurable(metadata(docs::examples = "staging"))]
    default_env: String,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
130
131fn default_endpoint() -> UriSerde {
132 UriSerde {
133 uri: Uri::from_static("https://logs.mezmo.com"),
134 auth: None,
135 }
136}
137
/// `app` value used for lines whose event carries neither `app` nor `file`.
fn default_app() -> String {
    String::from("vector")
}
141
/// `env` value used for lines whose event carries no `env` field.
fn default_env() -> String {
    String::from("production")
}
145
146impl GenerateConfig for MezmoConfig {
147 fn generate_config() -> toml::Value {
148 toml::from_str(
149 r#"hostname = "hostname"
150 api_key = "${LOGDNA_API_KEY}""#,
151 )
152 .unwrap()
153 }
154}
155
156#[async_trait::async_trait]
157#[typetag::serde(name = "mezmo")]
158impl SinkConfig for MezmoConfig {
159 async fn build(
160 &self,
161 cx: SinkContext,
162 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
163 let request_settings = self.request.into_settings();
164 let batch_settings = self.batch.into_batch_settings()?;
165 let client = HttpClient::new(None, cx.proxy())?;
166
167 let sink = PartitionHttpSink::new(
168 self.clone(),
169 PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
170 request_settings,
171 batch_settings.timeout,
172 client.clone(),
173 )
174 .sink_map_err(|error| error!(message = "Fatal mezmo sink error.", %error));
175
176 let healthcheck = healthcheck(self.clone(), client).boxed();
177
178 #[allow(deprecated)]
179 Ok((super::VectorSink::from_event_sink(sink), healthcheck))
180 }
181
182 fn input(&self) -> Input {
183 let requirement = schema::Requirement::empty()
184 .optional_meaning("timestamp", Kind::timestamp())
185 .optional_meaning("message", Kind::bytes());
186
187 Input::log().with_schema_requirement(requirement)
188 }
189
190 fn acknowledgements(&self) -> &AcknowledgementsConfig {
191 &self.acknowledgements
192 }
193}
194
/// Batching key: events are grouped into one request per distinct rendered
/// hostname and tag list.
#[derive(Clone, Eq, Hash, PartialEq)]
pub struct PartitionKey {
    /// Rendered `hostname` template; sent as the `hostname` query parameter.
    hostname: String,
    /// Rendered tag templates, if any were configured; sent comma-joined as `tags`.
    tags: Option<Vec<String>>,
}
200
201pub struct MezmoEventEncoder {
202 hostname: Template,
203 tags: Option<Vec<Template>>,
204 transformer: Transformer,
205 default_app: String,
206 default_env: String,
207}
208
209impl MezmoEventEncoder {
210 fn render_key(
211 &self,
212 event: &Event,
213 ) -> Result<PartitionKey, (Option<&str>, TemplateRenderingError)> {
214 let hostname = self
215 .hostname
216 .render_string(event)
217 .map_err(|e| (Some("hostname"), e))?;
218 let tags = self
219 .tags
220 .as_ref()
221 .map(|tags| {
222 let mut vec = Vec::with_capacity(tags.len());
223 for tag in tags {
224 vec.push(tag.render_string(event).map_err(|e| (None, e))?);
225 }
226 Ok(Some(vec))
227 })
228 .unwrap_or(Ok(None))?;
229 Ok(PartitionKey { hostname, tags })
230 }
231}
232
233impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for MezmoEventEncoder {
234 fn encode_event(
235 &mut self,
236 mut event: Event,
237 ) -> Option<PartitionInnerBuffer<serde_json::Value, PartitionKey>> {
238 let key = self
239 .render_key(&event)
240 .map_err(|(field, error)| {
241 emit!(crate::internal_events::TemplateRenderingError {
242 error,
243 field,
244 drop_event: true,
245 });
246 })
247 .ok()?;
248
249 self.transformer.transform(&mut event);
250 let mut log = event.into_log();
251
252 let line = log
253 .message_path()
254 .cloned()
255 .as_ref()
256 .and_then(|path| log.remove(path))
257 .unwrap_or_else(|| String::from("").into());
258
259 let timestamp: Value = log
260 .timestamp_path()
261 .cloned()
262 .and_then(|path| log.remove(&path))
263 .unwrap_or_else(|| chrono::Utc::now().into());
264
265 let mut map = serde_json::map::Map::new();
266
267 map.insert("line".to_string(), json!(line));
268 map.insert("timestamp".to_string(), json!(timestamp));
269
270 if let Some(env) = log.remove(event_path!("env")) {
271 map.insert("env".to_string(), json!(env));
272 }
273
274 if let Some(app) = log.remove(event_path!("app")) {
275 map.insert("app".to_string(), json!(app));
276 }
277
278 if let Some(file) = log.remove(event_path!("file")) {
279 map.insert("file".to_string(), json!(file));
280 }
281
282 if !map.contains_key("env") {
283 map.insert("env".to_string(), json!(self.default_env));
284 }
285
286 if !map.contains_key("app") && !map.contains_key("file") {
287 map.insert("app".to_string(), json!(self.default_app.as_str()));
288 }
289
290 if !log.is_empty_object() {
291 map.insert("meta".into(), json!(&log));
292 }
293
294 Some(PartitionInnerBuffer::new(map.into(), key))
295 }
296}
297
298impl HttpSink for MezmoConfig {
299 type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
300 type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;
301 type Encoder = MezmoEventEncoder;
302
303 fn build_encoder(&self) -> Self::Encoder {
304 MezmoEventEncoder {
305 hostname: self.hostname.clone(),
306 tags: self.tags.clone(),
307 transformer: self.encoding.clone(),
308 default_app: self.default_app.clone(),
309 default_env: self.default_env.clone(),
310 }
311 }
312
313 async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Bytes>> {
314 let (events, key) = output.into_parts();
315 let mut query = url::form_urlencoded::Serializer::new(String::new());
316
317 let now = SystemTime::now()
318 .duration_since(SystemTime::UNIX_EPOCH)
319 .expect("Time can't drift behind the epoch!")
320 .as_millis();
321
322 query.append_pair("hostname", &key.hostname);
323 query.append_pair("now", &now.to_string());
324
325 if let Some(mac) = &self.mac {
326 query.append_pair("mac", mac);
327 }
328
329 if let Some(ip) = &self.ip {
330 query.append_pair("ip", ip);
331 }
332
333 if let Some(tags) = &key.tags {
334 let tags = tags.join(",");
335 query.append_pair("tags", &tags);
336 }
337
338 let query = query.finish();
339
340 let body = crate::serde::json::to_bytes(&json!({
341 "lines": events,
342 }))
343 .unwrap()
344 .freeze();
345
346 let uri = self.build_uri(&query);
347
348 let mut request = Request::builder()
349 .uri(uri)
350 .method("POST")
351 .header("Content-Type", "application/json")
352 .body(body)
353 .unwrap();
354
355 let auth = Auth::Basic {
356 user: self.api_key.inner().to_string(),
357 password: SensitiveString::default(),
358 };
359
360 auth.apply(&mut request);
361
362 Ok(request)
363 }
364}
365
366impl MezmoConfig {
367 fn build_uri(&self, query: &str) -> Uri {
368 let host = &self.endpoint.uri;
369
370 let uri = format!("{host}{PATH}?{query}");
371
372 uri.parse::<http::Uri>()
373 .expect("This should be a valid uri")
374 }
375}
376
377async fn healthcheck(config: MezmoConfig, client: HttpClient) -> crate::Result<()> {
378 let uri = config.build_uri("");
379
380 let req = Request::post(uri).body(hyper::Body::empty()).unwrap();
381
382 let res = client.send(req).await?;
383
384 if res.status().is_server_error() {
385 return Err("Server returned a server error".into());
386 }
387
388 if res.status() == StatusCode::FORBIDDEN {
389 return Err("Token is not valid, 403 returned.".into());
390 }
391
392 Ok(())
393}
394
#[cfg(test)]
mod tests {
    use futures::{channel::mpsc, StreamExt};
    use futures_util::stream;
    use http::{request::Parts, StatusCode};
    use serde_json::json;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server_status, load_sink},
        test_util::{
            components::{assert_sink_compliance, HTTP_SINK_TAGS},
            next_addr, random_lines,
        },
    };

    /// The example produced by `GenerateConfig` must round-trip through the shared checker.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MezmoConfig>();
    }

    /// Checks how `app`, `file` and `env` fields on events map to line fields and defaults.
    #[test]
    fn encode_event() {
        // NOTE(review): the config field is named `encoding`, not `codec`, and the
        // removal of `magic` is never asserted below — confirm this key is intended.
        let (config, _cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            hostname = "vector"
            default_env = "acceptance"
            codec.except_fields = ["magic"]
        "#,
        )
        .unwrap();
        let mut encoder = config.build_encoder();

        // Event with its own `app`: the event's value must be kept.
        let mut event1 = Event::Log(LogEvent::from("hello world"));
        event1.as_mut_log().insert("app", "notvector");
        event1.as_mut_log().insert("magic", "vector");

        // Event with a `file`: the file is kept (and no default app is required).
        let mut event2 = Event::Log(LogEvent::from("hello world"));
        event2.as_mut_log().insert("file", "log.txt");

        // Bare event: both `default_app` ("vector") and `default_env` apply.
        let event3 = Event::Log(LogEvent::from("hello world"));

        // Event with its own `env`: it must override `default_env`.
        let mut event4 = Event::Log(LogEvent::from("hello world"));
        event4.as_mut_log().insert("env", "staging");

        let event1_out = encoder.encode_event(event1).unwrap().into_parts().0;
        let event1_out = event1_out.as_object().unwrap();
        let event2_out = encoder.encode_event(event2).unwrap().into_parts().0;
        let event2_out = event2_out.as_object().unwrap();
        let event3_out = encoder.encode_event(event3).unwrap().into_parts().0;
        let event3_out = event3_out.as_object().unwrap();
        let event4_out = encoder.encode_event(event4).unwrap().into_parts().0;
        let event4_out = event4_out.as_object().unwrap();

        assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
        assert_eq!(event2_out.get("file").unwrap(), &json!("log.txt"));
        assert_eq!(event3_out.get("app").unwrap(), &json!("vector"));
        assert_eq!(event3_out.get("env").unwrap(), &json!("acceptance"));
        assert_eq!(event4_out.get("env").unwrap(), &json!("staging"));
    }

    /// Runs the sink against a local test server answering with `status_code`.
    ///
    /// Sends 10 random lines, alternating the `hostname` field between two hosts
    /// so that two partitions are produced. Asserts that the batch finalizes with
    /// `batch_status`.
    ///
    /// Returns the host names, the expected lines per partition (indexed like the
    /// hosts), and the receiver of requests captured by the test server.
    async fn smoke_start(
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> (
        Vec<&'static str>,
        Vec<Vec<String>>,
        mpsc::Receiver<(Parts, bytes::Bytes)>,
    ) {
        let (mut config, cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            ip = "127.0.0.1"
            mac = "some-mac-addr"
            hostname = "{{ hostname }}"
            tags = ["test","maybeanothertest"]
        "#,
        )
        .unwrap();

        // Build once with the config as loaded (default endpoint); presumably this
        // only checks that building succeeds — the result is discarded.
        _ = config.build(cx.clone()).await.unwrap();

        // Point the sink at a local test server instead of the real endpoint.
        let addr = next_addr();
        let endpoint = UriSerde {
            uri: format!("http://{addr}").parse::<http::Uri>().unwrap(),
            auth: None,
        };
        config.endpoint = endpoint;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let lines = random_lines(100).take(10).collect::<Vec<_>>();
        let mut events = Vec::new();
        let hosts = vec!["host0", "host1"];

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut partitions = vec![Vec::new(), Vec::new()];
        for (i, line) in lines.iter().enumerate() {
            // Even-indexed lines go to host0 and odd-indexed lines to host1; remember
            // each line under its partition so the requests can be checked later.
            let mut event = LogEvent::from(line.as_str()).with_batch_notifier(&batch);
            let p = i % 2;
            event.insert("hostname", hosts[p]);

            partitions[p].push(line.into());
            events.push(Event::Log(event));
        }
        // Release our handle so the notifier can report once the sink finishes.
        drop(batch);

        let events = stream::iter(events).map(Into::into);
        sink.run(events).await.expect("Running sink failed");

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        (hosts, partitions, rx)
    }

    /// A `403` response must mark the batch as rejected.
    #[tokio::test]
    async fn smoke_fails() {
        let (_hosts, _partitions, mut rx) =
            smoke_start(StatusCode::FORBIDDEN, BatchStatus::Rejected).await;
        // `try_next` returns `Err` while the channel is open but has no message ready.
        assert!(matches!(rx.try_next(), Err(mpsc::TryRecvError { .. })));
    }

    /// Happy path: one request per hostname partition, with the expected query
    /// parameters and line objects.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (hosts, partitions, mut rx) =
                smoke_start(StatusCode::OK, BatchStatus::Delivered).await;

            for _ in 0..partitions.len() {
                let output = rx.next().await.unwrap();

                let request = &output.0;
                let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

                let query = request.uri.query().unwrap();

                // Identify which partition this request belongs to via its `hostname` parameter.
                let (p, host) = hosts
                    .iter()
                    .enumerate()
                    .find(|(_, host)| query.contains(&format!("hostname={host}")))
                    .expect("invalid hostname");
                let lines = &partitions[p];

                assert!(query.contains("ip=127.0.0.1"));
                assert!(query.contains("mac=some-mac-addr"));
                // Tags are comma-joined; the comma is URL-encoded as `%2C`.
                assert!(query.contains("tags=test%2Cmaybeanothertest"));

                let output = body
                    .as_object()
                    .unwrap()
                    .get("lines")
                    .unwrap()
                    .as_array()
                    .unwrap();

                for (i, line) in output.iter().enumerate() {
                    let line = line.as_object().unwrap();

                    // No app/env on the events, so the config defaults apply.
                    assert_eq!(line.get("app").unwrap(), &json!("vector"));
                    assert_eq!(line.get("env").unwrap(), &json!("production"));
                    assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

                    // The only leftover event field is `hostname`, which ends up in `meta`.
                    assert_eq!(
                        line.get("meta").unwrap(),
                        &json!({
                            "hostname": host,
                        })
                    );
                }
            }
        })
        .await;
    }
}