Skip to main content

object_store/client/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Generic utilities for [`reqwest`] based [`ObjectStore`] implementations
19//!
20//! [`ObjectStore`]: crate::ObjectStore
21
22pub(crate) mod backoff;
23
24#[cfg(not(target_arch = "wasm32"))]
25mod dns;
26
27#[cfg(not(target_arch = "wasm32"))]
28#[cfg(test)]
29pub(crate) mod mock_server;
30
31pub(crate) mod retry;
32
33#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
34pub(crate) mod pagination;
35
36pub(crate) mod get;
37
38#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
39pub(crate) mod list;
40
41#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
42pub(crate) mod token;
43
44pub(crate) mod header;
45
46#[cfg(any(feature = "aws", feature = "gcp"))]
47pub(crate) mod s3;
48
49pub(crate) mod builder;
50mod http;
51
52#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
53pub(crate) mod parts;
54pub use http::*;
55
56use async_trait::async_trait;
57use reqwest::header::{HeaderMap, HeaderValue};
58use serde::{Deserialize, Serialize};
59use std::collections::HashMap;
60use std::str::FromStr;
61use std::sync::Arc;
62use std::time::Duration;
63
64#[cfg(not(target_arch = "wasm32"))]
65use reqwest::{NoProxy, Proxy};
66
67use crate::config::{ConfigValue, fmt_duration};
68use crate::path::Path;
69use crate::{GetOptions, Result};
70
71fn map_client_error(e: reqwest::Error) -> super::Error {
72    super::Error::Generic {
73        store: "HTTP client",
74        source: Box::new(e),
75    }
76}
77
78static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
79
80/// Configuration keys for [`ClientOptions`]
81#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Deserialize, Serialize)]
82#[non_exhaustive]
83pub enum ClientConfigKey {
84    /// Allow non-TLS, i.e. non-HTTPS connections
85    ///
86    /// Supported keys:
87    /// - `allow_http`
88    AllowHttp,
89    /// Skip certificate validation on https connections.
90    ///
91    /// <div class="warning">
92    ///
93    /// **Warning**
94    ///
95    /// You should think very carefully before using this method. If
96    /// invalid certificates are trusted, *any* certificate for *any* site
97    /// will be trusted for use. This includes expired certificates. This
98    /// introduces significant vulnerabilities, and should only be used
99    /// as a last resort or for testing
100    ///
101    /// </div>
102    ///
103    /// Supported keys:
104    /// - `allow_invalid_certificates`
105    AllowInvalidCertificates,
106    /// Timeout for only the connect phase of a Client
107    ///
108    /// Supported keys:
109    /// - `connect_timeout`
110    ConnectTimeout,
111    /// default [`Content-Type`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Content-Type) for uploads
112    ///
113    /// Supported keys:
114    /// - `default_content_type`
115    DefaultContentType,
116    /// Only use HTTP/1 connections
117    ///
118    /// Supported keys:
119    /// - `http1_only`
120    Http1Only,
121    /// Interval for HTTP/2 Ping frames should be sent to keep a connection alive.
122    ///
123    /// Supported keys:
124    /// - `http2_keep_alive_interval`
125    Http2KeepAliveInterval,
126    /// Timeout for receiving an acknowledgement of the keep-alive ping.
127    ///
128    /// Supported keys:
129    /// - `http2_keep_alive_timeout`
130    Http2KeepAliveTimeout,
131    /// Enable HTTP/2 keep alive pings for idle connections
132    ///
133    /// Supported keys:
134    /// - `http2_keep_alive_while_idle`
135    Http2KeepAliveWhileIdle,
136    /// Sets the maximum frame size to use for HTTP/2.
137    ///
138    /// Supported keys:
139    /// - `http2_max_frame_size`
140    Http2MaxFrameSize,
141    /// Only use HTTP/2 connections
142    ///
143    /// Supported keys:
144    /// - `http2_only`
145    Http2Only,
146    /// The pool max idle timeout
147    ///
148    /// This is the length of time an idle connection will be kept alive
149    ///
150    /// Supported keys:
151    /// - `pool_idle_timeout`
152    PoolIdleTimeout,
153    /// maximum number of idle connections per host
154    ///
155    /// Supported keys:
156    /// - `pool_max_idle_per_host`
157    PoolMaxIdlePerHost,
158    /// HTTP proxy to use for requests
159    ///
160    /// Supported keys:
161    /// - `proxy_url`
162    ProxyUrl,
163    /// PEM-formatted CA certificate for proxy connections
164    ///
165    /// Supported keys:
166    /// - `proxy_ca_certificate`
167    ProxyCaCertificate,
168    /// List of hosts that bypass proxy
169    ///
170    /// Supported keys:
171    /// - `proxy_excludes`
172    ProxyExcludes,
173    /// Randomize order addresses that the DNS resolution yields.
174    ///
175    /// This will spread the connections across more servers.
176    ///
177    /// <div class="warning">
178    ///
179    /// **Warning**
180    ///
181    /// This will override the DNS resolver configured by [`reqwest`].
182    ///
183    /// </div>
184    ///
185    /// Supported keys:
186    /// - `randomize_addresses`
187    RandomizeAddresses,
188    /// Request timeout
189    ///
190    /// The timeout is applied from when the request starts connecting until the
191    /// response body has finished
192    ///
193    /// Supported keys:
194    /// - `timeout`
195    Timeout,
196    /// User-Agent header to be used by this client
197    ///
198    /// Supported keys:
199    /// - `user_agent`
200    UserAgent,
201}
202
203impl AsRef<str> for ClientConfigKey {
204    fn as_ref(&self) -> &str {
205        match self {
206            Self::AllowHttp => "allow_http",
207            Self::AllowInvalidCertificates => "allow_invalid_certificates",
208            Self::ConnectTimeout => "connect_timeout",
209            Self::DefaultContentType => "default_content_type",
210            Self::Http1Only => "http1_only",
211            Self::Http2Only => "http2_only",
212            Self::Http2KeepAliveInterval => "http2_keep_alive_interval",
213            Self::Http2KeepAliveTimeout => "http2_keep_alive_timeout",
214            Self::Http2KeepAliveWhileIdle => "http2_keep_alive_while_idle",
215            Self::Http2MaxFrameSize => "http2_max_frame_size",
216            Self::PoolIdleTimeout => "pool_idle_timeout",
217            Self::PoolMaxIdlePerHost => "pool_max_idle_per_host",
218            Self::ProxyUrl => "proxy_url",
219            Self::ProxyCaCertificate => "proxy_ca_certificate",
220            Self::ProxyExcludes => "proxy_excludes",
221            Self::RandomizeAddresses => "randomize_addresses",
222            Self::Timeout => "timeout",
223            Self::UserAgent => "user_agent",
224        }
225    }
226}
227
228impl FromStr for ClientConfigKey {
229    type Err = super::Error;
230
231    fn from_str(s: &str) -> Result<Self, Self::Err> {
232        match s {
233            "allow_http" => Ok(Self::AllowHttp),
234            "allow_invalid_certificates" => Ok(Self::AllowInvalidCertificates),
235            "connect_timeout" => Ok(Self::ConnectTimeout),
236            "default_content_type" => Ok(Self::DefaultContentType),
237            "http1_only" => Ok(Self::Http1Only),
238            "http2_only" => Ok(Self::Http2Only),
239            "http2_keep_alive_interval" => Ok(Self::Http2KeepAliveInterval),
240            "http2_keep_alive_timeout" => Ok(Self::Http2KeepAliveTimeout),
241            "http2_keep_alive_while_idle" => Ok(Self::Http2KeepAliveWhileIdle),
242            "http2_max_frame_size" => Ok(Self::Http2MaxFrameSize),
243            "pool_idle_timeout" => Ok(Self::PoolIdleTimeout),
244            "pool_max_idle_per_host" => Ok(Self::PoolMaxIdlePerHost),
245            "proxy_url" => Ok(Self::ProxyUrl),
246            "proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
247            "proxy_excludes" => Ok(Self::ProxyExcludes),
248            "randomize_addresses" => Ok(Self::RandomizeAddresses),
249            "timeout" => Ok(Self::Timeout),
250            "user_agent" => Ok(Self::UserAgent),
251            _ => Err(super::Error::UnknownConfigurationKey {
252                store: "HTTP",
253                key: s.into(),
254            }),
255        }
256    }
257}
258
259/// Represents a CA certificate provided by the user.
260///
261/// This is used to configure the client to trust a specific certificate. See
262/// [Self::from_pem] for an example
263#[derive(Debug, Clone)]
264#[cfg(not(target_arch = "wasm32"))]
265pub struct Certificate(reqwest::tls::Certificate);
266
267#[cfg(not(target_arch = "wasm32"))]
268impl Certificate {
269    /// Create a `Certificate` from a PEM encoded certificate.
270    ///
271    /// # Example from a PEM file
272    ///
273    /// ```no_run
274    /// # use object_store::Certificate;
275    /// # use std::fs::File;
276    /// # use std::io::Read;
277    /// let mut buf = Vec::new();
278    /// File::open("my_cert.pem").unwrap()
279    ///   .read_to_end(&mut buf).unwrap();
280    /// let cert = Certificate::from_pem(&buf).unwrap();
281    ///
282    /// ```
283    pub fn from_pem(pem: &[u8]) -> Result<Self> {
284        Ok(Self(
285            reqwest::tls::Certificate::from_pem(pem).map_err(map_client_error)?,
286        ))
287    }
288
289    /// Create a collection of `Certificate` from a PEM encoded certificate
290    /// bundle.
291    ///
292    /// Files that contain such collections have extensions such as `.crt`,
293    /// `.cer` and `.pem` files.
294    pub fn from_pem_bundle(pem_bundle: &[u8]) -> Result<Vec<Self>> {
295        Ok(reqwest::tls::Certificate::from_pem_bundle(pem_bundle)
296            .map_err(map_client_error)?
297            .into_iter()
298            .map(Self)
299            .collect())
300    }
301
302    /// Create a `Certificate` from a binary DER encoded certificate.
303    pub fn from_der(der: &[u8]) -> Result<Self> {
304        Ok(Self(
305            reqwest::tls::Certificate::from_der(der).map_err(map_client_error)?,
306        ))
307    }
308}
309
/// HTTP client configuration for remote object stores
///
/// Options are set with the `with_*` builder methods or by key with
/// `with_config`. Values passed as strings through `with_config` for duration,
/// size and user-agent options are stored as `ConfigValue::Deferred`. They are
/// converted with `ConfigValue::get` when read, for example when the HTTP
/// client is built, which is where a malformed value surfaces as an error.
#[derive(Debug, Clone)]
pub struct ClientOptions {
    /// `User-Agent` header value; `DEFAULT_USER_AGENT` is sent when this is `None`
    user_agent: Option<ConfigValue<HeaderValue>>,
    /// Extra root certificates trusted by the client (not available on wasm32)
    #[cfg(not(target_arch = "wasm32"))]
    root_certificates: Vec<Certificate>,
    /// File extension -> `Content-Type` used for uploads (see `get_content_type`)
    content_type_map: HashMap<String, String>,
    /// Fallback `Content-Type` when no extension mapping applies
    default_content_type: Option<String>,
    /// Headers added to every request
    default_headers: Option<HeaderMap>,
    /// Proxy used for all requests when set
    proxy_url: Option<String>,
    /// PEM CA certificate added to the trusted roots; only used when `proxy_url` is set
    proxy_ca_certificate: Option<String>,
    /// Hosts that bypass the proxy; only used when `proxy_url` is set
    proxy_excludes: Option<String>,
    /// When false, the client is built with `https_only(true)`
    allow_http: ConfigValue<bool>,
    /// When true, certificate validation is disabled (`danger_accept_invalid_certs`)
    allow_insecure: ConfigValue<bool>,
    /// Overall request timeout; `None` disables it
    timeout: Option<ConfigValue<Duration>>,
    /// Connect-phase timeout; `None` disables it
    connect_timeout: Option<ConfigValue<Duration>>,
    /// How long idle pooled connections are kept; `None` leaves reqwest's default
    pool_idle_timeout: Option<ConfigValue<Duration>>,
    /// Maximum idle connections per host; `None` leaves reqwest's default
    pool_max_idle_per_host: Option<ConfigValue<usize>>,
    /// Interval between HTTP/2 keep-alive pings
    http2_keep_alive_interval: Option<ConfigValue<Duration>>,
    /// Time to wait for an HTTP/2 keep-alive ping acknowledgement
    http2_keep_alive_timeout: Option<ConfigValue<Duration>>,
    /// Whether HTTP/2 keep-alive pings are also sent on idle connections
    http2_keep_alive_while_idle: ConfigValue<bool>,
    /// Maximum HTTP/2 frame size
    http2_max_frame_size: Option<ConfigValue<u32>>,
    /// Restrict the client to HTTP/1
    http1_only: ConfigValue<bool>,
    /// Restrict the client to HTTP/2 (prior knowledge)
    http2_only: ConfigValue<bool>,
    /// When true, DNS results are shuffled via `dns::ShuffleResolver`
    randomize_addresses: ConfigValue<bool>,
}
336
337impl Default for ClientOptions {
338    fn default() -> Self {
339        // Defaults based on
340        // <https://docs.aws.amazon.com/sdkref/latest/guide/feature-smart-config-defaults.html>
341        // <https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html>
342        // Which recommend a connection timeout of 3.1s and a request timeout of 2s
343        //
344        // As object store requests may involve the transfer of non-trivial volumes of data
345        // we opt for a slightly higher default timeout of 30 seconds
346        Self {
347            user_agent: None,
348            #[cfg(not(target_arch = "wasm32"))]
349            root_certificates: Default::default(),
350            content_type_map: Default::default(),
351            default_content_type: None,
352            default_headers: None,
353            proxy_url: None,
354            proxy_ca_certificate: None,
355            proxy_excludes: None,
356            allow_http: Default::default(),
357            allow_insecure: Default::default(),
358            timeout: Some(Duration::from_secs(30).into()),
359            connect_timeout: Some(Duration::from_secs(5).into()),
360            pool_idle_timeout: None,
361            pool_max_idle_per_host: None,
362            http2_keep_alive_interval: None,
363            http2_keep_alive_timeout: None,
364            http2_keep_alive_while_idle: Default::default(),
365            http2_max_frame_size: None,
366            // HTTP/2 is known to be significantly slower than HTTP/1, so we default
367            // to HTTP/1 for now.
368            // https://github.com/apache/arrow-rs/issues/5194
369            http1_only: true.into(),
370            http2_only: Default::default(),
371            randomize_addresses: true.into(),
372        }
373    }
374}
375
376impl ClientOptions {
377    /// Create a new [`ClientOptions`] with default values
378    pub fn new() -> Self {
379        Default::default()
380    }
381
382    /// Set an option by key
383    pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self {
384        match key {
385            ClientConfigKey::AllowHttp => self.allow_http.parse(value),
386            ClientConfigKey::AllowInvalidCertificates => self.allow_insecure.parse(value),
387            ClientConfigKey::ConnectTimeout => {
388                self.connect_timeout = Some(ConfigValue::Deferred(value.into()))
389            }
390            ClientConfigKey::DefaultContentType => self.default_content_type = Some(value.into()),
391            ClientConfigKey::Http1Only => self.http1_only.parse(value),
392            ClientConfigKey::Http2Only => self.http2_only.parse(value),
393            ClientConfigKey::Http2KeepAliveInterval => {
394                self.http2_keep_alive_interval = Some(ConfigValue::Deferred(value.into()))
395            }
396            ClientConfigKey::Http2KeepAliveTimeout => {
397                self.http2_keep_alive_timeout = Some(ConfigValue::Deferred(value.into()))
398            }
399            ClientConfigKey::Http2KeepAliveWhileIdle => {
400                self.http2_keep_alive_while_idle.parse(value)
401            }
402            ClientConfigKey::Http2MaxFrameSize => {
403                self.http2_max_frame_size = Some(ConfigValue::Deferred(value.into()))
404            }
405            ClientConfigKey::PoolIdleTimeout => {
406                self.pool_idle_timeout = Some(ConfigValue::Deferred(value.into()))
407            }
408            ClientConfigKey::PoolMaxIdlePerHost => {
409                self.pool_max_idle_per_host = Some(ConfigValue::Deferred(value.into()))
410            }
411            ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
412            ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()),
413            ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()),
414            ClientConfigKey::RandomizeAddresses => {
415                self.randomize_addresses.parse(value);
416            }
417            ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())),
418            ClientConfigKey::UserAgent => {
419                self.user_agent = Some(ConfigValue::Deferred(value.into()))
420            }
421        }
422        self
423    }
424
425    /// Get an option by key
426    pub fn get_config_value(&self, key: &ClientConfigKey) -> Option<String> {
427        match key {
428            ClientConfigKey::AllowHttp => Some(self.allow_http.to_string()),
429            ClientConfigKey::AllowInvalidCertificates => Some(self.allow_insecure.to_string()),
430            ClientConfigKey::ConnectTimeout => self.connect_timeout.as_ref().map(fmt_duration),
431            ClientConfigKey::DefaultContentType => self.default_content_type.clone(),
432            ClientConfigKey::Http1Only => Some(self.http1_only.to_string()),
433            ClientConfigKey::Http2KeepAliveInterval => {
434                self.http2_keep_alive_interval.as_ref().map(fmt_duration)
435            }
436            ClientConfigKey::Http2KeepAliveTimeout => {
437                self.http2_keep_alive_timeout.as_ref().map(fmt_duration)
438            }
439            ClientConfigKey::Http2KeepAliveWhileIdle => {
440                Some(self.http2_keep_alive_while_idle.to_string())
441            }
442            ClientConfigKey::Http2MaxFrameSize => {
443                self.http2_max_frame_size.as_ref().map(|v| v.to_string())
444            }
445            ClientConfigKey::Http2Only => Some(self.http2_only.to_string()),
446            ClientConfigKey::PoolIdleTimeout => self.pool_idle_timeout.as_ref().map(fmt_duration),
447            ClientConfigKey::PoolMaxIdlePerHost => {
448                self.pool_max_idle_per_host.as_ref().map(|v| v.to_string())
449            }
450            ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
451            ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(),
452            ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
453            ClientConfigKey::RandomizeAddresses => Some(self.randomize_addresses.to_string()),
454            ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
455            ClientConfigKey::UserAgent => self
456                .user_agent
457                .as_ref()
458                .and_then(|v| v.get().ok())
459                .and_then(|v| v.to_str().ok().map(|s| s.to_string())),
460        }
461    }
462
463    /// Sets the [`User-Agent`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/User-Agent) header to be used by this client
464    ///
465    /// Default is based on the version of this crate
466    pub fn with_user_agent(mut self, agent: HeaderValue) -> Self {
467        self.user_agent = Some(agent.into());
468        self
469    }
470
471    /// Add a custom root certificate.
472    ///
473    /// This can be used to connect to a server that has a self-signed
474    /// certificate for example.
475    #[cfg(not(target_arch = "wasm32"))]
476    pub fn with_root_certificate(mut self, certificate: Certificate) -> Self {
477        self.root_certificates.push(certificate);
478        self
479    }
480
481    /// Set the default [`Content-Type`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Content-Type) for uploads
482    pub fn with_default_content_type(mut self, mime: impl Into<String>) -> Self {
483        self.default_content_type = Some(mime.into());
484        self
485    }
486
487    /// Set the [`Content-Type`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Content-Type) for a given file extension
488    pub fn with_content_type_for_suffix(
489        mut self,
490        extension: impl Into<String>,
491        mime: impl Into<String>,
492    ) -> Self {
493        self.content_type_map.insert(extension.into(), mime.into());
494        self
495    }
496
497    /// Sets the default headers for every request
498    pub fn with_default_headers(mut self, headers: HeaderMap) -> Self {
499        self.default_headers = Some(headers);
500        self
501    }
502
503    /// Sets what protocol is allowed.
504    ///
505    /// If `allow_http` is :
506    /// * `false` (default):  Only HTTPS is allowed
507    /// * `true`:  HTTP and HTTPS are allowed
508    pub fn with_allow_http(mut self, allow_http: bool) -> Self {
509        self.allow_http = allow_http.into();
510        self
511    }
512    /// Allows connections to invalid SSL certificates
513    ///
514    /// If `allow_invalid_certificates` is :
515    /// * `false` (default):  Only valid HTTPS certificates are allowed
516    /// * `true`:  All HTTPS certificates are allowed
517    ///
518    /// <div class="warning">
519    ///
520    /// **Warning**
521    ///
522    /// You should think very carefully before using this method. If
523    /// invalid certificates are trusted, *any* certificate for *any* site
524    /// will be trusted for use. This includes expired certificates. This
525    /// introduces significant vulnerabilities, and should only be used
526    /// as a last resort or for testing
527    ///
528    /// </div>
529    pub fn with_allow_invalid_certificates(mut self, allow_insecure: bool) -> Self {
530        self.allow_insecure = allow_insecure.into();
531        self
532    }
533
534    /// Only use HTTP/1 connections (default)
535    ///
536    /// # See Also
537    /// * [`Self::with_http2_only`] if you only want to use HTTP/2
538    /// * [`Self::with_allow_http2`] if you want to use HTTP/1 or HTTP/2
539    ///
540    /// <div class="warning">
541    /// HTTP/2 is not used by default. See details [#104](https://github.com/apache/arrow-rs-object-store/issues/104)
542    /// </div>
543    pub fn with_http1_only(mut self) -> Self {
544        self.http2_only = false.into();
545        self.http1_only = true.into();
546        self
547    }
548
549    /// Only use HTTP/2 connections
550    ///
551    /// # See Also
552    /// * [`Self::with_http1_only`] if you only want to use HTTP/1
553    /// * [`Self::with_allow_http2`] if you want to use HTTP/1 or HTTP/2
554    ///
555    /// <div class="warning">
556    /// HTTP/2 is not used by default. See details [#104](https://github.com/apache/arrow-rs-object-store/issues/104)
557    /// </div>
558    pub fn with_http2_only(mut self) -> Self {
559        self.http1_only = false.into();
560        self.http2_only = true.into();
561        self
562    }
563
564    /// Use HTTP/2 if supported, otherwise use HTTP/1.
565    ///
566    /// # See Also
567    /// * [`Self::with_http1_only`] if you only want to use HTTP/1
568    /// * [`Self::with_http2_only`] if you only want to use HTTP/2
569    ///
570    /// <div class="warning">
571    /// HTTP/2 is not used by default. See details [#104](https://github.com/apache/arrow-rs-object-store/issues/104)
572    /// </div>
573    pub fn with_allow_http2(mut self) -> Self {
574        self.http1_only = false.into();
575        self.http2_only = false.into();
576        self
577    }
578
579    /// Set a proxy URL to use for requests
580    pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
581        self.proxy_url = Some(proxy_url.into());
582        self
583    }
584
585    /// Set a trusted proxy CA certificate
586    pub fn with_proxy_ca_certificate(mut self, proxy_ca_certificate: impl Into<String>) -> Self {
587        self.proxy_ca_certificate = Some(proxy_ca_certificate.into());
588        self
589    }
590
591    /// Set a list of hosts to exclude from proxy connections
592    pub fn with_proxy_excludes(mut self, proxy_excludes: impl Into<String>) -> Self {
593        self.proxy_excludes = Some(proxy_excludes.into());
594        self
595    }
596
597    /// Set timeout for the overall request
598    ///
599    /// The timeout starts from when the request starts connecting until the
600    /// response body has finished. If the request does not complete within the
601    /// timeout, the client returns a timeout error.
602    ///
603    /// Timeout errors are retried, subject to the [`RetryConfig`]
604    ///
605    /// Default is 30 seconds
606    ///
607    /// # See Also
608    /// * [`Self::with_timeout_disabled`] to disable the timeout
609    /// * [`Self::with_connect_timeout`] to set a timeout for the connect phase
610    ///
611    /// [`RetryConfig`]: crate::RetryConfig
612    pub fn with_timeout(mut self, timeout: Duration) -> Self {
613        self.timeout = Some(ConfigValue::Parsed(timeout));
614        self
615    }
616
617    /// Disables the request timeout
618    ///
619    /// # See Also
620    /// * [`Self::with_timeout`]
621    pub fn with_timeout_disabled(mut self) -> Self {
622        self.timeout = None;
623        self
624    }
625
626    /// Set a timeout for only the connect phase of a Client
627    ///
628    /// This is the time allowed for the client to establish a connection
629    /// and if the connection is not established within this time,
630    /// the client returns a timeout error.
631    ///
632    /// Timeout errors are retried, subject to the [`RetryConfig`]
633    ///
634    /// Default is 5 seconds
635    ///
636    /// # See Also
637    /// * [`Self::with_timeout`] to set a timeout for the overall request
638    /// * [`Self::with_connect_timeout_disabled`] to disable the connect timeout
639    ///
640    /// [`RetryConfig`]: crate::RetryConfig
641    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
642        self.connect_timeout = Some(ConfigValue::Parsed(timeout));
643        self
644    }
645
646    /// Disables the connection timeout
647    ///
648    /// # See Also
649    /// * [`Self::with_connect_timeout`]
650    pub fn with_connect_timeout_disabled(mut self) -> Self {
651        self.connect_timeout = None;
652        self
653    }
654
655    /// Set the pool max idle timeout
656    ///
657    /// This is the length of time an idle connection will be kept alive
658    ///
659    /// Default is 90 seconds enforced by reqwest
660    pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
661        self.pool_idle_timeout = Some(ConfigValue::Parsed(timeout));
662        self
663    }
664
665    /// Set the maximum number of idle connections per host
666    ///
667    /// Default is no limit enforced by reqwest
668    pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self {
669        self.pool_max_idle_per_host = Some(max.into());
670        self
671    }
672
673    /// Sets an interval for HTTP/2 Ping frames should be sent to keep a connection alive.
674    ///
675    /// Default is disabled enforced by reqwest
676    pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self {
677        self.http2_keep_alive_interval = Some(ConfigValue::Parsed(interval));
678        self
679    }
680
681    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
682    ///
683    /// If the ping is not acknowledged within the timeout, the connection will be closed.
684    /// Does nothing if `http2_keep_alive_interval` is disabled.
685    ///
686    /// Default is disabled enforced by reqwest
687    pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self {
688        self.http2_keep_alive_timeout = Some(ConfigValue::Parsed(interval));
689        self
690    }
691
692    /// Enable HTTP/2 keep alive pings for idle connections
693    ///
694    /// If disabled, keep-alive pings are only sent while there are open request/response
695    /// streams. If enabled, pings are also sent when no streams are active
696    ///
697    /// Default is disabled enforced by reqwest
698    pub fn with_http2_keep_alive_while_idle(mut self) -> Self {
699        self.http2_keep_alive_while_idle = true.into();
700        self
701    }
702
703    /// Sets the maximum frame size to use for HTTP/2.
704    ///
705    /// Default is currently 16,384 but may change internally to optimize for common uses.
706    pub fn with_http2_max_frame_size(mut self, sz: u32) -> Self {
707        self.http2_max_frame_size = Some(ConfigValue::Parsed(sz));
708        self
709    }
710
711    /// Get the default headers defined through `ClientOptions::with_default_headers`
712    pub fn get_default_headers(&self) -> Option<&HeaderMap> {
713        self.default_headers.as_ref()
714    }
715
716    /// Get the mime type for the file in `path` to be uploaded
717    ///
718    /// Gets the file extension from `path`, and returns the
719    /// mime type if it was defined initially through
720    /// `ClientOptions::with_content_type_for_suffix`
721    ///
722    /// Otherwise, returns the default mime type if it was defined
723    /// earlier through `ClientOptions::with_default_content_type`
724    pub fn get_content_type(&self, path: &Path) -> Option<&str> {
725        match path.extension() {
726            Some(extension) => match self.content_type_map.get(extension) {
727                Some(ct) => Some(ct.as_str()),
728                None => self.default_content_type.as_deref(),
729            },
730            None => self.default_content_type.as_deref(),
731        }
732    }
733
734    /// Returns a copy of this [`ClientOptions`] with overrides necessary for metadata endpoint access
735    ///
736    /// In particular:
737    /// * Allows HTTP as metadata endpoints do not use TLS
738    /// * Configures a low connection timeout to provide quick feedback if not present
739    #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
740    pub(crate) fn metadata_options(&self) -> Self {
741        self.clone()
742            .with_allow_http(true)
743            .with_connect_timeout(Duration::from_secs(1))
744    }
745
    /// Builds the [`reqwest::Client`] described by these options (non-wasm32 targets).
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    /// * a stored setting fails to resolve through `ConfigValue::get`
    ///   (e.g. an unparsable value supplied via `with_config`);
    /// * the proxy URL or the proxy CA certificate is invalid;
    /// * reqwest fails to build the client.
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn client(&self) -> Result<reqwest::Client> {
        let mut builder = reqwest::ClientBuilder::new();

        // Fall back to `<crate name>/<crate version>` when no user agent is configured
        match &self.user_agent {
            Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
            None => builder = builder.user_agent(DEFAULT_USER_AGENT),
        }

        if let Some(headers) = &self.default_headers {
            builder = builder.default_headers(headers.clone())
        }

        // Proxy settings: the proxy CA certificate and the exclusion list are
        // only honoured when a proxy URL is configured.
        if let Some(proxy) = &self.proxy_url {
            let mut proxy = Proxy::all(proxy).map_err(map_client_error)?;

            if let Some(certificate) = &self.proxy_ca_certificate {
                let certificate = reqwest::tls::Certificate::from_pem(certificate.as_bytes())
                    .map_err(map_client_error)?;

                // Added as a root certificate for the whole client, not just the proxy
                builder = builder.add_root_certificate(certificate);
            }

            if let Some(proxy_excludes) = &self.proxy_excludes {
                let no_proxy = NoProxy::from_string(proxy_excludes);

                proxy = proxy.no_proxy(no_proxy);
            }

            builder = builder.proxy(proxy);
        }

        for certificate in &self.root_certificates {
            builder = builder.add_root_certificate(certificate.0.clone());
        }

        // Timeouts and pool tuning: unset options leave reqwest's defaults in place
        if let Some(timeout) = &self.timeout {
            builder = builder.timeout(timeout.get()?)
        }

        if let Some(timeout) = &self.connect_timeout {
            builder = builder.connect_timeout(timeout.get()?)
        }

        if let Some(timeout) = &self.pool_idle_timeout {
            builder = builder.pool_idle_timeout(timeout.get()?)
        }

        if let Some(max) = &self.pool_max_idle_per_host {
            builder = builder.pool_max_idle_per_host(max.get()?)
        }

        if let Some(interval) = &self.http2_keep_alive_interval {
            builder = builder.http2_keep_alive_interval(interval.get()?)
        }

        if let Some(interval) = &self.http2_keep_alive_timeout {
            builder = builder.http2_keep_alive_timeout(interval.get()?)
        }

        if self.http2_keep_alive_while_idle.get()? {
            builder = builder.http2_keep_alive_while_idle(true)
        }

        if let Some(sz) = &self.http2_max_frame_size {
            builder = builder.http2_max_frame_size(Some(sz.get()?))
        }

        // Protocol selection. NOTE(review): if both flags are true, both builder
        // calls are made, with `http2_prior_knowledge` applied after `http1_only`.
        // The resulting protocol then depends on reqwest's handling of the later
        // call — confirm.
        if self.http1_only.get()? {
            builder = builder.http1_only()
        }

        if self.http2_only.get()? {
            builder = builder.http2_prior_knowledge()
        }

        if self.allow_insecure.get()? {
            builder = builder.danger_accept_invalid_certs(true)
        }

        // Explicitly disable compression, since it may be automatically enabled
        // when certain reqwest features are enabled. Compression interferes
        // with the `Content-Length` header, which is used to determine the
        // size of objects.
        builder = builder.no_gzip().no_brotli().no_zstd().no_deflate();

        // Replace reqwest's resolver with one that shuffles resolved addresses
        if self.randomize_addresses.get()? {
            builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver));
        }

        // Plain HTTP is rejected unless explicitly allowed
        builder
            .https_only(!self.allow_http.get()?)
            .build()
            .map_err(map_client_error)
    }
