vector/sources/redis/
mod.rs

1use bytes::Bytes;
2use chrono::Utc;
3use futures::StreamExt;
4use snafu::{ResultExt, Snafu};
5use vector_lib::{
6    EstimatedJsonEncodedSizeOf,
7    codecs::{
8        Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
9        decoding::{DeserializerConfig, FramingConfig},
10    },
11    config::{LegacyKey, LogNamespace},
12    configurable::configurable_component,
13    internal_event::{
14        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
15    },
16    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
17};
18use vrl::value::Kind;
19
20use crate::{
21    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput, log_schema},
22    event::Event,
23    internal_events::{EventsReceived, StreamClosedError},
24    serde::{default_decoding, default_framing_message_based},
25};
26
27mod channel;
28mod list;
29
30#[derive(Debug, Snafu)]
31enum BuildError {
32    #[snafu(display("Failed to build redis client: {}", source))]
33    Client { source: redis::RedisError },
34}
35
36/// Data type to use for reading messages from Redis.
37#[configurable_component]
38#[derive(Copy, Clone, Debug, Default)]
39#[serde(rename_all = "lowercase")]
40pub enum DataTypeConfig {
41    /// The `list` data type.
42    #[default]
43    List,
44
45    /// The `channel` data type.
46    ///
47    /// This is based on Redis' Pub/Sub capabilities.
48    Channel,
49}
50
51/// Options for the Redis `list` data type.
52#[configurable_component]
53#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
54#[serde(deny_unknown_fields, rename_all = "lowercase")]
55pub struct ListOption {
56    #[configurable(derived)]
57    method: Method,
58}
59
60/// Method for getting events from the `list` data type.
61#[configurable_component]
62#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
63#[serde(rename_all = "lowercase")]
64pub enum Method {
65    /// Pop messages from the head of the list.
66    #[default]
67    Lpop,
68
69    /// Pop messages from the tail of the list.
70    Rpop,
71}
72
/// Transport details of a Redis connection, reduced to the two values the
/// source needs: a protocol label and a printable endpoint.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    /// Short protocol label for the connection (for example `"tcp"`).
    protocol: &'static str,
    /// Printable address of the Redis server.
    endpoint: String,
}
77
78impl From<&redis::ConnectionInfo> for ConnectionInfo {
79    fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
80        let (protocol, endpoint) = match &redis_conn_info.addr {
81            redis::ConnectionAddr::Tcp(host, port)
82            | redis::ConnectionAddr::TcpTls { host, port, .. } => ("tcp", format!("{host}:{port}")),
83            redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
84        };
85
86        Self { protocol, endpoint }
87    }
88}
89
90/// Configuration for the `redis` source.
91#[configurable_component(source("redis", "Collect observability data from Redis."))]
92#[derive(Clone, Debug, Derivative)]
93#[serde(deny_unknown_fields)]
94pub struct RedisSourceConfig {
95    /// The Redis data type (`list` or `channel`) to use.
96    #[serde(default)]
97    data_type: DataTypeConfig,
98
99    #[configurable(derived)]
100    list: Option<ListOption>,
101
102    /// The Redis URL to connect to.
103    ///
104    /// The URL must take the form of `protocol://server:port/db` where the `protocol` can either be `redis` or `rediss` for connections secured using TLS.
105    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
106    url: String,
107
108    /// The Redis key to read messages from.
109    #[configurable(metadata(docs::examples = "vector"))]
110    key: String,
111
112    /// Sets the name of the log field to use to add the key to each event.
113    ///
114    /// The value is the Redis key that the event was read from.
115    ///
116    /// By default, this is not set and the field is not automatically added.
117    #[configurable(metadata(docs::examples = "redis_key"))]
118    redis_key: Option<OptionalValuePath>,
119
120    #[configurable(derived)]
121    #[serde(default = "default_framing_message_based")]
122    #[derivative(Default(value = "default_framing_message_based()"))]
123    framing: FramingConfig,
124
125    #[configurable(derived)]
126    #[serde(default = "default_decoding")]
127    #[derivative(Default(value = "default_decoding()"))]
128    decoding: DeserializerConfig,
129
130    /// The namespace to use for logs. This overrides the global setting.
131    #[configurable(metadata(docs::hidden))]
132    #[serde(default)]
133    log_namespace: Option<bool>,
134}
135
136impl GenerateConfig for RedisSourceConfig {
137    fn generate_config() -> toml::Value {
138        toml::from_str(
139            r#"
140            url = "redis://127.0.0.1:6379/0"
141            key = "vector"
142            data_type = "list"
143            list.method = "lpop"
144            redis_key = "redis_key"
145            "#,
146        )
147        .unwrap()
148    }
149}
150
151#[async_trait::async_trait]
152#[typetag::serde(name = "redis")]
153impl SourceConfig for RedisSourceConfig {
154    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
155        let log_namespace = cx.log_namespace(self.log_namespace);
156
157        // A key must be specified to actually query i.e. the list to pop from, or the channel to subscribe to.
158        if self.key.is_empty() {
159            return Err("`key` cannot be empty.".into());
160        }
161        let redis_key = self.redis_key.clone().and_then(|k| k.path);
162
163        let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
164        let connection_info = ConnectionInfo::from(client.get_connection_info());
165        let decoder =
166            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
167                .build()?;
168
169        let bytes_received = register!(BytesReceived::from(Protocol::from(
170            connection_info.protocol
171        )));
172        let events_received = register!(EventsReceived);
173        let handler = InputHandler {
174            client,
175            bytes_received: bytes_received.clone(),
176            events_received: events_received.clone(),
177            key: self.key.clone(),
178            redis_key,
179            decoder,
180            cx,
181            log_namespace,
182        };
183
184        match self.data_type {
185            DataTypeConfig::List => {
186                let method = self.list.unwrap_or_default().method;
187                handler.watch(method).await
188            }
189            DataTypeConfig::Channel => handler.subscribe(connection_info).await,
190        }
191    }
192
193    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
194        let log_namespace = global_log_namespace.merge(self.log_namespace);
195
196        let redis_key_path = self
197            .redis_key
198            .clone()
199            .and_then(|k| k.path)
200            .map(LegacyKey::InsertIfEmpty);
201
202        let schema_definition = self
203            .decoding
204            .schema_definition(log_namespace)
205            .with_source_metadata(
206                Self::NAME,
207                redis_key_path,
208                &owned_value_path!("key"),
209                Kind::bytes(),
210                None,
211            )
212            .with_standard_vector_source_metadata();
213
214        vec![SourceOutput::new_maybe_logs(
215            self.decoding.output_type(),
216            schema_definition,
217        )]
218    }
219
220    fn can_acknowledge(&self) -> bool {
221        false
222    }
223}
224
225struct InputHandler {
226    pub client: redis::Client,
227    pub bytes_received: Registered<BytesReceived>,
228    pub events_received: Registered<EventsReceived>,
229    pub key: String,
230    pub redis_key: Option<OwnedValuePath>,
231    pub decoder: Decoder,
232    pub log_namespace: LogNamespace,
233    pub cx: SourceContext,
234}
235
236impl InputHandler {
237    async fn handle_line(&mut self, line: String) -> Result<(), ()> {
238        let now = Utc::now();
239
240        self.bytes_received.emit(ByteSize(line.len()));
241
242        let mut stream = DecoderFramedRead::new(line.as_ref(), self.decoder.clone());
243        while let Some(next) = stream.next().await {
244            match next {
245                Ok((events, _byte_size)) => {
246                    let count = events.len();
247                    let byte_size = events.estimated_json_encoded_size_of();
248                    self.events_received.emit(CountByteSize(count, byte_size));
249
250                    let events = events.into_iter().map(|mut event| {
251                        if let Event::Log(ref mut log) = event {
252                            self.log_namespace.insert_vector_metadata(
253                                log,
254                                log_schema().source_type_key(),
255                                path!("source_type"),
256                                Bytes::from(RedisSourceConfig::NAME),
257                            );
258                            self.log_namespace.insert_vector_metadata(
259                                log,
260                                log_schema().timestamp_key(),
261                                path!("ingest_timestamp"),
262                                now,
263                            );
264
265                            self.log_namespace.insert_source_metadata(
266                                RedisSourceConfig::NAME,
267                                log,
268                                self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
269                                path!("key"),
270                                self.key.as_str(),
271                            );
272                        };
273
274                        event
275                    });
276
277                    if (self.cx.out.send_batch(events).await).is_err() {
278                        emit!(StreamClosedError { count });
279                        return Err(());
280                    }
281                }
282                Err(error) => {
283                    // Error is logged by `vector_lib::codecs::Decoder`, no further
284                    // handling is needed here.
285                    if !error.can_continue() {
286                        break;
287                    }
288                }
289            }
290        }
291        Ok(())
292    }
293}
294
#[cfg(test)]
mod test {
    use super::RedisSourceConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generated-config check against the sample
    /// configuration of `RedisSourceConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<RedisSourceConfig>();
    }
}
304
#[cfg(all(test, feature = "redis-integration-tests"))]
mod integration_test {
    use redis::AsyncCommands;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, run_and_assert_source_compliance_n},
            random_string,
        },
    };

    const REDIS_SERVER: &str = "redis://redis-primary:6379/0";

    /// Number of messages published (and expected back) in the channel test.
    const CHANNEL_MESSAGE_COUNT: usize = 10_000;

    /// Creates a uniquely named list and appends `values` to its tail, in
    /// order. Returns the generated key.
    async fn seed_list(values: &[&'static str]) -> String {
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        for value in values {
            let _: i32 = conn.rpush(&key, *value).await.unwrap();
        }

        key
    }

    /// Builds a `list` source configuration that reads from `key` on the test
    /// server with default framing and decoding.
    fn list_config(
        key: &str,
        method: Method,
        redis_key: Option<OptionalValuePath>,
        log_namespace: bool,
    ) -> RedisSourceConfig {
        RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption { method }),
            url: REDIS_SERVER.to_owned(),
            key: key.to_owned(),
            redis_key,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(log_namespace),
        }
    }

    /// Asserts that the message field of each event matches `expected`, in order.
    fn assert_messages(events: &[Event], expected: &[&'static str]) {
        assert_eq!(events.len(), expected.len());

        for (event, want) in events.iter().zip(expected) {
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                (*want).into()
            );
        }
    }

    #[tokio::test]
    async fn redis_source_list_rpop() {
        // Push some test data into a list object which we'll read from.
        let key = seed_list(&["1", "2", "3"]).await;

        // Now run the source and make sure we get all three events, newest first.
        let config = list_config(&key, Method::Rpop, None, false);
        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        assert_messages(&events, &["3", "2", "1"]);
    }

    #[tokio::test]
    async fn redis_source_list_rpop_with_log_namespace() {
        // Push a single entry into a list object which we'll read from.
        let key = seed_list(&["1"]).await;

        // Now run the source and make sure we get that one event.
        let config = list_config(
            &key,
            Method::Rpop,
            Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
            true,
        );
        let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;

        let log_event = events[0].as_log();
        let meta = log_event.metadata();

        // With the log namespace enabled the payload is the event value itself
        // and the Redis key lives in the source metadata.
        assert_eq!(log_event.value(), &"1".into());
        assert_eq!(
            meta.value()
                .get(path!(RedisSourceConfig::NAME, "key"))
                .unwrap(),
            &value!(key)
        );
    }

    #[tokio::test]
    async fn redis_source_list_lpop() {
        // Push some test data into a list object which we'll read from.
        let key = seed_list(&["1", "2", "3"]).await;

        // Now run the source and make sure we get all three events, oldest first.
        let config = list_config(&key, Method::Lpop, None, false);
        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        assert_messages(&events, &["1", "2", "3"]);
    }

    #[tokio::test]
    async fn redis_source_channel_consume_event() {
        let key = format!("test-channel-{}", random_string(10));
        let text = "test message for channel";

        // Create the source and spawn it in the background, so that we're already listening before we publish any messages.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::Channel,
            list: None,
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let (tx, rx) = SourceSender::new_test();
        let context = SourceContext::new_test(tx, None);
        let source = config
            .build(context)
            .await
            .expect("source should not fail to build");

        tokio::spawn(source);

        // Briefly wait to ensure the source is subscribed.
        //
        // TODO: This is a prime example of where being able to check if the shutdown signal had been polled at least
        // once would serve as the most precise indicator of "is the source ready and waiting to receive?".
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        // Now create a normal Redis client and use it to publish a bunch of messages, which we'll ensure the source consumes.
        let client = redis::Client::open(REDIS_SERVER).unwrap();

        let mut async_conn = client
            .get_multiplexed_async_connection()
            .await
            .expect("Failed to get redis async connection.");

        for _ in 0..CHANNEL_MESSAGE_COUNT {
            let _: i32 = async_conn.publish(key.clone(), text).await.unwrap();
        }

        let events = collect_n(rx, CHANNEL_MESSAGE_COUNT).await;
        assert_eq!(events.len(), CHANNEL_MESSAGE_COUNT);

        for event in events {
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                text.into()
            );
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                RedisSourceConfig::NAME.into()
            );
        }
    }
}