vector/sinks/azure_monitor_logs/
sink.rs1use std::{fmt::Debug, io};
2
3use bytes::Bytes;
4use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
5use vector_lib::lookup::{OwnedValuePath, PathPrefix};
6
7use crate::sinks::prelude::*;
8
9use super::service::AzureMonitorLogsRequest;
10
11pub struct AzureMonitorLogsSink<S> {
12 batch_settings: BatcherSettings,
13 encoding: JsonEncoding,
14 service: S,
15 protocol: String,
16}
17
18impl<S> AzureMonitorLogsSink<S>
19where
20 S: Service<AzureMonitorLogsRequest> + Send + 'static,
21 S::Future: Send + 'static,
22 S::Response: DriverResponse + Send + 'static,
23 S::Error: Debug + Into<crate::Error> + Send,
24{
25 pub fn new(
26 batch_settings: BatcherSettings,
27 transformer: Transformer,
28 service: S,
29 time_generated_key: Option<OwnedValuePath>,
30 protocol: String,
31 ) -> Self {
32 Self {
33 batch_settings,
34 encoding: JsonEncoding::new(transformer, time_generated_key),
35 service,
36 protocol,
37 }
38 }
39
40 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41 input
42 .batched(self.batch_settings.as_byte_size_config())
43 .request_builder(
44 default_request_builder_concurrency_limit(),
45 AzureMonitorLogsRequestBuilder {
46 encoding: self.encoding,
47 },
48 )
49 .filter_map(|request| async {
50 match request {
51 Err(error) => {
52 emit!(SinkRequestBuildError { error });
53 None
54 }
55 Ok(req) => Some(req),
56 }
57 })
58 .into_driver(self.service)
59 .protocol(self.protocol.clone())
60 .run()
61 .await
62 }
63}
64
65#[async_trait::async_trait]
66impl<S> StreamSink<Event> for AzureMonitorLogsSink<S>
67where
68 S: Service<AzureMonitorLogsRequest> + Send + 'static,
69 S::Future: Send + 'static,
70 S::Response: DriverResponse + Send + 'static,
71 S::Error: Debug + Into<crate::Error> + Send,
72{
73 async fn run(
74 self: Box<Self>,
75 input: futures_util::stream::BoxStream<'_, Event>,
76 ) -> Result<(), ()> {
77 self.run_inner(input).await
78 }
79}
80
81#[derive(Clone, Debug)]
84pub(super) struct JsonEncoding {
85 time_generated_key: Option<OwnedValuePath>,
86 encoder: (Transformer, Encoder<Framer>),
87}
88
89impl JsonEncoding {
90 pub fn new(transformer: Transformer, time_generated_key: Option<OwnedValuePath>) -> Self {
91 Self {
92 time_generated_key,
93 encoder: (
94 transformer,
95 Encoder::<Framer>::new(
96 CharacterDelimitedEncoder::new(b',').into(),
97 JsonSerializerConfig::default().build().into(),
98 ),
99 ),
100 }
101 }
102}
103
104impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
105 fn encode_input(
106 &self,
107 mut input: Vec<Event>,
108 writer: &mut dyn io::Write,
109 ) -> io::Result<(usize, GroupedCountByteSize)> {
110 for event in input.iter_mut() {
111 let log = event.as_mut_log();
112
113 let timestamp = match log.remove_timestamp() {
116 Some(Value::Timestamp(ts)) => ts,
117 _ => chrono::Utc::now(),
118 };
119
120 if let Some(timestamp_key) = &self.time_generated_key {
121 log.insert(
122 (PathPrefix::Event, timestamp_key),
123 serde_json::Value::String(
124 timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
125 ),
126 );
127 }
128 }
129
130 self.encoder.encode_input(input, writer)
131 }
132}
133
134struct AzureMonitorLogsRequestBuilder {
135 encoding: JsonEncoding,
136}
137
138impl RequestBuilder<Vec<Event>> for AzureMonitorLogsRequestBuilder {
139 type Metadata = EventFinalizers;
140 type Events = Vec<Event>;
141 type Encoder = JsonEncoding;
142 type Payload = Bytes;
143 type Request = AzureMonitorLogsRequest;
144 type Error = std::io::Error;
145
146 fn compression(&self) -> Compression {
147 Compression::None
148 }
149
150 fn encoder(&self) -> &Self::Encoder {
151 &self.encoding
152 }
153
154 fn split_input(
155 &self,
156 mut events: Vec<Event>,
157 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
158 let finalizers = events.take_finalizers();
159 let builder = RequestMetadataBuilder::from_events(&events);
160 (finalizers, builder, events)
161 }
162
163 fn build_request(
164 &self,
165 finalizers: Self::Metadata,
166 request_metadata: RequestMetadata,
167 payload: EncodeResult<Self::Payload>,
168 ) -> Self::Request {
169 AzureMonitorLogsRequest {
170 body: payload.into_payload(),
171 finalizers,
172 metadata: request_metadata,
173 }
174 }
175}