841
842    #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
843    pub(crate) fn client(&self) -> Result<reqwest::Client> {
844        let mut builder = reqwest::ClientBuilder::new();
845
846        match &self.user_agent {
847            Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
848            None => builder = builder.user_agent(DEFAULT_USER_AGENT),
849        }
850
851        if let Some(headers) = &self.default_headers {
852            builder = builder.default_headers(headers.clone())
853        }
854
855        builder.build().map_err(map_client_error)
856    }
857}
858
859pub(crate) trait GetOptionsExt {
860    fn with_get_options(self, options: GetOptions) -> Self;
861}
862
863impl GetOptionsExt for HttpRequestBuilder {
864    fn with_get_options(mut self, options: GetOptions) -> Self {
865        use hyper::header::*;
866
867        let GetOptions {
868            if_match,
869            if_none_match,
870            if_modified_since,
871            if_unmodified_since,
872            range,
873            version: _,
874            head: _,
875            extensions,
876        } = options;
877
878        if let Some(range) = range {
879            self = self.header(RANGE, range.to_string());
880        }
881
882        if let Some(tag) = if_match {
883            self = self.header(IF_MATCH, tag);
884        }
885
886        if let Some(tag) = if_none_match {
887            self = self.header(IF_NONE_MATCH, tag);
888        }
889
890        const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
891        if let Some(date) = if_unmodified_since {
892            self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
893        }
894
895        if let Some(date) = if_modified_since {
896            self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
897        }
898
899        self = self.extensions(extensions);
900
901        self
902    }
903}
904
/// Provides credentials for use when signing requests
///
/// Implementations must be `Debug + Send + Sync`. The trait is declared with
/// `#[async_trait]`, so `get_credential` is written as an `async fn`.
#[async_trait]
pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
    /// The type of credential returned by this provider
    type Credential;

    /// Return a credential
    ///
    /// The credential is returned behind an [`Arc`], so callers receive a
    /// shared handle rather than an owned copy.
    async fn get_credential(&self) -> Result<Arc<Self::Credential>>;
}
914
/// A static set of credentials
///
/// The credential is moved into an [`Arc`] once, at construction time, and is
/// never replaced afterwards.
#[derive(Debug)]
pub struct StaticCredentialProvider<T> {
    credential: Arc<T>,
}

