vector/sinks/gcp_chronicle/
chronicle_unstructured.rs

//! This sink sends data to Google Chronicle's unstructured log entries endpoint.
2//! See <https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries>
3//! for more information.
4use std::{collections::HashMap, io};
5
6use bytes::{Bytes, BytesMut};
7use futures_util::{future::BoxFuture, task::Poll};
8use goauth::scopes::Scope;
9use http::{
10    Request, StatusCode, Uri,
11    header::{self, HeaderName, HeaderValue},
12};
13use hyper::Body;
14use indoc::indoc;
15use serde::Serialize;
16use serde_json::json;
17use snafu::Snafu;
18use tokio_util::codec::Encoder as _;
19use tower::{Service, ServiceBuilder};
20use vector_lib::{
21    EstimatedJsonEncodedSizeOf,
22    config::{AcknowledgementsConfig, Input, telemetry},
23    configurable::configurable_component,
24    event::{Event, EventFinalizers, Finalizable},
25    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
26    sink::VectorSink,
27};
28use vrl::value::Kind;
29
30use crate::{
31    codecs::{self, EncodingConfig},
32    config::{GenerateConfig, SinkConfig, SinkContext},
33    gcp::{GcpAuthConfig, GcpAuthenticator},
34    http::HttpClient,
35    schema,
36    sinks::{
37        Healthcheck,
38        gcp_chronicle::{
39            compression::ChronicleCompression,
40            partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
41            sink::ChronicleSink,
42        },
43        gcs_common::{
44            config::{GcsRetryLogic, healthcheck_response},
45            service::GcsResponse,
46        },
47        util::{
48            BatchConfig, Compression, RequestBuilder, SinkBatchSettings, TowerRequestConfig,
49            encoding::{Encoder, as_tracked_write},
50            metadata::RequestMetadataBuilder,
51            request_builder::EncodeResult,
52            service::TowerRequestConfigDefaults,
53        },
54    },
55    template::{Template, TemplateParseError},
56    tls::{TlsConfig, TlsSettings},
57};
58
59#[derive(Debug, Snafu)]
60#[snafu(visibility(pub))]
61pub enum GcsHealthcheckError {
62    #[snafu(display("log_type template parse error: {}", source))]
63    LogTypeTemplate { source: TemplateParseError },
64
65    #[snafu(display("Endpoint not found"))]
66    NotFound,
67}
68
69/// Google Chronicle regions.
70#[configurable_component]
71#[derive(Clone, Copy, Debug, Eq, PartialEq)]
72#[serde(rename_all = "snake_case")]
73pub enum Region {
74    /// European Multi region
75    Eu,
76
77    /// US Multi region
78    Us,
79
80    /// APAC region (this is the same as the Singapore region endpoint retained for backwards compatibility)
81    Asia,
82
83    /// SãoPaulo Region
84    SãoPaulo,
85
86    /// Canada Region
87    Canada,
88
89    /// Dammam Region
90    Dammam,
91
92    /// Doha Region
93    Doha,
94
95    /// Frankfurt Region
96    Frankfurt,
97
98    /// London Region
99    London,
100
101    /// Mumbai Region
102    Mumbai,
103
104    /// Paris Region
105    Paris,
106
107    /// Singapore Region
108    Singapore,
109
110    /// Sydney Region
111    Sydney,
112
113    /// TelAviv Region
114    TelAviv,
115
116    /// Tokyo Region
117    Tokyo,
118
119    /// Turin Region
120    Turin,
121
122    /// Zurich Region
123    Zurich,
124}
125
126impl Region {
127    /// Each region has a its own endpoint.
128    const fn endpoint(self) -> &'static str {
129        match self {
130            Region::Eu => "https://europe-malachiteingestion-pa.googleapis.com",
131            Region::Us => "https://malachiteingestion-pa.googleapis.com",
132            Region::Asia => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
133            Region::SãoPaulo => "https://southamerica-east1-malachiteingestion-pa.googleapis.com",
134            Region::Canada => {
135                "https://northamerica-northeast2-malachiteingestion-pa.googleapis.com"
136            }
137            Region::Dammam => "https://me-central2-malachiteingestion-pa.googleapis.com",
138            Region::Doha => "https://me-central1-malachiteingestion-pa.googleapis.com",
139            Region::Frankfurt => "https://europe-west3-malachiteingestion-pa.googleapis.com",
140            Region::London => "https://europe-west2-malachiteingestion-pa.googleapis.com",
141            Region::Mumbai => "https://asia-south1-malachiteingestion-pa.googleapis.com",
142            Region::Paris => "https://europe-west9-malachiteingestion-pa.googleapis.com",
143            Region::Singapore => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
144            Region::Sydney => "https://australia-southeast1-malachiteingestion-pa.googleapis.com",
145            Region::TelAviv => "https://me-west1-malachiteingestion-pa.googleapis.com",
146            Region::Tokyo => "https://asia-northeast1-malachiteingestion-pa.googleapis.com",
147            Region::Turin => "https://europe-west12-malachiteingestion-pa.googleapis.com",
148            Region::Zurich => "https://europe-west6-malachiteingestion-pa.googleapis.com",
149        }
150    }
151}
152
/// Marker type that carries the default batch settings for this sink.
///
/// It is used as the type parameter of `BatchConfig` in `ChronicleUnstructuredConfig`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ChronicleUnstructuredDefaultBatchSettings;
155
156// Chronicle Ingestion API has a 1MB limit[1] for unstructured log entries. We're also using a
157// conservatively low batch timeout to ensure events make it to Chronicle in a timely fashion, but
158// high enough that it allows for reasonable batching.
159//
160// [1]: https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries
161impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings {
162    const MAX_EVENTS: Option<usize> = None;
163    const MAX_BYTES: Option<usize> = Some(1_000_000);
164    const TIMEOUT_SECS: f64 = 15.0;
165}
166
167#[derive(Clone, Copy, Debug)]
168pub struct ChronicleUnstructuredTowerRequestConfigDefaults;
169
170impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
171    const RATE_LIMIT_NUM: u64 = 1_000;
172}
173
174/// Configuration for the `gcp_chronicle_unstructured` sink.
175#[configurable_component(sink(
176    "gcp_chronicle_unstructured",
177    "Store unstructured log events in Google Chronicle."
178))]
179#[derive(Clone, Debug)]
180pub struct ChronicleUnstructuredConfig {
181    /// The endpoint to send data to.
182    #[configurable(metadata(
183        docs::examples = "127.0.0.1:8080",
184        docs::examples = "example.com:12345"
185    ))]
186    pub endpoint: Option<String>,
187
188    /// The GCP region to use.
189    #[configurable(derived)]
190    pub region: Option<Region>,
191
192    /// The Unique identifier (UUID) corresponding to the Chronicle instance.
193    #[configurable(validation(format = "uuid"))]
194    #[configurable(metadata(docs::examples = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"))]
195    pub customer_id: String,
196
197    /// User-configured environment namespace to identify the data domain the logs originated from.
198    #[configurable(metadata(docs::templateable))]
199    #[configurable(metadata(
200        docs::examples = "production",
201        docs::examples = "production-{{ namespace }}",
202    ))]
203    #[configurable(metadata(docs::advanced))]
204    pub namespace: Option<Template>,
205
206    /// A set of labels that are attached to each batch of events.
207    #[configurable(metadata(docs::examples = "chronicle_labels_examples()"))]
208    #[configurable(metadata(docs::additional_props_description = "A Chronicle label."))]
209    pub labels: Option<HashMap<String, String>>,
210
211    #[serde(flatten)]
212    pub auth: GcpAuthConfig,
213
214    #[configurable(derived)]
215    #[serde(default)]
216    pub batch: BatchConfig<ChronicleUnstructuredDefaultBatchSettings>,
217
218    #[configurable(derived)]
219    pub encoding: EncodingConfig,
220
221    #[serde(default)]
222    #[configurable(derived)]
223    pub compression: ChronicleCompression,
224
225    #[configurable(derived)]
226    #[serde(default)]
227    pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,
228
229    #[configurable(derived)]
230    pub tls: Option<TlsConfig>,
231
232    /// The type of log entries in a request.
233    ///
234    /// This must be one of the [supported log types][unstructured_log_types_doc], otherwise
235    /// Chronicle rejects the entry with an error.
236    ///
237    /// [unstructured_log_types_doc]: https://cloud.google.com/chronicle/docs/ingestion/parser-list/supported-default-parsers
238    #[configurable(metadata(docs::examples = "WINDOWS_DNS", docs::examples = "{{ log_type }}"))]
239    pub log_type: Template,
240
241    /// The default `log_type` to attach to events if the template in `log_type` cannot be resolved.
242    #[configurable(metadata(docs::examples = "VECTOR_DEV"))]
243    pub fallback_log_type: Option<String>,
244
245    #[configurable(derived)]
246    #[serde(
247        default,
248        deserialize_with = "crate::serde::bool_or_struct",
249        skip_serializing_if = "crate::serde::is_default"
250    )]
251    acknowledgements: AcknowledgementsConfig,
252}
253
/// Builds the example label set referenced by the documentation metadata of the `labels`
/// option: `source = "vector"` and `tenant = "marketing"`.
fn chronicle_labels_examples() -> HashMap<String, String> {
    HashMap::from([
        (String::from("source"), String::from("vector")),
        (String::from("tenant"), String::from("marketing")),
    ])
}
260
261impl GenerateConfig for ChronicleUnstructuredConfig {
262    fn generate_config() -> toml::Value {
263        toml::from_str(indoc! {r#"
264            credentials_path = "/path/to/credentials.json"
265            customer_id = "customer_id"
266            namespace = "namespace"
267            compression = "gzip"
268            log_type = "log_type"
269            fallback_log_type = "VECTOR_DEV"
270            encoding.codec = "text"
271        "#})
272        .unwrap()
273    }
274}
275
276pub fn build_healthcheck(
277    client: HttpClient,
278    base_url: &str,
279    auth: GcpAuthenticator,
280) -> crate::Result<Healthcheck> {
281    let uri = base_url.parse::<Uri>()?;
282
283    let healthcheck = async move {
284        let mut request = http::Request::get(&uri).body(Body::empty())?;
285        auth.apply(&mut request);
286
287        let response = client.send(request).await?;
288        healthcheck_response(response, GcsHealthcheckError::NotFound.into())
289    };
290
291    Ok(Box::pin(healthcheck))
292}
293
294#[derive(Debug, Snafu)]
295pub enum ChronicleError {
296    #[snafu(display("Region or endpoint not defined"))]
297    RegionOrEndpoint,
298    #[snafu(display("You can only specify one of region or endpoint"))]
299    BothRegionAndEndpoint,
300}
301
302#[async_trait::async_trait]
303#[typetag::serde(name = "gcp_chronicle_unstructured")]
304impl SinkConfig for ChronicleUnstructuredConfig {
305    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
306        let creds = self.auth.build(Scope::MalachiteIngestion).await?;
307
308        let tls = TlsSettings::from_options(self.tls.as_ref())?;
309        let client = HttpClient::new(tls, cx.proxy())?;
310
311        let endpoint = self.create_endpoint("v2/unstructuredlogentries:batchCreate")?;
312
313        // For the healthcheck we see if we can fetch the list of available log types.
314        let healthcheck_endpoint = self.create_endpoint("v2/logtypes")?;
315
316        let healthcheck = build_healthcheck(client.clone(), &healthcheck_endpoint, creds.clone())?;
317        creds.spawn_regenerate_token();
318        let sink = self.build_sink(client, endpoint, creds)?;
319
320        Ok((sink, healthcheck))
321    }
322
323    fn input(&self) -> Input {
324        let requirement =
325            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
326
327        Input::log().with_schema_requirement(requirement)
328    }
329
330    fn acknowledgements(&self) -> &AcknowledgementsConfig {
331        &self.acknowledgements
332    }
333}
334
335impl ChronicleUnstructuredConfig {
336    fn build_sink(
337        &self,
338        client: HttpClient,
339        base_url: String,
340        creds: GcpAuthenticator,
341    ) -> crate::Result<VectorSink> {
342        use crate::sinks::util::service::ServiceBuilderExt;
343
344        let request = self.request.into_settings();
345
346        let batch_settings = self.batch.into_batcher_settings()?;
347
348        let partitioner = self.partitioner()?;
349
350        let svc = ServiceBuilder::new()
351            .settings(request, GcsRetryLogic::default())
352            .service(ChronicleService::new(client, base_url, creds));
353
354        let request_settings = ChronicleRequestBuilder::new(self)?;
355
356        let sink = ChronicleSink::new(svc, request_settings, partitioner, batch_settings, "http");
357
358        Ok(VectorSink::from_event_streamsink(sink))
359    }
360
361    fn partitioner(&self) -> crate::Result<ChroniclePartitioner> {
362        Ok(ChroniclePartitioner::new(
363            self.log_type.clone(),
364            self.fallback_log_type.clone(),
365            self.namespace.clone(),
366        ))
367    }
368
369    fn create_endpoint(&self, path: &str) -> Result<String, ChronicleError> {
370        Ok(format!(
371            "{}/{}",
372            match (&self.endpoint, self.region) {
373                (Some(endpoint), None) => endpoint.trim_end_matches('/'),
374                (None, Some(region)) => region.endpoint(),
375                (Some(_), Some(_)) => return Err(ChronicleError::BothRegionAndEndpoint),
376                (None, None) => return Err(ChronicleError::RegionOrEndpoint),
377            },
378            path
379        ))
380    }
381}
382
383#[derive(Clone, Debug)]
384pub struct ChronicleRequest {
385    pub body: Bytes,
386    pub finalizers: EventFinalizers,
387    pub headers: HashMap<HeaderName, HeaderValue>,
388    metadata: RequestMetadata,
389}
390
391impl Finalizable for ChronicleRequest {
392    fn take_finalizers(&mut self) -> EventFinalizers {
393        std::mem::take(&mut self.finalizers)
394    }
395}
396
397impl MetaDescriptive for ChronicleRequest {
398    fn get_metadata(&self) -> &RequestMetadata {
399        &self.metadata
400    }
401
402    fn metadata_mut(&mut self) -> &mut RequestMetadata {
403        &mut self.metadata
404    }
405}
406
407#[derive(Clone, Debug, Serialize)]
408struct ChronicleRequestBody {
409    customer_id: String,
410    #[serde(skip_serializing_if = "Option::is_none")]
411    namespace: Option<String>,
412    #[serde(skip_serializing_if = "Option::is_none")]
413    labels: Option<Vec<Label>>,
414    log_type: String,
415    entries: Vec<serde_json::Value>,
416}
417
418#[derive(Clone, Debug)]
419struct ChronicleEncoder {
420    customer_id: String,
421    labels: Option<Vec<Label>>,
422    encoder: codecs::Encoder<()>,
423    transformer: codecs::Transformer,
424}
425
426impl Encoder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleEncoder {
427    fn encode_input(
428        &self,
429        input: (ChroniclePartitionKey, Vec<Event>),
430        writer: &mut dyn io::Write,
431    ) -> io::Result<(usize, GroupedCountByteSize)> {
432        let (key, events) = input;
433        let mut encoder = self.encoder.clone();
434        let mut byte_size = telemetry().create_request_count_byte_size();
435        let events = events
436            .into_iter()
437            .filter_map(|mut event| {
438                let timestamp = event
439                    .as_log()
440                    .get_timestamp()
441                    .and_then(|ts| ts.as_timestamp())
442                    .cloned();
443                let mut bytes = BytesMut::new();
444                self.transformer.transform(&mut event);
445
446                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
447
448                encoder.encode(event, &mut bytes).ok()?;
449
450                let mut value = json!({
451                    "log_text": String::from_utf8_lossy(&bytes),
452                });
453
454                if let Some(ts) = timestamp {
455                    value.as_object_mut().unwrap().insert(
456                        "ts_rfc3339".to_string(),
457                        ts.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
458                            .into(),
459                    );
460                }
461
462                Some(value)
463            })
464            .collect::<Vec<_>>();
465
466        let json = json!(ChronicleRequestBody {
467            customer_id: self.customer_id.clone(),
468            namespace: key.namespace,
469            labels: self.labels.clone(),
470            log_type: key.log_type,
471            entries: events,
472        });
473
474        let size = as_tracked_write::<_, _, io::Error>(writer, &json, |writer, json| {
475            serde_json::to_writer(writer, json)?;
476            Ok(())
477        })?;
478
479        Ok((size, byte_size))
480    }
481}
482
483// Settings required to produce a request that do not change per
484// request. All possible values are pre-computed for direct use in
485// producing a request.
486#[derive(Clone, Debug)]
487struct ChronicleRequestBuilder {
488    encoder: ChronicleEncoder,
489    compression: Compression,
490}
491
492struct ChronicleRequestPayload {
493    bytes: Bytes,
494}
495
496impl From<Bytes> for ChronicleRequestPayload {
497    fn from(bytes: Bytes) -> Self {
498        Self { bytes }
499    }
500}
501
502impl AsRef<[u8]> for ChronicleRequestPayload {
503    fn as_ref(&self) -> &[u8] {
504        self.bytes.as_ref()
505    }
506}
507
508impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBuilder {
509    type Metadata = EventFinalizers;
510    type Events = (ChroniclePartitionKey, Vec<Event>);
511    type Encoder = ChronicleEncoder;
512    type Payload = ChronicleRequestPayload;
513    type Request = ChronicleRequest;
514    type Error = io::Error;
515
516    fn compression(&self) -> Compression {
517        self.compression
518    }
519
520    fn encoder(&self) -> &Self::Encoder {
521        &self.encoder
522    }
523
524    fn split_input(
525        &self,
526        input: (ChroniclePartitionKey, Vec<Event>),
527    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
528        let (partition_key, mut events) = input;
529        let finalizers = events.take_finalizers();
530
531        let builder = RequestMetadataBuilder::from_events(&events);
532        (finalizers, builder, (partition_key, events))
533    }
534
535    fn build_request(
536        &self,
537        finalizers: Self::Metadata,
538        metadata: RequestMetadata,
539        payload: EncodeResult<Self::Payload>,
540    ) -> Self::Request {
541        let mut headers = HashMap::new();
542        headers.insert(
543            header::CONTENT_TYPE,
544            HeaderValue::from_static("application/json"),
545        );
546
547        match payload.compressed_byte_size {
548            Some(compressed_byte_size) => {
549                headers.insert(
550                    header::CONTENT_LENGTH,
551                    HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
552                );
553                headers.insert(
554                    header::CONTENT_ENCODING,
555                    HeaderValue::from_str(self.compression.content_encoding().unwrap()).unwrap(),
556                );
557            }
558            None => {
559                headers.insert(
560                    header::CONTENT_LENGTH,
561                    HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
562                );
563            }
564        }
565
566        ChronicleRequest {
567            headers,
568            body: payload.into_payload().bytes,
569            finalizers,
570            metadata,
571        }
572    }
573}
574
575#[derive(Clone, Debug, Serialize)]
576struct Label {
577    key: String,
578    value: String,
579}
580
581impl ChronicleRequestBuilder {
582    fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
583        let transformer = config.encoding.transformer();
584        let serializer = config.encoding.config().build()?;
585        let compression = Compression::from(config.compression);
586        let encoder = crate::codecs::Encoder::<()>::new(serializer);
587        let encoder = ChronicleEncoder {
588            customer_id: config.customer_id.clone(),
589            labels: config.labels.as_ref().map(|labs| {
590                labs.iter()
591                    .map(|(k, v)| Label {
592                        key: k.to_string(),
593                        value: v.to_string(),
594                    })
595                    .collect::<Vec<_>>()
596            }),
597            encoder,
598            transformer,
599        };
600        Ok(Self {
601            encoder,
602            compression,
603        })
604    }
605}
606
607#[derive(Debug, Clone)]
608pub struct ChronicleService {
609    client: HttpClient,
610    base_url: String,
611    creds: GcpAuthenticator,
612}
613
614impl ChronicleService {
615    pub const fn new(client: HttpClient, base_url: String, creds: GcpAuthenticator) -> Self {
616        Self {
617            client,
618            base_url,
619            creds,
620        }
621    }
622}
623
624#[derive(Debug, Snafu)]
625pub enum ChronicleResponseError {
626    #[snafu(display("Server responded with an error: {}", code))]
627    ServerError { code: StatusCode },
628    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
629    HttpError { error: crate::http::HttpError },
630}
631
632impl Service<ChronicleRequest> for ChronicleService {
633    type Response = GcsResponse;
634    type Error = ChronicleResponseError;
635    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
636
637    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
638        Poll::Ready(Ok(()))
639    }
640
641    fn call(&mut self, request: ChronicleRequest) -> Self::Future {
642        let mut builder = Request::post(&self.base_url);
643        let metadata = request.get_metadata().clone();
644
645        let headers = builder.headers_mut().unwrap();
646        for (name, value) in request.headers {
647            headers.insert(name, value);
648        }
649
650        let mut http_request = builder.body(Body::from(request.body)).unwrap();
651        self.creds.apply(&mut http_request);
652
653        let mut client = self.client.clone();
654        Box::pin(async move {
655            match client.call(http_request).await {
656                Ok(response) => {
657                    let status = response.status();
658                    if status.is_success() {
659                        Ok(GcsResponse {
660                            inner: response,
661                            metadata,
662                        })
663                    } else {
664                        Err(ChronicleResponseError::ServerError { code: status })
665                    }
666                }
667                Err(error) => Err(ChronicleResponseError::HttpError { error }),
668            }
669        })
670    }
671}
672
// Integration tests that run against the address in `CHRONICLE_ADDRESS`. The comments in
// `publish_invalid_events` and `invalid_credentials` describe it as a Chronicle emulator.
#[cfg(all(test, feature = "chronicle-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize};
    use vector_lib::event::{BatchNotifier, BatchStatus};

    use super::*;
    use crate::test_util::{
        components::{
            COMPONENT_ERROR_TAGS, SINK_TAGS, run_and_assert_sink_compliance,
            run_and_assert_sink_error,
        },
        random_events_with_stream, random_string, trace_init,
    };

    /// Environment variable holding the address of the server under test.
    const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS";

    /// Parses a sink configuration that targets the test server with the given log type and
    /// credentials file.
    fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let toml_source = format!(
            indoc! { r#"
             endpoint = "{}"
             customer_id = "customer id"
             namespace = "namespace"
             credentials_path = "{}"
             log_type = "{}"
             encoding.codec = "text"
        "# },
            address, auth_path, log_type
        );

        toml::from_str(&toml_source).unwrap()
    }

    /// Builds the sink and healthcheck from `config` with a default sink context.
    async fn config_build(
        log_type: &str,
        auth_path: &str,
    ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
        config(log_type, auth_path)
            .build(SinkContext::default())
            .await
    }

    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let log_type = random_string(10);
        let (sink, healthcheck) =
            config_build(&log_type, "/home/vector/scripts/integration/gcp/auth.json")
                .await
                .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let messages = pull_messages(&log_type)
            .await
            .into_iter()
            .map(|message| message.log_text)
            .collect::<Vec<_>>();
        assert_eq!(input.len(), messages.len());

        // Lengths were just asserted equal, so zipping compares every event with its message.
        for (event, message) in input.iter().zip(&messages) {
            let data = serde_json::to_value(message).unwrap();
            let expected = serde_json::to_value(event.as_log().get("message").unwrap()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn invalid_credentials() {
        trace_init();

        let log_type = random_string(10);
        // Use an auth file that does not match the public key sent to the dummy Chronicle
        // server.
        let sink = config_build(
            &log_type,
            "/home/vector/scripts/integration/gcp/invalidauth.json",
        )
        .await;

        assert!(sink.is_err())
    }

    #[tokio::test]
    async fn publish_invalid_events() {
        trace_init();

        // The chronicle-emulator under test is set up so that a `log_type` of "INVALID"
        // returns a `400 BAD_REQUEST`.
        let log_type = "INVALID";
        let (sink, healthcheck) =
            config_build(log_type, "/home/vector/scripts/integration/gcp/auth.json")
                .await
                .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    /// One log entry as returned by the test server's `logs` endpoint.
    #[derive(Clone, Debug, Deserialize, Serialize)]
    pub struct Log {
        customer_id: String,
        namespace: String,
        log_type: String,
        log_text: String,
        ts_rfc3339: String,
    }

    /// Sends a request for `path` to the test server, passing `log_type` as a query
    /// parameter. Panics if the request cannot be sent.
    async fn request(method: Method, path: &str, log_type: &str) -> Response {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let url = format!("{address}/{path}");
        Client::new()
            .request(method.clone(), &url)
            .query(&[("log_type", log_type)])
            .send()
            .await
            .unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
    }

    /// Fetches the entries the test server returns for `log_type`.
    async fn pull_messages(log_type: &str) -> Vec<Log> {
        let response = request(Method::GET, "logs", log_type).await;
        response
            .json::<Vec<Log>>()
            .await
            .expect("Extracting pull data failed")
    }
}
806            .expect("Extracting pull data failed")
807    }
808}