vector/sinks/gcp/stackdriver/logs/
config.rs

1//! Configuration for the `gcp_stackdriver_logs` sink.
2
3use std::collections::HashMap;
4
5use http::{Request, Uri};
6use hyper::Body;
7use snafu::Snafu;
8use vector_lib::lookup::lookup_v2::ConfigValuePath;
9use vrl::value::Kind;
10
11use super::{
12    encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder,
13    service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink,
14};
15use crate::{
16    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
17    http::HttpClient,
18    schema,
19    sinks::{
20        gcs_common::config::healthcheck_response,
21        prelude::*,
22        util::{
23            BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings,
24            http::{HttpService, http_response_retry_logic},
25            service::TowerRequestConfigDefaults,
26        },
27    },
28};
29
30#[derive(Debug, Snafu)]
31enum HealthcheckError {
32    #[snafu(display("Resource not found"))]
33    NotFound,
34}
35
36#[derive(Clone, Copy, Debug)]
37pub struct StackdriverTowerRequestConfigDefaults;
38
39impl TowerRequestConfigDefaults for StackdriverTowerRequestConfigDefaults {
40    const RATE_LIMIT_NUM: u64 = 1_000;
41}
42
/// Configuration for the `gcp_stackdriver_logs` sink.
// NOTE(review): `Default` is derived, so `StackdriverConfig::default().endpoint` is an empty
// string; only serde deserialization fills it from `default_endpoint()`. Code that builds the
// sink from a default-constructed config must set `endpoint` explicitly.
#[configurable_component(sink(
    "gcp_stackdriver_logs",
    "Deliver logs to GCP's Cloud Operations suite."
))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct StackdriverConfig {
    // Not user-configurable: `skip` keeps it out of (de)serialization and deserialization fills
    // it from `default_endpoint()`. `build` parses it into the request `Uri`.
    #[serde(skip, default = "default_endpoint")]
    pub(super) endpoint: String,

    // Flattened, so the `StackdriverLogName` variant key (e.g. `project_id`) is a top-level key.
    #[serde(flatten)]
    pub(super) log_name: StackdriverLogName,

    /// The log ID to which to publish logs.
    ///
    /// This is a name you create to identify this log stream.
    pub(super) log_id: Template,

    /// The monitored resource to associate the logs with.
    pub(super) resource: StackdriverResource,

    // Flattened, so `labels_key` and `labels` are top-level keys of the sink config.
    #[serde(flatten)]
    pub(super) label_config: StackdriverLabelConfig,

    /// The field of the log event from which to take the outgoing log’s `severity` field.
    ///
    /// The named field is removed from the log event if present, and must be either an integer
    /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case
    /// is ignored) or a common prefix such as `err`.
    ///
    /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`).
    ///
    /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on
    /// the value of the `severity` field.
    ///
    /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
    /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
    #[configurable(metadata(docs::examples = "severity"))]
    pub(super) severity_key: Option<ConfigValuePath>,

    // Flattened GCP credential options; `build` turns them into an authenticator with the
    // `LoggingWrite` scope.
    #[serde(flatten)]
    pub(super) auth: GcpAuthConfig,

    // Omitted from serialized output while it equals its default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub(super) encoding: Transformer,

    // `build` additionally caps batches at `MAX_BATCH_PAYLOAD_SIZE` bytes.
    #[configurable(derived)]
    #[serde(default)]
    pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Uses `StackdriverTowerRequestConfigDefaults`, which overrides the rate-limit default.
    #[configurable(derived)]
    #[serde(default)]
    pub(super) request: TowerRequestConfig<StackdriverTowerRequestConfigDefaults>,

    // Passed to `TlsSettings::from_options` when building the HTTP client.
    #[configurable(derived)]
    pub(super) tls: Option<TlsConfig>,

    // Accepts either a bare bool or a struct (`bool_or_struct`); exposed to the topology through
    // `SinkConfig::acknowledgements`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
110
111pub(super) fn default_endpoint() -> String {
112    "https://logging.googleapis.com/v2/entries:write".to_string()
113}
114
/// Upper bound, in bytes, on one batch payload.
///
/// Cloud Logging limits `entries.write` requests to 10 MB:
/// https://cloud.google.com/logging/quotas#api-limits
const MAX_BATCH_PAYLOAD_SIZE: usize = 10 * 1_000 * 1_000;
117
118/// Logging locations.
119#[configurable_component]
120#[derive(Clone, Debug, Derivative)]
121#[derivative(Default)]
122pub(super) enum StackdriverLogName {
123    /// The billing account ID to which to publish logs.
124    ///
125    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
126    #[serde(rename = "billing_account_id")]
127    #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))]
128    BillingAccount(String),
129
130    /// The folder ID to which to publish logs.
131    ///
132    /// See the [Google Cloud Platform folder documentation][folder_docs] for more details.
133    ///
134    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
135    ///
136    /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders
137    #[serde(rename = "folder_id")]
138    #[configurable(metadata(docs::examples = "My Folder"))]
139    Folder(String),
140
141    /// The organization ID to which to publish logs.
142    ///
143    /// This would be the identifier assigned to your organization on Google Cloud Platform.
144    ///
145    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
146    #[serde(rename = "organization_id")]
147    #[configurable(metadata(docs::examples = "622418129737"))]
148    Organization(String),
149
150    /// The project ID to which to publish logs.
151    ///
152    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
153    ///
154    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
155    ///
156    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
157    #[derivative(Default)]
158    #[serde(rename = "project_id")]
159    #[configurable(metadata(docs::examples = "vector-123456"))]
160    Project(String),
161}
162
163/// Label Configuration.
164#[configurable_component]
165#[derive(Clone, Debug, Derivative)]
166#[derivative(Default)]
167pub(super) struct StackdriverLabelConfig {
168    /// The value of this field is used to retrieve the associated labels from the `jsonPayload`
169    /// and extract their values to set as LogEntry labels.
170    #[configurable(metadata(docs::examples = "logging.googleapis.com/labels"))]
171    #[serde(default = "default_labels_key")]
172    pub(super) labels_key: Option<String>,
173
174    /// A map of key, value pairs that provides additional information about the log entry.
175    #[configurable(metadata(
176        docs::additional_props_description = "A key, value pair that describes a log entry."
177    ))]
178    #[configurable(metadata(docs::examples = "labels_examples()"))]
179    #[serde(default)]
180    pub(super) labels: HashMap<String, Template>,
181}
182
/// Example value for the `labels` option: one static label and one templated label.
///
/// Referenced by name from the `docs::examples` metadata on `StackdriverLabelConfig::labels`.
fn labels_examples() -> HashMap<String, String> {
    let pairs = [
        ("label_1", "value_1"),
        ("label_2", "{{ template_value_2 }}"),
    ];
    pairs
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
189
190pub(super) fn default_labels_key() -> Option<String> {
191    Some("logging.googleapis.com/labels".to_string())
192}
193
/// A monitored resource.
///
/// Monitored resources in GCP allow associating logs and metrics specifically with native resources
/// within Google Cloud Platform. This takes the form of a "type" field which identifies the
/// resource, and a set of type-specific labels to uniquely identify a resource of that type.
///
/// See [Monitored resource types][mon_docs] for more information.
///
/// [mon_docs]: https://cloud.google.com/monitoring/api/resources
// TODO: this type is specific to the stackdriver logs sink because it allows for template-able
// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both
// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub(super) struct StackdriverResource {
    /// The monitored resource type.
    ///
    /// For example, the type of a Compute Engine VM instance is `gce_instance`.
    /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for
    /// more details.
    ///
    /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources
    // `type` is a Rust keyword, hence the trailing underscore plus the serde rename.
    #[serde(rename = "type")]
    pub(super) type_: String,

    /// Type-specific labels.
    // Flattened: every key in the `resource` table other than `type` is collected here.
    #[serde(flatten)]
    #[configurable(metadata(docs::additional_props_description = "A type-specific label."))]
    #[configurable(metadata(docs::examples = "label_examples()"))]
    pub(super) labels: HashMap<String, Template>,
}
225
/// Example resource labels: a literal `instanceId` and a templated `zone`.
///
/// Referenced by name from the `docs::examples` metadata on `StackdriverResource::labels`.
fn label_examples() -> HashMap<String, String> {
    let entries = [("instanceId", "Twilight"), ("zone", "{{ zone }}")];
    entries
        .iter()
        .map(|&(name, value)| (name.to_string(), value.to_string()))
        .collect()
}
232
// Presumably implements config generation for this sink from `StackdriverConfig::default()`
// (macro defined elsewhere — confirm).
impl_generate_config_from_default!(StackdriverConfig);
234
235#[async_trait::async_trait]
236#[typetag::serde(name = "gcp_stackdriver_logs")]
237impl SinkConfig for StackdriverConfig {
238    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
239        let auth = self.auth.build(Scope::LoggingWrite).await?;
240
241        let request_builder = StackdriverLogsRequestBuilder {
242            encoder: StackdriverLogsEncoder::new(
243                self.encoding.clone(),
244                self.log_id.clone(),
245                self.log_name.clone(),
246                self.label_config.clone(),
247                self.resource.clone(),
248                self.severity_key.clone(),
249            ),
250        };
251
252        let batch_settings = self
253            .batch
254            .validate()?
255            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
256            .into_batcher_settings()?;
257
258        let request_limits = self.request.into_settings();
259
260        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
261        let client = HttpClient::new(tls_settings, cx.proxy())?;
262
263        let uri: Uri = self.endpoint.parse()?;
264
265        let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder {
266            uri: uri.clone(),
267            auth: auth.clone(),
268        };
269
270        let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder);
271
272        let service = ServiceBuilder::new()
273            .settings(request_limits, http_response_retry_logic())
274            .service(service);
275
276        let sink = StackdriverLogsSink::new(service, batch_settings, request_builder);
277
278        let healthcheck = healthcheck(client, auth.clone(), uri).boxed();
279
280        auth.spawn_regenerate_token();
281
282        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
283    }
284
285    fn input(&self) -> Input {
286        let requirement =
287            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
288
289        Input::log().with_schema_requirement(requirement)
290    }
291
292    fn acknowledgements(&self) -> &AcknowledgementsConfig {
293        &self.acknowledgements
294    }
295}
296
297async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> {
298    let entries: Vec<BoxedRawValue> = Vec::new();
299    let events = serde_json::json!({ "entries": entries });
300
301    let body = crate::serde::json::to_bytes(&events).unwrap().freeze();
302
303    let mut request = Request::post(uri)
304        .header("Content-Type", "application/json")
305        .body(body)
306        .unwrap();
307
308    auth.apply(&mut request);
309
310    let request = request.map(Body::from);
311
312    let response = client.send(request).await?;
313
314    healthcheck_response(response, HealthcheckError::NotFound.into())
315}