vector/sinks/elasticsearch/
sink.rs1use std::fmt;
2
3use vector_lib::lookup::lookup_v2::ConfigValuePath;
4use vrl::path::PathPrefix;
5
6use crate::{
7 sinks::{
8 elasticsearch::{
9 encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder,
10 service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode,
11 },
12 prelude::*,
13 },
14 transforms::metric_to_log::MetricToLog,
15};
16
17use super::{
18 encoder::{DocumentMetadata, DocumentVersion, DocumentVersionType},
19 ElasticsearchCommon, ElasticsearchConfig, VersionType,
20};
21
22#[derive(Clone, Eq, Hash, PartialEq)]
23pub struct PartitionKey {
24 pub index: String,
25 pub bulk_action: BulkAction,
26}
27
28pub struct ElasticsearchSink<S> {
29 pub batch_settings: BatcherSettings,
30 pub request_builder: ElasticsearchRequestBuilder,
31 pub transformer: Transformer,
32 pub service: S,
33 pub metric_to_log: MetricToLog,
34 pub mode: ElasticsearchCommonMode,
35 pub id_key_field: Option<ConfigValuePath>,
36}
37
38impl<S> ElasticsearchSink<S> {
39 pub fn new(
40 common: &ElasticsearchCommon,
41 config: &ElasticsearchConfig,
42 service: S,
43 ) -> crate::Result<Self> {
44 let batch_settings = config.batch.into_batcher_settings()?;
45
46 Ok(ElasticsearchSink {
47 batch_settings,
48 request_builder: common.request_builder.clone(),
49 transformer: config.encoding.clone(),
50 service,
51 metric_to_log: common.metric_to_log.clone(),
52 mode: common.mode.clone(),
53 id_key_field: config.id_key.clone(),
54 })
55 }
56}
57
58impl<S> ElasticsearchSink<S>
59where
60 S: Service<ElasticsearchRequest> + Send + 'static,
61 S::Future: Send + 'static,
62 S::Response: DriverResponse + Send + 'static,
63 S::Error: fmt::Debug + Into<crate::Error> + Send,
64{
65 pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
66 let mode = self.mode;
67 let id_key_field = self.id_key_field.as_ref();
68 let transformer = self.transformer.clone();
69
70 input
71 .scan(self.metric_to_log, |metric_to_log, event| {
72 future::ready(Some(match event {
73 Event::Metric(metric) => metric_to_log.transform_one(metric),
74 Event::Log(log) => Some(log),
75 Event::Trace(_) => {
76 None
80 }
81 }))
82 })
83 .filter_map(|x| async move { x })
84 .filter_map(move |log| {
85 future::ready(process_log(log, &mode, id_key_field, &transformer))
86 })
87 .batched(self.batch_settings.as_byte_size_config())
88 .request_builder(
89 default_request_builder_concurrency_limit(),
90 self.request_builder,
91 )
92 .filter_map(|request| async move {
93 match request {
94 Err(error) => {
95 emit!(SinkRequestBuildError { error });
96 None
97 }
98 Ok(req) => Some(req),
99 }
100 })
101 .into_driver(self.service)
102 .run()
103 .await
104 }
105}
106
107pub(super) fn process_log(
110 mut log: LogEvent,
111 mode: &ElasticsearchCommonMode,
112 id_key_field: Option<&ConfigValuePath>,
113 transformer: &Transformer,
114) -> Option<ProcessedEvent> {
115 let index = mode.index(&log)?;
116 let bulk_action = mode.bulk_action(&log)?;
117
118 if let Some(cfg) = mode.as_data_stream_config() {
119 cfg.sync_fields(&mut log);
120 cfg.remap_timestamp(&mut log);
121 };
122
123 let id = id_key_field
124 .and_then(|key| log.remove((PathPrefix::Event, key)))
125 .and_then(|id| id.as_str().map(Into::into));
126 let document_metadata = match (id, mode.version_type(), mode.version(&log)) {
127 (None, _, _) => DocumentMetadata::WithoutId,
128 (Some(id), None, None) | (Some(id), None, Some(_)) | (Some(id), Some(_), None) => {
129 DocumentMetadata::Id(id)
130 }
131 (Some(id), Some(version_type), Some(version)) => match version_type {
132 VersionType::Internal => DocumentMetadata::Id(id),
133 VersionType::External => DocumentMetadata::IdAndVersion(
134 id,
135 DocumentVersion {
136 kind: DocumentVersionType::External,
137 value: version,
138 },
139 ),
140 VersionType::ExternalGte => DocumentMetadata::IdAndVersion(
141 id,
142 DocumentVersion {
143 kind: DocumentVersionType::ExternalGte,
144 value: version,
145 },
146 ),
147 },
148 };
149 let log = {
150 let mut event = Event::from(log);
151 transformer.transform(&mut event);
152 event.into_log()
153 };
154 Some(ProcessedEvent {
155 index,
156 bulk_action,
157 log,
158 document_metadata,
159 })
160}
161
162#[async_trait]
163impl<S> StreamSink<Event> for ElasticsearchSink<S>
164where
165 S: Service<ElasticsearchRequest> + Send + 'static,
166 S::Future: Send + 'static,
167 S::Response: DriverResponse + Send + 'static,
168 S::Error: fmt::Debug + Into<crate::Error> + Send,
169{
170 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
171 self.run_inner(input).await
172 }
173}