1use bytes::Bytes;
2use chrono::Utc;
3use futures::StreamExt;
4use snafu::{ResultExt, Snafu};
5use vector_lib::{
6 EstimatedJsonEncodedSizeOf,
7 codecs::{
8 Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
9 decoding::{DeserializerConfig, FramingConfig},
10 },
11 config::{LegacyKey, LogNamespace},
12 configurable::configurable_component,
13 internal_event::{
14 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
15 },
16 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
17};
18use vrl::value::Kind;
19
20use crate::{
21 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput, log_schema},
22 event::Event,
23 internal_events::{EventsReceived, StreamClosedError},
24 serde::{default_decoding, default_framing_message_based},
25};
26
27mod channel;
28mod list;
29
30#[derive(Debug, Snafu)]
31enum BuildError {
32 #[snafu(display("Failed to build redis client: {}", source))]
33 Client { source: redis::RedisError },
34}
35
36#[configurable_component]
38#[derive(Copy, Clone, Debug, Derivative)]
39#[derivative(Default)]
40#[serde(rename_all = "lowercase")]
41pub enum DataTypeConfig {
42 #[derivative(Default)]
44 List,
45
46 Channel,
50}
51
52#[configurable_component]
54#[derive(Copy, Clone, Debug, Default, Derivative, Eq, PartialEq)]
55#[serde(deny_unknown_fields, rename_all = "lowercase")]
56pub struct ListOption {
57 #[configurable(derived)]
58 method: Method,
59}
60
61#[configurable_component]
63#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
64#[derivative(Default)]
65#[serde(rename_all = "lowercase")]
66pub enum Method {
67 #[derivative(Default)]
69 Lpop,
70
71 Rpop,
73}
74
75pub struct ConnectionInfo {
76 protocol: &'static str,
77 endpoint: String,
78}
79
80impl From<&redis::ConnectionInfo> for ConnectionInfo {
81 fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
82 let (protocol, endpoint) = match &redis_conn_info.addr {
83 redis::ConnectionAddr::Tcp(host, port)
84 | redis::ConnectionAddr::TcpTls { host, port, .. } => ("tcp", format!("{host}:{port}")),
85 redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
86 };
87
88 Self { protocol, endpoint }
89 }
90}
91
92#[configurable_component(source("redis", "Collect observability data from Redis."))]
94#[derive(Clone, Debug, Derivative)]
95#[serde(deny_unknown_fields)]
96pub struct RedisSourceConfig {
97 #[serde(default)]
99 data_type: DataTypeConfig,
100
101 #[configurable(derived)]
102 list: Option<ListOption>,
103
104 #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
108 url: String,
109
110 #[configurable(metadata(docs::examples = "vector"))]
112 key: String,
113
114 #[configurable(metadata(docs::examples = "redis_key"))]
120 redis_key: Option<OptionalValuePath>,
121
122 #[configurable(derived)]
123 #[serde(default = "default_framing_message_based")]
124 #[derivative(Default(value = "default_framing_message_based()"))]
125 framing: FramingConfig,
126
127 #[configurable(derived)]
128 #[serde(default = "default_decoding")]
129 #[derivative(Default(value = "default_decoding()"))]
130 decoding: DeserializerConfig,
131
132 #[configurable(metadata(docs::hidden))]
134 #[serde(default)]
135 log_namespace: Option<bool>,
136}
137
138impl GenerateConfig for RedisSourceConfig {
139 fn generate_config() -> toml::Value {
140 toml::from_str(
141 r#"
142 url = "redis://127.0.0.1:6379/0"
143 key = "vector"
144 data_type = "list"
145 list.method = "lpop"
146 redis_key = "redis_key"
147 "#,
148 )
149 .unwrap()
150 }
151}
152
153#[async_trait::async_trait]
154#[typetag::serde(name = "redis")]
155impl SourceConfig for RedisSourceConfig {
156 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
157 let log_namespace = cx.log_namespace(self.log_namespace);
158
159 if self.key.is_empty() {
161 return Err("`key` cannot be empty.".into());
162 }
163 let redis_key = self.redis_key.clone().and_then(|k| k.path);
164
165 let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
166 let connection_info = ConnectionInfo::from(client.get_connection_info());
167 let decoder =
168 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
169 .build()?;
170
171 let bytes_received = register!(BytesReceived::from(Protocol::from(
172 connection_info.protocol
173 )));
174 let events_received = register!(EventsReceived);
175 let handler = InputHandler {
176 client,
177 bytes_received: bytes_received.clone(),
178 events_received: events_received.clone(),
179 key: self.key.clone(),
180 redis_key,
181 decoder,
182 cx,
183 log_namespace,
184 };
185
186 match self.data_type {
187 DataTypeConfig::List => {
188 let method = self.list.unwrap_or_default().method;
189 handler.watch(method).await
190 }
191 DataTypeConfig::Channel => handler.subscribe(connection_info).await,
192 }
193 }
194
195 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
196 let log_namespace = global_log_namespace.merge(self.log_namespace);
197
198 let redis_key_path = self
199 .redis_key
200 .clone()
201 .and_then(|k| k.path)
202 .map(LegacyKey::InsertIfEmpty);
203
204 let schema_definition = self
205 .decoding
206 .schema_definition(log_namespace)
207 .with_source_metadata(
208 Self::NAME,
209 redis_key_path,
210 &owned_value_path!("key"),
211 Kind::bytes(),
212 None,
213 )
214 .with_standard_vector_source_metadata();
215
216 vec![SourceOutput::new_maybe_logs(
217 self.decoding.output_type(),
218 schema_definition,
219 )]
220 }
221
222 fn can_acknowledge(&self) -> bool {
223 false
224 }
225}
226
227struct InputHandler {
228 pub client: redis::Client,
229 pub bytes_received: Registered<BytesReceived>,
230 pub events_received: Registered<EventsReceived>,
231 pub key: String,
232 pub redis_key: Option<OwnedValuePath>,
233 pub decoder: Decoder,
234 pub log_namespace: LogNamespace,
235 pub cx: SourceContext,
236}
237
238impl InputHandler {
239 async fn handle_line(&mut self, line: String) -> Result<(), ()> {
240 let now = Utc::now();
241
242 self.bytes_received.emit(ByteSize(line.len()));
243
244 let mut stream = DecoderFramedRead::new(line.as_ref(), self.decoder.clone());
245 while let Some(next) = stream.next().await {
246 match next {
247 Ok((events, _byte_size)) => {
248 let count = events.len();
249 let byte_size = events.estimated_json_encoded_size_of();
250 self.events_received.emit(CountByteSize(count, byte_size));
251
252 let events = events.into_iter().map(|mut event| {
253 if let Event::Log(ref mut log) = event {
254 self.log_namespace.insert_vector_metadata(
255 log,
256 log_schema().source_type_key(),
257 path!("source_type"),
258 Bytes::from(RedisSourceConfig::NAME),
259 );
260 self.log_namespace.insert_vector_metadata(
261 log,
262 log_schema().timestamp_key(),
263 path!("ingest_timestamp"),
264 now,
265 );
266
267 self.log_namespace.insert_source_metadata(
268 RedisSourceConfig::NAME,
269 log,
270 self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
271 path!("key"),
272 self.key.as_str(),
273 );
274 };
275
276 event
277 });
278
279 if (self.cx.out.send_batch(events).await).is_err() {
280 emit!(StreamClosedError { count });
281 return Err(());
282 }
283 }
284 Err(error) => {
285 if !error.can_continue() {
288 break;
289 }
290 }
291 }
292 }
293 Ok(())
294 }
295}
296
#[cfg(test)]
mod test {
    use super::RedisSourceConfig;