impl<T> StaticCredentialProvider<T> {
    /// A [`CredentialProvider`] for a static credential of type `T`
    pub fn new(credential: T) -> Self {
        let credential = Arc::new(credential);
        Self { credential }
    }
}
924        Self {
925            credential: Arc::new(credential),
926        }
927    }
928}
929
930#[async_trait]
931impl<T> CredentialProvider for StaticCredentialProvider<T>
932where
933    T: std::fmt::Debug + Send + Sync,
934{
935    type Credential = T;
936
937    async fn get_credential(&self) -> Result<Arc<T>> {
938        Ok(Arc::clone(&self.credential))
939    }
940}
941
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
mod cloud {
    use super::*;
    use crate::RetryConfig;
    use crate::client::token::{TemporaryToken, TokenCache};

    /// Source of temporary tokens obtained over HTTP
    ///
    /// Implementors perform the actual token exchange; [`TokenCredentialProvider`]
    /// layers a [`TokenCache`] on top.
    #[async_trait]
    pub(crate) trait TokenProvider: std::fmt::Debug + Send + Sync {
        /// The credential carried inside each returned [`TemporaryToken`]
        type Credential: std::fmt::Debug + Send + Sync;

        /// Fetch a new token using `client`; `retry` is the configured retry
        /// policy, passed through for the implementor to use
        async fn fetch_token(
            &self,
            client: &HttpClient,
            retry: &RetryConfig,
        ) -> Result<TemporaryToken<Arc<Self::Credential>>>;
    }

