// vector/sinks/sematext/logs.rs

1use async_trait::async_trait;
2use futures::stream::{BoxStream, StreamExt};
3use indoc::indoc;
4use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
5use vrl::event_path;
6
7use super::Region;
8use crate::{
9    codecs::Transformer,
10    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
11    event::EventArray,
12    sinks::{
13        Healthcheck, VectorSink,
14        elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig},
15        util::{
16            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, StreamSink,
17            TowerRequestConfig, http::RequestConfig,
18        },
19    },
20    template::Template,
21};
22
/// Configuration for the `sematext_logs` sink.
//
// NOTE: `///` comments on this struct and its fields feed the generated
// configuration docs via `configurable_component`; implementation notes below
// deliberately use plain `//` comments so they stay out of that output.
#[configurable_component(sink("sematext_logs", "Publish log events to Sematext."))]
#[derive(Clone, Debug)]
pub struct SematextLogsConfig {
    // Picks the default receiver endpoint when `endpoint` is unset (see the
    // `match` in `SinkConfig::build`); defaults to `super::default_region()`.
    #[serde(default = "super::default_region")]
    #[configurable(derived)]
    region: Region,

    /// The endpoint to send data to.
    ///
    /// Setting this option overrides the `region` option.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    endpoint: Option<String>,

    /// The token that is used to write to Sematext.
    // `build` parses this as a `Template` and uses it as the bulk index name.
    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
    #[configurable(metadata(docs::examples = "some-sematext-token"))]
    token: SensitiveString,

    // Forwarded to the underlying Elasticsearch sink; skipped when
    // serializing if it is still the default.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,

    // Forwarded as the `tower` part of the Elasticsearch `RequestConfig`.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    // Forwarded unchanged to the underlying Elasticsearch sink.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Deserialized with `crate::serde::bool_or_struct`; presumably either a
    // bare bool or the full struct is accepted — confirm against that helper.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
64
65impl GenerateConfig for SematextLogsConfig {
66    fn generate_config() -> toml::Value {
67        toml::from_str(indoc! {r#"
68            token = "${SEMATEXT_TOKEN}"
69        "#})
70        .unwrap()
71    }
72}
73
// Default receiver endpoints, selected by `region` when no explicit `endpoint`
// is configured.
// https://sematext.com/docs/logs/index-events-via-elasticsearch-api/
/// Endpoint used for `Region::Us`.
const US_ENDPOINT: &str = "https://logsene-receiver.sematext.com";
/// Endpoint used for `Region::Eu`.
const EU_ENDPOINT: &str = "https://logsene-receiver.eu.sematext.com";
77
78#[async_trait::async_trait]
79#[typetag::serde(name = "sematext_logs")]
80impl SinkConfig for SematextLogsConfig {
81    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
82        let endpoint = match (&self.endpoint, &self.region) {
83            (Some(endpoint), _) => endpoint.clone(),
84            (None, Region::Us) => US_ENDPOINT.to_owned(),
85            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
86        };
87
88        let (sink, healthcheck) = ElasticsearchConfig {
89            endpoints: vec![endpoint],
90            compression: Compression::None,
91            doc_type: "\
92                logs"
93                .to_string(),
94            bulk: BulkConfig {
95                index: Template::try_from(self.token.inner())
96                    .expect("unable to parse token as Template"),
97                ..Default::default()
98            },
99            batch: self.batch,
100            request: RequestConfig {
101                tower: self.request,
102                ..Default::default()
103            },
104            encoding: self.encoding.clone(),
105            api_version: ElasticsearchApiVersion::V6,
106            ..Default::default()
107        }
108        .build(cx)
109        .await?;
110
111        let stream = sink.into_stream();
112        let mapped_stream = MapTimestampStream { inner: stream };
113
114        Ok((VectorSink::Stream(Box::new(mapped_stream)), healthcheck))
115    }
116
117    fn input(&self) -> Input {
118        Input::log()
119    }
120
121    fn acknowledgements(&self) -> &AcknowledgementsConfig {
122        &self.acknowledgements
123    }
124}
125
/// Stream sink wrapper that runs every incoming `EventArray` through
/// `map_timestamp` before handing the stream to `inner`.
struct MapTimestampStream {
    /// The wrapped sink that receives the rewritten events (in this file, the
    /// Elasticsearch sink built in `SinkConfig::build`).
    inner: Box<dyn StreamSink<EventArray> + Send>,
}
129
130#[async_trait]
131impl StreamSink<EventArray> for MapTimestampStream {
132    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
133        let mapped_input = input.map(map_timestamp).boxed();
134        self.inner.run(mapped_input).await
135    }
136}
137
138/// Used to map `timestamp` to `@timestamp`.
139fn map_timestamp(mut events: EventArray) -> EventArray {
140    match &mut events {
141        EventArray::Logs(logs) => {
142            for log in logs {
143                if let Some(path) = log.timestamp_path().cloned().as_ref() {
144                    log.rename_key(path, event_path!("@timestamp"));
145                }
146
147                if let Some(path) = log.host_path().cloned().as_ref() {
148                    log.rename_key(path, event_path!("os.host"));
149                }
150            }
151        }
152        _ => unreachable!("This sink only accepts logs"),
153    }
154
155    events
156}
157
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use indoc::indoc;

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            addr::next_addr,
            components::{self, HTTP_SINK_TAGS},
            random_lines_with_stream,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SematextLogsConfig>();
    }

    /// Sends events through the sink to a local HTTP server and checks the
    /// first request body: bulk action lines carry the token as `_index`, and
    /// the document lines carry the original messages in order.
    #[tokio::test]
    async fn smoke() {
        let (mut config, cx) = load_sink::<SematextLogsConfig>(indoc! {r#"
            token = "mylogtoken"
        "#})
        .unwrap();

        // Make sure we can build the config
        _ = config.build(cx.clone()).await.unwrap();

        let (_guard, addr) = next_addr();
        // Swap out the host so we can force send it
        // to our local server
        config.endpoint = Some(format!("http://{addr}"));

        let (sink, _) = config.build(cx).await.unwrap();

        let (mut rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let (expected, events) = random_lines_with_stream(100, 10, None);
        components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;

        let output = rx.next().await.unwrap();

        // A stream of `serde_json::Value`
        let json = serde_json::Deserializer::from_slice(&output.1[..])
            .into_iter::<serde_json::Value>()
            .map(|v| v.expect("decoding json"));

        let mut expected_message_idx = 0;
        for (i, val) in json.enumerate() {
            // Every even message is the index which contains the token for sematext
            // Every odd message is the actual message in JSON format.
            if i % 2 == 0 {
                // Fetch {index: {_index: ""}}
                let token = val
                    .get("index")
                    .unwrap()
                    .get("_index")
                    .unwrap()
                    .as_str()
                    .unwrap();

                assert_eq!(token, "mylogtoken");
            } else {
                let message = val.get("message").unwrap().as_str().unwrap();
                assert_eq!(message, &expected[expected_message_idx]);
                expected_message_idx += 1;
            }
        }

        // The loop above asserts nothing when the body contains no documents,
        // so require that at least one message was actually compared.
        assert!(
            expected_message_idx > 0,
            "expected at least one log message in the request body"
        );
    }
}