1use std::{io, io::Write};
2
3use serde::Serialize;
4use serde_json::json;
5use vector_lib::buffers::EventCount;
6use vector_lib::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf};
7use vector_lib::{
8 internal_event::TaggedEventsSent,
9 json_size::JsonSize,
10 request_metadata::{GetEventCountTags, GroupedCountByteSize},
11};
12
13use crate::{
14 codecs::Transformer,
15 event::{EventFinalizers, Finalizable, LogEvent},
16 sinks::{
17 elasticsearch::{BulkAction, VersionType},
18 util::encoding::{as_tracked_write, Encoder},
19 },
20};
21
22#[derive(Serialize, Clone, Debug)]
23pub enum DocumentVersionType {
24 External,
25 ExternalGte,
26}
27
28impl DocumentVersionType {
29 pub const fn as_str(&self) -> &'static str {
30 match self {
31 DocumentVersionType::External => VersionType::External.as_str(),
32 DocumentVersionType::ExternalGte => VersionType::ExternalGte.as_str(),
33 }
34 }
35}
36
37#[derive(Serialize, Clone, Debug)]
38pub struct DocumentVersion {
39 pub kind: DocumentVersionType,
40 pub value: u64,
41}
42
43#[derive(Serialize, Clone, Debug)]
44pub enum DocumentMetadata {
45 WithoutId,
46 Id(String),
47 IdAndVersion(String, DocumentVersion),
48}
49
50#[derive(Serialize, Clone, Debug)]
51pub struct ProcessedEvent {
52 pub index: String,
53 pub bulk_action: BulkAction,
54 pub log: LogEvent,
55 pub document_metadata: DocumentMetadata,
56}
57
58impl Finalizable for ProcessedEvent {
59 fn take_finalizers(&mut self) -> EventFinalizers {
60 self.log.metadata_mut().take_finalizers()
61 }
62}
63
64impl ByteSizeOf for ProcessedEvent {
65 fn allocated_bytes(&self) -> usize {
66 match &self.document_metadata {
67 DocumentMetadata::WithoutId => {
68 self.index.allocated_bytes() + self.log.allocated_bytes()
69 }
70 DocumentMetadata::Id(id) | DocumentMetadata::IdAndVersion(id, _) => {
71 self.index.allocated_bytes() + self.log.allocated_bytes() + id.allocated_bytes()
72 }
73 }
74 }
75}
76
77impl EstimatedJsonEncodedSizeOf for ProcessedEvent {
78 fn estimated_json_encoded_size_of(&self) -> JsonSize {
79 self.log.estimated_json_encoded_size_of()
80 }
81}
82
83impl EventCount for ProcessedEvent {
84 fn event_count(&self) -> usize {
85 1
87 }
88}
89
90impl GetEventCountTags for ProcessedEvent {
91 fn get_tags(&self) -> TaggedEventsSent {
92 self.log.get_tags()
93 }
94}
95
96#[derive(PartialEq, Eq, Default, Clone, Debug)]
97pub struct ElasticsearchEncoder {
98 pub transformer: Transformer,
99 pub doc_type: String,
100 pub suppress_type_name: bool,
101}
102
103impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
104 fn encode_input(
105 &self,
106 input: Vec<ProcessedEvent>,
107 writer: &mut dyn Write,
108 ) -> std::io::Result<(usize, GroupedCountByteSize)> {
109 let mut written_bytes = 0;
110 let mut byte_size = telemetry().create_request_count_byte_size();
111 for event in input {
112 let log = {
113 let mut event = Event::from(event.log);
114 self.transformer.transform(&mut event);
115 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
116
117 event.into_log()
118 };
119 written_bytes += write_bulk_action(
120 writer,
121 event.bulk_action.as_str(),
122 &event.index,
123 &self.doc_type,
124 self.suppress_type_name,
125 &event.document_metadata,
126 )?;
127 written_bytes +=
128 as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
129 writer.write_all(b"\n")?;
130 #[allow(clippy::needless_borrows_for_generic_args)]
133 serde_json::to_writer(&mut writer, log)?;
134 writer.write_all(b"\n")?;
135 Ok(())
136 })?;
137 }
138
139 Ok((written_bytes, byte_size))
140 }
141}
142
143fn write_bulk_action(
144 writer: &mut dyn Write,
145 bulk_action: &str,
146 index: &str,
147 doc_type: &str,
148 suppress_type: bool,
149 document: &DocumentMetadata,
150) -> std::io::Result<usize> {
151 as_tracked_write(
152 writer,
153 (bulk_action, index, doc_type, suppress_type, document),
154 |writer, (bulk_action, index, doc_type, suppress_type, document)| match (
155 suppress_type,
156 document,
157 ) {
158 (true, DocumentMetadata::Id(id)) => {
159 write!(
160 writer,
161 "{}",
162 json!({
163 bulk_action: {
164 "_index": index,
165 "_id": id,
166 }
167 }),
168 )
169 }
170 (false, DocumentMetadata::Id(id)) => {
171 write!(
172 writer,
173 "{}",
174 json!({
175 bulk_action: {
176 "_type": doc_type,
177 "_index": index,
178 "_id": id,
179 }
180 }),
181 )
182 }
183 (true, DocumentMetadata::WithoutId) => {
184 write!(
185 writer,
186 "{}",
187 json!({
188 bulk_action: {
189 "_index": index,
190 }
191 }),
192 )
193 }
194 (false, DocumentMetadata::WithoutId) => {
195 write!(
196 writer,
197 "{}",
198 json!({
199 bulk_action: {
200 "_type": doc_type,
201 "_index": index,
202 }
203 }),
204 )
205 }
206 (true, DocumentMetadata::IdAndVersion(id, version)) => {
207 write!(
208 writer,
209 "{}",
210 json!({
211 bulk_action: {
212 "_id": id,
213 "_index": index,
214 "version_type": version.kind.as_str(),
215 "version": version.value,
216 }
217 }),
218 )
219 }
220 (false, DocumentMetadata::IdAndVersion(id, version)) => {
221 write!(
222 writer,
223 "{}",
224 json!({
225 bulk_action: {
226 "_id": id,
227 "_type": doc_type,
228 "_index": index,
229 "version_type": version.kind.as_str(),
230 "version": version.value,
231 }
232 }),
233 )
234 }
235 },
236 )
237}
238
#[cfg(test)]
mod tests {
    use serde_json::{Map, Value};

