1use std::net::SocketAddr;
3
4use chrono::Utc;
5use futures::TryFutureExt;
6use tonic::{Request, Response, Status, transport::server::RoutesBuilder};
7use tonic_health::server::health_reporter;
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 codecs::NativeDeserializerConfig,
11 config::LogNamespace,
12 configurable::configurable_component,
13 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
14 internal_event::{CountByteSize, InternalEventHandle as _},
15};
16
17use crate::{
18 SourceSender,
19 config::{
20 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
21 SourceContext, SourceOutput,
22 },
23 internal_events::{EventsReceived, StreamClosedError},
24 proto::vector as proto,
25 serde::bool_or_struct,
26 sources::{Source, util::grpc::run_grpc_server_with_routes},
27 tls::{MaybeTlsSettings, TlsEnableableConfig},
28};
29
/// Marker type for the configuration version accepted by this component.
#[configurable_component]
#[derive(Clone, Debug)]
enum VectorConfigVersion {
    /// Version 2, spelled `"2"` in serialized configuration.
    #[serde(rename = "2")]
    V2,
}
38
/// State shared by the gRPC handlers of the `vector` source.
#[derive(Debug, Clone)]
struct Service {
    /// Sender that received event batches are forwarded to.
    pipeline: SourceSender,
    /// Whether a batch notifier is attached to incoming events so the handler
    /// can wait for the delivery status before responding.
    acknowledgements: bool,
    /// Namespace used when inserting the standard source metadata into log events.
    log_namespace: LogNamespace,
}
45
46#[tonic::async_trait]
47impl proto::Service for Service {
48 async fn push_events(
49 &self,
50 request: Request<proto::PushEventsRequest>,
51 ) -> Result<Response<proto::PushEventsResponse>, Status> {
52 let mut events: Vec<Event> = request
53 .into_inner()
54 .events
55 .into_iter()
56 .map(Event::from)
57 .collect();
58
59 let now = Utc::now();
60 for event in &mut events {
61 if let Event::Log(log) = event {
62 self.log_namespace.insert_standard_vector_source_metadata(
63 log,
64 VectorConfig::NAME,
65 now,
66 );
67 }
68 }
69
70 let count = events.len();
71 let byte_size = events.estimated_json_encoded_size_of();
72 let events_received = register!(EventsReceived);
73 events_received.emit(CountByteSize(count, byte_size));
74
75 let receiver = BatchNotifier::maybe_apply_to(self.acknowledgements, &mut events);
76
77 self.pipeline
78 .clone()
79 .send_batch(events)
80 .map_err(|error| {
81 let message = error.to_string();
82 emit!(StreamClosedError { count });
83 Status::unavailable(message)
84 })
85 .and_then(|_| handle_batch_status(receiver))
86 .await?;
87
88 Ok(Response::new(proto::PushEventsResponse {}))
89 }
90
91 async fn health_check(
93 &self,
94 _: Request<proto::HealthCheckRequest>,
95 ) -> Result<Response<proto::HealthCheckResponse>, Status> {
96 let message = proto::HealthCheckResponse {
97 status: proto::ServingStatus::Serving.into(),
98 };
99
100 Ok(Response::new(message))
101 }
102}
103
104async fn handle_batch_status(receiver: Option<BatchStatusReceiver>) -> Result<(), Status> {
105 let status = match receiver {
106 Some(receiver) => receiver.await,
107 None => BatchStatus::Delivered,
108 };
109
110 match status {
111 BatchStatus::Errored => Err(Status::internal("Delivery error")),
112 BatchStatus::Rejected => Err(Status::data_loss("Delivery failed")),
113 BatchStatus::Delivered => Ok(()),
114 }
115}
116
/// Configuration for the `vector` source.
#[configurable_component(source("vector", "Collect observability data from a Vector instance."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct VectorConfig {
    /// Version of the configuration.
    ///
    /// Optional; the only accepted value is `"2"`.
    version: Option<VectorConfigVersion>,

    /// The socket address to listen for connections on.
    ///
    /// It _must_ include a port.
    pub address: SocketAddr,

    // TLS settings for the listener; the description is derived from the field's type.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsEnableableConfig>,

    // Acknowledgement settings; may be written as a bare boolean or as a struct
    // (see `bool_or_struct`). The description is derived from the field's type.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. When set, this is merged over the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
