1use std::{io, io::Write};
2
3use serde::Serialize;
4use serde_json::json;
5use vector_lib::{
6 ByteSizeOf, EstimatedJsonEncodedSizeOf,
7 buffers::EventCount,
8 config::telemetry,
9 event::Event,
10 internal_event::TaggedEventsSent,
11 json_size::JsonSize,
12 request_metadata::{GetEventCountTags, GroupedCountByteSize},
13};
14
15use crate::{
16 codecs::Transformer,
17 event::{EventFinalizers, Finalizable, LogEvent},
18 sinks::{
19 elasticsearch::{BulkAction, VersionType},
20 util::encoding::{Encoder, as_tracked_write},
21 },
22};
23
24#[derive(Serialize, Clone, Debug)]
25pub enum DocumentVersionType {
26 External,
27 ExternalGte,
28}
29
30impl DocumentVersionType {
31 pub const fn as_str(&self) -> &'static str {
32 match self {
33 DocumentVersionType::External => VersionType::External.as_str(),
34 DocumentVersionType::ExternalGte => VersionType::ExternalGte.as_str(),
35 }
36 }
37}
38
39#[derive(Serialize, Clone, Debug)]
40pub struct DocumentVersion {
41 pub kind: DocumentVersionType,
42 pub value: u64,
43}
44
45#[derive(Serialize, Clone, Debug)]
46pub enum DocumentMetadata {
47 WithoutId,
48 Id(String),
49 IdAndVersion(String, DocumentVersion),
50}
51
52#[derive(Serialize, Clone, Debug)]
53pub struct ProcessedEvent {
54 pub index: String,
55 pub bulk_action: BulkAction,
56 pub log: LogEvent,
57 pub document_metadata: DocumentMetadata,
58}
59
60impl Finalizable for ProcessedEvent {
61 fn take_finalizers(&mut self) -> EventFinalizers {
62 self.log.metadata_mut().take_finalizers()
63 }
64}
65
66impl ByteSizeOf for ProcessedEvent {
67 fn allocated_bytes(&self) -> usize {
68 match &self.document_metadata {
69 DocumentMetadata::WithoutId => {
70 self.index.allocated_bytes() + self.log.allocated_bytes()
71 }
72 DocumentMetadata::Id(id) | DocumentMetadata::IdAndVersion(id, _) => {
73 self.index.allocated_bytes() + self.log.allocated_bytes() + id.allocated_bytes()
74 }
75 }
76 }
77}
78
79impl EstimatedJsonEncodedSizeOf for ProcessedEvent {
80 fn estimated_json_encoded_size_of(&self) -> JsonSize {
81 self.log.estimated_json_encoded_size_of()
82 }
83}
84
85impl EventCount for ProcessedEvent {
86 fn event_count(&self) -> usize {
87 1
89 }
90}
91
92impl GetEventCountTags for ProcessedEvent {
93 fn get_tags(&self) -> TaggedEventsSent {
94 self.log.get_tags()
95 }
96}
97
98#[derive(PartialEq, Eq, Default, Clone, Debug)]
99pub struct ElasticsearchEncoder {
100 pub transformer: Transformer,
101 pub doc_type: String,
102 pub suppress_type_name: bool,
103}
104
105impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
106 fn encode_input(
107 &self,
108 input: Vec<ProcessedEvent>,
109 writer: &mut dyn Write,
110 ) -> std::io::Result<(usize, GroupedCountByteSize)> {
111 let mut written_bytes = 0;
112 let mut byte_size = telemetry().create_request_count_byte_size();
113 for event in input {
114 let log = {
115 let mut event = Event::from(event.log);
116 self.transformer.transform(&mut event);
117 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
118
119 event.into_log()
120 };
121 written_bytes += write_bulk_action(
122 writer,
123 event.bulk_action.as_str(),
124 &event.index,
125 &self.doc_type,
126 self.suppress_type_name,
127 &event.document_metadata,
128 )?;
129 written_bytes +=
130 as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
131 writer.write_all(b"\n")?;
132 #[allow(clippy::needless_borrows_for_generic_args)]
135 serde_json::to_writer(&mut writer, log)?;
136 writer.write_all(b"\n")?;
137 Ok(())
138 })?;
139 }
140
141 Ok((written_bytes, byte_size))
142 }
143}
144
145fn write_bulk_action(
146 writer: &mut dyn Write,
147 bulk_action: &str,
148 index: &str,
149 doc_type: &str,
150 suppress_type: bool,
151 document: &DocumentMetadata,
152) -> std::io::Result<usize> {
153 as_tracked_write(
154 writer,
155 (bulk_action, index, doc_type, suppress_type, document),
156 |writer, (bulk_action, index, doc_type, suppress_type, document)| match (
157 suppress_type,
158 document,
159 ) {
160 (true, DocumentMetadata::Id(id)) => {
161 write!(
162 writer,
163 "{}",
164 json!({
165 bulk_action: {
166 "_index": index,
167 "_id": id,
168 }
169 }),
170 )
171 }
172 (false, DocumentMetadata::Id(id)) => {
173 write!(
174 writer,
175 "{}",
176 json!({
177 bulk_action: {
178 "_type": doc_type,
179 "_index": index,
180 "_id": id,
181 }
182 }),
183 )
184 }
185 (true, DocumentMetadata::WithoutId) => {
186 write!(
187 writer,
188 "{}",
189 json!({
190 bulk_action: {
191 "_index": index,
192 }
193 }),
194 )
195 }
196 (false, DocumentMetadata::WithoutId) => {
197 write!(
198 writer,
199 "{}",
200 json!({
201 bulk_action: {
202 "_type": doc_type,
203 "_index": index,
204 }
205 }),
206 )
207 }
208 (true, DocumentMetadata::IdAndVersion(id, version)) => {
209 write!(
210 writer,
211 "{}",
212 json!({
213 bulk_action: {
214 "_id": id,
215 "_index": index,
216 "version_type": version.kind.as_str(),
217 "version": version.value,
218 }
219 }),
220 )
221 }
222 (false, DocumentMetadata::IdAndVersion(id, version)) => {
223 write!(
224 writer,
225 "{}",
226 json!({
227 bulk_action: {
228 "_id": id,
229 "_type": doc_type,
230 "_index": index,
231 "version_type": version.kind.as_str(),
232 "version": version.value,
233 }
234 }),
235 )
236 }
237 },
238 )
239}
240
#[cfg(test)]
mod tests {
    use serde_json::{Map, Value};

