vector/sinks/elasticsearch/
sink.rs

1use std::fmt;
2
3use vector_lib::lookup::lookup_v2::ConfigValuePath;
4use vrl::path::PathPrefix;
5
6use super::{
7    ElasticsearchCommon, ElasticsearchConfig, VersionType,
8    encoder::{DocumentMetadata, DocumentVersion, DocumentVersionType},
9};
10use crate::{
11    sinks::{
12        elasticsearch::{
13            BulkAction, ElasticsearchCommonMode, encoder::ProcessedEvent,
14            request_builder::ElasticsearchRequestBuilder, service::ElasticsearchRequest,
15        },
16        prelude::*,
17    },
18    transforms::metric_to_log::MetricToLog,
19};
20
21#[derive(Clone, Eq, Hash, PartialEq)]
22pub struct PartitionKey {
23    pub index: String,
24    pub bulk_action: BulkAction,
25}
26
27pub struct ElasticsearchSink<S> {
28    pub batch_settings: BatcherSettings,
29    pub request_builder: ElasticsearchRequestBuilder,
30    pub transformer: Transformer,
31    pub service: S,
32    pub metric_to_log: MetricToLog,
33    pub mode: ElasticsearchCommonMode,
34    pub id_key_field: Option<ConfigValuePath>,
35}
36
37impl<S> ElasticsearchSink<S> {
38    pub fn new(
39        common: &ElasticsearchCommon,
40        config: &ElasticsearchConfig,
41        service: S,
42    ) -> crate::Result<Self> {
43        let batch_settings = config.batch.into_batcher_settings()?;
44
45        Ok(ElasticsearchSink {
46            batch_settings,
47            request_builder: common.request_builder.clone(),
48            transformer: config.encoding.clone(),
49            service,
50            metric_to_log: common.metric_to_log.clone(),
51            mode: common.mode.clone(),
52            id_key_field: config.id_key.clone(),
53        })
54    }
55}
56
57impl<S> ElasticsearchSink<S>
58where
59    S: Service<ElasticsearchRequest> + Send + 'static,
60    S::Future: Send + 'static,
61    S::Response: DriverResponse + Send + 'static,
62    S::Error: fmt::Debug + Into<crate::Error> + Send,
63{
64    pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
65        let mode = self.mode;
66        let id_key_field = self.id_key_field.as_ref();
67        let transformer = self.transformer.clone();
68
69        input
70            .scan(self.metric_to_log, |metric_to_log, event| {
71                future::ready(Some(match event {
72                    Event::Metric(metric) => metric_to_log.transform_one(metric),
73                    Event::Log(log) => Some(log),
74                    Event::Trace(_) => {
75                        // Although technically this will cause the event to be dropped, due to the sink
76                        // config it is not possible to send traces to this sink - so this situation can
77                        // never occur. We don't need to emit an `EventsDropped` event.
78                        None
79                    }
80                }))
81            })
82            .filter_map(|x| async move { x })
83            .filter_map(move |log| {
84                future::ready(process_log(log, &mode, id_key_field, &transformer))
85            })
86            .batched(self.batch_settings.as_byte_size_config())
87            .request_builder(
88                default_request_builder_concurrency_limit(),
89                self.request_builder,
90            )
91            .filter_map(|request| async move {
92                match request {
93                    Err(error) => {
94                        emit!(SinkRequestBuildError { error });
95                        None
96                    }
97                    Ok(req) => Some(req),
98                }
99            })
100            .into_driver(self.service)
101            .run()
102            .await
103    }
104}
105
106/// Any `None` values returned from this function will already result in a `TemplateRenderingError`
107/// being emitted, so no further `EventsDropped` event needs emitting.
108pub(super) fn process_log(
109    mut log: LogEvent,
110    mode: &ElasticsearchCommonMode,
111    id_key_field: Option<&ConfigValuePath>,
112    transformer: &Transformer,
113) -> Option<ProcessedEvent> {
114    let index = mode.index(&log)?;
115    let bulk_action = mode.bulk_action(&log)?;
116
117    if let Some(cfg) = mode.as_data_stream_config() {
118        cfg.sync_fields(&mut log);
119        cfg.remap_timestamp(&mut log);
120    };
121
122    let id = id_key_field
123        .and_then(|key| log.remove((PathPrefix::Event, key)))
124        .and_then(|id| id.as_str().map(Into::into));
125    let document_metadata = match (id, mode.version_type(), mode.version(&log)) {
126        (None, _, _) => DocumentMetadata::WithoutId,
127        (Some(id), None, None) | (Some(id), None, Some(_)) | (Some(id), Some(_), None) => {
128            DocumentMetadata::Id(id)
129        }
130        (Some(id), Some(version_type), Some(version)) => match version_type {
131            VersionType::Internal => DocumentMetadata::Id(id),
132            VersionType::External => DocumentMetadata::IdAndVersion(
133                id,
134                DocumentVersion {
135                    kind: DocumentVersionType::External,
136                    value: version,
137                },
138            ),
139            VersionType::ExternalGte => DocumentMetadata::IdAndVersion(
140                id,
141                DocumentVersion {
142                    kind: DocumentVersionType::ExternalGte,
143                    value: version,
144                },
145            ),
146        },
147    };
148    let log = {
149        let mut event = Event::from(log);
150        transformer.transform(&mut event);
151        event.into_log()
152    };
153    Some(ProcessedEvent {
154        index,
155        bulk_action,
156        log,
157        document_metadata,
158    })
159}
160
161#[async_trait]
162impl<S> StreamSink<Event> for ElasticsearchSink<S>
163where
164    S: Service<ElasticsearchRequest> + Send + 'static,
165    S::Future: Send + 'static,
166    S::Response: DriverResponse + Send + 'static,
167    S::Error: fmt::Debug + Into<crate::Error> + Send,
168{
169    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
170        self.run_inner(input).await
171    }
172}