1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::{
    fmt::Debug,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use http::{
    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
    Request,
};
use hyper::Body;
use tracing::Instrument;

use super::{NewRelicCredentials, NewRelicSinkError};
use crate::sinks::prelude::*;
use crate::{http::HttpClient, sinks::util::Compression};

/// A single, fully prepared request for the New Relic API.
///
/// Consumed by [`NewRelicApiService`], which turns it into an HTTP `POST`.
#[derive(Debug, Clone)]
pub struct NewRelicApiRequest {
    /// Request metadata; exposed through the `MetaDescriptive` impl and carried
    /// over into the response for size/count reporting.
    pub metadata: RequestMetadata,
    /// Finalizers of the events in this request; handed out by `take_finalizers`.
    pub finalizers: EventFinalizers,
    /// Shared credentials: provide the target URI (`get_uri`) and the
    /// `license_key` that is sent in the `Api-Key` header.
    pub credentials: Arc<NewRelicCredentials>,
    /// Bytes used verbatim as the HTTP request body.
    // NOTE(review): presumably already encoded/compressed according to
    // `compression` by the request builder — confirm against the caller.
    pub payload: Bytes,
    /// Compression setting; its `content_encoding()` decides whether a
    /// `Content-Encoding` header is added to the request.
    pub compression: Compression,
}

impl Finalizable for NewRelicApiRequest {
    /// Moves the finalizers out of the request, leaving the default
    /// `EventFinalizers` value in their place.
    fn take_finalizers(&mut self) -> EventFinalizers {
        let slot = &mut self.finalizers;
        std::mem::take(slot)
    }
}

impl MetaDescriptive for NewRelicApiRequest {
    /// Borrows the metadata stored in this request.
    fn get_metadata(&self) -> &RequestMetadata {
        let Self { metadata, .. } = self;
        metadata
    }

    /// Mutably borrows the metadata stored in this request.
    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        let Self { metadata, .. } = self;
        metadata
    }
}

/// Outcome of a request handled by [`NewRelicApiService`].
#[derive(Debug)]
pub struct NewRelicApiResponse {
    /// Status reported to the driver via `DriverResponse::event_status`.
    event_status: EventStatus,
    /// Metadata of the originating request; source of the `events_sent` and
    /// `bytes_sent` figures.
    metadata: RequestMetadata,
}

impl DriverResponse for NewRelicApiResponse {
    /// Returns the status recorded for this response.
    fn event_status(&self) -> EventStatus {
        let Self { event_status, .. } = self;
        *event_status
    }

    /// Returns the estimated JSON-encoded byte sizes of the events, as tracked
    /// in the request metadata.
    fn events_sent(&self) -> &GroupedCountByteSize {
        let metadata = &self.metadata;
        metadata.events_estimated_json_encoded_byte_size()
    }

    /// Returns the encoded request size taken from the request metadata;
    /// never `None`.
    fn bytes_sent(&self) -> Option<usize> {
        let encoded_size = self.metadata.request_encoded_size();
        Some(encoded_size)
    }
}

/// Service that delivers [`NewRelicApiRequest`]s over HTTP.
#[derive(Debug, Clone)]
pub struct NewRelicApiService {
    /// HTTP client used to send requests; cloned for every call so the
    /// returned future owns its own handle.
    pub client: HttpClient,
}

/// Sends each [`NewRelicApiRequest`] to the New Relic API as an HTTP `POST`.
impl Service<NewRelicApiRequest> for NewRelicApiService {
    type Response = NewRelicApiResponse;
    type Error = NewRelicSinkError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Always reports readiness; this service applies no backpressure of its own.
    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Builds the HTTP request for `request` and returns a future that sends it.
    ///
    /// The request is a `POST` to the URI supplied by the credentials, with a
    /// JSON content type, the license key in the `Api-Key` header, an optional
    /// `Content-Encoding` header (when the compression setting defines one) and
    /// an explicit `Content-Length`.
    ///
    /// The returned future resolves to:
    /// * `Ok` with `EventStatus::Delivered` when the API answers with a 2xx status,
    /// * `Ok` with `EventStatus::Errored` when it answers with a 5xx status,
    /// * `Ok` with `EventStatus::Rejected` for any other status,
    /// * `Err` when the HTTP call itself fails.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP request cannot be built (for example when the URI or
    /// a header value is invalid).
    fn call(&mut self, request: NewRelicApiRequest) -> Self::Future {
        let mut client = self.client.clone();

        let uri = request.credentials.get_uri();

        let http_request = Request::post(&uri)
            .header(CONTENT_TYPE, "application/json")
            .header("Api-Key", request.credentials.license_key.clone());

        let http_request = if let Some(ce) = request.compression.content_encoding() {
            http_request.header(CONTENT_ENCODING, ce)
        } else {
            http_request
        };

        // Capture everything needed from `request` before its payload is moved
        // into the body.
        let payload_len = request.payload.len();
        let metadata = request.get_metadata().clone();
        let http_request = http_request
            .header(CONTENT_LENGTH, payload_len)
            .body(Body::from(request.payload))
            .expect("building HTTP request failed unexpectedly");

        Box::pin(async move {
            match client.call(http_request).in_current_span().await {
                Ok(response) => {
                    // A completed HTTP exchange does not imply acceptance: only
                    // a 2xx answer means the payload was taken by the API.
                    let status = response.status();
                    let event_status = if status.is_success() {
                        EventStatus::Delivered
                    } else if status.is_server_error() {
                        // Server-side failure: delivery failed, the data itself
                        // was not refused.
                        EventStatus::Errored
                    } else {
                        EventStatus::Rejected
                    };

                    Ok(NewRelicApiResponse {
                        event_status,
                        metadata,
                    })
                }
                Err(_) => Err(NewRelicSinkError::new("HTTP request error")),
            }
        })
    }
}