vector/sources/vector/
mod.rs

1//! The `vector` source. See [VectorConfig].
2use std::net::SocketAddr;
3
4use chrono::Utc;
5use futures::TryFutureExt;
6use tonic::{Request, Response, Status, transport::server::RoutesBuilder};
7use tonic_health::server::health_reporter;
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    codecs::NativeDeserializerConfig,
11    config::LogNamespace,
12    configurable::configurable_component,
13    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
14    internal_event::{CountByteSize, InternalEventHandle as _},
15};
16
17use crate::{
18    SourceSender,
19    config::{
20        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
21        SourceContext, SourceOutput,
22    },
23    internal_events::{EventsReceived, StreamClosedError},
24    proto::vector as proto,
25    serde::bool_or_struct,
26    sources::{Source, util::grpc::run_grpc_server_with_routes},
27    tls::{MaybeTlsSettings, TlsEnableableConfig},
28};
29
// Accepted only as the string `"2"` (see the `serde(rename)` below). It is the sole valid value
// of the optional `version` field on `VectorConfig`. The `///` comments here are kept as-is
// because `configurable_component` presumably uses them for the generated config reference.
/// Marker type for version two of the configuration for the `vector` source.
#[configurable_component]
#[derive(Clone, Debug)]
enum VectorConfigVersion {
    /// Marker value for version two.
    #[serde(rename = "2")]
    V2,
}
38
39#[derive(Debug, Clone)]
40struct Service {
41    pipeline: SourceSender,
42    acknowledgements: bool,
43    log_namespace: LogNamespace,
44}
45
46#[tonic::async_trait]
47impl proto::Service for Service {
48    async fn push_events(
49        &self,
50        request: Request<proto::PushEventsRequest>,
51    ) -> Result<Response<proto::PushEventsResponse>, Status> {
52        let mut events: Vec<Event> = request
53            .into_inner()
54            .events
55            .into_iter()
56            .map(Event::from)
57            .collect();
58
59        let now = Utc::now();
60        for event in &mut events {
61            if let Event::Log(log) = event {
62                self.log_namespace.insert_standard_vector_source_metadata(
63                    log,
64                    VectorConfig::NAME,
65                    now,
66                );
67            }
68        }
69
70        let count = events.len();
71        let byte_size = events.estimated_json_encoded_size_of();
72        let events_received = register!(EventsReceived);
73        events_received.emit(CountByteSize(count, byte_size));
74
75        let receiver = BatchNotifier::maybe_apply_to(self.acknowledgements, &mut events);
76
77        self.pipeline
78            .clone()
79            .send_batch(events)
80            .map_err(|error| {
81                let message = error.to_string();
82                emit!(StreamClosedError { count });
83                Status::unavailable(message)
84            })
85            .and_then(|_| handle_batch_status(receiver))
86            .await?;
87
88        Ok(Response::new(proto::PushEventsResponse {}))
89    }
90
91    // TODO: figure out a way to determine if the current Vector instance is "healthy".
92    async fn health_check(
93        &self,
94        _: Request<proto::HealthCheckRequest>,
95    ) -> Result<Response<proto::HealthCheckResponse>, Status> {
96        let message = proto::HealthCheckResponse {
97            status: proto::ServingStatus::Serving.into(),
98        };
99
100        Ok(Response::new(message))
101    }
102}
103
104async fn handle_batch_status(receiver: Option<BatchStatusReceiver>) -> Result<(), Status> {
105    let status = match receiver {
106        Some(receiver) => receiver.await,
107        None => BatchStatus::Delivered,
108    };
109
110    match status {
111        BatchStatus::Errored => Err(Status::internal("Delivery error")),
112        BatchStatus::Rejected => Err(Status::data_loss("Delivery failed")),
113        BatchStatus::Delivered => Ok(()),
114    }
115}
116
// NOTE: the `///` comments in this struct are user-facing. `configurable_component` presumably
// renders them into the generated configuration reference, so edit them with that in mind.
/// Configuration for the `vector` source.
#[configurable_component(source("vector", "Collect observability data from a Vector instance."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct VectorConfig {
    // Optional; only `"2"` deserializes (see `VectorConfigVersion`). Not read by `build`.
    /// Version of the configuration.
    version: Option<VectorConfigVersion>,

    // Bound by the gRPC server in `build` and reported as a TCP resource in `resources`.
    /// The socket address to listen for connections on.
    ///
    /// It _must_ include a port.
    pub address: SocketAddr,

    // Passed as-is to `MaybeTlsSettings::from_config`; `None` presumably means plaintext.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsEnableableConfig>,

    // Accepts a bare boolean or a full struct (`bool_or_struct`). It is combined with the
    // global setting through `cx.do_acknowledgements` in `build`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // `None` defers to the global namespace (`cx.log_namespace` / `LogNamespace::merge`).
    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
