vector/sources/redis/
mod.rs

1use bytes::Bytes;
2use chrono::Utc;
3use futures::StreamExt;
4use snafu::{ResultExt, Snafu};
5use vector_lib::{
6    EstimatedJsonEncodedSizeOf,
7    codecs::{
8        Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
9        decoding::{DeserializerConfig, FramingConfig},
10    },
11    config::{LegacyKey, LogNamespace},
12    configurable::configurable_component,
13    internal_event::{
14        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
15    },
16    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
17};
18use vrl::value::Kind;
19
20use crate::{
21    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput, log_schema},
22    event::Event,
23    internal_events::{EventsReceived, StreamClosedError},
24    serde::{default_decoding, default_framing_message_based},
25};
26
27mod channel;
28mod list;
29
/// Errors that can occur while building the `redis` source.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The Redis client could not be created; attached in `build` when `redis::Client::open`
    /// fails for the configured URL.
    #[snafu(display("Failed to build redis client: {}", source))]
    Client { source: redis::RedisError },
}
35
/// Data type to use for reading messages from Redis.
// `List` is marked as the `Default` variant, and `RedisSourceConfig::data_type` is tagged
// `#[serde(default)]`, so `list` is what is used when `data_type` is omitted.
#[configurable_component]
#[derive(Copy, Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum DataTypeConfig {
    /// The `list` data type.
    #[derivative(Default)]
    List,

    /// The `channel` data type.
    ///
    /// This is based on Redis' Pub/Sub capabilities.
    Channel,
}
51
/// Options for the Redis `list` data type.
// Only read by `RedisSourceConfig::build` when `data_type` is `list`; when the `list` table is
// absent, `build` falls back to `ListOption::default()`.
#[configurable_component]
#[derive(Copy, Clone, Debug, Default, Derivative, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub struct ListOption {
    // Which end of the list messages are popped from.
    #[configurable(derived)]
    method: Method,
}
60
/// Method for getting events from the `list` data type.
// `Lpop` is the `Default` variant, so it is the method used whenever `ListOption` is defaulted.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum Method {
    /// Pop messages from the head of the list.
    #[derivative(Default)]
    Lpop,

    /// Pop messages from the tail of the list.
    Rpop,
}
74
/// Reduced description of a Redis connection target: a protocol label plus an endpoint string.
///
/// Values are produced from a `redis::ConnectionInfo` through the `From` implementation in
/// this module.
#[derive(Debug)]
pub struct ConnectionInfo {
    /// Transport label: `"tcp"` for TCP (with or without TLS) and `"uds"` for Unix sockets.
    protocol: &'static str,
    /// `host:port` for TCP connections, or the socket path for Unix sockets.
    endpoint: String,
}
79
80impl From<&redis::ConnectionInfo> for ConnectionInfo {
81    fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
82        let (protocol, endpoint) = match &redis_conn_info.addr {
83            redis::ConnectionAddr::Tcp(host, port)
84            | redis::ConnectionAddr::TcpTls { host, port, .. } => ("tcp", format!("{host}:{port}")),
85            redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
86        };
87
88        Self { protocol, endpoint }
89    }
90}
91
/// Configuration for the `redis` source.
#[configurable_component(source("redis", "Collect observability data from Redis."))]
#[derive(Clone, Debug, Derivative)]
#[serde(deny_unknown_fields)]
pub struct RedisSourceConfig {
    /// The Redis data type (`list` or `channel`) to use.
    #[serde(default)]
    data_type: DataTypeConfig,

    // Only consulted by `build` when `data_type` is `list`; when absent, `build` uses
    // `ListOption::default()`.
    #[configurable(derived)]
    list: Option<ListOption>,

    /// The Redis URL to connect to.
    ///
    /// The URL must take the form of `protocol://server:port/db` where the `protocol` can either be `redis` or `rediss` for connections secured using TLS.
    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
    url: String,

    /// The Redis key to read messages from.
    // `build` rejects an empty key with an error.
    #[configurable(metadata(docs::examples = "vector"))]
    key: String,

    /// Sets the name of the log field to use to add the key to each event.
    ///
    /// The value is the Redis key that the event was read from.
    ///
    /// By default, this is not set and the field is not automatically added.
    #[configurable(metadata(docs::examples = "redis_key"))]
    redis_key: Option<OptionalValuePath>,

