// vector/sources/redis/mod.rs

1use bytes::Bytes;
2use chrono::Utc;
3use futures::StreamExt;
4use snafu::{ResultExt, Snafu};
5use tokio_util::codec::FramedRead;
6use vector_lib::codecs::{
7    decoding::{DeserializerConfig, FramingConfig},
8    StreamDecodingError,
9};
10use vector_lib::configurable::configurable_component;
11use vector_lib::internal_event::{
12    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
13};
14use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
15use vector_lib::{
16    config::{LegacyKey, LogNamespace},
17    EstimatedJsonEncodedSizeOf,
18};
19use vrl::value::Kind;
20
21use crate::{
22    codecs::{Decoder, DecodingConfig},
23    config::{log_schema, GenerateConfig, SourceConfig, SourceContext, SourceOutput},
24    event::Event,
25    internal_events::{EventsReceived, StreamClosedError},
26    serde::{default_decoding, default_framing_message_based},
27};
28
29mod channel;
30mod list;
31
32#[derive(Debug, Snafu)]
33enum BuildError {
34    #[snafu(display("Failed to build redis client: {}", source))]
35    Client { source: redis::RedisError },
36}
37
38/// Data type to use for reading messages from Redis.
39#[configurable_component]
40#[derive(Copy, Clone, Debug, Derivative)]
41#[derivative(Default)]
42#[serde(rename_all = "lowercase")]
43pub enum DataTypeConfig {
44    /// The `list` data type.
45    #[derivative(Default)]
46    List,
47
48    /// The `channel` data type.
49    ///
50    /// This is based on Redis' Pub/Sub capabilities.
51    Channel,
52}
53
54/// Options for the Redis `list` data type.
55#[configurable_component]
56#[derive(Copy, Clone, Debug, Default, Derivative, Eq, PartialEq)]
57#[serde(deny_unknown_fields, rename_all = "lowercase")]
58pub struct ListOption {
59    #[configurable(derived)]
60    method: Method,
61}
62
63/// Method for getting events from the `list` data type.
64#[configurable_component]
65#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
66#[derivative(Default)]
67#[serde(rename_all = "lowercase")]
68pub enum Method {
69    /// Pop messages from the head of the list.
70    #[derivative(Default)]
71    Lpop,
72
73    /// Pop messages from the tail of the list.
74    Rpop,
75}
76
77pub struct ConnectionInfo {
78    protocol: &'static str,
79    endpoint: String,
80}
81
82impl From<&redis::ConnectionInfo> for ConnectionInfo {
83    fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
84        let (protocol, endpoint) = match &redis_conn_info.addr {
85            redis::ConnectionAddr::Tcp(host, port)
86            | redis::ConnectionAddr::TcpTls { host, port, .. } => ("tcp", format!("{host}:{port}")),
87            redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
88        };
89
90        Self { protocol, endpoint }
91    }
92}
93
94/// Configuration for the `redis` source.
95#[configurable_component(source("redis", "Collect observability data from Redis."))]
96#[derive(Clone, Debug, Derivative)]
97#[serde(deny_unknown_fields)]
98pub struct RedisSourceConfig {
99    /// The Redis data type (`list` or `channel`) to use.
100    #[serde(default)]
101    data_type: DataTypeConfig,
102
103    #[configurable(derived)]
104    list: Option<ListOption>,
105
106    /// The Redis URL to connect to.
107    ///
108    /// The URL must take the form of `protocol://server:port/db` where the `protocol` can either be `redis` or `rediss` for connections secured using TLS.
109    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
110    url: String,
111
112    /// The Redis key to read messages from.
113    #[configurable(metadata(docs::examples = "vector"))]
114    key: String,
115
116    /// Sets the name of the log field to use to add the key to each event.
117    ///
118    /// The value is the Redis key that the event was read from.
119    ///
120    /// By default, this is not set and the field is not automatically added.
121    #[configurable(metadata(docs::examples = "redis_key"))]
122    redis_key: Option<OptionalValuePath>,
123
124    #[configurable(derived)]
125    #[serde(default = "default_framing_message_based")]
126    #[derivative(Default(value = "default_framing_message_based()"))]
127    framing: FramingConfig,
128
129    #[configurable(derived)]
130    #[serde(default = "default_decoding")]
131    #[derivative(Default(value = "default_decoding()"))]
132    decoding: DeserializerConfig,
133
134    /// The namespace to use for logs. This overrides the global setting.
135    #[configurable(metadata(docs::hidden))]
136    #[serde(default)]
137    log_namespace: Option<bool>,
138}
139
140impl GenerateConfig for RedisSourceConfig {
141    fn generate_config() -> toml::Value {
142        toml::from_str(
143            r#"
144            url = "redis://127.0.0.1:6379/0"
145            key = "vector"
146            data_type = "list"
147            list.method = "lpop"
148            redis_key = "redis_key"
149            "#,
150        )
151        .unwrap()
152    }
153}
154
155#[async_trait::async_trait]
156#[typetag::serde(name = "redis")]
157impl SourceConfig for RedisSourceConfig {
158    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
159        let log_namespace = cx.log_namespace(self.log_namespace);
160
161        // A key must be specified to actually query i.e. the list to pop from, or the channel to subscribe to.
162        if self.key.is_empty() {
163            return Err("`key` cannot be empty.".into());
164        }
165        let redis_key = self.redis_key.clone().and_then(|k| k.path);
166
167        let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
168        let connection_info = ConnectionInfo::from(client.get_connection_info());
169        let decoder =
170            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
171                .build()?;
172
173        let bytes_received = register!(BytesReceived::from(Protocol::from(
174            connection_info.protocol
175        )));
176        let events_received = register!(EventsReceived);
177        let handler = InputHandler {
178            client,
179            bytes_received: bytes_received.clone(),
180            events_received: events_received.clone(),
181            key: self.key.clone(),
182            redis_key,
183            decoder,
184            cx,
185            log_namespace,
186        };
187
188        match self.data_type {
189            DataTypeConfig::List => {
190                let method = self.list.unwrap_or_default().method;
191                handler.watch(method).await
192            }
193            DataTypeConfig::Channel => handler.subscribe(connection_info).await,
194        }
195    }
196
197    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
198        let log_namespace = global_log_namespace.merge(self.log_namespace);
199
200        let redis_key_path = self
201            .redis_key
202            .clone()
203            .and_then(|k| k.path)
204            .map(LegacyKey::InsertIfEmpty);
205
206        let schema_definition = self
207            .decoding
208            .schema_definition(log_namespace)
209            .with_source_metadata(
210                Self::NAME,
211                redis_key_path,
212                &owned_value_path!("key"),
213                Kind::bytes(),
214                None,
215            )
216            .with_standard_vector_source_metadata();
217
218        vec![SourceOutput::new_maybe_logs(
219            self.decoding.output_type(),
220            schema_definition,
221        )]
222    }
223
224    fn can_acknowledge(&self) -> bool {
225        false
226    }
227}
228
229struct InputHandler {
230    pub client: redis::Client,
231    pub bytes_received: Registered<BytesReceived>,
232    pub events_received: Registered<EventsReceived>,
233    pub key: String,
234    pub redis_key: Option<OwnedValuePath>,
235    pub decoder: Decoder,
236    pub log_namespace: LogNamespace,
237    pub cx: SourceContext,
238}
239
240impl InputHandler {
241    async fn handle_line(&mut self, line: String) -> Result<(), ()> {
242        let now = Utc::now();
243
244        self.bytes_received.emit(ByteSize(line.len()));
245
246        let mut stream = FramedRead::new(line.as_ref(), self.decoder.clone());
247        while let Some(next) = stream.next().await {
248            match next {
249                Ok((events, _byte_size)) => {
250                    let count = events.len();
251                    let byte_size = events.estimated_json_encoded_size_of();
252                    self.events_received.emit(CountByteSize(count, byte_size));
253
254                    let events = events.into_iter().map(|mut event| {
255                        if let Event::Log(ref mut log) = event {
256                            self.log_namespace.insert_vector_metadata(
257                                log,
258                                log_schema().source_type_key(),
259                                path!("source_type"),
260                                Bytes::from(RedisSourceConfig::NAME),
261                            );
262                            self.log_namespace.insert_vector_metadata(
263                                log,
264                                log_schema().timestamp_key(),
265                                path!("ingest_timestamp"),
266                                now,
267                            );
268
269                            self.log_namespace.insert_source_metadata(
270                                RedisSourceConfig::NAME,
271                                log,
272                                self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
273                                path!("key"),
274                                self.key.as_str(),
275                            );
276                        };
277
278                        event
279                    });
280
281                    if (self.cx.out.send_batch(events).await).is_err() {
282                        emit!(StreamClosedError { count });
283                        return Err(());
284                    }
285                }
286                Err(error) => {
287                    // Error is logged by `crate::codecs::Decoder`, no further
288                    // handling is needed here.
289                    if !error.can_continue() {
290                        break;
291                    }
292                }
293            }
294        }
295        Ok(())
296    }
297}
298
#[cfg(test)]
mod test {
    use super::RedisSourceConfig;