    /// A [`CredentialProvider`] that uses [`HttpClient`] to fetch temporary tokens
    ///
    /// Requests for a credential go through a [`TokenCache`], with the wrapped
    /// [`TokenProvider`] supplied as the fallback used to obtain a token.
    #[derive(Debug)]
    pub(crate) struct TokenCredentialProvider<T: TokenProvider> {
        inner: T,
        client: HttpClient,
        retry: RetryConfig,
        cache: TokenCache<Arc<T::Credential>>,
    }

    impl<T: TokenProvider> TokenCredentialProvider<T> {
        /// Wrap `inner`, starting from the cache's default state
        pub(crate) fn new(inner: T, client: HttpClient, retry: RetryConfig) -> Self {
            let cache = TokenCache::default();
            Self {
                inner,
                client,
                retry,
                cache,
            }
        }

        /// Override the minimum remaining TTL for a cached token to be used
        #[cfg(any(feature = "aws", feature = "gcp"))]
        pub(crate) fn with_min_ttl(self, min_ttl: Duration) -> Self {
            Self {
                cache: self.cache.with_min_ttl(min_ttl),
                ..self
            }
        }
    }

    #[async_trait]
    impl<T: TokenProvider> CredentialProvider for TokenCredentialProvider<T> {
        type Credential = T::Credential;

