1use std::{
2 future::Future,
3 net,
4 pin::Pin,
5 rc::Rc,
6 task::{Context, Poll},
7};
8
9use actix_codec::Framed;
10use actix_http::{h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead};
11use actix_service::Service;
12use futures_core::{future::LocalBoxFuture, ready};
13
14use crate::{
15 any_body::AnyBody,
16 client::{Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError},
17 ClientResponse,
18};
19
20pub type BoxConnectorService = Rc<
21 dyn Service<
22 ConnectRequest,
23 Response = ConnectResponse,
24 Error = SendRequestError,
25 Future = LocalBoxFuture<'static, Result<ConnectResponse, SendRequestError>>,
26 >,
27>;
28
29pub type BoxedSocket = Box<dyn ConnectionIo>;
30
31pub enum ConnectRequest {
33 Client(RequestHeadType, AnyBody, Option<net::SocketAddr>),
37
38 Tunnel(RequestHead, Option<net::SocketAddr>),
42}
43
44pub enum ConnectResponse {
46 Client(ClientResponse),
48
49 Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
53}
54
55impl ConnectResponse {
56 pub fn into_client_response(self) -> ClientResponse {
61 match self {
62 ConnectResponse::Client(res) => res,
63 _ => {
64 panic!("ClientResponse only reachable with ConnectResponse::ClientResponse variant")
65 }
66 }
67 }
68
69 pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
74 match self {
75 ConnectResponse::Tunnel(head, framed) => (head, framed),
76 _ => {
77 panic!("TunnelResponse only reachable with ConnectResponse::TunnelResponse variant")
78 }
79 }
80 }
81}
82
/// Connector service that wraps an inner connection-producing service `S`.
///
/// Its `Service<ConnectRequest>` implementation obtains a connection from `connector` and then
/// drives the request over that connection.
pub struct DefaultConnector<S> {
    // Inner service asked for a connection on every call.
    connector: S,
}
86
87impl<S> DefaultConnector<S> {
88 pub(crate) fn new(connector: S) -> Self {
89 Self { connector }
90 }
91}
92
93impl<S, Io> Service<ConnectRequest> for DefaultConnector<S>
94where
95 S: Service<ClientConnect, Error = ConnectError, Response = Connection<Io>>,
96 Io: ConnectionIo,
97{
98 type Response = ConnectResponse;
99 type Error = SendRequestError;
100 type Future = ConnectRequestFuture<S::Future, Io>;
101
102 actix_service::forward_ready!(connector);
103
104 fn call(&self, req: ConnectRequest) -> Self::Future {
105 let fut = match req {
107 ConnectRequest::Client(ref head, .., addr) => self.connector.call(ClientConnect {
108 uri: head.as_ref().uri.clone(),
109 addr,
110 }),
111 ConnectRequest::Tunnel(ref head, addr) => self.connector.call(ClientConnect {
112 uri: head.uri.clone(),
113 addr,
114 }),
115 };
116
117 ConnectRequestFuture::Connection {
118 fut,
119 req: Some(req),
120 }
121 }
122}
123
124pin_project_lite::pin_project! {
125 #[project = ConnectRequestProj]
126 pub enum ConnectRequestFuture<Fut, Io>
127 where
128 Io: ConnectionIo
129 {
130 Connection {
131 #[pin]
132 fut: Fut,
133 req: Option<ConnectRequest>
134 },
135 Client {
136 fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
137 },
138 Tunnel {
139 fut: LocalBoxFuture<
140 'static,
141 Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
142 >,
143 }
144 }
145}
146
147impl<Fut, Io> Future for ConnectRequestFuture<Fut, Io>
148where
149 Fut: Future<Output = Result<Connection<Io>, ConnectError>>,
150 Io: ConnectionIo,
151{
152 type Output = Result<ConnectResponse, SendRequestError>;
153
154 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
155 match self.as_mut().project() {
156 ConnectRequestProj::Connection { fut, req } => {
157 let connection = ready!(fut.poll(cx))?;
158 let req = req.take().unwrap();
159
160 match req {
161 ConnectRequest::Client(head, body, ..) => {
162 let fut = ConnectRequestFuture::Client {
164 fut: connection.send_request(head, body),
165 };
166
167 self.set(fut);
168 }
169
170 ConnectRequest::Tunnel(head, ..) => {
171 let fut = ConnectRequestFuture::Tunnel {
173 fut: connection.open_tunnel(RequestHeadType::from(head)),
174 };
175
176 self.set(fut);
177 }
178 }
179
180 self.poll(cx)
181 }
182
183 ConnectRequestProj::Client { fut } => {
184 let (head, payload) = ready!(fut.as_mut().poll(cx))?;
185 Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new(
186 head, payload,
187 ))))
188 }
189
190 ConnectRequestProj::Tunnel { fut } => {
191 let (head, framed) = ready!(fut.as_mut().poll(cx))?;
192 let framed = framed.into_map_io(|io| Box::new(io) as _);
193 Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
194 }
195 }
196 }
197}