vector/internal_events/
splunk_hec.rs

1// ## skip check-dropped-events ##
2
3#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10    use serde_json::Error;
11    use vector_lib::{
12        NamedInternalEvent, counter, gauge,
13        internal_event::{
14            ComponentEventsDropped, CounterName, GaugeName, InternalEvent, UNINTENTIONAL,
15            error_stage, error_type,
16        },
17    };
18
19    use crate::{
20        event::metric::{MetricKind, MetricValue},
21        sinks::splunk_hec::common::acknowledgements::HecAckApiError,
22    };
23
24    #[derive(Debug, NamedInternalEvent)]
25    pub struct SplunkEventEncodeError {
26        pub error: vector_lib::Error,
27    }
28
29    impl InternalEvent for SplunkEventEncodeError {
30        fn emit(self) {
31            let reason = "Failed to encode Splunk HEC event as JSON.";
32            error!(
33                message = reason,
34                error = ?self.error,
35                error_code = "serializing_json",
36                error_type = error_type::ENCODER_FAILED,
37                stage = error_stage::PROCESSING,
38            );
39            counter!(
40                CounterName::ComponentErrorsTotal,
41                "error_code" => "serializing_json",
42                "error_type" => error_type::ENCODER_FAILED,
43                "stage" => error_stage::PROCESSING,
44            )
45            .increment(1);
46            emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
47        }
48    }
49
50    #[derive(Debug, NamedInternalEvent)]
51    pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
52        pub value: &'a MetricValue,
53        pub kind: &'a MetricKind,
54        pub error: crate::Error,
55    }
56
57    impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
58        fn emit(self) {
59            error!(
60                message = "Invalid metric received.",
61                error = ?self.error,
62                error_type = error_type::INVALID_METRIC,
63                stage = error_stage::PROCESSING,
64                value = ?self.value,
65                kind = ?self.kind,
66            );
67            counter!(
68                CounterName::ComponentErrorsTotal,
69                "error_type" => error_type::INVALID_METRIC,
70                "stage" => error_stage::PROCESSING,
71            )
72            .increment(1);
73            counter!(
74                CounterName::ComponentDiscardedEventsTotal,
75                "error_type" => error_type::INVALID_METRIC,
76                "stage" => error_stage::PROCESSING,
77            )
78            .increment(1);
79        }
80    }
81
82    #[derive(Debug, NamedInternalEvent)]
83    pub struct SplunkResponseParseError {
84        pub error: Error,
85    }
86
87    impl InternalEvent for SplunkResponseParseError {
88        fn emit(self) {
89            error!(
90                message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
91                error = ?self.error,
92                error_code = "invalid_response",
93                error_type = error_type::PARSER_FAILED,
94                stage = error_stage::SENDING,
95            );
96            counter!(
97                CounterName::ComponentErrorsTotal,
98                "error_code" => "invalid_response",
99                "error_type" => error_type::PARSER_FAILED,
100                "stage" => error_stage::SENDING,
101            )
102            .increment(1);
103        }
104    }
105
106    #[derive(Debug, NamedInternalEvent)]
107    pub struct SplunkIndexerAcknowledgementAPIError {
108        pub message: &'static str,
109        pub error: HecAckApiError,
110    }
111
112    impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
113        fn emit(self) {
114            error!(
115                message = self.message,
116                error = ?self.error,
117                error_code = "indexer_ack_failed",
118                error_type = error_type::ACKNOWLEDGMENT_FAILED,
119                stage = error_stage::SENDING,
120            );
121            counter!(
122                CounterName::ComponentErrorsTotal,
123                "error_code" => "indexer_ack_failed",
124                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
125                "stage" => error_stage::SENDING,
126            )
127            .increment(1);
128        }
129    }
130
131    #[derive(Debug, NamedInternalEvent)]
132    pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
133        pub error: E,
134    }
135
136    impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
137        fn emit(self) {
138            error!(
139                message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
140                error = %self.error,
141                error_code = "indexer_ack_unavailable",
142                error_type = error_type::ACKNOWLEDGMENT_FAILED,
143                stage = error_stage::SENDING,
144            );
145            counter!(
146                CounterName::ComponentErrorsTotal,
147                "error_code" => "indexer_ack_unavailable",
148                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
149                "stage" => error_stage::SENDING,
150            )
151            .increment(1);
152        }
153    }
154
155    #[derive(NamedInternalEvent)]
156    pub struct SplunkIndexerAcknowledgementAckAdded;
157
158    impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
159        fn emit(self) {
160            gauge!(GaugeName::SplunkPendingAcks).increment(1.0);
161        }
162    }
163
164    #[derive(NamedInternalEvent)]
165    pub struct SplunkIndexerAcknowledgementAcksRemoved {
166        pub count: f64,
167    }
168
169    impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
170        fn emit(self) {
171            gauge!(GaugeName::SplunkPendingAcks).decrement(self.count);
172        }
173    }
174
175    #[derive(NamedInternalEvent)]
176    pub struct SplunkEventTimestampInvalidType<'a> {
177        pub r#type: &'a str,
178    }
179
180    impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
181        fn emit(self) {
182            warn!(
183                message =
184                    "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
185                invalid_type = self.r#type
186            );
187        }
188    }
189
190    #[derive(NamedInternalEvent)]
191    pub struct SplunkEventTimestampMissing;
192
193    impl InternalEvent for SplunkEventTimestampMissing {
194        fn emit(self) {
195            warn!("Timestamp was not found. Deferring to Splunk to set the timestamp.");
196        }
197    }
198}
199
200#[cfg(feature = "sources-splunk_hec")]
201mod source {
202    use vector_lib::{
203        NamedInternalEvent, counter,
204        internal_event::{CounterName, InternalEvent, error_stage, error_type},
205    };
206
207    use crate::sources::splunk_hec::ApiError;
208
209    #[derive(Debug, NamedInternalEvent)]
210    pub struct SplunkHecRequestBodyInvalidError {
211        pub error: std::io::Error,
212    }
213
214    impl InternalEvent for SplunkHecRequestBodyInvalidError {
215        fn emit(self) {
216            error!(
217                message = "Invalid request body.",
218                error = ?self.error,
219                error_code = "invalid_request_body",
220                error_type = error_type::PARSER_FAILED,
221                stage = error_stage::PROCESSING
222            );
223            counter!(
224                CounterName::ComponentErrorsTotal,
225                "error_code" => "invalid_request_body",
226                "error_type" => error_type::PARSER_FAILED,
227                "stage" => error_stage::PROCESSING,
228            )
229            .increment(1);
230        }
231    }
232
233    #[derive(Debug, NamedInternalEvent)]
234    pub struct SplunkHecRequestError {
235        pub(crate) error: ApiError,
236    }
237
238    impl InternalEvent for SplunkHecRequestError {
239        fn emit(self) {
240            match self.error {
241                ApiError::InvalidAuthorization | ApiError::MissingAuthorization => {
242                    error!(
243                        message = "Unauthenticated request.",
244                        error = ?self.error,
245                        error_type = error_type::AUTHENTICATION_FAILED,
246                        stage = error_stage::RECEIVING
247                    );
248                    counter!(
249                        CounterName::ComponentErrorsTotal,
250                        "error_type" => error_type::AUTHENTICATION_FAILED,
251                        "stage" => error_stage::RECEIVING,
252                    )
253                    .increment(1);
254                }
255                _ => {
256                    error!(
257                        message = "Error processing request.",
258                        error = ?self.error,
259                        error_type = error_type::REQUEST_FAILED,
260                        stage = error_stage::RECEIVING
261                    );
262                    counter!(
263                        CounterName::ComponentErrorsTotal,
264                        "error_type" => error_type::REQUEST_FAILED,
265                        "stage" => error_stage::RECEIVING,
266                    )
267                    .increment(1);
268                }
269            }
270        }
271    }
272}