awc/responses/
response.rs1use std::{
2 cell::{Ref, RefCell, RefMut},
3 fmt, mem,
4 pin::Pin,
5 task::{Context, Poll},
6 time::{Duration, Instant},
7};
8
9use actix_http::{
10 error::PayloadError, header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage, Payload,
11 ResponseHead, StatusCode, Version,
12};
13use actix_rt::time::{sleep, Sleep};
14use bytes::Bytes;
15use futures_core::Stream;
16use pin_project_lite::pin_project;
17use serde::de::DeserializeOwned;
18
19use super::{JsonBody, ResponseBody, ResponseTimeout};
20#[cfg(feature = "cookies")]
21use crate::cookie::{Cookie, ParseError as CookieParseError};
22
23pin_project! {
24 pub struct ClientResponse<S = BoxedPayloadStream> {
26 pub(crate) head: ResponseHead,
27 #[pin]
28 pub(crate) payload: Payload<S>,
29 pub(crate) timeout: ResponseTimeout,
30 pub(crate) extensions: RefCell<Extensions>,
31
32 }
33}
34
35impl<S> ClientResponse<S> {
36 pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
38 ClientResponse {
39 head,
40 payload,
41 timeout: ResponseTimeout::default(),
42 extensions: RefCell::new(Extensions::new()),
43 }
44 }
45
46 #[inline]
47 pub(crate) fn head(&self) -> &ResponseHead {
48 &self.head
49 }
50
51 #[inline]
53 pub fn version(&self) -> Version {
54 self.head().version
55 }
56
57 #[inline]
59 pub fn status(&self) -> StatusCode {
60 self.head().status
61 }
62
63 #[inline]
64 pub fn headers(&self) -> &HeaderMap {
66 &self.head().headers
67 }
68
69 pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
73 where
74 F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
75 {
76 let payload = f(&mut self.head, self.payload);
77
78 ClientResponse {
79 payload,
80 head: self.head,
81 timeout: self.timeout,
82 extensions: self.extensions,
83 }
84 }
85
86 pub fn timeout(self, dur: Duration) -> Self {
93 let timeout = match self.timeout {
94 ResponseTimeout::Disabled(Some(mut timeout))
95 | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) {
96 Some(deadline) => {
97 timeout.as_mut().reset(deadline.into());
98 ResponseTimeout::Enabled(timeout)
99 }
100 None => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
101 },
102 _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))),
103 };
104
105 Self {
106 payload: self.payload,
107 head: self.head,
108 timeout,
109 extensions: self.extensions,
110 }
111 }
112
113 pub(crate) fn _timeout(mut self, timeout: Option<Pin<Box<Sleep>>>) -> Self {
117 self.timeout = ResponseTimeout::Disabled(timeout);
118 self
119 }
120
121 #[cfg(feature = "cookies")]
123 pub fn cookies(&self) -> Result<Ref<'_, Vec<Cookie<'static>>>, CookieParseError> {
124 struct Cookies(Vec<Cookie<'static>>);
125
126 if self.extensions().get::<Cookies>().is_none() {
127 let mut cookies = Vec::new();
128 for hdr in self.headers().get_all(&actix_http::header::SET_COOKIE) {
129 let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
130 cookies.push(Cookie::parse_encoded(s)?.into_owned());
131 }
132 self.extensions_mut().insert(Cookies(cookies));
133 }
134
135 Ok(Ref::map(self.extensions(), |ext| {
136 &ext.get::<Cookies>().unwrap().0
137 }))
138 }
139
140 #[cfg(feature = "cookies")]
142 pub fn cookie(&self, name: &str) -> Option<Cookie<'static>> {
143 if let Ok(cookies) = self.cookies() {
144 for cookie in cookies.iter() {
145 if cookie.name() == name {
146 return Some(cookie.to_owned());
147 }
148 }
149 }
150 None
151 }
152}
153
154impl<S> ClientResponse<S>
155where
156 S: Stream<Item = Result<Bytes, PayloadError>>,
157{
158 pub fn body(&mut self) -> ResponseBody<S> {
179 ResponseBody::new(self)
180 }
181
182 pub fn json<T: DeserializeOwned>(&mut self) -> JsonBody<S, T> {
205 JsonBody::new(self)
206 }
207}
208
209impl<S> fmt::Debug for ClientResponse<S> {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
212 writeln!(f, " headers:")?;
213 for (key, val) in self.headers().iter() {
214 writeln!(f, " {:?}: {:?}", key, val)?;
215 }
216 Ok(())
217 }
218}
219
220impl<S> HttpMessage for ClientResponse<S> {
221 type Stream = S;
222
223 fn headers(&self) -> &HeaderMap {
224 &self.head.headers
225 }
226
227 fn take_payload(&mut self) -> Payload<S> {
228 mem::replace(&mut self.payload, Payload::None)
229 }
230
231 fn extensions(&self) -> Ref<'_, Extensions> {
232 self.extensions.borrow()
233 }
234
235 fn extensions_mut(&self) -> RefMut<'_, Extensions> {
236 self.extensions.borrow_mut()
237 }
238}
239
240impl<S> Stream for ClientResponse<S>
241where
242 S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
243{
244 type Item = Result<Bytes, PayloadError>;
245
246 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
247 let this = self.project();
248 this.timeout.poll_timeout(cx)?;
249 this.payload.poll_next(cx)
250 }
251}
252
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::*;
    use crate::any_body::AnyBody;

    // Compile-time checks that `ClientResponse` is `Unpin` for each payload type below.

    // Default payload stream type (`BoxedPayloadStream`).
    assert_impl_all!(ClientResponse: Unpin);
    // Unit type as the payload stream parameter.
    assert_impl_all!(ClientResponse<()>: Unpin);
    // `AnyBody` as the payload stream parameter.
    assert_impl_all!(ClientResponse<AnyBody>: Unpin);
}