1use redis::{
2 sentinel::{Sentinel, SentinelNodeConnectionInfo},
3 ProtocolVersion, RedisConnectionInfo, TlsMode,
4};
5use snafu::prelude::*;
6
7use crate::{
8 serde::OneOrMany,
9 sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
10};
11
12use super::{
13 sink::{RedisConnection, RedisSink},
14 RedisCreateFailedSnafu,
15};
16
17#[derive(Clone, Copy, Debug)]
18pub struct RedisTowerRequestConfigDefaults;
19
20impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
21 const CONCURRENCY: Concurrency = Concurrency::None;
22}
23
24#[configurable_component]
26#[derive(Clone, Copy, Debug, Derivative)]
27#[derivative(Default)]
28#[serde(rename_all = "lowercase")]
29pub enum DataTypeConfig {
30 #[derivative(Default)]
36 List,
37
38 SortedSet,
43
44 Channel,
48}
49
50#[configurable_component]
52#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
53#[serde(rename_all = "lowercase")]
54pub struct ListOption {
55 pub method: ListMethod,
57}
58
59#[configurable_component]
61#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
62#[derivative(Default)]
63#[serde(rename_all = "lowercase")]
64pub enum ListMethod {
65 #[derivative(Default)]
71 RPush,
72
73 LPush,
77}
78
79#[configurable_component]
81#[derive(Clone, Debug, Derivative, Eq, PartialEq)]
82#[serde(rename_all = "lowercase")]
83pub struct SortedSetOption {
84 pub method: Option<SortedSetMethod>,
86
87 #[configurable(validation(length(min = 1)))]
95 pub score: Option<UnsignedIntTemplate>,
96}
97
98#[configurable_component]
100#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
101#[derivative(Default)]
102#[serde(rename_all = "lowercase")]
103pub enum SortedSetMethod {
104 #[derivative(Default)]
110 ZAdd,
111}
112
113#[derive(Clone, Copy, Debug, Default)]
114pub struct RedisDefaultBatchSettings;
115
116impl SinkBatchSettings for RedisDefaultBatchSettings {
117 const MAX_EVENTS: Option<usize> = Some(1);
118 const MAX_BYTES: Option<usize> = None;
119 const TIMEOUT_SECS: f64 = 1.0;
120}
121
122#[configurable_component(sink("redis", "Publish observability data to Redis."))]
124#[derive(Clone, Debug)]
125#[serde(deny_unknown_fields)]
126pub struct RedisSinkConfig {
127 #[configurable(derived)]
128 pub(super) encoding: EncodingConfig,
129
130 #[configurable(derived)]
131 #[serde(default)]
132 pub(super) data_type: DataTypeConfig,
133
134 #[configurable(derived)]
135 #[serde(alias = "list")]
136 pub(super) list_option: Option<ListOption>,
137
138 #[configurable(derived)]
139 #[serde(alias = "sorted_set")]
140 pub(super) sorted_set_option: Option<SortedSetOption>,
141
142 #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
147 #[serde(alias = "url")]
148 pub(super) endpoint: OneOrMany<String>,
149
150 #[configurable]
155 pub(super) sentinel_service: Option<String>,
156
157 #[configurable(derived)]
158 #[serde(default)]
159 pub(super) sentinel_connect: Option<SentinelConnectionSettings>,
160
161 #[configurable(validation(length(min = 1)))]
163 #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
164 pub(super) key: Template,
165
166 #[configurable(derived)]
167 #[serde(default)]
168 pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,
169
170 #[configurable(derived)]
171 #[serde(default)]
172 pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,
173
174 #[configurable(derived)]
175 #[serde(
176 default,
177 deserialize_with = "crate::serde::bool_or_struct",
178 skip_serializing_if = "crate::serde::is_default"
179 )]
180 pub(super) acknowledgements: AcknowledgementsConfig,
181}
182
183impl GenerateConfig for RedisSinkConfig {
184 fn generate_config() -> toml::Value {
185 toml::from_str(
186 r#"
187 url = "redis://127.0.0.1:6379/0"
188 key = "vector"
189 data_type = "list"
190 list.method = "lpush"
191 encoding.codec = "json"
192 batch.max_events = 1
193 "#,
194 )
195 .unwrap()
196 }
197}
198
199#[async_trait::async_trait]
200#[typetag::serde(name = "redis")]
201impl SinkConfig for RedisSinkConfig {
202 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
203 if self.key.is_empty() {
204 return Err("`key` cannot be empty.".into());
205 }
206 let conn = self.build_connection().await?;
207 let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
208 let sink = RedisSink::new(self, conn)?;
209 Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
210 }
211
212 fn input(&self) -> Input {
213 Input::new(self.encoding.config().input_type())
214 }
215
216 fn acknowledgements(&self) -> &AcknowledgementsConfig {
217 &self.acknowledgements
218 }
219}
220
221impl RedisSinkConfig {
222 pub(super) async fn build_connection(&self) -> crate::Result<RedisConnection> {
223 let endpoints = self.endpoint.clone().to_vec();
224
225 if endpoints.is_empty() {
226 return Err("`endpoint` cannot be empty.".into());
227 }
228
229 if let Some(sentinel_service) = &self.sentinel_service {
230 let sentinel = Sentinel::build(endpoints).context(RedisCreateFailedSnafu)?;
231
232 Ok(RedisConnection::new_sentinel(
233 sentinel,
234 sentinel_service.clone(),
235 self.sentinel_connect.clone().unwrap_or_default().into(),
236 )
237 .await
238 .context(RedisCreateFailedSnafu)?)
239 } else {
240 let client =
242 redis::Client::open(endpoints[0].as_str()).context(RedisCreateFailedSnafu)?;
243 let conn = client
244 .get_connection_manager()
245 .await
246 .context(RedisCreateFailedSnafu)?;
247
248 Ok(RedisConnection::new_direct(conn))
249 }
250 }
251
252 async fn healthcheck(mut conn: RedisConnection) -> crate::Result<()> {
253 redis::cmd("PING")
254 .query_async(&mut conn.get_connection_manager().await?.connection)
255 .await
256 .map_err(Into::into)
257 }
258}
259
260#[configurable_component]
262#[derive(Clone, Debug, Default)]
263#[serde(deny_unknown_fields)]
264pub struct SentinelConnectionSettings {
265 #[configurable(derived)]
266 #[serde(default)]
267 pub tls: MaybeTlsMode,
268
269 #[configurable(derived)]
270 #[serde(default)]
271 pub connections: Option<RedisConnectionSettings>,
272}
273
274impl From<SentinelConnectionSettings> for SentinelNodeConnectionInfo {
275 fn from(value: SentinelConnectionSettings) -> Self {
276 SentinelNodeConnectionInfo {
277 tls_mode: value.tls.into(),
278 redis_connection_info: value.connections.map(Into::into),
279 }
280 }
281}
282
283#[configurable_component]
285#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
286#[derivative(Default)]
287#[serde(rename_all = "lowercase")]
288pub enum MaybeTlsMode {
289 #[derivative(Default)]
293 None,
294
295 Secure,
297
298 Insecure,
300}
301
302impl From<MaybeTlsMode> for Option<TlsMode> {
303 fn from(value: MaybeTlsMode) -> Self {
304 match value {
305 MaybeTlsMode::None => None,
306 MaybeTlsMode::Secure => Some(TlsMode::Secure),
307 MaybeTlsMode::Insecure => Some(TlsMode::Insecure),
308 }
309 }
310}
311
312#[configurable_component]
315#[derive(Clone, Debug, Derivative)]
316#[derivative(Default)]
317pub struct RedisConnectionSettings {
318 pub db: i64,
320
321 pub username: Option<String>,
323
324 pub password: Option<String>,
326
327 pub protocol: RedisProtocolVersion,
329}
330
331impl From<RedisConnectionSettings> for RedisConnectionInfo {
332 fn from(value: RedisConnectionSettings) -> Self {
333 RedisConnectionInfo {
334 db: value.db,
335 username: value.username,
336 password: value.password,
337 protocol: value.protocol.into(),
338 }
339 }
340}
341
342#[configurable_component]
344#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
345#[derivative(Default)]
346pub enum RedisProtocolVersion {
347 #[derivative(Default)]
351 RESP2,
352
353 RESP3,
355}
356
357impl From<RedisProtocolVersion> for ProtocolVersion {
358 fn from(value: RedisProtocolVersion) -> Self {
359 match value {
360 RedisProtocolVersion::RESP2 => ProtocolVersion::RESP2,
361 RedisProtocolVersion::RESP3 => ProtocolVersion::RESP3,
362 }
363 }
364}