1use std::{
2 fmt,
3 future::Future,
4 net::IpAddr,
5 pin::Pin,
6 rc::Rc,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use actix_http::Protocol;
12use actix_rt::{
13 net::{ActixStream, TcpStream},
14 time::{sleep, Sleep},
15};
16use actix_service::Service;
17use actix_tls::connect::{
18 ConnectError as TcpConnectError, ConnectInfo, Connection as TcpConnection,
19 Connector as TcpConnector, Resolver,
20};
21use futures_core::{future::LocalBoxFuture, ready};
22use http::Uri;
23use pin_project_lite::pin_project;
24
25use super::{
26 config::ConnectorConfig,
27 connection::{Connection, ConnectionIo},
28 error::ConnectError,
29 pool::ConnectionPool,
30 Connect,
31};
32
33enum OurTlsConnector {
34 #[allow(dead_code)] None,
36
37 #[cfg(feature = "openssl")]
38 Openssl(actix_tls::connect::openssl::reexports::SslConnector),
39
40 #[cfg(feature = "openssl")]
43 #[allow(dead_code)] OpensslBuilder(actix_tls::connect::openssl::reexports::SslConnectorBuilder),
45
46 #[cfg(feature = "rustls-0_20")]
47 #[allow(dead_code)] Rustls020(std::sync::Arc<actix_tls::connect::rustls_0_20::reexports::ClientConfig>),
49
50 #[cfg(feature = "rustls-0_21")]
51 #[allow(dead_code)] Rustls021(std::sync::Arc<actix_tls::connect::rustls_0_21::reexports::ClientConfig>),
53
54 #[cfg(any(
55 feature = "rustls-0_22-webpki-roots",
56 feature = "rustls-0_22-native-roots",
57 ))]
58 #[allow(dead_code)] Rustls022(std::sync::Arc<actix_tls::connect::rustls_0_22::reexports::ClientConfig>),
60
61 #[cfg(feature = "rustls-0_23")]
62 #[allow(dead_code)] Rustls023(std::sync::Arc<actix_tls::connect::rustls_0_23::reexports::ClientConfig>),
64}
65
66pub struct Connector<T> {
79 connector: T,
80 config: ConnectorConfig,
81
82 #[allow(dead_code)] tls: OurTlsConnector,
84}
85
86impl Connector<()> {
87 #[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
96 pub fn new() -> Connector<
97 impl Service<
98 ConnectInfo<Uri>,
99 Response = TcpConnection<Uri, TcpStream>,
100 Error = actix_tls::connect::ConnectError,
101 > + Clone,
102 > {
103 Connector {
104 connector: TcpConnector::new(resolver::resolver()).service(),
105 config: ConnectorConfig::default(),
106 tls: Self::build_tls(vec![b"h2".to_vec(), b"http/1.1".to_vec()]),
107 }
108 }
109
110 cfg_if::cfg_if! {
111 if #[cfg(any(feature = "rustls-0_23-webpki-roots", feature = "rustls-0_23-native-roots"))] {
112 fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
116 use actix_tls::connect::rustls_0_23::{self, reexports::ClientConfig};
117
118 cfg_if::cfg_if! {
119 if #[cfg(feature = "rustls-0_23-webpki-roots")] {
120 let certs = rustls_0_23::webpki_roots_cert_store();
121 } else if #[cfg(feature = "rustls-0_23-native-roots")] {
122 let certs = rustls_0_23::native_roots_cert_store().expect("Failed to find native root certificates");
123 }
124 }
125
126 let mut config = ClientConfig::builder()
127 .with_root_certificates(certs)
128 .with_no_client_auth();
129
130 config.alpn_protocols = protocols;
131
132 OurTlsConnector::Rustls023(std::sync::Arc::new(config))
133 }
134 } else if #[cfg(any(feature = "rustls-0_22-webpki-roots", feature = "rustls-0_22-native-roots"))] {
135 fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
137 use actix_tls::connect::rustls_0_22::{self, reexports::ClientConfig};
138
139 cfg_if::cfg_if! {
140 if #[cfg(feature = "rustls-0_22-webpki-roots")] {
141 let certs = rustls_0_22::webpki_roots_cert_store();
142 } else if #[cfg(feature = "rustls-0_22-native-roots")] {
143 let certs = rustls_0_22::native_roots_cert_store().expect("Failed to find native root certificates");
144 }
145 }
146
147 let mut config = ClientConfig::builder()
148 .with_root_certificates(certs)
149 .with_no_client_auth();
150
151 config.alpn_protocols = protocols;
152
153 OurTlsConnector::Rustls022(std::sync::Arc::new(config))
154 }
155 } else if #[cfg(feature = "rustls-0_21")] {
156 fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
158 use actix_tls::connect::rustls_0_21::{reexports::ClientConfig, webpki_roots_cert_store};
159
160 let mut config = ClientConfig::builder()
161 .with_safe_defaults()
162 .with_root_certificates(webpki_roots_cert_store())
163 .with_no_client_auth();
164
165 config.alpn_protocols = protocols;
166
167 OurTlsConnector::Rustls021(std::sync::Arc::new(config))
168 }
169 } else if #[cfg(feature = "rustls-0_20")] {
170 fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
172 use actix_tls::connect::rustls_0_20::{reexports::ClientConfig, webpki_roots_cert_store};
173
174 let mut config = ClientConfig::builder()
175 .with_safe_defaults()
176 .with_root_certificates(webpki_roots_cert_store())
177 .with_no_client_auth();
178
179 config.alpn_protocols = protocols;
180
181 OurTlsConnector::Rustls020(std::sync::Arc::new(config))
182 }
183 } else if #[cfg(feature = "openssl")] {
184 fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
186 use actix_tls::connect::openssl::reexports::{SslConnector, SslMethod};
187 use bytes::{BufMut, BytesMut};
188
189 let mut alpn = BytesMut::with_capacity(20);
190 for proto in &protocols {
191 alpn.put_u8(proto.len() as u8);
192 alpn.put(proto.as_slice());
193 }
194
195 let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
196 if let Err(err) = ssl.set_alpn_protos(&alpn) {
197 log::error!("Can not set ALPN protocol: {err:?}");
198 }
199
200 OurTlsConnector::OpensslBuilder(ssl)
201 }
202 } else {
203 fn build_tls(_: Vec<Vec<u8>>) -> OurTlsConnector {
206 OurTlsConnector::None
207 }
208 }
209 }
210}
211
212impl<S> Connector<S> {
213 pub fn connector<S1, Io1>(self, connector: S1) -> Connector<S1>
215 where
216 Io1: ActixStream + fmt::Debug + 'static,
217 S1: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, Io1>, Error = TcpConnectError>
218 + Clone,
219 {
220 Connector {
221 connector,
222 config: self.config,
223 tls: self.tls,
224 }
225 }
226}
227
228impl<S, IO> Connector<S>
229where
230 IO: ActixStream + fmt::Debug + 'static,
238 S: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, IO>, Error = TcpConnectError>
239 + Clone
240 + 'static,
241{
242 pub fn timeout(mut self, timeout: Duration) -> Self {
248 self.config.timeout = timeout;
249 self
250 }
251
252 pub fn handshake_timeout(mut self, timeout: Duration) -> Self {
259 self.config.handshake_timeout = timeout;
260 self
261 }
262
263 #[cfg(feature = "openssl")]
265 pub fn openssl(
266 mut self,
267 connector: actix_tls::connect::openssl::reexports::SslConnector,
268 ) -> Self {
269 self.tls = OurTlsConnector::Openssl(connector);
270 self
271 }
272
273 #[doc(hidden)]
275 #[cfg(feature = "openssl")]
276 #[deprecated(since = "3.0.0", note = "Renamed to `Connector::openssl`.")]
277 pub fn ssl(mut self, connector: actix_tls::connect::openssl::reexports::SslConnector) -> Self {
278 self.tls = OurTlsConnector::Openssl(connector);
279 self
280 }
281
282 #[cfg(feature = "rustls-0_20")]
284 pub fn rustls(
285 mut self,
286 connector: std::sync::Arc<actix_tls::connect::rustls_0_20::reexports::ClientConfig>,
287 ) -> Self {
288 self.tls = OurTlsConnector::Rustls020(connector);
289 self
290 }
291
292 #[cfg(feature = "rustls-0_21")]
294 pub fn rustls_021(
295 mut self,
296 connector: std::sync::Arc<actix_tls::connect::rustls_0_21::reexports::ClientConfig>,
297 ) -> Self {
298 self.tls = OurTlsConnector::Rustls021(connector);
299 self
300 }
301
302 #[cfg(any(
304 feature = "rustls-0_22-webpki-roots",
305 feature = "rustls-0_22-native-roots",
306 ))]
307 pub fn rustls_0_22(
308 mut self,
309 connector: std::sync::Arc<actix_tls::connect::rustls_0_22::reexports::ClientConfig>,
310 ) -> Self {
311 self.tls = OurTlsConnector::Rustls022(connector);
312 self
313 }
314
315 #[cfg(feature = "rustls-0_23")]
325 pub fn rustls_0_23(
326 mut self,
327 connector: std::sync::Arc<actix_tls::connect::rustls_0_23::reexports::ClientConfig>,
328 ) -> Self {
329 self.tls = OurTlsConnector::Rustls023(connector);
330 self
331 }
332
333 pub fn max_http_version(mut self, val: http::Version) -> Self {
337 let versions = match val {
338 http::Version::HTTP_11 => vec![b"http/1.1".to_vec()],
339 http::Version::HTTP_2 => vec![b"h2".to_vec(), b"http/1.1".to_vec()],
340 _ => {
341 unimplemented!("actix-http client only supports versions http/1.1 & http/2")
342 }
343 };
344 self.tls = Connector::build_tls(versions);
345 self
346 }
347
348 pub fn initial_window_size(mut self, size: u32) -> Self {
353 self.config.stream_window_size = size;
354 self
355 }
356
357 pub fn initial_connection_window_size(mut self, size: u32) -> Self {
362 self.config.conn_window_size = size;
363 self
364 }
365
366 pub fn limit(mut self, limit: usize) -> Self {
372 if limit == 0 {
373 self.config.limit = u32::MAX as usize;
374 } else {
375 self.config.limit = limit;
376 }
377
378 self
379 }
380
381 pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
388 self.config.conn_keep_alive = dur;
389 self
390 }
391
392 pub fn conn_lifetime(mut self, dur: Duration) -> Self {
398 self.config.conn_lifetime = dur;
399 self
400 }
401
402 pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
411 self.config.disconnect_timeout = Some(dur);
412 self
413 }
414
415 pub fn local_address(mut self, addr: IpAddr) -> Self {
417 self.config.local_address = Some(addr);
418 self
419 }
420
421 pub fn finish(self) -> ConnectorService<S, IO> {
425 let local_address = self.config.local_address;
426 let timeout = self.config.timeout;
427
428 let tcp_service_inner =
429 TcpConnectorInnerService::new(self.connector, timeout, local_address);
430
431 #[allow(clippy::redundant_clone)]
432 let tcp_service = TcpConnectorService {
433 service: tcp_service_inner.clone(),
434 };
435
436 let tls = match self.tls {
437 #[cfg(feature = "openssl")]
438 OurTlsConnector::OpensslBuilder(builder) => OurTlsConnector::Openssl(builder.build()),
439 tls => tls,
440 };
441
442 let tls_service = match tls {
443 OurTlsConnector::None => {
444 #[cfg(not(feature = "dangerous-h2c"))]
445 {
446 None
447 }
448
449 #[cfg(feature = "dangerous-h2c")]
450 {
451 use std::io;
452
453 use actix_tls::connect::Connection;
454 use actix_utils::future::{ready, Ready};
455
456 #[allow(non_local_definitions)]
457 impl IntoConnectionIo for TcpConnection<Uri, Box<dyn ConnectionIo>> {
458 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
459 let io = self.into_parts().0;
460 (io, Protocol::Http2)
461 }
462 }
463
464 #[derive(Clone)]
469 struct NoOpTlsConnectorService;
470
471 impl<R, IO> Service<Connection<R, IO>> for NoOpTlsConnectorService
472 where
473 IO: ActixStream + 'static,
474 {
475 type Response = Connection<R, Box<dyn ConnectionIo>>;
476 type Error = io::Error;
477 type Future = Ready<Result<Self::Response, Self::Error>>;
478
479 actix_service::always_ready!();
480
481 fn call(&self, connection: Connection<R, IO>) -> Self::Future {
482 let (io, connection) = connection.replace_io(());
483 let (_, connection) = connection.replace_io(Box::new(io) as _);
484
485 ready(Ok(connection))
486 }
487 }
488
489 let handshake_timeout = self.config.handshake_timeout;
490
491 let tls_service = TlsConnectorService {
492 tcp_service: tcp_service_inner,
493 tls_service: NoOpTlsConnectorService,
494 timeout: handshake_timeout,
495 };
496
497 Some(actix_service::boxed::rc_service(tls_service))
498 }
499 }
500
501 #[cfg(feature = "openssl")]
502 OurTlsConnector::Openssl(tls) => {
503 const H2: &[u8] = b"h2";
504
505 use actix_tls::connect::openssl::{reexports::AsyncSslStream, TlsConnector};
506
507 #[allow(non_local_definitions)]
508 impl<IO: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncSslStream<IO>> {
509 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
510 let sock = self.into_parts().0;
511 let h2 = sock
512 .ssl()
513 .selected_alpn_protocol()
514 .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
515
516 if h2 {
517 (Box::new(sock), Protocol::Http2)
518 } else {
519 (Box::new(sock), Protocol::Http1)
520 }
521 }
522 }
523
524 let handshake_timeout = self.config.handshake_timeout;
525
526 let tls_service = TlsConnectorService {
527 tcp_service: tcp_service_inner,
528 tls_service: TlsConnector::service(tls),
529 timeout: handshake_timeout,
530 };
531
532 Some(actix_service::boxed::rc_service(tls_service))
533 }
534
535 #[cfg(feature = "openssl")]
536 OurTlsConnector::OpensslBuilder(_) => {
537 unreachable!("OpenSSL builder is built before this match.");
538 }
539
540 #[cfg(feature = "rustls-0_20")]
541 OurTlsConnector::Rustls020(tls) => {
542 const H2: &[u8] = b"h2";
543
544 use actix_tls::connect::rustls_0_20::{reexports::AsyncTlsStream, TlsConnector};
545
546 #[allow(non_local_definitions)]
547 impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
548 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
549 let sock = self.into_parts().0;
550 let h2 = sock
551 .get_ref()
552 .1
553 .alpn_protocol()
554 .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
555
556 if h2 {
557 (Box::new(sock), Protocol::Http2)
558 } else {
559 (Box::new(sock), Protocol::Http1)
560 }
561 }
562 }
563
564 let handshake_timeout = self.config.handshake_timeout;
565
566 let tls_service = TlsConnectorService {
567 tcp_service: tcp_service_inner,
568 tls_service: TlsConnector::service(tls),
569 timeout: handshake_timeout,
570 };
571
572 Some(actix_service::boxed::rc_service(tls_service))
573 }
574
575 #[cfg(feature = "rustls-0_21")]
576 OurTlsConnector::Rustls021(tls) => {
577 const H2: &[u8] = b"h2";
578
579 use actix_tls::connect::rustls_0_21::{reexports::AsyncTlsStream, TlsConnector};
580
581 #[allow(non_local_definitions)]
582 impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
583 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
584 let sock = self.into_parts().0;
585 let h2 = sock
586 .get_ref()
587 .1
588 .alpn_protocol()
589 .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
590
591 if h2 {
592 (Box::new(sock), Protocol::Http2)
593 } else {
594 (Box::new(sock), Protocol::Http1)
595 }
596 }
597 }
598
599 let handshake_timeout = self.config.handshake_timeout;
600
601 let tls_service = TlsConnectorService {
602 tcp_service: tcp_service_inner,
603 tls_service: TlsConnector::service(tls),
604 timeout: handshake_timeout,
605 };
606
607 Some(actix_service::boxed::rc_service(tls_service))
608 }
609
610 #[cfg(any(
611 feature = "rustls-0_22-webpki-roots",
612 feature = "rustls-0_22-native-roots",
613 ))]
614 OurTlsConnector::Rustls022(tls) => {
615 const H2: &[u8] = b"h2";
616
617 use actix_tls::connect::rustls_0_22::{reexports::AsyncTlsStream, TlsConnector};
618
619 #[allow(non_local_definitions)]
620 impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
621 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
622 let sock = self.into_parts().0;
623 let h2 = sock
624 .get_ref()
625 .1
626 .alpn_protocol()
627 .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
628
629 if h2 {
630 (Box::new(sock), Protocol::Http2)
631 } else {
632 (Box::new(sock), Protocol::Http1)
633 }
634 }
635 }
636
637 let handshake_timeout = self.config.handshake_timeout;
638
639 let tls_service = TlsConnectorService {
640 tcp_service: tcp_service_inner,
641 tls_service: TlsConnector::service(tls),
642 timeout: handshake_timeout,
643 };
644
645 Some(actix_service::boxed::rc_service(tls_service))
646 }
647
648 #[cfg(feature = "rustls-0_23")]
649 OurTlsConnector::Rustls023(tls) => {
650 const H2: &[u8] = b"h2";
651
652 use actix_tls::connect::rustls_0_23::{reexports::AsyncTlsStream, TlsConnector};
653
654 #[allow(non_local_definitions)]
655 impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
656 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
657 let sock = self.into_parts().0;
658 let h2 = sock
659 .get_ref()
660 .1
661 .alpn_protocol()
662 .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
663
664 if h2 {
665 (Box::new(sock), Protocol::Http2)
666 } else {
667 (Box::new(sock), Protocol::Http1)
668 }
669 }
670 }
671
672 let handshake_timeout = self.config.handshake_timeout;
673
674 let tls_service = TlsConnectorService {
675 tcp_service: tcp_service_inner,
676 tls_service: TlsConnector::service(tls),
677 timeout: handshake_timeout,
678 };
679
680 Some(actix_service::boxed::rc_service(tls_service))
681 }
682 };
683
684 let tcp_config = self.config.no_disconnect_timeout();
685
686 let tcp_pool = ConnectionPool::new(tcp_service, tcp_config);
687
688 let tls_config = self.config;
689 let tls_pool =
690 tls_service.map(move |tls_service| ConnectionPool::new(tls_service, tls_config));
691
692 ConnectorServicePriv { tcp_pool, tls_pool }
693 }
694}
695
696#[derive(Clone)]
698pub struct TcpConnectorService<S: Clone> {
699 service: S,
700}
701
702impl<S, Io> Service<Connect> for TcpConnectorService<S>
703where
704 S: Service<Connect, Response = TcpConnection<Uri, Io>, Error = ConnectError> + Clone + 'static,
705{
706 type Response = (Io, Protocol);
707 type Error = ConnectError;
708 type Future = TcpConnectorFuture<S::Future>;
709
710 actix_service::forward_ready!(service);
711
712 fn call(&self, req: Connect) -> Self::Future {
713 TcpConnectorFuture {
714 fut: self.service.call(req),
715 }
716 }
717}
718
719pin_project! {
720 #[project = TcpConnectorFutureProj]
721 pub struct TcpConnectorFuture<Fut> {
722 #[pin]
723 fut: Fut,
724 }
725}
726
727impl<Fut, Io> Future for TcpConnectorFuture<Fut>
728where
729 Fut: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
730{
731 type Output = Result<(Io, Protocol), ConnectError>;
732
733 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
734 self.project()
735 .fut
736 .poll(cx)
737 .map_ok(|res| (res.into_parts().0, Protocol::Http1))
738 }
739}
740
741#[cfg(any(
744 feature = "dangerous-h2c",
745 feature = "openssl",
746 feature = "rustls-0_20",
747 feature = "rustls-0_21",
748 feature = "rustls-0_22-webpki-roots",
749 feature = "rustls-0_22-native-roots",
750 feature = "rustls-0_23",
751 feature = "rustls-0_23-webpki-roots",
752 feature = "rustls-0_23-native-roots"
753))]
754struct TlsConnectorService<Tcp, Tls> {
755 tcp_service: Tcp,
757
758 tls_service: Tls,
760
761 timeout: Duration,
762}
763
764#[cfg(any(
765 feature = "dangerous-h2c",
766 feature = "openssl",
767 feature = "rustls-0_20",
768 feature = "rustls-0_21",
769 feature = "rustls-0_22-webpki-roots",
770 feature = "rustls-0_22-native-roots",
771 feature = "rustls-0_23",
772))]
773impl<Tcp, Tls, IO> Service<Connect> for TlsConnectorService<Tcp, Tls>
774where
775 Tcp:
776 Service<Connect, Response = TcpConnection<Uri, IO>, Error = ConnectError> + Clone + 'static,
777 Tls: Service<TcpConnection<Uri, IO>, Error = std::io::Error> + Clone + 'static,
778 Tls::Response: IntoConnectionIo,
779 IO: ConnectionIo,
780{
781 type Response = (Box<dyn ConnectionIo>, Protocol);
782 type Error = ConnectError;
783 type Future = TlsConnectorFuture<Tls, Tcp::Future, Tls::Future>;
784
785 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
786 ready!(self.tcp_service.poll_ready(cx))?;
787 ready!(self.tls_service.poll_ready(cx))?;
788 Poll::Ready(Ok(()))
789 }
790
791 fn call(&self, req: Connect) -> Self::Future {
792 let fut = self.tcp_service.call(req);
793 let tls_service = self.tls_service.clone();
794 let timeout = self.timeout;
795
796 TlsConnectorFuture::TcpConnect {
797 fut,
798 tls_service: Some(tls_service),
799 timeout,
800 }
801 }
802}
803
804pin_project! {
805 #[project = TlsConnectorProj]
806 #[allow(clippy::large_enum_variant)]
807 enum TlsConnectorFuture<S, Fut1, Fut2> {
808 TcpConnect {
809 #[pin]
810 fut: Fut1,
811 tls_service: Option<S>,
812 timeout: Duration,
813 },
814 TlsConnect {
815 #[pin]
816 fut: Fut2,
817 #[pin]
818 timeout: Sleep,
819 },
820 }
821
822}
823trait IntoConnectionIo {
825 fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol);
826}
827
828impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2>
829where
830 S: Service<TcpConnection<Uri, Io>, Response = Res, Error = std::io::Error, Future = Fut2>,
831 S::Response: IntoConnectionIo,
832 Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
833 Fut2: Future<Output = Result<S::Response, S::Error>>,
834 Io: ConnectionIo,
835{
836 type Output = Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>;
837
838 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
839 match self.as_mut().project() {
840 TlsConnectorProj::TcpConnect {
841 fut,
842 tls_service,
843 timeout,
844 } => {
845 let res = ready!(fut.poll(cx))?;
846 let fut = tls_service
847 .take()
848 .expect("TlsConnectorFuture polled after complete")
849 .call(res);
850 let timeout = sleep(*timeout);
851 self.set(TlsConnectorFuture::TlsConnect { fut, timeout });
852 self.poll(cx)
853 }
854 TlsConnectorProj::TlsConnect { fut, timeout } => match fut.poll(cx)? {
855 Poll::Ready(res) => Poll::Ready(Ok(res.into_connection_io())),
856 Poll::Pending => timeout.poll(cx).map(|_| Err(ConnectError::Timeout)),
857 },
858 }
859 }
860}
861
862#[derive(Clone)]
865pub struct TcpConnectorInnerService<S: Clone> {
866 service: S,
867 timeout: Duration,
868 local_address: Option<std::net::IpAddr>,
869}
870
871impl<S: Clone> TcpConnectorInnerService<S> {
872 fn new(service: S, timeout: Duration, local_address: Option<std::net::IpAddr>) -> Self {
873 Self {
874 service,
875 timeout,
876 local_address,
877 }
878 }
879}
880
881impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
882where
883 S: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
884 + Clone
885 + 'static,
886{
887 type Response = S::Response;
888 type Error = ConnectError;
889 type Future = TcpConnectorInnerFuture<S::Future>;
890
891 actix_service::forward_ready!(service);
892
893 fn call(&self, req: Connect) -> Self::Future {
894 let mut req = ConnectInfo::new(req.uri).set_addr(req.addr);
895
896 if let Some(local_addr) = self.local_address {
897 req = req.set_local_addr(local_addr);
898 }
899
900 TcpConnectorInnerFuture {
901 fut: self.service.call(req),
902 timeout: sleep(self.timeout),
903 }
904 }
905}
906
907pin_project! {
908 #[project = TcpConnectorInnerFutureProj]
909 pub struct TcpConnectorInnerFuture<Fut> {
910 #[pin]
911 fut: Fut,
912 #[pin]
913 timeout: Sleep,
914 }
915}
916
917impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut>
918where
919 Fut: Future<Output = Result<TcpConnection<Uri, Io>, TcpConnectError>>,
920{
921 type Output = Result<TcpConnection<Uri, Io>, ConnectError>;
922
923 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
924 let this = self.project();
925 match this.fut.poll(cx) {
926 Poll::Ready(res) => Poll::Ready(res.map_err(ConnectError::from)),
927 Poll::Pending => this.timeout.poll(cx).map(|_| Err(ConnectError::Timeout)),
928 }
929 }
930}
931
932pub type ConnectorService<Svc, IO> = ConnectorServicePriv<
934 TcpConnectorService<TcpConnectorInnerService<Svc>>,
935 Rc<
936 dyn Service<
937 Connect,
938 Response = (Box<dyn ConnectionIo>, Protocol),
939 Error = ConnectError,
940 Future = LocalBoxFuture<
941 'static,
942 Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>,
943 >,
944 >,
945 >,
946 IO,
947 Box<dyn ConnectionIo>,
948>;
949
950pub struct ConnectorServicePriv<S1, S2, Io1, Io2>
951where
952 S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
953 S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
954 Io1: ConnectionIo,
955 Io2: ConnectionIo,
956{
957 tcp_pool: ConnectionPool<S1, Io1>,
958 tls_pool: Option<ConnectionPool<S2, Io2>>,
959}
960
961impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2>
962where
963 S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
964 S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
965 Io1: ConnectionIo,
966 Io2: ConnectionIo,
967{
968 type Response = Connection<Io1, Io2>;
969 type Error = ConnectError;
970 type Future = ConnectorServiceFuture<S1, S2, Io1, Io2>;
971
972 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
973 ready!(self.tcp_pool.poll_ready(cx))?;
974 if let Some(ref tls_pool) = self.tls_pool {
975 ready!(tls_pool.poll_ready(cx))?;
976 }
977 Poll::Ready(Ok(()))
978 }
979
980 fn call(&self, req: Connect) -> Self::Future {
981 match req.uri.scheme_str() {
982 Some("https") | Some("wss") => match self.tls_pool {
983 None => ConnectorServiceFuture::SslIsNotSupported,
984 Some(ref pool) => ConnectorServiceFuture::Tls {
985 fut: pool.call(req),
986 },
987 },
988 _ => ConnectorServiceFuture::Tcp {
989 fut: self.tcp_pool.call(req),
990 },
991 }
992 }
993}
994
995pin_project! {
996 #[project = ConnectorServiceFutureProj]
997 pub enum ConnectorServiceFuture<S1, S2, Io1, Io2>
998 where
999 S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
1000 S1: Clone,
1001 S1: 'static,
1002 S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
1003 S2: Clone,
1004 S2: 'static,
1005 Io1: ConnectionIo,
1006 Io2: ConnectionIo,
1007 {
1008 Tcp {
1009 #[pin]
1010 fut: <ConnectionPool<S1, Io1> as Service<Connect>>::Future
1011 },
1012 Tls {
1013 #[pin]
1014 fut: <ConnectionPool<S2, Io2> as Service<Connect>>::Future
1015 },
1016 SslIsNotSupported
1017 }
1018}
1019
1020impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2>
1021where
1022 S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
1023 S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
1024 Io1: ConnectionIo,
1025 Io2: ConnectionIo,
1026{
1027 type Output = Result<Connection<Io1, Io2>, ConnectError>;
1028
1029 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1030 match self.project() {
1031 ConnectorServiceFutureProj::Tcp { fut } => fut.poll(cx).map_ok(Connection::Tcp),
1032 ConnectorServiceFutureProj::Tls { fut } => fut.poll(cx).map_ok(Connection::Tls),
1033 ConnectorServiceFutureProj::SslIsNotSupported => {
1034 Poll::Ready(Err(ConnectError::SslIsNotSupported))
1035 }
1036 }
1037 }
1038}
1039
1040#[cfg(not(feature = "hickory-dns"))]
1041mod resolver {
1042 use super::*;
1043
1044 pub(super) fn resolver() -> Resolver {
1045 Resolver::default()
1046 }
1047}
1048
1049#[cfg(feature = "hickory-dns")]
1050mod resolver {
1051 use std::{cell::OnceCell, net::SocketAddr};
1052
1053 use actix_tls::connect::Resolve;
1054 use hickory_resolver::{
1055 config::{ResolverConfig, ResolverOpts},
1056 name_server::TokioConnectionProvider,
1057 system_conf::read_system_conf,
1058 TokioResolver,
1059 };
1060
1061 use super::*;
1062
1063 pub(super) fn resolver() -> Resolver {
1064 struct HickoryDnsResolver(TokioResolver);
1066
1067 impl Resolve for HickoryDnsResolver {
1068 fn lookup<'a>(
1069 &'a self,
1070 host: &'a str,
1071 port: u16,
1072 ) -> LocalBoxFuture<'a, Result<Vec<SocketAddr>, Box<dyn std::error::Error>>>
1073 {
1074 Box::pin(async move {
1075 let res = self
1076 .0
1077 .lookup_ip(host)
1078 .await?
1079 .iter()
1080 .map(|ip| SocketAddr::new(ip, port))
1081 .collect();
1082 Ok(res)
1083 })
1084 }
1085 }
1086
1087 thread_local! {
1089 static HICKORY_DNS_RESOLVER: OnceCell<Resolver> = const { OnceCell::new() };
1090 }
1091
1092 HICKORY_DNS_RESOLVER.with(|local| {
1094 local
1095 .get_or_init(|| {
1096 let (cfg, opts) = match read_system_conf() {
1097 Ok((cfg, opts)) => (cfg, opts),
1098 Err(err) => {
1099 log::error!("Hickory DNS can not load system config: {err}");
1100 (ResolverConfig::default(), ResolverOpts::default())
1101 }
1102 };
1103
1104 let resolver =
1105 TokioResolver::builder_with_config(cfg, TokioConnectionProvider::default())
1106 .with_options(opts)
1107 .build();
1108
1109 Resolver::custom(HickoryDnsResolver(resolver))
1110 })
1111 .clone()
1112 })
1113 }
1114}
1115
1116#[cfg(feature = "dangerous-h2c")]
1117#[cfg(test)]
1118mod tests {
1119 use std::convert::Infallible;
1120
1121 use actix_http::{HttpService, Request, Response, Version};
1122 use actix_http_test::test_server;
1123 use actix_service::ServiceFactoryExt as _;
1124
1125 use super::*;
1126 use crate::Client;
1127
1128 #[actix_rt::test]
1129 async fn h2c_connector() {
1130 let mut srv = test_server(|| {
1131 HttpService::build()
1132 .h2(|_req: Request| async { Ok::<_, Infallible>(Response::ok()) })
1133 .tcp()
1134 .map_err(|_| ())
1135 })
1136 .await;
1137
1138 let connector = Connector {
1139 connector: TcpConnector::new(resolver::resolver()).service(),
1140 config: ConnectorConfig::default(),
1141 tls: OurTlsConnector::None,
1142 };
1143
1144 let client = Client::builder().connector(connector).finish();
1145
1146 let request = client.get(srv.surl("/")).send();
1147 let response = request.await.unwrap();
1148 assert!(response.status().is_success());
1149 assert_eq!(response.version(), Version::HTTP_2);
1150
1151 srv.stop().await;
1152 }
1153}