    use super::*;

    /// Runs `write_bulk_action`, parses the output as JSON, checks that the
    /// action is the top-level key and returns the object nested under it.
    fn bulk_action_fields(
        action: &str,
        index: &str,
        doc_type: &str,
        suppress_type: bool,
        document: &DocumentMetadata,
    ) -> Map<String, Value> {
        let mut writer: Vec<u8> = Vec::new();

        write_bulk_action(
            &mut writer,
            action,
            index,
            doc_type,
            suppress_type,
            document,
        )
        .unwrap();

        let value: Value = serde_json::from_slice(&writer).unwrap();
        let value = value.as_object().unwrap();

        assert!(value.contains_key(action));

        value.get(action).unwrap().as_object().unwrap().clone()
    }

    /// Returns the string stored under `key`, or `None` when the key is
    /// missing or not a string.
    fn str_field<'a>(fields: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
        fields.get(key).and_then(Value::as_str)
    }

    #[test]
    fn suppress_type_with_id() {
        let nested = bulk_action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            true,
            &DocumentMetadata::Id("ID".to_string()),
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert!(!nested.contains_key("_type"));
    }

    #[test]
    fn suppress_type_without_id() {
        let nested = bulk_action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            true,
            &DocumentMetadata::WithoutId,
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert!(!nested.contains_key("_id"));
        assert!(!nested.contains_key("_type"));
    }

    #[test]
    fn type_with_id() {
        let nested = bulk_action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            false,
            &DocumentMetadata::Id("ID".to_string()),
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE"));
    }

    #[test]
    fn type_without_id() {
        let nested = bulk_action_fields(
            "ACTION",
            "INDEX",
            "TYPE",
            false,
            &DocumentMetadata::WithoutId,
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert!(!nested.contains_key("_id"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE"));
    }

    #[test]
    fn suppress_type_with_id_and_version() {
        let document = DocumentMetadata::IdAndVersion(
            "ID".to_string(),
            DocumentVersion {
                kind: DocumentVersionType::External,
                value: 42,
            },
        );

        let nested = bulk_action_fields("ACTION", "INDEX", "TYPE", true, &document);

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert_eq!(
            str_field(&nested, "version_type"),
            Some(DocumentVersionType::External.as_str())
        );
        assert_eq!(nested.get("version").and_then(Value::as_u64), Some(42));
        assert!(!nested.contains_key("_type"));
    }

    #[test]
    fn type_with_id_and_version() {
        let document = DocumentMetadata::IdAndVersion(
            "ID".to_string(),
            DocumentVersion {
                kind: DocumentVersionType::ExternalGte,
                value: 7,
            },
        );

        let nested = bulk_action_fields("ACTION", "INDEX", "TYPE", false, &document);

        assert_eq!(str_field(&nested, "_index"), Some("INDEX"));
        assert_eq!(str_field(&nested, "_id"), Some("ID"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE"));
        assert_eq!(
            str_field(&nested, "version_type"),
            Some(DocumentVersionType::ExternalGte.as_str())
        );
        assert_eq!(nested.get("version").and_then(Value::as_u64), Some(7));
    }

    #[test]
    fn encodes_fields_with_newlines() {
        // Raw newlines inside any field must be escaped by the JSON encoder,
        // otherwise they would break the line-delimited bulk format.
        let nested = bulk_action_fields(
            "ACTION\n",
            "INDEX\n",
            "TYPE\n",
            false,
            &DocumentMetadata::Id("ID\n".to_string()),
        );

        assert_eq!(str_field(&nested, "_index"), Some("INDEX\n"));
        assert_eq!(str_field(&nested, "_id"), Some("ID\n"));
        assert_eq!(str_field(&nested, "_type"), Some("TYPE\n"));
    }
}