vector/sinks/redis/
service.rs1use std::task::{Context, Poll};
2
3use super::{
4 RedisRequest, RedisSinkError,
5 config::{ListMethod, SortedSetMethod},
6 sink::{ConnectionState, RedisConnection},
7};
8use crate::sinks::prelude::*;
9
10#[derive(Clone)]
11pub struct RedisService {
12 pub(super) conn: RedisConnection,
13 pub(super) data_type: super::DataType,
14}
15
16impl Service<RedisRequest> for RedisService {
17 type Response = RedisResponse;
18 type Error = RedisSinkError;
19 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
20
21 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
23 Poll::Ready(Ok(()))
24 }
25
26 fn call(&mut self, kvs: RedisRequest) -> Self::Future {
28 let count = kvs.request.len();
29
30 let mut redis_conn = self.conn.clone();
31 let mut pipe = redis::pipe();
32
33 for kv in kvs.request {
34 match self.data_type {
35 super::DataType::List(method) => match method {
36 ListMethod::LPush => {
37 if count > 1 {
38 pipe.atomic().lpush(kv.key, kv.value.as_ref());
39 } else {
40 pipe.lpush(kv.key, kv.value.as_ref());
41 }
42 }
43 ListMethod::RPush => {
44 if count > 1 {
45 pipe.atomic().rpush(kv.key, kv.value.as_ref());
46 } else {
47 pipe.rpush(kv.key, kv.value.as_ref());
48 }
49 }
50 },
51 super::DataType::SortedSet(method) => match method {
52 SortedSetMethod::ZAdd => {
53 if count > 1 {
54 pipe.atomic().zadd(
55 kv.key,
56 kv.value.as_ref(),
57 kv.score.unwrap_or(0) as f64,
58 );
59 } else {
60 pipe.zadd(kv.key, kv.value.as_ref(), kv.score.unwrap_or(0) as f64);
61 }
62 }
63 },
64 super::DataType::Channel => {
65 if count > 1 {
66 pipe.atomic().publish(kv.key, kv.value.as_ref());
67 } else {
68 pipe.publish(kv.key, kv.value.as_ref());
69 }
70 }
71 }
72 }
73
74 let byte_size = kvs.metadata.events_byte_size();
75
76 Box::pin(async move {
77 let ConnectionState {
78 connection: mut conn,
79 generation,
80 } = redis_conn.get_connection_manager().await?;
81
82 match pipe.query_async(&mut conn).await {
83 Ok(event_status) => Ok(RedisResponse {
84 event_status,
85 events_byte_size: kvs.metadata.into_events_estimated_json_encoded_byte_size(),
86 byte_size,
87 }),
88 Err(error) => Err(RedisSinkError::SendError {
89 source: error,
90 generation,
91 }),
92 }
93 })
94 }
95}
96
97pub struct RedisResponse {
98 pub event_status: Vec<bool>,
99 pub events_byte_size: GroupedCountByteSize,
100 pub byte_size: usize,
101}
102
103impl RedisResponse {
104 pub(super) fn is_successful(&self) -> bool {
105 self.event_status.iter().all(|x| *x)
106 }
107}
108
109impl DriverResponse for RedisResponse {
110 fn event_status(&self) -> EventStatus {
111 if self.is_successful() {
112 EventStatus::Delivered
113 } else {
114 EventStatus::Errored
115 }
116 }
117
118 fn events_sent(&self) -> &GroupedCountByteSize {
119 &self.events_byte_size
120 }
121
122 fn bytes_sent(&self) -> Option<usize> {
123 Some(self.byte_size)
124 }
125}