vector/sinks/gcp_chronicle/
chronicle_unstructured.rs

//! This sink sends data to Google Chronicle's unstructured log entries endpoint.
2//! See <https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries>
3//! for more information.
4use bytes::{Bytes, BytesMut};
5
6use futures_util::{future::BoxFuture, task::Poll};
7use goauth::scopes::Scope;
8use http::header::{self, HeaderName, HeaderValue};
9use http::{Request, StatusCode, Uri};
10use hyper::Body;
11use indoc::indoc;
12use serde::Serialize;
13use serde_json::json;
14use snafu::Snafu;
15use std::collections::HashMap;
16use std::io;
17use tokio_util::codec::Encoder as _;
18use tower::{Service, ServiceBuilder};
19use vector_lib::configurable::configurable_component;
20use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
21use vector_lib::{
22    config::{telemetry, AcknowledgementsConfig, Input},
23    event::{Event, EventFinalizers, Finalizable},
24    sink::VectorSink,
25    EstimatedJsonEncodedSizeOf,
26};
27use vrl::value::Kind;
28
29use crate::sinks::util::service::TowerRequestConfigDefaults;
30use crate::{
31    codecs::{self, EncodingConfig},
32    config::{GenerateConfig, SinkConfig, SinkContext},
33    gcp::{GcpAuthConfig, GcpAuthenticator},
34    http::HttpClient,
35    schema,
36    sinks::{
37        gcp_chronicle::{
38            compression::ChronicleCompression,
39            partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
40            sink::ChronicleSink,
41        },
42        gcs_common::{
43            config::{healthcheck_response, GcsRetryLogic},
44            service::GcsResponse,
45        },
46        util::{
47            encoding::{as_tracked_write, Encoder},
48            metadata::RequestMetadataBuilder,
49            request_builder::EncodeResult,
50            BatchConfig, Compression, RequestBuilder, SinkBatchSettings, TowerRequestConfig,
51        },
52        Healthcheck,
53    },
54    template::{Template, TemplateParseError},
55    tls::{TlsConfig, TlsSettings},
56};
57
/// Errors related to the sink's healthcheck and `log_type` template handling.
///
/// The snafu context selectors generated for this enum are public
/// (`visibility(pub)`).
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The `log_type` template could not be parsed; wraps the parser's error.
    #[snafu(display("log_type template parse error: {}", source))]
    LogTypeTemplate { source: TemplateParseError },

    /// Handed to `healthcheck_response` by `build_healthcheck` as the "not found" error.
    #[snafu(display("Endpoint not found"))]
    NotFound,
}
67
/// Google Chronicle regions.
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Region {
    /// European Multi region
    Eu,

    /// US Multi region
    Us,

    /// APAC region (this is the same as the Singapore region endpoint retained for backwards compatibility)
    Asia,

    /// São Paulo Region
    SãoPaulo,

    /// Canada Region
    Canada,

    /// Dammam Region
    Dammam,

    /// Doha Region
    Doha,

    /// Frankfurt Region
    Frankfurt,

    /// London Region
    London,

    /// Mumbai Region
    Mumbai,

    /// Paris Region
    Paris,

    /// Singapore Region
    Singapore,

    /// Sydney Region
    Sydney,

    /// Tel Aviv Region
    TelAviv,

    /// Tokyo Region
    Tokyo,

    /// Turin Region
    Turin,

    /// Zurich Region
    Zurich,
}
124
125impl Region {
126    /// Each region has a its own endpoint.
127    const fn endpoint(self) -> &'static str {
128        match self {
129            Region::Eu => "https://europe-malachiteingestion-pa.googleapis.com",
130            Region::Us => "https://malachiteingestion-pa.googleapis.com",
131            Region::Asia => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
132            Region::SãoPaulo => "https://southamerica-east1-malachiteingestion-pa.googleapis.com",
133            Region::Canada => {
134                "https://northamerica-northeast2-malachiteingestion-pa.googleapis.com"
135            }
136            Region::Dammam => "https://me-central2-malachiteingestion-pa.googleapis.com",
137            Region::Doha => "https://me-central1-malachiteingestion-pa.googleapis.com",
138            Region::Frankfurt => "https://europe-west3-malachiteingestion-pa.googleapis.com",
139            Region::London => "https://europe-west2-malachiteingestion-pa.googleapis.com",
140            Region::Mumbai => "https://asia-south1-malachiteingestion-pa.googleapis.com",
141            Region::Paris => "https://europe-west9-malachiteingestion-pa.googleapis.com",
142            Region::Singapore => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
143            Region::Sydney => "https://australia-southeast1-malachiteingestion-pa.googleapis.com",
144            Region::TelAviv => "https://me-west1-malachiteingestion-pa.googleapis.com",
145            Region::Tokyo => "https://asia-northeast1-malachiteingestion-pa.googleapis.com",
146            Region::Turin => "https://europe-west12-malachiteingestion-pa.googleapis.com",
147            Region::Zurich => "https://europe-west6-malachiteingestion-pa.googleapis.com",
148        }
149    }
150}
151
/// Marker type carrying the default batch settings for this sink.
#[derive(Clone, Copy, Debug, Default)]
pub struct ChronicleUnstructuredDefaultBatchSettings;

