vector/internal_events/
aws_kinesis_firehose.rs1use metrics::counter;
2use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
3
4use super::prelude::{http_error_code, io_error_code};
5use crate::sources::aws_kinesis_firehose::Compression;
6
7#[derive(Debug)]
8pub struct AwsKinesisFirehoseRequestReceived<'a> {
9 pub request_id: Option<&'a str>,
10 pub source_arn: Option<&'a str>,
11}
12
13impl InternalEvent for AwsKinesisFirehoseRequestReceived<'_> {
14 fn emit(self) {
15 debug!(
16 message = "Handling AWS Kinesis Firehose request.",
17 request_id = %self.request_id.unwrap_or_default(),
18 source_arn = %self.source_arn.unwrap_or_default(),
19 internal_log_rate_limit = true
20 );
21 }
22}
23
24#[derive(Debug)]
25pub struct AwsKinesisFirehoseRequestError<'a> {
26 request_id: Option<&'a str>,
27 error_code: String,
28 error: &'a str,
29}
30
31impl<'a> AwsKinesisFirehoseRequestError<'a> {
32 pub fn new(code: hyper::StatusCode, error: &'a str, request_id: Option<&'a str>) -> Self {
33 Self {
34 error_code: http_error_code(code.as_u16()),
35 error,
36 request_id,
37 }
38 }
39}
40
41impl InternalEvent for AwsKinesisFirehoseRequestError<'_> {
42 fn emit(self) {
43 error!(
44 message = "Error occurred while handling request.",
45 error = ?self.error,
46 stage = error_stage::RECEIVING,
47 error_type = error_type::REQUEST_FAILED,
48 error_code = %self.error_code,
49 request_id = %self.request_id.unwrap_or(""),
50 internal_log_rate_limit = true,
51 );
52 counter!(
53 "component_errors_total",
54 "stage" => error_stage::RECEIVING,
55 "error_type" => error_type::REQUEST_FAILED,
56 "error_code" => self.error_code,
57 )
58 .increment(1);
59 }
60}
61
62#[derive(Debug)]
63pub struct AwsKinesisFirehoseAutomaticRecordDecodeError {
64 pub compression: Compression,
65 pub error: std::io::Error,
66}
67
68impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError {
69 fn emit(self) {
70 error!(
71 message = "Detected record failed to decode so passing along data as-is.",
72 error = ?self.error,
73 stage = error_stage::PROCESSING,
74 error_type = error_type::PARSER_FAILED,
75 error_code = %io_error_code(&self.error),
76 compression = %self.compression,
77 internal_log_rate_limit = true,
78 );
79 counter!(
80 "component_errors_total",
81 "stage" => error_stage::PROCESSING,
82 "error_type" => error_type::PARSER_FAILED,
83 "error_code" => io_error_code(&self.error),
84 )
85 .increment(1);
86 }
87}