vector/sinks/redis/
mod.rs1mod config;
5mod request_builder;
6mod service;
7mod sink;
8
9#[cfg(test)]
10mod tests;
11
12#[cfg(feature = "redis-integration-tests")]
13#[cfg(test)]
14mod integration_tests;
15
16use bytes::Bytes;
17use redis::RedisError;
18use snafu::Snafu;
19use tokio::sync::watch::error::RecvError;
20
21use crate::sinks::prelude::*;
22
23use self::{
24 config::{ListMethod, SortedSetMethod},
25 sink::GenerationCount,
26};
27
28use super::util::EncodedLength;
29
30#[derive(Debug, Snafu)]
31pub(super) enum RedisSinkError {
32 #[snafu(display("Creating Redis producer failed: {source}"))]
33 RedisCreateFailed { source: RedisError },
34 #[snafu(display(
35 "Error sending query: {source}{}",
36 if let Some(gen) = generation { format!(", gen={gen}") } else { String::new() }
37 ))]
38 SendError {
39 source: RedisError,
40 generation: Option<GenerationCount>,
41 },
42 #[snafu(display("Repair channel was closed: {source}"))]
43 RepairChannelError { source: RecvError },
44}
45
46#[derive(Clone, Copy, Debug, Derivative)]
47#[derivative(Default)]
48pub enum DataType {
49 #[derivative(Default)]
53 List(ListMethod),
54
55 SortedSet(SortedSetMethod),
59
60 Channel,
64}
65
66pub(super) struct RedisEvent {
68 event: Event,
69 key: String,
70 score: Option<u64>,
71}
72
73impl Finalizable for RedisEvent {
74 fn take_finalizers(&mut self) -> EventFinalizers {
75 self.event.take_finalizers()
76 }
77}
78
79impl ByteSizeOf for RedisEvent {
80 fn allocated_bytes(&self) -> usize {
81 self.event.allocated_bytes()
82 }
83}
84
85impl GetEventCountTags for RedisEvent {
86 fn get_tags(&self) -> TaggedEventsSent {
87 self.event.get_tags()
88 }
89}
90
91impl EstimatedJsonEncodedSizeOf for RedisEvent {
92 fn estimated_json_encoded_size_of(&self) -> JsonSize {
93 self.event.estimated_json_encoded_size_of()
94 }
95}
96
97#[derive(Clone)]
98pub(super) struct RedisRequest {
99 request: Vec<RedisKvEntry>,
100 finalizers: EventFinalizers,
101 metadata: RequestMetadata,
102}
103
104impl Finalizable for RedisRequest {
105 fn take_finalizers(&mut self) -> EventFinalizers {
106 std::mem::take(&mut self.finalizers)
107 }
108}
109
110impl MetaDescriptive for RedisRequest {
111 fn get_metadata(&self) -> &RequestMetadata {
112 &self.metadata
113 }
114
115 fn metadata_mut(&mut self) -> &mut RequestMetadata {
116 &mut self.metadata
117 }
118}
119
120#[derive(Debug, Clone)]
121pub(super) struct RedisKvEntry {
122 key: String,
123 value: Bytes,
124 score: Option<u64>,
125}
126
127impl EncodedLength for RedisKvEntry {
128 fn encoded_length(&self) -> usize {
129 self.value.len()
130 }
131}