1use bytes::{Bytes, BytesMut};
5
6use futures_util::{future::BoxFuture, task::Poll};
7use goauth::scopes::Scope;
8use http::header::{self, HeaderName, HeaderValue};
9use http::{Request, StatusCode, Uri};
10use hyper::Body;
11use indoc::indoc;
12use serde::Serialize;
13use serde_json::json;
14use snafu::Snafu;
15use std::collections::HashMap;
16use std::io;
17use tokio_util::codec::Encoder as _;
18use tower::{Service, ServiceBuilder};
19use vector_lib::configurable::configurable_component;
20use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
21use vector_lib::{
22 config::{telemetry, AcknowledgementsConfig, Input},
23 event::{Event, EventFinalizers, Finalizable},
24 sink::VectorSink,
25 EstimatedJsonEncodedSizeOf,
26};
27use vrl::value::Kind;
28
29use crate::sinks::util::service::TowerRequestConfigDefaults;
30use crate::{
31 codecs::{self, EncodingConfig},
32 config::{GenerateConfig, SinkConfig, SinkContext},
33 gcp::{GcpAuthConfig, GcpAuthenticator},
34 http::HttpClient,
35 schema,
36 sinks::{
37 gcp_chronicle::{
38 compression::ChronicleCompression,
39 partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
40 sink::ChronicleSink,
41 },
42 gcs_common::{
43 config::{healthcheck_response, GcsRetryLogic},
44 service::GcsResponse,
45 },
46 util::{
47 encoding::{as_tracked_write, Encoder},
48 metadata::RequestMetadataBuilder,
49 request_builder::EncodeResult,
50 BatchConfig, Compression, RequestBuilder, SinkBatchSettings, TowerRequestConfig,
51 },
52 Healthcheck,
53 },
54 template::{Template, TemplateParseError},
55 tls::{TlsConfig, TlsSettings},
56};
57
/// Errors associated with configuring and health-checking the Chronicle sink.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The `log_type` template could not be parsed.
    #[snafu(display("log_type template parse error: {}", source))]
    LogTypeTemplate { source: TemplateParseError },

    /// The error `build_healthcheck` hands to `healthcheck_response` as its "not found" error.
    #[snafu(display("Endpoint not found"))]
    NotFound,
}
67
/// Regions for which a built-in Chronicle ingestion endpoint is known.
///
/// Each variant maps to a fixed base URL in `Region::endpoint`; the regional identifiers quoted
/// below are the ones that appear in those URLs.
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Region {
    /// Europe (the `europe-` prefixed endpoint).
    Eu,

    /// United States (the endpoint without a regional prefix).
    Us,

    /// Asia; resolves to the same `asia-southeast1` endpoint as `Singapore`.
    Asia,

    /// São Paulo (`southamerica-east1`).
    SãoPaulo,

    /// Canada (`northamerica-northeast2`).
    Canada,

    /// Dammam (`me-central2`).
    Dammam,

    /// Doha (`me-central1`).
    Doha,

    /// Frankfurt (`europe-west3`).
    Frankfurt,

    /// London (`europe-west2`).
    London,

    /// Mumbai (`asia-south1`).
    Mumbai,

    /// Paris (`europe-west9`).
    Paris,

    /// Singapore (`asia-southeast1`).
    Singapore,

    /// Sydney (`australia-southeast1`).
    Sydney,

    /// Tel Aviv (`me-west1`).
    TelAviv,

    /// Tokyo (`asia-northeast1`).
    Tokyo,

    /// Turin (`europe-west12`).
    Turin,

    /// Zurich (`europe-west6`).
    Zurich,
}
124
125impl Region {
126 const fn endpoint(self) -> &'static str {
128 match self {
129 Region::Eu => "https://europe-malachiteingestion-pa.googleapis.com",
130 Region::Us => "https://malachiteingestion-pa.googleapis.com",
131 Region::Asia => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
132 Region::SãoPaulo => "https://southamerica-east1-malachiteingestion-pa.googleapis.com",
133 Region::Canada => {
134 "https://northamerica-northeast2-malachiteingestion-pa.googleapis.com"
135 }
136 Region::Dammam => "https://me-central2-malachiteingestion-pa.googleapis.com",
137 Region::Doha => "https://me-central1-malachiteingestion-pa.googleapis.com",
138 Region::Frankfurt => "https://europe-west3-malachiteingestion-pa.googleapis.com",
139 Region::London => "https://europe-west2-malachiteingestion-pa.googleapis.com",
140 Region::Mumbai => "https://asia-south1-malachiteingestion-pa.googleapis.com",
141 Region::Paris => "https://europe-west9-malachiteingestion-pa.googleapis.com",
142 Region::Singapore => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
143 Region::Sydney => "https://australia-southeast1-malachiteingestion-pa.googleapis.com",
144 Region::TelAviv => "https://me-west1-malachiteingestion-pa.googleapis.com",
145 Region::Tokyo => "https://asia-northeast1-malachiteingestion-pa.googleapis.com",
146 Region::Turin => "https://europe-west12-malachiteingestion-pa.googleapis.com",
147 Region::Zurich => "https://europe-west6-malachiteingestion-pa.googleapis.com",
148 }
149 }
150}
151
/// Marker type carrying the default batch settings of the Chronicle unstructured sink.
#[derive(Clone, Copy, Debug, Default)]
pub struct ChronicleUnstructuredDefaultBatchSettings;
154
impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings {
    // Batches have no event-count cap; they are bounded by size (1,000,000 bytes) and by a
    // 15 second timeout.
    // NOTE(review): the byte cap presumably mirrors a request-size limit of the Chronicle
    // ingestion API — confirm against the API reference before changing it.
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    const TIMEOUT_SECS: f64 = 15.0;
}
165
/// Marker type carrying the default request (tower) settings of the Chronicle unstructured sink.
#[derive(Clone, Copy, Debug)]
pub struct ChronicleUnstructuredTowerRequestConfigDefaults;
168
impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
    // Only the rate-limit count is overridden; every other default comes from the trait.
    const RATE_LIMIT_NUM: u64 = 1_000;
}
172
/// Configuration for the `gcp_chronicle_unstructured` sink.
#[configurable_component(sink(
    "gcp_chronicle_unstructured",
    "Store unstructured log events in Google Chronicle."
))]
#[derive(Clone, Debug)]
pub struct ChronicleUnstructuredConfig {
    /// The endpoint to send data to.
    ///
    /// Exactly one of `endpoint` and `region` must be set. Trailing slashes are ignored.
    #[configurable(metadata(
        docs::examples = "127.0.0.1:8080",
        docs::examples = "example.com:12345"
    ))]
    pub endpoint: Option<String>,

    /// The region whose built-in Chronicle endpoint should be used.
    ///
    /// Exactly one of `endpoint` and `region` must be set.
    #[configurable(derived)]
    pub region: Option<Region>,

    /// The customer identifier (a UUID) sent as `customer_id` in every request body.
    #[configurable(validation(format = "uuid"))]
    #[configurable(metadata(docs::examples = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"))]
    pub customer_id: String,

    /// An optional, templateable namespace sent as `namespace` in the request body.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "production",
        docs::examples = "production-{{ namespace }}",
    ))]
    #[configurable(metadata(docs::advanced))]
    pub namespace: Option<Template>,

    /// Labels sent with every request as a list of key/value pairs.
    #[configurable(metadata(docs::examples = "chronicle_labels_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "A Chronicle label."))]
    pub labels: Option<HashMap<String, String>>,

    /// GCP authentication settings; their keys are flattened into the sink's own table.
    #[serde(flatten)]
    pub auth: GcpAuthConfig,

    /// Event batching behavior.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<ChronicleUnstructuredDefaultBatchSettings>,

    /// How each event is encoded into the `log_text` of its entry.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    /// Compression applied to the request body.
    #[serde(default)]
    #[configurable(derived)]
    pub compression: ChronicleCompression,

    /// Request handling settings (rate limiting, retries, and so on).
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,

    /// TLS settings for the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The templateable log type of the events.
    ///
    /// The value is handed to the partitioner and the resulting partition key's log type is
    /// sent as `log_type` in the request body.
    #[configurable(metadata(docs::examples = "WINDOWS_DNS", docs::examples = "{{ log_type }}"))]
    pub log_type: Template,

    /// The log type handed to the partitioner as a fallback for `log_type`.
    #[configurable(metadata(docs::examples = "VECTOR_DEV"))]
    pub fallback_log_type: Option<String>,

    /// End-to-end acknowledgement settings; accepts either a boolean or a table.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
