vector/sinks/redis/
mod.rs

1//! `redis` sink.
2//!
3//! Writes data to [redis](https://redis.io/).
4mod config;
5mod request_builder;
6mod service;
7mod sink;
8
9#[cfg(test)]
10mod tests;
11
12#[cfg(feature = "redis-integration-tests")]
13#[cfg(test)]
14mod integration_tests;
15
16use bytes::Bytes;
17use redis::RedisError;
18use snafu::Snafu;
19use tokio::sync::watch::error::RecvError;
20
21use crate::sinks::prelude::*;
22
23use self::{
24    config::{ListMethod, SortedSetMethod},
25    sink::GenerationCount,
26};
27
28use super::util::EncodedLength;
29
30#[derive(Debug, Snafu)]
31pub(super) enum RedisSinkError {
32    #[snafu(display("Creating Redis producer failed: {source}"))]
33    RedisCreateFailed { source: RedisError },
34    #[snafu(display(
35        "Error sending query: {source}{}",
36        if let Some(gen) = generation { format!(", gen={gen}") } else { String::new() }
37    ))]
38    SendError {
39        source: RedisError,
40        generation: Option<GenerationCount>,
41    },
42    #[snafu(display("Repair channel was closed: {source}"))]
43    RepairChannelError { source: RecvError },
44}
45
/// The Redis data type a message can be written as.
///
/// `List` is the default variant (it is the one marked `#[derivative(Default)]`).
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
pub enum DataType {
    /// The Redis `list` type.
    ///
    /// This resembles a deque, where messages can be popped and pushed from either end.
    #[derivative(Default)]
    List(ListMethod),

    /// The Redis `sorted set` type.
    ///
    /// This resembles a priority queue, where messages can be pushed with a score.
    SortedSet(SortedSetMethod),

    /// The Redis `channel` type.
    ///
    /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving.
    Channel,
}
65
/// Wrapper for an `Event` that also stores the rendered key and an optional score.
pub(super) struct RedisEvent {
    /// The wrapped event; the trait impls on this type all delegate to it.
    event: Event,
    /// The rendered key for this event.
    key: String,
    /// Optional score for this event.
    // NOTE(review): presumably only used for sorted-set writes — confirm in the request builder.
    score: Option<u64>,
}
72
73impl Finalizable for RedisEvent {
74    fn take_finalizers(&mut self) -> EventFinalizers {
75        self.event.take_finalizers()
76    }
77}
78
79impl ByteSizeOf for RedisEvent {
80    fn allocated_bytes(&self) -> usize {
81        self.event.allocated_bytes()
82    }
83}
84
85impl GetEventCountTags for RedisEvent {
86    fn get_tags(&self) -> TaggedEventsSent {
87        self.event.get_tags()
88    }
89}
90
91impl EstimatedJsonEncodedSizeOf for RedisEvent {
92    fn estimated_json_encoded_size_of(&self) -> JsonSize {
93        self.event.estimated_json_encoded_size_of()
94    }
95}
96
/// A request carrying a list of [`RedisKvEntry`] items together with the finalizers and
/// request metadata associated with them.
#[derive(Clone)]
pub(super) struct RedisRequest {
    /// The key/value entries that make up this request.
    request: Vec<RedisKvEntry>,
    /// Event finalizers held by this request; handed out by `take_finalizers`.
    finalizers: EventFinalizers,
    /// Request metadata, exposed through the `MetaDescriptive` impl.
    metadata: RequestMetadata,
}
103
104impl Finalizable for RedisRequest {
105    fn take_finalizers(&mut self) -> EventFinalizers {
106        std::mem::take(&mut self.finalizers)
107    }
108}
109
110impl MetaDescriptive for RedisRequest {
111    fn get_metadata(&self) -> &RequestMetadata {
112        &self.metadata
113    }
114
115    fn metadata_mut(&mut self) -> &mut RequestMetadata {
116        &mut self.metadata
117    }
118}
119
/// A single key/value entry, with an optional score.
#[derive(Debug, Clone)]
pub(super) struct RedisKvEntry {
    /// The key for this entry.
    key: String,
    /// The value bytes; their length is what `encoded_length` reports.
    value: Bytes,
    /// Optional score for this entry.
    // NOTE(review): presumably only meaningful for the sorted-set data type — confirm in the service.
    score: Option<u64>,
}
126
127impl EncodedLength for RedisKvEntry {
128    fn encoded_length(&self) -> usize {
129        self.value.len()
130    }
131}