1use std::time::SystemTime;
2
3use bytes::Bytes;
4use futures::{FutureExt, SinkExt};
5use http::{Request, StatusCode, Uri};
6use serde_json::json;
7use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
8use vrl::{
9 event_path,
10 value::{Kind, Value},
11};
12
13use crate::{
14 codecs::Transformer,
15 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
16 event::Event,
17 http::{Auth, HttpClient},
18 schema,
19 sinks::util::{
20 BatchConfig, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer,
21 RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde,
22 http::{HttpEventEncoder, HttpSink, PartitionHttpSink},
23 },
24 template::{Template, TemplateRenderingError},
25};
26
/// Path of the Mezmo log ingestion API. `MezmoConfig::build_uri` appends it to the configured
/// `endpoint`; both data requests and the healthcheck go through that helper.
const PATH: &str = "/logs/ingest";
28
#[configurable_component(sink("logdna", "Deliver log event data to LogDNA."))]
/// Configuration for the `logdna` sink.
// Deprecated alias: a newtype over `MezmoConfig`. `GenerateConfig` and `SinkConfig` below
// forward every call to the wrapped config; `build` additionally logs a deprecation warning.
#[configurable(metadata(
    deprecated = "The `logdna` sink has been renamed. Please use `mezmo` instead."
))]
#[derive(Clone, Debug)]
pub struct LogdnaConfig(MezmoConfig);
36
37impl GenerateConfig for LogdnaConfig {
38 fn generate_config() -> toml::Value {
39 <MezmoConfig as GenerateConfig>::generate_config()
40 }
41}
42
43#[async_trait::async_trait]
44#[typetag::serde(name = "logdna")]
45impl SinkConfig for LogdnaConfig {
46 async fn build(
47 &self,
48 cx: SinkContext,
49 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
50 warn!("DEPRECATED: The `logdna` sink has been renamed. Please use `mezmo` instead.");
51 self.0.build(cx).await
52 }
53
54 fn input(&self) -> Input {
55 self.0.input()
56 }
57
58 fn acknowledgements(&self) -> &AcknowledgementsConfig {
59 self.0.acknowledgements()
60 }
61}
62
#[configurable_component(sink("mezmo", "Deliver log event data to Mezmo."))]
/// Configuration for the `mezmo` sink.
#[derive(Clone, Debug)]
pub struct MezmoConfig {
    /// The Ingestion API key.
    // Sent on every request as the HTTP Basic auth user name, with an empty password.
    #[configurable(metadata(docs::examples = "${LOGDNA_API_KEY}"))]
    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
    api_key: SensitiveString,

    /// The base URL of the Mezmo endpoint to send logs to.
    ///
    /// The ingestion path `/logs/ingest` is appended to this URL. Defaults to
    /// `https://logs.mezmo.com`.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "http://example.com"))]
    endpoint: UriSerde,

    /// The hostname that is attached to each batch of events.
    // Template rendered per event; events are partitioned by the rendered value, which is sent
    // as the `hostname` query parameter of each request.
    #[configurable(metadata(docs::examples = "${HOSTNAME}"))]
    #[configurable(metadata(docs::examples = "my-local-machine"))]
    hostname: Template,

    /// The MAC address that is attached to each batch of events.
    // Sent verbatim as the `mac` query parameter when set.
    #[configurable(metadata(docs::examples = "my-mac-address"))]
    #[configurable(metadata(docs::human_name = "MAC Address"))]
    mac: Option<String>,

    /// The IP address that is attached to each batch of events.
    // Sent verbatim as the `ip` query parameter when set.
    #[configurable(metadata(docs::examples = "0.0.0.0"))]
    #[configurable(metadata(docs::human_name = "IP Address"))]
    ip: Option<String>,

    /// The tags that are attached to each batch of events.
    // Each template is rendered per event (part of the partition key); the rendered tags are
    // joined with `,` into the `tags` query parameter.
    #[configurable(metadata(docs::examples = "tag1"))]
    #[configurable(metadata(docs::examples = "tag2"))]
    tags: Option<Vec<Template>>,

    // Applied to each event before it is converted into a Mezmo line object.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// The default app that is set for events that do not contain a `file` or `app` field.
    #[serde(default = "default_app")]
    #[configurable(metadata(docs::examples = "my-app"))]
    default_app: String,

