1use bytes::Bytes;
2use chrono::Utc;
3use futures::StreamExt;
4use snafu::{ResultExt, Snafu};
5use vector_lib::{
6 EstimatedJsonEncodedSizeOf,
7 codecs::{
8 Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
9 decoding::{DeserializerConfig, FramingConfig},
10 },
11 config::{LegacyKey, LogNamespace},
12 configurable::configurable_component,
13 internal_event::{
14 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
15 },
16 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
17};
18use vrl::value::Kind;
19
20use crate::{
21 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput, log_schema},
22 event::Event,
23 internal_events::{EventsReceived, StreamClosedError},
24 serde::{default_decoding, default_framing_message_based},
25};
26
27mod channel;
28mod list;
29
30#[derive(Debug, Snafu)]
31enum BuildError {
32 #[snafu(display("Failed to build redis client: {}", source))]
33 Client { source: redis::RedisError },
34}
35
36#[configurable_component]
38#[derive(Copy, Clone, Debug, Default)]
39#[serde(rename_all = "lowercase")]
40pub enum DataTypeConfig {
41 #[default]
43 List,
44
45 Channel,
49}
50
51#[configurable_component]
53#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
54#[serde(deny_unknown_fields, rename_all = "lowercase")]
55pub struct ListOption {
56 #[configurable(derived)]
57 method: Method,
58}
59
60#[configurable_component]
62#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
63#[serde(rename_all = "lowercase")]
64pub enum Method {
65 #[default]
67 Lpop,
68
69 Rpop,
71}
72
73pub struct ConnectionInfo {
74 protocol: &'static str,
75 endpoint: String,
76}
77
78impl From<&redis::ConnectionInfo> for ConnectionInfo {
79 fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
80 let (protocol, endpoint) = match &redis_conn_info.addr {
81 redis::ConnectionAddr::Tcp(host, port)
82 | redis::ConnectionAddr::TcpTls { host, port, .. } => ("tcp", format!("{host}:{port}")),
83 redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
84 };
85
86 Self { protocol, endpoint }
87 }
88}
89
90#[configurable_component(source("redis", "Collect observability data from Redis."))]
92#[derive(Clone, Debug, Derivative)]
93#[serde(deny_unknown_fields)]
94pub struct RedisSourceConfig {
95 #[serde(default)]
97 data_type: DataTypeConfig,
98
99 #[configurable(derived)]
100 list: Option<ListOption>,
101
102 #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
106 url: String,
107
108 #[configurable(metadata(docs::examples = "vector"))]
110 key: String,
111
112 #[configurable(metadata(docs::examples = "redis_key"))]
118 redis_key: Option<OptionalValuePath>,
119
120 #[configurable(derived)]
121 #[serde(default = "default_framing_message_based")]
122 #[derivative(Default(value = "default_framing_message_based()"))]
123 framing: FramingConfig,
124
125 #[configurable(derived)]
126 #[serde(default = "default_decoding")]
127 #[derivative(Default(value = "default_decoding()"))]
128 decoding: DeserializerConfig,
129
130 #[configurable(metadata(docs::hidden))]
132 #[serde(default)]
133 log_namespace: Option<bool>,
134}
135
136impl GenerateConfig for RedisSourceConfig {
137 fn generate_config() -> toml::Value {
138 toml::from_str(
139 r#"
140 url = "redis://127.0.0.1:6379/0"
141 key = "vector"
142 data_type = "list"
143 list.method = "lpop"
144 redis_key = "redis_key"
145 "#,
146 )
147 .unwrap()
148 }
149}
150
151#[async_trait::async_trait]
152#[typetag::serde(name = "redis")]
153impl SourceConfig for RedisSourceConfig {
154 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
155 let log_namespace = cx.log_namespace(self.log_namespace);
156
157 if self.key.is_empty() {
159 return Err("`key` cannot be empty.".into());
160 }
161 let redis_key = self.redis_key.clone().and_then(|k| k.path);
162
163 let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
164 let connection_info = ConnectionInfo::from(client.get_connection_info());
165 let decoder =
166 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
167 .build()?;
168
169 let bytes_received = register!(BytesReceived::from(Protocol::from(
170 connection_info.protocol
171 )));
172 let events_received = register!(EventsReceived);
173 let handler = InputHandler {
174 client,
175 bytes_received: bytes_received.clone(),
176 events_received: events_received.clone(),
177 key: self.key.clone(),
178 redis_key,
179 decoder,
180 cx,
181 log_namespace,
182 };
183
184 match self.data_type {
185 DataTypeConfig::List => {
186 let method = self.list.unwrap_or_default().method;
187 handler.watch(method).await
188 }
189 DataTypeConfig::Channel => handler.subscribe(connection_info).await,
190 }
191 }
192
193 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
194 let log_namespace = global_log_namespace.merge(self.log_namespace);
195
196 let redis_key_path = self
197 .redis_key
198 .clone()
199 .and_then(|k| k.path)
200 .map(LegacyKey::InsertIfEmpty);
201
202 let schema_definition = self
203 .decoding
204 .schema_definition(log_namespace)
205 .with_source_metadata(
206 Self::NAME,
207 redis_key_path,
208 &owned_value_path!("key"),
209 Kind::bytes(),
210 None,
211 )
212 .with_standard_vector_source_metadata();
213
214 vec![SourceOutput::new_maybe_logs(
215 self.decoding.output_type(),
216 schema_definition,
217 )]
218 }
219
220 fn can_acknowledge(&self) -> bool {
221 false
222 }
223}
224
225struct InputHandler {
226 pub client: redis::Client,
227 pub bytes_received: Registered<BytesReceived>,
228 pub events_received: Registered<EventsReceived>,
229 pub key: String,
230 pub redis_key: Option<OwnedValuePath>,
231 pub decoder: Decoder,
232 pub log_namespace: LogNamespace,
233 pub cx: SourceContext,
234}
235
236impl InputHandler {
237 async fn handle_line(&mut self, line: String) -> Result<(), ()> {
238 let now = Utc::now();
239
240 self.bytes_received.emit(ByteSize(line.len()));
241
242 let mut stream = DecoderFramedRead::new(line.as_ref(), self.decoder.clone());
243 while let Some(next) = stream.next().await {
244 match next {
245 Ok((events, _byte_size)) => {
246 let count = events.len();
247 let byte_size = events.estimated_json_encoded_size_of();
248 self.events_received.emit(CountByteSize(count, byte_size));
249
250 let events = events.into_iter().map(|mut event| {
251 if let Event::Log(ref mut log) = event {
252 self.log_namespace.insert_vector_metadata(
253 log,
254 log_schema().source_type_key(),
255 path!("source_type"),
256 Bytes::from(RedisSourceConfig::NAME),
257 );
258 self.log_namespace.insert_vector_metadata(
259 log,
260 log_schema().timestamp_key(),
261 path!("ingest_timestamp"),
262 now,
263 );
264
265 self.log_namespace.insert_source_metadata(
266 RedisSourceConfig::NAME,
267 log,
268 self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
269 path!("key"),
270 self.key.as_str(),
271 );
272 };
273
274 event
275 });
276
277 if (self.cx.out.send_batch(events).await).is_err() {
278 emit!(StreamClosedError { count });
279 return Err(());
280 }
281 }
282 Err(error) => {
283 if !error.can_continue() {
286 break;
287 }
288 }
289 }
290 }
291 Ok(())
292 }
293}
294
295#[cfg(test)]
296mod test {
297 use super::*;
298
299 #[test]
300 fn generate_config() {
301 crate::test_util::test_generate_config::<RedisSourceConfig>();
302 }
303}
304
305#[cfg(all(test, feature = "redis-integration-tests"))]
306mod integration_test {
307 use redis::AsyncCommands;
308 use vrl::value;
309
310 use super::*;
311 use crate::{
312 SourceSender,
313 config::log_schema,
314 test_util::{
315 collect_n,
316 components::{SOURCE_TAGS, run_and_assert_source_compliance_n},
317 random_string,
318 },
319 };
320
321 const REDIS_SERVER: &str = "redis://redis-primary:6379/0";
322
323 #[tokio::test]
324 async fn redis_source_list_rpop() {
325 let client = redis::Client::open(REDIS_SERVER).unwrap();
327 let mut conn = client.get_connection_manager().await.unwrap();
328
329 let key = format!("test-key-{}", random_string(10));
330 debug!("Test key name: {}.", key);
331
332 let _: i32 = conn.rpush(&key, "1").await.unwrap();
333 let _: i32 = conn.rpush(&key, "2").await.unwrap();
334 let _: i32 = conn.rpush(&key, "3").await.unwrap();
335
336 let config = RedisSourceConfig {
338 data_type: DataTypeConfig::List,
339 list: Some(ListOption {
340 method: Method::Rpop,
341 }),
342 url: REDIS_SERVER.to_owned(),
343 key: key.clone(),
344 redis_key: None,
345 framing: default_framing_message_based(),
346 decoding: default_decoding(),
347 log_namespace: Some(false),
348 };
349
350 let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;
351
352 assert_eq!(
353 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
354 "3".into()
355 );
356 assert_eq!(
357 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
358 "2".into()
359 );
360 assert_eq!(
361 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
362 "1".into()
363 );
364 }
365
366 #[tokio::test]
367 async fn redis_source_list_rpop_with_log_namespace() {
368 let client = redis::Client::open(REDIS_SERVER).unwrap();
370 let mut conn = client.get_connection_manager().await.unwrap();
371
372 let key = format!("test-key-{}", random_string(10));
373 debug!("Test key name: {}.", key);
374
375 let _: i32 = conn.rpush(&key, "1").await.unwrap();
376
377 let config = RedisSourceConfig {
379 data_type: DataTypeConfig::List,
380 list: Some(ListOption {
381 method: Method::Rpop,
382 }),
383 url: REDIS_SERVER.to_owned(),
384 key: key.clone(),
385 redis_key: Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
386 framing: default_framing_message_based(),
387 decoding: default_decoding(),
388 log_namespace: Some(true),
389 };
390
391 let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;
392
393 let log_event = events[0].as_log();
394 let meta = log_event.metadata();
395
396 assert_eq!(log_event.value(), &"1".into());
397 assert_eq!(
398 meta.value()
399 .get(path!(RedisSourceConfig::NAME, "key"))
400 .unwrap(),
401 &value!(key)
402 );
403 }
404
405 #[tokio::test]
406 async fn redis_source_list_lpop() {
407 let client = redis::Client::open(REDIS_SERVER).unwrap();
409 let mut conn = client.get_connection_manager().await.unwrap();
410
411 let key = format!("test-key-{}", random_string(10));
412 debug!("Test key name: {}.", key);
413
414 let _: i32 = conn.rpush(&key, "1").await.unwrap();
415 let _: i32 = conn.rpush(&key, "2").await.unwrap();
416 let _: i32 = conn.rpush(&key, "3").await.unwrap();
417
418 let config = RedisSourceConfig {
420 data_type: DataTypeConfig::List,
421 list: Some(ListOption {
422 method: Method::Lpop,
423 }),
424 url: REDIS_SERVER.to_owned(),
425 key: key.clone(),
426 redis_key: None,
427 framing: default_framing_message_based(),
428 decoding: default_decoding(),
429 log_namespace: Some(false),
430 };
431
432 let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;
433
434 assert_eq!(
435 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
436 "1".into()
437 );
438 assert_eq!(
439 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
440 "2".into()
441 );
442 assert_eq!(
443 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
444 "3".into()
445 );
446 }
447
448 #[tokio::test]
449 async fn redis_source_channel_consume_event() {
450 let key = format!("test-channel-{}", random_string(10));
451 let text = "test message for channel";
452
453 let config = RedisSourceConfig {
455 data_type: DataTypeConfig::Channel,
456 list: None,
457 url: REDIS_SERVER.to_owned(),
458 key: key.clone(),
459 redis_key: None,
460 framing: default_framing_message_based(),
461 decoding: default_decoding(),
462 log_namespace: Some(false),
463 };
464
465 let (tx, rx) = SourceSender::new_test();
466 let context = SourceContext::new_test(tx, None);
467 let source = config
468 .build(context)
469 .await
470 .expect("source should not fail to build");
471
472 tokio::spawn(source);
473
474 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
479
480 let client = redis::Client::open(REDIS_SERVER).unwrap();
482
483 let mut async_conn = client
484 .get_multiplexed_async_connection()
485 .await
486 .expect("Failed to get redis async connection.");
487
488 for _i in 0..10000 {
489 let _: i32 = async_conn.publish(key.clone(), text).await.unwrap();
490 }
491
492 let events = collect_n(rx, 10000).await;
493 assert_eq!(events.len(), 10000);
494
495 for event in events {
496 assert_eq!(
497 event.as_log()[log_schema().message_key().unwrap().to_string()],
498 text.into()
499 );
500 assert_eq!(
501 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
502 RedisSourceConfig::NAME.into()
503 );
504 }
505 }
506}