vector/sinks/redis/
service.rs1use std::task::{Context, Poll};
2
3use crate::sinks::prelude::*;
4
5use super::{
6 config::{ListMethod, SortedSetMethod},
7 sink::{ConnectionState, RedisConnection},
8 RedisRequest, RedisSinkError,
9};
10
11#[derive(Clone)]
12pub struct RedisService {
13 pub(super) conn: RedisConnection,
14 pub(super) data_type: super::DataType,
15}
16
17impl Service<RedisRequest> for RedisService {
18 type Response = RedisResponse;
19 type Error = RedisSinkError;
20 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
21
22 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
24 Poll::Ready(Ok(()))
25 }
26
27 fn call(&mut self, kvs: RedisRequest) -> Self::Future {
29 let count = kvs.request.len();
30
31 let mut redis_conn = self.conn.clone();
32 let mut pipe = redis::pipe();
33
34 for kv in kvs.request {
35 match self.data_type {
36 super::DataType::List(method) => match method {
37 ListMethod::LPush => {
38 if count > 1 {
39 pipe.atomic().lpush(kv.key, kv.value.as_ref());
40 } else {
41 pipe.lpush(kv.key, kv.value.as_ref());
42 }
43 }
44 ListMethod::RPush => {
45 if count > 1 {
46 pipe.atomic().rpush(kv.key, kv.value.as_ref());
47 } else {
48 pipe.rpush(kv.key, kv.value.as_ref());
49 }
50 }
51 },
52 super::DataType::SortedSet(method) => match method {
53 SortedSetMethod::ZAdd => {
54 if count > 1 {
55 pipe.atomic().zadd(
56 kv.key,
57 kv.value.as_ref(),
58 kv.score.unwrap_or(0) as f64,
59 );
60 } else {
61 pipe.zadd(kv.key, kv.value.as_ref(), kv.score.unwrap_or(0) as f64);
62 }
63 }
64 },
65 super::DataType::Channel => {
66 if count > 1 {
67 pipe.atomic().publish(kv.key, kv.value.as_ref());
68 } else {
69 pipe.publish(kv.key, kv.value.as_ref());
70 }
71 }
72 }
73 }
74
75 let byte_size = kvs.metadata.events_byte_size();
76
77 Box::pin(async move {
78 let ConnectionState {
79 connection: mut conn,
80 generation,
81 } = redis_conn.get_connection_manager().await?;
82
83 match pipe.query_async(&mut conn).await {
84 Ok(event_status) => Ok(RedisResponse {
85 event_status,
86 events_byte_size: kvs.metadata.into_events_estimated_json_encoded_byte_size(),
87 byte_size,
88 }),
89 Err(error) => Err(RedisSinkError::SendError {
90 source: error,
91 generation,
92 }),
93 }
94 })
95 }
96}
97
98pub struct RedisResponse {
99 pub event_status: Vec<bool>,
100 pub events_byte_size: GroupedCountByteSize,
101 pub byte_size: usize,
102}
103
104impl RedisResponse {
105 pub(super) fn is_successful(&self) -> bool {
106 self.event_status.iter().all(|x| *x)
107 }
108}
109
110impl DriverResponse for RedisResponse {
111 fn event_status(&self) -> EventStatus {
112 if self.is_successful() {
113 EventStatus::Delivered
114 } else {
115 EventStatus::Errored
116 }
117 }
118
119 fn events_sent(&self) -> &GroupedCountByteSize {
120 &self.events_byte_size
121 }
122
123 fn bytes_sent(&self) -> Option<usize> {
124 Some(self.byte_size)
125 }
126}