    // NOTE(review): no struct-level `#[derivative(Default)]` is visible on this type, so the
    // field-level `derivative` defaults on `framing` and `decoding` may have no effect; the
    // serde defaults are what apply during deserialization — confirm.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // Passed to `cx.log_namespace(..)` in `build` and merged with the global namespace in
    // `outputs`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
137
138impl GenerateConfig for RedisSourceConfig {
139    fn generate_config() -> toml::Value {
140        toml::from_str(
141            r#"
142            url = "redis://127.0.0.1:6379/0"
143            key = "vector"
144            data_type = "list"
145            list.method = "lpop"
146            redis_key = "redis_key"
147            "#,
148        )
149        .unwrap()
150    }
151}
152
153#[async_trait::async_trait]
154#[typetag::serde(name = "redis")]
155impl SourceConfig for RedisSourceConfig {
156    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
157        let log_namespace = cx.log_namespace(self.log_namespace);
158
159        // A key must be specified to actually query i.e. the list to pop from, or the channel to subscribe to.
160        if self.key.is_empty() {
161            return Err("`key` cannot be empty.".into());
162        }
163        let redis_key = self.redis_key.clone().and_then(|k| k.path);
164
165        let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
166        let connection_info = ConnectionInfo::from(client.get_connection_info());
167        let decoder =
168            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
169                .build()?;
170
171        let bytes_received = register!(BytesReceived::from(Protocol::from(
172            connection_info.protocol
173        )));
174        let events_received = register!(EventsReceived);
175        let handler = InputHandler {
176            client,
177            bytes_received: bytes_received.clone(),
178            events_received: events_received.clone(),
179            key: self.key.clone(),
180            redis_key,
181            decoder,
182            cx,
183            log_namespace,
184        };
185
186        match self.data_type {
187            DataTypeConfig::List => {
188                let method = self.list.unwrap_or_default().method;
189                handler.watch(method).await
190            }
191            DataTypeConfig::Channel => handler.subscribe(connection_info).await,
192        }
193    }
194
195    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
196        let log_namespace = global_log_namespace.merge(self.log_namespace);
197
198        let redis_key_path = self
199            .redis_key
200            .clone()
201            .and_then(|k| k.path)
202            .map(LegacyKey::InsertIfEmpty);
203
204        let schema_definition = self
205            .decoding
206            .schema_definition(log_namespace)
207            .with_source_metadata(
208                Self::NAME,
209                redis_key_path,
210                &owned_value_path!("key"),
211                Kind::bytes(),
212                None,
213            )
214            .with_standard_vector_source_metadata();
215
216        vec![SourceOutput::new_maybe_logs(
217            self.decoding.output_type(),
218            schema_definition,
219        )]
220    }
221
222    fn can_acknowledge(&self) -> bool {
223        false
224    }
225}
226
/// State shared by the list and channel readers: the Redis client plus everything needed to
/// decode a payload, enrich the resulting events, and send them downstream.
struct InputHandler {
    /// Client opened from the configured `url`.
    pub client: redis::Client,
    /// Handle used to report the size of every raw payload received.
    pub bytes_received: Registered<BytesReceived>,
    /// Handle used to report the count and estimated size of decoded events.
    pub events_received: Registered<EventsReceived>,
    /// The configured Redis key (list name or channel name).
    pub key: String,
    /// Optional legacy path under which the key is inserted into each log event.
    pub redis_key: Option<OwnedValuePath>,
    /// Decoder built from the configured framing and decoding options.
    pub decoder: Decoder,
    /// Namespace that determines where metadata is written on each log event.
    pub log_namespace: LogNamespace,
    /// Source context; its `out` sender receives the decoded events.
    pub cx: SourceContext,
}
237
238impl InputHandler {
239    async fn handle_line(&mut self, line: String) -> Result<(), ()> {
240        let now = Utc::now();
241
242        self.bytes_received.emit(ByteSize(line.len()));
243
244        let mut stream = DecoderFramedRead::new(line.as_ref(), self.decoder.clone());
245        while let Some(next) = stream.next().await {
246            match next {
247                Ok((events, _byte_size)) => {
248                    let count = events.len();
249                    let byte_size = events.estimated_json_encoded_size_of();
250                    self.events_received.emit(CountByteSize(count, byte_size));
251
252                    let events = events.into_iter().map(|mut event| {
253                        if let Event::Log(ref mut log) = event {
254                            self.log_namespace.insert_vector_metadata(
255                                log,
256                                log_schema().source_type_key(),
257                                path!("source_type"),
258                                Bytes::from(RedisSourceConfig::NAME),
259                            );
260                            self.log_namespace.insert_vector_metadata(
261                                log,
262                                log_schema().timestamp_key(),
263                                path!("ingest_timestamp"),
264                                now,
265                            );
266
267                            self.log_namespace.insert_source_metadata(
268                                RedisSourceConfig::NAME,
269                                log,
270                                self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
271                                path!("key"),
272                                self.key.as_str(),
273                            );
274                        };
275
276                        event
277                    });
278
279                    if (self.cx.out.send_batch(events).await).is_err() {
280                        emit!(StreamClosedError { count });
281                        return Err(());
282                    }
283                }
284                Err(error) => {
285                    // Error is logged by `vector_lib::codecs::Decoder`, no further
286                    // handling is needed here.
287                    if !error.can_continue() {
288                        break;
289                    }
290                }
291            }
292        }
293        Ok(())
294    }
295}
296
#[cfg(test)]
mod test {
    use super::RedisSourceConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against `RedisSourceConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<RedisSourceConfig>();
    }
}
306
// Integration tests for the `redis` source. They are only compiled with the
// `redis-integration-tests` feature and connect to the server named by `REDIS_SERVER`.
#[cfg(all(test, feature = "redis-integration-tests"))]
mod integration_test {
    use redis::AsyncCommands;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, run_and_assert_source_compliance_n},
            random_string,
        },
    };

    // Redis instance that every test in this module connects to.
    const REDIS_SERVER: &str = "redis://redis-primary:6379/0";

    /// Pushes "1", "2", "3" onto the tail of a list and checks that the `rpop` method yields
    /// them in reverse order ("3", "2", "1").
    #[tokio::test]
    async fn redis_source_list_rpop() {
        // Push some test data into a list object which we'll read from.
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();
        let _: i32 = conn.rpush(&key, "2").await.unwrap();
        let _: i32 = conn.rpush(&key, "3").await.unwrap();

        // Now run the source and make sure we get all three events.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption {
                method: Method::Rpop,
            }),
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "3".into()
        );
        assert_eq!(
            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
            "2".into()
        );
        assert_eq!(
            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
            "1".into()
        );
    }

    /// With the log namespace enabled, checks that the payload is the event value and that the
    /// Redis key is stored in the event metadata under the source name.
    #[tokio::test]
    async fn redis_source_list_rpop_with_log_namespace() {
        // Push some test data into a list object which we'll read from.
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();

        // Now run the source and make sure we get the single event that was pushed.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption {
                method: Method::Rpop,
            }),
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(true),
        };

        let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;

        let log_event = events[0].as_log();
        let meta = log_event.metadata();

        assert_eq!(log_event.value(), &"1".into());
        assert_eq!(
            meta.value()
                .get(path!(RedisSourceConfig::NAME, "key"))
                .unwrap(),
            &value!(key)
        );
    }

    /// Pushes "1", "2", "3" onto the tail of a list and checks that the `lpop` method yields
    /// them in insertion order ("1", "2", "3").
    #[tokio::test]
    async fn redis_source_list_lpop() {
        // Push some test data into a list object which we'll read from.
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();
        let _: i32 = conn.rpush(&key, "2").await.unwrap();
        let _: i32 = conn.rpush(&key, "3").await.unwrap();

        // Now run the source and make sure we get all three events.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption {
                method: Method::Lpop,
            }),
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "1".into()
        );
        assert_eq!(
            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
            "2".into()
        );
        assert_eq!(
            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
            "3".into()
        );
    }

    /// Subscribes the source to a channel, publishes 10000 identical messages, and checks that
    /// every one arrives with the expected message and source type.
    #[tokio::test]
    async fn redis_source_channel_consume_event() {
        let key = format!("test-channel-{}", random_string(10));
        let text = "test message for channel";

        // Create the source and spawn it in the background, so that we're already listening before we publish any messages.
        let config = RedisSourceConfig {
            data_type: DataTypeConfig::Channel,
            list: None,
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let (tx, rx) = SourceSender::new_test();
        let context = SourceContext::new_test(tx, None);
        let source = config
            .build(context)
            .await
            .expect("source should not fail to build");

        tokio::spawn(source);

        // Briefly wait to ensure the source is subscribed.
        //
        // TODO: This is a prime example of where being able to check if the shutdown signal had been polled at least
        // once would serve as the most precise indicator of "is the source ready and waiting to receive?".
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        // Now create a normal Redis client and use it to publish a bunch of messages, which we'll ensure the source consumes.
        let client = redis::Client::open(REDIS_SERVER).unwrap();

        let mut async_conn = client
            .get_multiplexed_async_connection()
            .await
            .expect("Failed to get redis async connection.");

        for _i in 0..10000 {
            let _: i32 = async_conn.publish(key.clone(), text).await.unwrap();
        }

        let events = collect_n(rx, 10000).await;
        assert_eq!(events.len(), 10000);

        for event in events {
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                text.into()
            );
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                RedisSourceConfig::NAME.into()
            );
        }
    }
}