1use std::time::SystemTime;
2
3use bytes::Bytes;
4use futures::{FutureExt, SinkExt};
5use http::{Request, StatusCode, Uri};
6use serde_json::json;
7use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
8use vrl::{
9 event_path,
10 value::{Kind, Value},
11};
12
13use crate::{
14 codecs::Transformer,
15 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
16 event::Event,
17 http::{Auth, HttpClient},
18 schema,
19 sinks::util::{
20 BatchConfig, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer,
21 RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, UriSerde,
22 http::{HttpEventEncoder, HttpSink, PartitionHttpSink},
23 },
24 template::{Template, TemplateRenderingError},
25};
26
/// Ingestion path appended to the configured `endpoint` when building request URIs
/// (used for both batch requests and the healthcheck).
const PATH: &str = "/logs/ingest";
28
#[configurable_component(sink("logdna", "Deliver log event data to LogDNA."))]
/// Configuration for the `logdna` sink.
///
/// Deprecated alias of the `mezmo` sink: it wraps a `MezmoConfig` and delegates to it.
#[configurable(metadata(
    deprecated = "The `logdna` sink has been renamed. Please use `mezmo` instead."
))]
#[derive(Clone, Debug)]
pub struct LogdnaConfig(MezmoConfig);
36
37impl GenerateConfig for LogdnaConfig {
38 fn generate_config() -> toml::Value {
39 <MezmoConfig as GenerateConfig>::generate_config()
40 }
41}
42
43#[async_trait::async_trait]
44#[typetag::serde(name = "logdna")]
45impl SinkConfig for LogdnaConfig {
46 async fn build(
47 &self,
48 cx: SinkContext,
49 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
50 warn!("DEPRECATED: The `logdna` sink has been renamed. Please use `mezmo` instead.");
51 self.0.build(cx).await
52 }
53
54 fn input(&self) -> Input {
55 self.0.input()
56 }
57
58 fn acknowledgements(&self) -> &AcknowledgementsConfig {
59 self.0.acknowledgements()
60 }
61}
62
#[configurable_component(sink("mezmo", "Deliver log event data to Mezmo."))]
/// Configuration for the `mezmo` sink.
#[derive(Clone, Debug)]
pub struct MezmoConfig {
    /// The Ingestion API key.
    ///
    /// Sent as the HTTP Basic authentication user name, with an empty password.
    #[configurable(metadata(docs::examples = "${LOGDNA_API_KEY}"))]
    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
    api_key: SensitiveString,

    /// The HTTP endpoint to send logs to.
    ///
    /// Both IP address and hostname are accepted formats. The ingestion path
    /// `/logs/ingest` is appended to it. Defaults to `https://logs.mezmo.com`.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "http://example.com"))]
    endpoint: UriSerde,

    /// The hostname that is attached to each batch of events.
    ///
    /// Rendered per event; events are batched separately for each distinct rendered
    /// hostname (and tags).
    #[configurable(metadata(docs::examples = "${HOSTNAME}"))]
    #[configurable(metadata(docs::examples = "my-local-machine"))]
    hostname: Template,

    /// The MAC address that is attached to each batch of events.
    #[configurable(metadata(docs::examples = "my-mac-address"))]
    #[configurable(metadata(docs::human_name = "MAC Address"))]
    mac: Option<String>,

    /// The IP address that is attached to each batch of events.
    #[configurable(metadata(docs::examples = "0.0.0.0"))]
    #[configurable(metadata(docs::human_name = "IP Address"))]
    ip: Option<String>,

    /// The tags that are attached to each batch of events.
    ///
    /// Each tag is rendered per event; the rendered tags are joined with `,`.
    #[configurable(metadata(docs::examples = "tag1"))]
    #[configurable(metadata(docs::examples = "tag2"))]
    tags: Option<Vec<Template>>,

    // Applied to each event after its partition key is rendered and before encoding.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// The default app that is set for events that do not contain a `file` or `app` field.
    #[serde(default = "default_app")]
    #[configurable(metadata(docs::examples = "my-app"))]
    default_app: String,

    /// The default environment that is set for events that do not contain an `env` field.
    #[serde(default = "default_env")]
    #[configurable(metadata(docs::examples = "staging"))]
    default_env: String,

