vector/sinks/appsignal/
service.rs1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::{
5 future,
6 future::{BoxFuture, Ready},
7};
8use http::{Request, StatusCode, Uri, header::AUTHORIZATION};
9use hyper::Body;
10use tower::{Service, ServiceExt};
11use vector_lib::{
12 finalization::EventStatus,
13 request_metadata::{GroupedCountByteSize, MetaDescriptive},
14 sensitive_string::SensitiveString,
15 stream::DriverResponse,
16};
17
18use super::request_builder::AppsignalRequest;
19use crate::{
20 http::HttpClient,
21 sinks::util::{Compression, http::HttpBatchService, sink::Response},
22};
23
24#[derive(Clone)]
25pub(super) struct AppsignalService {
26 pub(super) batch_service:
30 HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, AppsignalRequest>,
31}
32
33impl AppsignalService {
34 pub fn new(
35 http_client: HttpClient<Body>,
36 endpoint: Uri,
37 push_api_key: SensitiveString,
38 compression: Compression,
39 ) -> Self {
40 let batch_service = HttpBatchService::new(http_client, move |req| {
41 let req: AppsignalRequest = req;
42
43 let mut request = Request::post(&endpoint)
44 .header("Content-Type", "application/json")
45 .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner()))
46 .header("Content-Length", req.payload.len());
47 if let Some(ce) = compression.content_encoding() {
48 request = request.header("Content-Encoding", ce)
49 }
50 let result = request.body(req.payload).map_err(|x| x.into());
51 future::ready(result)
52 });
53 Self { batch_service }
54 }
55}
56
57impl Service<AppsignalRequest> for AppsignalService {
58 type Response = AppsignalResponse;
59 type Error = crate::Error;
60 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
61
62 fn poll_ready(
63 &mut self,
64 _cx: &mut std::task::Context<'_>,
65 ) -> std::task::Poll<Result<(), Self::Error>> {
66 Poll::Ready(Ok(()))
67 }
68
69 fn call(&mut self, mut request: AppsignalRequest) -> Self::Future {
70 let mut http_service = self.batch_service.clone();
71
72 Box::pin(async move {
73 let metadata = std::mem::take(request.metadata_mut());
74 http_service.ready().await?;
75 let bytes_sent = metadata.request_encoded_size();
76 let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
77 let http_response = http_service.call(request).await?;
78 let event_status = if http_response.is_successful() {
79 EventStatus::Delivered
80 } else if http_response.is_transient() {
81 EventStatus::Errored
82 } else {
83 EventStatus::Rejected
84 };
85 Ok(AppsignalResponse {
86 event_status,
87 http_status: http_response.status(),
88 event_byte_size,
89 bytes_sent,
90 })
91 })
92 }
93}
94
95pub struct AppsignalResponse {
96 pub(super) event_status: EventStatus,
97 pub(super) http_status: StatusCode,
98 pub(super) event_byte_size: GroupedCountByteSize,
99 pub(super) bytes_sent: usize,
100}
101
102impl DriverResponse for AppsignalResponse {
103 fn event_status(&self) -> EventStatus {
104 self.event_status
105 }
106
107 fn events_sent(&self) -> &GroupedCountByteSize {
108 &self.event_byte_size
109 }
110
111 fn bytes_sent(&self) -> Option<usize> {
112 Some(self.bytes_sent)
113 }
114}