vector/sinks/loki/
service.rs

1use std::task::{Context, Poll};
2
3use bytes::Bytes;
4use http::StatusCode;
5use snafu::Snafu;
6use tracing::Instrument;
7
8use crate::{
9    http::{Auth, HttpClient},
10    sinks::{prelude::*, util::UriSerde},
11};
12
13#[derive(Clone)]
14pub struct LokiRetryLogic;
15
16impl RetryLogic for LokiRetryLogic {
17    type Error = LokiError;
18    type Request = LokiRequest;
19    type Response = LokiResponse;
20
21    fn is_retriable_error(&self, error: &Self::Error) -> bool {
22        match error {
23            LokiError::ServerError { code } => match *code {
24                StatusCode::TOO_MANY_REQUESTS => true,
25                StatusCode::NOT_IMPLEMENTED => false,
26                _ if code.is_server_error() => true,
27                _ => false,
28            },
29            LokiError::HttpError { .. } => true,
30        }
31    }
32}
33
34#[derive(Debug, Snafu)]
35pub enum LokiError {
36    #[snafu(display("Server responded with an error: {}", code))]
37    ServerError { code: StatusCode },
38    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
39    HttpError { error: crate::http::HttpError },
40}
41
42#[derive(Debug, Snafu)]
43pub struct LokiResponse {
44    metadata: RequestMetadata,
45}
46
47impl DriverResponse for LokiResponse {
48    fn event_status(&self) -> EventStatus {
49        EventStatus::Delivered
50    }
51
52    fn events_sent(&self) -> &GroupedCountByteSize {
53        self.metadata.events_estimated_json_encoded_byte_size()
54    }
55
56    fn bytes_sent(&self) -> Option<usize> {
57        Some(self.metadata.request_encoded_size())
58    }
59}
60
61#[derive(Clone)]
62pub struct LokiRequest {
63    pub compression: Compression,
64    pub finalizers: EventFinalizers,
65    pub payload: Bytes,
66    pub tenant_id: Option<String>,
67    pub metadata: RequestMetadata,
68}
69
70impl Finalizable for LokiRequest {
71    fn take_finalizers(&mut self) -> EventFinalizers {
72        self.finalizers.take_finalizers()
73    }
74}
75
76impl MetaDescriptive for LokiRequest {
77    fn get_metadata(&self) -> &RequestMetadata {
78        &self.metadata
79    }
80
81    fn metadata_mut(&mut self) -> &mut RequestMetadata {
82        &mut self.metadata
83    }
84}
85
86#[derive(Debug, Clone)]
87pub struct LokiService {
88    endpoint: UriSerde,
89    client: HttpClient,
90}
91
92impl LokiService {
93    pub fn new(
94        client: HttpClient,
95        endpoint: UriSerde,
96        path: String,
97        auth: Option<Auth>,
98    ) -> crate::Result<Self> {
99        let endpoint = endpoint.append_path(&path)?.with_auth(auth);
100
101        Ok(Self { client, endpoint })
102    }
103}
104
105impl Service<LokiRequest> for LokiService {
106    type Response = LokiResponse;
107    type Error = LokiError;
108    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
109
110    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
111        Poll::Ready(Ok(()))
112    }
113
114    fn call(&mut self, request: LokiRequest) -> Self::Future {
115        let content_type = match request.compression {
116            Compression::Snappy => "application/x-protobuf",
117            _ => "application/json",
118        };
119        let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type);
120
121        let metadata = request.get_metadata().clone();
122
123        if let Some(tenant_id) = request.tenant_id {
124            req = req.header("X-Scope-OrgID", tenant_id);
125        }
126
127        if let Some(ce) = request.compression.content_encoding() {
128            req = req.header("Content-Encoding", ce);
129        }
130
131        let body = hyper::Body::from(request.payload);
132        let mut req = req.body(body).unwrap();
133
134        if let Some(auth) = &self.endpoint.auth {
135            auth.apply(&mut req);
136        }
137
138        let mut client = self.client.clone();
139
140        Box::pin(async move {
141            match client.call(req).in_current_span().await {
142                Ok(response) => {
143                    let status = response.status();
144
145                    if status.is_success() {
146                        Ok(LokiResponse { metadata })
147                    } else {
148                        Err(LokiError::ServerError { code: status })
149                    }
150                }
151                Err(error) => Err(LokiError::HttpError { error }),
152            }
153        })
154    }
155}