Skip to main content

awc/client/
connector.rs

1use std::{
2    fmt,
3    future::Future,
4    net::IpAddr,
5    pin::Pin,
6    rc::Rc,
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use actix_http::Protocol;
12use actix_rt::{
13    net::{ActixStream, TcpStream},
14    time::{sleep, Sleep},
15};
16use actix_service::Service;
17use actix_tls::connect::{
18    ConnectError as TcpConnectError, ConnectInfo, Connection as TcpConnection,
19    Connector as TcpConnector, Resolver,
20};
21use futures_core::{future::LocalBoxFuture, ready};
22use http::Uri;
23use pin_project_lite::pin_project;
24
25use super::{
26    config::ConnectorConfig,
27    connection::{Connection, ConnectionIo},
28    error::ConnectError,
29    pool::ConnectionPool,
30    Connect,
31};
32
33enum OurTlsConnector {
34    #[allow(dead_code)] // only dead when no TLS feature is enabled
35    None,
36
37    #[cfg(feature = "openssl")]
38    Openssl(actix_tls::connect::openssl::reexports::SslConnector),
39
40    /// Provided because building the OpenSSL context on newer versions can be very slow.
41    /// This prevents unnecessary calls to `.build()` while constructing the client connector.
42    #[cfg(feature = "openssl")]
43    #[allow(dead_code)] // false positive; used in build_tls
44    OpensslBuilder(actix_tls::connect::openssl::reexports::SslConnectorBuilder),
45
46    #[cfg(feature = "rustls-0_20")]
47    #[allow(dead_code)] // false positive; used in build_tls
48    Rustls020(std::sync::Arc<actix_tls::connect::rustls_0_20::reexports::ClientConfig>),
49
50    #[cfg(feature = "rustls-0_21")]
51    #[allow(dead_code)] // false positive; used in build_tls
52    Rustls021(std::sync::Arc<actix_tls::connect::rustls_0_21::reexports::ClientConfig>),
53
54    #[cfg(any(
55        feature = "rustls-0_22-webpki-roots",
56        feature = "rustls-0_22-native-roots",
57    ))]
58    #[allow(dead_code)] // false positive; used in build_tls
59    Rustls022(std::sync::Arc<actix_tls::connect::rustls_0_22::reexports::ClientConfig>),
60
61    #[cfg(feature = "rustls-0_23")]
62    #[allow(dead_code)] // false positive; used in build_tls
63    Rustls023(std::sync::Arc<actix_tls::connect::rustls_0_23::reexports::ClientConfig>),
64}
65
66/// Manages HTTP client network connectivity.
67///
68/// The `Connector` type uses a builder-like combinator pattern for service construction that
69/// finishes by calling the `.finish()` method.
70///
71/// ```no_run
72/// use std::time::Duration;
73///
74/// let connector = awc::Connector::new()
75///      .timeout(Duration::from_secs(5))
76///      .finish();
77/// ```
78pub struct Connector<T> {
79    connector: T,
80    config: ConnectorConfig,
81
82    #[allow(dead_code)] // only dead when no TLS feature is enabled
83    tls: OurTlsConnector,
84}
85
86impl Connector<()> {
87    /// Create a new connector with default TLS settings
88    ///
89    /// # Panics
90    ///
91    /// - When the `rustls-0_23-webpki-roots` or `rustls-0_23-native-roots` features are enabled
92    ///   and no default crypto provider has been loaded, this method will panic.
93    /// - When the `rustls-0_23-native-roots` or `rustls-0_22-native-roots` features are enabled
94    ///   and the runtime system has no native root certificates, this method will panic.
95    #[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
96    pub fn new() -> Connector<
97        impl Service<
98                ConnectInfo<Uri>,
99                Response = TcpConnection<Uri, TcpStream>,
100                Error = actix_tls::connect::ConnectError,
101            > + Clone,
102    > {
103        Connector {
104            connector: TcpConnector::new(resolver::resolver()).service(),
105            config: ConnectorConfig::default(),
106            tls: Self::build_tls(vec![b"h2".to_vec(), b"http/1.1".to_vec()]),
107        }
108    }
109
110    cfg_if::cfg_if! {
111        if #[cfg(any(feature = "rustls-0_23-webpki-roots", feature = "rustls-0_23-native-roots"))] {
112            /// Build TLS connector with Rustls v0.23, based on supplied ALPN protocols.
113            ///
114            /// Note that if other TLS crate features are enabled, Rustls v0.23 will be used.
115            fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
116                use actix_tls::connect::rustls_0_23::{self, reexports::ClientConfig};
117
118                cfg_if::cfg_if! {
119                    if #[cfg(feature = "rustls-0_23-webpki-roots")] {
120                        let certs = rustls_0_23::webpki_roots_cert_store();
121                    } else if #[cfg(feature = "rustls-0_23-native-roots")] {
122                        let certs = rustls_0_23::native_roots_cert_store().expect("Failed to find native root certificates");
123                    }
124                }
125
126                let mut config = ClientConfig::builder()
127                    .with_root_certificates(certs)
128                    .with_no_client_auth();
129
130                config.alpn_protocols = protocols;
131
132                OurTlsConnector::Rustls023(std::sync::Arc::new(config))
133            }
134        } else if #[cfg(any(feature = "rustls-0_22-webpki-roots", feature = "rustls-0_22-native-roots"))] {
135            /// Build TLS connector with Rustls v0.22, based on supplied ALPN protocols.
136            fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
137                use actix_tls::connect::rustls_0_22::{self, reexports::ClientConfig};
138
139                cfg_if::cfg_if! {
140                    if #[cfg(feature = "rustls-0_22-webpki-roots")] {
141                        let certs = rustls_0_22::webpki_roots_cert_store();
142                    } else if #[cfg(feature = "rustls-0_22-native-roots")] {
143                        let certs = rustls_0_22::native_roots_cert_store().expect("Failed to find native root certificates");
144                    }
145                }
146
147                let mut config = ClientConfig::builder()
148                    .with_root_certificates(certs)
149                    .with_no_client_auth();
150
151                config.alpn_protocols = protocols;
152
153                OurTlsConnector::Rustls022(std::sync::Arc::new(config))
154            }
155        } else if #[cfg(feature = "rustls-0_21")] {
156            /// Build TLS connector with Rustls v0.21, based on supplied ALPN protocols.
157            fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
158                use actix_tls::connect::rustls_0_21::{reexports::ClientConfig, webpki_roots_cert_store};
159
160                let mut config = ClientConfig::builder()
161                    .with_safe_defaults()
162                    .with_root_certificates(webpki_roots_cert_store())
163                    .with_no_client_auth();
164
165                config.alpn_protocols = protocols;
166
167                OurTlsConnector::Rustls021(std::sync::Arc::new(config))
168            }
169        } else if #[cfg(feature = "rustls-0_20")] {
170            /// Build TLS connector with Rustls v0.20, based on supplied ALPN protocols.
171            fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
172                use actix_tls::connect::rustls_0_20::{reexports::ClientConfig, webpki_roots_cert_store};
173
174                let mut config = ClientConfig::builder()
175                    .with_safe_defaults()
176                    .with_root_certificates(webpki_roots_cert_store())
177                    .with_no_client_auth();
178
179                config.alpn_protocols = protocols;
180
181                OurTlsConnector::Rustls020(std::sync::Arc::new(config))
182            }
183        } else if #[cfg(feature = "openssl")] {
184            /// Build TLS connector with OpenSSL, based on supplied ALPN protocols.
185            fn build_tls(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
186                use actix_tls::connect::openssl::reexports::{SslConnector, SslMethod};
187                use bytes::{BufMut, BytesMut};
188
189                let mut alpn = BytesMut::with_capacity(20);
190                for proto in &protocols {
191                    alpn.put_u8(proto.len() as u8);
192                    alpn.put(proto.as_slice());
193                }
194
195                let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
196                if let Err(err) = ssl.set_alpn_protos(&alpn) {
197                    log::error!("Can not set ALPN protocol: {err:?}");
198                }
199
200                OurTlsConnector::OpensslBuilder(ssl)
201            }
202        } else {
203            /// Provides an empty TLS connector when no TLS feature is enabled, or when only the
204            /// `rustls-0_23` crate feature is enabled.
205            fn build_tls(_: Vec<Vec<u8>>) -> OurTlsConnector {
206                OurTlsConnector::None
207            }
208        }
209    }
210}
211
212impl<S> Connector<S> {
213    /// Sets custom connector.
214    pub fn connector<S1, Io1>(self, connector: S1) -> Connector<S1>
215    where
216        Io1: ActixStream + fmt::Debug + 'static,
217        S1: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, Io1>, Error = TcpConnectError>
218            + Clone,
219    {
220        Connector {
221            connector,
222            config: self.config,
223            tls: self.tls,
224        }
225    }
226}
227
228impl<S, IO> Connector<S>
229where
230    // Note:
231    // Input Io type is bound to ActixStream trait but internally in client module they
232    // are bound to ConnectionIo trait alias. And latter is the trait exposed to public
233    // in the form of Box<dyn ConnectionIo> type.
234    //
235    // This remap is to hide ActixStream's trait methods. They are not meant to be called
236    // from user code.
237    IO: ActixStream + fmt::Debug + 'static,
238    S: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, IO>, Error = TcpConnectError>
239        + Clone
240        + 'static,
241{
242    /// Sets TCP connection timeout.
243    ///
244    /// This is the max time allowed to connect to remote host, including DNS name resolution.
245    ///
246    /// By default, the timeout is 5 seconds.
247    pub fn timeout(mut self, timeout: Duration) -> Self {
248        self.config.timeout = timeout;
249        self
250    }
251
252    /// Sets TLS handshake timeout.
253    ///
254    /// This is the max time allowed to perform the TLS handshake with remote host after TCP
255    /// connection is established.
256    ///
257    /// By default, the timeout is 5 seconds.
258    pub fn handshake_timeout(mut self, timeout: Duration) -> Self {
259        self.config.handshake_timeout = timeout;
260        self
261    }
262
263    /// Sets custom OpenSSL `SslConnector` instance.
264    #[cfg(feature = "openssl")]
265    pub fn openssl(
266        mut self,
267        connector: actix_tls::connect::openssl::reexports::SslConnector,
268    ) -> Self {
269        self.tls = OurTlsConnector::Openssl(connector);
270        self
271    }
272
273    /// See docs for [`Connector::openssl`].
274    #[doc(hidden)]
275    #[cfg(feature = "openssl")]
276    #[deprecated(since = "3.0.0", note = "Renamed to `Connector::openssl`.")]
277    pub fn ssl(mut self, connector: actix_tls::connect::openssl::reexports::SslConnector) -> Self {
278        self.tls = OurTlsConnector::Openssl(connector);
279        self
280    }
281
282    /// Sets custom Rustls v0.20 `ClientConfig` instance.
283    #[cfg(feature = "rustls-0_20")]
284    pub fn rustls(
285        mut self,
286        connector: std::sync::Arc<actix_tls::connect::rustls_0_20::reexports::ClientConfig>,
287    ) -> Self {
288        self.tls = OurTlsConnector::Rustls020(connector);
289        self
290    }
291
292    /// Sets custom Rustls v0.21 `ClientConfig` instance.
293    #[cfg(feature = "rustls-0_21")]
294    pub fn rustls_021(
295        mut self,
296        connector: std::sync::Arc<actix_tls::connect::rustls_0_21::reexports::ClientConfig>,
297    ) -> Self {
298        self.tls = OurTlsConnector::Rustls021(connector);
299        self
300    }
301
302    /// Sets custom Rustls v0.22 `ClientConfig` instance.
303    #[cfg(any(
304        feature = "rustls-0_22-webpki-roots",
305        feature = "rustls-0_22-native-roots",
306    ))]
307    pub fn rustls_0_22(
308        mut self,
309        connector: std::sync::Arc<actix_tls::connect::rustls_0_22::reexports::ClientConfig>,
310    ) -> Self {
311        self.tls = OurTlsConnector::Rustls022(connector);
312        self
313    }
314
315    /// Sets custom Rustls v0.23 `ClientConfig` instance.
316    ///
317    /// In order to enable ALPN, set the `.alpn_protocols` field on the ClientConfig to the
318    /// following:
319    ///
320    /// ```no_run
321    /// vec![b"h2".to_vec(), b"http/1.1".to_vec()]
322    /// # ;
323    /// ```
324    #[cfg(feature = "rustls-0_23")]
325    pub fn rustls_0_23(
326        mut self,
327        connector: std::sync::Arc<actix_tls::connect::rustls_0_23::reexports::ClientConfig>,
328    ) -> Self {
329        self.tls = OurTlsConnector::Rustls023(connector);
330        self
331    }
332
333    /// Sets maximum supported HTTP major version.
334    ///
335    /// Supported versions are HTTP/1.1 and HTTP/2.
336    pub fn max_http_version(mut self, val: http::Version) -> Self {
337        let versions = match val {
338            http::Version::HTTP_11 => vec![b"http/1.1".to_vec()],
339            http::Version::HTTP_2 => vec![b"h2".to_vec(), b"http/1.1".to_vec()],
340            _ => {
341                unimplemented!("actix-http client only supports versions http/1.1 & http/2")
342            }
343        };
344        self.tls = Connector::build_tls(versions);
345        self
346    }
347
348    /// Sets the initial window size (in bytes) for HTTP/2 stream-level flow control for received
349    /// data.
350    ///
351    /// The default value is 65,535 and is good for APIs, but not for big objects.
352    pub fn initial_window_size(mut self, size: u32) -> Self {
353        self.config.stream_window_size = size;
354        self
355    }
356
357    /// Sets the initial window size (in bytes) for HTTP/2 connection-level flow control for
358    /// received data.
359    ///
360    /// The default value is 65,535 and is good for APIs, but not for big objects.
361    pub fn initial_connection_window_size(mut self, size: u32) -> Self {
362        self.config.conn_window_size = size;
363        self
364    }
365
366    /// Set total number of simultaneous connections per type of scheme.
367    ///
368    /// If limit is 0, the connector has no limit.
369    ///
370    /// The default limit size is 100.
371    pub fn limit(mut self, limit: usize) -> Self {
372        if limit == 0 {
373            self.config.limit = u32::MAX as usize;
374        } else {
375            self.config.limit = limit;
376        }
377
378        self
379    }
380
381    /// Set keep-alive period for opened connection.
382    ///
383    /// Keep-alive period is the period between connection usage. If
384    /// the delay between repeated usages of the same connection
385    /// exceeds this period, the connection is closed.
386    /// Default keep-alive period is 15 seconds.
387    pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
388        self.config.conn_keep_alive = dur;
389        self
390    }
391
392    /// Set max lifetime period for connection.
393    ///
394    /// Connection lifetime is max lifetime of any opened connection
395    /// until it is closed regardless of keep-alive period.
396    /// Default lifetime period is 75 seconds.
397    pub fn conn_lifetime(mut self, dur: Duration) -> Self {
398        self.config.conn_lifetime = dur;
399        self
400    }
401
402    /// Set server connection disconnect timeout in milliseconds.
403    ///
404    /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
405    /// within this time, the socket get dropped. This timeout affects only secure connections.
406    ///
407    /// To disable timeout set value to 0.
408    ///
409    /// By default disconnect timeout is set to 3000 milliseconds.
410    pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
411        self.config.disconnect_timeout = Some(dur);
412        self
413    }
414
415    /// Set local IP Address the connector would use for establishing connection.
416    pub fn local_address(mut self, addr: IpAddr) -> Self {
417        self.config.local_address = Some(addr);
418        self
419    }
420
421    /// Finish configuration process and create connector service.
422    ///
423    /// The `Connector` builder always concludes by calling `finish()` last in its combinator chain.
424    pub fn finish(self) -> ConnectorService<S, IO> {
425        let local_address = self.config.local_address;
426        let timeout = self.config.timeout;
427
428        let tcp_service_inner =
429            TcpConnectorInnerService::new(self.connector, timeout, local_address);
430
431        #[allow(clippy::redundant_clone)]
432        let tcp_service = TcpConnectorService {
433            service: tcp_service_inner.clone(),
434        };
435
436        let tls = match self.tls {
437            #[cfg(feature = "openssl")]
438            OurTlsConnector::OpensslBuilder(builder) => OurTlsConnector::Openssl(builder.build()),
439            tls => tls,
440        };
441
442        let tls_service = match tls {
443            OurTlsConnector::None => {
444                #[cfg(not(feature = "dangerous-h2c"))]
445                {
446                    None
447                }
448
449                #[cfg(feature = "dangerous-h2c")]
450                {
451                    use std::io;
452
453                    use actix_tls::connect::Connection;
454                    use actix_utils::future::{ready, Ready};
455
456                    #[allow(non_local_definitions)]
457                    impl IntoConnectionIo for TcpConnection<Uri, Box<dyn ConnectionIo>> {
458                        fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
459                            let io = self.into_parts().0;
460                            (io, Protocol::Http2)
461                        }
462                    }
463
464                    /// With the `dangerous-h2c` feature enabled, this connector uses a no-op TLS
465                    /// connection service that passes through plain TCP as a TLS connection.
466                    ///
467                    /// The protocol version of this fake TLS connection is set to be HTTP/2.
468                    #[derive(Clone)]
469                    struct NoOpTlsConnectorService;
470
471                    impl<R, IO> Service<Connection<R, IO>> for NoOpTlsConnectorService
472                    where
473                        IO: ActixStream + 'static,
474                    {
475                        type Response = Connection<R, Box<dyn ConnectionIo>>;
476                        type Error = io::Error;
477                        type Future = Ready<Result<Self::Response, Self::Error>>;
478
479                        actix_service::always_ready!();
480
481                        fn call(&self, connection: Connection<R, IO>) -> Self::Future {
482                            let (io, connection) = connection.replace_io(());
483                            let (_, connection) = connection.replace_io(Box::new(io) as _);
484
485                            ready(Ok(connection))
486                        }
487                    }
488
489                    let handshake_timeout = self.config.handshake_timeout;
490
491                    let tls_service = TlsConnectorService {
492                        tcp_service: tcp_service_inner,
493                        tls_service: NoOpTlsConnectorService,
494                        timeout: handshake_timeout,
495                    };
496
497                    Some(actix_service::boxed::rc_service(tls_service))
498                }
499            }
500
501            #[cfg(feature = "openssl")]
502            OurTlsConnector::Openssl(tls) => {
503                const H2: &[u8] = b"h2";
504
505                use actix_tls::connect::openssl::{reexports::AsyncSslStream, TlsConnector};
506
507                #[allow(non_local_definitions)]
508                impl<IO: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncSslStream<IO>> {
509                    fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
510                        let sock = self.into_parts().0;
511                        let h2 = sock
512                            .ssl()
513                            .selected_alpn_protocol()
514                            .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
515
516                        if h2 {
517                            (Box::new(sock), Protocol::Http2)
518                        } else {
519                            (Box::new(sock), Protocol::Http1)
520                        }
521                    }
522                }
523
524                let handshake_timeout = self.config.handshake_timeout;
525
526                let tls_service = TlsConnectorService {
527                    tcp_service: tcp_service_inner,
528                    tls_service: TlsConnector::service(tls),
529                    timeout: handshake_timeout,
530                };
531
532                Some(actix_service::boxed::rc_service(tls_service))
533            }
534
535            #[cfg(feature = "openssl")]
536            OurTlsConnector::OpensslBuilder(_) => {
537                unreachable!("OpenSSL builder is built before this match.");
538            }
539
540            #[cfg(feature = "rustls-0_20")]
541            OurTlsConnector::Rustls020(tls) => {
542                const H2: &[u8] = b"h2";
543
544                use actix_tls::connect::rustls_0_20::{reexports::AsyncTlsStream, TlsConnector};
545
546                #[allow(non_local_definitions)]
547                impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
548                    fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
549                        let sock = self.into_parts().0;
550                        let h2 = sock
551                            .get_ref()
552                            .1
553                            .alpn_protocol()
554                            .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
555
556                        if h2 {
557                            (Box::new(sock), Protocol::Http2)
558                        } else {
559                            (Box::new(sock), Protocol::Http1)
560                        }
561                    }
562                }
563
564                let handshake_timeout = self.config.handshake_timeout;
565
566                let tls_service = TlsConnectorService {
567                    tcp_service: tcp_service_inner,
568                    tls_service: TlsConnector::service(tls),
569                    timeout: handshake_timeout,
570                };
571
572                Some(actix_service::boxed::rc_service(tls_service))
573            }
574
575            #[cfg(feature = "rustls-0_21")]
576            OurTlsConnector::Rustls021(tls) => {
577                const H2: &[u8] = b"h2";
578
579                use actix_tls::connect::rustls_0_21::{reexports::AsyncTlsStream, TlsConnector};
580
581                #[allow(non_local_definitions)]
582                impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
583                    fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
584                        let sock = self.into_parts().0;
585                        let h2 = sock
586                            .get_ref()
587                            .1
588                            .alpn_protocol()
589                            .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
590
591                        if h2 {
592                            (Box::new(sock), Protocol::Http2)
593                        } else {
594                            (Box::new(sock), Protocol::Http1)
595                        }
596                    }
597                }
598
599                let handshake_timeout = self.config.handshake_timeout;
600
601                let tls_service = TlsConnectorService {
602                    tcp_service: tcp_service_inner,
603                    tls_service: TlsConnector::service(tls),
604                    timeout: handshake_timeout,
605                };
606
607                Some(actix_service::boxed::rc_service(tls_service))
608            }
609
610            #[cfg(any(
611                feature = "rustls-0_22-webpki-roots",
612                feature = "rustls-0_22-native-roots",
613            ))]
614            OurTlsConnector::Rustls022(tls) => {
615                const H2: &[u8] = b"h2";
616
617                use actix_tls::connect::rustls_0_22::{reexports::AsyncTlsStream, TlsConnector};
618
619                #[allow(non_local_definitions)]
620                impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
621                    fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
622                        let sock = self.into_parts().0;
623                        let h2 = sock
624                            .get_ref()
625                            .1
626                            .alpn_protocol()
627                            .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
628
629                        if h2 {
630                            (Box::new(sock), Protocol::Http2)
631                        } else {
632                            (Box::new(sock), Protocol::Http1)
633                        }
634                    }
635                }
636
637                let handshake_timeout = self.config.handshake_timeout;
638
639                let tls_service = TlsConnectorService {
640                    tcp_service: tcp_service_inner,
641                    tls_service: TlsConnector::service(tls),
642                    timeout: handshake_timeout,
643                };
644
645                Some(actix_service::boxed::rc_service(tls_service))
646            }
647
648            #[cfg(feature = "rustls-0_23")]
649            OurTlsConnector::Rustls023(tls) => {
650                const H2: &[u8] = b"h2";
651
652                use actix_tls::connect::rustls_0_23::{reexports::AsyncTlsStream, TlsConnector};
653
654                #[allow(non_local_definitions)]
655                impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
656                    fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
657                        let sock = self.into_parts().0;
658                        let h2 = sock
659                            .get_ref()
660                            .1
661                            .alpn_protocol()
662                            .is_some_and(|protos| protos.windows(2).any(|w| w == H2));
663
664                        if h2 {
665                            (Box::new(sock), Protocol::Http2)
666                        } else {
667                            (Box::new(sock), Protocol::Http1)
668                        }
669                    }
670                }
671
672                let handshake_timeout = self.config.handshake_timeout;
673
674                let tls_service = TlsConnectorService {
675                    tcp_service: tcp_service_inner,
676                    tls_service: TlsConnector::service(tls),
677                    timeout: handshake_timeout,
678                };
679
680                Some(actix_service::boxed::rc_service(tls_service))
681            }
682        };
683
684        let tcp_config = self.config.no_disconnect_timeout();
685
686        let tcp_pool = ConnectionPool::new(tcp_service, tcp_config);
687
688        let tls_config = self.config;
689        let tls_pool =
690            tls_service.map(move |tls_service| ConnectionPool::new(tls_service, tls_config));
691
692        ConnectorServicePriv { tcp_pool, tls_pool }
693    }
694}
695
696/// tcp service for map `TcpConnection<Uri, Io>` type to `(Io, Protocol)`
697#[derive(Clone)]
698pub struct TcpConnectorService<S: Clone> {
699    service: S,
700}
701
702impl<S, Io> Service<Connect> for TcpConnectorService<S>
703where
704    S: Service<Connect, Response = TcpConnection<Uri, Io>, Error = ConnectError> + Clone + 'static,
705{
706    type Response = (Io, Protocol);
707    type Error = ConnectError;
708    type Future = TcpConnectorFuture<S::Future>;
709
710    actix_service::forward_ready!(service);
711
712    fn call(&self, req: Connect) -> Self::Future {
713        TcpConnectorFuture {
714            fut: self.service.call(req),
715        }
716    }
717}
718
719pin_project! {
720    #[project = TcpConnectorFutureProj]
721    pub struct TcpConnectorFuture<Fut> {
722        #[pin]
723        fut: Fut,
724    }
725}
726
727impl<Fut, Io> Future for TcpConnectorFuture<Fut>
728where
729    Fut: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
730{
731    type Output = Result<(Io, Protocol), ConnectError>;
732
733    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
734        self.project()
735            .fut
736            .poll(cx)
737            .map_ok(|res| (res.into_parts().0, Protocol::Http1))
738    }
739}
740
741/// service for establish tcp connection and do client tls handshake.
742/// operation is canceled when timeout limit reached.
743#[cfg(any(
744    feature = "dangerous-h2c",
745    feature = "openssl",
746    feature = "rustls-0_20",
747    feature = "rustls-0_21",
748    feature = "rustls-0_22-webpki-roots",
749    feature = "rustls-0_22-native-roots",
750    feature = "rustls-0_23",
751    feature = "rustls-0_23-webpki-roots",
752    feature = "rustls-0_23-native-roots"
753))]
754struct TlsConnectorService<Tcp, Tls> {
755    /// TCP connection is canceled on `TcpConnectorInnerService`'s timeout setting.
756    tcp_service: Tcp,
757
758    /// TLS connection is canceled on `TlsConnectorService`'s timeout setting.
759    tls_service: Tls,
760
761    timeout: Duration,
762}
763
764#[cfg(any(
765    feature = "dangerous-h2c",
766    feature = "openssl",
767    feature = "rustls-0_20",
768    feature = "rustls-0_21",
769    feature = "rustls-0_22-webpki-roots",
770    feature = "rustls-0_22-native-roots",
771    feature = "rustls-0_23",
772))]
773impl<Tcp, Tls, IO> Service<Connect> for TlsConnectorService<Tcp, Tls>
774where
775    Tcp:
776        Service<Connect, Response = TcpConnection<Uri, IO>, Error = ConnectError> + Clone + 'static,
777    Tls: Service<TcpConnection<Uri, IO>, Error = std::io::Error> + Clone + 'static,
778    Tls::Response: IntoConnectionIo,
779    IO: ConnectionIo,
780{
781    type Response = (Box<dyn ConnectionIo>, Protocol);
782    type Error = ConnectError;
783    type Future = TlsConnectorFuture<Tls, Tcp::Future, Tls::Future>;
784
785    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
786        ready!(self.tcp_service.poll_ready(cx))?;
787        ready!(self.tls_service.poll_ready(cx))?;
788        Poll::Ready(Ok(()))
789    }
790
791    fn call(&self, req: Connect) -> Self::Future {
792        let fut = self.tcp_service.call(req);
793        let tls_service = self.tls_service.clone();
794        let timeout = self.timeout;
795
796        TlsConnectorFuture::TcpConnect {
797            fut,
798            tls_service: Some(tls_service),
799            timeout,
800        }
801    }
802}
803
804pin_project! {
805    #[project = TlsConnectorProj]
806    #[allow(clippy::large_enum_variant)]
807    enum TlsConnectorFuture<S, Fut1, Fut2> {
808        TcpConnect {
809            #[pin]
810            fut: Fut1,
811            tls_service: Option<S>,
812            timeout: Duration,
813        },
814        TlsConnect {
815            #[pin]
816            fut: Fut2,
817            #[pin]
818            timeout: Sleep,
819        },
820    }
821
822}
823/// helper trait for generic over different TlsStream types between tls crates.
824trait IntoConnectionIo {
825    fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol);
826}
827
828impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2>
829where
830    S: Service<TcpConnection<Uri, Io>, Response = Res, Error = std::io::Error, Future = Fut2>,
831    S::Response: IntoConnectionIo,
832    Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
833    Fut2: Future<Output = Result<S::Response, S::Error>>,
834    Io: ConnectionIo,
835{
836    type Output = Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>;
837
838    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
839        match self.as_mut().project() {
840            TlsConnectorProj::TcpConnect {
841                fut,
842                tls_service,
843                timeout,
844            } => {
845                let res = ready!(fut.poll(cx))?;
846                let fut = tls_service
847                    .take()
848                    .expect("TlsConnectorFuture polled after complete")
849                    .call(res);
850                let timeout = sleep(*timeout);
851                self.set(TlsConnectorFuture::TlsConnect { fut, timeout });
852                self.poll(cx)
853            }
854            TlsConnectorProj::TlsConnect { fut, timeout } => match fut.poll(cx)? {
855                Poll::Ready(res) => Poll::Ready(Ok(res.into_connection_io())),
856                Poll::Pending => timeout.poll(cx).map(|_| Err(ConnectError::Timeout)),
857            },
858        }
859    }
860}
861
862/// service for establish tcp connection.
863/// operation is canceled when timeout limit reached.
864#[derive(Clone)]
865pub struct TcpConnectorInnerService<S: Clone> {
866    service: S,
867    timeout: Duration,
868    local_address: Option<std::net::IpAddr>,
869}
870
871impl<S: Clone> TcpConnectorInnerService<S> {
872    fn new(service: S, timeout: Duration, local_address: Option<std::net::IpAddr>) -> Self {
873        Self {
874            service,
875            timeout,
876            local_address,
877        }
878    }
879}
880
881impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
882where
883    S: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
884        + Clone
885        + 'static,
886{
887    type Response = S::Response;
888    type Error = ConnectError;
889    type Future = TcpConnectorInnerFuture<S::Future>;
890
891    actix_service::forward_ready!(service);
892
893    fn call(&self, req: Connect) -> Self::Future {
894        let mut req = ConnectInfo::new(req.uri).set_addr(req.addr);
895
896        if let Some(local_addr) = self.local_address {
897            req = req.set_local_addr(local_addr);
898        }
899
900        TcpConnectorInnerFuture {
901            fut: self.service.call(req),
902            timeout: sleep(self.timeout),
903        }
904    }
905}
906
907pin_project! {
908    #[project = TcpConnectorInnerFutureProj]
909    pub struct TcpConnectorInnerFuture<Fut> {
910        #[pin]
911        fut: Fut,
912        #[pin]
913        timeout: Sleep,
914    }
915}
916
917impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut>
918where
919    Fut: Future<Output = Result<TcpConnection<Uri, Io>, TcpConnectError>>,
920{
921    type Output = Result<TcpConnection<Uri, Io>, ConnectError>;
922
923    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
924        let this = self.project();
925        match this.fut.poll(cx) {
926            Poll::Ready(res) => Poll::Ready(res.map_err(ConnectError::from)),
927            Poll::Pending => this.timeout.poll(cx).map(|_| Err(ConnectError::Timeout)),
928        }
929    }
930}
931
932/// Connector service for pooled Plain/Tls Tcp connections.
933pub type ConnectorService<Svc, IO> = ConnectorServicePriv<
934    TcpConnectorService<TcpConnectorInnerService<Svc>>,
935    Rc<
936        dyn Service<
937            Connect,
938            Response = (Box<dyn ConnectionIo>, Protocol),
939            Error = ConnectError,
940            Future = LocalBoxFuture<
941                'static,
942                Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>,
943            >,
944        >,
945    >,
946    IO,
947    Box<dyn ConnectionIo>,
948>;
949
950pub struct ConnectorServicePriv<S1, S2, Io1, Io2>
951where
952    S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
953    S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
954    Io1: ConnectionIo,
955    Io2: ConnectionIo,
956{
957    tcp_pool: ConnectionPool<S1, Io1>,
958    tls_pool: Option<ConnectionPool<S2, Io2>>,
959}
960
961impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2>
962where
963    S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
964    S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
965    Io1: ConnectionIo,
966    Io2: ConnectionIo,
967{
968    type Response = Connection<Io1, Io2>;
969    type Error = ConnectError;
970    type Future = ConnectorServiceFuture<S1, S2, Io1, Io2>;
971
972    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
973        ready!(self.tcp_pool.poll_ready(cx))?;
974        if let Some(ref tls_pool) = self.tls_pool {
975            ready!(tls_pool.poll_ready(cx))?;
976        }
977        Poll::Ready(Ok(()))
978    }
979
980    fn call(&self, req: Connect) -> Self::Future {
981        match req.uri.scheme_str() {
982            Some("https") | Some("wss") => match self.tls_pool {
983                None => ConnectorServiceFuture::SslIsNotSupported,
984                Some(ref pool) => ConnectorServiceFuture::Tls {
985                    fut: pool.call(req),
986                },
987            },
988            _ => ConnectorServiceFuture::Tcp {
989                fut: self.tcp_pool.call(req),
990            },
991        }
992    }
993}
994
995pin_project! {
996    #[project = ConnectorServiceFutureProj]
997    pub enum ConnectorServiceFuture<S1, S2, Io1, Io2>
998    where
999        S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
1000        S1: Clone,
1001        S1: 'static,
1002        S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
1003        S2: Clone,
1004        S2: 'static,
1005        Io1: ConnectionIo,
1006        Io2: ConnectionIo,
1007    {
1008        Tcp {
1009            #[pin]
1010            fut: <ConnectionPool<S1, Io1> as Service<Connect>>::Future
1011        },
1012        Tls {
1013            #[pin]
1014            fut:  <ConnectionPool<S2, Io2> as Service<Connect>>::Future
1015        },
1016        SslIsNotSupported
1017    }
1018}
1019
1020impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2>
1021where
1022    S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
1023    S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
1024    Io1: ConnectionIo,
1025    Io2: ConnectionIo,
1026{
1027    type Output = Result<Connection<Io1, Io2>, ConnectError>;
1028
1029    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1030        match self.project() {
1031            ConnectorServiceFutureProj::Tcp { fut } => fut.poll(cx).map_ok(Connection::Tcp),
1032            ConnectorServiceFutureProj::Tls { fut } => fut.poll(cx).map_ok(Connection::Tls),
1033            ConnectorServiceFutureProj::SslIsNotSupported => {
1034                Poll::Ready(Err(ConnectError::SslIsNotSupported))
1035            }
1036        }
1037    }
1038}
1039
1040#[cfg(not(feature = "hickory-dns"))]
1041mod resolver {
1042    use super::*;
1043
1044    pub(super) fn resolver() -> Resolver {
1045        Resolver::default()
1046    }
1047}
1048
1049#[cfg(feature = "hickory-dns")]
1050mod resolver {
1051    use std::{cell::OnceCell, net::SocketAddr};
1052
1053    use actix_tls::connect::Resolve;
1054    use hickory_resolver::{
1055        config::{ResolverConfig, ResolverOpts},
1056        name_server::TokioConnectionProvider,
1057        system_conf::read_system_conf,
1058        TokioResolver,
1059    };
1060
1061    use super::*;
1062
1063    pub(super) fn resolver() -> Resolver {
1064        // new type for impl Resolve trait for TokioAsyncResolver.
1065        struct HickoryDnsResolver(TokioResolver);
1066
1067        impl Resolve for HickoryDnsResolver {
1068            fn lookup<'a>(
1069                &'a self,
1070                host: &'a str,
1071                port: u16,
1072            ) -> LocalBoxFuture<'a, Result<Vec<SocketAddr>, Box<dyn std::error::Error>>>
1073            {
1074                Box::pin(async move {
1075                    let res = self
1076                        .0
1077                        .lookup_ip(host)
1078                        .await?
1079                        .iter()
1080                        .map(|ip| SocketAddr::new(ip, port))
1081                        .collect();
1082                    Ok(res)
1083                })
1084            }
1085        }
1086
1087        // resolver struct is cached in thread local so new clients can reuse the existing instance
1088        thread_local! {
1089            static HICKORY_DNS_RESOLVER: OnceCell<Resolver> = const { OnceCell::new() };
1090        }
1091
1092        // get from thread local or construct a new hickory dns resolver.
1093        HICKORY_DNS_RESOLVER.with(|local| {
1094            local
1095                .get_or_init(|| {
1096                    let (cfg, opts) = match read_system_conf() {
1097                        Ok((cfg, opts)) => (cfg, opts),
1098                        Err(err) => {
1099                            log::error!("Hickory DNS can not load system config: {err}");
1100                            (ResolverConfig::default(), ResolverOpts::default())
1101                        }
1102                    };
1103
1104                    let resolver =
1105                        TokioResolver::builder_with_config(cfg, TokioConnectionProvider::default())
1106                            .with_options(opts)
1107                            .build();
1108
1109                    Resolver::custom(HickoryDnsResolver(resolver))
1110                })
1111                .clone()
1112        })
1113    }
1114}
1115
1116#[cfg(feature = "dangerous-h2c")]
1117#[cfg(test)]
1118mod tests {
1119    use std::convert::Infallible;
1120
1121    use actix_http::{HttpService, Request, Response, Version};
1122    use actix_http_test::test_server;
1123    use actix_service::ServiceFactoryExt as _;
1124
1125    use super::*;
1126    use crate::Client;
1127
1128    #[actix_rt::test]
1129    async fn h2c_connector() {
1130        let mut srv = test_server(|| {
1131            HttpService::build()
1132                .h2(|_req: Request| async { Ok::<_, Infallible>(Response::ok()) })
1133                .tcp()
1134                .map_err(|_| ())
1135        })
1136        .await;
1137
1138        let connector = Connector {
1139            connector: TcpConnector::new(resolver::resolver()).service(),
1140            config: ConnectorConfig::default(),
1141            tls: OurTlsConnector::None,
1142        };
1143
1144        let client = Client::builder().connector(connector).finish();
1145
1146        let request = client.get(srv.surl("/")).send();
1147        let response = request.await.unwrap();
1148        assert!(response.status().is_success());
1149        assert_eq!(response.version(), Version::HTTP_2);
1150
1151        srv.stop().await;
1152    }
1153}