vector/sinks/elasticsearch/
sink.rs1use std::fmt;
2
3use vector_lib::lookup::lookup_v2::ConfigValuePath;
4use vrl::path::PathPrefix;
5
6use super::{
7 ElasticsearchCommon, ElasticsearchConfig, VersionType,
8 encoder::{DocumentMetadata, DocumentVersion, DocumentVersionType},
9};
10use crate::{
11 sinks::{
12 elasticsearch::{
13 BulkAction, ElasticsearchCommonMode, encoder::ProcessedEvent,
14 request_builder::ElasticsearchRequestBuilder, service::ElasticsearchRequest,
15 },
16 prelude::*,
17 },
18 transforms::metric_to_log::MetricToLog,
19};
20
21#[derive(Clone, Eq, Hash, PartialEq)]
22pub struct PartitionKey {
23 pub index: String,
24 pub bulk_action: BulkAction,
25}
26
27pub struct ElasticsearchSink<S> {
28 pub batch_settings: BatcherSettings,
29 pub request_builder: ElasticsearchRequestBuilder,
30 pub transformer: Transformer,
31 pub service: S,
32 pub metric_to_log: MetricToLog,
33 pub mode: ElasticsearchCommonMode,
34 pub id_key_field: Option<ConfigValuePath>,
35}
36
37impl<S> ElasticsearchSink<S> {
38 pub fn new(
39 common: &ElasticsearchCommon,
40 config: &ElasticsearchConfig,
41 service: S,
42 ) -> crate::Result<Self> {
43 let batch_settings = config.batch.into_batcher_settings()?;
44
45 Ok(ElasticsearchSink {
46 batch_settings,
47 request_builder: common.request_builder.clone(),
48 transformer: config.encoding.clone(),
49 service,
50 metric_to_log: common.metric_to_log.clone(),
51 mode: common.mode.clone(),
52 id_key_field: config.id_key.clone(),
53 })
54 }
55}
56
57impl<S> ElasticsearchSink<S>
58where
59 S: Service<ElasticsearchRequest> + Send + 'static,
60 S::Future: Send + 'static,
61 S::Response: DriverResponse + Send + 'static,
62 S::Error: fmt::Debug + Into<crate::Error> + Send,
63{
64 pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
65 let mode = self.mode;
66 let id_key_field = self.id_key_field.as_ref();
67 let transformer = self.transformer.clone();
68
69 input
70 .scan(self.metric_to_log, |metric_to_log, event| {
71 future::ready(Some(match event {
72 Event::Metric(metric) => metric_to_log.transform_one(metric),
73 Event::Log(log) => Some(log),
74 Event::Trace(_) => {
75 None
79 }
80 }))
81 })
82 .filter_map(|x| async move { x })
83 .filter_map(move |log| {
84 future::ready(process_log(log, &mode, id_key_field, &transformer))
85 })
86 .batched(self.batch_settings.as_byte_size_config())
87 .request_builder(
88 default_request_builder_concurrency_limit(),
89 self.request_builder,
90 )
91 .filter_map(|request| async move {
92 match request {
93 Err(error) => {
94 emit!(SinkRequestBuildError { error });
95 None
96 }
97 Ok(req) => Some(req),
98 }
99 })
100 .into_driver(self.service)
101 .run()
102 .await
103 }
104}
105
106pub(super) fn process_log(
109 mut log: LogEvent,
110 mode: &ElasticsearchCommonMode,
111 id_key_field: Option<&ConfigValuePath>,
112 transformer: &Transformer,
113) -> Option<ProcessedEvent> {
114 let index = mode.index(&log)?;
115 let bulk_action = mode.bulk_action(&log)?;
116
117 if let Some(cfg) = mode.as_data_stream_config() {
118 cfg.sync_fields(&mut log);
119 cfg.remap_timestamp(&mut log);
120 };
121
122 let id = id_key_field
123 .and_then(|key| log.remove((PathPrefix::Event, key)))
124 .and_then(|id| id.as_str().map(Into::into));
125 let document_metadata = match (id, mode.version_type(), mode.version(&log)) {
126 (None, _, _) => DocumentMetadata::WithoutId,
127 (Some(id), None, None) | (Some(id), None, Some(_)) | (Some(id), Some(_), None) => {
128 DocumentMetadata::Id(id)
129 }
130 (Some(id), Some(version_type), Some(version)) => match version_type {
131 VersionType::Internal => DocumentMetadata::Id(id),
132 VersionType::External => DocumentMetadata::IdAndVersion(
133 id,
134 DocumentVersion {
135 kind: DocumentVersionType::External,
136 value: version,
137 },
138 ),
139 VersionType::ExternalGte => DocumentMetadata::IdAndVersion(
140 id,
141 DocumentVersion {
142 kind: DocumentVersionType::ExternalGte,
143 value: version,
144 },
145 ),
146 },
147 };
148 let log = {
149 let mut event = Event::from(log);
150 transformer.transform(&mut event);
151 event.into_log()
152 };
153 Some(ProcessedEvent {
154 index,
155 bulk_action,
156 log,
157 document_metadata,
158 })
159}
160
161#[async_trait]
162impl<S> StreamSink<Event> for ElasticsearchSink<S>
163where
164 S: Service<ElasticsearchRequest> + Send + 'static,
165 S::Future: Send + 'static,
166 S::Response: DriverResponse + Send + 'static,
167 S::Error: fmt::Debug + Into<crate::Error> + Send,
168{
169 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
170 self.run_inner(input).await
171 }
172}