vector/sinks/sematext/logs.rs

1use async_trait::async_trait;
2use futures::stream::{BoxStream, StreamExt};
3use indoc::indoc;
4use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
5use vrl::event_path;
6
7use super::Region;
8use crate::{
9    codecs::Transformer,
10    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
11    event::EventArray,
12    sinks::{
13        Healthcheck, VectorSink,
14        elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig},
15        util::{
16            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, StreamSink,
17            TowerRequestConfig, http::RequestConfig,
18        },
19    },
20    template::Template,
21};
22
23/// Configuration for the `sematext_logs` sink.
24#[configurable_component(sink("sematext_logs", "Publish log events to Sematext."))]
25#[derive(Clone, Debug)]
26pub struct SematextLogsConfig {
27    #[serde(default = "super::default_region")]
28    #[configurable(derived)]
29    region: Region,
30
31    /// The endpoint to send data to.
32    ///
33    /// Setting this option overrides the `region` option.
34    #[serde(alias = "host")]
35    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
36    #[configurable(metadata(docs::examples = "https://example.com"))]
37    endpoint: Option<String>,
38
39    /// The token that is used to write to Sematext.
40    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
41    #[configurable(metadata(docs::examples = "some-sematext-token"))]
42    token: SensitiveString,
43
44    #[configurable(derived)]
45    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
46    pub encoding: Transformer,
47
48    #[configurable(derived)]
49    #[serde(default)]
50    request: TowerRequestConfig,
51
52    #[configurable(derived)]
53    #[serde(default)]
54    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
55
56    #[configurable(derived)]
57    #[serde(
58        default,
59        deserialize_with = "crate::serde::bool_or_struct",
60        skip_serializing_if = "crate::serde::is_default"
61    )]
62    acknowledgements: AcknowledgementsConfig,
63}
64
65impl GenerateConfig for SematextLogsConfig {
66    fn generate_config() -> toml::Value {
67        toml::from_str(indoc! {r#"
68            token = "${SEMATEXT_TOKEN}"
69        "#})
70        .unwrap()
71    }
72}
73
// Sematext accepts logs through an Elasticsearch-compatible API:
// https://sematext.com/docs/logs/index-events-via-elasticsearch-api/

/// Receiver used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://logsene-receiver.sematext.com";
/// Receiver used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://logsene-receiver.eu.sematext.com";
77
78#[async_trait::async_trait]
79#[typetag::serde(name = "sematext_logs")]
80impl SinkConfig for SematextLogsConfig {
81    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
82        let endpoint = match (&self.endpoint, &self.region) {
83            (Some(endpoint), _) => endpoint.clone(),
84            (None, Region::Us) => US_ENDPOINT.to_owned(),
85            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
86        };
87
88        let (sink, healthcheck) = ElasticsearchConfig {
89            endpoints: vec![endpoint],
90            compression: Compression::None,
91            doc_type: "\
92                logs"
93                .to_string(),
94            bulk: BulkConfig {
95                index: Template::try_from(self.token.inner())
96                    .expect("unable to parse token as Template"),
97                ..Default::default()
98            },
99            batch: self.batch,
100            request: RequestConfig {
101                tower: self.request,
102                ..Default::default()
103            },
104            encoding: self.encoding.clone(),
105            api_version: ElasticsearchApiVersion::V6,
106            ..Default::default()
107        }
108        .build(cx)
109        .await?;
110
111        let stream = sink.into_stream();
112        let mapped_stream = MapTimestampStream { inner: stream };
113
114        Ok((VectorSink::Stream(Box::new(mapped_stream)), healthcheck))
115    }
116
117    fn input(&self) -> Input {
118        Input::log()
119    }
120
121    fn acknowledgements(&self) -> &AcknowledgementsConfig {
122        &self.acknowledgements
123    }
124}
125
126struct MapTimestampStream {
127    inner: Box<dyn StreamSink<EventArray> + Send>,
128}
129
130#[async_trait]
131impl StreamSink<EventArray> for MapTimestampStream {
132    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
133        let mapped_input = input.map(map_timestamp).boxed();
134        self.inner.run(mapped_input).await
135    }
136}
137
138/// Used to map `timestamp` to `@timestamp`.
139fn map_timestamp(mut events: EventArray) -> EventArray {
140    match &mut events {
141        EventArray::Logs(logs) => {
142            for log in logs {
143                if let Some(path) = log.timestamp_path().cloned().as_ref() {
144                    log.rename_key(path, event_path!("@timestamp"));
145                }
146
147                if let Some(path) = log.host_path().cloned().as_ref() {
148                    log.rename_key(path, event_path!("os.host"));
149                }
150            }
151        }
152        _ => unreachable!("This sink only accepts logs"),
153    }
154
155    events
156}
157
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use indoc::indoc;

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{self, HTTP_SINK_TAGS},
            next_addr, random_lines_with_stream,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SematextLogsConfig>();
    }

    /// End-to-end check against a local HTTP server. Every bulk action line must carry
    /// the configured token as `_index`, and every document line must carry the
    /// generated message, in order.
    #[tokio::test]
    async fn smoke() {
        let (mut config, cx) = load_sink::<SematextLogsConfig>(indoc! {r#"
            token = "mylogtoken"
        "#})
        .unwrap();

        // The default (region-derived) endpoint must at least build.
        _ = config.build(cx.clone()).await.unwrap();

        // Redirect the sink to a local test server instead of Sematext.
        let addr = next_addr();
        config.endpoint = Some(format!("http://{addr}"));
        let (sink, _) = config.build(cx).await.unwrap();

        let (mut rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let (expected, events) = random_lines_with_stream(100, 10, None);
        components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;

        let output = rx.next().await.unwrap();

        // The request body is a sequence of JSON values.
        let documents = serde_json::Deserializer::from_slice(&output.1[..])
            .into_iter::<serde_json::Value>()
            .map(|document| document.expect("decoding json"));

        // The bulk body alternates action lines and document lines.
        let mut expected_messages = expected.iter();
        for (line_no, document) in documents.enumerate() {
            if line_no % 2 == 0 {
                // Action line: `{"index": {"_index": "<token>", ...}}`.
                let token = document
                    .pointer("/index/_index")
                    .and_then(serde_json::Value::as_str)
                    .unwrap();
                assert_eq!(token, "mylogtoken");
            } else {
                // Document line: the event itself, carrying the generated message.
                let message = document
                    .get("message")
                    .and_then(serde_json::Value::as_str)
                    .unwrap();
                assert_eq!(message, expected_messages.next().unwrap());
            }
        }
    }
}