vector/sinks/appsignal/
service.rs

1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::{
5    future,
6    future::{BoxFuture, Ready},
7};
8use http::{header::AUTHORIZATION, Request, StatusCode, Uri};
9use hyper::Body;
10use tower::{Service, ServiceExt};
11
12use vector_lib::stream::DriverResponse;
13use vector_lib::{
14    finalization::EventStatus, request_metadata::GroupedCountByteSize,
15    request_metadata::MetaDescriptive, sensitive_string::SensitiveString,
16};
17
18use crate::{
19    http::HttpClient,
20    sinks::util::{http::HttpBatchService, sink::Response, Compression},
21};
22
23use super::request_builder::AppsignalRequest;
24
25#[derive(Clone)]
26pub(super) struct AppsignalService {
27    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
28    //       This sink should undergo a refactor to utilize the `HttpService`
29    //       instead, which extracts much of the boilerplate code for `Service`.
30    pub(super) batch_service:
31        HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, AppsignalRequest>,
32}
33
34impl AppsignalService {
35    pub fn new(
36        http_client: HttpClient<Body>,
37        endpoint: Uri,
38        push_api_key: SensitiveString,
39        compression: Compression,
40    ) -> Self {
41        let batch_service = HttpBatchService::new(http_client, move |req| {
42            let req: AppsignalRequest = req;
43
44            let mut request = Request::post(&endpoint)
45                .header("Content-Type", "application/json")
46                .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner()))
47                .header("Content-Length", req.payload.len());
48            if let Some(ce) = compression.content_encoding() {
49                request = request.header("Content-Encoding", ce)
50            }
51            let result = request.body(req.payload).map_err(|x| x.into());
52            future::ready(result)
53        });
54        Self { batch_service }
55    }
56}
57
58impl Service<AppsignalRequest> for AppsignalService {
59    type Response = AppsignalResponse;
60    type Error = crate::Error;
61    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
62
63    fn poll_ready(
64        &mut self,
65        _cx: &mut std::task::Context<'_>,
66    ) -> std::task::Poll<Result<(), Self::Error>> {
67        Poll::Ready(Ok(()))
68    }
69
70    fn call(&mut self, mut request: AppsignalRequest) -> Self::Future {
71        let mut http_service = self.batch_service.clone();
72
73        Box::pin(async move {
74            let metadata = std::mem::take(request.metadata_mut());
75            http_service.ready().await?;
76            let bytes_sent = metadata.request_encoded_size();
77            let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
78            let http_response = http_service.call(request).await?;
79            let event_status = if http_response.is_successful() {
80                EventStatus::Delivered
81            } else if http_response.is_transient() {
82                EventStatus::Errored
83            } else {
84                EventStatus::Rejected
85            };
86            Ok(AppsignalResponse {
87                event_status,
88                http_status: http_response.status(),
89                event_byte_size,
90                bytes_sent,
91            })
92        })
93    }
94}
95
96pub struct AppsignalResponse {
97    pub(super) event_status: EventStatus,
98    pub(super) http_status: StatusCode,
99    pub(super) event_byte_size: GroupedCountByteSize,
100    pub(super) bytes_sent: usize,
101}
102
103impl DriverResponse for AppsignalResponse {
104    fn event_status(&self) -> EventStatus {
105        self.event_status
106    }
107
108    fn events_sent(&self) -> &GroupedCountByteSize {
109        &self.event_byte_size
110    }
111
112    fn bytes_sent(&self) -> Option<usize> {
113        Some(self.bytes_sent)
114    }
115}