143
144impl VectorConfig {
145 pub fn from_address(addr: SocketAddr) -> Self {
147 Self {
148 address: addr,
149 ..Default::default()
150 }
151 }
152}
153
154impl Default for VectorConfig {
155 fn default() -> Self {
156 Self {
157 version: None,
158 address: "0.0.0.0:6000".parse().unwrap(),
159 tls: None,
160 acknowledgements: Default::default(),
161 log_namespace: None,
162 }
163 }
164}
165
166impl GenerateConfig for VectorConfig {
167 fn generate_config() -> toml::Value {
168 toml::Value::try_from(VectorConfig::default()).unwrap()
169 }
170}
171
172#[async_trait::async_trait]
173#[typetag::serde(name = "vector")]
174impl SourceConfig for VectorConfig {
175 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
176 let tls_settings = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
177 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
178 let log_namespace = cx.log_namespace(self.log_namespace);
179
180 let vector_service = proto::Server::new(Service {
182 pipeline: cx.out,
183 acknowledgements,
184 log_namespace,
185 })
186 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
187 .max_decoding_message_size(usize::MAX);
189
190 let (mut health_reporter, health_service) = health_reporter();
192
193 health_reporter
195 .set_service_status("vector.Vector", tonic_health::ServingStatus::Serving)
196 .await;
197
198 let mut builder = RoutesBuilder::default();
200 builder
201 .add_service(health_service)
202 .add_service(vector_service);
203
204 let source =
205 run_grpc_server_with_routes(self.address, tls_settings, builder.routes(), cx.shutdown)
206 .map_err(|error| {
207 error!(message = "Source future failed.", %error);
208 });
209
210 Ok(Box::pin(source))
211 }
212
213 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
214 let log_namespace = global_log_namespace.merge(self.log_namespace);
215
216 let schema_definition = NativeDeserializerConfig
217 .schema_definition(log_namespace)
218 .with_standard_vector_source_metadata();
219
220 vec![SourceOutput::new_maybe_logs(
221 DataType::all_bits(),
222 schema_definition,
223 )]
224 }
225
226 fn resources(&self) -> Vec<Resource> {
227 vec![Resource::tcp(self.address)]
228 }
229
230 fn can_acknowledge(&self) -> bool {
231 true
232 }
233}
234
#[cfg(test)]
mod test {
    use vector_lib::{config::LogNamespace, lookup::owned_value_path, schema::Definition};
    use vrl::value::{Kind, kind::Collection};

    use super::VectorConfig;
    use crate::config::SourceConfig;

    /// The generated default configuration must round-trip through the config machinery.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<VectorConfig>();
    }

    /// With the `Vector` namespace, source metadata lives under the `vector` metadata path.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let mut outputs = VectorConfig::default().outputs(LogNamespace::Vector);
        let actual = outputs.remove(0).schema_definition(true);

        let source_type_path = owned_value_path!("vector", "source_type");
        let ingest_timestamp_path = owned_value_path!("vector", "ingest_timestamp");

        let expected = Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Vector])
            .with_metadata_field(&source_type_path, Kind::bytes(), None)
            .with_metadata_field(&ingest_timestamp_path, Kind::timestamp(), None);

        assert_eq!(actual, Some(expected));
    }

    /// With the `Legacy` namespace, source metadata is written into the event itself.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut outputs = VectorConfig::default().outputs(LogNamespace::Legacy);
        let actual = outputs.remove(0).schema_definition(true);

        let source_type_path = owned_value_path!("source_type");
        let timestamp_path = owned_value_path!("timestamp");

        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&source_type_path, Kind::bytes(), None)
        .with_event_field(&timestamp_path, Kind::timestamp(), None);

        assert_eq!(actual, Some(expected));
    }
}
292
#[cfg(feature = "sinks-vector")]
#[cfg(test)]
mod tests {
    use tonic::transport::Channel;
    use vector_lib::{assert_event_data_eq, config::log_schema};

