Skip to main content

awc/responses/
response.rs

1use std::{
2    cell::{Ref, RefCell, RefMut},
3    fmt, mem,
4    pin::Pin,
5    task::{Context, Poll},
6    time::{Duration, Instant},
7};
8
9use actix_http::{
10    error::PayloadError, header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage, Payload,
11    ResponseHead, StatusCode, Version,
12};
13use actix_rt::time::{sleep, Sleep};
14use bytes::Bytes;
15use futures_core::Stream;
16use pin_project_lite::pin_project;
17use serde::de::DeserializeOwned;
18
19use super::{JsonBody, ResponseBody, ResponseTimeout};
20#[cfg(feature = "cookies")]
21use crate::cookie::{Cookie, ParseError as CookieParseError};
22
pin_project! {
    /// Client Response
    ///
    /// Generic over the payload stream type `S`, which defaults to [`BoxedPayloadStream`].
    pub struct ClientResponse<S = BoxedPayloadStream> {
        // Response head; backs the version, status and header accessors.
        pub(crate) head: ResponseHead,
        // Body payload; marked `#[pin]` so it can be polled through the pin projection.
        #[pin]
        pub(crate) payload: Payload<S>,
        // Timeout state for reading the body; checked before each payload poll.
        pub(crate) timeout: ResponseTimeout,
        // Per-response extension storage; the `RefCell` permits mutation through `&self`.
        pub(crate) extensions: RefCell<Extensions>,

    }
}
34
35impl<S> ClientResponse<S> {
36    /// Create new Request instance
37    pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
38        ClientResponse {
39            head,
40            payload,
41            timeout: ResponseTimeout::default(),
42            extensions: RefCell::new(Extensions::new()),
43        }
44    }
45
46    #[inline]
47    pub(crate) fn head(&self) -> &ResponseHead {
48        &self.head
49    }
50
51    /// Read the Request Version.
52    #[inline]
53    pub fn version(&self) -> Version {
54        self.head().version
55    }
56
57    /// Get the status from the server.
58    #[inline]
59    pub fn status(&self) -> StatusCode {
60        self.head().status
61    }
62
63    #[inline]
64    /// Returns request's headers.
65    pub fn headers(&self) -> &HeaderMap {
66        &self.head().headers
67    }
68
69    /// Map the current body type to another using a closure. Returns a new response.
70    ///
71    /// Closure receives the response head and the current body type.
72    pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
73    where
74        F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
75    {
76        let payload = f(&mut self.head, self.payload);
77
78        ClientResponse {
79            payload,
80            head: self.head,
81            timeout: self.timeout,
82            extensions: self.extensions,
83        }
84    }
85
86    /// Set a timeout duration for [`ClientResponse`](self::ClientResponse).
87    ///
88    /// This duration covers the duration of processing the response body stream
89    /// and would end it as timeout error when deadline met.
90    ///
91    /// Disabled by default.
92    pub fn timeout(self, dur: Duration) -> Self {
93        let timeout = match self.timeout {
94            ResponseTimeout::Disabled(Some(mut timeout))
95            | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) {
96                Some(deadline) => {
97                    timeout.as_mut().reset(deadline.into());
98                    ResponseTimeout::Enabled(timeout)
99                }
100                None => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
101            },
102            _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
103        };
104
105        Self {
106            payload: self.payload,
107            head: self.head,
108            timeout,
109            extensions: self.extensions,
110        }
111    }
112
113    /// This method does not enable timeout. It's used to pass the boxed `Sleep` from
114    /// `SendClientRequest` and reuse it's heap allocation together with it's slot in
115    /// timer wheel.
116    pub(crate) fn _timeout(mut self, timeout: Option<Pin<Box<Sleep>>>) -> Self {
117        self.timeout = ResponseTimeout::Disabled(timeout);
118        self
119    }
120
121    /// Load request cookies.
122    #[cfg(feature = "cookies")]
123    pub fn cookies(&self) -> Result<Ref<'_, Vec<Cookie<'static>>>, CookieParseError> {
124        struct Cookies(Vec<Cookie<'static>>);
125
126        if self.extensions().get::<Cookies>().is_none() {
127            let mut cookies = Vec::new();
128            for hdr in self.headers().get_all(&actix_http::header::SET_COOKIE) {
129                let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
130                cookies.push(Cookie::parse_encoded(s)?.into_owned());
131            }
132            self.extensions_mut().insert(Cookies(cookies));
133        }
134
135        Ok(Ref::map(self.extensions(), |ext| {
136            &ext.get::<Cookies>().unwrap().0
137        }))
138    }
139
140    /// Return request cookie.
141    #[cfg(feature = "cookies")]
142    pub fn cookie(&self, name: &str) -> Option<Cookie<'static>> {
143        if let Ok(cookies) = self.cookies() {
144            for cookie in cookies.iter() {
145                if cookie.name() == name {
146                    return Some(cookie.to_owned());
147                }
148            }
149        }
150        None
151    }
152}
153
154impl<S> ClientResponse<S>
155where
156    S: Stream<Item = Result<Bytes, PayloadError>>,
157{
158    /// Returns a [`Future`] that consumes the body stream and resolves to [`Bytes`].
159    ///
160    /// # Errors
161    /// `Future` implementation returns error if:
162    /// - content length is greater than [limit](ResponseBody::limit) (default: 2 MiB)
163    ///
164    /// # Examples
165    /// ```no_run
166    /// # use awc::Client;
167    /// # use bytes::Bytes;
168    /// # #[actix_rt::main]
169    /// # async fn async_ctx() -> Result<(), Box<dyn std::error::Error>> {
170    /// let client = Client::default();
171    /// let mut res = client.get("https://httpbin.org/robots.txt").send().await?;
172    /// let body: Bytes = res.body().await?;
173    /// # Ok(())
174    /// # }
175    /// ```
176    ///
177    /// [`Future`]: std::future::Future
178    pub fn body(&mut self) -> ResponseBody<S> {
179        ResponseBody::new(self)
180    }
181
182    /// Returns a [`Future`] consumes the body stream, parses JSON, and resolves to a deserialized
183    /// `T` value.
184    ///
185    /// # Errors
186    /// Future returns error if:
187    /// - content type is not `application/json`;
188    /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
189    ///
190    /// # Examples
191    /// ```no_run
192    /// # use awc::Client;
193    /// # #[actix_rt::main]
194    /// # async fn async_ctx() -> Result<(), Box<dyn std::error::Error>> {
195    /// let client = Client::default();
196    /// let mut res = client.get("https://httpbin.org/json").send().await?;
197    /// let val = res.json::<serde_json::Value>().await?;
198    /// assert!(val.is_object());
199    /// # Ok(())
200    /// # }
201    /// ```
202    ///
203    /// [`Future`]: std::future::Future
204    pub fn json<T: DeserializeOwned>(&mut self) -> JsonBody<S, T> {
205        JsonBody::new(self)
206    }
207}
208
209impl<S> fmt::Debug for ClientResponse<S> {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
212        writeln!(f, "  headers:")?;
213        for (key, val) in self.headers().iter() {
214            writeln!(f, "    {:?}: {:?}", key, val)?;
215        }
216        Ok(())
217    }
218}
219
220impl<S> HttpMessage for ClientResponse<S> {
221    type Stream = S;
222
223    fn headers(&self) -> &HeaderMap {
224        &self.head.headers
225    }
226
227    fn take_payload(&mut self) -> Payload<S> {
228        mem::replace(&mut self.payload, Payload::None)
229    }
230
231    fn extensions(&self) -> Ref<'_, Extensions> {
232        self.extensions.borrow()
233    }
234
235    fn extensions_mut(&self) -> RefMut<'_, Extensions> {
236        self.extensions.borrow_mut()
237    }
238}
239
240impl<S> Stream for ClientResponse<S>
241where
242    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
243{
244    type Item = Result<Bytes, PayloadError>;
245
246    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
247        let this = self.project();
248        this.timeout.poll_timeout(cx)?;
249        this.payload.poll_next(cx)
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::any_body::AnyBody;

    // Compile-time checks that `ClientResponse` is `Unpin` for the default payload stream
    // type and for two other payload type parameters.
    static_assertions::assert_impl_all!(ClientResponse: Unpin);
    static_assertions::assert_impl_all!(ClientResponse<()>: Unpin);
    static_assertions::assert_impl_all!(ClientResponse<AnyBody>: Unpin);
}