vector/internal_events/
splunk_hec.rs

1// ## skip check-dropped-events ##
2
3#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10    use metrics::{counter, gauge};
11    use serde_json::Error;
12    use vector_lib::internal_event::{
13        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
14    };
15
16    use crate::{
17        event::metric::{MetricKind, MetricValue},
18        sinks::splunk_hec::common::acknowledgements::HecAckApiError,
19    };
20
21    #[derive(Debug)]
22    pub struct SplunkEventEncodeError {
23        pub error: vector_lib::Error,
24    }
25
26    impl InternalEvent for SplunkEventEncodeError {
27        fn emit(self) {
28            let reason = "Failed to encode Splunk HEC event as JSON.";
29            error!(
30                message = reason,
31                error = ?self.error,
32                error_code = "serializing_json",
33                error_type = error_type::ENCODER_FAILED,
34                stage = error_stage::PROCESSING,
35            );
36            counter!(
37                "component_errors_total",
38                "error_code" => "serializing_json",
39                "error_type" => error_type::ENCODER_FAILED,
40                "stage" => error_stage::PROCESSING,
41            )
42            .increment(1);
43            emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
44        }
45    }
46
47    #[derive(Debug)]
48    pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
49        pub value: &'a MetricValue,
50        pub kind: &'a MetricKind,
51        pub error: crate::Error,
52    }
53
54    impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
55        fn emit(self) {
56            error!(
57                message = "Invalid metric received.",
58                error = ?self.error,
59                error_type = error_type::INVALID_METRIC,
60                stage = error_stage::PROCESSING,
61                value = ?self.value,
62                kind = ?self.kind,
63            );
64            counter!(
65                "component_errors_total",
66                "error_type" => error_type::INVALID_METRIC,
67                "stage" => error_stage::PROCESSING,
68            )
69            .increment(1);
70            counter!(
71                "component_discarded_events_total",
72                "error_type" => error_type::INVALID_METRIC,
73                "stage" => error_stage::PROCESSING,
74            )
75            .increment(1);
76        }
77    }
78
79    #[derive(Debug)]
80    pub struct SplunkResponseParseError {
81        pub error: Error,
82    }
83
84    impl InternalEvent for SplunkResponseParseError {
85        fn emit(self) {
86            error!(
87                message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
88                error = ?self.error,
89                error_code = "invalid_response",
90                error_type = error_type::PARSER_FAILED,
91                stage = error_stage::SENDING,
92            );
93            counter!(
94                "component_errors_total",
95                "error_code" => "invalid_response",
96                "error_type" => error_type::PARSER_FAILED,
97                "stage" => error_stage::SENDING,
98            )
99            .increment(1);
100        }
101    }
102
103    #[derive(Debug)]
104    pub struct SplunkIndexerAcknowledgementAPIError {
105        pub message: &'static str,
106        pub error: HecAckApiError,
107    }
108
109    impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
110        fn emit(self) {
111            error!(
112                message = self.message,
113                error = ?self.error,
114                error_code = "indexer_ack_failed",
115                error_type = error_type::ACKNOWLEDGMENT_FAILED,
116                stage = error_stage::SENDING,
117            );
118            counter!(
119                "component_errors_total",
120                "error_code" => "indexer_ack_failed",
121                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
122                "stage" => error_stage::SENDING,
123            )
124            .increment(1);
125        }
126    }
127
128    #[derive(Debug)]
129    pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
130        pub error: E,
131    }
132
133    impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
134        fn emit(self) {
135            error!(
136                message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
137                error = %self.error,
138                error_code = "indexer_ack_unavailable",
139                error_type = error_type::ACKNOWLEDGMENT_FAILED,
140                stage = error_stage::SENDING,
141            );
142            counter!(
143                "component_errors_total",
144                "error_code" => "indexer_ack_unavailable",
145                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
146                "stage" => error_stage::SENDING,
147            )
148            .increment(1);
149        }
150    }
151
152    pub struct SplunkIndexerAcknowledgementAckAdded;
153
154    impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
155        fn emit(self) {
156            gauge!("splunk_pending_acks").increment(1.0);
157        }
158    }
159
160    pub struct SplunkIndexerAcknowledgementAcksRemoved {
161        pub count: f64,
162    }
163
164    impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
165        fn emit(self) {
166            gauge!("splunk_pending_acks").decrement(self.count);
167        }
168    }
169
170    pub struct SplunkEventTimestampInvalidType<'a> {
171        pub r#type: &'a str,
172    }
173
174    impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
175        fn emit(self) {
176            warn!(
177                message =
178                    "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
179                invalid_type = self.r#type
180            );
181        }
182    }
183
184    pub struct SplunkEventTimestampMissing;
185
186    impl InternalEvent for SplunkEventTimestampMissing {
187        fn emit(self) {
188            warn!("Timestamp was not found. Deferring to Splunk to set the timestamp.");
189        }
190    }
191}
192
193#[cfg(feature = "sources-splunk_hec")]
194mod source {
195    use metrics::counter;
196    use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
197
198    use crate::sources::splunk_hec::ApiError;
199
200    #[derive(Debug)]
201    pub struct SplunkHecRequestBodyInvalidError {
202        pub error: std::io::Error,
203    }
204
205    impl InternalEvent for SplunkHecRequestBodyInvalidError {
206        fn emit(self) {
207            error!(
208                message = "Invalid request body.",
209                error = ?self.error,
210                error_code = "invalid_request_body",
211                error_type = error_type::PARSER_FAILED,
212                stage = error_stage::PROCESSING
213            );
214            counter!(
215                "component_errors_total",
216                "error_code" => "invalid_request_body",
217                "error_type" => error_type::PARSER_FAILED,
218                "stage" => error_stage::PROCESSING,
219            )
220            .increment(1);
221        }
222    }
223
224    #[derive(Debug)]
225    pub struct SplunkHecRequestError {
226        pub(crate) error: ApiError,
227    }
228
229    impl InternalEvent for SplunkHecRequestError {
230        fn emit(self) {
231            error!(
232                message = "Error processing request.",
233                error = ?self.error,
234                error_type = error_type::REQUEST_FAILED,
235                stage = error_stage::RECEIVING
236            );
237            counter!(
238                "component_errors_total",
239                "error_type" => error_type::REQUEST_FAILED,
240                "stage" => error_stage::RECEIVING,
241            )
242            .increment(1);
243        }
244    }
245}