    use super::*;
    use crate::{
        SourceSender,
        config::{SinkConfig as _, SinkContext},
        sinks::vector::VectorConfig as SinkConfig,
        test_util,
    };

    /// Builds a `vector` source listening on `addr` that forwards into `tx`,
    /// spawns it, and waits until the address accepts TCP connections.
    async fn spawn_source(addr: SocketAddr, tx: SourceSender) {
        let config = format!(r#"address = "{addr}""#);
        let source: VectorConfig = toml::from_str(&config).unwrap();

        let server = source
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(server);
        test_util::wait_for_tcp(addr).await;
    }

    /// Opens a plaintext gRPC channel to the source listening on `addr`.
    async fn connect(addr: SocketAddr) -> Channel {
        let endpoint = format!("http://{addr}");
        Channel::from_shared(endpoint)
            .unwrap()
            .connect()
            .await
            .unwrap()
    }

    /// Sends random events through a `vector` sink built from `sink_config_str`
    /// into a source on `addr` and checks that they arrive with `source_type`
    /// set to `"vector"`.
    async fn run_test(sink_config_str: &str, addr: SocketAddr) {
        let (tx, rx) = SourceSender::new_test();
        spawn_source(addr, tx).await;

        // The real `vector` sink acts as the client, so this also exercises
        // source/sink interoperability.
        let sink: SinkConfig = toml::from_str(sink_config_str).unwrap();
        let cx = SinkContext::default();
        let (sink, _) = sink.build(cx).await.unwrap();

        let (mut events, stream) = test_util::random_events_with_stream(100, 100, None);
        sink.run(stream).await.unwrap();

        // The source stamps every log with its source type; mirror that in the
        // expected events before comparing.
        for event in &mut events {
            event.as_mut_log().insert(
                log_schema().source_type_key_target_path().unwrap(),
                "vector",
            );
        }

        let output = test_util::collect_ready(rx).await;
        assert_event_data_eq!(events, output);
    }

    #[tokio::test]
    async fn receive_message() {
        let (_guard, addr) = test_util::addr::next_addr();

        let config = format!(r#"address = "{addr}""#);
        run_test(&config, addr).await;
    }

    #[tokio::test]
    async fn receive_compressed_message() {
        let (_guard, addr) = test_util::addr::next_addr();

        let config = format!(
            r#"address = "{addr}"
            compression=true"#
        );
        run_test(&config, addr).await;
    }

    /// The Vector-specific `HealthCheck` RPC reports `Serving`.
    #[tokio::test]
    async fn custom_health_check_works() {
        let (_guard, addr) = test_util::addr::next_addr();

        // `_rx` is kept alive for the duration of the test.
        let (tx, _rx) = SourceSender::new_test();
        spawn_source(addr, tx).await;

        let mut client = proto::Client::new(connect(addr).await);
        let response = client
            .health_check(proto::HealthCheckRequest {})
            .await
            .unwrap();

        assert_eq!(
            response.into_inner().status,
            proto::ServingStatus::Serving as i32
        );
    }

    /// The standard gRPC health service reports `Serving` both for the server
    /// as a whole (empty service name) and for `vector.Vector`.
    #[tokio::test]
    async fn standard_grpc_health_check_works() {
        use tonic_health::pb::{
            HealthCheckRequest, health_check_response::ServingStatus,
            health_client::HealthClient,
        };

        let (_guard, addr) = test_util::addr::next_addr();

        // `_rx` is kept alive for the duration of the test.
        let (tx, _rx) = SourceSender::new_test();
        spawn_source(addr, tx).await;

        let mut client = HealthClient::new(connect(addr).await);

        let response = client
            .check(HealthCheckRequest {
                service: String::new(),
            })
            .await
            .unwrap();

        assert_eq!(response.into_inner().status, ServingStatus::Serving as i32);

        let response = client
            .check(HealthCheckRequest {
                service: "vector.Vector".to_string(),
            })
            .await
            .unwrap();

        assert_eq!(response.into_inner().status, ServingStatus::Serving as i32);
    }
}