vector/sinks/sematext/
logs.rs1use async_trait::async_trait;
2use futures::stream::{BoxStream, StreamExt};
3use indoc::indoc;
4use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
5use vrl::event_path;
6
7use super::Region;
8use crate::{
9 codecs::Transformer,
10 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
11 event::EventArray,
12 sinks::{
13 Healthcheck, VectorSink,
14 elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig},
15 util::{
16 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, StreamSink,
17 TowerRequestConfig, http::RequestConfig,
18 },
19 },
20 template::Template,
21};
22
/// Configuration for the `sematext_logs` sink.
#[configurable_component(sink("sematext_logs", "Publish log events to Sematext."))]
#[derive(Clone, Debug)]
pub struct SematextLogsConfig {
    // Selects which built-in receiver endpoint is used when `endpoint` is unset.
    #[serde(default = "super::default_region")]
    #[configurable(derived)]
    region: Region,

    /// The endpoint to send data to.
    ///
    /// Setting this option overrides the `region` option.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    endpoint: Option<String>,

    /// The token that is used to write to Sematext.
    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
    #[configurable(metadata(docs::examples = "some-sematext-token"))]
    token: SensitiveString,

    // Passed through unchanged to the underlying Elasticsearch sink configuration.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,

    // Becomes the `tower` part of the Elasticsearch sink's request configuration.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    // Passed through unchanged to the underlying Elasticsearch sink configuration.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or a full
    // table for this option — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
64
65impl GenerateConfig for SematextLogsConfig {
66 fn generate_config() -> toml::Value {
67 toml::from_str(indoc! {r#"
68 token = "${SEMATEXT_TOKEN}"
69 "#})
70 .unwrap()
71 }
72}
73
/// Receiver endpoint used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://logsene-receiver.sematext.com";
/// Receiver endpoint used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://logsene-receiver.eu.sematext.com";
77
78#[async_trait::async_trait]
79#[typetag::serde(name = "sematext_logs")]
80impl SinkConfig for SematextLogsConfig {
81 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
82 let endpoint = match (&self.endpoint, &self.region) {
83 (Some(endpoint), _) => endpoint.clone(),
84 (None, Region::Us) => US_ENDPOINT.to_owned(),
85 (None, Region::Eu) => EU_ENDPOINT.to_owned(),
86 };
87
88 let (sink, healthcheck) = ElasticsearchConfig {
89 endpoints: vec![endpoint],
90 compression: Compression::None,
91 doc_type: "\
92 logs"
93 .to_string(),
94 bulk: BulkConfig {
95 index: Template::try_from(self.token.inner())
96 .expect("unable to parse token as Template"),
97 ..Default::default()
98 },
99 batch: self.batch,
100 request: RequestConfig {
101 tower: self.request,
102 ..Default::default()
103 },
104 encoding: self.encoding.clone(),
105 api_version: ElasticsearchApiVersion::V6,
106 ..Default::default()
107 }
108 .build(cx)
109 .await?;
110
111 let stream = sink.into_stream();
112 let mapped_stream = MapTimestampStream { inner: stream };
113
114 Ok((VectorSink::Stream(Box::new(mapped_stream)), healthcheck))
115 }
116
117 fn input(&self) -> Input {
118 Input::log()
119 }
120
121 fn acknowledgements(&self) -> &AcknowledgementsConfig {
122 &self.acknowledgements
123 }
124}
125
/// Stream sink adapter that applies `map_timestamp` to every incoming [`EventArray`]
/// before handing it to the wrapped sink.
struct MapTimestampStream {
    /// The wrapped sink that receives the remapped events.
    inner: Box<dyn StreamSink<EventArray> + Send>,
}
129
130#[async_trait]
131impl StreamSink<EventArray> for MapTimestampStream {
132 async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
133 let mapped_input = input.map(map_timestamp).boxed();
134 self.inner.run(mapped_input).await
135 }
136}
137
138fn map_timestamp(mut events: EventArray) -> EventArray {
140 match &mut events {
141 EventArray::Logs(logs) => {
142 for log in logs {
143 if let Some(path) = log.timestamp_path().cloned().as_ref() {
144 log.rename_key(path, event_path!("@timestamp"));
145 }
146
147 if let Some(path) = log.host_path().cloned().as_ref() {
148 log.rename_key(path, event_path!("os.host"));
149 }
150 }
151 }
152 _ => unreachable!("This sink only accepts logs"),
153 }
154
155 events
156}
157
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use indoc::indoc;

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{self, HTTP_SINK_TAGS},
            next_addr, random_lines_with_stream,
        },
    };

    /// The generated example configuration must be accepted by the sink's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SematextLogsConfig>();
    }

    /// Sends random lines through the sink to a local test server and checks the first
    /// request body: it alternates between an action line carrying the token as `_index`
    /// and a document line carrying the message.
    #[tokio::test]
    async fn smoke() {
        let raw_config = indoc! {r#"
            token = "mylogtoken"
        "#};
        let (mut config, cx) = load_sink::<SematextLogsConfig>(raw_config).unwrap();

        // Building with the default (region-derived) endpoint only has to succeed.
        _ = config.build(cx.clone()).await.unwrap();

        // Point the sink at the local test server instead.
        let addr = next_addr();
        config.endpoint = Some(format!("http://{addr}"));

        let (sink, _) = config.build(cx).await.unwrap();

        let (mut rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let (expected, events) = random_lines_with_stream(100, 10, None);
        components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;

        let output = rx.next().await.unwrap();

        // The body is a sequence of concatenated JSON values.
        let documents = serde_json::Deserializer::from_slice(&output.1[..])
            .into_iter::<serde_json::Value>()
            .map(|parsed| parsed.expect("decoding json"));

        let mut next_expected = 0;
        for (position, document) in documents.enumerate() {
            let is_action_line = position % 2 == 0;
            if is_action_line {
                // Action lines carry the token as the target index.
                let token = document
                    .pointer("/index/_index")
                    .unwrap()
                    .as_str()
                    .unwrap();

                assert_eq!(token, "mylogtoken");
            } else {
                // Document lines must arrive in the order the lines were generated.
                let message = document.get("message").unwrap().as_str().unwrap();
                assert_eq!(message, &expected[next_expected]);
                next_expected += 1;
            }
        }
    }
}