1use std::{
6 collections::VecDeque,
7 future::Future,
8 io,
9 net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use actix_rt::net::{TcpSocket, TcpStream};
15use actix_service::{Service, ServiceFactory};
16use actix_utils::future::{ok, Ready};
17use futures_core::ready;
18use tokio_util::sync::ReusableBoxFuture;
19use tracing::{error, trace};
20
21use super::{connect_addrs::ConnectAddrs, error::ConnectError, ConnectInfo, Connection, Host};
22
23#[derive(Debug, Clone, Copy, Default)]
25#[non_exhaustive]
26pub struct TcpConnector;
27
28impl TcpConnector {
29 pub fn service(&self) -> TcpConnectorService {
31 TcpConnectorService::default()
32 }
33}
34
35impl<R: Host> ServiceFactory<ConnectInfo<R>> for TcpConnector {
36 type Response = Connection<R, TcpStream>;
37 type Error = ConnectError;
38 type Config = ();
39 type Service = TcpConnectorService;
40 type InitError = ();
41 type Future = Ready<Result<Self::Service, Self::InitError>>;
42
43 fn new_service(&self, _: ()) -> Self::Future {
44 ok(self.service())
45 }
46}
47
48#[derive(Debug, Copy, Clone, Default)]
50#[non_exhaustive]
51pub struct TcpConnectorService;
52
53impl<R: Host> Service<ConnectInfo<R>> for TcpConnectorService {
54 type Response = Connection<R, TcpStream>;
55 type Error = ConnectError;
56 type Future = TcpConnectorFut<R>;
57
58 actix_service::always_ready!();
59
60 fn call(&self, req: ConnectInfo<R>) -> Self::Future {
61 let port = req.port();
62
63 let ConnectInfo {
64 request: req,
65 addr,
66 local_addr,
67 ..
68 } = req;
69
70 TcpConnectorFut::new(req, port, local_addr, addr)
71 }
72}
73
74#[doc(hidden)]
76pub enum TcpConnectorFut<R> {
77 Response {
78 req: Option<R>,
79 port: u16,
80 local_addr: Option<IpAddr>,
81 addrs: Option<VecDeque<SocketAddr>>,
82 stream: ReusableBoxFuture<'static, Result<TcpStream, io::Error>>,
83 },
84
85 Error(Option<ConnectError>),
86}
87
88impl<R: Host> TcpConnectorFut<R> {
89 pub(crate) fn new(
90 req: R,
91 port: u16,
92 local_addr: Option<IpAddr>,
93 addr: ConnectAddrs,
94 ) -> TcpConnectorFut<R> {
95 if addr.is_unresolved() {
96 error!("TCP connector: unresolved connection address");
97 return TcpConnectorFut::Error(Some(ConnectError::Unresolved));
98 }
99
100 trace!(
101 "TCP connector: connecting to {} on port {}",
102 req.hostname(),
103 port
104 );
105
106 match addr {
107 ConnectAddrs::None => unreachable!("none variant already checked"),
108
109 ConnectAddrs::One(addr) => TcpConnectorFut::Response {
110 req: Some(req),
111 port,
112 local_addr,
113 addrs: None,
114 stream: ReusableBoxFuture::new(connect(addr, local_addr)),
115 },
116
117 ConnectAddrs::Multi(mut addrs) => {
120 let addr = addrs.pop_front().unwrap();
121
122 TcpConnectorFut::Response {
123 req: Some(req),
124 port,
125 local_addr,
126 addrs: Some(addrs),
127 stream: ReusableBoxFuture::new(connect(addr, local_addr)),
128 }
129 }
130 }
131 }
132}
133
134impl<R: Host> Future for TcpConnectorFut<R> {
135 type Output = Result<Connection<R, TcpStream>, ConnectError>;
136
137 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138 match self.get_mut() {
139 TcpConnectorFut::Error(err) => Poll::Ready(Err(err.take().unwrap())),
140
141 TcpConnectorFut::Response {
142 req,
143 port,
144 local_addr,
145 addrs,
146 stream,
147 } => loop {
148 match ready!(stream.poll(cx)) {
149 Ok(sock) => {
150 let req = req.take().unwrap();
151
152 trace!(
153 "TCP connector: successfully connected to {:?} - {:?}",
154 req.hostname(),
155 sock.peer_addr()
156 );
157
158 return Poll::Ready(Ok(Connection::new(req, sock)));
159 }
160
161 Err(err) => {
162 trace!(
163 "TCP connector: failed to connect to {:?} port: {}",
164 req.as_ref().unwrap().hostname(),
165 port,
166 );
167
168 if let Some(addr) = addrs.as_mut().and_then(|addrs| addrs.pop_front()) {
169 stream.set(connect(addr, *local_addr));
170 } else {
171 return Poll::Ready(Err(ConnectError::Io(err)));
172 }
173 }
174 }
175 },
176 }
177 }
178}
179
180async fn connect(addr: SocketAddr, local_addr: Option<IpAddr>) -> io::Result<TcpStream> {
181 match local_addr {
183 Some(ip_addr) => {
184 let socket = match ip_addr {
185 IpAddr::V4(ip_addr) => {
186 let socket = TcpSocket::new_v4()?;
187 let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0));
188 socket.bind(addr)?;
189 socket
190 }
191 IpAddr::V6(ip_addr) => {
192 let socket = TcpSocket::new_v6()?;
193 let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0));
194 socket.bind(addr)?;
195 socket
196 }
197 };
198
199 socket.connect(addr).await
200 }
201
202 None => TcpStream::connect(addr).await,
203 }
204}