143
144impl VectorConfig {
145    /// Creates a `VectorConfig` with the given address.
146    pub fn from_address(addr: SocketAddr) -> Self {
147        Self {
148            address: addr,
149            ..Default::default()
150        }
151    }
152}
153
154impl Default for VectorConfig {
155    fn default() -> Self {
156        Self {
157            version: None,
158            address: "0.0.0.0:6000".parse().unwrap(),
159            tls: None,
160            acknowledgements: Default::default(),
161            log_namespace: None,
162        }
163    }
164}
165
166impl GenerateConfig for VectorConfig {
167    fn generate_config() -> toml::Value {
168        toml::Value::try_from(VectorConfig::default()).unwrap()
169    }
170}
171
172#[async_trait::async_trait]
173#[typetag::serde(name = "vector")]
174impl SourceConfig for VectorConfig {
175    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
176        let tls_settings = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
177        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
178        let log_namespace = cx.log_namespace(self.log_namespace);
179
180        // Create the custom Vector service (existing)
181        let vector_service = proto::Server::new(Service {
182            pipeline: cx.out,
183            acknowledgements,
184            log_namespace,
185        })
186        .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
187        // Tonic added a default of 4MB in 0.9. This replaces the old behavior.
188        .max_decoding_message_size(usize::MAX);
189
190        // Create the standard gRPC health service
191        let (mut health_reporter, health_service) = health_reporter();
192
193        // Register the Vector service as serving in the health reporter
194        health_reporter
195            .set_service_status("vector.Vector", tonic_health::ServingStatus::Serving)
196            .await;
197
198        // Combine both services using RoutesBuilder
199        let mut builder = RoutesBuilder::default();
200        builder
201            .add_service(health_service)
202            .add_service(vector_service);
203
204        let source =
205            run_grpc_server_with_routes(self.address, tls_settings, builder.routes(), cx.shutdown)
206                .map_err(|error| {
207                    error!(message = "Source future failed.", %error);
208                });
209
210        Ok(Box::pin(source))
211    }
212
213    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
214        let log_namespace = global_log_namespace.merge(self.log_namespace);
215
216        let schema_definition = NativeDeserializerConfig
217            .schema_definition(log_namespace)
218            .with_standard_vector_source_metadata();
219
220        vec![SourceOutput::new_maybe_logs(
221            DataType::all_bits(),
222            schema_definition,
223        )]
224    }
225
226    fn resources(&self) -> Vec<Resource> {
227        vec![Resource::tcp(self.address)]
228    }
229
230    fn can_acknowledge(&self) -> bool {
231        true
232    }
233}
234
#[cfg(test)]
mod test {
    use vector_lib::{config::LogNamespace, lookup::owned_value_path, schema::Definition};
    use vrl::value::{Kind, kind::Collection};

    use super::VectorConfig;
    use crate::config::SourceConfig;

    /// Schema definition of the first (and only) output of a default-configured source under
    /// the given global log namespace.
    fn default_output_definition(namespace: LogNamespace) -> Option<Definition> {
        VectorConfig::default()
            .outputs(namespace)
            .remove(0)
            .schema_definition(true)
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<VectorConfig>();
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let source_type = owned_value_path!("vector", "source_type");
        let ingest_timestamp = owned_value_path!("vector", "ingest_timestamp");

        let expected = Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Vector])
            .with_metadata_field(&source_type, Kind::bytes(), None)
            .with_metadata_field(&ingest_timestamp, Kind::timestamp(), None);

        assert_eq!(
            default_output_definition(LogNamespace::Vector),
            Some(expected)
        )
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let base = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let expected = base
            .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);

        assert_eq!(
            default_output_definition(LogNamespace::Legacy),
            Some(expected)
        )
    }
}
292
#[cfg(feature = "sinks-vector")]
#[cfg(test)]
mod tests {
    use tonic::transport::Channel;
    use vector_lib::{assert_event_data_eq, config::log_schema};

