vector/sinks/gcp/stackdriver/logs/
config.rs

1//! Configuration for the `gcp_stackdriver_logs` sink.
2
3use crate::{
4    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
5    http::HttpClient,
6    schema,
7    sinks::{
8        gcs_common::config::healthcheck_response,
9        prelude::*,
10        util::{
11            http::{http_response_retry_logic, HttpService},
12            service::TowerRequestConfigDefaults,
13            BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings,
14        },
15    },
16};
17use http::{Request, Uri};
18use hyper::Body;
19use snafu::Snafu;
20use std::collections::HashMap;
21use vector_lib::lookup::lookup_v2::ConfigValuePath;
22use vrl::value::Kind;
23
24use super::{
25    encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder,
26    service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink,
27};
28
29#[derive(Debug, Snafu)]
30enum HealthcheckError {
31    #[snafu(display("Resource not found"))]
32    NotFound,
33}
34
35#[derive(Clone, Copy, Debug)]
36pub struct StackdriverTowerRequestConfigDefaults;
37
38impl TowerRequestConfigDefaults for StackdriverTowerRequestConfigDefaults {
39    const RATE_LIMIT_NUM: u64 = 1_000;
40}
41
42/// Configuration for the `gcp_stackdriver_logs` sink.
43#[configurable_component(sink(
44    "gcp_stackdriver_logs",
45    "Deliver logs to GCP's Cloud Operations suite."
46))]
47#[derive(Clone, Debug, Default)]
48#[serde(deny_unknown_fields)]
49pub(super) struct StackdriverConfig {
50    #[serde(skip, default = "default_endpoint")]
51    pub(super) endpoint: String,
52
53    #[serde(flatten)]
54    pub(super) log_name: StackdriverLogName,
55
56    /// The log ID to which to publish logs.
57    ///
58    /// This is a name you create to identify this log stream.
59    pub(super) log_id: Template,
60
61    /// The monitored resource to associate the logs with.
62    pub(super) resource: StackdriverResource,
63
64    #[serde(flatten)]
65    pub(super) label_config: StackdriverLabelConfig,
66
67    /// The field of the log event from which to take the outgoing log’s `severity` field.
68    ///
69    /// The named field is removed from the log event if present, and must be either an integer
70    /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case
71    /// is ignored) or a common prefix such as `err`.
72    ///
73    /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`).
74    ///
75    /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on
76    /// the value of the `severity` field.
77    ///
78    /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
79    /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
80    #[configurable(metadata(docs::examples = "severity"))]
81    pub(super) severity_key: Option<ConfigValuePath>,
82
83    #[serde(flatten)]
84    pub(super) auth: GcpAuthConfig,
85
86    #[configurable(derived)]
87    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
88    pub(super) encoding: Transformer,
89
90    #[configurable(derived)]
91    #[serde(default)]
92    pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
93
94    #[configurable(derived)]
95    #[serde(default)]
96    pub(super) request: TowerRequestConfig<StackdriverTowerRequestConfigDefaults>,
97
98    #[configurable(derived)]
99    pub(super) tls: Option<TlsConfig>,
100
101    #[configurable(derived)]
102    #[serde(
103        default,
104        deserialize_with = "crate::serde::bool_or_struct",
105        skip_serializing_if = "crate::serde::is_default"
106    )]
107    acknowledgements: AcknowledgementsConfig,
108}
109
110pub(super) fn default_endpoint() -> String {
111    "https://logging.googleapis.com/v2/entries:write".to_string()
112}
113
/// Upper bound, in bytes, applied to batch payloads via `limit_max_bytes`.
///
/// Matches the 10 MB request-size limit for `entries.write`:
/// https://cloud.google.com/logging/quotas#api-limits
const MAX_BATCH_PAYLOAD_SIZE: usize = 10 * 1_000 * 1_000;
116
117/// Logging locations.
118#[configurable_component]
119#[derive(Clone, Debug, Derivative)]
120#[derivative(Default)]
121pub(super) enum StackdriverLogName {
122    /// The billing account ID to which to publish logs.
123    ///
124    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
125    #[serde(rename = "billing_account_id")]
126    #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))]
127    BillingAccount(String),
128
129    /// The folder ID to which to publish logs.
130    ///
131    /// See the [Google Cloud Platform folder documentation][folder_docs] for more details.
132    ///
133    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
134    ///
135    /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders
136    #[serde(rename = "folder_id")]
137    #[configurable(metadata(docs::examples = "My Folder"))]
138    Folder(String),
139
140    /// The organization ID to which to publish logs.
141    ///
142    /// This would be the identifier assigned to your organization on Google Cloud Platform.
143    ///
144    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
145    #[serde(rename = "organization_id")]
146    #[configurable(metadata(docs::examples = "622418129737"))]
147    Organization(String),
148
149    /// The project ID to which to publish logs.
150    ///
151    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
152    ///
153    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
154    ///
155    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
156    #[derivative(Default)]
157    #[serde(rename = "project_id")]
158    #[configurable(metadata(docs::examples = "vector-123456"))]
159    Project(String),
160}
161
162/// Label Configuration.
163#[configurable_component]
164#[derive(Clone, Debug, Derivative)]
165#[derivative(Default)]
166pub(super) struct StackdriverLabelConfig {
167    /// The value of this field is used to retrieve the associated labels from the `jsonPayload`
168    /// and extract their values to set as LogEntry labels.
169    #[configurable(metadata(docs::examples = "logging.googleapis.com/labels"))]
170    #[serde(default = "default_labels_key")]
171    pub(super) labels_key: Option<String>,
172
173    /// A map of key, value pairs that provides additional information about the log entry.
174    #[configurable(metadata(
175        docs::additional_props_description = "A key, value pair that describes a log entry."
176    ))]
177    #[configurable(metadata(docs::examples = "labels_examples()"))]
178    #[serde(default)]
179    pub(super) labels: HashMap<String, Template>,
180}
181
/// Documentation example for the `labels` option: one literal label and one templated label.
fn labels_examples() -> HashMap<String, String> {
    [
        ("label_1", "value_1"),
        ("label_2", "{{ template_value_2 }}"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect()
}
188
189pub(super) fn default_labels_key() -> Option<String> {
190    Some("logging.googleapis.com/labels".to_string())
191}
192
193/// A monitored resource.
194///
195/// Monitored resources in GCP allow associating logs and metrics specifically with native resources
196/// within Google Cloud Platform. This takes the form of a "type" field which identifies the
197/// resource, and a set of type-specific labels to uniquely identify a resource of that type.
198///
199/// See [Monitored resource types][mon_docs] for more information.
200///
201/// [mon_docs]: https://cloud.google.com/monitoring/api/resources
202// TODO: this type is specific to the stackdrivers log sink because it allows for template-able
203// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both
204// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication
205#[configurable_component]
206#[derive(Clone, Debug, Default)]
207pub(super) struct StackdriverResource {
208    /// The monitored resource type.
209    ///
210    /// For example, the type of a Compute Engine VM instance is `gce_instance`.
211    /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for
212    /// more details.
213    ///
214    /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources
215    #[serde(rename = "type")]
216    pub(super) type_: String,
217
218    /// Type-specific labels.
219    #[serde(flatten)]
220    #[configurable(metadata(docs::additional_props_description = "A type-specific label."))]
221    #[configurable(metadata(docs::examples = "label_examples()"))]
222    pub(super) labels: HashMap<String, Template>,
223}
224
/// Documentation example for the resource's type-specific labels: one literal value and one
/// templated value.
fn label_examples() -> HashMap<String, String> {
    [("instanceId", "Twilight"), ("zone", "{{ zone }}")]
        .iter()
        .map(|&(name, value)| (name.to_string(), value.to_string()))
        .collect()
}
231
232impl_generate_config_from_default!(StackdriverConfig);
233
234#[async_trait::async_trait]
235#[typetag::serde(name = "gcp_stackdriver_logs")]
236impl SinkConfig for StackdriverConfig {
237    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
238        let auth = self.auth.build(Scope::LoggingWrite).await?;
239
240        let request_builder = StackdriverLogsRequestBuilder {
241            encoder: StackdriverLogsEncoder::new(
242                self.encoding.clone(),
243                self.log_id.clone(),
244                self.log_name.clone(),
245                self.label_config.clone(),
246                self.resource.clone(),
247                self.severity_key.clone(),
248            ),
249        };
250
251        let batch_settings = self
252            .batch
253            .validate()?
254            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
255            .into_batcher_settings()?;
256
257        let request_limits = self.request.into_settings();
258
259        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
260        let client = HttpClient::new(tls_settings, cx.proxy())?;
261
262        let uri: Uri = self.endpoint.parse()?;
263
264        let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder {
265            uri: uri.clone(),
266            auth: auth.clone(),
267        };
268
269        let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder);
270
271        let service = ServiceBuilder::new()
272            .settings(request_limits, http_response_retry_logic())
273            .service(service);
274
275        let sink = StackdriverLogsSink::new(service, batch_settings, request_builder);
276
277        let healthcheck = healthcheck(client, auth.clone(), uri).boxed();
278
279        auth.spawn_regenerate_token();
280
281        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
282    }
283
284    fn input(&self) -> Input {
285        let requirement =
286            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
287
288        Input::log().with_schema_requirement(requirement)
289    }
290
291    fn acknowledgements(&self) -> &AcknowledgementsConfig {
292        &self.acknowledgements
293    }
294}
295
296async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> {
297    let entries: Vec<BoxedRawValue> = Vec::new();
298    let events = serde_json::json!({ "entries": entries });
299
300    let body = crate::serde::json::to_bytes(&events).unwrap().freeze();
301
302    let mut request = Request::post(uri)
303        .header("Content-Type", "application/json")
304        .body(body)
305        .unwrap();
306
307    auth.apply(&mut request);
308
309    let request = request.map(Body::from);
310
311    let response = client.send(request).await?;
312
313    healthcheck_response(response, HealthcheckError::NotFound.into())
314}