vector/sinks/gcp/
cloud_storage.rs

1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::header::{HeaderName, HeaderValue};
6use http::Uri;
7use indoc::indoc;
8use snafu::ResultExt;
9use snafu::Snafu;
10use tower::ServiceBuilder;
11use uuid::Uuid;
12use vector_lib::codecs::encoding::Framer;
13use vector_lib::configurable::configurable_component;
14use vector_lib::event::{EventFinalizers, Finalizable};
15use vector_lib::{request_metadata::RequestMetadata, TimeZone};
16
17use crate::sinks::util::metadata::RequestMetadataBuilder;
18use crate::sinks::util::service::TowerRequestConfigDefaults;
19use crate::{
20    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
21    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
22    event::Event,
23    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
24    http::{get_http_scheme_from_uri, HttpClient},
25    serde::json::to_string,
26    sinks::{
27        gcs_common::{
28            config::{
29                build_healthcheck, default_endpoint, GcsPredefinedAcl, GcsRetryLogic,
30                GcsStorageClass,
31            },
32            service::{GcsRequest, GcsRequestSettings, GcsService},
33            sink::GcsSink,
34        },
35        util::{
36            batch::BatchConfig, partitioner::KeyPartitioner, request_builder::EncodeResult,
37            timezone_to_offset, BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder,
38            ServiceBuilderExt, TowerRequestConfig,
39        },
40        Healthcheck, VectorSink,
41    },
42    template::{Template, TemplateParseError},
43    tls::{TlsConfig, TlsSettings},
44};
45
/// Errors surfaced while preparing the `gcp_cloud_storage` sink.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The `key_prefix` value could not be parsed as a template; `source` carries the
    /// underlying parse error.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
