vector/sinks/elasticsearch/encoder.rs

1use std::{io, io::Write};
2
3use serde::Serialize;
4use serde_json::json;
5use vector_lib::{
6    ByteSizeOf, EstimatedJsonEncodedSizeOf,
7    buffers::EventCount,
8    config::telemetry,
9    event::Event,
10    internal_event::TaggedEventsSent,
11    json_size::JsonSize,
12    request_metadata::{GetEventCountTags, GroupedCountByteSize},
13};
14
15use crate::{
16    codecs::Transformer,
17    event::{EventFinalizers, Finalizable, LogEvent},
18    sinks::{
19        elasticsearch::{BulkAction, VersionType},
20        util::encoding::{Encoder, as_tracked_write},
21    },
22};
23
24#[derive(Serialize, Clone, Debug)]
25pub enum DocumentVersionType {
26    External,
27    ExternalGte,
28}
29
30impl DocumentVersionType {
31    pub const fn as_str(&self) -> &'static str {
32        match self {
33            DocumentVersionType::External => VersionType::External.as_str(),
34            DocumentVersionType::ExternalGte => VersionType::ExternalGte.as_str(),
35        }
36    }
37}
38
39#[derive(Serialize, Clone, Debug)]
40pub struct DocumentVersion {
41    pub kind: DocumentVersionType,
42    pub value: u64,
43}
44
45#[derive(Serialize, Clone, Debug)]
46pub enum DocumentMetadata {
47    WithoutId,
48    Id(String),
49    IdAndVersion(String, DocumentVersion),
50}
51
52#[derive(Serialize, Clone, Debug)]
53pub struct ProcessedEvent {
54    pub index: String,
55    pub bulk_action: BulkAction,
56    pub log: LogEvent,
57    pub document_metadata: DocumentMetadata,
58}
59
60impl Finalizable for ProcessedEvent {
61    fn take_finalizers(&mut self) -> EventFinalizers {
62        self.log.metadata_mut().take_finalizers()
63    }
64}
65
66impl ByteSizeOf for ProcessedEvent {
67    fn allocated_bytes(&self) -> usize {
68        match &self.document_metadata {
69            DocumentMetadata::WithoutId => {
70                self.index.allocated_bytes() + self.log.allocated_bytes()
71            }
72            DocumentMetadata::Id(id) | DocumentMetadata::IdAndVersion(id, _) => {
73                self.index.allocated_bytes() + self.log.allocated_bytes() + id.allocated_bytes()
74            }
75        }
76    }
77}
78
79impl EstimatedJsonEncodedSizeOf for ProcessedEvent {
80    fn estimated_json_encoded_size_of(&self) -> JsonSize {
81        self.log.estimated_json_encoded_size_of()
82    }
83}
84
85impl EventCount for ProcessedEvent {
86    fn event_count(&self) -> usize {
87        // An Elasticsearch ProcessedEvent is mapped one-to-one with an event.
88        1
89    }
90}
91
92impl GetEventCountTags for ProcessedEvent {
93    fn get_tags(&self) -> TaggedEventsSent {
94        self.log.get_tags()
95    }
96}
97
98#[derive(PartialEq, Eq, Default, Clone, Debug)]
99pub struct ElasticsearchEncoder {
100    pub transformer: Transformer,
101    pub doc_type: String,
102    pub suppress_type_name: bool,
103}
104
105impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
106    fn encode_input(
107        &self,
108        input: Vec<ProcessedEvent>,
109        writer: &mut dyn Write,
110    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
111        let mut written_bytes = 0;
112        let mut byte_size = telemetry().create_request_count_byte_size();
113        for event in input {
114            let log = {
115                let mut event = Event::from(event.log);
116                self.transformer.transform(&mut event);
117                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
118
119                event.into_log()
120            };
121            written_bytes += write_bulk_action(
122                writer,
123                event.bulk_action.as_str(),
124                &event.index,
125                &self.doc_type,
126                self.suppress_type_name,
127                &event.document_metadata,
128            )?;
129            written_bytes +=
130                as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
131                    writer.write_all(b"\n")?;
132                    // False positive clippy hit on the following line. Clippy wants us to skip the
133                    // borrow, but then the value is moved for the following line.
134                    #[allow(clippy::needless_borrows_for_generic_args)]
135                    serde_json::to_writer(&mut writer, log)?;
136                    writer.write_all(b"\n")?;
137                    Ok(())
138                })?;
139        }
140
141        Ok((written_bytes, byte_size))
142    }
143}
144
145fn write_bulk_action(
146    writer: &mut dyn Write,
147    bulk_action: &str,
148    index: &str,
149    doc_type: &str,
150    suppress_type: bool,
151    document: &DocumentMetadata,
152) -> std::io::Result<usize> {
153    as_tracked_write(
154        writer,
155        (bulk_action, index, doc_type, suppress_type, document),
156        |writer, (bulk_action, index, doc_type, suppress_type, document)| match (
157            suppress_type,
158            document,
159        ) {
160            (true, DocumentMetadata::Id(id)) => {
161                write!(
162                    writer,
163                    "{}",
164                    json!({
165                        bulk_action: {
166                            "_index": index,
167                            "_id": id,
168                        }
169                    }),
170                )
171            }
172            (false, DocumentMetadata::Id(id)) => {
173                write!(
174                    writer,
175                    "{}",
176                    json!({
177                        bulk_action: {
178                            "_type": doc_type,
179                            "_index": index,
180                            "_id": id,
181                        }
182                    }),
183                )
184            }
185            (true, DocumentMetadata::WithoutId) => {
186                write!(
187                    writer,
188                    "{}",
189                    json!({
190                        bulk_action: {
191                            "_index": index,
192                        }
193                    }),
194                )
195            }
196            (false, DocumentMetadata::WithoutId) => {
197                write!(
198                    writer,
199                    "{}",
200                    json!({
201                        bulk_action: {
202                            "_type": doc_type,
203                            "_index": index,
204                        }
205                    }),
206                )
207            }
208            (true, DocumentMetadata::IdAndVersion(id, version)) => {
209                write!(
210                    writer,
211                    "{}",
212                    json!({
213                        bulk_action: {
214                            "_id": id,
215                            "_index": index,
216                            "version_type": version.kind.as_str(),
217                            "version": version.value,
218                        }
219                    }),
220                )
221            }
222            (false, DocumentMetadata::IdAndVersion(id, version)) => {
223                write!(
224                    writer,
225                    "{}",
226                    json!({
227                        bulk_action: {
228                            "_id": id,
229                            "_type": doc_type,
230                            "_index": index,
231                            "version_type": version.kind.as_str(),
232                            "version": version.value,
233                        }
234                    }),
235                )
236            }
237        },
238    )
239}
240
#[cfg(test)]
mod tests {
    use serde_json::{Map, Value};

