1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
use std::{
    convert::Infallible,
    net::SocketAddr,
    sync::{atomic::AtomicBool, Arc},
};

use async_graphql::{
    http::{playground_source, GraphQLPlaygroundConfig, WebSocketProtocols},
    Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::tap::topology;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};

use super::{handler, schema};
use crate::{
    config::{self, api},
    http::build_http_trace_layer,
    internal_events::{SocketBindError, SocketMode},
};

/// Handle to an API server that was spawned by [`Server::start`].
pub struct Server {
    // Sending half of the shutdown channel. Nothing in this file sends on it; it is only held
    // so that dropping the `Server` drops the sender, which completes the receiver that the
    // spawned server awaits for graceful shutdown.
    _shutdown: oneshot::Sender<()>,
    // Socket address taken from the configuration and used to bind the listener.
    addr: SocketAddr,
}

impl Server {
    /// Build the API routes, bind the listening socket and spawn the HTTP server on `handle`.
    ///
    /// The spawned server keeps running until the oneshot receiver created here completes.
    /// The matching sender is stored in the returned `Server`, so dropping that value closes
    /// the channel and triggers a graceful shutdown.
    ///
    /// # Errors
    ///
    /// Returns the error produced while binding `config.api.address`; a `SocketBindError`
    /// event is emitted before the error is propagated.
    ///
    /// # Panics
    ///
    /// Panics if `config.api.address` is `None`.
    pub fn start(
        config: &config::Config,
        watch_rx: topology::WatchRx,
        running: Arc<AtomicBool>,
        handle: &Handle,
    ) -> crate::Result<Self> {
        let routes = make_routes(config.api, watch_rx, running);

        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        // warp uses `tokio::spawn` and so needs us to enter the runtime context. The guard is
        // held until this function returns.
        let _runtime_guard = handle.enter();

        let addr = config.api.address.expect("No socket address");
        let bound = AddrIncoming::bind(&addr);
        let incoming = bound.inspect_err(|error| {
            emit!(SocketBindError {
                mode: SocketMode::Tcp,
                error,
            });
        })?;

        // Each accepted connection gets its own service: the routes wrapped in the HTTP trace
        // layer, which is built from the span that was current when the server was started.
        let span = Span::current();
        let make_svc = make_service_fn(move |_conn| {
            let trace_layer = build_http_trace_layer(span.clone());
            let route_service = warp::service(routes.clone());
            let svc = ServiceBuilder::new()
                .layer(trace_layer)
                .service(route_service);
            futures_util::future::ok::<_, Infallible>(svc)
        });

        let server = async move {
            // Completes once the sender held by `Server` has been dropped (or used).
            let shutdown_signal = async {
                let _ = shutdown_rx.await;
            };

            let outcome = HyperServer::builder(incoming)
                .serve(make_svc)
                .with_graceful_shutdown(shutdown_signal)
                .await;

            outcome.map_err(|err| {
                error!("An error occurred: {:?}.", err);
            })
        };

        // The component schema must reflect the config before the server starts serving.
        schema::components::update_config(config);

        // Run the server in the background; the join handle is intentionally discarded.
        handle.spawn(server);

        Ok(Self {
            _shutdown: shutdown_tx,
            addr,
        })
    }

    /// The socket address the server was started on, returned by value.
    pub const fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Push a new configuration into the component schema of a running server.
    ///
    /// `self` is not used; this is an instance method so that callers do not have to reach
    /// into the schema sub-module directly.
    pub fn update_config(&self, config: &config::Config) {
        schema::components::update_config(config);
    }
}

