1use std::borrow::{Borrow, Cow};
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use crate::api::{
7 ApiResultExt as _, EndpointDetails, ErrorCode, Payload, PayloadTrait, Result,
8 ResultPayloadTrait as _,
9};
10use crate::misc::fmt_ext;
11
12use awc::error::StatusCode;
13use awc::http::{self, header::TryIntoHeaderValue};
14use futures_util::FutureExt as _;
15
16#[derive(Clone)]
18pub struct Client {
19 inner: Rc<Inner>,
20}
21
22#[derive(Default)]
24pub enum Agent {
25 Server(crate::servers::server::Name),
26 Cli,
27 Hub,
28 IntegrationTest,
29
30 #[default]
31 Unspecified,
32}
33
34impl std::fmt::Display for Agent {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
36 match self {
37 Agent::Server(name) => name.fmt(f),
38 Agent::Cli => "CLI".fmt(f),
39 Agent::Hub => "Hub".fmt(f),
40 Agent::IntegrationTest => "Integration Test".fmt(f),
41 Agent::Unspecified => "???".fmt(f),
42 }
43 }
44}
45
46#[derive(Default)]
48pub struct Builder {
49 agent: Agent,
50}
51
52impl Builder {
53 pub fn finish(self) -> Client {
54 let http_client = awc::Client::default();
55
56 Client {
57 inner: Rc::new(Inner {
58 http_client,
59 agent: self.agent,
60 }),
61 }
62 }
63
64 pub fn agent(mut self, agent: Agent) -> Self {
65 self.agent = agent;
66 self
67 }
68}
69
70struct Inner {
72 http_client: awc::Client,
73 agent: Agent,
74}
75
76pub struct QuerySetup<EP, BU, BR, PP, HV> {
78 client: Client,
79 phantom_ep: PhantomData<EP>,
80 url: BU,
81 request: BR,
82 path_params: PP,
83 auth_header: Option<HV>,
84 timeout: Option<core::time::Duration>,
85 quiet: bool,
86}
87
88impl<'a, EP, BU, BR, PP, HV> IntoFuture for QuerySetup<EP, BU, BR, PP, HV>
89where
90 EP: EndpointDetails + 'static,
91 BU: Borrow<url::Url>,
92 BR: Borrow<EP::RequestType>,
93 PP: Borrow<PathParams<'a>>,
94 HV: Borrow<http::header::HeaderValue>,
95{
96 type Output = EP::ResponseType;
97 type IntoFuture = futures::future::LocalBoxFuture<'static, Self::Output>;
98
99 fn into_future(self) -> Self::IntoFuture {
100 let borrowed = self.borrow();
101 let fut = borrowed.into_future_impl();
102 fut.boxed_local()
103 }
104}
105
106impl<'pp, EP, BU, BR, PP, HV> QuerySetup<EP, BU, BR, PP, HV>
107where
108 EP: EndpointDetails + 'static,
109 BU: Borrow<url::Url>,
110 BR: Borrow<EP::RequestType>,
111 PP: Borrow<PathParams<'pp>>,
112 HV: Borrow<http::header::HeaderValue>,
113{
114 fn borrow<'s>(&'s self) -> BorrowedQuerySetup<'s, EP>
115 where
116 'pp: 's,
117 {
118 QuerySetup {
119 client: self.client.clone(),
120 phantom_ep: PhantomData,
121 path_params: self.path_params.borrow(),
122 request: self.request.borrow(),
123 url: self.url.borrow(),
124 auth_header: self.auth_header.as_ref().map(|v| v.borrow()),
125 timeout: self.timeout,
126 quiet: self.quiet,
127 }
128 }
129
130 pub async fn with_retry(self) -> EP::ResponseType {
131 let borrowed = self.borrow();
132 let retry_fut =
133 crate::misc::task::retry(|| async { borrowed.clone().await.into_result().retryable() });
134
135 EP::ResponseType::from_result(match retry_fut.await {
136 Ok(Some(resp)) => Result::Ok(resp),
137 Ok(None) => Result::Err(ErrorCode::PleaseRetry),
138 Err(ec) => Result::Err(ec),
139 })
140 }
141}
142
143impl<EP, BU, BR, HV, PP> QuerySetup<EP, BU, BR, PP, HV> {
144 pub fn timeout(mut self, duration: core::time::Duration) -> Self {
146 self.timeout = Some(duration);
147
148 self
149 }
150
151 pub fn quiet(mut self) -> Self {
154 self.quiet = true;
155 self
156 }
157}
158
159impl<'a, EP, BU, BR, HV> QuerySetup<EP, BU, BR, PathParams<'a>, HV> {
160 pub fn path_param(mut self, name: &'static str, value: impl Into<Cow<'a, str>>) -> Self {
162 self.path_params.insert(name, value.into());
163
164 self
165 }
166}
167
168impl<EP, BU, BR, PP> QuerySetup<EP, BU, BR, PP, http::header::HeaderValue> {
169 pub fn auth_header(mut self, value: impl TryIntoHeaderValue) -> Self {
171 let original_value = std::mem::replace(
172 &mut self.auth_header,
173 value.try_into_value().map(Some).unwrap_or_else(|_| {
174 log::error!("failed to set authorization header on request",);
175 None
176 }),
177 );
178
179 if original_value.is_some() {
180 log::warn!("authorization header set twice");
181 }
182
183 self
184 }
185}
186
187type PathParams<'a> = HashMap<&'static str, Cow<'a, str>>;
189
190pub(crate) type BorrowedQuerySetup<'a, EP> = QuerySetup<
192 EP,
193 &'a url::Url,
194 &'a <EP as EndpointDetails>::RequestType,
195 &'a PathParams<'a>,
196 &'a http::header::HeaderValue,
197>;
198
199impl<EP: EndpointDetails + 'static> Clone for BorrowedQuerySetup<'_, EP> {
200 fn clone(&self) -> Self {
201 QuerySetup {
202 client: self.client.clone(), phantom_ep: PhantomData,
204 path_params: self.path_params,
205 request: self.request,
206 url: self.url,
207 auth_header: self.auth_header,
208 timeout: self.timeout,
209 quiet: self.quiet,
210 }
211 }
212}
213
214impl<EP: EndpointDetails + 'static> BorrowedQuerySetup<'_, EP> {
215 fn into_future_impl(
216 self,
217 ) -> impl std::future::Future<Output = EP::ResponseType> + 'static + use<EP> {
218 let ep_url = {
220 let mut path = String::new();
221 let resource = actix_web::dev::ResourceDef::new(EP::PATH);
222
223 if !resource.resource_path_from_map(&mut path, self.path_params) {
224 log::warn!(
225 "Failed to replace path parameters in {} by {:?} - did you provide all path params?",
226 EP::PATH,
227 self.path_params
228 );
229 return futures::future::Either::Left(std::future::ready(
230 EP::ResponseType::from_ec(ErrorCode::InternalError),
231 ));
232 }
233
234 let result = self.url.join(&path);
235
236 if let Err(err) = result {
237 log::error!("Could not join urls {} and {}: {}", self.url, path, err);
238 return futures::future::Either::Left(std::future::ready(
239 EP::ResponseType::from_ec(ErrorCode::InternalError),
240 ));
241 }
242 result.unwrap()
243 };
244
245 let payload = self.request.to_payload();
246
247 if !self.quiet {
248 log::debug!(
249 "{}: Querying {} {} {payload}",
250 self.client.inner.agent,
251 EP::METHOD,
252 &ep_url,
253 );
254 }
255
256 let client_req = {
257 let mut client_req = self
258 .client
259 .inner
260 .http_client
261 .request(EP::METHOD, ep_url.to_string())
262 .insert_header(("User-Agent", "pubhubs")); if let Some(ct) = payload.content_type() {
265 client_req = client_req.content_type(ct.try_into_value().unwrap());
266 }
267
268 if let Some(auth_header) = self.auth_header {
269 client_req = client_req.insert_header(("Authorization", auth_header));
270 }
271
272 if let Some(timeout) = self.timeout {
273 client_req = client_req.timeout(timeout);
274 }
275
276 client_req
277 };
278
279 let payload_bytes_maybe = match payload.into_body() {
280 Ok(payload_bytes_maybe) => payload_bytes_maybe,
281 Err(err) => {
282 log::error!(
283 "{agent}: Failed to query {method} {url}: could not serialize payload: {err:#}",
284 agent = self.client.inner.agent,
285 method = EP::METHOD,
286 url = &ep_url
287 );
288 return futures::future::Either::Left(std::future::ready(
289 EP::ResponseType::from_ec(ErrorCode::BadRequest),
290 ));
291 }
292 };
293
294 let send_client_req = match payload_bytes_maybe {
295 Some(bytes) => client_req.send_body(bytes),
296 None => client_req.send(),
297 };
298
299 futures::future::Either::Right(
300 self.client
301 .clone()
302 .query_inner::<EP>(ep_url, send_client_req, self.quiet)
303 .map(EP::ResponseType::from_result),
304 )
305 }
306}
307
308impl Client {
309 pub fn builder() -> Builder {
311 Builder::default()
312 }
313
314 pub fn http_client(&self) -> &awc::Client {
316 &self.inner.http_client
317 }
318
319 pub fn query_with_retry<EP: EndpointDetails + 'static, BU, BR>(
334 &self,
335 server_url: BU,
336 req: BR,
337 ) -> impl std::future::Future<Output = EP::ResponseType> + use<EP, BU, BR>
338 where
339 BU: Borrow<url::Url>,
340 BR: Borrow<EP::RequestType>,
341 {
342 QuerySetup {
343 client: self.clone(),
344 url: server_url,
345 request: req,
346 phantom_ep: PhantomData::<EP>,
347 path_params: HashMap::new(),
348 auth_header: None::<http::header::HeaderValue>,
349 timeout: None,
350 quiet: false,
351 }
352 .with_retry()
353 }
354
355 pub fn query<'a, EP: EndpointDetails + 'static>(
357 &self,
358 server_url: &'a url::Url,
359 req: impl Borrow<EP::RequestType> + 'a,
360 ) -> QuerySetup<
361 EP,
362 &'a url::Url,
363 impl Borrow<EP::RequestType>,
364 PathParams<'a>,
365 http::header::HeaderValue,
366 > {
367 QuerySetup {
368 client: self.clone(),
369 url: server_url,
370 request: req,
371 phantom_ep: PhantomData,
372 path_params: HashMap::new(),
373 auth_header: None,
374 timeout: None,
375 quiet: false,
376 }
377 }
378
379 async fn query_inner<EP: EndpointDetails + 'static>(
380 self,
381 url: url::Url,
382 req: awc::SendClientRequest,
383 quiet: bool,
384 ) -> Result<EP::ResponseType> {
385 let mut resp = {
386 let result = req.await;
387
388 if let Err(err) = result {
389 return Result::Err(match err {
390 awc::error::SendRequestError::Url(err) => {
391 log::error!("unexpected problem with {url}: {err}");
392 ErrorCode::InternalError
393 }
394 awc::error::SendRequestError::Connect(err) => match err {
395 awc::error::ConnectError::Timeout => {
396 if !quiet {
397 log::warn!("connecting to {url} timed out");
398 }
399 ErrorCode::PleaseRetry
400 }
401 awc::error::ConnectError::Resolver(err) => {
402 if !quiet {
403 log::warn!("resolving {url}: {err}");
404 }
405 ErrorCode::PleaseRetry
406 }
407 awc::error::ConnectError::Io(err) => {
408 if !quiet {
410 log::warn!("io error while connecting to {url}: {err}");
411 }
412 ErrorCode::PleaseRetry
413 }
414 awc::error::ConnectError::Disconnected => {
415 log::warn!("server disconnected while querying {url}");
417 ErrorCode::PleaseRetry
418 }
419 _ => {
420 log::error!("error connecting to {url}: {err}");
421 ErrorCode::BadRequest
422 }
423 },
424 awc::error::SendRequestError::Send(err) => {
425 log::warn!(
426 "error while sending request to {} {url}: {}",
427 EP::METHOD,
428 err
429 );
430 ErrorCode::PleaseRetry
431 }
432 awc::error::SendRequestError::Response(err) => match err {
433 actix_web::error::ParseError::Timeout => {
434 if !quiet {
435 log::warn!(
436 "getting response to request to {} {url} timed out",
437 EP::METHOD
438 );
439 }
440 ErrorCode::PleaseRetry
441 }
442 actix_web::error::ParseError::Io(io_err) => {
443 log::warn!(
445 "error getting response to request to {} {url}: {io_err}",
446 EP::METHOD
447 );
448 ErrorCode::PleaseRetry
449 }
450 actix_web::error::ParseError::Method
451 | actix_web::error::ParseError::Uri(_)
452 | actix_web::error::ParseError::Version
453 | actix_web::error::ParseError::Header
454 | actix_web::error::ParseError::TooLarge
455 | actix_web::error::ParseError::Incomplete
456 | actix_web::error::ParseError::Status
457 | actix_web::error::ParseError::Utf8(_) => {
458 log::error!(
459 "problem parsing response from {} {url}: {err}",
460 EP::METHOD,
461 );
462 ErrorCode::InternalError
463 }
464 err => {
465 log::error!(
466 "unexpected error type while parsing response to request to {} {url}: {err}",
467 EP::METHOD
468 );
469 ErrorCode::InternalError
470 }
471 },
472 awc::error::SendRequestError::Http(err) => {
473 log::error!("HTTP error with request {} {url}: {err}", EP::METHOD,);
474 ErrorCode::InternalError
475 }
476 awc::error::SendRequestError::H2(err) => {
477 log::error!("HTTP/2 error with request {} {url}: {err}", EP::METHOD,);
478 ErrorCode::InternalError
479 }
480 awc::error::SendRequestError::Timeout => {
481 if !quiet {
482 log::warn!("request to {} {url} timed out", EP::METHOD);
483 }
484 ErrorCode::PleaseRetry
485 }
486 awc::error::SendRequestError::TunnelNotSupported => {
487 log::error!("unexpected 'TunnelNotSupported' error");
488 ErrorCode::InternalError
489 }
490 awc::error::SendRequestError::Body(err) => {
491 log::warn!(
492 "problem sending request body to {} {url}: {err}",
493 EP::METHOD
494 );
495 ErrorCode::PleaseRetry
496 }
497 awc::error::SendRequestError::Custom(err, dbg) => {
498 log::error!("unexpected custom error: {err}; {dbg:?}",);
499 ErrorCode::InternalError
500 }
501 err => {
502 log::error!(
503 "unexpected error type while sending request to {} {url}: {err}",
504 EP::METHOD
505 );
506 ErrorCode::InternalError
507 }
508 });
509 }
510
511 result.unwrap()
512 };
513
514 let status = resp.status();
516 if !status.is_success() {
517 let body = resp
518 .body()
519 .await
520 .unwrap_or_else(|_| bytes::Bytes::from_static(b"<failed to load body>"))
521 .to_vec();
522
523 if !quiet {
524 log::warn!(
525 "{agent}: {method} {url} was not succesfull: {status} {body:.100}",
526 method = EP::METHOD,
527 body = fmt_ext::Bytes(&body),
528 agent = self.inner.agent,
529 );
530 }
531
532 return Result::Err(match status {
533 StatusCode::BAD_GATEWAY | StatusCode::GATEWAY_TIMEOUT => ErrorCode::PleaseRetry,
535 _ => ErrorCode::BadRequest,
536 });
537 }
538
539 let payload =
540 Payload::<<EP::ResponseType as PayloadTrait>::JsonType>::from_client_response(resp)
541 .await
542 .map_err(|err| {
543 log::error!(
544 "{agent}: {method} {url} failed to deserialize payload: {err:#}",
545 method = EP::METHOD,
546 agent = self.inner.agent,
547 );
548 ErrorCode::InternalError
549 })?;
550
551 if !quiet {
552 log::debug!(
553 "{agent}: {method} {url} returned {payload}",
554 agent = self.inner.agent,
555 method = EP::METHOD,
556 );
557 }
558
559 EP::ResponseType::from_payload(payload).map_err(|err| {
560 log::error!(
561 "{agent}: {method} {url} failed to convert to {typename}: {err:#}",
562 typename = std::any::type_name::<EP::ResponseType>(),
563 method = EP::METHOD,
564 agent = self.inner.agent,
565 );
566 ErrorCode::InternalError
567 })
568 }
569}