vector/sinks/elasticsearch/
encoder.rs

1use std::{io, io::Write};
2
3use serde::Serialize;
4use serde_json::json;
5use vector_lib::buffers::EventCount;
6use vector_lib::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf};
7use vector_lib::{
8    internal_event::TaggedEventsSent,
9    json_size::JsonSize,
10    request_metadata::{GetEventCountTags, GroupedCountByteSize},
11};
12
13use crate::{
14    codecs::Transformer,
15    event::{EventFinalizers, Finalizable, LogEvent},
16    sinks::{
17        elasticsearch::{BulkAction, VersionType},
18        util::encoding::{as_tracked_write, Encoder},
19    },
20};
21
22#[derive(Serialize, Clone, Debug)]
23pub enum DocumentVersionType {
24    External,
25    ExternalGte,
26}
27
28impl DocumentVersionType {
29    pub const fn as_str(&self) -> &'static str {
30        match self {
31            DocumentVersionType::External => VersionType::External.as_str(),
32            DocumentVersionType::ExternalGte => VersionType::ExternalGte.as_str(),
33        }
34    }
35}
36
37#[derive(Serialize, Clone, Debug)]
38pub struct DocumentVersion {
39    pub kind: DocumentVersionType,
40    pub value: u64,
41}
42
43#[derive(Serialize, Clone, Debug)]
44pub enum DocumentMetadata {
45    WithoutId,
46    Id(String),
47    IdAndVersion(String, DocumentVersion),
48}
49
50#[derive(Serialize, Clone, Debug)]
51pub struct ProcessedEvent {
52    pub index: String,
53    pub bulk_action: BulkAction,
54    pub log: LogEvent,
55    pub document_metadata: DocumentMetadata,
56}
57
58impl Finalizable for ProcessedEvent {
59    fn take_finalizers(&mut self) -> EventFinalizers {
60        self.log.metadata_mut().take_finalizers()
61    }
62}
63
64impl ByteSizeOf for ProcessedEvent {
65    fn allocated_bytes(&self) -> usize {
66        match &self.document_metadata {
67            DocumentMetadata::WithoutId => {
68                self.index.allocated_bytes() + self.log.allocated_bytes()
69            }
70            DocumentMetadata::Id(id) | DocumentMetadata::IdAndVersion(id, _) => {
71                self.index.allocated_bytes() + self.log.allocated_bytes() + id.allocated_bytes()
72            }
73        }
74    }
75}
76
77impl EstimatedJsonEncodedSizeOf for ProcessedEvent {
78    fn estimated_json_encoded_size_of(&self) -> JsonSize {
79        self.log.estimated_json_encoded_size_of()
80    }
81}
82
83impl EventCount for ProcessedEvent {
84    fn event_count(&self) -> usize {
85        // An Elasticsearch ProcessedEvent is mapped one-to-one with an event.
86        1
87    }
88}
89
90impl GetEventCountTags for ProcessedEvent {
91    fn get_tags(&self) -> TaggedEventsSent {
92        self.log.get_tags()
93    }
94}
95
96#[derive(PartialEq, Eq, Default, Clone, Debug)]
97pub struct ElasticsearchEncoder {
98    pub transformer: Transformer,
99    pub doc_type: String,
100    pub suppress_type_name: bool,
101}
102
103impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
104    fn encode_input(
105        &self,
106        input: Vec<ProcessedEvent>,
107        writer: &mut dyn Write,
108    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
109        let mut written_bytes = 0;
110        let mut byte_size = telemetry().create_request_count_byte_size();
111        for event in input {
112            let log = {
113                let mut event = Event::from(event.log);
114                self.transformer.transform(&mut event);
115                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
116
117                event.into_log()
118            };
119            written_bytes += write_bulk_action(
120                writer,
121                event.bulk_action.as_str(),
122                &event.index,
123                &self.doc_type,
124                self.suppress_type_name,
125                &event.document_metadata,
126            )?;
127            written_bytes +=
128                as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
129                    writer.write_all(b"\n")?;
130                    // False positive clippy hit on the following line. Clippy wants us to skip the
131                    // borrow, but then the value is moved for the following line.
132                    #[allow(clippy::needless_borrows_for_generic_args)]
133                    serde_json::to_writer(&mut writer, log)?;
134                    writer.write_all(b"\n")?;
135                    Ok(())
136                })?;
137        }
138
139        Ok((written_bytes, byte_size))
140    }
141}
142
143fn write_bulk_action(
144    writer: &mut dyn Write,
145    bulk_action: &str,
146    index: &str,
147    doc_type: &str,
148    suppress_type: bool,
149    document: &DocumentMetadata,
150) -> std::io::Result<usize> {
151    as_tracked_write(
152        writer,
153        (bulk_action, index, doc_type, suppress_type, document),
154        |writer, (bulk_action, index, doc_type, suppress_type, document)| match (
155            suppress_type,
156            document,
157        ) {
158            (true, DocumentMetadata::Id(id)) => {
159                write!(
160                    writer,
161                    "{}",
162                    json!({
163                        bulk_action: {
164                            "_index": index,
165                            "_id": id,
166                        }
167                    }),
168                )
169            }
170            (false, DocumentMetadata::Id(id)) => {
171                write!(
172                    writer,
173                    "{}",
174                    json!({
175                        bulk_action: {
176                            "_type": doc_type,
177                            "_index": index,
178                            "_id": id,
179                        }
180                    }),
181                )
182            }
183            (true, DocumentMetadata::WithoutId) => {
184                write!(
185                    writer,
186                    "{}",
187                    json!({
188                        bulk_action: {
189                            "_index": index,
190                        }
191                    }),
192                )
193            }
194            (false, DocumentMetadata::WithoutId) => {
195                write!(
196                    writer,
197                    "{}",
198                    json!({
199                        bulk_action: {
200                            "_type": doc_type,
201                            "_index": index,
202                        }
203                    }),
204                )
205            }
206            (true, DocumentMetadata::IdAndVersion(id, version)) => {
207                write!(
208                    writer,
209                    "{}",
210                    json!({
211                        bulk_action: {
212                            "_id": id,
213                            "_index": index,
214                            "version_type": version.kind.as_str(),
215                            "version": version.value,
216                        }
217                    }),
218                )
219            }
220            (false, DocumentMetadata::IdAndVersion(id, version)) => {
221                write!(
222                    writer,
223                    "{}",
224                    json!({
225                        bulk_action: {
226                            "_id": id,
227                            "_type": doc_type,
228                            "_index": index,
229                            "version_type": version.kind.as_str(),
230                            "version": version.value,
231                        }
232                    }),
233                )
234            }
235        },
236    )
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    type JsonObject = serde_json::Map<String, serde_json::Value>;

    /// Runs `write_bulk_action` into an in-memory buffer, parses the output as JSON, checks
    /// that `action` is a top-level key and returns the object nested under it.
    fn bulk_action_fields(
        action: &str,
        index: &str,
        doc_type: &str,
        suppress_type: bool,
        metadata: &DocumentMetadata,
    ) -> JsonObject {
        let mut buffer: Vec<u8> = Vec::new();
        write_bulk_action(
            &mut buffer,
            action,
            index,
            doc_type,
            suppress_type,
            metadata,
        )
        .unwrap();

        let parsed: serde_json::Value = serde_json::from_slice(&buffer).unwrap();
        let top_level = parsed.as_object().unwrap();
        assert!(top_level.contains_key(action));

        top_level.get(action).unwrap().as_object().unwrap().clone()
    }

    /// Looks up `key` and returns its value when it is present and a JSON string.
    fn text_field<'a>(fields: &'a JsonObject, key: &str) -> Option<&'a str> {
        fields.get(key).and_then(|value| value.as_str())
    }

    #[test]
    fn suppress_type_with_id() {
        let metadata = DocumentMetadata::Id("ID".to_string());
        let fields = bulk_action_fields("ACTION", "INDEX", "TYPE", true, &metadata);

        assert_eq!(text_field(&fields, "_index"), Some("INDEX"));
        assert_eq!(text_field(&fields, "_id"), Some("ID"));
        assert!(fields.get("_type").is_none());
    }

    #[test]
    fn suppress_type_without_id() {
        let fields = bulk_action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            true,
            &DocumentMetadata::WithoutId,
        );

        assert_eq!(text_field(&fields, "_index"), Some("INDEX"));
        assert!(fields.get("_id").is_none());
        assert!(fields.get("_type").is_none());
    }

    #[test]
    fn type_with_id() {
        let metadata = DocumentMetadata::Id("ID".to_string());
        let fields = bulk_action_fields("ACTION", "INDEX", "TYPE", false, &metadata);

        assert_eq!(text_field(&fields, "_index"), Some("INDEX"));
        assert_eq!(text_field(&fields, "_id"), Some("ID"));
        assert_eq!(text_field(&fields, "_type"), Some("TYPE"));
    }

    #[test]
    fn type_without_id() {
        let fields = bulk_action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            false,
            &DocumentMetadata::WithoutId,
        );

        assert_eq!(text_field(&fields, "_index"), Some("INDEX"));
        assert!(fields.get("_id").is_none());
        assert_eq!(text_field(&fields, "_type"), Some("TYPE"));
    }

    #[test]
    fn encodes_fields_with_newlines() {
        // Embedded newlines must survive a round trip through the encoded header.
        let metadata = DocumentMetadata::Id("ID\n".to_string());
        let fields = bulk_action_fields("ACTION\n", "INDEX\n", "TYPE\n", false, &metadata);

        assert_eq!(text_field(&fields, "_index"), Some("INDEX\n"));
        assert_eq!(text_field(&fields, "_id"), Some("ID\n"));
        assert_eq!(text_field(&fields, "_type"), Some("TYPE\n"));
    }
}