/// Assemble the Warp filter tree served by the API.
///
/// The resulting filter tries, in order, the `health` route, the `graphql` route, the
/// `playground` route and finally a catch-all that rejects with "not found". The GraphQL route
/// is only mounted when `api.graphql` is set, and the playground only when both `api.graphql`
/// and `api.playground` are set; a disabled route is replaced by a filter that always rejects
/// with "not found". A permissive CORS policy wraps the whole tree.
///
/// Despite its name, `watch_tx` is a `topology::WatchRx`. A clone of it is stored in the
/// GraphQL data of every WebSocket connection.
fn make_routes(
    api: api::Options,
    watch_tx: topology::WatchRx,
    running: Arc<AtomicBool>,
) -> BoxedFilter<(impl Reply,)> {
    // Permissive CORS policy to allow for cross-origin interaction with the Vector API.
    let cors_policy = warp::cors()
        .allow_any_origin()
        .allow_headers(vec![
            "User-Agent",
            "Sec-Fetch-Mode",
            "Referer",
            "Origin",
            "Access-Control-Request-Method",
            "Access-Control-Allow-Origin",
            "Access-Control-Request-Headers",
            "Content-Type",
            "X-Apollo-Tracing", // for Apollo GraphQL clients
            "Pragma",
            "Host",
            "Connection",
            "Cache-Control",
        ])
        .allow_methods(vec!["POST", "GET"]);

    // Filters that always reject with "not found". The first stands in for a disabled GraphQL
    // route; the second stands in for a disabled playground and is also the final catch-all.
    // NOTE(review): presumably these are two separate closures so that each one can infer the
    // success type of the route it substitutes for - confirm before merging them.
    let graphql_disabled = warp::any().and_then(|| async { Err(warp::reject::not_found()) });
    let not_found = warp::any().and_then(|| async { Err(warp::reject::not_found()) });

    // Health check, backed by the shared `running` flag.
    let health_route = warp::path("health")
        .and(with_shared(running))
        .and_then(handler::health);

    // GraphQL subscriptions over WebSocket. For every connection a schema is built, the
    // GraphQL protocol negotiated from the request headers is echoed back in the
    // `Sec-WebSocket-Protocol` response header, and a clone of the watch receiver is made
    // available to resolvers through the connection data.
    let subscription_route = warp::ws().and(graphql_protocol()).map(
        move |ws: Ws, protocol: WebSocketProtocols| {
            let graphql_schema = schema::build_schema().finish();
            let watch_rx = watch_tx.clone();

            let upgrade = ws.on_upgrade(move |socket| {
                let mut connection_data = Data::default();
                connection_data.insert(watch_rx);

                GraphQLWebSocket::new(socket, graphql_schema, protocol)
                    .with_data(connection_data)
                    .serve()
            });

            warp::reply::with_header(
                upgrade,
                "Sec-WebSocket-Protocol",
                protocol.sec_websocket_protocol(),
            )
        },
    );

    // GraphQL endpoint. The WebSocket upgrade is attempted first; any request it rejects falls
    // through to the plain HTTP handler, which executes the request against its own schema.
    let graphql_route = if api.graphql {
        let http_route = async_graphql_warp::graphql(schema::build_schema().finish()).and_then(
            |(schema, request): (Schema<_, _, _>, Request)| async move {
                let response = schema.execute(request).await;
                Ok::<_, Infallible>(GraphQLResponse::from(response))
            },
        );

        warp::path("graphql")
            .and(subscription_route.or(http_route))
            .boxed()
    } else {
        graphql_disabled.boxed()
    };

    // Playground page for executing GraphQL queries/mutations/subscriptions. Both queries and
    // subscriptions are pointed at `/graphql`.
    let playground_route = if api.graphql && api.playground {
        warp::path("playground")
            .map(|| {
                let page = playground_source(
                    GraphQLPlaygroundConfig::new("/graphql").subscription_endpoint("/graphql"),
                );
                Response::builder()
                    .header("content-type", "text/html")
                    .body(page)
            })
            .boxed()
    } else {
        not_found.boxed()
    };

    // Wire everything together, apply the CORS policy and erase the concrete filter type.
    health_route
        .or(graphql_route)
        .or(playground_route)
        .or(not_found)
        .with(cors_policy)
        .boxed()
}

/// Create a filter that matches every request and extracts a fresh clone of the `shared`
/// reference-counted flag, so that it can be passed to a handler.
fn with_shared(
    shared: Arc<AtomicBool>,
) -> impl Filter<Extract = (Arc<AtomicBool>,), Error = Infallible> + Clone {
    let always = warp::any();
    always.map(move || Arc::clone(&shared))
}