    /// The default environment that is set for events that do not contain an `env` field.
    #[serde(default = "default_env")]
    #[configurable(metadata(docs::examples = "staging"))]
    default_env: String,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
131
132fn default_endpoint() -> UriSerde {
133 UriSerde {
134 uri: Uri::from_static("https://logs.mezmo.com"),
135 auth: None,
136 }
137}
138
/// Default for `default_app`: the app name given to lines that carry neither `app` nor `file`.
fn default_app() -> String {
    String::from("vector")
}
142
/// Default for `default_env`: the environment given to lines that carry no `env` field.
fn default_env() -> String {
    String::from("production")
}
146
147impl GenerateConfig for MezmoConfig {
148 fn generate_config() -> toml::Value {
149 toml::from_str(
150 r#"hostname = "hostname"
151 api_key = "${LOGDNA_API_KEY}""#,
152 )
153 .unwrap()
154 }
155}
156
157#[async_trait::async_trait]
158#[typetag::serde(name = "mezmo")]
159impl SinkConfig for MezmoConfig {
160 async fn build(
161 &self,
162 cx: SinkContext,
163 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
164 let request_settings = self.request.into_settings();
165 let batch_settings = self.batch.into_batch_settings()?;
166 let client = HttpClient::new(None, cx.proxy())?;
167
168 let sink = PartitionHttpSink::new(
169 self.clone(),
170 PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
171 request_settings,
172 batch_settings.timeout,
173 client.clone(),
174 )
175 .sink_map_err(|error| error!(message = "Fatal mezmo sink error.", %error, internal_log_rate_limit = false));
176
177 let healthcheck = healthcheck(self.clone(), client).boxed();
178
179 #[allow(deprecated)]
180 Ok((super::VectorSink::from_event_sink(sink), healthcheck))
181 }
182
183 fn input(&self) -> Input {
184 let requirement = schema::Requirement::empty()
185 .optional_meaning("timestamp", Kind::timestamp())
186 .optional_meaning("message", Kind::bytes());
187
188 Input::log().with_schema_requirement(requirement)
189 }
190
191 fn acknowledgements(&self) -> &AcknowledgementsConfig {
192 &self.acknowledgements
193 }
194}
195
/// Key that groups encoded events into separate requests.
///
/// Both parts are sent per request as query parameters rather than per line, so only events
/// with the same rendered hostname and tags can share a request.
#[derive(Hash, Eq, PartialEq, Clone)]
pub struct PartitionKey {
    /// Rendered `hostname` template; sent as the `hostname` query parameter.
    hostname: String,
    /// Rendered `tags` templates, if configured; joined with `,` into the `tags` query parameter.
    tags: Option<Vec<String>>,
}
201
/// Turns individual events into Mezmo line objects keyed by their `PartitionKey`.
///
/// Built from `MezmoConfig` by `HttpSink::build_encoder`.
pub struct MezmoEventEncoder {
    /// Template rendered per event into `PartitionKey::hostname`.
    hostname: Template,
    /// Optional templates rendered per event into `PartitionKey::tags`.
    tags: Option<Vec<Template>>,
    /// Field filter/formatting applied to each event before encoding.
    transformer: Transformer,
    /// `app` value used when an event has neither `app` nor `file`.
    default_app: String,
    /// `env` value used when an event has no `env`.
    default_env: String,
}
209
210impl MezmoEventEncoder {
211 fn render_key(
212 &self,
213 event: &Event,
214 ) -> Result<PartitionKey, (Option<&str>, TemplateRenderingError)> {
215 let hostname = self
216 .hostname
217 .render_string(event)
218 .map_err(|e| (Some("hostname"), e))?;
219 let tags = self
220 .tags
221 .as_ref()
222 .map(|tags| {
223 let mut vec = Vec::with_capacity(tags.len());
224 for tag in tags {
225 vec.push(tag.render_string(event).map_err(|e| (None, e))?);
226 }
227 Ok(Some(vec))
228 })
229 .unwrap_or(Ok(None))?;
230 Ok(PartitionKey { hostname, tags })
231 }
232}
233
234impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for MezmoEventEncoder {
235 fn encode_event(
236 &mut self,
237 mut event: Event,
238 ) -> Option<PartitionInnerBuffer<serde_json::Value, PartitionKey>> {
239 let key = self
240 .render_key(&event)
241 .map_err(|(field, error)| {
242 emit!(crate::internal_events::TemplateRenderingError {
243 error,
244 field,
245 drop_event: true,
246 });
247 })
248 .ok()?;
249
250 self.transformer.transform(&mut event);
251 let mut log = event.into_log();
252
253 let line = log
254 .message_path()
255 .cloned()
256 .as_ref()
257 .and_then(|path| log.remove(path))
258 .unwrap_or_else(|| String::from("").into());
259
260 let timestamp: Value = log
261 .timestamp_path()
262 .cloned()
263 .and_then(|path| log.remove(&path))
264 .unwrap_or_else(|| chrono::Utc::now().into());
265
266 let mut map = serde_json::map::Map::new();
267
268 map.insert("line".to_string(), json!(line));
269 map.insert("timestamp".to_string(), json!(timestamp));
270
271 if let Some(env) = log.remove(event_path!("env")) {
272 map.insert("env".to_string(), json!(env));
273 }
274
275 if let Some(app) = log.remove(event_path!("app")) {
276 map.insert("app".to_string(), json!(app));
277 }
278
279 if let Some(file) = log.remove(event_path!("file")) {
280 map.insert("file".to_string(), json!(file));
281 }
282
283 if !map.contains_key("env") {
284 map.insert("env".to_string(), json!(self.default_env));
285 }
286
287 if !map.contains_key("app") && !map.contains_key("file") {
288 map.insert("app".to_string(), json!(self.default_app.as_str()));
289 }
290
291 if !log.is_empty_object() {
292 map.insert("meta".into(), json!(&log));
293 }
294
295 Some(PartitionInnerBuffer::new(map.into(), key))
296 }
297}
298
299impl HttpSink for MezmoConfig {
300 type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
301 type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;
302 type Encoder = MezmoEventEncoder;
303
304 fn build_encoder(&self) -> Self::Encoder {
305 MezmoEventEncoder {
306 hostname: self.hostname.clone(),
307 tags: self.tags.clone(),
308 transformer: self.encoding.clone(),
309 default_app: self.default_app.clone(),
310 default_env: self.default_env.clone(),
311 }
312 }
313
314 async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Bytes>> {
315 let (events, key) = output.into_parts();
316 let mut query = url::form_urlencoded::Serializer::new(String::new());
317
318 let now = SystemTime::now()
319 .duration_since(SystemTime::UNIX_EPOCH)
320 .expect("Time can't drift behind the epoch!")
321 .as_millis();
322
323 query.append_pair("hostname", &key.hostname);
324 query.append_pair("now", &now.to_string());
325
326 if let Some(mac) = &self.mac {
327 query.append_pair("mac", mac);
328 }
329
330 if let Some(ip) = &self.ip {
331 query.append_pair("ip", ip);
332 }
333
334 if let Some(tags) = &key.tags {
335 let tags = tags.join(",");
336 query.append_pair("tags", &tags);
337 }
338
339 let query = query.finish();
340
341 let body = crate::serde::json::to_bytes(&json!({
342 "lines": events,
343 }))
344 .unwrap()
345 .freeze();
346
347 let uri = self.build_uri(&query);
348
349 let mut request = Request::builder()
350 .uri(uri)
351 .method("POST")
352 .header("Content-Type", "application/json")
353 .body(body)
354 .unwrap();
355
356 let auth = Auth::Basic {
357 user: self.api_key.inner().to_string(),
358 password: SensitiveString::default(),
359 };
360
361 auth.apply(&mut request);
362
363 Ok(request)
364 }
365}
366
367impl MezmoConfig {
368 fn build_uri(&self, query: &str) -> Uri {
369 let host = &self.endpoint.uri;
370
371 let uri = format!("{host}{PATH}?{query}");
372
373 uri.parse::<http::Uri>()
374 .expect("This should be a valid uri")
375 }
376}
377
378async fn healthcheck(config: MezmoConfig, client: HttpClient) -> crate::Result<()> {
379 let uri = config.build_uri("");
380
381 let req = Request::post(uri).body(hyper::Body::empty()).unwrap();
382
383 let res = client.send(req).await?;
384
385 if res.status().is_server_error() {
386 return Err("Server returned a server error".into());
387 }
388
389 if res.status() == StatusCode::FORBIDDEN {
390 return Err("Token is not valid, 403 returned.".into());
391 }
392
393 Ok(())
394}
395
#[cfg(test)]
mod tests {
    use futures::{StreamExt, channel::mpsc};
    use futures_util::stream;
    use http::{StatusCode, request::Parts};
    use serde_json::json;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server_status, load_sink},
        test_util::{
            addr::next_addr,
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            random_lines,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MezmoConfig>();
    }

