1use bytes::Bytes;
2use chrono::Utc;
3use futures::StreamExt;
4use snafu::{ResultExt, Snafu};
5use tokio_util::codec::FramedRead;
6use vector_lib::codecs::{
7 decoding::{DeserializerConfig, FramingConfig},
8 StreamDecodingError,
9};
10use vector_lib::configurable::configurable_component;
11use vector_lib::internal_event::{
12 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
13};
14use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
15use vector_lib::{
16 config::{LegacyKey, LogNamespace},
17 EstimatedJsonEncodedSizeOf,
18};
19use vrl::value::Kind;
20
21use crate::{
22 codecs::{Decoder, DecodingConfig},
23 config::{log_schema, GenerateConfig, SourceConfig, SourceContext, SourceOutput},
24 event::Event,
25 internal_events::{EventsReceived, StreamClosedError},
26 serde::{default_decoding, default_framing_message_based},
27};
28
29mod channel;
30mod list;
31
32#[derive(Debug, Snafu)]
33enum BuildError {
34 #[snafu(display("Failed to build redis client: {}", source))]
35 Client { source: redis::RedisError },
36}
37
38#[configurable_component]
40#[derive(Copy, Clone, Debug, Derivative)]
41#[derivative(Default)]
42#[serde(rename_all = "lowercase")]
43pub enum DataTypeConfig {
44 #[derivative(Default)]
46 List,
47
48 Channel,
52}
53
54#[configurable_component]
56#[derive(Copy, Clone, Debug, Default, Derivative, Eq, PartialEq)]
57#[serde(deny_unknown_fields, rename_all = "lowercase")]
58pub struct ListOption {
59 #[configurable(derived)]
60 method: Method,
61}
62
63#[configurable_component]
65#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
66#[derivative(Default)]
67#[serde(rename_all = "lowercase")]
68pub enum Method {
69 #[derivative(Default)]
71 Lpop,
72
73 Rpop,
75}
76
77pub struct ConnectionInfo {
78 protocol: &'static str,
79 endpoint: String,
80}
81
82impl From<&redis::ConnectionInfo> for ConnectionInfo {
83 fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
84 let (protocol, endpoint) = match &redis_conn_info.addr {
85 redis::ConnectionAddr::Tcp(host, port)
86 | redis::ConnectionAddr::TcpTls { host, port, .. } => ("tcp", format!("{host}:{port}")),
87 redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
88 };
89
90 Self { protocol, endpoint }
91 }
92}
93
94#[configurable_component(source("redis", "Collect observability data from Redis."))]
96#[derive(Clone, Debug, Derivative)]
97#[serde(deny_unknown_fields)]
98pub struct RedisSourceConfig {
99 #[serde(default)]
101 data_type: DataTypeConfig,
102
103 #[configurable(derived)]
104 list: Option<ListOption>,
105
106 #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
110 url: String,
111
112 #[configurable(metadata(docs::examples = "vector"))]
114 key: String,
115
116 #[configurable(metadata(docs::examples = "redis_key"))]
122 redis_key: Option<OptionalValuePath>,
123
124 #[configurable(derived)]
125 #[serde(default = "default_framing_message_based")]
126 #[derivative(Default(value = "default_framing_message_based()"))]
127 framing: FramingConfig,
128
129 #[configurable(derived)]
130 #[serde(default = "default_decoding")]
131 #[derivative(Default(value = "default_decoding()"))]
132 decoding: DeserializerConfig,
133
134 #[configurable(metadata(docs::hidden))]
136 #[serde(default)]
137 log_namespace: Option<bool>,
138}
139
140impl GenerateConfig for RedisSourceConfig {
141 fn generate_config() -> toml::Value {
142 toml::from_str(
143 r#"
144 url = "redis://127.0.0.1:6379/0"
145 key = "vector"
146 data_type = "list"
147 list.method = "lpop"
148 redis_key = "redis_key"
149 "#,
150 )
151 .unwrap()
152 }
153}
154
155#[async_trait::async_trait]
156#[typetag::serde(name = "redis")]
157impl SourceConfig for RedisSourceConfig {
158 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
159 let log_namespace = cx.log_namespace(self.log_namespace);
160
161 if self.key.is_empty() {
163 return Err("`key` cannot be empty.".into());
164 }
165 let redis_key = self.redis_key.clone().and_then(|k| k.path);
166
167 let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
168 let connection_info = ConnectionInfo::from(client.get_connection_info());
169 let decoder =
170 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
171 .build()?;
172
173 let bytes_received = register!(BytesReceived::from(Protocol::from(
174 connection_info.protocol
175 )));
176 let events_received = register!(EventsReceived);
177 let handler = InputHandler {
178 client,
179 bytes_received: bytes_received.clone(),
180 events_received: events_received.clone(),
181 key: self.key.clone(),
182 redis_key,
183 decoder,
184 cx,
185 log_namespace,
186 };
187
188 match self.data_type {
189 DataTypeConfig::List => {
190 let method = self.list.unwrap_or_default().method;
191 handler.watch(method).await
192 }
193 DataTypeConfig::Channel => handler.subscribe(connection_info).await,
194 }
195 }
196
197 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
198 let log_namespace = global_log_namespace.merge(self.log_namespace);
199
200 let redis_key_path = self
201 .redis_key
202 .clone()
203 .and_then(|k| k.path)
204 .map(LegacyKey::InsertIfEmpty);
205
206 let schema_definition = self
207 .decoding
208 .schema_definition(log_namespace)
209 .with_source_metadata(
210 Self::NAME,
211 redis_key_path,
212 &owned_value_path!("key"),
213 Kind::bytes(),
214 None,
215 )
216 .with_standard_vector_source_metadata();
217
218 vec![SourceOutput::new_maybe_logs(
219 self.decoding.output_type(),
220 schema_definition,
221 )]
222 }
223
224 fn can_acknowledge(&self) -> bool {
225 false
226 }
227}
228
229struct InputHandler {
230 pub client: redis::Client,
231 pub bytes_received: Registered<BytesReceived>,
232 pub events_received: Registered<EventsReceived>,
233 pub key: String,
234 pub redis_key: Option<OwnedValuePath>,
235 pub decoder: Decoder,
236 pub log_namespace: LogNamespace,
237 pub cx: SourceContext,
238}
239
240impl InputHandler {
241 async fn handle_line(&mut self, line: String) -> Result<(), ()> {
242 let now = Utc::now();
243
244 self.bytes_received.emit(ByteSize(line.len()));
245
246 let mut stream = FramedRead::new(line.as_ref(), self.decoder.clone());
247 while let Some(next) = stream.next().await {
248 match next {
249 Ok((events, _byte_size)) => {
250 let count = events.len();
251 let byte_size = events.estimated_json_encoded_size_of();
252 self.events_received.emit(CountByteSize(count, byte_size));
253
254 let events = events.into_iter().map(|mut event| {
255 if let Event::Log(ref mut log) = event {
256 self.log_namespace.insert_vector_metadata(
257 log,
258 log_schema().source_type_key(),
259 path!("source_type"),
260 Bytes::from(RedisSourceConfig::NAME),
261 );
262 self.log_namespace.insert_vector_metadata(
263 log,
264 log_schema().timestamp_key(),
265 path!("ingest_timestamp"),
266 now,
267 );
268
269 self.log_namespace.insert_source_metadata(
270 RedisSourceConfig::NAME,
271 log,
272 self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
273 path!("key"),
274 self.key.as_str(),
275 );
276 };
277
278 event
279 });
280
281 if (self.cx.out.send_batch(events).await).is_err() {
282 emit!(StreamClosedError { count });
283 return Err(());
284 }
285 }
286 Err(error) => {
287 if !error.can_continue() {
290 break;
291 }
292 }
293 }
294 }
295 Ok(())
296 }
297}
298
299#[cfg(test)]
300mod test {
301 use super::*;
302
303 #[test]
304 fn generate_config() {
305 crate::test_util::test_generate_config::<RedisSourceConfig>();
306 }
307}
308
309#[cfg(all(test, feature = "redis-integration-tests"))]
310mod integration_test {
311 use redis::AsyncCommands;
312
313 use super::*;
314 use crate::{
315 config::log_schema,
316 test_util::{
317 collect_n,
318 components::{run_and_assert_source_compliance_n, SOURCE_TAGS},
319 random_string,
320 },
321 SourceSender,
322 };
323 use vrl::value;
324
325 const REDIS_SERVER: &str = "redis://redis-primary:6379/0";
326
327 #[tokio::test]
328 async fn redis_source_list_rpop() {
329 let client = redis::Client::open(REDIS_SERVER).unwrap();
331 let mut conn = client.get_connection_manager().await.unwrap();
332
333 let key = format!("test-key-{}", random_string(10));
334 debug!("Test key name: {}.", key);
335
336 let _: i32 = conn.rpush(&key, "1").await.unwrap();
337 let _: i32 = conn.rpush(&key, "2").await.unwrap();
338 let _: i32 = conn.rpush(&key, "3").await.unwrap();
339
340 let config = RedisSourceConfig {
342 data_type: DataTypeConfig::List,
343 list: Some(ListOption {
344 method: Method::Rpop,
345 }),
346 url: REDIS_SERVER.to_owned(),
347 key: key.clone(),
348 redis_key: None,
349 framing: default_framing_message_based(),
350 decoding: default_decoding(),
351 log_namespace: Some(false),
352 };
353
354 let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;
355
356 assert_eq!(
357 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
358 "3".into()
359 );
360 assert_eq!(
361 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
362 "2".into()
363 );
364 assert_eq!(
365 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
366 "1".into()
367 );
368 }
369
370 #[tokio::test]
371 async fn redis_source_list_rpop_with_log_namespace() {
372 let client = redis::Client::open(REDIS_SERVER).unwrap();
374 let mut conn = client.get_connection_manager().await.unwrap();
375
376 let key = format!("test-key-{}", random_string(10));
377 debug!("Test key name: {}.", key);
378
379 let _: i32 = conn.rpush(&key, "1").await.unwrap();
380
381 let config = RedisSourceConfig {
383 data_type: DataTypeConfig::List,
384 list: Some(ListOption {
385 method: Method::Rpop,
386 }),
387 url: REDIS_SERVER.to_owned(),
388 key: key.clone(),
389 redis_key: Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
390 framing: default_framing_message_based(),
391 decoding: default_decoding(),
392 log_namespace: Some(true),
393 };
394
395 let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;
396
397 let log_event = events[0].as_log();
398 let meta = log_event.metadata();
399
400 assert_eq!(log_event.value(), &"1".into());
401 assert_eq!(
402 meta.value()
403 .get(path!(RedisSourceConfig::NAME, "key"))
404 .unwrap(),
405 &value!(key)
406 );
407 }
408
409 #[tokio::test]
410 async fn redis_source_list_lpop() {
411 let client = redis::Client::open(REDIS_SERVER).unwrap();
413 let mut conn = client.get_connection_manager().await.unwrap();
414
415 let key = format!("test-key-{}", random_string(10));
416 debug!("Test key name: {}.", key);
417
418 let _: i32 = conn.rpush(&key, "1").await.unwrap();
419 let _: i32 = conn.rpush(&key, "2").await.unwrap();
420 let _: i32 = conn.rpush(&key, "3").await.unwrap();
421
422 let config = RedisSourceConfig {
424 data_type: DataTypeConfig::List,
425 list: Some(ListOption {
426 method: Method::Lpop,
427 }),
428 url: REDIS_SERVER.to_owned(),
429 key: key.clone(),
430 redis_key: None,
431 framing: default_framing_message_based(),
432 decoding: default_decoding(),
433 log_namespace: Some(false),
434 };
435
436 let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;
437
438 assert_eq!(
439 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
440 "1".into()
441 );
442 assert_eq!(
443 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
444 "2".into()
445 );
446 assert_eq!(
447 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
448 "3".into()
449 );
450 }
451
452 #[tokio::test]
453 async fn redis_source_channel_consume_event() {
454 let key = format!("test-channel-{}", random_string(10));
455 let text = "test message for channel";
456
457 let config = RedisSourceConfig {
459 data_type: DataTypeConfig::Channel,
460 list: None,
461 url: REDIS_SERVER.to_owned(),
462 key: key.clone(),
463 redis_key: None,
464 framing: default_framing_message_based(),
465 decoding: default_decoding(),
466 log_namespace: Some(false),
467 };
468
469 let (tx, rx) = SourceSender::new_test();
470 let context = SourceContext::new_test(tx, None);
471 let source = config
472 .build(context)
473 .await
474 .expect("source should not fail to build");
475
476 tokio::spawn(source);
477
478 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
483
484 let client = redis::Client::open(REDIS_SERVER).unwrap();
486
487 let mut async_conn = client
488 .get_multiplexed_async_connection()
489 .await
490 .expect("Failed to get redis async connection.");
491
492 for _i in 0..10000 {
493 let _: i32 = async_conn.publish(key.clone(), text).await.unwrap();
494 }
495
496 let events = collect_n(rx, 10000).await;
497 assert_eq!(events.len(), 10000);
498
499 for event in events {
500 assert_eq!(
501 event.as_log()[log_schema().message_key().unwrap().to_string()],
502 text.into()
503 );
504 assert_eq!(
505 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
506 RedisSourceConfig::NAME.into()
507 );
508 }
509 }
510}