vector/sinks/sematext/
logs.rs1use async_trait::async_trait;
2use futures::stream::{BoxStream, StreamExt};
3use indoc::indoc;
4use vector_lib::configurable::configurable_component;
5use vector_lib::sensitive_string::SensitiveString;
6use vrl::event_path;
7
8use super::Region;
9use crate::{
10 codecs::Transformer,
11 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
12 event::EventArray,
13 sinks::{
14 elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig},
15 util::{
16 http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
17 StreamSink, TowerRequestConfig,
18 },
19 Healthcheck, VectorSink,
20 },
21 template::Template,
22};
23
24#[configurable_component(sink("sematext_logs", "Publish log events to Sematext."))]
26#[derive(Clone, Debug)]
27pub struct SematextLogsConfig {
28 #[serde(default = "super::default_region")]
29 #[configurable(derived)]
30 region: Region,
31
32 #[serde(alias = "host")]
36 #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
37 #[configurable(metadata(docs::examples = "https://example.com"))]
38 endpoint: Option<String>,
39
40 #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
42 #[configurable(metadata(docs::examples = "some-sematext-token"))]
43 token: SensitiveString,
44
45 #[configurable(derived)]
46 #[serde(skip_serializing_if = "crate::serde::is_default", default)]
47 pub encoding: Transformer,
48
49 #[configurable(derived)]
50 #[serde(default)]
51 request: TowerRequestConfig,
52
53 #[configurable(derived)]
54 #[serde(default)]
55 batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
56
57 #[configurable(derived)]
58 #[serde(
59 default,
60 deserialize_with = "crate::serde::bool_or_struct",
61 skip_serializing_if = "crate::serde::is_default"
62 )]
63 acknowledgements: AcknowledgementsConfig,
64}
65
66impl GenerateConfig for SematextLogsConfig {
67 fn generate_config() -> toml::Value {
68 toml::from_str(indoc! {r#"
69 token = "${SEMATEXT_TOKEN}"
70 "#})
71 .unwrap()
72 }
73}
74
/// Receiver used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://logsene-receiver.eu.sematext.com";

/// Receiver used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://logsene-receiver.sematext.com";
78
79#[async_trait::async_trait]
80#[typetag::serde(name = "sematext_logs")]
81impl SinkConfig for SematextLogsConfig {
82 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
83 let endpoint = match (&self.endpoint, &self.region) {
84 (Some(endpoint), _) => endpoint.clone(),
85 (None, Region::Us) => US_ENDPOINT.to_owned(),
86 (None, Region::Eu) => EU_ENDPOINT.to_owned(),
87 };
88
89 let (sink, healthcheck) = ElasticsearchConfig {
90 endpoints: vec![endpoint],
91 compression: Compression::None,
92 doc_type: "\
93 logs"
94 .to_string(),
95 bulk: BulkConfig {
96 index: Template::try_from(self.token.inner())
97 .expect("unable to parse token as Template"),
98 ..Default::default()
99 },
100 batch: self.batch,
101 request: RequestConfig {
102 tower: self.request,
103 ..Default::default()
104 },
105 encoding: self.encoding.clone(),
106 api_version: ElasticsearchApiVersion::V6,
107 ..Default::default()
108 }
109 .build(cx)
110 .await?;
111
112 let stream = sink.into_stream();
113 let mapped_stream = MapTimestampStream { inner: stream };
114
115 Ok((VectorSink::Stream(Box::new(mapped_stream)), healthcheck))
116 }
117
118 fn input(&self) -> Input {
119 Input::log()
120 }
121
122 fn acknowledgements(&self) -> &AcknowledgementsConfig {
123 &self.acknowledgements
124 }
125}
126
127struct MapTimestampStream {
128 inner: Box<dyn StreamSink<EventArray> + Send>,
129}
130
131#[async_trait]
132impl StreamSink<EventArray> for MapTimestampStream {
133 async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
134 let mapped_input = input.map(map_timestamp).boxed();
135 self.inner.run(mapped_input).await
136 }
137}
138
139fn map_timestamp(mut events: EventArray) -> EventArray {
141 match &mut events {
142 EventArray::Logs(logs) => {
143 for log in logs {
144 if let Some(path) = log.timestamp_path().cloned().as_ref() {
145 log.rename_key(path, event_path!("@timestamp"));
146 }
147
148 if let Some(path) = log.host_path().cloned().as_ref() {
149 log.rename_key(path, event_path!("os.host"));
150 }
151 }
152 }
153 _ => unreachable!("This sink only accepts logs"),
154 }
155
156 events
157}
158
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use indoc::indoc;

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{self, HTTP_SINK_TAGS},
            next_addr, random_lines_with_stream,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SematextLogsConfig>();
    }

    #[tokio::test]
    async fn smoke() {
        let (mut config, cx) = load_sink::<SematextLogsConfig>(indoc! {r#"
            token = "mylogtoken"
        "#})
        .unwrap();

        // Building with the region-derived default endpoint must succeed.
        _ = config.build(cx.clone()).await.unwrap();

        // Point the sink at a local test server so the outgoing request can be captured.
        let addr = next_addr();
        config.endpoint = Some(format!("http://{addr}"));

        let (sink, _) = config.build(cx).await.unwrap();

        let (mut rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let (expected, events) = random_lines_with_stream(100, 10, None);
        components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;

        let output = rx.next().await.unwrap();

        // The body is a sequence of JSON values alternating between a bulk action
        // (even positions) and the event document (odd positions).
        let json = serde_json::Deserializer::from_slice(&output.1[..])
            .into_iter::<serde_json::Value>()
            .map(|v| v.expect("decoding json"));

        let mut expected_message_idx = 0;
        for (i, val) in json.enumerate() {
            if i % 2 == 0 {
                // Action line: `{"index": {"_index": <token>, ...}}`.
                let token = val
                    .get("index")
                    .unwrap()
                    .get("_index")
                    .unwrap()
                    .as_str()
                    .unwrap();

                assert_eq!(token, "mylogtoken");
            } else {
                // Document line: carries the original message.
                let message = val.get("message").unwrap().as_str().unwrap();
                assert_eq!(message, &expected[expected_message_idx]);
                expected_message_idx += 1;
            }
        }

        // Without this, an empty or truncated body would pass the loop above vacuously.
        assert_eq!(expected_message_idx, expected.len());
    }
}