vector/sinks/azure_monitor_logs/
sink.rs

1use std::{fmt::Debug, io};
2
3use bytes::Bytes;
4use vector_lib::{
5    codecs::{CharacterDelimitedEncoder, JsonSerializerConfig, encoding::Framer},
6    lookup::{OwnedValuePath, PathPrefix},
7};
8
9use super::service::AzureMonitorLogsRequest;
10use crate::sinks::prelude::*;
11
/// Stream sink that batches events, encodes each batch with [`JsonEncoding`], and forwards the
/// resulting `AzureMonitorLogsRequest`s to the wrapped service `S`.
pub struct AzureMonitorLogsSink<S> {
    /// Batching configuration applied to the incoming event stream.
    batch_settings: BatcherSettings,
    /// Encoder handed to the request builder to serialize each batch.
    encoding: JsonEncoding,
    /// Service the driver sends built requests to.
    service: S,
    /// Protocol name passed to the driver.
    // NOTE(review): presumably used as a telemetry label — confirm against the driver.
    protocol: String,
}
18
19impl<S> AzureMonitorLogsSink<S>
20where
21    S: Service<AzureMonitorLogsRequest> + Send + 'static,
22    S::Future: Send + 'static,
23    S::Response: DriverResponse + Send + 'static,
24    S::Error: Debug + Into<crate::Error> + Send,
25{
26    pub fn new(
27        batch_settings: BatcherSettings,
28        transformer: Transformer,
29        service: S,
30        time_generated_key: Option<OwnedValuePath>,
31        protocol: String,
32    ) -> Self {
33        Self {
34            batch_settings,
35            encoding: JsonEncoding::new(transformer, time_generated_key),
36            service,
37            protocol,
38        }
39    }
40
41    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
42        input
43            .batched(self.batch_settings.as_byte_size_config())
44            .request_builder(
45                default_request_builder_concurrency_limit(),
46                AzureMonitorLogsRequestBuilder {
47                    encoding: self.encoding,
48                },
49            )
50            .filter_map(|request| async {
51                match request {
52                    Err(error) => {
53                        emit!(SinkRequestBuildError { error });
54                        None
55                    }
56                    Ok(req) => Some(req),
57                }
58            })
59            .into_driver(self.service)
60            .protocol(self.protocol.clone())
61            .run()
62            .await
63    }
64}
65
66#[async_trait::async_trait]
67impl<S> StreamSink<Event> for AzureMonitorLogsSink<S>
68where
69    S: Service<AzureMonitorLogsRequest> + Send + 'static,
70    S::Future: Send + 'static,
71    S::Response: DriverResponse + Send + 'static,
72    S::Error: Debug + Into<crate::Error> + Send,
73{
74    async fn run(
75        self: Box<Self>,
76        input: futures_util::stream::BoxStream<'_, Event>,
77    ) -> Result<(), ()> {
78        self.run_inner(input).await
79    }
80}
81
/// Customized encoding specific to the Azure Monitor Logs sink, as the API does not support full
/// 9-digit nanosecond precision timestamps.
#[derive(Clone, Debug)]
pub(super) struct JsonEncoding {
    /// Event path that receives the millisecond-precision RFC 3339 timestamp string during
    /// encoding. When `None`, no timestamp field is inserted.
    time_generated_key: Option<OwnedValuePath>,
    /// Transformer paired with the comma-delimited JSON encoder that writes the final payload.
    encoder: (Transformer, Encoder<Framer>),
}
89
90impl JsonEncoding {
91    pub fn new(transformer: Transformer, time_generated_key: Option<OwnedValuePath>) -> Self {
92        Self {
93            time_generated_key,
94            encoder: (
95                transformer,
96                Encoder::<Framer>::new(
97                    CharacterDelimitedEncoder::new(b',').into(),
98                    JsonSerializerConfig::default().build().into(),
99                ),
100            ),
101        }
102    }
103}
104
105impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
106    fn encode_input(
107        &self,
108        mut input: Vec<Event>,
109        writer: &mut dyn io::Write,
110    ) -> io::Result<(usize, GroupedCountByteSize)> {
111        for event in input.iter_mut() {
112            let log = event.as_mut_log();
113
114            // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or
115            // Metadata, the following `insert()` ensures it's encoded in the request.
116            let timestamp = match log.remove_timestamp() {
117                Some(Value::Timestamp(ts)) => ts,
118                _ => chrono::Utc::now(),
119            };
120
121            if let Some(timestamp_key) = &self.time_generated_key {
122                log.insert(
123                    (PathPrefix::Event, timestamp_key),
124                    serde_json::Value::String(
125                        timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
126                    ),
127                );
128            }
129        }
130
131        self.encoder.encode_input(input, writer)
132    }
133}
134
/// Request builder that turns batches of events into `AzureMonitorLogsRequest`s.
struct AzureMonitorLogsRequestBuilder {
    /// Encoder returned from `encoder()` and used to serialize each batch.
    encoding: JsonEncoding,
}
138
139impl RequestBuilder<Vec<Event>> for AzureMonitorLogsRequestBuilder {
140    type Metadata = EventFinalizers;
141    type Events = Vec<Event>;
142    type Encoder = JsonEncoding;
143    type Payload = Bytes;
144    type Request = AzureMonitorLogsRequest;
145    type Error = std::io::Error;
146
147    fn compression(&self) -> Compression {
148        Compression::None
149    }
150
151    fn encoder(&self) -> &Self::Encoder {
152        &self.encoding
153    }
154
155    fn split_input(
156        &self,
157        mut events: Vec<Event>,
158    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
159        let finalizers = events.take_finalizers();
160        let builder = RequestMetadataBuilder::from_events(&events);
161        (finalizers, builder, events)
162    }
163
164    fn build_request(
165        &self,
166        finalizers: Self::Metadata,
167        request_metadata: RequestMetadata,
168        payload: EncodeResult<Self::Payload>,
169    ) -> Self::Request {
170        AzureMonitorLogsRequest {
171            body: payload.into_payload(),
172            finalizers,
173            metadata: request_metadata,
174        }
175    }
176}