vector/sinks/azure_monitor_logs/
sink.rs

1use std::{fmt::Debug, io};
2
3use bytes::Bytes;
4use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
5use vector_lib::lookup::{OwnedValuePath, PathPrefix};
6
7use crate::sinks::prelude::*;
8
9use super::service::AzureMonitorLogsRequest;
10
11pub struct AzureMonitorLogsSink<S> {
12    batch_settings: BatcherSettings,
13    encoding: JsonEncoding,
14    service: S,
15    protocol: String,
16}
17
18impl<S> AzureMonitorLogsSink<S>
19where
20    S: Service<AzureMonitorLogsRequest> + Send + 'static,
21    S::Future: Send + 'static,
22    S::Response: DriverResponse + Send + 'static,
23    S::Error: Debug + Into<crate::Error> + Send,
24{
25    pub fn new(
26        batch_settings: BatcherSettings,
27        transformer: Transformer,
28        service: S,
29        time_generated_key: Option<OwnedValuePath>,
30        protocol: String,
31    ) -> Self {
32        Self {
33            batch_settings,
34            encoding: JsonEncoding::new(transformer, time_generated_key),
35            service,
36            protocol,
37        }
38    }
39
40    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41        input
42            .batched(self.batch_settings.as_byte_size_config())
43            .request_builder(
44                default_request_builder_concurrency_limit(),
45                AzureMonitorLogsRequestBuilder {
46                    encoding: self.encoding,
47                },
48            )
49            .filter_map(|request| async {
50                match request {
51                    Err(error) => {
52                        emit!(SinkRequestBuildError { error });
53                        None
54                    }
55                    Ok(req) => Some(req),
56                }
57            })
58            .into_driver(self.service)
59            .protocol(self.protocol.clone())
60            .run()
61            .await
62    }
63}
64
65#[async_trait::async_trait]
66impl<S> StreamSink<Event> for AzureMonitorLogsSink<S>
67where
68    S: Service<AzureMonitorLogsRequest> + Send + 'static,
69    S::Future: Send + 'static,
70    S::Response: DriverResponse + Send + 'static,
71    S::Error: Debug + Into<crate::Error> + Send,
72{
73    async fn run(
74        self: Box<Self>,
75        input: futures_util::stream::BoxStream<'_, Event>,
76    ) -> Result<(), ()> {
77        self.run_inner(input).await
78    }
79}
80
81/// Customized encoding specific to the Azure Monitor Logs sink, as the API does not support full
82/// 9-digit nanosecond precision timestamps.
83#[derive(Clone, Debug)]
84pub(super) struct JsonEncoding {
85    time_generated_key: Option<OwnedValuePath>,
86    encoder: (Transformer, Encoder<Framer>),
87}
88
89impl JsonEncoding {
90    pub fn new(transformer: Transformer, time_generated_key: Option<OwnedValuePath>) -> Self {
91        Self {
92            time_generated_key,
93            encoder: (
94                transformer,
95                Encoder::<Framer>::new(
96                    CharacterDelimitedEncoder::new(b',').into(),
97                    JsonSerializerConfig::default().build().into(),
98                ),
99            ),
100        }
101    }
102}
103
104impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
105    fn encode_input(
106        &self,
107        mut input: Vec<Event>,
108        writer: &mut dyn io::Write,
109    ) -> io::Result<(usize, GroupedCountByteSize)> {
110        for event in input.iter_mut() {
111            let log = event.as_mut_log();
112
113            // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or
114            // Metadata, the following `insert()` ensures it's encoded in the request.
115            let timestamp = match log.remove_timestamp() {
116                Some(Value::Timestamp(ts)) => ts,
117                _ => chrono::Utc::now(),
118            };
119
120            if let Some(timestamp_key) = &self.time_generated_key {
121                log.insert(
122                    (PathPrefix::Event, timestamp_key),
123                    serde_json::Value::String(
124                        timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
125                    ),
126                );
127            }
128        }
129
130        self.encoder.encode_input(input, writer)
131    }
132}
133
134struct AzureMonitorLogsRequestBuilder {
135    encoding: JsonEncoding,
136}
137
138impl RequestBuilder<Vec<Event>> for AzureMonitorLogsRequestBuilder {
139    type Metadata = EventFinalizers;
140    type Events = Vec<Event>;
141    type Encoder = JsonEncoding;
142    type Payload = Bytes;
143    type Request = AzureMonitorLogsRequest;
144    type Error = std::io::Error;
145
146    fn compression(&self) -> Compression {
147        Compression::None
148    }
149
150    fn encoder(&self) -> &Self::Encoder {
151        &self.encoding
152    }
153
154    fn split_input(
155        &self,
156        mut events: Vec<Event>,
157    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
158        let finalizers = events.take_finalizers();
159        let builder = RequestMetadataBuilder::from_events(&events);
160        (finalizers, builder, events)
161    }
162
163    fn build_request(
164        &self,
165        finalizers: Self::Metadata,
166        request_metadata: RequestMetadata,
167        payload: EncodeResult<Self::Payload>,
168    ) -> Self::Request {
169        AzureMonitorLogsRequest {
170            body: payload.into_payload(),
171            finalizers,
172            metadata: request_metadata,
173        }
174    }
175}