    /// Runs the shared `test_generate_config` check against this source's
    /// `GenerateConfig` example.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RedisSourceConfig>();
    }
}
308
#[cfg(all(test, feature = "redis-integration-tests"))]
mod integration_test {
    // These tests need a live Redis server reachable at `REDIS_SERVER`.
    use redis::AsyncCommands;

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{run_and_assert_source_compliance_n, SOURCE_TAGS},
            random_string,
        },
        SourceSender,
    };
    use vrl::value;

    const REDIS_SERVER: &str = "redis://redis-primary:6379/0";

    #[tokio::test]
    async fn redis_source_list_rpop() {
        // Push some test data into a list object which we'll read from.
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();
        let _: i32 = conn.rpush(&key, "2").await.unwrap();
        let _: i32 = conn.rpush(&key, "3").await.unwrap();

        // Now run the source and make sure we get all three events, tail first.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption {
                method: Method::Rpop,
            }),
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "3".into()
        );
        assert_eq!(
            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
            "2".into()
        );
        assert_eq!(
            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
            "1".into()
        );
    }

    #[tokio::test]
    async fn redis_source_list_rpop_with_log_namespace() {
        // Push some test data into a list object which we'll read from.
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();

        // Now run the source and make sure we get the single event, with the Redis key
        // recorded in the source metadata.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption {
                method: Method::Rpop,
            }),
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(true),
        };

        let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;

        let log_event = events[0].as_log();
        let meta = log_event.metadata();

        assert_eq!(log_event.value(), &"1".into());
        assert_eq!(
            meta.value()
                .get(path!(RedisSourceConfig::NAME, "key"))
                .unwrap(),
            &value!(key)
        );
    }

    #[tokio::test]
    async fn redis_source_list_lpop() {
        // Push some test data into a list object which we'll read from.
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();
        let _: i32 = conn.rpush(&key, "2").await.unwrap();
        let _: i32 = conn.rpush(&key, "3").await.unwrap();

        // Now run the source and make sure we get all three events, head first.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption {
                method: Method::Lpop,
            }),
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "1".into()
        );
        assert_eq!(
            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
            "2".into()
        );
        assert_eq!(
            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
            "3".into()
        );
    }

    #[tokio::test]
    async fn redis_source_channel_consume_event() {
        const MESSAGE_COUNT: usize = 10_000;

        let key = format!("test-channel-{}", random_string(10));
        let text = "test message for channel";

        // Create the source and spawn it in the background, so that we're already listening before we publish any messages.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::Channel,
            list: None,
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let (tx, rx) = SourceSender::new_test();
        let context = SourceContext::new_test(tx, None);
        let source = config
            .build(context)
            .await
            .expect("source should not fail to build");

        tokio::spawn(source);

        // Briefly wait to ensure the source is subscribed.
        //
        // TODO: This is a prime example of where being able to check if the shutdown signal had been polled at least
        // once would serve as the most precise indicator of "is the source ready and waiting to receive?".
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        // Now create a normal Redis client and use it to publish a bunch of messages, which we'll ensure the source consumes.
        let client = redis::Client::open(REDIS_SERVER).unwrap();

        let mut async_conn = client
            .get_multiplexed_async_connection()
            .await
            .expect("Failed to get redis async connection.");

        // Borrow the channel name instead of cloning it for every publish.
        for _ in 0..MESSAGE_COUNT {
            let _: i32 = async_conn.publish(&key, text).await.unwrap();
        }

        let events = collect_n(rx, MESSAGE_COUNT).await;
        assert_eq!(events.len(), MESSAGE_COUNT);

        for event in events {
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                text.into()
            );
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                RedisSourceConfig::NAME.into()
            );
        }
    }
}