vector/internal_events/
aws_kinesis_firehose.rs

1use metrics::counter;
2use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
3
4use super::prelude::{http_error_code, io_error_code};
5use crate::sources::aws_kinesis_firehose::Compression;
6
7#[derive(Debug)]
8pub struct AwsKinesisFirehoseRequestReceived<'a> {
9    pub request_id: Option<&'a str>,
10    pub source_arn: Option<&'a str>,
11}
12
13impl InternalEvent for AwsKinesisFirehoseRequestReceived<'_> {
14    fn emit(self) {
15        debug!(
16            message = "Handling AWS Kinesis Firehose request.",
17            request_id = %self.request_id.unwrap_or_default(),
18            source_arn = %self.source_arn.unwrap_or_default(),
19            internal_log_rate_limit = true
20        );
21    }
22}
23
24#[derive(Debug)]
25pub struct AwsKinesisFirehoseRequestError<'a> {
26    request_id: Option<&'a str>,
27    error_code: String,
28    error: &'a str,
29}
30
31impl<'a> AwsKinesisFirehoseRequestError<'a> {
32    pub fn new(code: hyper::StatusCode, error: &'a str, request_id: Option<&'a str>) -> Self {
33        Self {
34            error_code: http_error_code(code.as_u16()),
35            error,
36            request_id,
37        }
38    }
39}
40
41impl InternalEvent for AwsKinesisFirehoseRequestError<'_> {
42    fn emit(self) {
43        error!(
44            message = "Error occurred while handling request.",
45            error = ?self.error,
46            stage = error_stage::RECEIVING,
47            error_type = error_type::REQUEST_FAILED,
48            error_code = %self.error_code,
49            request_id = %self.request_id.unwrap_or(""),
50            internal_log_rate_limit = true,
51        );
52        counter!(
53            "component_errors_total",
54            "stage" => error_stage::RECEIVING,
55            "error_type" => error_type::REQUEST_FAILED,
56            "error_code" => self.error_code,
57        )
58        .increment(1);
59    }
60}
61
62#[derive(Debug)]
63pub struct AwsKinesisFirehoseAutomaticRecordDecodeError {
64    pub compression: Compression,
65    pub error: std::io::Error,
66}
67
68impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError {
69    fn emit(self) {
70        error!(
71            message = "Detected record failed to decode so passing along data as-is.",
72            error = ?self.error,
73            stage = error_stage::PROCESSING,
74            error_type = error_type::PARSER_FAILED,
75            error_code = %io_error_code(&self.error),
76            compression = %self.compression,
77            internal_log_rate_limit = true,
78        );
79        counter!(
80            "component_errors_total",
81            "stage" => error_stage::PROCESSING,
82            "error_type" => error_type::PARSER_FAILED,
83            "error_code" => io_error_code(&self.error),
84        )
85        .increment(1);
86    }
87}