// Chronicle Ingestion API has a 1MB limit[1] for unstructured log entries. We're also using a
// conservatively low batch timeout to ensure events make it to Chronicle in a timely fashion, but
// high enough that it allows for reasonable batching.
//
// [1]: https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries
impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings {
    // No default cap on the number of events per batch.
    const MAX_EVENTS: Option<usize> = None;
    // 1,000,000 bytes, per the API limit described above.
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    // The "conservatively low" batch timeout described above.
    const TIMEOUT_SECS: f64 = 15.0;
}
165
/// Marker type carrying this sink's defaults for `TowerRequestConfig`.
#[derive(Clone, Copy, Debug)]
pub struct ChronicleUnstructuredTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
    // Only the rate-limit count is overridden here; the remaining defaults
    // presumably come from the trait itself (verify against its definition).
    const RATE_LIMIT_NUM: u64 = 1_000;
}
172
/// Configuration for the `gcp_chronicle_unstructured` sink.
#[configurable_component(sink(
    "gcp_chronicle_unstructured",
    "Store unstructured log events in Google Chronicle."
))]
#[derive(Clone, Debug)]
pub struct ChronicleUnstructuredConfig {
    /// The endpoint to send data to.
    /// Exactly one of `endpoint` or `region` must be set. Trailing slashes are ignored.
    #[configurable(metadata(
        docs::examples = "127.0.0.1:8080",
        docs::examples = "example.com:12345"
    ))]
    pub endpoint: Option<String>,

    /// The GCP region to use.
    /// Exactly one of `region` or `endpoint` must be set.
    #[configurable(derived)]
    pub region: Option<Region>,

    /// The unique identifier (UUID) corresponding to the Chronicle instance.
    #[configurable(validation(format = "uuid"))]
    #[configurable(metadata(docs::examples = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"))]
    pub customer_id: String,

    /// User-configured environment namespace to identify the data domain the logs originated from.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "production",
        docs::examples = "production-{{ namespace }}",
    ))]
    #[configurable(metadata(docs::advanced))]
    pub namespace: Option<Template>,

    /// A set of labels that are attached to each batch of events.
    #[configurable(metadata(docs::examples = "chronicle_labels_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "A Chronicle label."))]
    pub labels: Option<HashMap<String, String>>,

    // GCP authentication options, flattened into the sink's top-level config keys.
    #[serde(flatten)]
    pub auth: GcpAuthConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<ChronicleUnstructuredDefaultBatchSettings>,

    // Required: there is no serde default, so `encoding` must be present in the config.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    #[serde(default)]
    #[configurable(derived)]
    pub compression: ChronicleCompression,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The type of log entries in a request.
    ///
    /// This must be one of the [supported log types][unstructured_log_types_doc], otherwise
    /// Chronicle rejects the entry with an error.
    ///
    /// [unstructured_log_types_doc]: https://cloud.google.com/chronicle/docs/ingestion/parser-list/supported-default-parsers
    #[configurable(metadata(docs::examples = "WINDOWS_DNS", docs::examples = "{{ log_type }}"))]
    pub log_type: Template,

    /// The default `log_type` to attach to events if the template in `log_type` cannot be resolved.
    #[configurable(metadata(docs::examples = "VECTOR_DEV"))]
    pub fallback_log_type: Option<String>,

    // Accepts either a bare boolean or a full struct (see `bool_or_struct`), and is
    // omitted from serialized output when left at its default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