    // Batch size and timeout limits for the JSON line buffer.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Tower request settings, converted with `into_settings` when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    // Acknowledgement settings; deserializes from either a bool or a full struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
131
132fn default_endpoint() -> UriSerde {
133 UriSerde {
134 uri: Uri::from_static("https://logs.mezmo.com"),
135 auth: None,
136 }
137}
138
/// `app` value attached to events that carry neither an `app` nor a `file` field.
fn default_app() -> String {
    String::from("vector")
}
142
/// `env` value attached to events that carry no `env` field.
fn default_env() -> String {
    String::from("production")
}
146
147impl GenerateConfig for MezmoConfig {
148 fn generate_config() -> toml::Value {
149 toml::from_str(
150 r#"hostname = "hostname"
151 api_key = "${LOGDNA_API_KEY}""#,
152 )
153 .unwrap()
154 }
155}
156
157#[async_trait::async_trait]
158#[typetag::serde(name = "mezmo")]
159impl SinkConfig for MezmoConfig {
160 async fn build(
161 &self,
162 cx: SinkContext,
163 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
164 let request_settings = self.request.into_settings();
165 let batch_settings = self.batch.into_batch_settings()?;
166 let client = HttpClient::new(None, cx.proxy())?;
167
168 let sink = PartitionHttpSink::new(
169 self.clone(),
170 PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
171 request_settings,
172 batch_settings.timeout,
173 client.clone(),
174 )
175 .sink_map_err(|error| error!(message = "Fatal mezmo sink error.", %error));
176
177 let healthcheck = healthcheck(self.clone(), client).boxed();
178
179 #[allow(deprecated)]
180 Ok((super::VectorSink::from_event_sink(sink), healthcheck))
181 }
182
183 fn input(&self) -> Input {
184 let requirement = schema::Requirement::empty()
185 .optional_meaning("timestamp", Kind::timestamp())
186 .optional_meaning("message", Kind::bytes());
187
188 Input::log().with_schema_requirement(requirement)
189 }
190
191 fn acknowledgements(&self) -> &AcknowledgementsConfig {
192 &self.acknowledgements
193 }
194}
195
/// Batch partition key: events whose hostname and tags render to the same values
/// are sent together in one ingestion request.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct PartitionKey {
    /// Rendered `hostname` template, sent as the `hostname` query parameter.
    hostname: String,
    /// Rendered `tags` templates (if configured), joined with `,` into the `tags`
    /// query parameter.
    tags: Option<Vec<String>>,
}
201
202pub struct MezmoEventEncoder {
203 hostname: Template,
204 tags: Option<Vec<Template>>,
205 transformer: Transformer,
206 default_app: String,
207 default_env: String,
208}
209
210impl MezmoEventEncoder {
211 fn render_key(
212 &self,
213 event: &Event,
214 ) -> Result<PartitionKey, (Option<&str>, TemplateRenderingError)> {
215 let hostname = self
216 .hostname
217 .render_string(event)
218 .map_err(|e| (Some("hostname"), e))?;
219 let tags = self
220 .tags
221 .as_ref()
222 .map(|tags| {
223 let mut vec = Vec::with_capacity(tags.len());
224 for tag in tags {
225 vec.push(tag.render_string(event).map_err(|e| (None, e))?);
226 }
227 Ok(Some(vec))
228 })
229 .unwrap_or(Ok(None))?;
230 Ok(PartitionKey { hostname, tags })
231 }
232}
233
234impl HttpEventEncoder<PartitionInnerBuffer<serde_json::Value, PartitionKey>> for MezmoEventEncoder {
235 fn encode_event(
236 &mut self,
237 mut event: Event,
238 ) -> Option<PartitionInnerBuffer<serde_json::Value, PartitionKey>> {
239 let key = self
240 .render_key(&event)
241 .map_err(|(field, error)| {
242 emit!(crate::internal_events::TemplateRenderingError {
243 error,
244 field,
245 drop_event: true,
246 });
247 })
248 .ok()?;
249
250 self.transformer.transform(&mut event);
251 let mut log = event.into_log();
252
253 let line = log
254 .message_path()
255 .cloned()
256 .as_ref()
257 .and_then(|path| log.remove(path))
258 .unwrap_or_else(|| String::from("").into());
259
260 let timestamp: Value = log
261 .timestamp_path()
262 .cloned()
263 .and_then(|path| log.remove(&path))
264 .unwrap_or_else(|| chrono::Utc::now().into());
265
266 let mut map = serde_json::map::Map::new();
267
268 map.insert("line".to_string(), json!(line));
269 map.insert("timestamp".to_string(), json!(timestamp));
270
271 if let Some(env) = log.remove(event_path!("env")) {
272 map.insert("env".to_string(), json!(env));
273 }
274
275 if let Some(app) = log.remove(event_path!("app")) {
276 map.insert("app".to_string(), json!(app));
277 }
278
279 if let Some(file) = log.remove(event_path!("file")) {
280 map.insert("file".to_string(), json!(file));
281 }
282
283 if !map.contains_key("env") {
284 map.insert("env".to_string(), json!(self.default_env));
285 }
286
287 if !map.contains_key("app") && !map.contains_key("file") {
288 map.insert("app".to_string(), json!(self.default_app.as_str()));
289 }
290
291 if !log.is_empty_object() {
292 map.insert("meta".into(), json!(&log));
293 }
294
295 Some(PartitionInnerBuffer::new(map.into(), key))
296 }
297}
298
299impl HttpSink for MezmoConfig {
300 type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
301 type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;
302 type Encoder = MezmoEventEncoder;
303
304 fn build_encoder(&self) -> Self::Encoder {
305 MezmoEventEncoder {
306 hostname: self.hostname.clone(),
307 tags: self.tags.clone(),
308 transformer: self.encoding.clone(),
309 default_app: self.default_app.clone(),
310 default_env: self.default_env.clone(),
311 }
312 }
313
314 async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Bytes>> {
315 let (events, key) = output.into_parts();
316 let mut query = url::form_urlencoded::Serializer::new(String::new());
317
318 let now = SystemTime::now()
319 .duration_since(SystemTime::UNIX_EPOCH)
320 .expect("Time can't drift behind the epoch!")
321 .as_millis();
322
323 query.append_pair("hostname", &key.hostname);
324 query.append_pair("now", &now.to_string());
325
326 if let Some(mac) = &self.mac {
327 query.append_pair("mac", mac);
328 }
329
330 if let Some(ip) = &self.ip {
331 query.append_pair("ip", ip);
332 }
333
334 if let Some(tags) = &key.tags {
335 let tags = tags.join(",");
336 query.append_pair("tags", &tags);
337 }
338
339 let query = query.finish();
340
341 let body = crate::serde::json::to_bytes(&json!({
342 "lines": events,
343 }))
344 .unwrap()
345 .freeze();
346
347 let uri = self.build_uri(&query);
348
349 let mut request = Request::builder()
350 .uri(uri)
351 .method("POST")
352 .header("Content-Type", "application/json")
353 .body(body)
354 .unwrap();
355
356 let auth = Auth::Basic {
357 user: self.api_key.inner().to_string(),
358 password: SensitiveString::default(),
359 };
360
361 auth.apply(&mut request);
362
363 Ok(request)
364 }
365}
366
367impl MezmoConfig {
368 fn build_uri(&self, query: &str) -> Uri {
369 let host = &self.endpoint.uri;
370
371 let uri = format!("{host}{PATH}?{query}");
372
373 uri.parse::<http::Uri>()
374 .expect("This should be a valid uri")
375 }
376}
377
378async fn healthcheck(config: MezmoConfig, client: HttpClient) -> crate::Result<()> {
379 let uri = config.build_uri("");
380
381 let req = Request::post(uri).body(hyper::Body::empty()).unwrap();
382
383 let res = client.send(req).await?;
384
385 if res.status().is_server_error() {
386 return Err("Server returned a server error".into());
387 }
388
389 if res.status() == StatusCode::FORBIDDEN {
390 return Err("Token is not valid, 403 returned.".into());
391 }
392
393 Ok(())
394}
395
#[cfg(test)]
mod tests {
    use futures::{StreamExt, channel::mpsc};
    use futures_util::stream;
    use http::{StatusCode, request::Parts};
    use serde_json::json;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server_status, load_sink},
        test_util::{
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            next_addr, random_lines,
        },
    };

    /// Exercises `GenerateConfig` for `MezmoConfig` through the shared
    /// `test_generate_config` helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MezmoConfig>();
    }

    /// Checks how `app`, `file` and `env` are lifted from events or defaulted.
    #[test]
    fn encode_event() {
        // NOTE(review): the transformer field is named `encoding`, and nothing below
        // asserts that `magic` is removed. Confirm that `codec.except_fields`
        // actually configures it.
        let (config, _cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            hostname = "vector"
            default_env = "acceptance"
            codec.except_fields = ["magic"]
        "#,
        )
        .unwrap();
        let mut encoder = config.build_encoder();

        // Explicit `app`: must be kept instead of the default.
        let mut event1 = Event::Log(LogEvent::from("hello world"));
        event1.as_mut_log().insert("app", "notvector");
        event1.as_mut_log().insert("magic", "vector");

        // Explicit `file`: must be kept.
        let mut event2 = Event::Log(LogEvent::from("hello world"));
        event2.as_mut_log().insert("file", "log.txt");

        // No fields: gets the default app ("vector") and configured default env.
        let event3 = Event::Log(LogEvent::from("hello world"));

        // Explicit `env`: must override `default_env`.
        let mut event4 = Event::Log(LogEvent::from("hello world"));
        event4.as_mut_log().insert("env", "staging");

        let event1_out = encoder.encode_event(event1).unwrap().into_parts().0;
        let event1_out = event1_out.as_object().unwrap();
        let event2_out = encoder.encode_event(event2).unwrap().into_parts().0;
        let event2_out = event2_out.as_object().unwrap();
        let event3_out = encoder.encode_event(event3).unwrap().into_parts().0;
        let event3_out = event3_out.as_object().unwrap();
        let event4_out = encoder.encode_event(event4).unwrap().into_parts().0;
        let event4_out = event4_out.as_object().unwrap();

        assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
        assert_eq!(event2_out.get("file").unwrap(), &json!("log.txt"));
        assert_eq!(event3_out.get("app").unwrap(), &json!("vector"));
        assert_eq!(event3_out.get("env").unwrap(), &json!("acceptance"));
        assert_eq!(event4_out.get("env").unwrap(), &json!("staging"));
    }

    /// Runs the sink against a local test server that answers with `status_code`.
    ///
    /// Ten random lines are sent, alternating between hostnames `host0` and `host1`,
    /// so they fall into two partitions. Asserts the batch finishes with
    /// `batch_status`.
    ///
    /// Returns the hostnames, the lines expected per partition (indexed like the
    /// hostnames), and the receiver of requests captured by the server.
    async fn smoke_start(
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> (
        Vec<&'static str>,
        Vec<Vec<String>>,
        mpsc::Receiver<(Parts, bytes::Bytes)>,
    ) {
        let (mut config, cx) = load_sink::<MezmoConfig>(
            r#"
            api_key = "mylogtoken"
            ip = "127.0.0.1"
            mac = "some-mac-addr"
            hostname = "{{ hostname }}"
            tags = ["test","maybeanothertest"]
        "#,
        )
        .unwrap();

        // Builds once with the default endpoint; presumably just checks that the
        // loaded configuration builds. The result is discarded.
        _ = config.build(cx.clone()).await.unwrap();

        // Point the sink at the local test server instead.
        let addr = next_addr();
        let endpoint = UriSerde {
            uri: format!("http://{addr}").parse::<http::Uri>().unwrap(),
            auth: None,
        };
        config.endpoint = endpoint;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let lines = random_lines(100).take(10).collect::<Vec<_>>();
        let mut events = Vec::new();
        let hosts = vec!["host0", "host1"];

        // All events share one batch notifier so the final delivery status can be
        // observed below.
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let mut partitions = vec![Vec::new(), Vec::new()];
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.as_str()).with_batch_notifier(&batch);
            let p = i % 2;
            event.insert("hostname", hosts[p]);

            partitions[p].push(line.into());
            events.push(Event::Log(event));
        }
        // Release our handle so the status is reported once all events finish.
        drop(batch);

        let events = stream::iter(events).map(Into::into);
        sink.run(events).await.expect("Running sink failed");

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        (hosts, partitions, rx)
    }

    /// A 403 response rejects the batch.
    ///
    /// Also asserts that no request is available on the capture channel.
    /// NOTE(review): this presumably relies on `build_test_server_status` forwarding
    /// only requests it answers successfully — confirm.
    #[tokio::test]
    async fn smoke_fails() {
        let (_hosts, _partitions, mut rx) =
            smoke_start(StatusCode::FORBIDDEN, BatchStatus::Rejected).await;
        assert!(matches!(rx.try_next(), Err(mpsc::TryRecvError { .. })));
    }

    /// A 200 response delivers the batch: one request per hostname partition.
    ///
    /// Each request carries the expected query parameters. Each line carries:
    /// - the default `app` and `env`;
    /// - its original message;
    /// - only the `hostname` field under `meta`.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (hosts, partitions, mut rx) =
                smoke_start(StatusCode::OK, BatchStatus::Delivered).await;

            for _ in 0..partitions.len() {
                let output = rx.next().await.unwrap();

                let request = &output.0;
                let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

                let query = request.uri.query().unwrap();

                // Identify which partition this request belongs to by its hostname.
                let (p, host) = hosts
                    .iter()
                    .enumerate()
                    .find(|(_, host)| query.contains(&format!("hostname={host}")))
                    .expect("invalid hostname");
                let lines = &partitions[p];

                assert!(query.contains("ip=127.0.0.1"));
                assert!(query.contains("mac=some-mac-addr"));
                // Tags are comma-joined, and the comma is form-urlencoded as `%2C`.
                assert!(query.contains("tags=test%2Cmaybeanothertest"));

                let output = body
                    .as_object()
                    .unwrap()
                    .get("lines")
                    .unwrap()
                    .as_array()
                    .unwrap();

                for (i, line) in output.iter().enumerate() {
                    let line = line.as_object().unwrap();

                    assert_eq!(line.get("app").unwrap(), &json!("vector"));
                    assert_eq!(line.get("env").unwrap(), &json!("production"));
                    assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

                    assert_eq!(
                        line.get("meta").unwrap(),
                        &json!({
                            "hostname": host,
                        })
                    );
                }
            }
        })
        .await;
    }
}