vector/sinks/gcp/
cloud_storage.rs

1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::{
6    Uri,
7    header::{HeaderName, HeaderValue},
8};
9use indoc::indoc;
10use snafu::{ResultExt, Snafu};
11use tower::ServiceBuilder;
12use uuid::Uuid;
13use vector_lib::{
14    TimeZone,
15    codecs::encoding::Framer,
16    configurable::configurable_component,
17    event::{EventFinalizers, Finalizable},
18    request_metadata::RequestMetadata,
19};
20
21use crate::{
22    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
23    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
24    event::Event,
25    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
26    http::{HttpClient, get_http_scheme_from_uri},
27    serde::json::to_string,
28    sinks::{
29        Healthcheck, VectorSink,
30        gcs_common::{
31            config::{
32                GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, build_healthcheck,
33                default_endpoint,
34            },
35            service::{GcsRequest, GcsRequestSettings, GcsService},
36            sink::GcsSink,
37        },
38        util::{
39            BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt,
40            TowerRequestConfig, batch::BatchConfig, metadata::RequestMetadataBuilder,
41            partitioner::KeyPartitioner, request_builder::EncodeResult,
42            service::TowerRequestConfigDefaults, timezone_to_offset,
43        },
44    },
45    template::{Template, TemplateParseError},
46    tls::{TlsConfig, TlsSettings},
47};
48
/// Errors produced while preparing the `gcp_cloud_storage` sink.
///
/// Within this file the only producer is `GcsSinkConfig::key_partitioner`, which attaches
/// `KeyPrefixTemplateSnafu` as context when the key prefix template fails to parse.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The `key_prefix` option could not be parsed as a template.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
55
/// Marker type that selects this sink's defaults for `TowerRequestConfig`.
///
/// It carries no data; it is only used as the type parameter of the `request` field on
/// `GcsSinkConfig`.
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;
58
// Only `RATE_LIMIT_NUM` is overridden for this sink; no other associated item is set here.
impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
    // NOTE(review): presumably the number of requests allowed per rate-limit window — confirm
    // against the `TowerRequestConfigDefaults` trait definition.
    const RATE_LIMIT_NUM: u64 = 1_000;
}
62
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The GCS bucket name.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// The Predefined ACL to apply to created objects.
    ///
    /// For more information, see [Predefined ACLs][predefined_acls].
    ///
    /// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
    acl: Option<GcsPredefinedAcl>,

    /// The storage class for created objects.
    ///
    /// For more information, see the [storage classes][storage_classes] documentation.
    ///
    /// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
    storage_class: Option<GcsStorageClass>,

    /// The set of metadata `key:value` pairs for the created objects.
    ///
    /// For more information, see the [custom metadata][custom_metadata] documentation.
    ///
    /// [custom_metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// A prefix to apply to all object keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating an object key that
    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` in order to act as a directory path. A trailing `/` is **not** automatically added.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// The timestamp format for the time component of the object key.
    ///
    /// By default, object keys are appended with a timestamp that reflects when the objects are
    /// sent to GCS, such that the resulting object key is functionally equivalent to joining the key
    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the key prefix.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether or not to append a UUID v4 token to the end of the object key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// object keys must be unique.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// The filename extension to use in the object key.
    ///
    /// If not specified, the extension is determined by the compression scheme used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    // Framing and codec options; flattened, so they are set at the top level of the sink config
    // (for example `framing.method` and `encoding.codec`).
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    /// Overrides the MIME type of the created objects.
    ///
    /// Directly comparable to the `Content-Type` HTTP header.
    ///
    /// If not specified, defaults to the encoder's content type.
    #[configurable(metadata(
        docs::examples = "text/plain; charset=utf-8",
        docs::examples = "application/gzip"
    ))]
    content_type: Option<String>,

    /// Overrides what content encoding has been applied to the object.
    ///
    /// Directly comparable to the `Content-Encoding` HTTP header.
    ///
    /// If not specified, the compression scheme used dictates this value.
    #[configurable(metadata(docs::examples = "gzip", docs::examples = "zstd"))]
    content_encoding: Option<String>,

    /// Sets the `Cache-Control` header for the created objects.
    ///
    /// Directly comparable to the `Cache-Control` HTTP header.
    #[configurable(metadata(docs::examples = "no-transform"))]
    cache_control: Option<String>,

    // Batching limits; converted into batcher settings when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// API endpoint for Google Cloud Storage.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    // Request settings, with this sink's defaults taken from `GcsTowerRequestConfigDefaults`.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    // GCP authentication options; flattened, so they are set at the top level of the sink config
    // (for example `credentials_path`).
    #[serde(flatten)]
    auth: GcpAuthConfig,

    // Optional TLS options used when constructing the HTTP client.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    // Accepts either a bare boolean or a full struct (see `bool_or_struct`).
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // Timezone used when rendering the timestamp part of object keys; when unset, the global
    // timezone from the sink context is used instead.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
221
/// Serde default for `filename_time_format`: `%s`, which renders seconds since the Unix epoch.
fn default_time_format() -> String {
    String::from("%s")
}
225
/// Test-only baseline config: everything empty or defaulted, UUID suffix on, gzip compression,
/// and the caller-supplied encoding. Fields are listed in declaration order.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        bucket: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        encoding,
        compression: Compression::gzip_default(),
        content_type: None,
        content_encoding: None,
        cache_control: None,
        batch: Default::default(),
        // Deliberately empty rather than `default_endpoint()`.
        endpoint: String::new(),
        request: Default::default(),
        auth: Default::default(),
        tls: None,
        acknowledgements: Default::default(),
        timezone: None,
    }
}
251
252impl GenerateConfig for GcsSinkConfig {
253    fn generate_config() -> toml::Value {
254        toml::from_str(indoc! {r#"
255            bucket = "my-bucket"
256            credentials_path = "/path/to/credentials.json"
257            framing.method = "newline_delimited"
258            encoding.codec = "json"
259        "#})
260        .unwrap()
261    }
262}
263
264#[async_trait::async_trait]
265#[typetag::serde(name = "gcp_cloud_storage")]
266impl SinkConfig for GcsSinkConfig {
267    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
268        let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
269        let base_url = format!("{}/{}/", self.endpoint, self.bucket);
270        let tls = TlsSettings::from_options(self.tls.as_ref())?;
271        let client = HttpClient::new(tls, cx.proxy())?;
272        let healthcheck = build_healthcheck(
273            self.bucket.clone(),
274            client.clone(),
275            base_url.clone(),
276            auth.clone(),
277        )?;
278        auth.spawn_regenerate_token();
279        let sink = self.build_sink(client, base_url, auth, cx)?;
280
281        Ok((sink, healthcheck))
282    }
283
284    fn input(&self) -> Input {
285        Input::new(self.encoding.config().1.input_type() & DataType::Log)
286    }
287
288    fn acknowledgements(&self) -> &AcknowledgementsConfig {
289        &self.acknowledgements
290    }
291}
292
293impl GcsSinkConfig {
294    fn build_sink(
295        &self,
296        client: HttpClient,
297        base_url: String,
298        auth: GcpAuthenticator,
299        cx: SinkContext,
300    ) -> crate::Result<VectorSink> {
301        let request = self.request.into_settings();
302
303        let batch_settings = self.batch.into_batcher_settings()?;
304
305        let partitioner = self.key_partitioner()?;
306
307        let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
308
309        let svc = ServiceBuilder::new()
310            .settings(request, GcsRetryLogic::default())
311            .service(GcsService::new(client, base_url, auth));
312
313        let request_settings = RequestSettings::new(self, cx)?;
314
315        let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
316
317        Ok(VectorSink::from_event_streamsink(sink))
318    }
319
320    fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
321        Ok(KeyPartitioner::new(
322            Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
323                .context(KeyPrefixTemplateSnafu)?,
324            None,
325        ))
326    }
327}
328
// Settings required to produce a request that do not change per
// request. All possible values are pre-computed for direct use in
// producing a request.
#[derive(Clone, Debug)]
struct RequestSettings {
    /// Header value for the configured predefined ACL, if any.
    acl: Option<HeaderValue>,
    /// Configured `content_type`, or the encoder's content type when not configured.
    content_type: HeaderValue,
    /// Configured `content_encoding`, or the one implied by the compression scheme (if any).
    content_encoding: Option<HeaderValue>,
    /// Configured storage class, or that type's default when not configured.
    storage_class: HeaderValue,
    /// Configured `cache_control` value, if any.
    cache_control: Option<HeaderValue>,
    /// Header pairs built from the configured `metadata` map.
    headers: Vec<(HeaderName, HeaderValue)>,
    /// Configured `filename_extension`, or the compression scheme's extension.
    extension: String,
    /// `strftime` format used for the timestamp part of the object key.
    time_format: String,
    /// Whether a UUID is appended after the timestamp in the object key.
    append_uuid: bool,
    /// Transformer and encoder pair returned by `RequestBuilder::encoder`.
    encoder: (Transformer, Encoder<Framer>),
    /// Compression scheme returned by `RequestBuilder::compression`.
    compression: Compression,
    /// Offset applied to the key timestamp; `None` means the timestamp is rendered in UTC.
    tz_offset: Option<FixedOffset>,
}
347
348impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
349    type Metadata = (String, EventFinalizers);
350    type Events = Vec<Event>;
351    type Encoder = (Transformer, Encoder<Framer>);
352    type Payload = Bytes;
353    type Request = GcsRequest;
354    type Error = io::Error;
355
356    fn compression(&self) -> Compression {
357        self.compression
358    }
359
360    fn encoder(&self) -> &Self::Encoder {
361        &self.encoder
362    }
363
364    fn split_input(
365        &self,
366        input: (String, Vec<Event>),
367    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
368        let (partition_key, mut events) = input;
369        let finalizers = events.take_finalizers();
370        let builder = RequestMetadataBuilder::from_events(&events);
371
372        ((partition_key, finalizers), builder, events)
373    }
374
375    fn build_request(
376        &self,
377        gcp_metadata: Self::Metadata,
378        metadata: RequestMetadata,
379        payload: EncodeResult<Self::Payload>,
380    ) -> Self::Request {
381        let (key, finalizers) = gcp_metadata;
382        // TODO: pull the seconds from the last event
383        let filename = {
384            let seconds = match self.tz_offset {
385                Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
386                None => Utc::now()
387                    .with_timezone(&chrono::Utc)
388                    .format(&self.time_format),
389            };
390
391            if self.append_uuid {
392                let uuid = Uuid::new_v4();
393                format!("{}-{}", seconds, uuid.hyphenated())
394            } else {
395                seconds.to_string()
396            }
397        };
398
399        let key = format!("{}{}.{}", key, filename, self.extension);
400        let body = payload.into_payload();
401
402        GcsRequest {
403            key,
404            body,
405            finalizers,
406            settings: GcsRequestSettings {
407                acl: self.acl.clone(),
408                content_type: self.content_type.clone(),
409                content_encoding: self.content_encoding.clone(),
410                storage_class: self.storage_class.clone(),
411                cache_control: self.cache_control.clone(),
412                headers: self.headers.clone(),
413            },
414            metadata,
415        }
416    }
417}
418
419impl RequestSettings {
420    fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
421        let transformer = config.encoding.transformer();
422        let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
423        let encoder = Encoder::<Framer>::new(framer, serializer);
424        let acl = config
425            .acl
426            .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
427        let content_type_str = config
428            .content_type
429            .as_deref()
430            .unwrap_or_else(|| encoder.content_type());
431        let content_type = HeaderValue::from_str(content_type_str)?;
432        let content_encoding = match &config.content_encoding {
433            Some(ce) => Some(HeaderValue::from_str(ce)?),
434            None => config
435                .compression
436                .content_encoding()
437                .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap()),
438        };
439        let storage_class = config.storage_class.unwrap_or_default();
440        let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
441        let cache_control = config
442            .cache_control
443            .as_ref()
444            .map(|cc| HeaderValue::from_str(cc))
445            .transpose()?;
446        let metadata = config
447            .metadata
448            .as_ref()
449            .map(|metadata| {
450                metadata
451                    .iter()
452                    .map(make_header)
453                    .collect::<Result<Vec<_>, _>>()
454            })
455            .unwrap_or_else(|| Ok(vec![]))?;
456        let extension = config
457            .filename_extension
458            .clone()
459            .unwrap_or_else(|| config.compression.extension().into());
460        let time_format = config.filename_time_format.clone();
461        let append_uuid = config.filename_append_uuid;
462        let offset = config
463            .timezone
464            .or(cx.globals.timezone)
465            .and_then(timezone_to_offset);
466
467        Ok(Self {
468            acl,
469            content_type,
470            content_encoding,
471            storage_class,
472            cache_control,
473            headers: metadata,
474            extension,
475            time_format,
476            append_uuid,
477            compression: config.compression,
478            encoder: (transformer, encoder),
479            tz_offset: offset,
480        })
481    }
482}
483
484// Make a header pair from a key-value string pair
485fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
486    Ok((
487        HeaderName::from_bytes(name.as_bytes())?,
488        HeaderValue::from_str(value)?,
489    ))
490}
491
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::{
        EstimatedJsonEncodedSizeOf,
        codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
            encoding::FramingConfig,
        },
        partition::Partitioner,
        request_metadata::GroupedCountByteSize,
    };

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            http::{always_200_response, spawn_blackhole_http_server},
        },
    };

    /// Baseline config using the text codec with no explicit framing.
    fn text_config() -> GcsSinkConfig {
        default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
    }

    /// Builds request settings with a default sink context, panicking if construction fails.
    fn request_settings(sink_config: &GcsSinkConfig) -> RequestSettings {
        RequestSettings::new(sink_config, SinkContext::default())
            .expect("Could not create request settings")
    }

    /// Builds a single request for an empty log event under the `key/` prefix, with the literal
    /// time format `date` so that the resulting key is predictable.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(
                (
                    Some(NewlineDelimitedEncoderConfig::new()),
                    JsonSerializerConfig::default(),
                )
                    .into(),
            )
        };
        let log = LogEvent::default().into();
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&log)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&log, log.estimated_json_encoded_size_of());

        let request_settings = request_settings(&sink_config);
        let (metadata, metadata_request_builder, _events) =
            request_settings.split_input((key, vec![log]));
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_request_builder.build(&payload);

        request_settings.build_request(metadata, request_metadata, payload)
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let context = SinkContext::default();

        let tls = TlsSettings::default();
        let client =
            HttpClient::new(tls, context.proxy()).expect("should not fail to create HTTP client");

        let config =
            default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into());
        let sink = config
            .build_sink(
                client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                context,
            )
            .expect("failed to build sink");

        let event = Event::Log(LogEvent::from("simple message"));
        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
    }

    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let message = "hello world".to_string();
        let mut event = LogEvent::from(message);
        event.insert("key", "value");

        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..text_config()
        };
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&Event::Log(event))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    #[test]
    fn gcs_build_request() {
        let req = build_request(Some("ext"), false, Compression::None);
        assert_eq!(req.key, "key/date.ext".to_string());

        let req = build_request(None, false, Compression::None);
        assert_eq!(req.key, "key/date.log".to_string());

        let req = build_request(None, false, Compression::gzip_default());
        assert_eq!(req.key, "key/date.log.gz".to_string());

        // With the UUID suffix enabled the key must still have the expected prefix and
        // extension, and the part in between must be a real UUID. A bare `assert_ne!` would
        // accept any malformed key.
        let req = build_request(None, true, Compression::gzip_default());
        assert_ne!(req.key, "key/date.log.gz".to_string());
        let token = req
            .key
            .strip_prefix("key/date-")
            .and_then(|rest| rest.strip_suffix(".log.gz"))
            .expect("key should look like `key/date-<uuid>.log.gz`");
        assert!(Uuid::parse_str(token).is_ok(), "not a UUID: {token}");
    }

    #[test]
    fn gcs_content_type_default() {
        let sink_config = GcsSinkConfig {
            content_type: None,
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should default to encoder's content type which is "text/plain" for text codec
        assert_eq!(
            request_settings.content_type.to_str().unwrap(),
            "text/plain"
        );
    }

    #[test]
    fn gcs_content_type_custom() {
        let sink_config = GcsSinkConfig {
            content_type: Some("text/plain; charset=utf-8".to_string()),
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should use custom content type
        assert_eq!(
            request_settings.content_type.to_str().unwrap(),
            "text/plain; charset=utf-8"
        );
    }

    #[test]
    fn gcs_content_type_invalid() {
        let sink_config = GcsSinkConfig {
            // Invalid header value with newline character
            content_type: Some("text/plain\nInvalid".to_string()),
            ..text_config()
        };

        let result = RequestSettings::new(&sink_config, SinkContext::default());
        // Should return an error, not panic
        assert!(result.is_err());
    }

    #[test]
    fn gcs_content_encoding_default() {
        let sink_config = GcsSinkConfig {
            content_encoding: None,
            compression: Compression::gzip_default(),
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should default to compression's content encoding which is "gzip"
        assert_eq!(
            request_settings.content_encoding.unwrap().to_str().unwrap(),
            "gzip"
        );
    }

    #[test]
    fn gcs_content_encoding_none_when_no_compression() {
        let sink_config = GcsSinkConfig {
            content_encoding: None,
            compression: Compression::None,
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should be None when compression is None
        assert!(request_settings.content_encoding.is_none());
    }

    #[test]
    fn gcs_content_encoding_custom() {
        let sink_config = GcsSinkConfig {
            content_encoding: Some("gzip".to_string()),
            compression: Compression::None,
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should use custom content encoding
        assert_eq!(
            request_settings.content_encoding.unwrap().to_str().unwrap(),
            "gzip"
        );
    }

    #[test]
    fn gcs_content_encoding_invalid() {
        let sink_config = GcsSinkConfig {
            // Invalid header value with newline character
            content_encoding: Some("gzip\nInvalid".to_string()),
            ..text_config()
        };

        let result = RequestSettings::new(&sink_config, SinkContext::default());
        // Should return an error, not panic
        assert!(result.is_err());
    }

    #[test]
    fn gcs_content_encoding_empty() {
        let sink_config = GcsSinkConfig {
            // Empty string to disable content encoding header even with compression
            content_encoding: Some("".to_string()),
            compression: Compression::gzip_default(),
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should use empty content encoding (overriding the compression default)
        assert_eq!(
            request_settings.content_encoding.unwrap().to_str().unwrap(),
            ""
        );
    }

    #[test]
    fn gcs_cache_control_default() {
        let sink_config = GcsSinkConfig {
            cache_control: None,
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        // Should be None by default
        assert!(request_settings.cache_control.is_none());
    }

    #[test]
    fn gcs_cache_control_custom() {
        let sink_config = GcsSinkConfig {
            cache_control: Some("no-transform".to_string()),
            ..text_config()
        };

        let request_settings = request_settings(&sink_config);
        assert_eq!(
            request_settings.cache_control.unwrap().to_str().unwrap(),
            "no-transform"
        );
    }

    #[test]
    fn gcs_cache_control_invalid() {
        let sink_config = GcsSinkConfig {
            // Invalid header value with newline character
            cache_control: Some("no-cache\nInvalid".to_string()),
            ..text_config()
        };

        let result = RequestSettings::new(&sink_config, SinkContext::default());
        // Should return an error, not panic
        assert!(result.is_err());
    }
}