vector/sinks/appsignal/
service.rs

1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::{
5    future,
6    future::{BoxFuture, Ready},
7};
8use http::{Request, StatusCode, Uri, header::AUTHORIZATION};
9use hyper::Body;
10use tower::{Service, ServiceExt};
11use vector_lib::{
12    finalization::EventStatus,
13    request_metadata::{GroupedCountByteSize, MetaDescriptive},
14    sensitive_string::SensitiveString,
15    stream::DriverResponse,
16};
17
18use super::request_builder::AppsignalRequest;
19use crate::{
20    http::HttpClient,
21    sinks::util::{Compression, http::HttpBatchService, sink::Response},
22};
23
24#[derive(Clone)]
25pub(super) struct AppsignalService {
26    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
27    //       This sink should undergo a refactor to utilize the `HttpService`
28    //       instead, which extracts much of the boilerplate code for `Service`.
29    pub(super) batch_service:
30        HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, AppsignalRequest>,
31}
32
33impl AppsignalService {
34    pub fn new(
35        http_client: HttpClient<Body>,
36        endpoint: Uri,
37        push_api_key: SensitiveString,
38        compression: Compression,
39    ) -> Self {
40        let batch_service = HttpBatchService::new(http_client, move |req| {
41            let req: AppsignalRequest = req;
42
43            let mut request = Request::post(&endpoint)
44                .header("Content-Type", "application/json")
45                .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner()))
46                .header("Content-Length", req.payload.len());
47            if let Some(ce) = compression.content_encoding() {
48                request = request.header("Content-Encoding", ce)
49            }
50            let result = request.body(req.payload).map_err(|x| x.into());
51            future::ready(result)
52        });
53        Self { batch_service }
54    }
55}
56
57impl Service<AppsignalRequest> for AppsignalService {
58    type Response = AppsignalResponse;
59    type Error = crate::Error;
60    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
61
62    fn poll_ready(
63        &mut self,
64        _cx: &mut std::task::Context<'_>,
65    ) -> std::task::Poll<Result<(), Self::Error>> {
66        Poll::Ready(Ok(()))
67    }
68
69    fn call(&mut self, mut request: AppsignalRequest) -> Self::Future {
70        let mut http_service = self.batch_service.clone();
71
72        Box::pin(async move {
73            let metadata = std::mem::take(request.metadata_mut());
74            http_service.ready().await?;
75            let bytes_sent = metadata.request_encoded_size();
76            let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
77            let http_response = http_service.call(request).await?;
78            let event_status = if http_response.is_successful() {
79                EventStatus::Delivered
80            } else if http_response.is_transient() {
81                EventStatus::Errored
82            } else {
83                EventStatus::Rejected
84            };
85            Ok(AppsignalResponse {
86                event_status,
87                http_status: http_response.status(),
88                event_byte_size,
89                bytes_sent,
90            })
91        })
92    }
93}
94
95pub struct AppsignalResponse {
96    pub(super) event_status: EventStatus,
97    pub(super) http_status: StatusCode,
98    pub(super) event_byte_size: GroupedCountByteSize,
99    pub(super) bytes_sent: usize,
100}
101
102impl DriverResponse for AppsignalResponse {
103    fn event_status(&self) -> EventStatus {
104        self.event_status
105    }
106
107    fn events_sent(&self) -> &GroupedCountByteSize {
108        &self.event_byte_size
109    }
110
111    fn bytes_sent(&self) -> Option<usize> {
112        Some(self.bytes_sent)
113    }
114}