    use super::*;

    /// Runs [`write_bulk_action`] into a fresh buffer and returns the object nested under
    /// `bulk_action`.
    ///
    /// Also checks that the reported byte count matches the bytes actually written and that the
    /// line is a JSON object with exactly one top-level key.
    fn action_metadata(
        bulk_action: &str,
        index: &str,
        doc_type: &str,
        suppress_type: bool,
        document: &DocumentMetadata,
    ) -> Map<String, Value> {
        let mut writer = Vec::new();
        let written = write_bulk_action(
            &mut writer,
            bulk_action,
            index,
            doc_type,
            suppress_type,
            document,
        )
        .unwrap();
        assert_eq!(written, writer.len());

        let mut outer = match serde_json::from_slice::<Value>(&writer).unwrap() {
            Value::Object(outer) => outer,
            other => panic!("expected a JSON object, got {other}"),
        };
        assert_eq!(outer.len(), 1);

        match outer.remove(bulk_action) {
            Some(Value::Object(nested)) => nested,
            other => panic!("expected an object under {bulk_action:?}, got {other:?}"),
        }
    }

    /// Returns the string stored under `key`, or `None` if it is absent or not a string.
    fn str_field<'a>(nested: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
        nested.get(key).and_then(Value::as_str)
    }

    #[test]
    fn suppress_type_with_id() {
        let nested = action_metadata(
            "ACTION",
            "INDEX",
            "TYPE",
            true,
            &DocumentMetadata::Id("ID".to_string()),
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert!(!nested.contains_key("_type"));
    }

    #[test]
    fn suppress_type_without_id() {
        let nested = action_metadata("ACTION", "INDEX", "TYPE", true, &DocumentMetadata::WithoutId);

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert!(!nested.contains_key("_id"));
        assert!(!nested.contains_key("_type"));
    }

    #[test]
    fn type_with_id() {
        let nested = action_metadata(
            "ACTION",
            "INDEX",
            "TYPE",
            false,
            &DocumentMetadata::Id("ID".to_string()),
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE"));
    }

    #[test]
    fn type_without_id() {
        let nested =
            action_metadata("ACTION", "INDEX", "TYPE", false, &DocumentMetadata::WithoutId);

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert!(!nested.contains_key("_id"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE"));
    }

    #[test]
    fn suppress_type_with_id_and_version() {
        let document = DocumentMetadata::IdAndVersion(
            "ID".to_string(),
            DocumentVersion {
                kind: DocumentVersionType::External,
                value: 42,
            },
        );
        let nested = action_metadata("ACTION", "INDEX", "TYPE", true, &document);

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert_eq!(
            str_field(&nested, "version_type"),
            Some(DocumentVersionType::External.as_str())
        );
        assert_eq!(nested.get("version").and_then(Value::as_u64), Some(42));
        assert!(!nested.contains_key("_type"));
    }

    #[test]
    fn type_with_id_and_version() {
        let document = DocumentMetadata::IdAndVersion(
            "ID".to_string(),
            DocumentVersion {
                kind: DocumentVersionType::ExternalGte,
                value: 7,
            },
        );
        let nested = action_metadata("ACTION", "INDEX", "TYPE", false, &document);

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE"));
        assert_eq!(
            str_field(&nested, "version_type"),
            Some(DocumentVersionType::ExternalGte.as_str())
        );
        assert_eq!(nested.get("version").and_then(Value::as_u64), Some(7));
    }

    #[test]
    fn encodes_fields_with_newlines() {
        let nested = action_metadata(
            "ACTION\n",
            "INDEX\n",
            "TYPE\n",
            false,
            &DocumentMetadata::Id("ID\n".to_string()),
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX\n"));
        assert_eq!(str_field(&nested, "_id"), Some("ID\n"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE\n"));
    }
}