vector/internal_events/
splunk_hec.rs

1// ## skip check-dropped-events ##
2
3#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
#[cfg(feature = "sinks-splunk_hec")]
mod sink {
    use metrics::{counter, gauge};
    use serde_json::Error;
    use vector_lib::internal_event::{
        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
    };

    use crate::{
        event::metric::{MetricKind, MetricValue},
        sinks::splunk_hec::common::acknowledgements::HecAckApiError,
    };

    /// Raised when a Splunk HEC event could not be encoded as JSON.
    #[derive(Debug)]
    pub struct SplunkEventEncodeError {
        pub error: vector_lib::Error,
    }

    impl InternalEvent for SplunkEventEncodeError {
        /// Logs the failure, counts it in `component_errors_total`, then reports
        /// one unintentionally dropped event carrying the same reason text.
        fn emit(self) {
            let Self { error: cause } = self;
            // Bound once so the log field and the metric label cannot drift apart.
            let code = "serializing_json";
            let reason = "Failed to encode Splunk HEC event as JSON.";

            error!(
                message = reason,
                error = ?cause,
                error_code = code,
                error_type = error_type::ENCODER_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true,
            );
            counter!(
                "component_errors_total",
                "error_code" => code,
                "error_type" => error_type::ENCODER_FAILED,
                "stage" => error_stage::PROCESSING,
            )
            .increment(1);
            emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
        }
    }

    /// Raised for a metric reported as invalid; carries the offending value and
    /// kind alongside the underlying error.
    #[derive(Debug)]
    pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
        pub value: &'a MetricValue,
        pub kind: &'a MetricKind,
        pub error: crate::Error,
    }

    impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
        /// Logs the invalid metric and increments both `component_errors_total`
        /// and `component_discarded_events_total`.
        // NOTE(review): unlike `SplunkEventEncodeError`, the discard is counted
        // directly rather than through `ComponentEventsDropped` — confirm whether
        // that is intentional.
        fn emit(self) {
            let Self {
                value,
                kind,
                error: cause,
            } = self;

            error!(
                message = "Invalid metric received.",
                error = ?cause,
                error_type = error_type::INVALID_METRIC,
                stage = error_stage::PROCESSING,
                value = ?value,
                kind = ?kind,
                internal_log_rate_limit = true,
            );
            counter!(
                "component_errors_total",
                "error_type" => error_type::INVALID_METRIC,
                "stage" => error_stage::PROCESSING,
            )
            .increment(1);
            counter!(
                "component_discarded_events_total",
                "error_type" => error_type::INVALID_METRIC,
                "stage" => error_stage::PROCESSING,
            )
            .increment(1);
        }
    }

    /// Raised when the Splunk HEC response could not be parsed.
    #[derive(Debug)]
    pub struct SplunkResponseParseError {
        pub error: Error,
    }

    impl InternalEvent for SplunkResponseParseError {
        /// Logs the parse failure and increments `component_errors_total`.
        fn emit(self) {
            let Self { error: cause } = self;
            let code = "invalid_response";

            error!(
                message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
                error = ?cause,
                error_code = code,
                error_type = error_type::PARSER_FAILED,
                stage = error_stage::SENDING,
                internal_log_rate_limit = true,
            );
            counter!(
                "component_errors_total",
                "error_code" => code,
                "error_type" => error_type::PARSER_FAILED,
                "stage" => error_stage::SENDING,
            )
            .increment(1);
        }
    }

    /// Raised for an indexer acknowledgement API failure; the caller supplies the
    /// log message text.
    #[derive(Debug)]
    pub struct SplunkIndexerAcknowledgementAPIError {
        pub message: &'static str,
        pub error: HecAckApiError,
    }

    impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
        /// Logs the caller-provided message with the error and increments
        /// `component_errors_total`.
        fn emit(self) {
            let Self {
                message: summary,
                error: cause,
            } = self;
            let code = "indexer_ack_failed";

            error!(
                message = summary,
                error = ?cause,
                error_code = code,
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::SENDING,
                internal_log_rate_limit = true,
            );
            counter!(
                "component_errors_total",
                "error_code" => code,
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::SENDING,
            )
            .increment(1);
        }
    }

    /// Raised when the internal indexer acknowledgement client is unavailable.
    #[derive(Debug)]
    pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
        pub error: E,
    }

    impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
        /// Logs the error using its `Display` form and increments
        /// `component_errors_total`.
        fn emit(self) {
            let Self { error: cause } = self;
            let code = "indexer_ack_unavailable";

            error!(
                message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
                error = %cause,
                error_code = code,
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::SENDING,
                internal_log_rate_limit = true,
            );
            counter!(
                "component_errors_total",
                "error_code" => code,
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::SENDING,
            )
            .increment(1);
        }
    }

    /// Signals that one acknowledgement was added to the pending set.
    pub struct SplunkIndexerAcknowledgementAckAdded;

    impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
        /// Raises the `splunk_pending_acks` gauge by one.
        fn emit(self) {
            gauge!("splunk_pending_acks").increment(1.0);
        }
    }

    /// Signals that `count` acknowledgements were removed from the pending set.
    pub struct SplunkIndexerAcknowledgementAcksRemoved {
        pub count: f64,
    }

    impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
        /// Lowers the `splunk_pending_acks` gauge by `count`.
        fn emit(self) {
            let Self { count: removed } = self;
            gauge!("splunk_pending_acks").decrement(removed);
        }
    }

    /// Signals that an event's timestamp had an unexpected type; `type` names
    /// the type that was found.
    pub struct SplunkEventTimestampInvalidType<'a> {
        pub r#type: &'a str,
    }

    impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
        /// Emits a rate-limited warning only; no metric is touched.
        fn emit(self) {
            let found = self.r#type;
            warn!(
                message =
                    "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
                invalid_type = found,
                internal_log_rate_limit = true
            );
        }
    }

    /// Signals that an event had no timestamp.
    pub struct SplunkEventTimestampMissing;

    impl InternalEvent for SplunkEventTimestampMissing {
        /// Emits a rate-limited warning only; no metric is touched.
        fn emit(self) {
            warn!(
                message = "Timestamp was not found. Deferring to Splunk to set the timestamp.",
                internal_log_rate_limit = true
            );
        }
    }
}
201
#[cfg(feature = "sources-splunk_hec")]
mod source {
    use metrics::counter;
    use vector_lib::internal_event::{InternalEvent, error_stage, error_type};

