vector/sinks/new_relic/
service.rs

1use std::{
2    fmt::Debug,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use http::{
9    Request,
10    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
11};
12use hyper::Body;
13use tracing::Instrument;
14
15use super::{NewRelicCredentials, NewRelicSinkError};
16use crate::{
17    http::HttpClient,
18    sinks::{prelude::*, util::Compression},
19};
20
21#[derive(Debug, Clone)]
22pub struct NewRelicApiRequest {
23    pub metadata: RequestMetadata,
24    pub finalizers: EventFinalizers,
25    pub credentials: Arc<NewRelicCredentials>,
26    pub payload: Bytes,
27    pub compression: Compression,
28}
29
30impl Finalizable for NewRelicApiRequest {
31    fn take_finalizers(&mut self) -> EventFinalizers {
32        std::mem::take(&mut self.finalizers)
33    }
34}
35
36impl MetaDescriptive for NewRelicApiRequest {
37    fn get_metadata(&self) -> &RequestMetadata {
38        &self.metadata
39    }
40
41    fn metadata_mut(&mut self) -> &mut RequestMetadata {
42        &mut self.metadata
43    }
44}
45
46#[derive(Debug)]
47pub struct NewRelicApiResponse {
48    event_status: EventStatus,
49    metadata: RequestMetadata,
50}
51
52impl DriverResponse for NewRelicApiResponse {
53    fn event_status(&self) -> EventStatus {
54        self.event_status
55    }
56
57    fn events_sent(&self) -> &GroupedCountByteSize {
58        self.metadata.events_estimated_json_encoded_byte_size()
59    }
60
61    fn bytes_sent(&self) -> Option<usize> {
62        Some(self.metadata.request_encoded_size())
63    }
64}
65
66#[derive(Debug, Clone)]
67pub struct NewRelicApiService {
68    pub client: HttpClient,
69}
70
71impl Service<NewRelicApiRequest> for NewRelicApiService {
72    type Response = NewRelicApiResponse;
73    type Error = NewRelicSinkError;
74    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
75
76    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
77        Poll::Ready(Ok(()))
78    }
79
80    fn call(&mut self, request: NewRelicApiRequest) -> Self::Future {
81        let mut client = self.client.clone();
82
83        let uri = request.credentials.get_uri();
84
85        let http_request = Request::post(&uri)
86            .header(CONTENT_TYPE, "application/json")
87            .header("Api-Key", request.credentials.license_key.clone());
88
89        let http_request = if let Some(ce) = request.compression.content_encoding() {
90            http_request.header(CONTENT_ENCODING, ce)
91        } else {
92            http_request
93        };
94
95        let payload_len = request.payload.len();
96        let metadata = request.get_metadata().clone();
97        let http_request = http_request
98            .header(CONTENT_LENGTH, payload_len)
99            .body(Body::from(request.payload))
100            .expect("building HTTP request failed unexpectedly");
101
102        Box::pin(async move {
103            match client.call(http_request).in_current_span().await {
104                Ok(_) => Ok(NewRelicApiResponse {
105                    event_status: EventStatus::Delivered,
106                    metadata,
107                }),
108                Err(_) => Err(NewRelicSinkError::new("HTTP request error")),
109            }
110        })
111    }
112}