1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::{
6    Uri,
7    header::{HeaderName, HeaderValue},
8};
9use indoc::indoc;
10use snafu::{ResultExt, Snafu};
11use tower::ServiceBuilder;
12use uuid::Uuid;
13use vector_lib::{
14    TimeZone,
15    codecs::encoding::Framer,
16    configurable::configurable_component,
17    event::{EventFinalizers, Finalizable},
18    request_metadata::RequestMetadata,
19};
20
21use crate::{
22    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
23    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
24    event::Event,
25    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
26    http::{HttpClient, get_http_scheme_from_uri},
27    serde::json::to_string,
28    sinks::{
29        Healthcheck, VectorSink,
30        gcs_common::{
31            config::{
32                GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, build_healthcheck,
33                default_endpoint,
34            },
35            service::{GcsRequest, GcsRequestSettings, GcsService},
36            sink::GcsSink,
37        },
38        util::{
39            BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt,
40            TowerRequestConfig, batch::BatchConfig, metadata::RequestMetadataBuilder,
41            partitioner::KeyPartitioner, request_builder::EncodeResult,
42            service::TowerRequestConfigDefaults, timezone_to_offset,
43        },
44    },
45    template::{Template, TemplateParseError},
46    tls::{TlsConfig, TlsSettings},
47};
48
/// Errors raised while deriving sink components from `GcsSinkConfig`.
///
/// NOTE(review): despite the name, in this file the only variant is produced by
/// `GcsSinkConfig::key_partitioner` (via `KeyPrefixTemplateSnafu`) when
/// `key_prefix` fails to parse as a template.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The `key_prefix` option could not be parsed as a `Template`.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
55
/// Request-layer defaults for the GCS sink.
///
/// Used as the type parameter of `GcsSinkConfig::request`
/// (`TowerRequestConfig<GcsTowerRequestConfigDefaults>`).
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
    // Only the rate-limit count is overridden here; every other default comes
    // from the trait. NOTE(review): presumably the number of requests allowed
    // per rate-limit window — confirm against `TowerRequestConfigDefaults`.
    const RATE_LIMIT_NUM: u64 = 1_000;
}
62
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The GCS bucket name.
    ///
    /// Joined with `endpoint` as `{endpoint}/{bucket}/` to form the base URL for uploads.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// The predefined ACL to apply to uploaded objects.
    ///
    /// Converted into a header value for each upload request. When unset, no ACL is provided.
    acl: Option<GcsPredefinedAcl>,

    /// The storage class for uploaded objects.
    ///
    /// When unset, the default `GcsStorageClass` is used.
    storage_class: Option<GcsStorageClass>,

    /// Custom metadata key/value pairs attached to each uploaded object.
    ///
    /// Each pair is turned into an HTTP header. Building the sink fails if a key is not a valid
    /// header name or a value is not a valid header value.
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// A prefix applied to all object keys.
    ///
    /// Rendered as a template for each event and used to partition events into objects.
    /// Defaults to `date=%F/` when unset.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// The strftime-style format for the timestamp part of each object name.
    ///
    /// Rendered when a request is built, in the configured `timezone` (or UTC when none is set).
    /// Defaults to `%s`.
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether to append a random UUID (v4) to the timestamp in the object name, as `-<uuid>`.
    ///
    /// Defaults to `true`.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// The extension appended (after a `.`) to each object name.
    ///
    /// When unset, the extension of the configured compression is used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    // Encoding and framing options, flattened into the top level of the sink config.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    /// Compression applied to each uploaded object.
    ///
    /// Also determines the default filename extension and the content-encoding header value.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    // Batch settings; converted into the sink's batcher settings in `build_sink`.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// The API endpoint for Google Cloud Storage.
    ///
    /// When unset, `default_endpoint()` supplies the value.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    // Tower request settings (rate limiting, retries, ...) using the GCS-specific defaults.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    // GCP authentication options, flattened into the top level of the sink config.
    #[serde(flatten)]
    auth: GcpAuthConfig,

    // Optional TLS options for the HTTP client.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    // End-to-end acknowledgement settings; accepts either a bool or a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // Timezone for rendering `filename_time_format`; falls back to the global timezone when
    // unset, and to UTC when neither is set (see `RequestSettings::new`).
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
196
/// Serde default for `filename_time_format`: `%s`, the strftime specifier for
/// the Unix timestamp in seconds.
fn default_time_format() -> String {
    String::from("%s")
}
200
/// Test helper: a `GcsSinkConfig` using the caller's `encoding`, gzip compression,
/// and the serde defaults for the filename options. Every other field is
/// empty or left at its default.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        encoding,
        compression: Compression::gzip_default(),
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        bucket: String::new(),
        endpoint: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        tls: None,
        timezone: None,
        batch: Default::default(),
        request: Default::default(),
        auth: Default::default(),
        acknowledgements: Default::default(),
    }
}
223
224impl GenerateConfig for GcsSinkConfig {
225    fn generate_config() -> toml::Value {
226        toml::from_str(indoc! {r#"
227            bucket = "my-bucket"
228            credentials_path = "/path/to/credentials.json"
229            framing.method = "newline_delimited"
230            encoding.codec = "json"
231        "#})
232        .unwrap()
233    }
234}
235
236#[async_trait::async_trait]
237#[typetag::serde(name = "gcp_cloud_storage")]
238impl SinkConfig for GcsSinkConfig {
239    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
240        let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
241        let base_url = format!("{}/{}/", self.endpoint, self.bucket);
242        let tls = TlsSettings::from_options(self.tls.as_ref())?;
243        let client = HttpClient::new(tls, cx.proxy())?;
244        let healthcheck = build_healthcheck(
245            self.bucket.clone(),
246            client.clone(),
247            base_url.clone(),
248            auth.clone(),
249        )?;
250        auth.spawn_regenerate_token();
251        let sink = self.build_sink(client, base_url, auth, cx)?;
252
253        Ok((sink, healthcheck))
254    }
255
256    fn input(&self) -> Input {
257        Input::new(self.encoding.config().1.input_type() & DataType::Log)
258    }
259
260    fn acknowledgements(&self) -> &AcknowledgementsConfig {
261        &self.acknowledgements
262    }
263}
264
265impl GcsSinkConfig {
266    fn build_sink(
267        &self,
268        client: HttpClient,
269        base_url: String,
270        auth: GcpAuthenticator,
271        cx: SinkContext,
272    ) -> crate::Result<VectorSink> {
273        let request = self.request.into_settings();
274
275        let batch_settings = self.batch.into_batcher_settings()?;
276
277        let partitioner = self.key_partitioner()?;
278
279        let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
280
281        let svc = ServiceBuilder::new()
282            .settings(request, GcsRetryLogic::default())
283            .service(GcsService::new(client, base_url, auth));
284
285        let request_settings = RequestSettings::new(self, cx)?;
286
287        let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
288
289        Ok(VectorSink::from_event_streamsink(sink))
290    }
291
292    fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
293        Ok(KeyPartitioner::new(
294            Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
295                .context(KeyPrefixTemplateSnafu)?,
296            None,
297        ))
298    }
299}
300
/// Per-request settings derived once from `GcsSinkConfig` (in `RequestSettings::new`)
/// and reused for every upload request built by this sink.
#[derive(Clone, Debug)]
struct RequestSettings {
    // ACL header value, present only when `acl` is configured.
    acl: Option<HeaderValue>,
    // Content type reported by the encoder.
    content_type: HeaderValue,
    // Content-encoding value derived from `compression`; `None` when the compression has none.
    content_encoding: Option<HeaderValue>,
    // Storage class header value (configured, or the `GcsStorageClass` default).
    storage_class: HeaderValue,
    // Extra headers built from the `metadata` option.
    headers: Vec<(HeaderName, HeaderValue)>,
    // Object-name extension, without the leading `.` (added in `build_request`).
    extension: String,
    // strftime-style format for the timestamp part of the object name.
    time_format: String,
    // Whether `build_request` appends `-<uuid>` after the timestamp.
    append_uuid: bool,
    // Event transformer plus framed encoder, exposed via `RequestBuilder::encoder`.
    encoder: (Transformer, Encoder<Framer>),
    // Compression reported via `RequestBuilder::compression`.
    compression: Compression,
    // Fixed offset for rendering `time_format`; UTC is used when `None`.
    tz_offset: Option<FixedOffset>,
}
318
319impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
320    type Metadata = (String, EventFinalizers);
321    type Events = Vec<Event>;
322    type Encoder = (Transformer, Encoder<Framer>);
323    type Payload = Bytes;
324    type Request = GcsRequest;
325    type Error = io::Error;
326
327    fn compression(&self) -> Compression {
328        self.compression
329    }
330
331    fn encoder(&self) -> &Self::Encoder {
332        &self.encoder
333    }
334
335    fn split_input(
336        &self,
337        input: (String, Vec<Event>),
338    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
339        let (partition_key, mut events) = input;
340        let finalizers = events.take_finalizers();
341        let builder = RequestMetadataBuilder::from_events(&events);
342
343        ((partition_key, finalizers), builder, events)
344    }
345
346    fn build_request(
347        &self,
348        gcp_metadata: Self::Metadata,
349        metadata: RequestMetadata,
350        payload: EncodeResult<Self::Payload>,
351    ) -> Self::Request {
352        let (key, finalizers) = gcp_metadata;
353        let filename = {
355            let seconds = match self.tz_offset {
356                Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
357                None => Utc::now()
358                    .with_timezone(&chrono::Utc)
359                    .format(&self.time_format),
360            };
361
362            if self.append_uuid {
363                let uuid = Uuid::new_v4();
364                format!("{}-{}", seconds, uuid.hyphenated())
365            } else {
366                seconds.to_string()
367            }
368        };
369
370        let key = format!("{}{}.{}", key, filename, self.extension);
371        let body = payload.into_payload();
372
373        GcsRequest {
374            key,
375            body,
376            finalizers,
377            settings: GcsRequestSettings {
378                acl: self.acl.clone(),
379                content_type: self.content_type.clone(),
380                content_encoding: self.content_encoding.clone(),
381                storage_class: self.storage_class.clone(),
382                headers: self.headers.clone(),
383            },
384            metadata,
385        }
386    }
387}
388
389impl RequestSettings {
390    fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
391        let transformer = config.encoding.transformer();
392        let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
393        let encoder = Encoder::<Framer>::new(framer, serializer);
394        let acl = config
395            .acl
396            .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
397        let content_type = HeaderValue::from_str(encoder.content_type()).unwrap();
398        let content_encoding = config
399            .compression
400            .content_encoding()
401            .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
402        let storage_class = config.storage_class.unwrap_or_default();
403        let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
404        let metadata = config
405            .metadata
406            .as_ref()
407            .map(|metadata| {
408                metadata
409                    .iter()
410                    .map(make_header)
411                    .collect::<Result<Vec<_>, _>>()
412            })
413            .unwrap_or_else(|| Ok(vec![]))?;
414        let extension = config
415            .filename_extension
416            .clone()
417            .unwrap_or_else(|| config.compression.extension().into());
418        let time_format = config.filename_time_format.clone();
419        let append_uuid = config.filename_append_uuid;
420        let offset = config
421            .timezone
422            .or(cx.globals.timezone)
423            .and_then(timezone_to_offset);
424
425        Ok(Self {
426            acl,
427            content_type,
428            content_encoding,
429            storage_class,
430            headers: metadata,
431            extension,
432            time_format,
433            append_uuid,
434            compression: config.compression,
435            encoder: (transformer, encoder),
436            tz_offset: offset,
437        })
438    }
439}
440
441fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
443    Ok((
444        HeaderName::from_bytes(name.as_bytes())?,
445        HeaderValue::from_str(value)?,
446    ))
447}
448
// Unit tests for the GCS sink: config generation, component-spec compliance
// against a blackhole HTTP server, key templating, and object-key construction.
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::{
        EstimatedJsonEncodedSizeOf,
        codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
            encoding::FramingConfig,
        },
        partition::Partitioner,
        request_metadata::GroupedCountByteSize,
    };

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            http::{always_200_response, spawn_blackhole_http_server},
        },
    };

    // The example produced by `GenerateConfig` must deserialize into a valid config.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    // Runs one event through a sink pointed at a server that always answers 200,
    // and checks the sink satisfies the shared component specification.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let context = SinkContext::default();

        let tls = TlsSettings::default();
        let client =
            HttpClient::new(tls, context.proxy()).expect("should not fail to create HTTP client");

        let config =
            default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into());
        // `build_sink` is called directly so the mock endpoint can stand in for
        // the bucket URL and no real GCP credentials are needed.
        let sink = config
            .build_sink(
                client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                context,
            )
            .expect("failed to build sink");

        let event = Event::Log(LogEvent::from("simple message"));
        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
    }

    // `key_prefix` is a template: `{{ key }}` is rendered from the event's field.
    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let message = "hello world".to_string();
        let mut event = LogEvent::from(message);
        event.insert("key", "value");

        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&Event::Log(event))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    // Builds `RequestSettings`, panicking if the config is rejected.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }

    // Builds a request for one empty log event with the given filename options.
    // `filename_time_format` is the literal text "date" (no `%` specifiers), so
    // the timestamp part of the key is always `date`.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(
                (
                    Some(NewlineDelimitedEncoderConfig::new()),
                    JsonSerializerConfig::default(),
                )
                    .into(),
            )
        };
        let log = LogEvent::default().into();
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&log)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&log, log.estimated_json_encoded_size_of());

        let request_settings = request_settings(&sink_config, context);
        let (metadata, metadata_request_builder, _events) =
            request_settings.split_input((key, vec![log]));
        // The payload content is irrelevant here; only the object key is checked.
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_request_builder.build(&payload);

        request_settings.build_request(metadata, request_metadata, payload)
    }

    // Object keys: an explicit extension wins. Otherwise the compression's
    // extension is used (`log` uncompressed, `log.gz` gzip). With
    // `filename_append_uuid`, a UUID is added so the key differs.
    #[test]
    fn gcs_build_request() {
        let req = build_request(Some("ext"), false, Compression::None);
        assert_eq!(req.key, "key/date.ext".to_string());

        let req = build_request(None, false, Compression::None);
        assert_eq!(req.key, "key/date.log".to_string());

        let req = build_request(None, false, Compression::gzip_default());
        assert_eq!(req.key, "key/date.log.gz".to_string());

        let req = build_request(None, true, Compression::gzip_default());
        assert_ne!(req.key, "key/date.log.gz".to_string());
    }
}