Skip to main content

object_store/client/http/
body.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::client::{HttpError, HttpErrorKind};
19use crate::{PutPayload, collect_bytes};
20use bytes::Bytes;
21use futures_util::StreamExt;
22use futures_util::stream::BoxStream;
23use http_body_util::combinators::BoxBody;
24use http_body_util::{BodyExt, Full};
25use hyper::body::{Body, Frame, SizeHint};
26use std::pin::Pin;
27use std::task::{Context, Poll};
28
29/// An HTTP Request
30pub type HttpRequest = http::Request<HttpRequestBody>;
31
32/// The [`Body`] of an [`HttpRequest`]
33#[derive(Debug, Clone)]
34pub struct HttpRequestBody(Inner);
35
36impl HttpRequestBody {
37    /// An empty [`HttpRequestBody`]
38    pub fn empty() -> Self {
39        Self(Inner::Bytes(Bytes::new()))
40    }
41
42    #[cfg(not(target_arch = "wasm32"))]
43    pub(crate) fn into_reqwest(self) -> reqwest::Body {
44        match self.0 {
45            Inner::Bytes(b) => b.into(),
46            Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(
47                futures_util::stream::iter(payload.into_iter().map(Ok::<_, HttpError>)),
48            ),
49        }
50    }
51
52    #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
53    pub(crate) fn into_reqwest(self) -> reqwest::Body {
54        match self.0 {
55            Inner::Bytes(b) => b.into(),
56            Inner::PutPayload(_, payload) => Bytes::from(payload).into(),
57        }
58    }
59
60    /// Returns true if this body is empty
61    pub fn is_empty(&self) -> bool {
62        match &self.0 {
63            Inner::Bytes(x) => x.is_empty(),
64            Inner::PutPayload(_, x) => x.iter().any(|x| !x.is_empty()),
65        }
66    }
67
68    /// Returns the total length of the [`Bytes`] in this body
69    pub fn content_length(&self) -> usize {
70        match &self.0 {
71            Inner::Bytes(x) => x.len(),
72            Inner::PutPayload(_, x) => x.content_length(),
73        }
74    }
75
76    /// If this body consists of a single contiguous [`Bytes`], returns it
77    pub fn as_bytes(&self) -> Option<&Bytes> {
78        match &self.0 {
79            Inner::Bytes(x) => Some(x),
80            _ => None,
81        }
82    }
83}
84
85impl From<Bytes> for HttpRequestBody {
86    fn from(value: Bytes) -> Self {
87        Self(Inner::Bytes(value))
88    }
89}
90
91impl From<Vec<u8>> for HttpRequestBody {
92    fn from(value: Vec<u8>) -> Self {
93        Self(Inner::Bytes(value.into()))
94    }
95}
96
97impl From<String> for HttpRequestBody {
98    fn from(value: String) -> Self {
99        Self(Inner::Bytes(value.into()))
100    }
101}
102
103impl From<PutPayload> for HttpRequestBody {
104    fn from(value: PutPayload) -> Self {
105        Self(Inner::PutPayload(0, value))
106    }
107}
108
109#[derive(Debug, Clone)]
110enum Inner {
111    Bytes(Bytes),
112    PutPayload(usize, PutPayload),
113}
114
115impl Body for HttpRequestBody {
116    type Data = Bytes;
117    type Error = HttpError;
118
119    fn poll_frame(
120        mut self: Pin<&mut Self>,
121        _cx: &mut Context<'_>,
122    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
123        Poll::Ready(match &mut self.0 {
124            Inner::Bytes(bytes) => {
125                let out = bytes.split_off(0);
126                if out.is_empty() {
127                    None
128                } else {
129                    Some(Ok(Frame::data(out)))
130                }
131            }
132            Inner::PutPayload(offset, payload) => {
133                let slice = payload.as_ref();
134                if *offset == slice.len() {
135                    None
136                } else {
137                    Some(Ok(Frame::data(
138                        slice[std::mem::replace(offset, *offset + 1)].clone(),
139                    )))
140                }
141            }
142        })
143    }
144
145    fn is_end_stream(&self) -> bool {
146        match self.0 {
147            Inner::Bytes(ref bytes) => bytes.is_empty(),
148            Inner::PutPayload(offset, ref body) => offset == body.as_ref().len(),
149        }
150    }
151
152    fn size_hint(&self) -> SizeHint {
153        match self.0 {
154            Inner::Bytes(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
155            Inner::PutPayload(offset, ref payload) => {
156                let iter = payload.as_ref().iter().skip(offset);
157                SizeHint::with_exact(iter.map(|x| x.len() as u64).sum())
158            }
159        }
160    }
161}
162
163/// An HTTP response
164pub type HttpResponse = http::Response<HttpResponseBody>;
165
166/// The body of an [`HttpResponse`]
167#[derive(Debug)]
168pub struct HttpResponseBody(BoxBody<Bytes, HttpError>);
169
170impl HttpResponseBody {
171    /// Create an [`HttpResponseBody`] from the provided [`Body`]
172    ///
173    /// Note: [`BodyExt::map_err`] can be used to alter error variants
174    pub fn new<B>(body: B) -> Self
175    where
176        B: Body<Data = Bytes, Error = HttpError> + Send + Sync + 'static,
177    {
178        Self(BoxBody::new(body))
179    }
180
181    /// Collects this response into a [`Bytes`]
182    pub async fn bytes(self) -> Result<Bytes, HttpError> {
183        let size_hint = self.0.size_hint().lower();
184        let s = self.0.into_data_stream();
185        collect_bytes(s, Some(size_hint)).await
186    }
187
188    /// Returns a stream of this response data
189    pub fn bytes_stream(self) -> BoxStream<'static, Result<Bytes, HttpError>> {
190        self.0.into_data_stream().boxed()
191    }
192
193    /// Returns the response as a [`String`]
194    pub(crate) async fn text(self) -> Result<String, HttpError> {
195        let b = self.bytes().await?;
196        String::from_utf8(b.into()).map_err(|e| HttpError::new(HttpErrorKind::Decode, e))
197    }
198
199    #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
200    pub(crate) async fn json<B: serde::de::DeserializeOwned>(self) -> Result<B, HttpError> {
201        let b = self.bytes().await?;
202        serde_json::from_slice(&b).map_err(|e| HttpError::new(HttpErrorKind::Decode, e))
203    }
204}
205
206impl Body for HttpResponseBody {
207    type Data = Bytes;
208    type Error = HttpError;
209
210    fn poll_frame(
211        mut self: Pin<&mut Self>,
212        cx: &mut Context<'_>,
213    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
214        Pin::new(&mut self.0).poll_frame(cx)
215    }
216
217    fn is_end_stream(&self) -> bool {
218        self.0.is_end_stream()
219    }
220
221    fn size_hint(&self) -> SizeHint {
222        self.0.size_hint()
223    }
224}
225
226impl From<Bytes> for HttpResponseBody {
227    fn from(value: Bytes) -> Self {
228        Self::new(Full::new(value).map_err(|e| match e {}))
229    }
230}
231
232impl From<Vec<u8>> for HttpResponseBody {
233    fn from(value: Vec<u8>) -> Self {
234        Bytes::from(value).into()
235    }
236}
237
238impl From<String> for HttpResponseBody {
239    fn from(value: String) -> Self {
240        Bytes::from(value).into()
241    }
242}