vector/sinks/redis/
mod.rs

1//! `redis` sink.
2//!
3//! Writes data to [redis](https://redis.io/).
4mod config;
5mod request_builder;
6mod service;
7mod sink;
8
9#[cfg(test)]
10mod tests;
11
12#[cfg(feature = "redis-integration-tests")]
13#[cfg(test)]
14mod integration_tests;
15
16use bytes::Bytes;
17use redis::RedisError;
18use snafu::Snafu;
19use tokio::sync::watch::error::RecvError;
20
21use self::{
22    config::{ListMethod, SortedSetMethod},
23    sink::GenerationCount,
24};
25use super::util::EncodedLength;
26use crate::sinks::prelude::*;
27
28#[derive(Debug, Snafu)]
29pub(super) enum RedisSinkError {
30    #[snafu(display("Creating Redis producer failed: {source}"))]
31    RedisCreateFailed { source: RedisError },
32    #[snafu(display(
33        "Error sending query: {source}{}",
34        if let Some(generation) = generation { format!(", gen={generation}") } else { String::new() }
35    ))]
36    SendError {
37        source: RedisError,
38        generation: Option<GenerationCount>,
39    },
40    #[snafu(display("Repair channel was closed: {source}"))]
41    RepairChannelError { source: RecvError },
42}
43
/// Redis data types modelled by this sink module.
///
/// `List` is the variant marked as the default.
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
pub enum DataType {
    /// The Redis `list` type.
    ///
    /// This resembles a deque, where messages can be popped and pushed from either end.
    #[derivative(Default)]
    List(ListMethod),

    /// The Redis `sorted set` type.
    ///
    /// This resembles a priority queue, where messages can be pushed with a score.
    SortedSet(SortedSetMethod),

    /// The Redis `channel` type.
    ///
    /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving.
    Channel,
}
63
/// Wrapper for an `Event` that also stores the rendered key.
pub(super) struct RedisEvent {
    /// The wrapped event.
    event: Event,
    /// The rendered key for this event.
    key: String,
    /// Optional score; presumably only meaningful for the sorted-set data type (TODO confirm).
    score: Option<u64>,
}
70
71impl Finalizable for RedisEvent {
72    fn take_finalizers(&mut self) -> EventFinalizers {
73        self.event.take_finalizers()
74    }
75}
76
77impl ByteSizeOf for RedisEvent {
78    fn allocated_bytes(&self) -> usize {
79        self.event.allocated_bytes()
80    }
81}
82
83impl GetEventCountTags for RedisEvent {
84    fn get_tags(&self) -> TaggedEventsSent {
85        self.event.get_tags()
86    }
87}
88
89impl EstimatedJsonEncodedSizeOf for RedisEvent {
90    fn estimated_json_encoded_size_of(&self) -> JsonSize {
91        self.event.estimated_json_encoded_size_of()
92    }
93}
94
/// A request made up of key/value entries together with its finalizers and
/// request metadata.
#[derive(Clone)]
pub(super) struct RedisRequest {
    /// The entries carried by this request.
    request: Vec<RedisKvEntry>,
    /// Finalizers held by the request; handed out (and left empty) by `take_finalizers`.
    finalizers: EventFinalizers,
    /// Request metadata, exposed through the `MetaDescriptive` impl.
    metadata: RequestMetadata,
}
101
102impl Finalizable for RedisRequest {
103    fn take_finalizers(&mut self) -> EventFinalizers {
104        std::mem::take(&mut self.finalizers)
105    }
106}
107
108impl MetaDescriptive for RedisRequest {
109    fn get_metadata(&self) -> &RequestMetadata {
110        &self.metadata
111    }
112
113    fn metadata_mut(&mut self) -> &mut RequestMetadata {
114        &mut self.metadata
115    }
116}
117
/// A single key/value entry, with an optional score.
#[derive(Debug, Clone)]
pub(super) struct RedisKvEntry {
    /// The key for this entry.
    key: String,
    /// The value bytes; their length is what `encoded_length` reports.
    value: Bytes,
    /// Optional score; presumably only meaningful for the sorted-set data type (TODO confirm).
    score: Option<u64>,
}
124
125impl EncodedLength for RedisKvEntry {
126    fn encoded_length(&self) -> usize {
127        self.value.len()
128    }
129}