252
/// Builds the example label map referenced by the `labels` field's documentation metadata.
fn chronicle_labels_examples() -> HashMap<String, String> {
    HashMap::from([
        ("source".to_owned(), "vector".to_owned()),
        ("tenant".to_owned(), "marketing".to_owned()),
    ])
}
259
260impl GenerateConfig for ChronicleUnstructuredConfig {
261 fn generate_config() -> toml::Value {
262 toml::from_str(indoc! {r#"
263 credentials_path = "/path/to/credentials.json"
264 customer_id = "customer_id"
265 namespace = "namespace"
266 compression = "gzip"
267 log_type = "log_type"
268 fallback_log_type = "VECTOR_DEV"
269 encoding.codec = "text"
270 "#})
271 .unwrap()
272 }
273}
274
275pub fn build_healthcheck(
276 client: HttpClient,
277 base_url: &str,
278 auth: GcpAuthenticator,
279) -> crate::Result<Healthcheck> {
280 let uri = base_url.parse::<Uri>()?;
281
282 let healthcheck = async move {
283 let mut request = http::Request::get(&uri).body(Body::empty())?;
284 auth.apply(&mut request);
285
286 let response = client.send(request).await?;
287 healthcheck_response(response, GcsHealthcheckError::NotFound.into())
288 };
289
290 Ok(Box::pin(healthcheck))
291}
292
/// Configuration errors raised while resolving the sink's target URL.
#[derive(Debug, Snafu)]
pub enum ChronicleError {
    /// Neither `region` nor `endpoint` was configured.
    #[snafu(display("Region or endpoint not defined"))]
    RegionOrEndpoint,
    /// Both `region` and `endpoint` were configured; they are mutually exclusive.
    #[snafu(display("You can only specify one of region or endpoint"))]
    BothRegionAndEndpoint,
}
300
301#[async_trait::async_trait]
302#[typetag::serde(name = "gcp_chronicle_unstructured")]
303impl SinkConfig for ChronicleUnstructuredConfig {
304 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
305 let creds = self.auth.build(Scope::MalachiteIngestion).await?;
306
307 let tls = TlsSettings::from_options(self.tls.as_ref())?;
308 let client = HttpClient::new(tls, cx.proxy())?;
309
310 let endpoint = self.create_endpoint("v2/unstructuredlogentries:batchCreate")?;
311
312 let healthcheck_endpoint = self.create_endpoint("v2/logtypes")?;
314
315 let healthcheck = build_healthcheck(client.clone(), &healthcheck_endpoint, creds.clone())?;
316 creds.spawn_regenerate_token();
317 let sink = self.build_sink(client, endpoint, creds)?;
318
319 Ok((sink, healthcheck))
320 }
321
322 fn input(&self) -> Input {
323 let requirement =
324 schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
325
326 Input::log().with_schema_requirement(requirement)
327 }
328
329 fn acknowledgements(&self) -> &AcknowledgementsConfig {
330 &self.acknowledgements
331 }
332}
333
334impl ChronicleUnstructuredConfig {
335 fn build_sink(
336 &self,
337 client: HttpClient,
338 base_url: String,
339 creds: GcpAuthenticator,
340 ) -> crate::Result<VectorSink> {
341 use crate::sinks::util::service::ServiceBuilderExt;
342
343 let request = self.request.into_settings();
344
345 let batch_settings = self.batch.into_batcher_settings()?;
346
347 let partitioner = self.partitioner()?;
348
349 let svc = ServiceBuilder::new()
350 .settings(request, GcsRetryLogic::default())
351 .service(ChronicleService::new(client, base_url, creds));
352
353 let request_settings = ChronicleRequestBuilder::new(self)?;
354
355 let sink = ChronicleSink::new(svc, request_settings, partitioner, batch_settings, "http");
356
357 Ok(VectorSink::from_event_streamsink(sink))
358 }
359
360 fn partitioner(&self) -> crate::Result<ChroniclePartitioner> {
361 Ok(ChroniclePartitioner::new(
362 self.log_type.clone(),
363 self.fallback_log_type.clone(),
364 self.namespace.clone(),
365 ))
366 }
367
368 fn create_endpoint(&self, path: &str) -> Result<String, ChronicleError> {
369 Ok(format!(
370 "{}/{}",
371 match (&self.endpoint, self.region) {
372 (Some(endpoint), None) => endpoint.trim_end_matches('/'),
373 (None, Some(region)) => region.endpoint(),
374 (Some(_), Some(_)) => return Err(ChronicleError::BothRegionAndEndpoint),
375 (None, None) => return Err(ChronicleError::RegionOrEndpoint),
376 },
377 path
378 ))
379 }
380}
381
/// A fully built request, ready to be sent by `ChronicleService`.
#[derive(Clone, Debug)]
pub struct ChronicleRequest {
    /// The (possibly compressed) request body.
    pub body: Bytes,
    /// Finalizers of the events contained in this request.
    pub finalizers: EventFinalizers,
    /// HTTP headers to set on the outgoing request.
    pub headers: HashMap<HeaderName, HeaderValue>,
    /// Request metadata exposed through `MetaDescriptive`.
    metadata: RequestMetadata,
}
389
impl Finalizable for ChronicleRequest {
    /// Moves the finalizers out of the request, leaving a default value in their place.
    fn take_finalizers(&mut self) -> EventFinalizers {
        std::mem::take(&mut self.finalizers)
    }
}
395
impl MetaDescriptive for ChronicleRequest {
    /// Returns a shared reference to the request metadata.
    fn get_metadata(&self) -> &RequestMetadata {
        &self.metadata
    }

    /// Returns a mutable reference to the request metadata.
    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.metadata
    }
}
405
/// The JSON document sent as the body of an ingestion request.
#[derive(Clone, Debug, Serialize)]
struct ChronicleRequestBody {
    /// Customer identifier taken from the sink configuration.
    customer_id: String,
    /// Namespace of the batch; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    namespace: Option<String>,
    /// Labels of the batch; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    labels: Option<Vec<Label>>,
    /// Log type shared by all entries of the batch.
    log_type: String,
    /// One JSON object per encoded event.
    entries: Vec<serde_json::Value>,
}
416
/// Encodes a partition of events into a single `ChronicleRequestBody` JSON document.
#[derive(Clone, Debug)]
struct ChronicleEncoder {
    /// Customer identifier copied into every request body.
    customer_id: String,
    /// Labels copied into every request body.
    labels: Option<Vec<Label>>,
    /// Codec used to turn each event into the bytes of its `log_text`.
    encoder: codecs::Encoder<()>,
    /// Transformer applied to each event before it is encoded.
    transformer: codecs::Transformer,
}
424
425impl Encoder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleEncoder {
426 fn encode_input(
427 &self,
428 input: (ChroniclePartitionKey, Vec<Event>),
429 writer: &mut dyn io::Write,
430 ) -> io::Result<(usize, GroupedCountByteSize)> {
431 let (key, events) = input;
432 let mut encoder = self.encoder.clone();
433 let mut byte_size = telemetry().create_request_count_byte_size();
434 let events = events
435 .into_iter()
436 .filter_map(|mut event| {
437 let timestamp = event
438 .as_log()
439 .get_timestamp()
440 .and_then(|ts| ts.as_timestamp())
441 .cloned();
442 let mut bytes = BytesMut::new();
443 self.transformer.transform(&mut event);
444
445 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
446
447 encoder.encode(event, &mut bytes).ok()?;
448
449 let mut value = json!({
450 "log_text": String::from_utf8_lossy(&bytes),
451 });
452
453 if let Some(ts) = timestamp {
454 value.as_object_mut().unwrap().insert(
455 "ts_rfc3339".to_string(),
456 ts.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
457 .into(),
458 );
459 }
460
461 Some(value)
462 })
463 .collect::<Vec<_>>();
464
465 let json = json!(ChronicleRequestBody {
466 customer_id: self.customer_id.clone(),
467 namespace: key.namespace,
468 labels: self.labels.clone(),
469 log_type: key.log_type,
470 entries: events,
471 });
472
473 let size = as_tracked_write::<_, _, io::Error>(writer, &json, |writer, json| {
474 serde_json::to_writer(writer, json)?;
475 Ok(())
476 })?;
477
478 Ok((size, byte_size))
479 }
480}
481
/// Turns partitions of events into `ChronicleRequest`s.
#[derive(Clone, Debug)]
struct ChronicleRequestBuilder {
    /// Encoder producing the JSON request body.
    encoder: ChronicleEncoder,
    /// Compression applied to the encoded body.
    compression: Compression,
}
490
/// The encoded (and possibly compressed) bytes of a request body.
struct ChronicleRequestPayload {
    /// The raw payload bytes.
    bytes: Bytes,
}
494
495impl From<Bytes> for ChronicleRequestPayload {
496 fn from(bytes: Bytes) -> Self {
497 Self { bytes }
498 }
499}
500
501impl AsRef<[u8]> for ChronicleRequestPayload {
502 fn as_ref(&self) -> &[u8] {
503 self.bytes.as_ref()
504 }
505}
506
507impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBuilder {
508 type Metadata = EventFinalizers;
509 type Events = (ChroniclePartitionKey, Vec<Event>);
510 type Encoder = ChronicleEncoder;
511 type Payload = ChronicleRequestPayload;
512 type Request = ChronicleRequest;
513 type Error = io::Error;
514
515 fn compression(&self) -> Compression {
516 self.compression
517 }
518
519 fn encoder(&self) -> &Self::Encoder {
520 &self.encoder
521 }
522
523 fn split_input(
524 &self,
525 input: (ChroniclePartitionKey, Vec<Event>),
526 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
527 let (partition_key, mut events) = input;
528 let finalizers = events.take_finalizers();
529
530 let builder = RequestMetadataBuilder::from_events(&events);
531 (finalizers, builder, (partition_key, events))
532 }
533
534 fn build_request(
535 &self,
536 finalizers: Self::Metadata,
537 metadata: RequestMetadata,
538 payload: EncodeResult<Self::Payload>,
539 ) -> Self::Request {
540 let mut headers = HashMap::new();
541 headers.insert(
542 header::CONTENT_TYPE,
543 HeaderValue::from_static("application/json"),
544 );
545
546 match payload.compressed_byte_size {
547 Some(compressed_byte_size) => {
548 headers.insert(
549 header::CONTENT_LENGTH,
550 HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
551 );
552 headers.insert(
553 header::CONTENT_ENCODING,
554 HeaderValue::from_str(self.compression.content_encoding().unwrap()).unwrap(),
555 );
556 }
557 None => {
558 headers.insert(
559 header::CONTENT_LENGTH,
560 HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
561 );
562 }
563 }
564
565 ChronicleRequest {
566 headers,
567 body: payload.into_payload().bytes,
568 finalizers,
569 metadata,
570 }
571 }
572}
573
/// A single label, serialized as an object with `key` and `value` fields.
#[derive(Clone, Debug, Serialize)]
struct Label {
    /// The label's name.
    key: String,
    /// The label's value.
    value: String,
}
579
580impl ChronicleRequestBuilder {
581 fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
582 let transformer = config.encoding.transformer();
583 let serializer = config.encoding.config().build()?;
584 let compression = Compression::from(config.compression);
585 let encoder = crate::codecs::Encoder::<()>::new(serializer);
586 let encoder = ChronicleEncoder {
587 customer_id: config.customer_id.clone(),
588 labels: config.labels.as_ref().map(|labs| {
589 labs.iter()
590 .map(|(k, v)| Label {
591 key: k.to_string(),
592 value: v.to_string(),
593 })
594 .collect::<Vec<_>>()
595 }),
596 encoder,
597 transformer,
598 };
599 Ok(Self {
600 encoder,
601 compression,
602 })
603 }
604}
605
/// Tower service that posts `ChronicleRequest`s to a fixed URL.
#[derive(Debug, Clone)]
pub struct ChronicleService {
    /// HTTP client used to send requests.
    client: HttpClient,
    /// URL every request is posted to.
    base_url: String,
    /// Credentials applied to every outgoing request.
    creds: GcpAuthenticator,
}
612
613impl ChronicleService {
614 pub const fn new(client: HttpClient, base_url: String, creds: GcpAuthenticator) -> Self {
615 Self {
616 client,
617 base_url,
618 creds,
619 }
620 }
621}
622
/// Errors returned by `ChronicleService` when sending a request.
#[derive(Debug, Snafu)]
pub enum ChronicleResponseError {
    /// The server answered with a non-success status code.
    #[snafu(display("Server responded with an error: {}", code))]
    ServerError { code: StatusCode },
    /// The HTTP client failed to perform the request.
    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
    HttpError { error: crate::http::HttpError },
}
630
631impl Service<ChronicleRequest> for ChronicleService {
632 type Response = GcsResponse;
633 type Error = ChronicleResponseError;
634 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
635
636 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
637 Poll::Ready(Ok(()))
638 }
639
640 fn call(&mut self, request: ChronicleRequest) -> Self::Future {
641 let mut builder = Request::post(&self.base_url);
642 let metadata = request.get_metadata().clone();
643
644 let headers = builder.headers_mut().unwrap();
645 for (name, value) in request.headers {
646 headers.insert(name, value);
647 }
648
649 let mut http_request = builder.body(Body::from(request.body)).unwrap();
650 self.creds.apply(&mut http_request);
651
652 let mut client = self.client.clone();
653 Box::pin(async move {
654 match client.call(http_request).await {
655 Ok(response) => {
656 let status = response.status();
657 if status.is_success() {
658 Ok(GcsResponse {
659 inner: response,
660 metadata,
661 })
662 } else {
663 Err(ChronicleResponseError::ServerError { code: status })
664 }
665 }
666 Err(error) => Err(ChronicleResponseError::HttpError { error }),
667 }
668 })
669 }
670}
671
#[cfg(all(test, feature = "chronicle-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize};
    use vector_lib::event::{BatchNotifier, BatchStatus};

    use super::*;
    use crate::test_util::{
        components::{
            run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
            SINK_TAGS,
        },
        random_events_with_stream, random_string, trace_init,
    };

    /// Environment variable holding the address of the Chronicle test server.
    const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS";

    /// Builds a sink configuration that targets the test server.
    fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let toml_source = format!(
            indoc! { r#"
             endpoint = "{}"
             customer_id = "customer id"
             namespace = "namespace"
             credentials_path = "{}"
             log_type = "{}"
             encoding.codec = "text"
        "# },
            address, auth_path, log_type
        );

        toml::from_str(&toml_source).unwrap()
    }

    /// Builds the sink and its healthcheck from the test configuration.
    async fn config_build(
        log_type: &str,
        auth_path: &str,
    ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
        config(log_type, auth_path)
            .build(SinkContext::default())
            .await
    }

    /// Events sent through the sink can be read back from the test server unchanged.
    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let log_type = random_string(10);
        let (sink, healthcheck) =
            config_build(&log_type, "/home/vector/scripts/integration/gcp/auth.json")
                .await
                .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let messages = pull_messages(&log_type)
            .await
            .into_iter()
            .map(|message| message.log_text)
            .collect::<Vec<_>>();
        assert_eq!(input.len(), messages.len());

        // Lengths are equal (asserted above), so zipping compares every pair in order.
        for (event, message) in input.iter().zip(&messages) {
            let data = serde_json::to_value(message).unwrap();
            let expected = serde_json::to_value(event.as_log().get("message").unwrap()).unwrap();
            assert_eq!(data, expected);
        }
    }

    /// Building the sink with invalid credentials fails.
    #[tokio::test]
    async fn invalid_credentials() {
        trace_init();

        let log_type = random_string(10);
        let build_result = config_build(
            &log_type,
            "/home/vector/scripts/integration/gcp/invalidauth.json",
        )
        .await;

        assert!(build_result.is_err())
    }

    /// Events sent with the `INVALID` log type are rejected.
    #[tokio::test]
    async fn publish_invalid_events() {
        trace_init();

        let (sink, healthcheck) =
            config_build("INVALID", "/home/vector/scripts/integration/gcp/auth.json")
                .await
                .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    /// A log entry as returned by the test server's `logs` route.
    #[derive(Clone, Debug, Deserialize, Serialize)]
    pub struct Log {
        customer_id: String,
        namespace: String,
        log_type: String,
        log_text: String,
        ts_rfc3339: String,
    }

    /// Sends a request to `path` on the test server with `log_type` as a query parameter.
    async fn request(method: Method, path: &str, log_type: &str) -> Response {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let url = format!("{address}/{path}");
        let outcome = Client::new()
            .request(method.clone(), &url)
            .query(&[("log_type", log_type)])
            .send()
            .await;

        outcome.unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
    }

    /// Fetches the log entries the test server stored for `log_type`.
    async fn pull_messages(log_type: &str) -> Vec<Log> {
        let response = request(Method::GET, "logs", log_type).await;
        response
            .json::<Vec<Log>>()
            .await
            .expect("Extracting pull data failed")
    }
}