    use super::*;

    /// Encodes one action line with `write_bulk_action`, checks that it is a JSON object whose
    /// single top-level key is `bulk_action`, and returns the nested metadata object.
    fn action_fields(
        bulk_action: &str,
        index: &str,
        doc_type: &str,
        suppress_type: bool,
        document: &DocumentMetadata,
    ) -> Map<String, Value> {
        let mut writer = Vec::new();
        write_bulk_action(
            &mut writer,
            bulk_action,
            index,
            doc_type,
            suppress_type,
            document,
        )
        .unwrap();

        let action = match serde_json::from_slice::<Value>(&writer).unwrap() {
            Value::Object(action) => action,
            other => panic!("action line is not a JSON object: {other}"),
        };
        assert_eq!(action.len(), 1, "action line must have exactly one top-level key");

        action
            .get(bulk_action)
            .and_then(Value::as_object)
            .cloned()
            .unwrap_or_else(|| panic!("missing or non-object `{bulk_action}` entry"))
    }

    /// Returns the string stored under `key`, or `None` when absent or not a string.
    fn str_field<'a>(fields: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
        fields.get(key).and_then(Value::as_str)
    }

    #[test]
    fn suppress_type_with_id() {
        let fields = action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            true,
            &DocumentMetadata::Id("ID".to_string()),
        );

        assert_eq!(str_field(&fields, "_index"), Some("INDEX"));
        assert_eq!(str_field(&fields, "_id"), Some("ID"));
        assert!(!fields.contains_key("_type"));
    }

    #[test]
    fn suppress_type_without_id() {
        let fields = action_fields("ACTION", "INDEX", "TYPE", true, &DocumentMetadata::WithoutId);

        assert_eq!(str_field(&fields, "_index"), Some("INDEX"));
        assert!(!fields.contains_key("_id"));
        assert!(!fields.contains_key("_type"));
    }

    #[test]
    fn type_with_id() {
        let fields = action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            false,
            &DocumentMetadata::Id("ID".to_string()),
        );

        assert_eq!(str_field(&fields, "_index"), Some("INDEX"));
        assert_eq!(str_field(&fields, "_id"), Some("ID"));
        assert_eq!(str_field(&fields, "_type"), Some("TYPE"));
    }

    #[test]
    fn type_without_id() {
        let fields = action_fields("ACTION", "INDEX", "TYPE", false, &DocumentMetadata::WithoutId);

        assert_eq!(str_field(&fields, "_index"), Some("INDEX"));
        assert!(!fields.contains_key("_id"));
        assert_eq!(str_field(&fields, "_type"), Some("TYPE"));
    }

    #[test]
    fn suppress_type_with_id_and_version() {
        let document = DocumentMetadata::IdAndVersion(
            "ID".to_string(),
            DocumentVersion {
                kind: DocumentVersionType::External,
                value: 42,
            },
        );
        let fields = action_fields("ACTION", "INDEX", "TYPE", true, &document);

        assert_eq!(str_field(&fields, "_index"), Some("INDEX"));
        assert_eq!(str_field(&fields, "_id"), Some("ID"));
        assert_eq!(
            str_field(&fields, "version_type"),
            Some(DocumentVersionType::External.as_str())
        );
        assert_eq!(fields.get("version").and_then(Value::as_u64), Some(42));
        assert!(!fields.contains_key("_type"));
    }

    #[test]
    fn type_with_id_and_version() {
        let document = DocumentMetadata::IdAndVersion(
            "ID".to_string(),
            DocumentVersion {
                kind: DocumentVersionType::ExternalGte,
                value: u64::MAX,
            },
        );
        let fields = action_fields("ACTION", "INDEX", "TYPE", false, &document);

        assert_eq!(str_field(&fields, "_index"), Some("INDEX"));
        assert_eq!(str_field(&fields, "_id"), Some("ID"));
        assert_eq!(str_field(&fields, "_type"), Some("TYPE"));
        assert_eq!(
            str_field(&fields, "version_type"),
            Some(DocumentVersionType::ExternalGte.as_str())
        );
        // The full u64 range must survive the round trip without truncation.
        assert_eq!(fields.get("version").and_then(Value::as_u64), Some(u64::MAX));
    }

    #[test]
    fn encodes_fields_with_newlines() {
        let fields = action_fields(
            "ACTION\n",
            "INDEX\n",
            "TYPE\n",
            false,
            &DocumentMetadata::Id("ID\n".to_string()),
        );

        assert_eq!(str_field(&fields, "_index"), Some("INDEX\n"));
        assert_eq!(str_field(&fields, "_id"), Some("ID\n"));
        assert_eq!(str_field(&fields, "_type"), Some("TYPE\n"));
    }
}