52
/// Marker type that supplies this sink's request defaults; it is used as the type
/// parameter of the `request` field's `TowerRequestConfig`.
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
    // Only `RATE_LIMIT_NUM` is overridden here.
    const RATE_LIMIT_NUM: u64 = 1_000;
}
59
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The GCS bucket name.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// The Predefined ACL to apply to created objects.
    ///
    /// For more information, see [Predefined ACLs][predefined_acls].
    ///
    /// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
    acl: Option<GcsPredefinedAcl>,

    /// The storage class for created objects.
    ///
    /// For more information, see the [storage classes][storage_classes] documentation.
    ///
    /// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
    storage_class: Option<GcsStorageClass>,

    /// The set of metadata `key:value` pairs for the created objects.
    ///
    /// For more information, see the [custom metadata][custom_metadata] documentation.
    ///
    /// [custom_metadata]: https://cloud.google.com/storage/docs/metadata#custom-metadata
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// A prefix to apply to all object keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating an object key that
    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` in order to act as a directory path. A trailing `/` is **not** automatically added.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// The timestamp format for the time component of the object key.
    ///
    /// By default, object keys are appended with a timestamp that reflects when the objects are
    /// sent to Cloud Storage, such that the resulting object key is functionally equivalent to
    /// joining the key prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:46 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the key prefix.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether or not to append a UUID v4 token to the end of the object key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// object keys must be unique.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// The filename extension to use in the object key.
    ///
    /// If not specified, the extension is determined by the compression scheme used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// API endpoint for Google Cloud Storage.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    #[serde(flatten)]
    auth: GcpAuthConfig,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
193
/// Serde default for `filename_time_format`: the `%s` specifier.
fn default_time_format() -> String {
    String::from("%s")
}
197
/// Test-only helper: builds a `GcsSinkConfig` around `encoding`, with gzip compression,
/// UUID suffixing enabled, the default time format, and every other field left at its
/// type's default value.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        bucket: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        encoding,
        compression: Compression::gzip_default(),
        batch: Default::default(),
        endpoint: String::new(),
        request: Default::default(),
        auth: Default::default(),
        tls: None,
        acknowledgements: Default::default(),
        timezone: None,
    }
}
220
221impl GenerateConfig for GcsSinkConfig {
222    fn generate_config() -> toml::Value {
223        toml::from_str(indoc! {r#"
224            bucket = "my-bucket"
225            credentials_path = "/path/to/credentials.json"
226            framing.method = "newline_delimited"
227            encoding.codec = "json"
228        "#})
229        .unwrap()
230    }
231}
232
233#[async_trait::async_trait]
234#[typetag::serde(name = "gcp_cloud_storage")]
235impl SinkConfig for GcsSinkConfig {
236    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
237        let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
238        let base_url = format!("{}/{}/", self.endpoint, self.bucket);
239        let tls = TlsSettings::from_options(self.tls.as_ref())?;
240        let client = HttpClient::new(tls, cx.proxy())?;
241        let healthcheck = build_healthcheck(
242            self.bucket.clone(),
243            client.clone(),
244            base_url.clone(),
245            auth.clone(),
246        )?;
247        auth.spawn_regenerate_token();
248        let sink = self.build_sink(client, base_url, auth, cx)?;
249
250        Ok((sink, healthcheck))
251    }
252
253    fn input(&self) -> Input {
254        Input::new(self.encoding.config().1.input_type() & DataType::Log)
255    }
256
257    fn acknowledgements(&self) -> &AcknowledgementsConfig {
258        &self.acknowledgements
259    }
260}
261
262impl GcsSinkConfig {
263    fn build_sink(
264        &self,
265        client: HttpClient,
266        base_url: String,
267        auth: GcpAuthenticator,
268        cx: SinkContext,
269    ) -> crate::Result<VectorSink> {
270        let request = self.request.into_settings();
271
272        let batch_settings = self.batch.into_batcher_settings()?;
273
274        let partitioner = self.key_partitioner()?;
275
276        let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
277
278        let svc = ServiceBuilder::new()
279            .settings(request, GcsRetryLogic::default())
280            .service(GcsService::new(client, base_url, auth));
281
282        let request_settings = RequestSettings::new(self, cx)?;
283
284        let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
285
286        Ok(VectorSink::from_event_streamsink(sink))
287    }
288
289    fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
290        Ok(KeyPartitioner::new(
291            Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
292                .context(KeyPrefixTemplateSnafu)?,
293            None,
294        ))
295    }
296}
297
// Settings required to produce a request that do not change per
// request. All possible values are pre-computed for direct use in
// producing a request.
#[derive(Clone, Debug)]
struct RequestSettings {
    // Header value derived from the configured `acl`, if one was set.
    acl: Option<HeaderValue>,
    // Header value taken from the encoder's content type.
    content_type: HeaderValue,
    // Header value derived from the compression's content encoding, if it has one.
    content_encoding: Option<HeaderValue>,
    // Header value derived from the configured storage class (or its default).
    storage_class: HeaderValue,
    // Header pairs built from the configured `metadata` map.
    headers: Vec<(HeaderName, HeaderValue)>,
    // Object key extension: `filename_extension`, or the compression's extension.
    extension: String,
    // strftime-style format used for the timestamp part of the object key.
    time_format: String,
    // Whether a UUID v4 is appended after the timestamp in the object key.
    append_uuid: bool,
    encoder: (Transformer, Encoder<Framer>),
    compression: Compression,
    // Offset applied to the key timestamp; when `None`, UTC is used.
    tz_offset: Option<FixedOffset>,
}
315
316impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
317    type Metadata = (String, EventFinalizers);
318    type Events = Vec<Event>;
319    type Encoder = (Transformer, Encoder<Framer>);
320    type Payload = Bytes;
321    type Request = GcsRequest;
322    type Error = io::Error;
323
324    fn compression(&self) -> Compression {
325        self.compression
326    }
327
328    fn encoder(&self) -> &Self::Encoder {
329        &self.encoder
330    }
331
332    fn split_input(
333        &self,
334        input: (String, Vec<Event>),
335    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
336        let (partition_key, mut events) = input;
337        let finalizers = events.take_finalizers();
338        let builder = RequestMetadataBuilder::from_events(&events);
339
340        ((partition_key, finalizers), builder, events)
341    }
342
343    fn build_request(
344        &self,
345        gcp_metadata: Self::Metadata,
346        metadata: RequestMetadata,
347        payload: EncodeResult<Self::Payload>,
348    ) -> Self::Request {
349        let (key, finalizers) = gcp_metadata;
350        // TODO: pull the seconds from the last event
351        let filename = {
352            let seconds = match self.tz_offset {
353                Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
354                None => Utc::now()
355                    .with_timezone(&chrono::Utc)
356                    .format(&self.time_format),
357            };
358
359            if self.append_uuid {
360                let uuid = Uuid::new_v4();
361                format!("{}-{}", seconds, uuid.hyphenated())
362            } else {
363                seconds.to_string()
364            }
365        };
366
367        let key = format!("{}{}.{}", key, filename, self.extension);
368        let body = payload.into_payload();
369
370        GcsRequest {
371            key,
372            body,
373            finalizers,
374            settings: GcsRequestSettings {
375                acl: self.acl.clone(),
376                content_type: self.content_type.clone(),
377                content_encoding: self.content_encoding.clone(),
378                storage_class: self.storage_class.clone(),
379                headers: self.headers.clone(),
380            },
381            metadata,
382        }
383    }
384}
385
386impl RequestSettings {
387    fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
388        let transformer = config.encoding.transformer();
389        let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
390        let encoder = Encoder::<Framer>::new(framer, serializer);
391        let acl = config
392            .acl
393            .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
394        let content_type = HeaderValue::from_str(encoder.content_type()).unwrap();
395        let content_encoding = config
396            .compression
397            .content_encoding()
398            .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
399        let storage_class = config.storage_class.unwrap_or_default();
400        let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
401        let metadata = config
402            .metadata
403            .as_ref()
404            .map(|metadata| {
405                metadata
406                    .iter()
407                    .map(make_header)
408                    .collect::<Result<Vec<_>, _>>()
409            })
410            .unwrap_or_else(|| Ok(vec![]))?;
411        let extension = config
412            .filename_extension
413            .clone()
414            .unwrap_or_else(|| config.compression.extension().into());
415        let time_format = config.filename_time_format.clone();
416        let append_uuid = config.filename_append_uuid;
417        let offset = config
418            .timezone
419            .or(cx.globals.timezone)
420            .and_then(timezone_to_offset);
421
422        Ok(Self {
423            acl,
424            content_type,
425            content_encoding,
426            storage_class,
427            headers: metadata,
428            extension,
429            time_format,
430            append_uuid,
431            compression: config.compression,
432            encoder: (transformer, encoder),
433            tz_offset: offset,
434        })
435    }
436}
437
438// Make a header pair from a key-value string pair
439fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
440    Ok((
441        HeaderName::from_bytes(name.as_bytes())?,
442        HeaderValue::from_str(value)?,
443    ))
444}
445
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::codecs::encoding::FramingConfig;
    use vector_lib::codecs::{
        JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
    };
    use vector_lib::partition::Partitioner;
    use vector_lib::request_metadata::GroupedCountByteSize;
    use vector_lib::EstimatedJsonEncodedSizeOf;

    use crate::event::LogEvent;
    use crate::test_util::{
        components::{run_and_assert_sink_compliance, SINK_TAGS},
        http::{always_200_response, spawn_blackhole_http_server},
    };

    use super::*;

    /// The generated example configuration must be accepted by the config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    /// Runs the sink against a blackhole HTTP server and checks component compliance.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let context = SinkContext::default();
        let client = HttpClient::new(TlsSettings::default(), context.proxy())
            .expect("should not fail to create HTTP client");

        let encoding = (None::<FramingConfig>, JsonSerializerConfig::default()).into();
        let sink = default_config(encoding)
            .build_sink(
                client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                context,
            )
            .expect("failed to build sink");

        let events = stream::once(ready(Event::Log(LogEvent::from("simple message"))));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
    }

    /// The key prefix template is rendered from event fields.
    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let mut event = LogEvent::from("hello world".to_string());
        event.insert("key", "value");

        let encoding = (None::<FramingConfig>, TextSerializerConfig::default()).into();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..default_config(encoding)
        };

        let partitioner = sink_config.key_partitioner().unwrap();
        let key = partitioner
            .partition(&Event::Log(event))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    /// Builds `RequestSettings`, panicking if the configuration is rejected.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }

    /// Builds a request for a single default log event with the key prefix `key/` and
    /// the literal time format `date`, varying extension, UUID suffix and compression.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let context = SinkContext::default();
        let encoding = (
            Some(NewlineDelimitedEncoderConfig::new()),
            JsonSerializerConfig::default(),
        )
            .into();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(encoding)
        };

        let log: Event = LogEvent::default().into();
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&log)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&log, log.estimated_json_encoded_size_of());

        let settings = request_settings(&sink_config, context);
        let (metadata, metadata_request_builder, _events) = settings.split_input((key, vec![log]));
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_request_builder.build(&payload);

        settings.build_request(metadata, request_metadata, payload)
    }

    /// Object keys combine prefix, formatted time, optional UUID and extension.
    #[test]
    fn gcs_build_request() {
        let explicit_extension = build_request(Some("ext"), false, Compression::None);
        assert_eq!(explicit_extension.key, "key/date.ext");

        let uncompressed = build_request(None, false, Compression::None);
        assert_eq!(uncompressed.key, "key/date.log");

        let gzipped = build_request(None, false, Compression::gzip_default());
        assert_eq!(gzipped.key, "key/date.log.gz");

        // With a UUID suffix the key can no longer equal the plain timestamped name.
        let with_uuid = build_request(None, true, Compression::gzip_default());
        assert_ne!(with_uuid.key, "key/date.log.gz");
    }
}