vector/internal_events/
splunk_hec.rs

1// ## skip check-dropped-events ##
2
3#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10    use metrics::{counter, gauge};
11    use serde_json::Error;
12    use vector_lib::internal_event::InternalEvent;
13    use vector_lib::internal_event::{
14        error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL,
15    };
16
17    use crate::{
18        event::metric::{MetricKind, MetricValue},
19        sinks::splunk_hec::common::acknowledgements::HecAckApiError,
20    };
21
22    #[derive(Debug)]
23    pub struct SplunkEventEncodeError {
24        pub error: vector_lib::Error,
25    }
26
27    impl InternalEvent for SplunkEventEncodeError {
28        fn emit(self) {
29            let reason = "Failed to encode Splunk HEC event as JSON.";
30            error!(
31                message = reason,
32                error = ?self.error,
33                error_code = "serializing_json",
34                error_type = error_type::ENCODER_FAILED,
35                stage = error_stage::PROCESSING,
36                internal_log_rate_limit = true,
37            );
38            counter!(
39                "component_errors_total",
40                "error_code" => "serializing_json",
41                "error_type" => error_type::ENCODER_FAILED,
42                "stage" => error_stage::PROCESSING,
43            )
44            .increment(1);
45            emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
46        }
47    }
48
49    #[derive(Debug)]
50    pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
51        pub value: &'a MetricValue,
52        pub kind: &'a MetricKind,
53        pub error: crate::Error,
54    }
55
56    impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
57        fn emit(self) {
58            error!(
59                message = "Invalid metric received.",
60                error = ?self.error,
61                error_type = error_type::INVALID_METRIC,
62                stage = error_stage::PROCESSING,
63                value = ?self.value,
64                kind = ?self.kind,
65                internal_log_rate_limit = true,
66            );
67            counter!(
68                "component_errors_total",
69                "error_type" => error_type::INVALID_METRIC,
70                "stage" => error_stage::PROCESSING,
71            )
72            .increment(1);
73            counter!(
74                "component_discarded_events_total",
75                "error_type" => error_type::INVALID_METRIC,
76                "stage" => error_stage::PROCESSING,
77            )
78            .increment(1);
79        }
80    }
81
82    #[derive(Debug)]
83    pub struct SplunkResponseParseError {
84        pub error: Error,
85    }
86
87    impl InternalEvent for SplunkResponseParseError {
88        fn emit(self) {
89            error!(
90                message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
91                error = ?self.error,
92                error_code = "invalid_response",
93                error_type = error_type::PARSER_FAILED,
94                stage = error_stage::SENDING,
95                internal_log_rate_limit = true,
96            );
97            counter!(
98                "component_errors_total",
99                "error_code" => "invalid_response",
100                "error_type" => error_type::PARSER_FAILED,
101                "stage" => error_stage::SENDING,
102            )
103            .increment(1);
104        }
105    }
106
107    #[derive(Debug)]
108    pub struct SplunkIndexerAcknowledgementAPIError {
109        pub message: &'static str,
110        pub error: HecAckApiError,
111    }
112
113    impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
114        fn emit(self) {
115            error!(
116                message = self.message,
117                error = ?self.error,
118                error_code = "indexer_ack_failed",
119                error_type = error_type::ACKNOWLEDGMENT_FAILED,
120                stage = error_stage::SENDING,
121                internal_log_rate_limit = true,
122            );
123            counter!(
124                "component_errors_total",
125                "error_code" => "indexer_ack_failed",
126                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
127                "stage" => error_stage::SENDING,
128            )
129            .increment(1);
130        }
131    }
132
133    #[derive(Debug)]
134    pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
135        pub error: E,
136    }
137
138    impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
139        fn emit(self) {
140            error!(
141                message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
142                error = %self.error,
143                error_code = "indexer_ack_unavailable",
144                error_type = error_type::ACKNOWLEDGMENT_FAILED,
145                stage = error_stage::SENDING,
146                internal_log_rate_limit = true,
147            );
148            counter!(
149                "component_errors_total",
150                "error_code" => "indexer_ack_unavailable",
151                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
152                "stage" => error_stage::SENDING,
153            )
154            .increment(1);
155        }
156    }
157
158    pub struct SplunkIndexerAcknowledgementAckAdded;
159
160    impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
161        fn emit(self) {
162            gauge!("splunk_pending_acks").increment(1.0);
163        }
164    }
165
166    pub struct SplunkIndexerAcknowledgementAcksRemoved {
167        pub count: f64,
168    }
169
170    impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
171        fn emit(self) {
172            gauge!("splunk_pending_acks").decrement(self.count);
173        }
174    }
175
176    pub struct SplunkEventTimestampInvalidType<'a> {
177        pub r#type: &'a str,
178    }
179
180    impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
181        fn emit(self) {
182            warn!(
183                message =
184                    "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
185                invalid_type = self.r#type,
186                internal_log_rate_limit = true
187            );
188        }
189    }
190
191    pub struct SplunkEventTimestampMissing;
192
193    impl InternalEvent for SplunkEventTimestampMissing {
194        fn emit(self) {
195            warn!(
196                message = "Timestamp was not found. Deferring to Splunk to set the timestamp.",
197                internal_log_rate_limit = true
198            );
199        }
200    }
201}
202
203#[cfg(feature = "sources-splunk_hec")]
204mod source {
205    use metrics::counter;
206    use vector_lib::internal_event::InternalEvent;
207
208    use crate::sources::splunk_hec::ApiError;
209    use vector_lib::internal_event::{error_stage, error_type};
210
211    #[derive(Debug)]
212    pub struct SplunkHecRequestBodyInvalidError {
213        pub error: std::io::Error,
214    }
215
216    impl InternalEvent for SplunkHecRequestBodyInvalidError {
217        fn emit(self) {
218            error!(
219                message = "Invalid request body.",
220                error = ?self.error,
221                error_code = "invalid_request_body",
222                error_type = error_type::PARSER_FAILED,
223                stage = error_stage::PROCESSING,
224                internal_log_rate_limit = true
225            );
226            counter!(
227                "component_errors_total",
228                "error_code" => "invalid_request_body",
229                "error_type" => error_type::PARSER_FAILED,
230                "stage" => error_stage::PROCESSING,
231            )
232            .increment(1);
233        }
234    }
235
236    #[derive(Debug)]
237    pub struct SplunkHecRequestError {
238        pub(crate) error: ApiError,
239    }
240
241    impl InternalEvent for SplunkHecRequestError {
242        fn emit(self) {
243            error!(
244                message = "Error processing request.",
245                error = ?self.error,
246                error_type = error_type::REQUEST_FAILED,
247                stage = error_stage::RECEIVING,
248                internal_log_rate_limit = true
249            );
250            counter!(
251                "component_errors_total",
252                "error_type" => error_type::REQUEST_FAILED,
253                "stage" => error_stage::RECEIVING,
254            )
255            .increment(1);
256        }
257    }
258}