    /// Runs the shared generated-config check against the example
    /// configuration produced by `RedisSourceConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RedisSourceConfig>();
    }
}
306
/// Integration tests; they need a Redis server reachable at `REDIS_SERVER`.
#[cfg(all(test, feature = "redis-integration-tests"))]
mod integration_test {
    use redis::AsyncCommands;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, run_and_assert_source_compliance_n},
            random_string,
        },
    };

    const REDIS_SERVER: &str = "redis://redis-primary:6379/0";

    /// Builds a list-mode source configuration pointed at the test server.
    fn list_config(
        key: &str,
        method: Method,
        redis_key: Option<OptionalValuePath>,
        log_namespace: bool,
    ) -> RedisSourceConfig {
        RedisSourceConfig {
            data_type: DataTypeConfig::List,
            list: Some(ListOption { method }),
            url: REDIS_SERVER.to_owned(),
            key: key.to_owned(),
            redis_key,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(log_namespace),
        }
    }

    /// Entries pushed onto the tail come back newest-first with `rpop`.
    #[tokio::test]
    async fn redis_source_list_rpop() {
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        for payload in ["1", "2", "3"] {
            let _: i32 = conn.rpush(&key, payload).await.unwrap();
        }

        let config = list_config(&key, Method::Rpop, None, false);
        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        let message_key = log_schema().message_key().unwrap().to_string();
        for (position, expected) in ["3", "2", "1"].into_iter().enumerate() {
            assert_eq!(
                events[position].as_log()[message_key.as_str()],
                expected.into()
            );
        }
    }

    /// With the Vector log namespace enabled, the payload is the event value
    /// and the Redis key is stored in the source metadata under `key`.
    #[tokio::test]
    async fn redis_source_list_rpop_with_log_namespace() {
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        let _: i32 = conn.rpush(&key, "1").await.unwrap();

        let config = list_config(
            &key,
            Method::Rpop,
            Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
            true,
        );
        let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;

        let log = events[0].as_log();
        let metadata = log.metadata();

        assert_eq!(log.value(), &"1".into());
        assert_eq!(
            metadata
                .value()
                .get(path!(RedisSourceConfig::NAME, "key"))
                .unwrap(),
            &value!(key)
        );
    }

    /// Entries pushed onto the tail come back in insertion order with `lpop`.
    #[tokio::test]
    async fn redis_source_list_lpop() {
        let client = redis::Client::open(REDIS_SERVER).unwrap();
        let mut conn = client.get_connection_manager().await.unwrap();

        let key = format!("test-key-{}", random_string(10));
        debug!("Test key name: {}.", key);

        for payload in ["1", "2", "3"] {
            let _: i32 = conn.rpush(&key, payload).await.unwrap();
        }

        let config = list_config(&key, Method::Lpop, None, false);
        let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;

        let message_key = log_schema().message_key().unwrap().to_string();
        for (position, expected) in ["1", "2", "3"].into_iter().enumerate() {
            assert_eq!(
                events[position].as_log()[message_key.as_str()],
                expected.into()
            );
        }
    }

    /// Every message published to the channel is received as one log event.
    #[tokio::test]
    async fn redis_source_channel_consume_event() {
        // Number of messages published and expected back.
        const MESSAGE_COUNT: usize = 10000;

        let key = format!("test-channel-{}", random_string(10));
        let text = "test message for channel";

        let config = RedisSourceConfig {
            data_type: DataTypeConfig::Channel,
            list: None,
            url: REDIS_SERVER.to_owned(),
            key: key.clone(),
            redis_key: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace: Some(false),
        };

        let (tx, rx) = SourceSender::new_test();
        let context = SourceContext::new_test(tx, None);
        let source = config
            .build(context)
            .await
            .expect("source should not fail to build");

        tokio::spawn(source);

        // Give the spawned source time to subscribe before publishing;
        // presumably messages published earlier would not be delivered.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        let client = redis::Client::open(REDIS_SERVER).unwrap();

        let mut async_conn = client
            .get_multiplexed_async_connection()
            .await
            .expect("Failed to get redis async connection.");

        for _ in 0..MESSAGE_COUNT {
            let _: i32 = async_conn.publish(&key, text).await.unwrap();
        }

        let events = collect_n(rx, MESSAGE_COUNT).await;
        assert_eq!(events.len(), MESSAGE_COUNT);

        let message_key = log_schema().message_key().unwrap().to_string();
        let source_type_key = log_schema().source_type_key().unwrap().to_string();
        for event in events {
            let log = event.as_log();
            assert_eq!(log[message_key.as_str()], text.into());
            assert_eq!(
                log[source_type_key.as_str()],
                RedisSourceConfig::NAME.into()
            );
        }
    }
}