1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::sync::Arc;

use bytes::Bytes;
use vector_lib::event::{EventFinalizers, Finalizable};
use vector_lib::request_metadata::RequestMetadata;

use super::{
    encoder::HecLogsEncoder,
    sink::{HecProcessedEvent, Partitioned},
};
use crate::sinks::{
    splunk_hec::common::request::HecRequest,
    util::{
        metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
        RequestBuilder,
    },
};

pub struct HecLogsRequestBuilder {
    pub encoder: HecLogsEncoder,
    pub compression: Compression,
}

#[derive(Debug, Clone)]
pub struct HecRequestMetadata {
    finalizers: EventFinalizers,
    partition: Option<Arc<str>>,
    source: Option<String>,
    sourcetype: Option<String>,
    index: Option<String>,
    host: Option<String>,
}

impl RequestBuilder<(Option<Partitioned>, Vec<HecProcessedEvent>)> for HecLogsRequestBuilder {
    type Metadata = HecRequestMetadata;
    type Events = Vec<HecProcessedEvent>;
    type Encoder = HecLogsEncoder;
    type Payload = Bytes;
    type Request = HecRequest;
    type Error = std::io::Error;

    fn compression(&self) -> Compression {
        self.compression
    }

    fn encoder(&self) -> &Self::Encoder {
        &self.encoder
    }

    fn split_input(
        &self,
        input: (Option<Partitioned>, Vec<HecProcessedEvent>),
    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
        let (mut partition, mut events) = input;

        let finalizers = events.take_finalizers();

        let builder = RequestMetadataBuilder::from_events(&events);

        (
            HecRequestMetadata {
                finalizers,
                partition: partition.as_ref().and_then(|p| p.token.clone()),
                source: partition.as_mut().and_then(|p| p.source.take()),
                sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()),
                index: partition.as_mut().and_then(|p| p.index.take()),
                host: partition.as_mut().and_then(|p| p.host.take()),
            },
            builder,
            events,
        )
    }

    fn build_request(
        &self,
        hec_metadata: Self::Metadata,
        metadata: RequestMetadata,
        payload: EncodeResult<Self::Payload>,
    ) -> Self::Request {
        HecRequest {
            body: payload.into_payload(),
            finalizers: hec_metadata.finalizers,
            passthrough_token: hec_metadata.partition,
            source: hec_metadata.source,
            sourcetype: hec_metadata.sourcetype,
            index: hec_metadata.index,
            host: hec_metadata.host,
            metadata,
        }
    }
}