1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
use std::fmt;

use vector_lib::lookup::lookup_v2::ConfigValuePath;
use vrl::path::PathPrefix;

use crate::{
    sinks::{
        elasticsearch::{
            encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder,
            service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode,
        },
        prelude::*,
    },
    transforms::metric_to_log::MetricToLog,
};

use super::{
    encoder::{DocumentMetadata, DocumentVersion, DocumentVersionType},
    ElasticsearchCommon, ElasticsearchConfig, VersionType,
};

/// A key made of a target index name and a bulk action.
///
/// NOTE(review): not referenced elsewhere in this part of the file; presumably used to group
/// events that share an index and bulk action — confirm against callers.
#[derive(Clone, Eq, Hash, PartialEq)]
pub struct PartitionKey {
    /// Name of the index.
    pub index: String,
    /// Bulk action associated with the index.
    pub bulk_action: BulkAction,
}

/// Stream sink that turns incoming events into batched Elasticsearch requests and hands them to
/// the service `S`.
pub struct ElasticsearchSink<S> {
    /// Batching settings; converted with `as_byte_size_config()` when the stream is batched.
    pub batch_settings: BatcherSettings,
    /// Builder used to turn each batch of processed events into a request.
    pub request_builder: ElasticsearchRequestBuilder,
    /// Transformer applied to every log event before it is wrapped in a `ProcessedEvent`.
    pub transformer: Transformer,
    /// Service that the built requests are driven into.
    pub service: S,
    /// Converts incoming metric events into log events.
    pub metric_to_log: MetricToLog,
    /// Mode that supplies the index, bulk action, version settings and optional data stream
    /// config for each log event.
    pub mode: ElasticsearchCommonMode,
    /// Optional path of the event field that is removed from the log and used as the document ID.
    pub id_key_field: Option<ConfigValuePath>,
}

impl<S> ElasticsearchSink<S> {
    pub fn new(
        common: &ElasticsearchCommon,
        config: &ElasticsearchConfig,
        service: S,
    ) -> crate::Result<Self> {
        let batch_settings = config.batch.into_batcher_settings()?;

        Ok(ElasticsearchSink {
            batch_settings,
            request_builder: common.request_builder.clone(),
            transformer: config.encoding.clone(),
            service,
            metric_to_log: common.metric_to_log.clone(),
            mode: common.mode.clone(),
            id_key_field: config.id_key.clone(),
        })
    }
}

impl<S> ElasticsearchSink<S>
where
    S: Service<ElasticsearchRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and drives `input` to completion.
    ///
    /// The pipeline, in order:
    /// 1. Metric events are converted to logs with `metric_to_log`; log events pass through;
    ///    trace events are discarded.
    /// 2. Each log is run through [`process_log`]; logs for which it returns `None` are dropped.
    /// 3. Processed events are batched according to `batch_settings`.
    /// 4. Each batch is turned into a request by `request_builder`; build failures emit a
    ///    `SinkRequestBuildError` and the batch is skipped.
    /// 5. Successfully built requests are driven into `service`.
    pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let mode = self.mode;
        let id_key_field = self.id_key_field.as_ref();
        // `self` is owned, so the transformer can be moved out like the other fields instead of
        // being cloned.
        let transformer = self.transformer;

        input
            .scan(self.metric_to_log, |metric_to_log, event| {
                future::ready(Some(match event {
                    Event::Metric(metric) => metric_to_log.transform_one(metric),
                    Event::Log(log) => Some(log),
                    Event::Trace(_) => {
                        // Although technically this will cause the event to be dropped, due to the sink
                        // config it is not possible to send traces to this sink - so this situation can
                        // never occur. We don't need to emit an `EventsDropped` event.
                        None
                    }
                }))
            })
            // Discard the `None`s produced above (unconvertible metrics and traces).
            .filter_map(future::ready)
            .filter_map(move |log| {
                future::ready(process_log(log, &mode, id_key_field, &transformer))
            })
            .batched(self.batch_settings.as_byte_size_config())
            .request_builder(
                default_request_builder_concurrency_limit(),
                self.request_builder,
            )
            .filter_map(|request| async move {
                match request {
                    Err(error) => {
                        emit!(SinkRequestBuildError { error });
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(self.service)
            .run()
            .await
    }
}

/// Converts a log event into a [`ProcessedEvent`] ready to be batched and encoded.
///
/// Steps, in order:
/// 1. Resolve the index and bulk action from `mode`; if either is unavailable, return `None`.
/// 2. If `mode` provides a data stream config, sync its fields into the log and remap the
///    timestamp.
/// 3. If `id_key_field` is set, remove that field from the log. The removed value becomes the
///    document ID only when it is `Value::Bytes` (converted lossily to UTF-8); any other value
///    is removed but not used as an ID.
/// 4. Build the document metadata: no ID yields `WithoutId`; an ID with an `External` or
///    `ExternalGte` version type *and* a version yields `IdAndVersion`; every other combination
///    yields `Id`.
/// 5. Apply `transformer` to the log.
///
/// Any `None` values returned from this function will already result in a `TemplateRenderingError`
/// being emitted, so no further `EventsDropped` event needs emitting.
pub(super) fn process_log(
    mut log: LogEvent,
    mode: &ElasticsearchCommonMode,
    id_key_field: Option<&ConfigValuePath>,
    transformer: &Transformer,
) -> Option<ProcessedEvent> {
    let index = mode.index(&log)?;
    let bulk_action = mode.bulk_action(&log)?;

    if let Some(cfg) = mode.as_data_stream_config() {
        cfg.sync_fields(&mut log);
        cfg.remap_timestamp(&mut log);
    }

    let id = match id_key_field.and_then(|key| log.remove((PathPrefix::Event, key))) {
        Some(Value::Bytes(key)) => Some(String::from_utf8_lossy(&key).into_owned()),
        _ => None,
    };

    // `id` is not needed after this point, so it is moved into the match rather than cloned.
    // The version is read after the ID field has been removed from the log.
    let document_metadata = match (id, mode.version_type(), mode.version(&log)) {
        (None, _, _) => DocumentMetadata::WithoutId,
        // Without both a version type and a version there is nothing to attach to the ID.
        (Some(id), None, _) | (Some(id), Some(_), None) => DocumentMetadata::Id(id),
        (Some(id), Some(version_type), Some(version)) => match version_type {
            VersionType::Internal => DocumentMetadata::Id(id),
            VersionType::External => DocumentMetadata::IdAndVersion(
                id,
                DocumentVersion {
                    kind: DocumentVersionType::External,
                    value: version,
                },
            ),
            VersionType::ExternalGte => DocumentMetadata::IdAndVersion(
                id,
                DocumentVersion {
                    kind: DocumentVersionType::ExternalGte,
                    value: version,
                },
            ),
        },
    };

    let log = {
        let mut event = Event::from(log);
        transformer.transform(&mut event);
        event.into_log()
    };

    Some(ProcessedEvent {
        index,
        bulk_action,
        log,
        document_metadata,
    })
}

/// `StreamSink` implementation; all of the work is delegated to
/// [`ElasticsearchSink::run_inner`].
#[async_trait]
impl<S> StreamSink<Event> for ElasticsearchSink<S>
where
    S: Service<ElasticsearchRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the boxed sink and runs it over `input` until `run_inner` completes.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}