vector/sinks/loki/
service.rs1use std::task::{Context, Poll};
2
3use bytes::Bytes;
4use http::StatusCode;
5use snafu::Snafu;
6use tracing::Instrument;
7
8use crate::{
9 http::{Auth, HttpClient},
10 sinks::{prelude::*, util::UriSerde},
11};
12
13#[derive(Clone)]
14pub struct LokiRetryLogic;
15
16impl RetryLogic for LokiRetryLogic {
17 type Error = LokiError;
18 type Request = LokiRequest;
19 type Response = LokiResponse;
20
21 fn is_retriable_error(&self, error: &Self::Error) -> bool {
22 match error {
23 LokiError::ServerError { code } => match *code {
24 StatusCode::TOO_MANY_REQUESTS => true,
25 StatusCode::NOT_IMPLEMENTED => false,
26 _ if code.is_server_error() => true,
27 _ => false,
28 },
29 LokiError::HttpError { .. } => true,
30 }
31 }
32}
33
34#[derive(Debug, Snafu)]
35pub enum LokiError {
36 #[snafu(display("Server responded with an error: {}", code))]
37 ServerError { code: StatusCode },
38 #[snafu(display("Failed to make HTTP(S) request: {}", error))]
39 HttpError { error: crate::http::HttpError },
40}
41
42#[derive(Debug, Snafu)]
43pub struct LokiResponse {
44 metadata: RequestMetadata,
45}
46
47impl DriverResponse for LokiResponse {
48 fn event_status(&self) -> EventStatus {
49 EventStatus::Delivered
50 }
51
52 fn events_sent(&self) -> &GroupedCountByteSize {
53 self.metadata.events_estimated_json_encoded_byte_size()
54 }
55
56 fn bytes_sent(&self) -> Option<usize> {
57 Some(self.metadata.request_encoded_size())
58 }
59}
60
61#[derive(Clone)]
62pub struct LokiRequest {
63 pub compression: Compression,
64 pub finalizers: EventFinalizers,
65 pub payload: Bytes,
66 pub tenant_id: Option<String>,
67 pub metadata: RequestMetadata,
68}
69
70impl Finalizable for LokiRequest {
71 fn take_finalizers(&mut self) -> EventFinalizers {
72 self.finalizers.take_finalizers()
73 }
74}
75
76impl MetaDescriptive for LokiRequest {
77 fn get_metadata(&self) -> &RequestMetadata {
78 &self.metadata
79 }
80
81 fn metadata_mut(&mut self) -> &mut RequestMetadata {
82 &mut self.metadata
83 }
84}
85
86#[derive(Debug, Clone)]
87pub struct LokiService {
88 endpoint: UriSerde,
89 client: HttpClient,
90}
91
92impl LokiService {
93 pub fn new(
94 client: HttpClient,
95 endpoint: UriSerde,
96 path: String,
97 auth: Option<Auth>,
98 ) -> crate::Result<Self> {
99 let endpoint = endpoint.append_path(&path)?.with_auth(auth);
100
101 Ok(Self { client, endpoint })
102 }
103}
104
105impl Service<LokiRequest> for LokiService {
106 type Response = LokiResponse;
107 type Error = LokiError;
108 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
109
110 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
111 Poll::Ready(Ok(()))
112 }
113
114 fn call(&mut self, request: LokiRequest) -> Self::Future {
115 let content_type = match request.compression {
116 Compression::Snappy => "application/x-protobuf",
117 _ => "application/json",
118 };
119 let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type);
120
121 let metadata = request.get_metadata().clone();
122
123 if let Some(tenant_id) = request.tenant_id {
124 req = req.header("X-Scope-OrgID", tenant_id);
125 }
126
127 if let Some(ce) = request.compression.content_encoding() {
128 req = req.header("Content-Encoding", ce);
129 }
130
131 let body = hyper::Body::from(request.payload);
132 let mut req = req.body(body).unwrap();
133
134 if let Some(auth) = &self.endpoint.auth {
135 auth.apply(&mut req);
136 }
137
138 let mut client = self.client.clone();
139
140 Box::pin(async move {
141 match client.call(req).in_current_span().await {
142 Ok(response) => {
143 let status = response.status();
144
145 if status.is_success() {
146 Ok(LokiResponse { metadata })
147 } else {
148 Err(LokiError::ServerError { code: status })
149 }
150 }
151 Err(error) => Err(LokiError::HttpError { error }),
152 }
153 })
154 }
155}