1use redis::{
2 ProtocolVersion, RedisConnectionInfo, TlsMode,
3 sentinel::{Sentinel, SentinelNodeConnectionInfo},
4};
5use snafu::prelude::*;
6
7use super::{
8 RedisCreateFailedSnafu,
9 sink::{RedisConnection, RedisSink},
10};
11use crate::{
12 serde::OneOrMany,
13 sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
14};
15
16#[derive(Clone, Copy, Debug)]
17pub struct RedisTowerRequestConfigDefaults;
18
19impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
20 const CONCURRENCY: Concurrency = Concurrency::None;
21}
22
23#[configurable_component]
25#[derive(Clone, Copy, Debug, Default)]
26#[serde(rename_all = "lowercase")]
27pub enum DataTypeConfig {
28 #[default]
34 List,
35
36 SortedSet,
41
42 Channel,
46}
47
48#[configurable_component]
50#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
51#[serde(rename_all = "lowercase")]
52pub struct ListOption {
53 pub method: ListMethod,
55}
56
57#[configurable_component]
59#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
60#[serde(rename_all = "lowercase")]
61pub enum ListMethod {
62 #[default]
68 RPush,
69
70 LPush,
74}
75
76#[configurable_component]
78#[derive(Clone, Debug, Default, Eq, PartialEq)]
79#[serde(rename_all = "lowercase")]
80pub struct SortedSetOption {
81 pub method: Option<SortedSetMethod>,
83
84 #[configurable(validation(length(min = 1)))]
92 pub score: Option<UnsignedIntTemplate>,
93}
94
95#[configurable_component]
97#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
98#[serde(rename_all = "lowercase")]
99pub enum SortedSetMethod {
100 #[default]
106 ZAdd,
107}
108
109#[derive(Clone, Copy, Debug, Default)]
110pub struct RedisDefaultBatchSettings;
111
112impl SinkBatchSettings for RedisDefaultBatchSettings {
113 const MAX_EVENTS: Option<usize> = Some(1);
114 const MAX_BYTES: Option<usize> = None;
115 const TIMEOUT_SECS: f64 = 1.0;
116}
117
118#[configurable_component(sink("redis", "Publish observability data to Redis."))]
120#[derive(Clone, Debug)]
121#[serde(deny_unknown_fields)]
122pub struct RedisSinkConfig {
123 #[configurable(derived)]
124 pub(super) encoding: EncodingConfig,
125
126 #[configurable(derived)]
127 #[serde(default)]
128 pub(super) data_type: DataTypeConfig,
129
130 #[configurable(derived)]
131 #[serde(alias = "list")]
132 pub(super) list_option: Option<ListOption>,
133
134 #[configurable(derived)]
135 #[serde(alias = "sorted_set")]
136 pub(super) sorted_set_option: Option<SortedSetOption>,
137
138 #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
143 #[serde(alias = "url")]
144 pub(super) endpoint: OneOrMany<String>,
145
146 #[configurable]
151 pub(super) sentinel_service: Option<String>,
152
153 #[configurable(derived)]
154 #[serde(default)]
155 pub(super) sentinel_connect: Option<SentinelConnectionSettings>,
156
157 #[configurable(validation(length(min = 1)))]
159 #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
160 pub(super) key: Template,
161
162 #[configurable(derived)]
163 #[serde(default)]
164 pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,
165
166 #[configurable(derived)]
167 #[serde(default)]
168 pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,
169
170 #[configurable(derived)]
171 #[serde(
172 default,
173 deserialize_with = "crate::serde::bool_or_struct",
174 skip_serializing_if = "crate::serde::is_default"
175 )]
176 pub(super) acknowledgements: AcknowledgementsConfig,
177}
178
179impl GenerateConfig for RedisSinkConfig {
180 fn generate_config() -> toml::Value {
181 toml::from_str(
182 r#"
183 url = "redis://127.0.0.1:6379/0"
184 key = "vector"
185 data_type = "list"
186 list.method = "lpush"
187 encoding.codec = "json"
188 batch.max_events = 1
189 "#,
190 )
191 .unwrap()
192 }
193}
194
195#[async_trait::async_trait]
196#[typetag::serde(name = "redis")]
197impl SinkConfig for RedisSinkConfig {
198 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
199 if self.key.is_empty() {
200 return Err("`key` cannot be empty.".into());
201 }
202 let conn = self.build_connection().await?;
203 let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
204 let sink = RedisSink::new(self, conn)?;
205 Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
206 }
207
208 fn input(&self) -> Input {
209 Input::new(self.encoding.config().input_type())
210 }
211
212 fn acknowledgements(&self) -> &AcknowledgementsConfig {
213 &self.acknowledgements
214 }
215}
216
217impl RedisSinkConfig {
218 pub(super) async fn build_connection(&self) -> crate::Result<RedisConnection> {
219 let endpoints = self.endpoint.clone().to_vec();
220
221 if endpoints.is_empty() {
222 return Err("`endpoint` cannot be empty.".into());
223 }
224
225 if let Some(sentinel_service) = &self.sentinel_service {
226 let sentinel = Sentinel::build(endpoints).context(RedisCreateFailedSnafu)?;
227
228 Ok(RedisConnection::new_sentinel(
229 sentinel,
230 sentinel_service.clone(),
231 self.sentinel_connect.clone().unwrap_or_default().into(),
232 )
233 .await
234 .context(RedisCreateFailedSnafu)?)
235 } else {
236 let client =
238 redis::Client::open(endpoints[0].as_str()).context(RedisCreateFailedSnafu)?;
239 let conn = client
240 .get_connection_manager()
241 .await
242 .context(RedisCreateFailedSnafu)?;
243
244 Ok(RedisConnection::new_direct(conn))
245 }
246 }
247
248 async fn healthcheck(mut conn: RedisConnection) -> crate::Result<()> {
249 redis::cmd("PING")
250 .query_async(&mut conn.get_connection_manager().await?.connection)
251 .await
252 .map_err(Into::into)
253 }
254}
255
256#[configurable_component]
258#[derive(Clone, Debug, Default)]
259#[serde(deny_unknown_fields)]
260pub struct SentinelConnectionSettings {
261 #[configurable(derived)]
262 #[serde(default)]
263 pub tls: MaybeTlsMode,
264
265 #[configurable(derived)]
266 #[serde(default)]
267 pub connections: Option<RedisConnectionSettings>,
268}
269
270impl From<SentinelConnectionSettings> for SentinelNodeConnectionInfo {
271 fn from(value: SentinelConnectionSettings) -> Self {
272 SentinelNodeConnectionInfo {
273 tls_mode: value.tls.into(),
274 redis_connection_info: value.connections.map(Into::into),
275 }
276 }
277}
278
279#[configurable_component]
281#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
282#[serde(rename_all = "lowercase")]
283pub enum MaybeTlsMode {
284 #[default]
288 None,
289
290 Secure,
292
293 Insecure,
295}
296
297impl From<MaybeTlsMode> for Option<TlsMode> {
298 fn from(value: MaybeTlsMode) -> Self {
299 match value {
300 MaybeTlsMode::None => None,
301 MaybeTlsMode::Secure => Some(TlsMode::Secure),
302 MaybeTlsMode::Insecure => Some(TlsMode::Insecure),
303 }
304 }
305}
306
307#[configurable_component]
310#[derive(Clone, Debug, Default)]
311pub struct RedisConnectionSettings {
312 pub db: i64,
314
315 pub username: Option<String>,
317
318 pub password: Option<String>,
320
321 pub protocol: RedisProtocolVersion,
323}
324
325impl From<RedisConnectionSettings> for RedisConnectionInfo {
326 fn from(value: RedisConnectionSettings) -> Self {
327 RedisConnectionInfo {
328 db: value.db,
329 username: value.username,
330 password: value.password,
331 protocol: value.protocol.into(),
332 }
333 }
334}
335
336#[configurable_component]
338#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
339pub enum RedisProtocolVersion {
340 #[default]
344 RESP2,
345
346 RESP3,
348}
349
350impl From<RedisProtocolVersion> for ProtocolVersion {
351 fn from(value: RedisProtocolVersion) -> Self {
352 match value {
353 RedisProtocolVersion::RESP2 => ProtocolVersion::RESP2,
354 RedisProtocolVersion::RESP3 => ProtocolVersion::RESP3,
355 }
356 }
357}