    use crate::sources::splunk_hec::ApiError;

    /// Raised when a request body is invalid; wraps the I/O error encountered.
    #[derive(Debug)]
    pub struct SplunkHecRequestBodyInvalidError {
        pub error: std::io::Error,
    }

    impl InternalEvent for SplunkHecRequestBodyInvalidError {
        /// Logs the invalid body and increments `component_errors_total`.
        fn emit(self) {
            let Self { error: cause } = self;
            // Bound once so the log field and the metric label cannot drift apart.
            let code = "invalid_request_body";

            error!(
                message = "Invalid request body.",
                error = ?cause,
                error_code = code,
                error_type = error_type::PARSER_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true
            );
            counter!(
                "component_errors_total",
                "error_code" => code,
                "error_type" => error_type::PARSER_FAILED,
                "stage" => error_stage::PROCESSING,
            )
            .increment(1);
        }
    }

    /// Raised when processing a request fails with an `ApiError`.
    #[derive(Debug)]
    pub struct SplunkHecRequestError {
        pub(crate) error: ApiError,
    }

    impl InternalEvent for SplunkHecRequestError {
        /// Logs the request failure and increments `component_errors_total`.
        fn emit(self) {
            let Self { error: cause } = self;

            error!(
                message = "Error processing request.",
                error = ?cause,
                error_type = error_type::REQUEST_FAILED,
                stage = error_stage::RECEIVING,
                internal_log_rate_limit = true
            );
            counter!(
                "component_errors_total",
                "error_type" => error_type::REQUEST_FAILED,
                "stage" => error_stage::RECEIVING,
            )
            .increment(1);
        }
    }
}