vector/sinks/elasticsearch/
request_builder.rs

1use bytes::Bytes;
2use vector_lib::{
3    EstimatedJsonEncodedSizeOf, json_size::JsonSize, request_metadata::RequestMetadata,
4};
5
6use crate::{
7    event::{EventFinalizers, Finalizable},
8    sinks::{
9        elasticsearch::{
10            encoder::{ElasticsearchEncoder, ProcessedEvent},
11            service::ElasticsearchRequest,
12        },
13        util::{
14            Compression, RequestBuilder, metadata::RequestMetadataBuilder,
15            request_builder::EncodeResult,
16        },
17    },
18};
19
20#[derive(Debug, Clone)]
21pub struct ElasticsearchRequestBuilder {
22    pub compression: Compression,
23    pub encoder: ElasticsearchEncoder,
24}
25
26pub struct Metadata {
27    finalizers: EventFinalizers,
28    batch_size: usize,
29    events_byte_size: JsonSize,
30    original_events: Vec<ProcessedEvent>,
31}
32
33impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
34    type Metadata = Metadata;
35    type Events = Vec<ProcessedEvent>;
36    type Encoder = ElasticsearchEncoder;
37    type Payload = Bytes;
38    type Request = ElasticsearchRequest;
39    type Error = std::io::Error;
40
41    fn compression(&self) -> Compression {
42        self.compression
43    }
44
45    fn encoder(&self) -> &Self::Encoder {
46        &self.encoder
47    }
48
49    fn split_input(
50        &self,
51        mut events: Vec<ProcessedEvent>,
52    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
53        let events_byte_size = events
54            .iter()
55            .map(|x| x.log.estimated_json_encoded_size_of())
56            .reduce(|a, b| a + b)
57            .unwrap_or(JsonSize::zero());
58
59        let metadata_builder = RequestMetadataBuilder::from_events(&events);
60
61        let es_metadata = Metadata {
62            finalizers: events.take_finalizers(),
63            batch_size: events.len(),
64            events_byte_size,
65            original_events: events.clone(),
66        };
67        (es_metadata, metadata_builder, events)
68    }
69
70    fn build_request(
71        &self,
72        es_metadata: Self::Metadata,
73        metadata: RequestMetadata,
74        payload: EncodeResult<Self::Payload>,
75    ) -> Self::Request {
76        ElasticsearchRequest {
77            payload: payload.into_payload(),
78            finalizers: es_metadata.finalizers,
79            batch_size: es_metadata.batch_size,
80            events_byte_size: es_metadata.events_byte_size,
81            metadata,
82            original_events: es_metadata.original_events,
83            elasticsearch_request_builder: self.clone(),
84        }
85    }
86}