Skip to main content

awc/
connect.rs

1use std::{
2    future::Future,
3    net,
4    pin::Pin,
5    rc::Rc,
6    task::{Context, Poll},
7};
8
9use actix_codec::Framed;
10use actix_http::{h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead};
11use actix_service::Service;
12use futures_core::{future::LocalBoxFuture, ready};
13
14use crate::{
15    any_body::AnyBody,
16    client::{Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError},
17    ClientResponse,
18};
19
20pub type BoxConnectorService = Rc<
21    dyn Service<
22        ConnectRequest,
23        Response = ConnectResponse,
24        Error = SendRequestError,
25        Future = LocalBoxFuture<'static, Result<ConnectResponse, SendRequestError>>,
26    >,
27>;
28
29pub type BoxedSocket = Box<dyn ConnectionIo>;
30
31/// Combined HTTP and WebSocket request type received by connection service.
32pub enum ConnectRequest {
33    /// Standard HTTP request.
34    ///
35    /// Contains the request head, body type, and optional pre-resolved socket address.
36    Client(RequestHeadType, AnyBody, Option<net::SocketAddr>),
37
38    /// Tunnel used by WebSocket connection requests.
39    ///
40    /// Contains the request head and optional pre-resolved socket address.
41    Tunnel(RequestHead, Option<net::SocketAddr>),
42}
43
44/// Combined HTTP response & WebSocket tunnel type returned from connection service.
45pub enum ConnectResponse {
46    /// Standard HTTP response.
47    Client(ClientResponse),
48
49    /// Tunnel used for WebSocket communication.
50    ///
51    /// Contains response head and framed HTTP/1.1 codec.
52    Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
53}
54
55impl ConnectResponse {
56    /// Unwraps type into HTTP response.
57    ///
58    /// # Panics
59    /// Panics if enum variant is not `Client`.
60    pub fn into_client_response(self) -> ClientResponse {
61        match self {
62            ConnectResponse::Client(res) => res,
63            _ => {
64                panic!("ClientResponse only reachable with ConnectResponse::ClientResponse variant")
65            }
66        }
67    }
68
69    /// Unwraps type into WebSocket tunnel response.
70    ///
71    /// # Panics
72    /// Panics if enum variant is not `Tunnel`.
73    pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
74        match self {
75            ConnectResponse::Tunnel(head, framed) => (head, framed),
76            _ => {
77                panic!("TunnelResponse only reachable with ConnectResponse::TunnelResponse variant")
78            }
79        }
80    }
81}
82
83pub struct DefaultConnector<S> {
84    connector: S,
85}
86
87impl<S> DefaultConnector<S> {
88    pub(crate) fn new(connector: S) -> Self {
89        Self { connector }
90    }
91}
92
93impl<S, Io> Service<ConnectRequest> for DefaultConnector<S>
94where
95    S: Service<ClientConnect, Error = ConnectError, Response = Connection<Io>>,
96    Io: ConnectionIo,
97{
98    type Response = ConnectResponse;
99    type Error = SendRequestError;
100    type Future = ConnectRequestFuture<S::Future, Io>;
101
102    actix_service::forward_ready!(connector);
103
104    fn call(&self, req: ConnectRequest) -> Self::Future {
105        // connect to the host
106        let fut = match req {
107            ConnectRequest::Client(ref head, .., addr) => self.connector.call(ClientConnect {
108                uri: head.as_ref().uri.clone(),
109                addr,
110            }),
111            ConnectRequest::Tunnel(ref head, addr) => self.connector.call(ClientConnect {
112                uri: head.uri.clone(),
113                addr,
114            }),
115        };
116
117        ConnectRequestFuture::Connection {
118            fut,
119            req: Some(req),
120        }
121    }
122}
123
124pin_project_lite::pin_project! {
125    #[project = ConnectRequestProj]
126    pub enum ConnectRequestFuture<Fut, Io>
127    where
128        Io: ConnectionIo
129    {
130        Connection {
131            #[pin]
132            fut: Fut,
133            req: Option<ConnectRequest>
134        },
135        Client {
136            fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
137        },
138        Tunnel {
139            fut: LocalBoxFuture<
140                'static,
141                Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
142            >,
143        }
144    }
145}
146
147impl<Fut, Io> Future for ConnectRequestFuture<Fut, Io>
148where
149    Fut: Future<Output = Result<Connection<Io>, ConnectError>>,
150    Io: ConnectionIo,
151{
152    type Output = Result<ConnectResponse, SendRequestError>;
153
154    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
155        match self.as_mut().project() {
156            ConnectRequestProj::Connection { fut, req } => {
157                let connection = ready!(fut.poll(cx))?;
158                let req = req.take().unwrap();
159
160                match req {
161                    ConnectRequest::Client(head, body, ..) => {
162                        // send request
163                        let fut = ConnectRequestFuture::Client {
164                            fut: connection.send_request(head, body),
165                        };
166
167                        self.set(fut);
168                    }
169
170                    ConnectRequest::Tunnel(head, ..) => {
171                        // send request
172                        let fut = ConnectRequestFuture::Tunnel {
173                            fut: connection.open_tunnel(RequestHeadType::from(head)),
174                        };
175
176                        self.set(fut);
177                    }
178                }
179
180                self.poll(cx)
181            }
182
183            ConnectRequestProj::Client { fut } => {
184                let (head, payload) = ready!(fut.as_mut().poll(cx))?;
185                Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new(
186                    head, payload,
187                ))))
188            }
189
190            ConnectRequestProj::Tunnel { fut } => {
191                let (head, framed) = ready!(fut.as_mut().poll(cx))?;
192                let framed = framed.into_map_io(|io| Box::new(io) as _);
193                Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
194            }
195        }
196    }
197}