vector/sinks/appsignal/
service.rs1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::{
5 future,
6 future::{BoxFuture, Ready},
7};
8use http::{header::AUTHORIZATION, Request, StatusCode, Uri};
9use hyper::Body;
10use tower::{Service, ServiceExt};
11
12use vector_lib::stream::DriverResponse;
13use vector_lib::{
14 finalization::EventStatus, request_metadata::GroupedCountByteSize,
15 request_metadata::MetaDescriptive, sensitive_string::SensitiveString,
16};
17
18use crate::{
19 http::HttpClient,
20 sinks::util::{http::HttpBatchService, sink::Response, Compression},
21};
22
23use super::request_builder::AppsignalRequest;
24
25#[derive(Clone)]
26pub(super) struct AppsignalService {
27 pub(super) batch_service:
31 HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, AppsignalRequest>,
32}
33
34impl AppsignalService {
35 pub fn new(
36 http_client: HttpClient<Body>,
37 endpoint: Uri,
38 push_api_key: SensitiveString,
39 compression: Compression,
40 ) -> Self {
41 let batch_service = HttpBatchService::new(http_client, move |req| {
42 let req: AppsignalRequest = req;
43
44 let mut request = Request::post(&endpoint)
45 .header("Content-Type", "application/json")
46 .header(AUTHORIZATION, format!("Bearer {}", push_api_key.inner()))
47 .header("Content-Length", req.payload.len());
48 if let Some(ce) = compression.content_encoding() {
49 request = request.header("Content-Encoding", ce)
50 }
51 let result = request.body(req.payload).map_err(|x| x.into());
52 future::ready(result)
53 });
54 Self { batch_service }
55 }
56}
57
58impl Service<AppsignalRequest> for AppsignalService {
59 type Response = AppsignalResponse;
60 type Error = crate::Error;
61 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
62
63 fn poll_ready(
64 &mut self,
65 _cx: &mut std::task::Context<'_>,
66 ) -> std::task::Poll<Result<(), Self::Error>> {
67 Poll::Ready(Ok(()))
68 }
69
70 fn call(&mut self, mut request: AppsignalRequest) -> Self::Future {
71 let mut http_service = self.batch_service.clone();
72
73 Box::pin(async move {
74 let metadata = std::mem::take(request.metadata_mut());
75 http_service.ready().await?;
76 let bytes_sent = metadata.request_encoded_size();
77 let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
78 let http_response = http_service.call(request).await?;
79 let event_status = if http_response.is_successful() {
80 EventStatus::Delivered
81 } else if http_response.is_transient() {
82 EventStatus::Errored
83 } else {
84 EventStatus::Rejected
85 };
86 Ok(AppsignalResponse {
87 event_status,
88 http_status: http_response.status(),
89 event_byte_size,
90 bytes_sent,
91 })
92 })
93 }
94}
95
96pub struct AppsignalResponse {
97 pub(super) event_status: EventStatus,
98 pub(super) http_status: StatusCode,
99 pub(super) event_byte_size: GroupedCountByteSize,
100 pub(super) bytes_sent: usize,
101}
102
103impl DriverResponse for AppsignalResponse {
104 fn event_status(&self) -> EventStatus {
105 self.event_status
106 }
107
108 fn events_sent(&self) -> &GroupedCountByteSize {
109 &self.event_byte_size
110 }
111
112 fn bytes_sent(&self) -> Option<usize> {
113 Some(self.bytes_sent)
114 }
115}