vector/sinks/new_relic/
service.rs1use std::{
2 fmt::Debug,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use http::{
9 Request,
10 header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
11};
12use hyper::Body;
13use tracing::Instrument;
14
15use super::{NewRelicCredentials, NewRelicSinkError};
16use crate::{
17 http::HttpClient,
18 sinks::{prelude::*, util::Compression},
19};
20
21#[derive(Debug, Clone)]
22pub struct NewRelicApiRequest {
23 pub metadata: RequestMetadata,
24 pub finalizers: EventFinalizers,
25 pub credentials: Arc<NewRelicCredentials>,
26 pub payload: Bytes,
27 pub compression: Compression,
28}
29
30impl Finalizable for NewRelicApiRequest {
31 fn take_finalizers(&mut self) -> EventFinalizers {
32 std::mem::take(&mut self.finalizers)
33 }
34}
35
36impl MetaDescriptive for NewRelicApiRequest {
37 fn get_metadata(&self) -> &RequestMetadata {
38 &self.metadata
39 }
40
41 fn metadata_mut(&mut self) -> &mut RequestMetadata {
42 &mut self.metadata
43 }
44}
45
46#[derive(Debug)]
47pub struct NewRelicApiResponse {
48 event_status: EventStatus,
49 metadata: RequestMetadata,
50}
51
52impl DriverResponse for NewRelicApiResponse {
53 fn event_status(&self) -> EventStatus {
54 self.event_status
55 }
56
57 fn events_sent(&self) -> &GroupedCountByteSize {
58 self.metadata.events_estimated_json_encoded_byte_size()
59 }
60
61 fn bytes_sent(&self) -> Option<usize> {
62 Some(self.metadata.request_encoded_size())
63 }
64}
65
66#[derive(Debug, Clone)]
67pub struct NewRelicApiService {
68 pub client: HttpClient,
69}
70
71impl Service<NewRelicApiRequest> for NewRelicApiService {
72 type Response = NewRelicApiResponse;
73 type Error = NewRelicSinkError;
74 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
75
76 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
77 Poll::Ready(Ok(()))
78 }
79
80 fn call(&mut self, request: NewRelicApiRequest) -> Self::Future {
81 let mut client = self.client.clone();
82
83 let uri = request.credentials.get_uri();
84
85 let http_request = Request::post(&uri)
86 .header(CONTENT_TYPE, "application/json")
87 .header("Api-Key", request.credentials.license_key.clone());
88
89 let http_request = if let Some(ce) = request.compression.content_encoding() {
90 http_request.header(CONTENT_ENCODING, ce)
91 } else {
92 http_request
93 };
94
95 let payload_len = request.payload.len();
96 let metadata = request.get_metadata().clone();
97 let http_request = http_request
98 .header(CONTENT_LENGTH, payload_len)
99 .body(Body::from(request.payload))
100 .expect("building HTTP request failed unexpectedly");
101
102 Box::pin(async move {
103 match client.call(http_request).in_current_span().await {
104 Ok(_) => Ok(NewRelicApiResponse {
105 event_status: EventStatus::Delivered,
106 metadata,
107 }),
108 Err(_) => Err(NewRelicSinkError::new("HTTP request error")),
109 }
110 })
111 }
112}