vector/sinks/gcp/
cloud_storage.rs

1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::{
6    Uri,
7    header::{HeaderName, HeaderValue},
8};
9use indoc::indoc;
10use snafu::{ResultExt, Snafu};
11use tower::ServiceBuilder;
12use uuid::Uuid;
13use vector_lib::{
14    TimeZone,
15    codecs::encoding::Framer,
16    configurable::configurable_component,
17    event::{EventFinalizers, Finalizable},
18    request_metadata::RequestMetadata,
19};
20
21use crate::{
22    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
23    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
24    event::Event,
25    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
26    http::{HttpClient, get_http_scheme_from_uri},
27    serde::json::to_string,
28    sinks::{
29        Healthcheck, VectorSink,
30        gcs_common::{
31            config::{
32                GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, build_healthcheck,
33                default_endpoint,
34            },
35            service::{GcsRequest, GcsRequestSettings, GcsService},
36            sink::GcsSink,
37        },
38        util::{
39            BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt,
40            TowerRequestConfig, batch::BatchConfig, metadata::RequestMetadataBuilder,
41            partitioner::KeyPartitioner, request_builder::EncodeResult,
42            service::TowerRequestConfigDefaults, timezone_to_offset,
43        },
44    },
45    template::{Template, TemplateParseError},
46    tls::{TlsConfig, TlsSettings},
47};
48
/// Errors raised while preparing the GCS sink.
///
/// Within this file the only variant is produced by `GcsSinkConfig::key_partitioner` when the
/// configured `key_prefix` cannot be parsed as a template.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The `key_prefix` option is not a valid template; `source` carries the parse failure.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
55
/// Marker type supplying the default request settings for this sink.
///
/// It is used as the type parameter of `TowerRequestConfig` in `GcsSinkConfig::request`.
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
    // Only the rate limit count is overridden here; every other default presumably comes from
    // the trait's own associated constants (the trait is defined elsewhere — confirm there).
    const RATE_LIMIT_NUM: u64 = 1_000;
}
62
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The GCS bucket name.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// The Predefined ACL to apply to created objects.
    ///
    /// For more information, see [Predefined ACLs][predefined_acls].
    ///
    /// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
    acl: Option<GcsPredefinedAcl>,

    /// The storage class for created objects.
    ///
    /// For more information, see the [storage classes][storage_classes] documentation.
    ///
    /// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
    storage_class: Option<GcsStorageClass>,

    /// The set of metadata `key:value` pairs for the created objects.
    ///
    /// For more information, see the [custom metadata][custom_metadata] documentation.
    ///
    /// [custom_metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// A prefix to apply to all object keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating an object key that
    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` in order to act as a directory path. A trailing `/` is **not** automatically added.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// The timestamp format for the time component of the object key.
    ///
    /// By default, object keys are appended with a timestamp that reflects when the objects are
    /// sent to GCS, such that the resulting object key is functionally equivalent to joining the key
    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the key prefix.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether or not to append a UUID v4 token to the end of the object key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// object keys must be unique.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// The filename extension to use in the object key.
    ///
    /// If not specified, the extension is determined by the compression scheme used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    // Framing and serializer options are flattened, so they appear as top-level sink options.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    /// Overrides the MIME type of the created objects.
    ///
    /// Directly comparable to the `Content-Type` HTTP header.
    ///
    /// If not specified, defaults to the encoder's content type.
    #[configurable(metadata(
        docs::examples = "text/plain; charset=utf-8",
        docs::examples = "application/gzip"
    ))]
    content_type: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// API endpoint for Google Cloud Storage.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    // Authentication options are flattened, so they appear as top-level sink options.
    #[serde(flatten)]
    auth: GcpAuthConfig,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // When unset, `RequestSettings::new` falls back to the global timezone from the sink context.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
207
/// Serde default for `filename_time_format`: the `%s` specifier (seconds since the Unix epoch).
fn default_time_format() -> String {
    String::from("%s")
}
211
/// Test-only helper: builds a `GcsSinkConfig` around the given encoding with every other option
/// left empty or at its default, except that gzip compression and UUID suffixes are enabled.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        // Destination.
        bucket: String::new(),
        endpoint: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        // Object naming.
        key_prefix: None,
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        // Payload.
        encoding,
        compression: Compression::gzip_default(),
        content_type: None,
        // Delivery.
        batch: Default::default(),
        request: Default::default(),
        auth: Default::default(),
        tls: None,
        acknowledgements: Default::default(),
        timezone: None,
    }
}
235
236impl GenerateConfig for GcsSinkConfig {
237    fn generate_config() -> toml::Value {
238        toml::from_str(indoc! {r#"
239            bucket = "my-bucket"
240            credentials_path = "/path/to/credentials.json"
241            framing.method = "newline_delimited"
242            encoding.codec = "json"
243        "#})
244        .unwrap()
245    }
246}
247
248#[async_trait::async_trait]
249#[typetag::serde(name = "gcp_cloud_storage")]
250impl SinkConfig for GcsSinkConfig {
251    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
252        let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
253        let base_url = format!("{}/{}/", self.endpoint, self.bucket);
254        let tls = TlsSettings::from_options(self.tls.as_ref())?;
255        let client = HttpClient::new(tls, cx.proxy())?;
256        let healthcheck = build_healthcheck(
257            self.bucket.clone(),
258            client.clone(),
259            base_url.clone(),
260            auth.clone(),
261        )?;
262        auth.spawn_regenerate_token();
263        let sink = self.build_sink(client, base_url, auth, cx)?;
264
265        Ok((sink, healthcheck))
266    }
267
268    fn input(&self) -> Input {
269        Input::new(self.encoding.config().1.input_type() & DataType::Log)
270    }
271
272    fn acknowledgements(&self) -> &AcknowledgementsConfig {
273        &self.acknowledgements
274    }
275}
276
277impl GcsSinkConfig {
278    fn build_sink(
279        &self,
280        client: HttpClient,
281        base_url: String,
282        auth: GcpAuthenticator,
283        cx: SinkContext,
284    ) -> crate::Result<VectorSink> {
285        let request = self.request.into_settings();
286
287        let batch_settings = self.batch.into_batcher_settings()?;
288
289        let partitioner = self.key_partitioner()?;
290
291        let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
292
293        let svc = ServiceBuilder::new()
294            .settings(request, GcsRetryLogic::default())
295            .service(GcsService::new(client, base_url, auth));
296
297        let request_settings = RequestSettings::new(self, cx)?;
298
299        let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
300
301        Ok(VectorSink::from_event_streamsink(sink))
302    }
303
304    fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
305        Ok(KeyPartitioner::new(
306            Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
307                .context(KeyPrefixTemplateSnafu)?,
308            None,
309        ))
310    }
311}
312
// Settings required to produce a request that do not change per
// request. All possible values are pre-computed for direct use in
// producing a request.
#[derive(Clone, Debug)]
struct RequestSettings {
    // Header value for the configured predefined ACL; `None` when no ACL is configured.
    acl: Option<HeaderValue>,
    // The configured `content_type`, or the encoder's content type when not configured.
    content_type: HeaderValue,
    // Derived from the compression scheme's content encoding, if it has one.
    content_encoding: Option<HeaderValue>,
    // The configured storage class, or the storage class type's default.
    storage_class: HeaderValue,
    // Header pairs built from the configured `metadata` map.
    headers: Vec<(HeaderName, HeaderValue)>,
    // The configured `filename_extension`, or the compression scheme's extension.
    extension: String,
    // strftime format used for the timestamp part of the object key.
    time_format: String,
    // Whether a UUID v4 is appended after the timestamp in the object key.
    append_uuid: bool,
    encoder: (Transformer, Encoder<Framer>),
    compression: Compression,
    // Offset applied to the key timestamp; when `None` the timestamp is formatted in UTC.
    tz_offset: Option<FixedOffset>,
}
330
331impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
332    type Metadata = (String, EventFinalizers);
333    type Events = Vec<Event>;
334    type Encoder = (Transformer, Encoder<Framer>);
335    type Payload = Bytes;
336    type Request = GcsRequest;
337    type Error = io::Error;
338
339    fn compression(&self) -> Compression {
340        self.compression
341    }
342
343    fn encoder(&self) -> &Self::Encoder {
344        &self.encoder
345    }
346
347    fn split_input(
348        &self,
349        input: (String, Vec<Event>),
350    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
351        let (partition_key, mut events) = input;
352        let finalizers = events.take_finalizers();
353        let builder = RequestMetadataBuilder::from_events(&events);
354
355        ((partition_key, finalizers), builder, events)
356    }
357
358    fn build_request(
359        &self,
360        gcp_metadata: Self::Metadata,
361        metadata: RequestMetadata,
362        payload: EncodeResult<Self::Payload>,
363    ) -> Self::Request {
364        let (key, finalizers) = gcp_metadata;
365        // TODO: pull the seconds from the last event
366        let filename = {
367            let seconds = match self.tz_offset {
368                Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
369                None => Utc::now()
370                    .with_timezone(&chrono::Utc)
371                    .format(&self.time_format),
372            };
373
374            if self.append_uuid {
375                let uuid = Uuid::new_v4();
376                format!("{}-{}", seconds, uuid.hyphenated())
377            } else {
378                seconds.to_string()
379            }
380        };
381
382        let key = format!("{}{}.{}", key, filename, self.extension);
383        let body = payload.into_payload();
384
385        GcsRequest {
386            key,
387            body,
388            finalizers,
389            settings: GcsRequestSettings {
390                acl: self.acl.clone(),
391                content_type: self.content_type.clone(),
392                content_encoding: self.content_encoding.clone(),
393                storage_class: self.storage_class.clone(),
394                headers: self.headers.clone(),
395            },
396            metadata,
397        }
398    }
399}
400
401impl RequestSettings {
402    fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
403        let transformer = config.encoding.transformer();
404        let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
405        let encoder = Encoder::<Framer>::new(framer, serializer);
406        let acl = config
407            .acl
408            .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
409        let content_type_str = config
410            .content_type
411            .as_deref()
412            .unwrap_or_else(|| encoder.content_type());
413        let content_type = HeaderValue::from_str(content_type_str)?;
414        let content_encoding = config
415            .compression
416            .content_encoding()
417            .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
418        let storage_class = config.storage_class.unwrap_or_default();
419        let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
420        let metadata = config
421            .metadata
422            .as_ref()
423            .map(|metadata| {
424                metadata
425                    .iter()
426                    .map(make_header)
427                    .collect::<Result<Vec<_>, _>>()
428            })
429            .unwrap_or_else(|| Ok(vec![]))?;
430        let extension = config
431            .filename_extension
432            .clone()
433            .unwrap_or_else(|| config.compression.extension().into());
434        let time_format = config.filename_time_format.clone();
435        let append_uuid = config.filename_append_uuid;
436        let offset = config
437            .timezone
438            .or(cx.globals.timezone)
439            .and_then(timezone_to_offset);
440
441        Ok(Self {
442            acl,
443            content_type,
444            content_encoding,
445            storage_class,
446            headers: metadata,
447            extension,
448            time_format,
449            append_uuid,
450            compression: config.compression,
451            encoder: (transformer, encoder),
452            tz_offset: offset,
453        })
454    }
455}
456
457// Make a header pair from a key-value string pair
458fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
459    Ok((
460        HeaderName::from_bytes(name.as_bytes())?,
461        HeaderValue::from_str(value)?,
462    ))
463}
464
465#[cfg(test)]
466mod tests {
467    use futures_util::{future::ready, stream};
468    use vector_lib::{
469        EstimatedJsonEncodedSizeOf,
470        codecs::{
471            JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
472            encoding::FramingConfig,
473        },
474        partition::Partitioner,
475        request_metadata::GroupedCountByteSize,
476    };
477
478    use super::*;
479    use crate::{
480        event::LogEvent,
481        test_util::{
482            components::{SINK_TAGS, run_and_assert_sink_compliance},
483            http::{always_200_response, spawn_blackhole_http_server},
484        },
485    };
486
    // Runs the shared generate-config check against this sink's example configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }
491
492    #[tokio::test]
493    async fn component_spec_compliance() {
494        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
495
496        let context = SinkContext::default();
497
498        let tls = TlsSettings::default();
499        let client =
500            HttpClient::new(tls, context.proxy()).expect("should not fail to create HTTP client");
501
502        let config =
503            default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into());
504        let sink = config
505            .build_sink(
506                client,
507                mock_endpoint.to_string(),
508                GcpAuthenticator::None,
509                context,
510            )
511            .expect("failed to build sink");
512
513        let event = Event::Log(LogEvent::from("simple message"));
514        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
515    }
516
517    #[test]
518    fn gcs_encode_event_apply_rules() {
519        crate::test_util::trace_init();
520
521        let message = "hello world".to_string();
522        let mut event = LogEvent::from(message);
523        event.insert("key", "value");
524
525        let sink_config = GcsSinkConfig {
526            key_prefix: Some("key: {{ key }}".into()),
527            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
528        };
529        let key = sink_config
530            .key_partitioner()
531            .unwrap()
532            .partition(&Event::Log(event))
533            .expect("key wasn't provided");
534
535        assert_eq!(key, "key: value");
536    }
537
    // Builds `RequestSettings` from a config, panicking if the config is rejected.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }
541
542    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
543        let context = SinkContext::default();
544        let sink_config = GcsSinkConfig {
545            key_prefix: Some("key/".into()),
546            filename_time_format: "date".into(),
547            filename_extension: extension.map(Into::into),
548            filename_append_uuid: uuid,
549            compression,
550            ..default_config(
551                (
552                    Some(NewlineDelimitedEncoderConfig::new()),
553                    JsonSerializerConfig::default(),
554                )
555                    .into(),
556            )
557        };
558        let log = LogEvent::default().into();
559        let key = sink_config
560            .key_partitioner()
561            .unwrap()
562            .partition(&log)
563            .expect("key wasn't provided");
564
565        let mut byte_size = GroupedCountByteSize::new_untagged();
566        byte_size.add_event(&log, log.estimated_json_encoded_size_of());
567
568        let request_settings = request_settings(&sink_config, context);
569        let (metadata, metadata_request_builder, _events) =
570            request_settings.split_input((key, vec![log]));
571        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
572        let request_metadata = metadata_request_builder.build(&payload);
573
574        request_settings.build_request(metadata, request_metadata, payload)
575    }
576
577    #[test]
578    fn gcs_build_request() {
579        let req = build_request(Some("ext"), false, Compression::None);
580        assert_eq!(req.key, "key/date.ext".to_string());
581
582        let req = build_request(None, false, Compression::None);
583        assert_eq!(req.key, "key/date.log".to_string());
584
585        let req = build_request(None, false, Compression::gzip_default());
586        assert_eq!(req.key, "key/date.log.gz".to_string());
587
588        let req = build_request(None, true, Compression::gzip_default());
589        assert_ne!(req.key, "key/date.log.gz".to_string());
590    }
591
592    #[test]
593    fn gcs_content_type_default() {
594        let context = SinkContext::default();
595        let sink_config = GcsSinkConfig {
596            content_type: None,
597            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
598        };
599
600        let request_settings = request_settings(&sink_config, context);
601        // Should default to encoder's content type which is "text/plain" for text codec
602        assert_eq!(
603            request_settings.content_type.to_str().unwrap(),
604            "text/plain"
605        );
606    }
607
608    #[test]
609    fn gcs_content_type_custom() {
610        let context = SinkContext::default();
611        let sink_config = GcsSinkConfig {
612            content_type: Some("text/plain; charset=utf-8".to_string()),
613            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
614        };
615
616        let request_settings = request_settings(&sink_config, context);
617        // Should use custom content type
618        assert_eq!(
619            request_settings.content_type.to_str().unwrap(),
620            "text/plain; charset=utf-8"
621        );
622    }
623
624    #[test]
625    fn gcs_content_type_invalid() {
626        let context = SinkContext::default();
627        let sink_config = GcsSinkConfig {
628            // Invalid header value with newline character
629            content_type: Some("text/plain\nInvalid".to_string()),
630            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
631        };
632
633        let result = RequestSettings::new(&sink_config, context);
634        // Should return an error, not panic
635        assert!(result.is_err());
636    }
637}