object_store/client/http/
body.rs1use crate::client::{HttpError, HttpErrorKind};
19use crate::{PutPayload, collect_bytes};
20use bytes::Bytes;
21use futures_util::StreamExt;
22use futures_util::stream::BoxStream;
23use http_body_util::combinators::BoxBody;
24use http_body_util::{BodyExt, Full};
25use hyper::body::{Body, Frame, SizeHint};
26use std::pin::Pin;
27use std::task::{Context, Poll};
28
29pub type HttpRequest = http::Request<HttpRequestBody>;
31
32#[derive(Debug, Clone)]
34pub struct HttpRequestBody(Inner);
35
36impl HttpRequestBody {
37 pub fn empty() -> Self {
39 Self(Inner::Bytes(Bytes::new()))
40 }
41
42 #[cfg(not(target_arch = "wasm32"))]
43 pub(crate) fn into_reqwest(self) -> reqwest::Body {
44 match self.0 {
45 Inner::Bytes(b) => b.into(),
46 Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(
47 futures_util::stream::iter(payload.into_iter().map(Ok::<_, HttpError>)),
48 ),
49 }
50 }
51
52 #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
53 pub(crate) fn into_reqwest(self) -> reqwest::Body {
54 match self.0 {
55 Inner::Bytes(b) => b.into(),
56 Inner::PutPayload(_, payload) => Bytes::from(payload).into(),
57 }
58 }
59
60 pub fn is_empty(&self) -> bool {
62 match &self.0 {
63 Inner::Bytes(x) => x.is_empty(),
64 Inner::PutPayload(_, x) => x.iter().any(|x| !x.is_empty()),
65 }
66 }
67
68 pub fn content_length(&self) -> usize {
70 match &self.0 {
71 Inner::Bytes(x) => x.len(),
72 Inner::PutPayload(_, x) => x.content_length(),
73 }
74 }
75
76 pub fn as_bytes(&self) -> Option<&Bytes> {
78 match &self.0 {
79 Inner::Bytes(x) => Some(x),
80 _ => None,
81 }
82 }
83}
84
85impl From<Bytes> for HttpRequestBody {
86 fn from(value: Bytes) -> Self {
87 Self(Inner::Bytes(value))
88 }
89}
90
91impl From<Vec<u8>> for HttpRequestBody {
92 fn from(value: Vec<u8>) -> Self {
93 Self(Inner::Bytes(value.into()))
94 }
95}
96
97impl From<String> for HttpRequestBody {
98 fn from(value: String) -> Self {
99 Self(Inner::Bytes(value.into()))
100 }
101}
102
103impl From<PutPayload> for HttpRequestBody {
104 fn from(value: PutPayload) -> Self {
105 Self(Inner::PutPayload(0, value))
106 }
107}
108
109#[derive(Debug, Clone)]
110enum Inner {
111 Bytes(Bytes),
112 PutPayload(usize, PutPayload),
113}
114
115impl Body for HttpRequestBody {
116 type Data = Bytes;
117 type Error = HttpError;
118
119 fn poll_frame(
120 mut self: Pin<&mut Self>,
121 _cx: &mut Context<'_>,
122 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
123 Poll::Ready(match &mut self.0 {
124 Inner::Bytes(bytes) => {
125 let out = bytes.split_off(0);
126 if out.is_empty() {
127 None
128 } else {
129 Some(Ok(Frame::data(out)))
130 }
131 }
132 Inner::PutPayload(offset, payload) => {
133 let slice = payload.as_ref();
134 if *offset == slice.len() {
135 None
136 } else {
137 Some(Ok(Frame::data(
138 slice[std::mem::replace(offset, *offset + 1)].clone(),
139 )))
140 }
141 }
142 })
143 }
144
145 fn is_end_stream(&self) -> bool {
146 match self.0 {
147 Inner::Bytes(ref bytes) => bytes.is_empty(),
148 Inner::PutPayload(offset, ref body) => offset == body.as_ref().len(),
149 }
150 }
151
152 fn size_hint(&self) -> SizeHint {
153 match self.0 {
154 Inner::Bytes(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
155 Inner::PutPayload(offset, ref payload) => {
156 let iter = payload.as_ref().iter().skip(offset);
157 SizeHint::with_exact(iter.map(|x| x.len() as u64).sum())
158 }
159 }
160 }
161}
162
163pub type HttpResponse = http::Response<HttpResponseBody>;
165
166#[derive(Debug)]
168pub struct HttpResponseBody(BoxBody<Bytes, HttpError>);
169
170impl HttpResponseBody {
171 pub fn new<B>(body: B) -> Self
175 where
176 B: Body<Data = Bytes, Error = HttpError> + Send + Sync + 'static,
177 {
178 Self(BoxBody::new(body))
179 }
180
181 pub async fn bytes(self) -> Result<Bytes, HttpError> {
183 let size_hint = self.0.size_hint().lower();
184 let s = self.0.into_data_stream();
185 collect_bytes(s, Some(size_hint)).await
186 }
187
188 pub fn bytes_stream(self) -> BoxStream<'static, Result<Bytes, HttpError>> {
190 self.0.into_data_stream().boxed()
191 }
192
193 pub(crate) async fn text(self) -> Result<String, HttpError> {
195 let b = self.bytes().await?;
196 String::from_utf8(b.into()).map_err(|e| HttpError::new(HttpErrorKind::Decode, e))
197 }
198
199 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
200 pub(crate) async fn json<B: serde::de::DeserializeOwned>(self) -> Result<B, HttpError> {
201 let b = self.bytes().await?;
202 serde_json::from_slice(&b).map_err(|e| HttpError::new(HttpErrorKind::Decode, e))
203 }
204}
205
206impl Body for HttpResponseBody {
207 type Data = Bytes;
208 type Error = HttpError;
209
210 fn poll_frame(
211 mut self: Pin<&mut Self>,
212 cx: &mut Context<'_>,
213 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
214 Pin::new(&mut self.0).poll_frame(cx)
215 }
216
217 fn is_end_stream(&self) -> bool {
218 self.0.is_end_stream()
219 }
220
221 fn size_hint(&self) -> SizeHint {
222 self.0.size_hint()
223 }
224}
225
226impl From<Bytes> for HttpResponseBody {
227 fn from(value: Bytes) -> Self {
228 Self::new(Full::new(value).map_err(|e| match e {}))
229 }
230}
231
232impl From<Vec<u8>> for HttpResponseBody {
233 fn from(value: Vec<u8>) -> Self {
234 Bytes::from(value).into()
235 }
236}
237
238impl From<String> for HttpResponseBody {
239 fn from(value: String) -> Self {
240 Bytes::from(value).into()
241 }
242}