    /// Checks the encoder's field handling. Explicit `app`/`file`/`env` fields take precedence
    /// over the configured defaults, and `encoding.except_fields` is applied before encoding.
    #[test]
    fn encode_event() {
        // The transformer lives in the `encoding` field of `MezmoConfig`.
        let (config, _cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            hostname = "vector"
            default_env = "acceptance"
            encoding.except_fields = ["magic"]
            "#,
        )
        .unwrap();
        let mut encoder = config.build_encoder();

        let mut event1 = Event::Log(LogEvent::from("hello world"));
        event1.as_mut_log().insert("app", "notvector");
        event1.as_mut_log().insert("magic", "vector");

        let mut event2 = Event::Log(LogEvent::from("hello world"));
        event2.as_mut_log().insert("file", "log.txt");

        let event3 = Event::Log(LogEvent::from("hello world"));

        let mut event4 = Event::Log(LogEvent::from("hello world"));
        event4.as_mut_log().insert("env", "staging");

        let event1_out = encoder.encode_event(event1).unwrap().into_parts().0;
        let event1_out = event1_out.as_object().unwrap();
        let event2_out = encoder.encode_event(event2).unwrap().into_parts().0;
        let event2_out = event2_out.as_object().unwrap();
        let event3_out = encoder.encode_event(event3).unwrap().into_parts().0;
        let event3_out = event3_out.as_object().unwrap();
        let event4_out = encoder.encode_event(event4).unwrap().into_parts().0;
        let event4_out = event4_out.as_object().unwrap();

        assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
        // `magic` is excluded by the transformer, so it must not leak into `meta`.
        assert!(
            event1_out
                .get("meta")
                .and_then(|meta| meta.get("magic"))
                .is_none()
        );
        assert_eq!(event2_out.get("file").unwrap(), &json!("log.txt"));
        // A `file` field suppresses the default `app`.
        assert!(event2_out.get("app").is_none());
        assert_eq!(event3_out.get("app").unwrap(), &json!("vector"));
        assert_eq!(event3_out.get("env").unwrap(), &json!("acceptance"));
        assert_eq!(event4_out.get("env").unwrap(), &json!("staging"));
    }

