vector/sinks/elasticsearch/
request_builder.rs1use bytes::Bytes;
2use vector_lib::{
3 EstimatedJsonEncodedSizeOf, json_size::JsonSize, request_metadata::RequestMetadata,
4};
5
6use crate::{
7 event::{EventFinalizers, Finalizable},
8 sinks::{
9 elasticsearch::{
10 encoder::{ElasticsearchEncoder, ProcessedEvent},
11 service::ElasticsearchRequest,
12 },
13 util::{
14 Compression, RequestBuilder, metadata::RequestMetadataBuilder,
15 request_builder::EncodeResult,
16 },
17 },
18};
19
20#[derive(Debug, Clone)]
21pub struct ElasticsearchRequestBuilder {
22 pub compression: Compression,
23 pub encoder: ElasticsearchEncoder,
24}
25
26pub struct Metadata {
27 finalizers: EventFinalizers,
28 batch_size: usize,
29 events_byte_size: JsonSize,
30 original_events: Vec<ProcessedEvent>,
31}
32
33impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
34 type Metadata = Metadata;
35 type Events = Vec<ProcessedEvent>;
36 type Encoder = ElasticsearchEncoder;
37 type Payload = Bytes;
38 type Request = ElasticsearchRequest;
39 type Error = std::io::Error;
40
41 fn compression(&self) -> Compression {
42 self.compression
43 }
44
45 fn encoder(&self) -> &Self::Encoder {
46 &self.encoder
47 }
48
49 fn split_input(
50 &self,
51 mut events: Vec<ProcessedEvent>,
52 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
53 let events_byte_size = events
54 .iter()
55 .map(|x| x.log.estimated_json_encoded_size_of())
56 .reduce(|a, b| a + b)
57 .unwrap_or(JsonSize::zero());
58
59 let metadata_builder = RequestMetadataBuilder::from_events(&events);
60
61 let es_metadata = Metadata {
62 finalizers: events.take_finalizers(),
63 batch_size: events.len(),
64 events_byte_size,
65 original_events: events.clone(),
66 };
67 (es_metadata, metadata_builder, events)
68 }
69
70 fn build_request(
71 &self,
72 es_metadata: Self::Metadata,
73 metadata: RequestMetadata,
74 payload: EncodeResult<Self::Payload>,
75 ) -> Self::Request {
76 ElasticsearchRequest {
77 payload: payload.into_payload(),
78 finalizers: es_metadata.finalizers,
79 batch_size: es_metadata.batch_size,
80 events_byte_size: es_metadata.events_byte_size,
81 metadata,
82 original_events: es_metadata.original_events,
83 elasticsearch_request_builder: self.clone(),
84 }
85 }
86}