vector/sources/okta/
client.rs

1use std::{sync::Arc, time::Duration};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use futures::StreamExt as _;
6use futures_util::{FutureExt, Stream, stream};
7use http::Uri;
8use hyper::{Body, Request};
9use percent_encoding::utf8_percent_encode;
10use serde_with::serde_as;
11use tokio::sync::Mutex;
12use tokio_stream::wrappers::IntervalStream;
13use tokio_util::codec::Decoder as _;
14use vector_lib::{
15    EstimatedJsonEncodedSizeOf,
16    codecs::{
17        JsonDeserializerConfig, StreamDecodingError,
18        decoding::{DeserializerConfig, FramingConfig},
19    },
20    config::{LogNamespace, SourceOutput, proxy::ProxyConfig},
21    configurable::configurable_component,
22    event::Event,
23    json_size::JsonSize,
24    shutdown::ShutdownSignal,
25    tls::TlsConfig,
26};
27
28use crate::{
29    SourceSender,
30    codecs::{Decoder, DecodingConfig},
31    config::{SourceConfig, SourceContext},
32    http::{HttpClient, HttpError},
33    internal_events::{
34        DecoderDeserializeError, EndpointBytesReceived, HttpClientEventsReceived,
35        HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError,
36    },
37    sources,
38    sources::util::http_client::{default_interval, default_timeout, warn_if_interval_too_low},
39    tls::TlsSettings,
40};
41
42/// Configuration for the `okta` source.
43#[serde_as]
44#[configurable_component(source("okta", "Pull Okta system logs via the Okta API",))]
45#[derive(Clone, Debug)]
46pub struct OktaConfig {
47    /// The Okta subdomain to scrape
48    #[configurable(metadata(docs::examples = "foo.okta.com"))]
49    pub domain: String,
50
51    /// API token for authentication
52    #[configurable(metadata(docs::examples = "00xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"))]
53    pub token: String,
54
55    /// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
56    /// than the interval, a new scrape will be started. This can take extra resources, set the timeout
57    /// to a value lower than the scrape interval to prevent this from happening.
58    #[serde(default = "default_interval")]
59    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
60    #[serde(rename = "scrape_interval_secs")]
61    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
62    pub interval: Duration,
63
64    /// The timeout for each scrape request.
65    #[serde(default = "default_timeout")]
66    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
67    #[serde(rename = "scrape_timeout_secs")]
68    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
69    pub timeout: Duration,
70
71    /// The time to look back for logs. This is used to determine the start time of the first request
72    /// (that is, the earliest log to fetch)
73    #[configurable(metadata(docs::human_name = "Since (seconds before now)"))]
74    pub since: Option<u64>,
75
76    /// TLS configuration.
77    #[configurable(derived)]
78    pub tls: Option<TlsConfig>,
79
80    /// The namespace to use for logs. This overrides the global setting.
81    #[configurable(metadata(docs::hidden))]
82    #[serde(default)]
83    pub log_namespace: Option<bool>,
84}
85
86impl Default for OktaConfig {
87    fn default() -> Self {
88        Self {
89            domain: "".to_string(),
90            token: "".to_string(),
91            interval: default_interval(),
92            timeout: default_timeout(),
93            since: None,
94            tls: None,
95            log_namespace: None,
96        }
97    }
98}
99
100impl_generate_config_from_default!(OktaConfig);
101
/// Extracts the target URL of the first `next` relation from a `Link` header value.
///
/// `header` is the raw header value: comma-separated entries of the form
/// `<url>; param=value; ...`. An entry matches when one of its parameters is named `rel`
/// (case-insensitive) and its value, quoted or not, contains the relation type `next`
/// (case-insensitive, whitespace-separated list allowed).
///
/// Only the parameters are inspected, so a URL that merely contains the text
/// `rel="next"` is not mistaken for a pagination link.
///
/// Returns the URL with surrounding whitespace and angle brackets removed, or `None`
/// when no entry carries a `next` relation.
fn find_rel_next_link(header: &str) -> Option<String> {
    header.split(',').find_map(|entry| {
        // The first `;`-separated piece is the link target, the rest are parameters.
        let mut pieces = entry.split(';');
        let target = pieces.next()?.trim();

        let is_next = pieces.any(|param| {
            param.split_once('=').is_some_and(|(name, value)| {
                name.trim().eq_ignore_ascii_case("rel")
                    && value
                        .trim()
                        .trim_matches('"')
                        .split_ascii_whitespace()
                        .any(|relation| relation.eq_ignore_ascii_case("next"))
            })
        });

        is_next.then(|| target.trim_matches(|c| c == '<' || c == '>').to_string())
    })
}
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "okta")]
118impl SourceConfig for OktaConfig {
119    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
120        let since = match self.since {
121            Some(since) => Utc::now() - Duration::from_secs(since),
122            _ => Utc::now(),
123        };
124
125        let path_and_query = format!(
126            "/api/v1/logs?since={}",
127            utf8_percent_encode(&since.to_rfc3339(), percent_encoding::NON_ALPHANUMERIC)
128        );
129
130        let mut url_parts = Uri::try_from(&self.domain)
131            .map_err(|_| {
132                format!(
133                    "Invalid domain: {}. Must be a valid Okta subdomain.",
134                    self.domain
135                )
136            })?
137            .into_parts();
138
139        url_parts.path_and_query = Some(path_and_query.parse()?);
140        if url_parts.scheme.is_none() {
141            url_parts.scheme = Some(http::uri::Scheme::HTTPS);
142        }
143
144        let url = Uri::from_parts(url_parts).map_err(|_| {
145            format!(
146                "Invalid domain: {}. Must be a valid Okta subdomain.",
147                self.domain
148            )
149        })?;
150
151        let tls = TlsSettings::from_options(self.tls.as_ref())?;
152
153        let log_namespace = cx.log_namespace(self.log_namespace);
154
155        warn_if_interval_too_low(self.timeout, self.interval);
156
157        Ok(run(
158            url,
159            tls,
160            cx.proxy,
161            self.token.clone(),
162            self.interval,
163            self.timeout,
164            log_namespace,
165            cx.shutdown,
166            cx.out,
167        )
168        .boxed())
169    }
170
171    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
172        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
173        // and is merged here.
174        let log_namespace = global_log_namespace.merge(self.log_namespace);
175
176        vec![SourceOutput::new_maybe_logs(
177            JsonDeserializerConfig::default().output_type(),
178            JsonDeserializerConfig::default().schema_definition(log_namespace),
179        )]
180    }
181
182    fn can_acknowledge(&self) -> bool {
183        false
184    }
185}
186
187fn enrich_events(events: &mut Vec<Event>, log_namespace: LogNamespace) {
188    let now = Utc::now();
189    for event in events {
190        log_namespace.insert_standard_vector_source_metadata(
191            event.as_mut_log(),
192            OktaConfig::NAME,
193            now,
194        );
195    }
196}
197
198type OktaRunResult =
199    Result<(http::response::Parts, Bytes, Option<Uri>), Box<dyn std::error::Error + Send + Sync>>;
200
201type OktaTimeoutResult =
202    Result<Result<http::Response<Body>, HttpError>, tokio::time::error::Elapsed>;
203
204async fn run_once(url: String, result: OktaTimeoutResult, timeout: Duration) -> OktaRunResult {
205    let mut next: Option<Uri> = None;
206    match result {
207        Ok(Ok(response)) => {
208            let (header, body) = response.into_parts();
209            if let Some(next_url) = header
210                .headers
211                .get_all("link")
212                .iter()
213                .filter_map(|v| v.to_str().ok())
214                .filter_map(find_rel_next_link)
215                .next()
216                .and_then(|next| Uri::try_from(next).ok())
217            {
218                next = Some(next_url);
219            };
220
221            let body = hyper::body::to_bytes(body).await?;
222
223            emit!(EndpointBytesReceived {
224                byte_size: body.len(),
225                protocol: "http",
226                endpoint: &url,
227            });
228            Ok((header, body, next))
229        }
230        Ok(Err(error)) => Err(error.into()),
231        Err(_) => Err(format!("Timeout error: request exceeded {}s", timeout.as_secs_f64()).into()),
232    }
233}
234
235fn handle_response(
236    response: OktaRunResult,
237    decoder: Decoder,
238    log_namespace: LogNamespace,
239    url: String,
240) -> Option<impl Stream<Item = Event> + Send + use<>> {
241    match response {
242        Ok((header, body, _)) if header.status == hyper::StatusCode::OK => {
243            let mut buf = BytesMut::new();
244            buf.extend_from_slice(&body);
245            let mut events = decode_events(&mut buf, decoder);
246            let byte_size = if events.is_empty() {
247                JsonSize::zero()
248            } else {
249                events.estimated_json_encoded_size_of()
250            };
251
252            emit!(HttpClientEventsReceived {
253                byte_size,
254                count: events.len(),
255                url,
256            });
257
258            if events.is_empty() {
259                return None;
260            }
261
262            enrich_events(&mut events, log_namespace);
263
264            Some(stream::iter(events))
265        }
266        Ok((header, _, _)) => {
267            emit!(HttpClientHttpResponseError {
268                code: header.status,
269                url,
270            });
271            None
272        }
273        Err(error) => {
274            emit!(HttpClientHttpError { error, url });
275            None
276        }
277    }
278}
279
280/// Calls the Okta system logs API and sends the events to the output stream.
281///
282/// Okta's API paginates with a `link` header that contains a url (in `rel=next`) to the next page of results,
283/// and will always return a `rel=next` link regardless of whether there are more results.
284/// This function fetches all pages until there are no more results (an empty JSON array) and finishes until
285/// the next interval
286/// The function will run until the `shutdown` signal is received.
287#[allow(clippy::too_many_arguments)] // internal function
288async fn run(
289    url: Uri,
290    tls: TlsSettings,
291    proxy: ProxyConfig,
292    token: String,
293    interval: Duration,
294    timeout: Duration,
295    log_namespace: LogNamespace,
296    shutdown: ShutdownSignal,
297    mut out: SourceSender,
298) -> Result<(), ()> {
299    let url_mutex = Arc::new(Mutex::new(url.clone()));
300    let decoder = DecodingConfig::new(
301        FramingConfig::Bytes,
302        DeserializerConfig::Json(JsonDeserializerConfig::default()),
303        log_namespace,
304    )
305    .build()
306    .map_err(|ref e| {
307        emit!(DecoderDeserializeError { error: e });
308    })?;
309
310    let client = HttpClient::new(tls, &proxy).map_err(|e| {
311        emit!(HttpClientHttpError {
312            error: Box::new(e),
313            url: url.to_string()
314        });
315    })?;
316
317    let mut stream = IntervalStream::new(tokio::time::interval(interval))
318        .take_until(shutdown)
319        .then(move |_| {
320            let client = client.clone();
321            let url_mutex = Arc::clone(&url_mutex);
322            let token = token.clone();
323            let decoder = decoder.clone();
324
325            async move {
326                stream::unfold((), move |_| {
327                    let url_mutex = Arc::clone(&url_mutex);
328                    let token = token.clone();
329                    let decoder = decoder.clone();
330                    let client = client.clone();
331
332                    async move {
333                        let (run_url, response): (String, OktaRunResult) = {
334                            // We update the actual URL based on the response the API returns
335                            // so the critical section is between here & when the request finishes
336                            let mut url_lock = url_mutex.lock().await;
337                            let url = url_lock.to_string();
338
339                            let mut request = match Request::get(&url).body(Body::empty()) {
340                                Ok(request) => request,
341                                Err(e) => {
342                                    emit!(HttpClientHttpError {
343                                        error: e.into(),
344                                        url: url.clone(),
345                                    });
346                                    return None;
347                                }
348                            };
349
350                            let headers = request.headers_mut();
351                            headers.insert(
352                                http::header::AUTHORIZATION,
353                                format!("SSWS {token}").parse().unwrap(),
354                            );
355                            headers
356                                .insert(http::header::ACCEPT, "application/json".parse().unwrap());
357                            headers.insert(
358                                http::header::CONTENT_TYPE,
359                                "application/json".parse().unwrap(),
360                            );
361
362                            let client = client.clone();
363                            let response = tokio::time::timeout(timeout, client.send(request))
364                                .then({
365                                    let url = url.clone();
366                                    move |result| run_once(url, result, timeout)
367                                })
368                                .await;
369
370                            if let Ok((_, _, Some(ref next))) = response {
371                                *url_lock = next.clone();
372                            }
373                            let new_url = url_lock.to_string();
374
375                            (new_url, response)
376                        };
377
378                        handle_response(response, decoder, log_namespace, run_url)
379                            .map(|events| (events, ()))
380                    }
381                })
382                .flatten()
383                .boxed()
384            }
385        })
386        .flatten_unordered(None)
387        .boxed();
388
389    match out.send_event_stream(&mut stream).await {
390        Ok(()) => {
391            debug!("Finished sending.");
392            Ok(())
393        }
394        Err(_) => {
395            let (count, _) = stream.size_hint();
396            emit!(StreamClosedError { count });
397            Err(())
398        }
399    }
400}
401
402fn decode_events(buf: &mut BytesMut, mut decoder: Decoder) -> Vec<Event> {
403    let mut events = Vec::new();
404    loop {
405        match decoder.decode_eof(buf) {
406            Ok(Some((next, _))) => {
407                events.extend(next);
408            }
409            Ok(None) => break,
410            Err(error) => {
411                // Error is logged by `crate::codecs::Decoder`, no further
412                // handling is needed here.
413                if !error.can_continue() {
414                    break;
415                }
416                break;
417            }
418        }
419    }
420    events
421}