object_store/client/http/
connection.rs1use crate::ClientOptions;
19use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
20use crate::client::{HttpRequest, HttpResponse, HttpResponseBody};
21use async_trait::async_trait;
22use http::{Method, Uri};
23use http_body_util::BodyExt;
24use std::error::Error;
25use std::sync::Arc;
26use tokio::runtime::Handle;
27
28#[derive(Debug, thiserror::Error)]
35#[error("HTTP error: {source}")]
36pub struct HttpError {
37 kind: HttpErrorKind,
38 #[source]
39 source: Box<dyn Error + Send + Sync>,
40}
41
/// Coarse classification of an [`HttpError`].
///
/// The per-variant notes describe how the crate's reqwest error conversion
/// assigns each kind; other constructors may pick a kind freely.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum HttpErrorKind {
    /// The reqwest error reported a connection failure.
    Connect,
    /// A hyper error in the cause chain reported a closed connection, an
    /// incomplete message, or an aborted body write.
    Request,
    /// A timeout was reported by reqwest, hyper, or an I/O error in the cause chain.
    Timeout,
    /// An I/O error in the cause chain reported an aborted or reset connection,
    /// a broken pipe, or an unexpected end of file.
    Interrupted,
    /// The reqwest error reported a decoding failure.
    Decode,
    /// None of the other classifications applied.
    Unknown,
}
73
74impl HttpError {
75 pub fn new<E>(kind: HttpErrorKind, e: E) -> Self
77 where
78 E: Error + Send + Sync + 'static,
79 {
80 Self {
81 kind,
82 source: Box::new(e),
83 }
84 }
85
86 pub(crate) fn reqwest(e: reqwest::Error) -> Self {
87 #[cfg(not(target_arch = "wasm32"))]
88 let is_connect = || e.is_connect();
89 #[cfg(target_arch = "wasm32")]
90 let is_connect = || false;
91
92 let mut kind = if e.is_timeout() {
93 HttpErrorKind::Timeout
94 } else if is_connect() {
95 HttpErrorKind::Connect
96 } else if e.is_decode() {
97 HttpErrorKind::Decode
98 } else {
99 HttpErrorKind::Unknown
100 };
101
102 let mut source = e.source();
104 while kind == HttpErrorKind::Unknown {
105 if let Some(e) = source {
106 if let Some(e) = e.downcast_ref::<hyper::Error>() {
107 if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() {
108 kind = HttpErrorKind::Request;
109 } else if e.is_timeout() {
110 kind = HttpErrorKind::Timeout;
111 }
112 }
113 if let Some(e) = e.downcast_ref::<std::io::Error>() {
114 match e.kind() {
115 std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout,
116 std::io::ErrorKind::ConnectionAborted
117 | std::io::ErrorKind::ConnectionReset
118 | std::io::ErrorKind::BrokenPipe
119 | std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted,
120 _ => {}
121 }
122 }
123 source = e.source();
124 } else {
125 break;
126 }
127 }
128 Self {
129 kind,
130 source: Box::new(e.without_url()),
132 }
133 }
134
135 pub fn kind(&self) -> HttpErrorKind {
137 self.kind
138 }
139}
140
141#[async_trait]
143pub trait HttpService: std::fmt::Debug + Send + Sync + 'static {
144 async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError>;
146}
147
148#[derive(Debug, Clone)]
150pub struct HttpClient(Arc<dyn HttpService>);
151
152impl HttpClient {
153 pub fn new(service: impl HttpService + 'static) -> Self {
155 Self(Arc::new(service))
156 }
157
158 pub async fn execute(&self, request: HttpRequest) -> Result<HttpResponse, HttpError> {
160 self.0.call(request).await
161 }
162
163 #[allow(unused)]
164 pub(crate) fn get<U>(&self, url: U) -> HttpRequestBuilder
165 where
166 U: TryInto<Uri>,
167 U::Error: Into<RequestBuilderError>,
168 {
169 self.request(Method::GET, url)
170 }
171
172 #[allow(unused)]
173 pub(crate) fn post<U>(&self, url: U) -> HttpRequestBuilder
174 where
175 U: TryInto<Uri>,
176 U::Error: Into<RequestBuilderError>,
177 {
178 self.request(Method::POST, url)
179 }
180
181 #[allow(unused)]
182 pub(crate) fn put<U>(&self, url: U) -> HttpRequestBuilder
183 where
184 U: TryInto<Uri>,
185 U::Error: Into<RequestBuilderError>,
186 {
187 self.request(Method::PUT, url)
188 }
189
190 #[allow(unused)]
191 pub(crate) fn delete<U>(&self, url: U) -> HttpRequestBuilder
192 where
193 U: TryInto<Uri>,
194 U::Error: Into<RequestBuilderError>,
195 {
196 self.request(Method::DELETE, url)
197 }
198
199 pub(crate) fn request<U>(&self, method: Method, url: U) -> HttpRequestBuilder
200 where
201 U: TryInto<Uri>,
202 U::Error: Into<RequestBuilderError>,
203 {
204 HttpRequestBuilder::new(self.clone())
205 .uri(url)
206 .method(method)
207 }
208}
209
210#[async_trait]
211#[cfg(not(target_arch = "wasm32"))]
212impl HttpService for reqwest::Client {
213 async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
214 let (parts, body) = req.into_parts();
215
216 let url = parts.uri.to_string().parse().unwrap();
217 let mut req = reqwest::Request::new(parts.method, url);
218 *req.headers_mut() = parts.headers;
219 *req.body_mut() = Some(body.into_reqwest());
220
221 let r = self.execute(req).await.map_err(HttpError::reqwest)?;
222 let res: http::Response<reqwest::Body> = r.into();
223 let (parts, body) = res.into_parts();
224
225 let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
226 Ok(HttpResponse::from_parts(parts, body))
227 }
228}
229
#[async_trait]
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl HttpService for reqwest::Client {
    /// Executes `req` with this [`reqwest::Client`] on `wasm32-unknown` targets.
    ///
    /// The reqwest future is driven inside a `spawn_local` task; the response
    /// head is handed back over a oneshot channel and the body chunks over a
    /// bounded mpsc channel, which becomes the returned [`HttpResponse`] body.
    /// NOTE(review): presumably this indirection exists because the reqwest
    /// future is not `Send` on this target — confirm.
    ///
    /// # Errors
    ///
    /// Returns an [`HttpError`] if the request URI cannot be parsed as a
    /// [`reqwest::Url`], if reqwest fails to execute the request, or if the
    /// spawned task goes away before delivering the response head.
    async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
        use futures_channel::{mpsc, oneshot};
        use futures_util::{SinkExt, StreamExt, TryStreamExt};
        use http_body_util::{Empty, StreamBody};
        use wasm_bindgen_futures::spawn_local;

        let (parts, body) = req.into_parts();

        // An `http::Uri` is not necessarily a valid absolute URL, so report a
        // parse failure as an error instead of panicking.
        let url = parts
            .uri
            .to_string()
            .parse::<reqwest::Url>()
            .map_err(|e| HttpError::new(HttpErrorKind::Unknown, e))?;

        let mut req = reqwest::Request::new(parts.method, url);
        *req.headers_mut() = parts.headers;
        *req.body_mut() = Some(body.into_reqwest());

        let (mut tx, rx) = mpsc::channel(1);
        let (tx_parts, rx_parts) = oneshot::channel();
        let res_fut = self.execute(req);

        spawn_local(async move {
            match res_fut.await.map_err(HttpError::reqwest) {
                Err(err) => {
                    // Report the failure as the response head; no body follows.
                    let _ = tx_parts.send(Err(err));
                    drop(tx);
                }
                Ok(res) => {
                    // Take default `Parts` from a throwaway empty response, then
                    // overwrite the headers and status with the real ones.
                    let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts();
                    parts.headers = res.headers().clone();
                    parts.status = res.status();
                    let _ = tx_parts.send(Ok(parts));

                    let mut stream = res.bytes_stream().map_err(HttpError::reqwest);
                    while let Some(chunk) = stream.next().await {
                        // A failed send means the receiving side is gone, so
                        // there is nobody left to forward chunks to.
                        if tx.send(chunk).await.is_err() {
                            break;
                        }
                    }
                }
            }
        });

        // The outer error is the oneshot being cancelled (sender dropped without
        // sending); the inner error is the request failure reported by the task.
        let parts = rx_parts
            .await
            .map_err(|e| HttpError::new(HttpErrorKind::Interrupted, e))??;

        let safe_stream = rx.map(|chunk| {
            let frame = hyper::body::Frame::data(chunk?);
            Ok(frame)
        });
        let body = HttpResponseBody::new(StreamBody::new(safe_stream));

        Ok(HttpResponse::from_parts(parts, body))
    }
}
281
282pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
284 fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient>;
286}
287
288#[derive(Debug, Default)]
290#[allow(missing_copy_implementations)]
291#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
292pub struct ReqwestConnector {}
293
294#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
295impl HttpConnector for ReqwestConnector {
296 fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
297 let client = options.client()?;
298 Ok(HttpClient::new(client))
299 }
300}
301
302#[derive(Debug)]
338#[allow(missing_copy_implementations)]
339#[cfg(not(target_arch = "wasm32"))]
340pub struct SpawnedReqwestConnector {
341 runtime: Handle,
342}
343
344#[cfg(not(target_arch = "wasm32"))]
345impl SpawnedReqwestConnector {
346 pub fn new(runtime: Handle) -> Self {
351 Self { runtime }
352 }
353}
354
355#[cfg(not(target_arch = "wasm32"))]
356impl HttpConnector for SpawnedReqwestConnector {
357 fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
358 let spawn_service = super::SpawnService::new(options.client()?, self.runtime.clone());
359 Ok(HttpClient::new(spawn_service))
360 }
361}
362
/// Resolves the [`HttpConnector`] to use on `wasm32` WASI targets.
///
/// There is no default connector here: `custom` is returned when present,
/// otherwise the call fails with [`crate::Error::NotSupported`].
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
pub(crate) fn http_connector(
    custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
    custom.ok_or_else(|| crate::Error::NotSupported {
        source: "WASI architectures must provide an HTTPConnector"
            .to_string()
            .into(),
    })
}
376
377#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
378pub(crate) fn http_connector(
379 custom: Option<Arc<dyn HttpConnector>>,
380) -> crate::Result<Arc<dyn HttpConnector>> {
381 match custom {
382 Some(x) => Ok(x),
383 None => Ok(Arc::new(ReqwestConnector {})),
384 }
385}