vector/sinks/new_relic/
service.rs

1use std::{
2    fmt::Debug,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use http::{
9    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
10    Request,
11};
12use hyper::Body;
13use tracing::Instrument;
14
15use super::{NewRelicCredentials, NewRelicSinkError};
16use crate::sinks::prelude::*;
17use crate::{http::HttpClient, sinks::util::Compression};
18
19#[derive(Debug, Clone)]
20pub struct NewRelicApiRequest {
21    pub metadata: RequestMetadata,
22    pub finalizers: EventFinalizers,
23    pub credentials: Arc<NewRelicCredentials>,
24    pub payload: Bytes,
25    pub compression: Compression,
26}
27
28impl Finalizable for NewRelicApiRequest {
29    fn take_finalizers(&mut self) -> EventFinalizers {
30        std::mem::take(&mut self.finalizers)
31    }
32}
33
34impl MetaDescriptive for NewRelicApiRequest {
35    fn get_metadata(&self) -> &RequestMetadata {
36        &self.metadata
37    }
38
39    fn metadata_mut(&mut self) -> &mut RequestMetadata {
40        &mut self.metadata
41    }
42}
43
44#[derive(Debug)]
45pub struct NewRelicApiResponse {
46    event_status: EventStatus,
47    metadata: RequestMetadata,
48}
49
50impl DriverResponse for NewRelicApiResponse {
51    fn event_status(&self) -> EventStatus {
52        self.event_status
53    }
54
55    fn events_sent(&self) -> &GroupedCountByteSize {
56        self.metadata.events_estimated_json_encoded_byte_size()
57    }
58
59    fn bytes_sent(&self) -> Option<usize> {
60        Some(self.metadata.request_encoded_size())
61    }
62}
63
64#[derive(Debug, Clone)]
65pub struct NewRelicApiService {
66    pub client: HttpClient,
67}
68
69impl Service<NewRelicApiRequest> for NewRelicApiService {
70    type Response = NewRelicApiResponse;
71    type Error = NewRelicSinkError;
72    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
73
74    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
75        Poll::Ready(Ok(()))
76    }
77
78    fn call(&mut self, request: NewRelicApiRequest) -> Self::Future {
79        let mut client = self.client.clone();
80
81        let uri = request.credentials.get_uri();
82
83        let http_request = Request::post(&uri)
84            .header(CONTENT_TYPE, "application/json")
85            .header("Api-Key", request.credentials.license_key.clone());
86
87        let http_request = if let Some(ce) = request.compression.content_encoding() {
88            http_request.header(CONTENT_ENCODING, ce)
89        } else {
90            http_request
91        };
92
93        let payload_len = request.payload.len();
94        let metadata = request.get_metadata().clone();
95        let http_request = http_request
96            .header(CONTENT_LENGTH, payload_len)
97            .body(Body::from(request.payload))
98            .expect("building HTTP request failed unexpectedly");
99
100        Box::pin(async move {
101            match client.call(http_request).in_current_span().await {
102                Ok(_) => Ok(NewRelicApiResponse {
103                    event_status: EventStatus::Delivered,
104                    metadata,
105                }),
106                Err(_) => Err(NewRelicSinkError::new("HTTP request error")),
107            }
108        })
109    }
110}