Skip to main content

pubhubs/client/
core.rs

1use std::borrow::{Borrow, Cow};
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use crate::api::{
7    ApiResultExt as _, EndpointDetails, ErrorCode, Payload, PayloadTrait, Result,
8    ResultPayloadTrait as _,
9};
10use crate::misc::fmt_ext;
11
12use awc::error::StatusCode;
13use awc::http::{self, header::TryIntoHeaderValue};
14use futures_util::FutureExt as _;
15
/// Client for making requests to pubhubs servers and hubs; cheaply clonable
///
/// All state is kept behind a reference-counted pointer, so cloning a `Client`
/// only clones that pointer.
#[derive(Clone)]
pub struct Client {
    /// Shared state: the underlying HTTP client and the agent using it.
    inner: Rc<Inner>,
}
21
/// Identifies the entity using the client, for debugging purposes
///
/// The [`Display`](std::fmt::Display) form of the agent is included in log messages
/// written by the client.
#[derive(Default)]
pub enum Agent {
    /// A server, identified by its name.
    Server(crate::servers::server::Name),
    /// Displayed as `CLI`.
    Cli,
    /// Displayed as `Hub`.
    Hub,
    /// Displayed as `Integration Test`.
    IntegrationTest,

    /// No agent was specified; this is the default.  Displayed as `???`.
    #[default]
    Unspecified,
}
33
34impl std::fmt::Display for Agent {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
36        match self {
37            Agent::Server(name) => name.fmt(f),
38            Agent::Cli => "CLI".fmt(f),
39            Agent::Hub => "Hub".fmt(f),
40            Agent::IntegrationTest => "Integration Test".fmt(f),
41            Agent::Unspecified => "???".fmt(f),
42        }
43    }
44}
45
/// Builder for [`Client`]
///
/// Obtained via [`Client::builder`] (or [`Default`]); turned into a [`Client`]
/// by [`Builder::finish`].
#[derive(Default)]
pub struct Builder {
    /// The agent the resulting client reports in its log messages;
    /// [`Agent::Unspecified`] unless set via [`Builder::agent`].
    agent: Agent,
}
51
52impl Builder {
53    pub fn finish(self) -> Client {
54        let http_client = awc::Client::default();
55
56        Client {
57            inner: Rc::new(Inner {
58                http_client,
59                agent: self.agent,
60            }),
61        }
62    }
63
64    pub fn agent(mut self, agent: Agent) -> Self {
65        self.agent = agent;
66        self
67    }
68}
69
/// The inner part of [`Client`]
///
/// Shared between all clones of a [`Client`] via an [`Rc`].
struct Inner {
    /// The HTTP client used to actually send the requests.
    http_client: awc::Client,
    /// Who is using the client; only used in log messages.
    agent: Agent,
}
75
/// Details for a query to be sent to a pubhubs server
///
/// `EP` is the endpoint that is queried.  The other type parameters allow the url (`BU`),
/// request (`BR`), path parameters (`PP`) and authorization header value (`HV`) to be stored
/// either by value or by reference; see [`BorrowedQuerySetup`] for the all-by-reference variant.
pub struct QuerySetup<EP, BU, BR, PP, HV> {
    /// The client used to send the request.
    client: Client,
    /// Ties this setup to the endpoint `EP` without storing a value of that type.
    phantom_ep: PhantomData<EP>,
    /// Url of the server; the endpoint's path is joined onto it.
    url: BU,
    /// The request that is converted to the payload of the HTTP request.
    request: BR,
    /// Values for the path parameters in the endpoint's path.
    path_params: PP,
    /// Value for the `Authorization` header, if any.
    auth_header: Option<HV>,
    /// When set, passed to the HTTP request as its timeout.
    timeout: Option<core::time::Duration>,
    /// When set, logging for this query is reduced.
    quiet: bool,
}
87
/// Makes a [`QuerySetup`] directly `.await`able: awaiting it sends the query once
/// and yields the endpoint's response type.
impl<'a, EP, BU, BR, PP, HV> IntoFuture for QuerySetup<EP, BU, BR, PP, HV>
where
    EP: EndpointDetails + 'static,
    BU: Borrow<url::Url>,
    BR: Borrow<EP::RequestType>,
    PP: Borrow<PathParams<'a>>,
    HV: Borrow<http::header::HeaderValue>,
{
    type Output = EP::ResponseType;
    // The future is boxed (and not `Send`) so that its concrete type need not be named.
    type IntoFuture = futures::future::LocalBoxFuture<'static, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        // `into_future_impl` returns a `'static` future, which therefore does not hold on to
        // `borrowed`; that is why `self` can be dropped when this function returns.
        let borrowed = self.borrow();
        let fut = borrowed.into_future_impl();
        fut.boxed_local()
    }
}
105
106impl<'pp, EP, BU, BR, PP, HV> QuerySetup<EP, BU, BR, PP, HV>
107where
108    EP: EndpointDetails + 'static,
109    BU: Borrow<url::Url>,
110    BR: Borrow<EP::RequestType>,
111    PP: Borrow<PathParams<'pp>>,
112    HV: Borrow<http::header::HeaderValue>,
113{
114    fn borrow<'s>(&'s self) -> BorrowedQuerySetup<'s, EP>
115    where
116        'pp: 's,
117    {
118        QuerySetup {
119            client: self.client.clone(),
120            phantom_ep: PhantomData,
121            path_params: self.path_params.borrow(),
122            request: self.request.borrow(),
123            url: self.url.borrow(),
124            auth_header: self.auth_header.as_ref().map(|v| v.borrow()),
125            timeout: self.timeout,
126            quiet: self.quiet,
127        }
128    }
129
130    pub async fn with_retry(self) -> EP::ResponseType {
131        let borrowed = self.borrow();
132        let retry_fut =
133            crate::misc::task::retry(|| async { borrowed.clone().await.into_result().retryable() });
134
135        EP::ResponseType::from_result(match retry_fut.await {
136            Ok(Some(resp)) => Result::Ok(resp),
137            Ok(None) => Result::Err(ErrorCode::PleaseRetry),
138            Err(ec) => Result::Err(ec),
139        })
140    }
141}
142
143impl<EP, BU, BR, HV, PP> QuerySetup<EP, BU, BR, PP, HV> {
144    /// Override timeout for this request
145    pub fn timeout(mut self, duration: core::time::Duration) -> Self {
146        self.timeout = Some(duration);
147
148        self
149    }
150
151    /// Reduces logging for common errors, like not being able to connect.
152    /// Use this to prevent errors in recurrring querries from swamping the log.
153    pub fn quiet(mut self) -> Self {
154        self.quiet = true;
155        self
156    }
157}
158
159impl<'a, EP, BU, BR, HV> QuerySetup<EP, BU, BR, PathParams<'a>, HV> {
160    /// Sets path parameter `name` in [`EndpointDetails::PATH`] to `value`.
161    pub fn path_param(mut self, name: &'static str, value: impl Into<Cow<'a, str>>) -> Self {
162        self.path_params.insert(name, value.into());
163
164        self
165    }
166}
167
168impl<EP, BU, BR, PP> QuerySetup<EP, BU, BR, PP, http::header::HeaderValue> {
169    /// Set `Authorization` header value.
170    pub fn auth_header(mut self, value: impl TryIntoHeaderValue) -> Self {
171        let original_value = std::mem::replace(
172            &mut self.auth_header,
173            value.try_into_value().map(Some).unwrap_or_else(|_| {
174                log::error!("failed to set authorization header on request",);
175                None
176            }),
177        );
178
179        if original_value.is_some() {
180            log::warn!("authorization header set twice");
181        }
182
183        self
184    }
185}
186
/// Base type for [`QuerySetup::path_params`]
///
/// Maps the name of a path parameter to its value; the value is either borrowed or owned.
type PathParams<'a> = HashMap<&'static str, Cow<'a, str>>;
189
/// Result of [`QuerySetup::borrow`], used by [`QuerySetup::with_retry`].
///
/// A [`QuerySetup`] in which the url, request, path parameters and authorization header
/// value are all held by reference.
pub(crate) type BorrowedQuerySetup<'a, EP> = QuerySetup<
    EP,
    &'a url::Url,
    &'a <EP as EndpointDetails>::RequestType,
    &'a PathParams<'a>,
    &'a http::header::HeaderValue,
>;
198
/// Cloning a borrowed setup is cheap: the borrowed fields are references that are simply
/// copied, and cloning the client only clones an `Rc`.
impl<EP: EndpointDetails + 'static> Clone for BorrowedQuerySetup<'_, EP> {
    fn clone(&self) -> Self {
        QuerySetup {
            client: self.client.clone(), // cheap, Rc
            phantom_ep: PhantomData,
            // the fields below are references or other `Copy` values
            path_params: self.path_params,
            request: self.request,
            url: self.url,
            auth_header: self.auth_header,
            timeout: self.timeout,
            quiet: self.quiet,
        }
    }
}
213
214impl<EP: EndpointDetails + 'static> BorrowedQuerySetup<'_, EP> {
215    fn into_future_impl(
216        self,
217    ) -> impl std::future::Future<Output = EP::ResponseType> + 'static + use<EP> {
218        // endpoint url
219        let ep_url = {
220            let mut path = String::new();
221            let resource = actix_web::dev::ResourceDef::new(EP::PATH);
222
223            if !resource.resource_path_from_map(&mut path, self.path_params) {
224                log::warn!(
225                    "Failed to replace path parameters in {} by {:?} - did you provide all path params?",
226                    EP::PATH,
227                    self.path_params
228                );
229                return futures::future::Either::Left(std::future::ready(
230                    EP::ResponseType::from_ec(ErrorCode::InternalError),
231                ));
232            }
233
234            let result = self.url.join(&path);
235
236            if let Err(err) = result {
237                log::error!("Could not join urls {} and {}: {}", self.url, path, err);
238                return futures::future::Either::Left(std::future::ready(
239                    EP::ResponseType::from_ec(ErrorCode::InternalError),
240                ));
241            }
242            result.unwrap()
243        };
244
245        let payload = self.request.to_payload();
246
247        if !self.quiet {
248            log::debug!(
249                "{}: Querying {} {} {payload}",
250                self.client.inner.agent,
251                EP::METHOD,
252                &ep_url,
253            );
254        }
255
256        let client_req = {
257            let mut client_req = self
258                .client
259                .inner
260                .http_client
261                .request(EP::METHOD, ep_url.to_string())
262                .insert_header(("User-Agent", "pubhubs")); // see issue #1432
263
264            if let Some(ct) = payload.content_type() {
265                client_req = client_req.content_type(ct.try_into_value().unwrap());
266            }
267
268            if let Some(auth_header) = self.auth_header {
269                client_req = client_req.insert_header(("Authorization", auth_header));
270            }
271
272            if let Some(timeout) = self.timeout {
273                client_req = client_req.timeout(timeout);
274            }
275
276            client_req
277        };
278
279        let payload_bytes_maybe = match payload.into_body() {
280            Ok(payload_bytes_maybe) => payload_bytes_maybe,
281            Err(err) => {
282                log::error!(
283                    "{agent}: Failed to query {method} {url}: could not serialize payload: {err:#}",
284                    agent = self.client.inner.agent,
285                    method = EP::METHOD,
286                    url = &ep_url
287                );
288                return futures::future::Either::Left(std::future::ready(
289                    EP::ResponseType::from_ec(ErrorCode::BadRequest),
290                ));
291            }
292        };
293
294        let send_client_req = match payload_bytes_maybe {
295            Some(bytes) => client_req.send_body(bytes),
296            None => client_req.send(),
297        };
298
299        futures::future::Either::Right(
300            self.client
301                .clone()
302                .query_inner::<EP>(ep_url, send_client_req, self.quiet)
303                .map(EP::ResponseType::from_result),
304        )
305    }
306}
307
308impl Client {
    /// Creates a new [`Builder`].
    ///
    /// The builder starts out with its default settings; call [`Builder::finish`] to
    /// obtain the [`Client`].
    pub fn builder() -> Builder {
        Builder::default()
    }
313
    /// Returns the underlying [`awc::Client`], for making raw HTTP requests.
    ///
    /// This is the same HTTP client that is used for the queries made via this [`Client`].
    pub fn http_client(&self) -> &awc::Client {
        &self.inner.http_client
    }
318
319    /// Like [`Client::query`], but retries the query when it fails with a [`crate::api::ErrorInfo::retryable`] [`ErrorCode`].
320    ////
321    /// When `A` queries `B` and `B` queries `C`, the `B` should, in general, not use
322    /// [`Client::query_with_retry`], but let `A` manage retries.  This prevents `A`'s request from hanging
323    /// without any explanation.
324    ///
325    /// Unlike [`Client::query`], the future returned by `query_with_retry` borrows `server_url`
326    /// and `req`.  (It does not borrow `self`.)
327    ///
328    /// The borrowing of `server_url` and `req` by the returned future has the unfortunate
329    /// side-effect that when the future is passed to, say, [`tokio::spawn`], `server_url`
330    /// and `req` are forced to have the `'static` lifetime.  In such cases it's easiest to pass
331    /// `server_url` and `req` not by reference, but by value - whence the use of the [`Borrow<T>`] trait,
332    /// which is implemented both by `T` and `&T`.
333    pub fn query_with_retry<EP: EndpointDetails + 'static, BU, BR>(
334        &self,
335        server_url: BU,
336        req: BR,
337    ) -> impl std::future::Future<Output = EP::ResponseType> + use<EP, BU, BR>
338    where
339        BU: Borrow<url::Url>,
340        BR: Borrow<EP::RequestType>,
341    {
342        QuerySetup {
343            client: self.clone(),
344            url: server_url,
345            request: req,
346            phantom_ep: PhantomData::<EP>,
347            path_params: HashMap::new(),
348            auth_header: None::<http::header::HeaderValue>,
349            timeout: None,
350            quiet: false,
351        }
352        .with_retry()
353    }
354
355    /// Sends a request to `EP` [endpoint](EndpointDetails) at `server_url`.
356    pub fn query<'a, EP: EndpointDetails + 'static>(
357        &self,
358        server_url: &'a url::Url,
359        req: impl Borrow<EP::RequestType> + 'a,
360    ) -> QuerySetup<
361        EP,
362        &'a url::Url,
363        impl Borrow<EP::RequestType>,
364        PathParams<'a>,
365        http::header::HeaderValue,
366    > {
367        QuerySetup {
368            client: self.clone(),
369            url: server_url,
370            request: req,
371            phantom_ep: PhantomData,
372            path_params: HashMap::new(),
373            auth_header: None,
374            timeout: None,
375            quiet: false,
376        }
377    }
378
379    async fn query_inner<EP: EndpointDetails + 'static>(
380        self,
381        url: url::Url,
382        req: awc::SendClientRequest,
383        quiet: bool,
384    ) -> Result<EP::ResponseType> {
385        let mut resp = {
386            let result = req.await;
387
388            if let Err(err) = result {
389                return Result::Err(match err {
390                    awc::error::SendRequestError::Url(err) => {
391                        log::error!("unexpected problem with {url}: {err}");
392                        ErrorCode::InternalError
393                    }
394                    awc::error::SendRequestError::Connect(err) => match err {
395                        awc::error::ConnectError::Timeout => {
396                            if !quiet {
397                                log::warn!("connecting to {url} timed out");
398                            }
399                            ErrorCode::PleaseRetry
400                        }
401                        awc::error::ConnectError::Resolver(err) => {
402                            if !quiet {
403                                log::warn!("resolving {url}: {err}");
404                            }
405                            ErrorCode::PleaseRetry
406                        }
407                        awc::error::ConnectError::Io(err) => {
408                            // might happen when the port is closed
409                            if !quiet {
410                                log::warn!("io error while connecting to {url}: {err}");
411                            }
412                            ErrorCode::PleaseRetry
413                        }
414                        awc::error::ConnectError::Disconnected => {
415                            // might happen when the contacted server shuts down
416                            log::warn!("server disconnected while querying {url}");
417                            ErrorCode::PleaseRetry
418                        }
419                        _ => {
420                            log::error!("error connecting to {url}: {err}");
421                            ErrorCode::BadRequest
422                        }
423                    },
424                    awc::error::SendRequestError::Send(err) => {
425                        log::warn!(
426                            "error while sending request to {} {url}: {}",
427                            EP::METHOD,
428                            err
429                        );
430                        ErrorCode::PleaseRetry
431                    }
432                    awc::error::SendRequestError::Response(err) => match err {
433                        actix_web::error::ParseError::Timeout => {
434                            if !quiet {
435                                log::warn!(
436                                    "getting response to request to {} {url} timed out",
437                                    EP::METHOD
438                                );
439                            }
440                            ErrorCode::PleaseRetry
441                        }
442                        actix_web::error::ParseError::Io(io_err) => {
443                            // this sometimes happens when the request causes the server to exit
444                            log::warn!(
445                                "error getting response to request to {} {url}: {io_err}",
446                                EP::METHOD
447                            );
448                            ErrorCode::PleaseRetry
449                        }
450                        actix_web::error::ParseError::Method
451                        | actix_web::error::ParseError::Uri(_)
452                        | actix_web::error::ParseError::Version
453                        | actix_web::error::ParseError::Header
454                        | actix_web::error::ParseError::TooLarge
455                        | actix_web::error::ParseError::Incomplete
456                        | actix_web::error::ParseError::Status
457                        | actix_web::error::ParseError::Utf8(_) => {
458                            log::error!(
459                                "problem parsing response from {} {url}: {err}",
460                                EP::METHOD,
461                            );
462                            ErrorCode::InternalError
463                        }
464                        err => {
465                            log::error!(
466                                "unexpected error type while parsing response to request to {} {url}: {err}",
467                                EP::METHOD
468                            );
469                            ErrorCode::InternalError
470                        }
471                    },
472                    awc::error::SendRequestError::Http(err) => {
473                        log::error!("HTTP error with request {} {url}: {err}", EP::METHOD,);
474                        ErrorCode::InternalError
475                    }
476                    awc::error::SendRequestError::H2(err) => {
477                        log::error!("HTTP/2 error with request {} {url}: {err}", EP::METHOD,);
478                        ErrorCode::InternalError
479                    }
480                    awc::error::SendRequestError::Timeout => {
481                        if !quiet {
482                            log::warn!("request to {} {url} timed out", EP::METHOD);
483                        }
484                        ErrorCode::PleaseRetry
485                    }
486                    awc::error::SendRequestError::TunnelNotSupported => {
487                        log::error!("unexpected 'TunnelNotSupported' error");
488                        ErrorCode::InternalError
489                    }
490                    awc::error::SendRequestError::Body(err) => {
491                        log::warn!(
492                            "problem sending request body to {} {url}: {err}",
493                            EP::METHOD
494                        );
495                        ErrorCode::PleaseRetry
496                    }
497                    awc::error::SendRequestError::Custom(err, dbg) => {
498                        log::error!("unexpected custom error: {err}; {dbg:?}",);
499                        ErrorCode::InternalError
500                    }
501                    err => {
502                        log::error!(
503                            "unexpected error type while sending request to {} {url}: {err}",
504                            EP::METHOD
505                        );
506                        ErrorCode::InternalError
507                    }
508                });
509            }
510
511            result.unwrap()
512        };
513
514        // check statuscode
515        let status = resp.status();
516        if !status.is_success() {
517            let body = resp
518                .body()
519                .await
520                .unwrap_or_else(|_| bytes::Bytes::from_static(b"<failed to load body>"))
521                .to_vec();
522
523            if !quiet {
524                log::warn!(
525                    "{agent}: {method} {url} was not succesfull: {status} {body:.100}",
526                    method = EP::METHOD,
527                    body = fmt_ext::Bytes(&body),
528                    agent = self.inner.agent,
529                );
530            }
531
532            return Result::Err(match status {
533                // Caddy returns 502 Bad Gateway when the service proxied to is (temporarily) down
534                StatusCode::BAD_GATEWAY | StatusCode::GATEWAY_TIMEOUT => ErrorCode::PleaseRetry,
535                _ => ErrorCode::BadRequest,
536            });
537        }
538
539        let payload =
540            Payload::<<EP::ResponseType as PayloadTrait>::JsonType>::from_client_response(resp)
541                .await
542                .map_err(|err| {
543                    log::error!(
544                        "{agent}: {method} {url} failed to deserialize payload: {err:#}",
545                        method = EP::METHOD,
546                        agent = self.inner.agent,
547                    );
548                    ErrorCode::InternalError
549                })?;
550
551        if !quiet {
552            log::debug!(
553                "{agent}: {method} {url} returned {payload}",
554                agent = self.inner.agent,
555                method = EP::METHOD,
556            );
557        }
558
559        EP::ResponseType::from_payload(payload).map_err(|err| {
560            log::error!(
561                "{agent}: {method} {url} failed to convert to {typename}: {err:#}",
562                typename = std::any::type_name::<EP::ResponseType>(),
563                method = EP::METHOD,
564                agent = self.inner.agent,
565            );
566            ErrorCode::InternalError
567        })
568    }
569}