vector/sinks/azure_monitor_logs/
sink.rs1use std::{fmt::Debug, io};
2
3use bytes::Bytes;
4use vector_lib::{
5 codecs::{CharacterDelimitedEncoder, JsonSerializerConfig, encoding::Framer},
6 lookup::{OwnedValuePath, PathPrefix},
7};
8
9use super::service::AzureMonitorLogsRequest;
10use crate::sinks::prelude::*;
11
12pub struct AzureMonitorLogsSink<S> {
13 batch_settings: BatcherSettings,
14 encoding: JsonEncoding,
15 service: S,
16 protocol: String,
17}
18
19impl<S> AzureMonitorLogsSink<S>
20where
21 S: Service<AzureMonitorLogsRequest> + Send + 'static,
22 S::Future: Send + 'static,
23 S::Response: DriverResponse + Send + 'static,
24 S::Error: Debug + Into<crate::Error> + Send,
25{
26 pub fn new(
27 batch_settings: BatcherSettings,
28 transformer: Transformer,
29 service: S,
30 time_generated_key: Option<OwnedValuePath>,
31 protocol: String,
32 ) -> Self {
33 Self {
34 batch_settings,
35 encoding: JsonEncoding::new(transformer, time_generated_key),
36 service,
37 protocol,
38 }
39 }
40
41 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
42 input
43 .batched(self.batch_settings.as_byte_size_config())
44 .request_builder(
45 default_request_builder_concurrency_limit(),
46 AzureMonitorLogsRequestBuilder {
47 encoding: self.encoding,
48 },
49 )
50 .filter_map(|request| async {
51 match request {
52 Err(error) => {
53 emit!(SinkRequestBuildError { error });
54 None
55 }
56 Ok(req) => Some(req),
57 }
58 })
59 .into_driver(self.service)
60 .protocol(self.protocol.clone())
61 .run()
62 .await
63 }
64}
65
66#[async_trait::async_trait]
67impl<S> StreamSink<Event> for AzureMonitorLogsSink<S>
68where
69 S: Service<AzureMonitorLogsRequest> + Send + 'static,
70 S::Future: Send + 'static,
71 S::Response: DriverResponse + Send + 'static,
72 S::Error: Debug + Into<crate::Error> + Send,
73{
74 async fn run(
75 self: Box<Self>,
76 input: futures_util::stream::BoxStream<'_, Event>,
77 ) -> Result<(), ()> {
78 self.run_inner(input).await
79 }
80}
81
82#[derive(Clone, Debug)]
85pub(super) struct JsonEncoding {
86 time_generated_key: Option<OwnedValuePath>,
87 encoder: (Transformer, Encoder<Framer>),
88}
89
90impl JsonEncoding {
91 pub fn new(transformer: Transformer, time_generated_key: Option<OwnedValuePath>) -> Self {
92 Self {
93 time_generated_key,
94 encoder: (
95 transformer,
96 Encoder::<Framer>::new(
97 CharacterDelimitedEncoder::new(b',').into(),
98 JsonSerializerConfig::default().build().into(),
99 ),
100 ),
101 }
102 }
103}
104
105impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
106 fn encode_input(
107 &self,
108 mut input: Vec<Event>,
109 writer: &mut dyn io::Write,
110 ) -> io::Result<(usize, GroupedCountByteSize)> {
111 for event in input.iter_mut() {
112 let log = event.as_mut_log();
113
114 let timestamp = match log.remove_timestamp() {
117 Some(Value::Timestamp(ts)) => ts,
118 _ => chrono::Utc::now(),
119 };
120
121 if let Some(timestamp_key) = &self.time_generated_key {
122 log.insert(
123 (PathPrefix::Event, timestamp_key),
124 serde_json::Value::String(
125 timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
126 ),
127 );
128 }
129 }
130
131 self.encoder.encode_input(input, writer)
132 }
133}
134
135struct AzureMonitorLogsRequestBuilder {
136 encoding: JsonEncoding,
137}
138
139impl RequestBuilder<Vec<Event>> for AzureMonitorLogsRequestBuilder {
140 type Metadata = EventFinalizers;
141 type Events = Vec<Event>;
142 type Encoder = JsonEncoding;
143 type Payload = Bytes;
144 type Request = AzureMonitorLogsRequest;
145 type Error = std::io::Error;
146
147 fn compression(&self) -> Compression {
148 Compression::None
149 }
150
151 fn encoder(&self) -> &Self::Encoder {
152 &self.encoding
153 }
154
155 fn split_input(
156 &self,
157 mut events: Vec<Event>,
158 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
159 let finalizers = events.take_finalizers();
160 let builder = RequestMetadataBuilder::from_events(&events);
161 (finalizers, builder, events)
162 }
163
164 fn build_request(
165 &self,
166 finalizers: Self::Metadata,
167 request_metadata: RequestMetadata,
168 payload: EncodeResult<Self::Payload>,
169 ) -> Self::Request {
170 AzureMonitorLogsRequest {
171 body: payload.into_payload(),
172 finalizers,
173 metadata: request_metadata,
174 }
175 }
176}