vector/sinks/new_relic/
service.rs1use std::{
2 fmt::Debug,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use http::{
9 header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
10 Request,
11};
12use hyper::Body;
13use tracing::Instrument;
14
15use super::{NewRelicCredentials, NewRelicSinkError};
16use crate::sinks::prelude::*;
17use crate::{http::HttpClient, sinks::util::Compression};
18
19#[derive(Debug, Clone)]
20pub struct NewRelicApiRequest {
21 pub metadata: RequestMetadata,
22 pub finalizers: EventFinalizers,
23 pub credentials: Arc<NewRelicCredentials>,
24 pub payload: Bytes,
25 pub compression: Compression,
26}
27
28impl Finalizable for NewRelicApiRequest {
29 fn take_finalizers(&mut self) -> EventFinalizers {
30 std::mem::take(&mut self.finalizers)
31 }
32}
33
34impl MetaDescriptive for NewRelicApiRequest {
35 fn get_metadata(&self) -> &RequestMetadata {
36 &self.metadata
37 }
38
39 fn metadata_mut(&mut self) -> &mut RequestMetadata {
40 &mut self.metadata
41 }
42}
43
44#[derive(Debug)]
45pub struct NewRelicApiResponse {
46 event_status: EventStatus,
47 metadata: RequestMetadata,
48}
49
50impl DriverResponse for NewRelicApiResponse {
51 fn event_status(&self) -> EventStatus {
52 self.event_status
53 }
54
55 fn events_sent(&self) -> &GroupedCountByteSize {
56 self.metadata.events_estimated_json_encoded_byte_size()
57 }
58
59 fn bytes_sent(&self) -> Option<usize> {
60 Some(self.metadata.request_encoded_size())
61 }
62}
63
64#[derive(Debug, Clone)]
65pub struct NewRelicApiService {
66 pub client: HttpClient,
67}
68
69impl Service<NewRelicApiRequest> for NewRelicApiService {
70 type Response = NewRelicApiResponse;
71 type Error = NewRelicSinkError;
72 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
73
74 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
75 Poll::Ready(Ok(()))
76 }
77
78 fn call(&mut self, request: NewRelicApiRequest) -> Self::Future {
79 let mut client = self.client.clone();
80
81 let uri = request.credentials.get_uri();
82
83 let http_request = Request::post(&uri)
84 .header(CONTENT_TYPE, "application/json")
85 .header("Api-Key", request.credentials.license_key.clone());
86
87 let http_request = if let Some(ce) = request.compression.content_encoding() {
88 http_request.header(CONTENT_ENCODING, ce)
89 } else {
90 http_request
91 };
92
93 let payload_len = request.payload.len();
94 let metadata = request.get_metadata().clone();
95 let http_request = http_request
96 .header(CONTENT_LENGTH, payload_len)
97 .body(Body::from(request.payload))
98 .expect("building HTTP request failed unexpectedly");
99
100 Box::pin(async move {
101 match client.call(http_request).in_current_span().await {
102 Ok(_) => Ok(NewRelicApiResponse {
103 event_status: EventStatus::Delivered,
104 metadata,
105 }),
106 Err(_) => Err(NewRelicSinkError::new("HTTP request error")),
107 }
108 })
109 }
110}