vector/sinks/redis/
mod.rs1mod config;
5mod request_builder;
6mod service;
7mod sink;
8
9#[cfg(test)]
10mod tests;
11
12#[cfg(feature = "redis-integration-tests")]
13#[cfg(test)]
14mod integration_tests;
15
16use bytes::Bytes;
17use redis::RedisError;
18use snafu::Snafu;
19use tokio::sync::watch::error::RecvError;
20
21use self::{
22 config::{ListMethod, SortedSetMethod},
23 sink::GenerationCount,
24};
25use super::util::EncodedLength;
26use crate::sinks::prelude::*;
27
28#[derive(Debug, Snafu)]
29pub(super) enum RedisSinkError {
30 #[snafu(display("Creating Redis producer failed: {source}"))]
31 RedisCreateFailed { source: RedisError },
32 #[snafu(display(
33 "Error sending query: {source}{}",
34 if let Some(generation) = generation { format!(", gen={generation}") } else { String::new() }
35 ))]
36 SendError {
37 source: RedisError,
38 generation: Option<GenerationCount>,
39 },
40 #[snafu(display("Repair channel was closed: {source}"))]
41 RepairChannelError { source: RecvError },
42}
43
44#[derive(Clone, Copy, Debug, Derivative)]
45#[derivative(Default)]
46pub enum DataType {
47 #[derivative(Default)]
51 List(ListMethod),
52
53 SortedSet(SortedSetMethod),
57
58 Channel,
62}
63
64pub(super) struct RedisEvent {
66 event: Event,
67 key: String,
68 score: Option<u64>,
69}
70
71impl Finalizable for RedisEvent {
72 fn take_finalizers(&mut self) -> EventFinalizers {
73 self.event.take_finalizers()
74 }
75}
76
77impl ByteSizeOf for RedisEvent {
78 fn allocated_bytes(&self) -> usize {
79 self.event.allocated_bytes()
80 }
81}
82
83impl GetEventCountTags for RedisEvent {
84 fn get_tags(&self) -> TaggedEventsSent {
85 self.event.get_tags()
86 }
87}
88
89impl EstimatedJsonEncodedSizeOf for RedisEvent {
90 fn estimated_json_encoded_size_of(&self) -> JsonSize {
91 self.event.estimated_json_encoded_size_of()
92 }
93}
94
95#[derive(Clone)]
96pub(super) struct RedisRequest {
97 request: Vec<RedisKvEntry>,
98 finalizers: EventFinalizers,
99 metadata: RequestMetadata,
100}
101
102impl Finalizable for RedisRequest {
103 fn take_finalizers(&mut self) -> EventFinalizers {
104 std::mem::take(&mut self.finalizers)
105 }
106}
107
108impl MetaDescriptive for RedisRequest {
109 fn get_metadata(&self) -> &RequestMetadata {
110 &self.metadata
111 }
112
113 fn metadata_mut(&mut self) -> &mut RequestMetadata {
114 &mut self.metadata
115 }
116}
117
118#[derive(Debug, Clone)]
119pub(super) struct RedisKvEntry {
120 key: String,
121 value: Bytes,
122 score: Option<u64>,
123}
124
125impl EncodedLength for RedisKvEntry {
126 fn encoded_length(&self) -> usize {
127 self.value.len()
128 }
129}