    /// Runs the sink against a local test server that answers with `status_code`.
    ///
    /// Ten random lines are sent, alternating between two hostnames (and so between two
    /// partitions). The helper asserts that the batch finalizes with `batch_status`, then returns
    /// the hostnames, the lines expected for each partition, and the receiver of captured
    /// requests.
    async fn smoke_start(
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> (
        Vec<&'static str>,
        Vec<Vec<String>>,
        mpsc::Receiver<(Parts, bytes::Bytes)>,
    ) {
        let (mut config, cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            ip = "127.0.0.1"
            mac = "some-mac-addr"
            hostname = "{{ hostname }}"
            tags = ["test","maybeanothertest"]
            "#,
        )
        .unwrap();

        // Building with the default endpoint must also succeed.
        _ = config.build(cx.clone()).await.unwrap();

        let (_guard, addr) = next_addr();
        let endpoint = UriSerde {
            uri: format!("http://{addr}").parse::<http::Uri>().unwrap(),
            auth: None,
        };
        config.endpoint = endpoint;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let lines = random_lines(100).take(10).collect::<Vec<_>>();
        let mut events = Vec::new();
        let hosts = vec!["host0", "host1"];

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut partitions = vec![Vec::new(), Vec::new()];
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.as_str()).with_batch_notifier(&batch);
            let p = i % 2;
            event.insert("hostname", hosts[p]);

            partitions[p].push(line.into());
            events.push(Event::Log(event));
        }
        drop(batch);

        let events = stream::iter(events).map(Into::into);
        sink.run(events).await.expect("Running sink failed");

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        (hosts, partitions, rx)
    }

    /// A 403 from the server rejects the batch. The test then expects `rx.try_next()` to
    /// return an error (nothing available on the capture channel).
    #[tokio::test]
    async fn smoke_fails() {
        let (_hosts, _partitions, mut rx) =
            smoke_start(StatusCode::FORBIDDEN, BatchStatus::Rejected).await;
        assert!(matches!(rx.try_next(), Err(mpsc::TryRecvError { .. })));
    }

    /// A 200 from the server delivers the batch. Each partition arrives as its own request,
    /// with the expected query parameters and one line object per event.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (hosts, partitions, mut rx) =
                smoke_start(StatusCode::OK, BatchStatus::Delivered).await;

            for _ in 0..partitions.len() {
                let output = rx.next().await.unwrap();

                let request = &output.0;
                let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

                let query = request.uri.query().unwrap();

                let (p, host) = hosts
                    .iter()
                    .enumerate()
                    .find(|(_, host)| query.contains(&format!("hostname={host}")))
                    .expect("invalid hostname");
                let lines = &partitions[p];

                assert!(query.contains("ip=127.0.0.1"));
                assert!(query.contains("mac=some-mac-addr"));
                assert!(query.contains("tags=test%2Cmaybeanothertest"));

                let output = body
                    .as_object()
                    .unwrap()
                    .get("lines")
                    .unwrap()
                    .as_array()
                    .unwrap();

                for (i, line) in output.iter().enumerate() {
                    let line = line.as_object().unwrap();

                    assert_eq!(line.get("app").unwrap(), &json!("vector"));
                    assert_eq!(line.get("env").unwrap(), &json!("production"));
                    assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

                    // Only the templated `hostname` field is left on the event, so it is the
                    // sole entry nested under `meta`.
                    assert_eq!(
                        line.get("meta").unwrap(),
                        &json!({
                            "hostname": host,
                        })
                    );
                }
            }
        })
        .await;
    }
}
582}