        async fn get_credential(&self) -> Result<Arc<Self::Credential>> {
            // The cache decides whether `fetch` needs to be invoked.
            let fetch = || self.inner.fetch_token(&self.client, &self.retry);
            self.cache.get_or_insert_with(fetch).await
        }
    }
}
997
998use crate::client::builder::HttpRequestBuilder;
999#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1000pub(crate) use cloud::*;
1001
#[cfg(test)]
mod tests {
    use super::*;

    /// Every client option set from its string name must be reported back
    /// unchanged by `get_config_value`.
    #[test]
    fn client_test_config_from_map() {
        // (string name, parsed key, value). Each value is valid for its key so
        // the fixture describes a real configuration rather than relying on
        // values being stored unparsed: boolean keys receive booleans, and the
        // HTTP/2 max frame size lies within the legal range
        // 16_384..=16_777_215 (RFC 9113 §6.5.2).
        let cases = [
            ("allow_http", ClientConfigKey::AllowHttp, "true"),
            ("allow_invalid_certificates", ClientConfigKey::AllowInvalidCertificates, "false"),
            ("connect_timeout", ClientConfigKey::ConnectTimeout, "90 seconds"),
            (
                "default_content_type",
                ClientConfigKey::DefaultContentType,
                "object_store:fake_default_content_type",
            ),
            ("http1_only", ClientConfigKey::Http1Only, "true"),
            ("http2_only", ClientConfigKey::Http2Only, "false"),
            (
                "http2_keep_alive_interval",
                ClientConfigKey::Http2KeepAliveInterval,
                "90 seconds",
            ),
            (
                "http2_keep_alive_timeout",
                ClientConfigKey::Http2KeepAliveTimeout,
                "91 seconds",
            ),
            (
                "http2_keep_alive_while_idle",
                ClientConfigKey::Http2KeepAliveWhileIdle,
                "true",
            ),
            ("http2_max_frame_size", ClientConfigKey::Http2MaxFrameSize, "16384"),
            ("pool_idle_timeout", ClientConfigKey::PoolIdleTimeout, "93 seconds"),
            ("pool_max_idle_per_host", ClientConfigKey::PoolMaxIdlePerHost, "94"),
            ("proxy_url", ClientConfigKey::ProxyUrl, "https://fake_proxy_url"),
            ("timeout", ClientConfigKey::Timeout, "95 seconds"),
            ("user_agent", ClientConfigKey::UserAgent, "object_store:fake_user_agent"),
        ];

        // Configure via the string names, which also exercises key parsing.
        let builder = cases
            .iter()
            .fold(ClientOptions::new(), |builder, (name, _, value)| {
                builder.with_config(name.parse().unwrap(), value.to_string())
            });

        // Read back via the typed keys.
        for (name, key, value) in &cases {
            assert_eq!(
                builder.get_config_value(key).as_deref(),
                Some(*value),
                "unexpected value for config key `{name}`"
            );
        }
    }
}