Skip to main content

object_store/client/http/
connection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::ClientOptions;
19use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
20use crate::client::{HttpRequest, HttpResponse, HttpResponseBody};
21use async_trait::async_trait;
22use http::{Method, Uri};
23use http_body_util::BodyExt;
24use std::error::Error;
25use std::sync::Arc;
26use tokio::runtime::Handle;
27
/// An HTTP protocol error
///
/// Clients should return this when an HTTP request fails to be completed, e.g. because
/// of a connection issue. This does **not** include HTTP requests that return
/// non-2xx status codes, as these should instead be returned as an [`HttpResponse`]
/// with the appropriate status code set.
///
/// The `Display` output is the text `HTTP error: ` followed by the `Display`
/// output of the underlying source error.
#[derive(Debug, thiserror::Error)]
#[error("HTTP error: {source}")]
pub struct HttpError {
    /// Classification of the failure, returned by [`HttpError::kind`]
    kind: HttpErrorKind,
    /// The underlying error that caused the request to fail
    #[source]
    source: Box<dyn Error + Send + Sync>,
}
41
/// Identifies the kind of [`HttpError`]
///
/// This is used, among other things, to determine if a request can be retried.
///
/// The enum is `#[non_exhaustive]`: code outside this crate that matches on it
/// must include a wildcard arm, as further kinds may be added.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum HttpErrorKind {
    /// An error occurred whilst connecting to the remote
    ///
    /// Will be automatically retried
    Connect,
    /// An error occurred whilst making the request
    ///
    /// Will be automatically retried
    Request,
    /// Request timed out
    ///
    /// Will be automatically retried if the request is idempotent
    Timeout,
    /// The request was aborted
    ///
    /// Will be automatically retried if the request is idempotent
    Interrupted,
    /// An error occurred whilst decoding the response
    ///
    /// Will not be automatically retried
    Decode,
    /// An unknown error occurred
    ///
    /// Will not be automatically retried
    Unknown,
}
73
74impl HttpError {
75    /// Create a new [`HttpError`] with the optional status code
76    pub fn new<E>(kind: HttpErrorKind, e: E) -> Self
77    where
78        E: Error + Send + Sync + 'static,
79    {
80        Self {
81            kind,
82            source: Box::new(e),
83        }
84    }
85
86    pub(crate) fn reqwest(e: reqwest::Error) -> Self {
87        #[cfg(not(target_arch = "wasm32"))]
88        let is_connect = || e.is_connect();
89        #[cfg(target_arch = "wasm32")]
90        let is_connect = || false;
91
92        let mut kind = if e.is_timeout() {
93            HttpErrorKind::Timeout
94        } else if is_connect() {
95            HttpErrorKind::Connect
96        } else if e.is_decode() {
97            HttpErrorKind::Decode
98        } else {
99            HttpErrorKind::Unknown
100        };
101
102        // Reqwest error variants aren't great, attempt to refine them
103        let mut source = e.source();
104        while kind == HttpErrorKind::Unknown {
105            if let Some(e) = source {
106                if let Some(e) = e.downcast_ref::<hyper::Error>() {
107                    if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() {
108                        kind = HttpErrorKind::Request;
109                    } else if e.is_timeout() {
110                        kind = HttpErrorKind::Timeout;
111                    }
112                }
113                if let Some(e) = e.downcast_ref::<std::io::Error>() {
114                    match e.kind() {
115                        std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout,
116                        std::io::ErrorKind::ConnectionAborted
117                        | std::io::ErrorKind::ConnectionReset
118                        | std::io::ErrorKind::BrokenPipe
119                        | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted,
120                        _ => {}
121                    }
122                }
123                source = e.source();
124            } else {
125                break;
126            }
127        }
128        Self {
129            kind,
130            // We strip URL as it will be included by RetryError if not sensitive
131            source: Box::new(e.without_url()),
132        }
133    }
134
135    /// Returns the [`HttpErrorKind`]
136    pub fn kind(&self) -> HttpErrorKind {
137        self.kind
138    }
139}
140
/// An asynchronous function from a [`HttpRequest`] to a [`HttpResponse`].
///
/// Implementations must be `Debug + Send + Sync + 'static`, which allows them
/// to be stored behind an `Arc<dyn HttpService>` inside an [`HttpClient`].
#[async_trait]
pub trait HttpService: std::fmt::Debug + Send + Sync + 'static {
    /// Perform [`HttpRequest`] returning [`HttpResponse`]
    ///
    /// An [`HttpError`] is returned when the request could not be completed.
    async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError>;
}
147
/// An HTTP client
///
/// A thin wrapper around a shared, type-erased [`HttpService`]. Cloning
/// only clones the inner `Arc`, so clones share the same service.
#[derive(Debug, Clone)]
pub struct HttpClient(Arc<dyn HttpService>);
151
152impl HttpClient {
153    /// Create a new [`HttpClient`] from an [`HttpService`]
154    pub fn new(service: impl HttpService + 'static) -> Self {
155        Self(Arc::new(service))
156    }
157
158    /// Performs [`HttpRequest`] using this client
159    pub async fn execute(&self, request: HttpRequest) -> Result<HttpResponse, HttpError> {
160        self.0.call(request).await
161    }
162
163    #[allow(unused)]
164    pub(crate) fn get<U>(&self, url: U) -> HttpRequestBuilder
165    where
166        U: TryInto<Uri>,
167        U::Error: Into<RequestBuilderError>,
168    {
169        self.request(Method::GET, url)
170    }
171
172    #[allow(unused)]
173    pub(crate) fn post<U>(&self, url: U) -> HttpRequestBuilder
174    where
175        U: TryInto<Uri>,
176        U::Error: Into<RequestBuilderError>,
177    {
178        self.request(Method::POST, url)
179    }
180
181    #[allow(unused)]
182    pub(crate) fn put<U>(&self, url: U) -> HttpRequestBuilder
183    where
184        U: TryInto<Uri>,
185        U::Error: Into<RequestBuilderError>,
186    {
187        self.request(Method::PUT, url)
188    }
189
190    #[allow(unused)]
191    pub(crate) fn delete<U>(&self, url: U) -> HttpRequestBuilder
192    where
193        U: TryInto<Uri>,
194        U::Error: Into<RequestBuilderError>,
195    {
196        self.request(Method::DELETE, url)
197    }
198
199    pub(crate) fn request<U>(&self, method: Method, url: U) -> HttpRequestBuilder
200    where
201        U: TryInto<Uri>,
202        U::Error: Into<RequestBuilderError>,
203    {
204        HttpRequestBuilder::new(self.clone())
205            .uri(url)
206            .method(method)
207    }
208}
209
210#[async_trait]
211#[cfg(not(target_arch = "wasm32"))]
212impl HttpService for reqwest::Client {
213    async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
214        let (parts, body) = req.into_parts();
215
216        let url = parts.uri.to_string().parse().unwrap();
217        let mut req = reqwest::Request::new(parts.method, url);
218        *req.headers_mut() = parts.headers;
219        *req.body_mut() = Some(body.into_reqwest());
220
221        let r = self.execute(req).await.map_err(HttpError::reqwest)?;
222        let res: http::Response<reqwest::Body> = r.into();
223        let (parts, body) = res.into_parts();
224
225        let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
226        Ok(HttpResponse::from_parts(parts, body))
227    }
228}
229
#[async_trait]
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl HttpService for reqwest::Client {
    /// Executes `req` using this [`reqwest::Client`] on `wasm32-unknown` targets
    ///
    /// The request is driven by a task started with `spawn_local`. The response
    /// head is handed back over a oneshot channel and the body chunks over a
    /// bounded mpsc channel.
    ///
    /// # Errors
    ///
    /// Returns an [`HttpError`] of kind [`HttpErrorKind::Unknown`] if the request
    /// URI cannot be parsed as a [`reqwest::Url`], of kind
    /// [`HttpErrorKind::Interrupted`] if the spawned task goes away before
    /// reporting a result, or the error produced by [`HttpError::reqwest`] if
    /// executing the request fails.
    async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
        use futures_channel::{mpsc, oneshot};
        use futures_util::{SinkExt, StreamExt, TryStreamExt};
        use http_body_util::{Empty, StreamBody};
        use wasm_bindgen_futures::spawn_local;

        let (parts, body) = req.into_parts();

        // An `http::Uri` may take forms (e.g. path-only) that are not valid
        // absolute URLs, so report the failure rather than panicking
        let url = parts
            .uri
            .to_string()
            .parse::<reqwest::Url>()
            .map_err(|e| HttpError::new(HttpErrorKind::Unknown, e))?;

        let mut req = reqwest::Request::new(parts.method, url);
        *req.headers_mut() = parts.headers;
        *req.body_mut() = Some(body.into_reqwest());

        let (mut tx, rx) = mpsc::channel(1);
        let (tx_parts, rx_parts) = oneshot::channel();
        let res_fut = self.execute(req);

        spawn_local(async move {
            match res_fut.await.map_err(HttpError::reqwest) {
                Err(err) => {
                    let _ = tx_parts.send(Err(err));
                    drop(tx);
                }
                Ok(res) => {
                    // Start from the parts of an empty response and copy over
                    // only the headers and status of the real one
                    let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts();
                    parts.headers = res.headers().clone();
                    parts.status = res.status();
                    let _ = tx_parts.send(Ok(parts));
                    let mut stream = res.bytes_stream().map_err(HttpError::reqwest);
                    while let Some(chunk) = stream.next().await {
                        if let Err(_e) = tx.send(chunk).await {
                            // Disconnected due to a transitive drop of the receiver
                            break;
                        }
                    }
                }
            }
        });

        // The outer `Result` fails only if the sender was dropped without
        // sending; surface that as an interrupted request instead of panicking
        let parts = rx_parts
            .await
            .map_err(|e| HttpError::new(HttpErrorKind::Interrupted, e))??;
        let safe_stream = rx.map(|chunk| {
            let frame = hyper::body::Frame::data(chunk?);
            Ok(frame)
        });
        let body = HttpResponseBody::new(StreamBody::new(safe_stream));

        Ok(HttpResponse::from_parts(parts, body))
    }
}
281
/// A factory for [`HttpClient`]
///
/// Implementations must be `Debug + Send + Sync + 'static`, which allows them
/// to be shared as an `Arc<dyn HttpConnector>`.
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
    /// Create a new [`HttpClient`] with the provided [`ClientOptions`]
    ///
    /// Returns an error if a client cannot be created from `options`.
    fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient>;
}
287
/// [`HttpConnector`] using [`reqwest::Client`]
///
/// Not available on `wasm32` WASI targets.
#[derive(Debug, Default)]
// NOTE(review): `Copy` is deliberately not derived, presumably so that fields
// can be added later without a breaking change — confirm
#[allow(missing_copy_implementations)]
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub struct ReqwestConnector {}
293
294#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
295impl HttpConnector for ReqwestConnector {
296    fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
297        let client = options.client()?;
298        Ok(HttpClient::new(client))
299    }
300}
301
/// [`reqwest::Client`] connector that performs all I/O on the provided tokio
/// [`Runtime`] (thread pool).
///
/// This adapter is most useful when you wish to segregate I/O from CPU bound
/// work that may be happening on the [`Runtime`].
///
/// Not available on `wasm32` targets.
///
/// [`Runtime`]: tokio::runtime::Runtime
///
/// # Example: Spawning requests on separate runtime
///
/// ```
/// # use std::sync::Arc;
/// # use tokio::runtime::Runtime;
/// # use object_store::azure::MicrosoftAzureBuilder;
/// # use object_store::client::SpawnedReqwestConnector;
/// # use object_store::ObjectStore;
/// # fn get_io_runtime() -> Runtime {
/// #   tokio::runtime::Builder::new_current_thread().build().unwrap()
/// # }
/// # fn main() -> Result<(), object_store::Error> {
/// // create a tokio runtime for I/O.
/// let io_runtime: Runtime = get_io_runtime();
/// // configure a store using the runtime.
/// let handle = io_runtime.handle().clone(); // get a handle to the same runtime
/// let store: Arc<dyn ObjectStore> = Arc::new(
///   MicrosoftAzureBuilder::new()
///     .with_http_connector(SpawnedReqwestConnector::new(handle))
///     .with_container_name("my_container")
///     .with_account("my_account")
///     .build()?
///  );
/// // any requests made using store will be spawned on the io_runtime
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
// NOTE(review): `Copy` is deliberately not derived, presumably to keep room for
// future non-`Copy` fields — confirm
#[allow(missing_copy_implementations)]
#[cfg(not(target_arch = "wasm32"))]
pub struct SpawnedReqwestConnector {
    /// Handle to the tokio runtime supplied to [`SpawnedReqwestConnector::new`]
    runtime: Handle,
}
343
344#[cfg(not(target_arch = "wasm32"))]
345impl SpawnedReqwestConnector {
346    /// Create a new [`SpawnedReqwestConnector`] with the provided [`Handle`] to
347    /// a tokio [`Runtime`]
348    ///
349    /// [`Runtime`]: tokio::runtime::Runtime
350    pub fn new(runtime: Handle) -> Self {
351        Self { runtime }
352    }
353}
354
355#[cfg(not(target_arch = "wasm32"))]
356impl HttpConnector for SpawnedReqwestConnector {
357    fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
358        let spawn_service = super::SpawnService::new(options.client()?, self.runtime.clone());
359        Ok(HttpClient::new(spawn_service))
360    }
361}
362
/// Resolve the [`HttpConnector`] to use on `wasm32` WASI targets
///
/// Returns `custom` when one is supplied. No default exists on this target, so
/// `None` produces a `NotSupported` error.
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
pub(crate) fn http_connector(
    custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
    custom.ok_or_else(|| crate::Error::NotSupported {
        source: "WASI architectures must provide an HTTPConnector"
            .to_string()
            .into(),
    })
}
376
377#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
378pub(crate) fn http_connector(
379    custom: Option<Arc<dyn HttpConnector>>,
380) -> crate::Result<Arc<dyn HttpConnector>> {
381    match custom {
382        Some(x) => Ok(x),
383        None => Ok(Arc::new(ReqwestConnector {})),
384    }
385}