vector/sinks/elasticsearch/sink.rs

1use std::fmt;
2
3use vector_lib::lookup::lookup_v2::ConfigValuePath;
4use vrl::path::PathPrefix;
5
6use crate::{
7    sinks::{
8        elasticsearch::{
9            encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder,
10            service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode,
11        },
12        prelude::*,
13    },
14    transforms::metric_to_log::MetricToLog,
15};
16
17use super::{
18    encoder::{DocumentMetadata, DocumentVersion, DocumentVersionType},
19    ElasticsearchCommon, ElasticsearchConfig, VersionType,
20};
21
22#[derive(Clone, Eq, Hash, PartialEq)]
23pub struct PartitionKey {
24    pub index: String,
25    pub bulk_action: BulkAction,
26}
27
28pub struct ElasticsearchSink<S> {
29    pub batch_settings: BatcherSettings,
30    pub request_builder: ElasticsearchRequestBuilder,
31    pub transformer: Transformer,
32    pub service: S,
33    pub metric_to_log: MetricToLog,
34    pub mode: ElasticsearchCommonMode,
35    pub id_key_field: Option<ConfigValuePath>,
36}
37
38impl<S> ElasticsearchSink<S> {
39    pub fn new(
40        common: &ElasticsearchCommon,
41        config: &ElasticsearchConfig,
42        service: S,
43    ) -> crate::Result<Self> {
44        let batch_settings = config.batch.into_batcher_settings()?;
45
46        Ok(ElasticsearchSink {
47            batch_settings,
48            request_builder: common.request_builder.clone(),
49            transformer: config.encoding.clone(),
50            service,
51            metric_to_log: common.metric_to_log.clone(),
52            mode: common.mode.clone(),
53            id_key_field: config.id_key.clone(),
54        })
55    }
56}
57
58impl<S> ElasticsearchSink<S>
59where
60    S: Service<ElasticsearchRequest> + Send + 'static,
61    S::Future: Send + 'static,
62    S::Response: DriverResponse + Send + 'static,
63    S::Error: fmt::Debug + Into<crate::Error> + Send,
64{
65    pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
66        let mode = self.mode;
67        let id_key_field = self.id_key_field.as_ref();
68        let transformer = self.transformer.clone();
69
70        input
71            .scan(self.metric_to_log, |metric_to_log, event| {
72                future::ready(Some(match event {
73                    Event::Metric(metric) => metric_to_log.transform_one(metric),
74                    Event::Log(log) => Some(log),
75                    Event::Trace(_) => {
76                        // Although technically this will cause the event to be dropped, due to the sink
77                        // config it is not possible to send traces to this sink - so this situation can
78                        // never occur. We don't need to emit an `EventsDropped` event.
79                        None
80                    }
81                }))
82            })
83            .filter_map(|x| async move { x })
84            .filter_map(move |log| {
85                future::ready(process_log(log, &mode, id_key_field, &transformer))
86            })
87            .batched(self.batch_settings.as_byte_size_config())
88            .request_builder(
89                default_request_builder_concurrency_limit(),
90                self.request_builder,
91            )
92            .filter_map(|request| async move {
93                match request {
94                    Err(error) => {
95                        emit!(SinkRequestBuildError { error });
96                        None
97                    }
98                    Ok(req) => Some(req),
99                }
100            })
101            .into_driver(self.service)
102            .run()
103            .await
104    }
105}
106
107/// Any `None` values returned from this function will already result in a `TemplateRenderingError`
108/// being emitted, so no further `EventsDropped` event needs emitting.
109pub(super) fn process_log(
110    mut log: LogEvent,
111    mode: &ElasticsearchCommonMode,
112    id_key_field: Option<&ConfigValuePath>,
113    transformer: &Transformer,
114) -> Option<ProcessedEvent> {
115    let index = mode.index(&log)?;
116    let bulk_action = mode.bulk_action(&log)?;
117
118    if let Some(cfg) = mode.as_data_stream_config() {
119        cfg.sync_fields(&mut log);
120        cfg.remap_timestamp(&mut log);
121    };
122
123    let id = id_key_field
124        .and_then(|key| log.remove((PathPrefix::Event, key)))
125        .and_then(|id| id.as_str().map(Into::into));
126    let document_metadata = match (id, mode.version_type(), mode.version(&log)) {
127        (None, _, _) => DocumentMetadata::WithoutId,
128        (Some(id), None, None) | (Some(id), None, Some(_)) | (Some(id), Some(_), None) => {
129            DocumentMetadata::Id(id)
130        }
131        (Some(id), Some(version_type), Some(version)) => match version_type {
132            VersionType::Internal => DocumentMetadata::Id(id),
133            VersionType::External => DocumentMetadata::IdAndVersion(
134                id,
135                DocumentVersion {
136                    kind: DocumentVersionType::External,
137                    value: version,
138                },
139            ),
140            VersionType::ExternalGte => DocumentMetadata::IdAndVersion(
141                id,
142                DocumentVersion {
143                    kind: DocumentVersionType::ExternalGte,
144                    value: version,
145                },
146            ),
147        },
148    };
149    let log = {
150        let mut event = Event::from(log);
151        transformer.transform(&mut event);
152        event.into_log()
153    };
154    Some(ProcessedEvent {
155        index,
156        bulk_action,
157        log,
158        document_metadata,
159    })
160}
161
162#[async_trait]
163impl<S> StreamSink<Event> for ElasticsearchSink<S>
164where
165    S: Service<ElasticsearchRequest> + Send + 'static,
166    S::Future: Send + 'static,
167    S::Response: DriverResponse + Send + 'static,
168    S::Error: fmt::Debug + Into<crate::Error> + Send,
169{
170    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
171        self.run_inner(input).await
172    }
173}