252
/// Builds the example label map (`source = vector`, `tenant = marketing`)
/// named by the `docs::examples` metadata of the `labels` option.
fn chronicle_labels_examples() -> HashMap<String, String> {
    [("source", "vector"), ("tenant", "marketing")]
        .iter()
        .map(|&(key, value)| (key.to_owned(), value.to_owned()))
        .collect()
}
259
260impl GenerateConfig for ChronicleUnstructuredConfig {
261    fn generate_config() -> toml::Value {
262        toml::from_str(indoc! {r#"
263            credentials_path = "/path/to/credentials.json"
264            customer_id = "customer_id"
265            namespace = "namespace"
266            compression = "gzip"
267            log_type = "log_type"
268            fallback_log_type = "VECTOR_DEV"
269            encoding.codec = "text"
270        "#})
271        .unwrap()
272    }
273}
274
275pub fn build_healthcheck(
276    client: HttpClient,
277    base_url: &str,
278    auth: GcpAuthenticator,
279) -> crate::Result<Healthcheck> {
280    let uri = base_url.parse::<Uri>()?;
281
282    let healthcheck = async move {
283        let mut request = http::Request::get(&uri).body(Body::empty())?;
284        auth.apply(&mut request);
285
286        let response = client.send(request).await?;
287        healthcheck_response(response, GcsHealthcheckError::NotFound.into())
288    };
289
290    Ok(Box::pin(healthcheck))
291}
292
/// Configuration errors raised while resolving the sink's target endpoint.
#[derive(Debug, Snafu)]
pub enum ChronicleError {
    /// Neither `region` nor `endpoint` was configured.
    #[snafu(display("Region or endpoint not defined"))]
    RegionOrEndpoint,
    /// Both `region` and `endpoint` were configured; only one is allowed.
    #[snafu(display("You can only specify one of region or endpoint"))]
    BothRegionAndEndpoint,
}
300
301#[async_trait::async_trait]
302#[typetag::serde(name = "gcp_chronicle_unstructured")]
303impl SinkConfig for ChronicleUnstructuredConfig {
304    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
305        let creds = self.auth.build(Scope::MalachiteIngestion).await?;
306
307        let tls = TlsSettings::from_options(self.tls.as_ref())?;
308        let client = HttpClient::new(tls, cx.proxy())?;
309
310        let endpoint = self.create_endpoint("v2/unstructuredlogentries:batchCreate")?;
311
312        // For the healthcheck we see if we can fetch the list of available log types.
313        let healthcheck_endpoint = self.create_endpoint("v2/logtypes")?;
314
315        let healthcheck = build_healthcheck(client.clone(), &healthcheck_endpoint, creds.clone())?;
316        creds.spawn_regenerate_token();
317        let sink = self.build_sink(client, endpoint, creds)?;
318
319        Ok((sink, healthcheck))
320    }
321
322    fn input(&self) -> Input {
323        let requirement =
324            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
325
326        Input::log().with_schema_requirement(requirement)
327    }
328
329    fn acknowledgements(&self) -> &AcknowledgementsConfig {
330        &self.acknowledgements
331    }
332}
333
334impl ChronicleUnstructuredConfig {
335    fn build_sink(
336        &self,
337        client: HttpClient,
338        base_url: String,
339        creds: GcpAuthenticator,
340    ) -> crate::Result<VectorSink> {
341        use crate::sinks::util::service::ServiceBuilderExt;
342
343        let request = self.request.into_settings();
344
345        let batch_settings = self.batch.into_batcher_settings()?;
346
347        let partitioner = self.partitioner()?;
348
349        let svc = ServiceBuilder::new()
350            .settings(request, GcsRetryLogic::default())
351            .service(ChronicleService::new(client, base_url, creds));
352
353        let request_settings = ChronicleRequestBuilder::new(self)?;
354
355        let sink = ChronicleSink::new(svc, request_settings, partitioner, batch_settings, "http");
356
357        Ok(VectorSink::from_event_streamsink(sink))
358    }
359
360    fn partitioner(&self) -> crate::Result<ChroniclePartitioner> {
361        Ok(ChroniclePartitioner::new(
362            self.log_type.clone(),
363            self.fallback_log_type.clone(),
364            self.namespace.clone(),
365        ))
366    }
367
368    fn create_endpoint(&self, path: &str) -> Result<String, ChronicleError> {
369        Ok(format!(
370            "{}/{}",
371            match (&self.endpoint, self.region) {
372                (Some(endpoint), None) => endpoint.trim_end_matches('/'),
373                (None, Some(region)) => region.endpoint(),
374                (Some(_), Some(_)) => return Err(ChronicleError::BothRegionAndEndpoint),
375                (None, None) => return Err(ChronicleError::RegionOrEndpoint),
376            },
377            path
378        ))
379    }
380}
381
/// A fully encoded request, ready to be sent by `ChronicleService`.
#[derive(Clone, Debug)]
pub struct ChronicleRequest {
    /// The encoded (and, when compression is enabled, compressed) payload.
    pub body: Bytes,
    /// Finalizers of the events contained in this request.
    pub finalizers: EventFinalizers,
    /// HTTP headers to set on the outgoing request.
    pub headers: HashMap<HeaderName, HeaderValue>,
    // Request metadata, exposed through the `MetaDescriptive` impl.
    metadata: RequestMetadata,
}
389
390impl Finalizable for ChronicleRequest {
391    fn take_finalizers(&mut self) -> EventFinalizers {
392        std::mem::take(&mut self.finalizers)
393    }
394}
395
396impl MetaDescriptive for ChronicleRequest {
397    fn get_metadata(&self) -> &RequestMetadata {
398        &self.metadata
399    }
400
401    fn metadata_mut(&mut self) -> &mut RequestMetadata {
402        &mut self.metadata
403    }
404}
405
/// JSON body of one request, as assembled by `ChronicleEncoder::encode_input`.
#[derive(Clone, Debug, Serialize)]
struct ChronicleRequestBody {
    /// Identifier of the Chronicle instance, copied from the sink configuration.
    customer_id: String,
    /// Namespace from the partition key; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    namespace: Option<String>,
    /// Labels attached to the batch; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    labels: Option<Vec<Label>>,
    /// Log type from the partition key.
    log_type: String,
    /// One JSON object per encoded event (`log_text` plus, when known, `ts_rfc3339`).
    entries: Vec<serde_json::Value>,
}
416
/// Encodes a partitioned batch of events into a Chronicle request body.
#[derive(Clone, Debug)]
struct ChronicleEncoder {
    /// Identifier of the Chronicle instance, written into every request body.
    customer_id: String,
    /// Labels written into every request body, if any were configured.
    labels: Option<Vec<Label>>,
    /// Codec that renders each event into the bytes used as its `log_text`.
    encoder: codecs::Encoder<()>,
    /// Transformer applied to each event before it is encoded.
    transformer: codecs::Transformer,
}
424
425impl Encoder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleEncoder {
426    fn encode_input(
427        &self,
428        input: (ChroniclePartitionKey, Vec<Event>),
429        writer: &mut dyn io::Write,
430    ) -> io::Result<(usize, GroupedCountByteSize)> {
431        let (key, events) = input;
432        let mut encoder = self.encoder.clone();
433        let mut byte_size = telemetry().create_request_count_byte_size();
434        let events = events
435            .into_iter()
436            .filter_map(|mut event| {
437                let timestamp = event
438                    .as_log()
439                    .get_timestamp()
440                    .and_then(|ts| ts.as_timestamp())
441                    .cloned();
442                let mut bytes = BytesMut::new();
443                self.transformer.transform(&mut event);
444
445                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
446
447                encoder.encode(event, &mut bytes).ok()?;
448
449                let mut value = json!({
450                    "log_text": String::from_utf8_lossy(&bytes),
451                });
452
453                if let Some(ts) = timestamp {
454                    value.as_object_mut().unwrap().insert(
455                        "ts_rfc3339".to_string(),
456                        ts.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
457                            .into(),
458                    );
459                }
460
461                Some(value)
462            })
463            .collect::<Vec<_>>();
464
465        let json = json!(ChronicleRequestBody {
466            customer_id: self.customer_id.clone(),
467            namespace: key.namespace,
468            labels: self.labels.clone(),
469            log_type: key.log_type,
470            entries: events,
471        });
472
473        let size = as_tracked_write::<_, _, io::Error>(writer, &json, |writer, json| {
474            serde_json::to_writer(writer, json)?;
475            Ok(())
476        })?;
477
478        Ok((size, byte_size))
479    }
480}
481
// Settings required to produce a request that do not change per
// request. All possible values are pre-computed for direct use in
// producing a request.
#[derive(Clone, Debug)]
struct ChronicleRequestBuilder {
    // Turns a partitioned batch of events into the JSON request body.
    encoder: ChronicleEncoder,
    // Compression derived from the sink's `compression` setting.
    compression: Compression,
}
490
491struct ChronicleRequestPayload {
492    bytes: Bytes,
493}
494
495impl From<Bytes> for ChronicleRequestPayload {
496    fn from(bytes: Bytes) -> Self {
497        Self { bytes }
498    }
499}
500
501impl AsRef<[u8]> for ChronicleRequestPayload {
502    fn as_ref(&self) -> &[u8] {
503        self.bytes.as_ref()
504    }
505}
506
507impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBuilder {
508    type Metadata = EventFinalizers;
509    type Events = (ChroniclePartitionKey, Vec<Event>);
510    type Encoder = ChronicleEncoder;
511    type Payload = ChronicleRequestPayload;
512    type Request = ChronicleRequest;
513    type Error = io::Error;
514
515    fn compression(&self) -> Compression {
516        self.compression
517    }
518
519    fn encoder(&self) -> &Self::Encoder {
520        &self.encoder
521    }
522
523    fn split_input(
524        &self,
525        input: (ChroniclePartitionKey, Vec<Event>),
526    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
527        let (partition_key, mut events) = input;
528        let finalizers = events.take_finalizers();
529
530        let builder = RequestMetadataBuilder::from_events(&events);
531        (finalizers, builder, (partition_key, events))
532    }
533
534    fn build_request(
535        &self,
536        finalizers: Self::Metadata,
537        metadata: RequestMetadata,
538        payload: EncodeResult<Self::Payload>,
539    ) -> Self::Request {
540        let mut headers = HashMap::new();
541        headers.insert(
542            header::CONTENT_TYPE,
543            HeaderValue::from_static("application/json"),
544        );
545
546        match payload.compressed_byte_size {
547            Some(compressed_byte_size) => {
548                headers.insert(
549                    header::CONTENT_LENGTH,
550                    HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
551                );
552                headers.insert(
553                    header::CONTENT_ENCODING,
554                    HeaderValue::from_str(self.compression.content_encoding().unwrap()).unwrap(),
555                );
556            }
557            None => {
558                headers.insert(
559                    header::CONTENT_LENGTH,
560                    HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
561                );
562            }
563        }
564
565        ChronicleRequest {
566            headers,
567            body: payload.into_payload().bytes,
568            finalizers,
569            metadata,
570        }
571    }
572}
573
/// A single key/value label, serialized as an element of the request body's `labels` array.
#[derive(Clone, Debug, Serialize)]
struct Label {
    /// Label name.
    key: String,
    /// Label value.
    value: String,
}
579
580impl ChronicleRequestBuilder {
581    fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
582        let transformer = config.encoding.transformer();
583        let serializer = config.encoding.config().build()?;
584        let compression = Compression::from(config.compression);
585        let encoder = crate::codecs::Encoder::<()>::new(serializer);
586        let encoder = ChronicleEncoder {
587            customer_id: config.customer_id.clone(),
588            labels: config.labels.as_ref().map(|labs| {
589                labs.iter()
590                    .map(|(k, v)| Label {
591                        key: k.to_string(),
592                        value: v.to_string(),
593                    })
594                    .collect::<Vec<_>>()
595            }),
596            encoder,
597            transformer,
598        };
599        Ok(Self {
600            encoder,
601            compression,
602        })
603    }
604}
605
606#[derive(Debug, Clone)]
607pub struct ChronicleService {
608    client: HttpClient,
609    base_url: String,
610    creds: GcpAuthenticator,
611}
612
613impl ChronicleService {
614    pub const fn new(client: HttpClient, base_url: String, creds: GcpAuthenticator) -> Self {
615        Self {
616            client,
617            base_url,
618            creds,
619        }
620    }
621}
622
/// Errors produced by `ChronicleService` when sending a request.
#[derive(Debug, Snafu)]
pub enum ChronicleResponseError {
    /// A response was received, but its status code was not a success.
    #[snafu(display("Server responded with an error: {}", code))]
    ServerError { code: StatusCode },
    /// The HTTP client failed before a response was obtained.
    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
    HttpError { error: crate::http::HttpError },
}
630
631impl Service<ChronicleRequest> for ChronicleService {
632    type Response = GcsResponse;
633    type Error = ChronicleResponseError;
634    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
635
636    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
637        Poll::Ready(Ok(()))
638    }
639
640    fn call(&mut self, request: ChronicleRequest) -> Self::Future {
641        let mut builder = Request::post(&self.base_url);
642        let metadata = request.get_metadata().clone();
643
644        let headers = builder.headers_mut().unwrap();
645        for (name, value) in request.headers {
646            headers.insert(name, value);
647        }
648
649        let mut http_request = builder.body(Body::from(request.body)).unwrap();
650        self.creds.apply(&mut http_request);
651
652        let mut client = self.client.clone();
653        Box::pin(async move {
654            match client.call(http_request).await {
655                Ok(response) => {
656                    let status = response.status();
657                    if status.is_success() {
658                        Ok(GcsResponse {
659                            inner: response,
660                            metadata,
661                        })
662                    } else {
663                        Err(ChronicleResponseError::ServerError { code: status })
664                    }
665                }
666                Err(error) => Err(ChronicleResponseError::HttpError { error }),
667            }
668        })
669    }
670}
671
#[cfg(all(test, feature = "chronicle-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize};
    use vector_lib::event::{BatchNotifier, BatchStatus};

    use super::*;
    use crate::test_util::{
        components::{
            run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
            SINK_TAGS,
        },
        random_events_with_stream, random_string, trace_init,
    };

    /// Environment variable holding the address of the Chronicle server under test.
    const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS";

    /// Credentials file used by the tests that expect the sink to build.
    const VALID_AUTH_PATH: &str = "/home/vector/scripts/integration/gcp/auth.json";

    /// Credentials file that doesn't match the public key sent to the dummy Chronicle server.
    const INVALID_AUTH_PATH: &str = "/home/vector/scripts/integration/gcp/invalidauth.json";

    /// Builds a sink configuration pointing at the server named by `CHRONICLE_ADDRESS`.
    fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let raw = format!(
            indoc! { r#"
             endpoint = "{}"
             customer_id = "customer id"
             namespace = "namespace"
             credentials_path = "{}"
             log_type = "{}"
             encoding.codec = "text"
        "# },
            address, auth_path, log_type
        );

        toml::from_str(&raw).unwrap()
    }

    /// Builds the sink and healthcheck for the given log type and credentials file.
    async fn config_build(
        log_type: &str,
        auth_path: &str,
    ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
        let sink_config = config(log_type, auth_path);
        sink_config.build(SinkContext::default()).await
    }

    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let log_type = random_string(10);
        let (sink, healthcheck) = config_build(&log_type, VALID_AUTH_PATH)
            .await
            .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let messages: Vec<_> = pull_messages(&log_type)
            .await
            .into_iter()
            .map(|log| log.log_text)
            .collect();
        assert_eq!(input.len(), messages.len());

        // Lengths were checked above, so zipping visits every pair in order.
        for (event, message) in input.iter().zip(&messages) {
            let data = serde_json::to_value(message).unwrap();
            let expected = serde_json::to_value(event.as_log().get("message").unwrap()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn invalid_credentials() {
        trace_init();

        let log_type = random_string(10);
        // Test with an auth file that doesn't match the public key sent to the dummy chronicle server.
        let sink = config_build(&log_type, INVALID_AUTH_PATH).await;

        assert!(sink.is_err())
    }

    #[tokio::test]
    async fn publish_invalid_events() {
        trace_init();

        // The chronicle-emulator we are testing against is set up so a `log_type` of "INVALID"
        // will return a `400 BAD_REQUEST`.
        let log_type = "INVALID";
        let (sink, healthcheck) = config_build(log_type, VALID_AUTH_PATH)
            .await
            .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    /// One log record as deserialized from the test server's `logs` response.
    #[derive(Clone, Debug, Deserialize, Serialize)]
    pub struct Log {
        customer_id: String,
        namespace: String,
        log_type: String,
        log_text: String,
        ts_rfc3339: String,
    }

    /// Sends a request to `path` on the test server, passing `log_type` as a query parameter.
    async fn request(method: Method, path: &str, log_type: &str) -> Response {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let url = format!("{address}/{path}");
        let outcome = Client::new()
            .request(method.clone(), &url)
            .query(&[("log_type", log_type)])
            .send()
            .await;

        outcome.unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
    }

    /// Fetches the logs the test server recorded for `log_type`.
    async fn pull_messages(log_type: &str) -> Vec<Log> {
        let response = request(Method::GET, "logs", log_type).await;
        response
            .json::<Vec<Log>>()
            .await
            .expect("Extracting pull data failed")
    }
}