vector/sinks/sematext/
logs.rs

1use async_trait::async_trait;
2use futures::stream::{BoxStream, StreamExt};
3use indoc::indoc;
4use vector_lib::configurable::configurable_component;
5use vector_lib::sensitive_string::SensitiveString;
6use vrl::event_path;
7
8use super::Region;
9use crate::{
10    codecs::Transformer,
11    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
12    event::EventArray,
13    sinks::{
14        elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig},
15        util::{
16            http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
17            StreamSink, TowerRequestConfig,
18        },
19        Healthcheck, VectorSink,
20    },
21    template::Template,
22};
23
24/// Configuration for the `sematext_logs` sink.
25#[configurable_component(sink("sematext_logs", "Publish log events to Sematext."))]
26#[derive(Clone, Debug)]
27pub struct SematextLogsConfig {
28    #[serde(default = "super::default_region")]
29    #[configurable(derived)]
30    region: Region,
31
32    /// The endpoint to send data to.
33    ///
34    /// Setting this option overrides the `region` option.
35    #[serde(alias = "host")]
36    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
37    #[configurable(metadata(docs::examples = "https://example.com"))]
38    endpoint: Option<String>,
39
40    /// The token that is used to write to Sematext.
41    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
42    #[configurable(metadata(docs::examples = "some-sematext-token"))]
43    token: SensitiveString,
44
45    #[configurable(derived)]
46    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
47    pub encoding: Transformer,
48
49    #[configurable(derived)]
50    #[serde(default)]
51    request: TowerRequestConfig,
52
53    #[configurable(derived)]
54    #[serde(default)]
55    batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
56
57    #[configurable(derived)]
58    #[serde(
59        default,
60        deserialize_with = "crate::serde::bool_or_struct",
61        skip_serializing_if = "crate::serde::is_default"
62    )]
63    acknowledgements: AcknowledgementsConfig,
64}
65
66impl GenerateConfig for SematextLogsConfig {
67    fn generate_config() -> toml::Value {
68        toml::from_str(indoc! {r#"
69            token = "${SEMATEXT_TOKEN}"
70        "#})
71        .unwrap()
72    }
73}
74
// Receiver hosts, as documented at
// https://sematext.com/docs/logs/index-events-via-elasticsearch-api/

/// Log receiver used for the US region when no explicit endpoint is configured.
const US_ENDPOINT: &str = "https://logsene-receiver.sematext.com";
/// Log receiver used for the EU region when no explicit endpoint is configured.
const EU_ENDPOINT: &str = "https://logsene-receiver.eu.sematext.com";
78
79#[async_trait::async_trait]
80#[typetag::serde(name = "sematext_logs")]
81impl SinkConfig for SematextLogsConfig {
82    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
83        let endpoint = match (&self.endpoint, &self.region) {
84            (Some(endpoint), _) => endpoint.clone(),
85            (None, Region::Us) => US_ENDPOINT.to_owned(),
86            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
87        };
88
89        let (sink, healthcheck) = ElasticsearchConfig {
90            endpoints: vec![endpoint],
91            compression: Compression::None,
92            doc_type: "\
93                logs"
94                .to_string(),
95            bulk: BulkConfig {
96                index: Template::try_from(self.token.inner())
97                    .expect("unable to parse token as Template"),
98                ..Default::default()
99            },
100            batch: self.batch,
101            request: RequestConfig {
102                tower: self.request,
103                ..Default::default()
104            },
105            encoding: self.encoding.clone(),
106            api_version: ElasticsearchApiVersion::V6,
107            ..Default::default()
108        }
109        .build(cx)
110        .await?;
111
112        let stream = sink.into_stream();
113        let mapped_stream = MapTimestampStream { inner: stream };
114
115        Ok((VectorSink::Stream(Box::new(mapped_stream)), healthcheck))
116    }
117
118    fn input(&self) -> Input {
119        Input::log()
120    }
121
122    fn acknowledgements(&self) -> &AcknowledgementsConfig {
123        &self.acknowledgements
124    }
125}
126
127struct MapTimestampStream {
128    inner: Box<dyn StreamSink<EventArray> + Send>,
129}
130
131#[async_trait]
132impl StreamSink<EventArray> for MapTimestampStream {
133    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
134        let mapped_input = input.map(map_timestamp).boxed();
135        self.inner.run(mapped_input).await
136    }
137}
138
139/// Used to map `timestamp` to `@timestamp`.
140fn map_timestamp(mut events: EventArray) -> EventArray {
141    match &mut events {
142        EventArray::Logs(logs) => {
143            for log in logs {
144                if let Some(path) = log.timestamp_path().cloned().as_ref() {
145                    log.rename_key(path, event_path!("@timestamp"));
146                }
147
148                if let Some(path) = log.host_path().cloned().as_ref() {
149                    log.rename_key(path, event_path!("os.host"));
150                }
151            }
152        }
153        _ => unreachable!("This sink only accepts logs"),
154    }
155
156    events
157}
158
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use indoc::indoc;

    use super::*;
    use crate::{
        config::SinkConfig,
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{self, HTTP_SINK_TAGS},
            next_addr, random_lines_with_stream,
        },
    };

    /// Runs the shared generated-config check against this sink's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SematextLogsConfig>();
    }

    /// Sends random lines through the sink to a local test server and checks
    /// the first bulk request it receives: action lines must carry the token
    /// as `_index`, and document lines must carry the messages in order.
    #[tokio::test]
    async fn smoke() {
        let (mut config, cx) = load_sink::<SematextLogsConfig>(indoc! {r#"
            token = "mylogtoken"
        "#})
        .unwrap();

        // The untouched config (default region endpoint) must build too.
        let _ = config.build(cx.clone()).await.unwrap();

        // Point the sink at a local address so the request can be captured.
        let addr = next_addr();
        config.endpoint = Some(format!("http://{addr}"));

        let (sink, _) = config.build(cx).await.unwrap();

        let (mut rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let (expected, events) = random_lines_with_stream(100, 10, None);
        components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;

        let output = rx.next().await.unwrap();

        // The body is a sequence of JSON values; decode them lazily.
        let documents = serde_json::Deserializer::from_slice(&output.1[..])
            .into_iter::<serde_json::Value>()
            .map(|v| v.expect("decoding json"));

        let mut next_expected = 0;
        for (position, value) in documents.enumerate() {
            match position % 2 {
                // Even positions hold the action line: {index: {_index: ""}}.
                0 => {
                    let action = value.get("index").unwrap();
                    let token = action.get("_index").unwrap().as_str().unwrap();

                    assert_eq!(token, "mylogtoken");
                }
                // Odd positions hold the event itself, encoded as JSON.
                _ => {
                    let message = value.get("message").unwrap().as_str().unwrap();

                    assert_eq!(message, &expected[next_expected]);
                    next_expected += 1;
                }
            }
        }
    }
}