    use super::*;
    use crate::{
        SourceSender,
        config::{SinkConfig as _, SinkContext},
        sinks::vector::VectorConfig as SinkConfig,
        test_util,
    };

    /// Builds a default `vector` source listening on `addr`, spawns it, and waits until the
    /// address accepts TCP connections.
    ///
    /// Callers hold on to the receiver paired with `tx`, so the source's output channel stays
    /// open for the rest of the test.
    async fn spawn_source(addr: SocketAddr, tx: SourceSender) {
        let config = format!(r#"address = "{addr}""#);
        let source: VectorConfig = toml::from_str(&config).unwrap();

        let server = source
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(server);
        test_util::wait_for_tcp(addr).await;
    }

    /// Opens a plaintext gRPC channel to `addr`.
    async fn connect(addr: SocketAddr) -> Channel {
        Channel::from_shared(format!("http://{addr}"))
            .unwrap()
            .connect()
            .await
            .unwrap()
    }

    /// Pushes random log events through a `vector` *sink* configured by `sink_config_str`
    /// into a `vector` source on `addr`. Asserts that the source emits the same events, with
    /// the source type set to `"vector"`.
    async fn run_test(sink_config_str: &str, addr: SocketAddr) {
        let (tx, rx) = SourceSender::new_test();
        spawn_source(addr, tx).await;

        // Ideally, this would be a fully custom agent to send the data,
        // but the sink side already does such a test and this is good
        // to ensure interoperability.
        let sink: SinkConfig = toml::from_str(sink_config_str).unwrap();
        let (sink, _) = sink.build(SinkContext::default()).await.unwrap();

        let (mut events, stream) = test_util::random_events_with_stream(100, 100, None);
        sink.run(stream).await.unwrap();

        // The source stamps every received log with its source type.
        for event in &mut events {
            event.as_mut_log().insert(
                log_schema().source_type_key_target_path().unwrap(),
                "vector",
            );
        }

        let output = test_util::collect_ready(rx).await;
        assert_event_data_eq!(events, output);
    }

    #[tokio::test]
    async fn receive_message() {
        let (_guard, addr) = test_util::addr::next_addr();

        let config = format!(r#"address = "{addr}""#);
        run_test(&config, addr).await;
    }

    #[tokio::test]
    async fn receive_compressed_message() {
        let (_guard, addr) = test_util::addr::next_addr();

        let config = format!(
            r#"address = "{addr}"
            compression=true"#
        );
        run_test(&config, addr).await;
    }

    #[tokio::test]
    async fn custom_health_check_works() {
        let (_guard, addr) = test_util::addr::next_addr();
        let (tx, _rx) = SourceSender::new_test();
        spawn_source(addr, tx).await;

        // Test the custom Vector health check endpoint
        let mut client = proto::Client::new(connect(addr).await);
        let response = client
            .health_check(proto::HealthCheckRequest {})
            .await
            .unwrap();

        assert_eq!(
            response.into_inner().status,
            proto::ServingStatus::Serving as i32
        );
    }

    #[tokio::test]
    async fn standard_grpc_health_check_works() {
        use tonic_health::pb::{
            HealthCheckRequest, health_check_response::ServingStatus, health_client::HealthClient,
        };

        let (_guard, addr) = test_util::addr::next_addr();
        let (tx, _rx) = SourceSender::new_test();
        spawn_source(addr, tx).await;

        // Test the standard gRPC health check protocol.
        let mut client = HealthClient::new(connect(addr).await);

        // Both the aggregate server health (empty service name) and the named Vector service
        // must report SERVING.
        for service in ["", "vector.Vector"] {
            let response = client
                .check(HealthCheckRequest {
                    service: service.to_string(),
                })
                .await
                .unwrap();

            assert_eq!(
                response.into_inner().status,
                ServingStatus::Serving as i32,
                "unexpected health status for service {service:?}"
            );
        }
    }
}