1use crate::PutPayload;
21use crate::client::backoff::{Backoff, BackoffConfig};
22use crate::client::builder::HttpRequestBuilder;
23use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse};
24use futures_util::future::BoxFuture;
25use http::{Method, Uri};
26use reqwest::StatusCode;
27use reqwest::header::LOCATION;
28#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
29use std::time::{Duration, Instant};
30use tracing::info;
31#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
32use web_time::{Duration, Instant};
33
/// Error returned by the retry machinery in this module when a request is
/// given up on.
///
/// The details are kept in a boxed [`RetryErrorImpl`], so this wrapper is only
/// one pointer wide.
#[derive(Debug)]
pub struct RetryError(Box<RetryErrorImpl>);
37
/// Heap-allocated payload of a [`RetryError`].
#[derive(Debug)]
struct RetryErrorImpl {
    /// HTTP method of the failed request.
    method: Method,
    /// URI of the failed request; `None` when the request was marked sensitive.
    uri: Option<Uri>,
    /// Number of retries recorded by the [`RetryContext`] when the error was built.
    retries: usize,
    /// Retry limit taken from the [`RetryContext`].
    max_retries: usize,
    /// Time elapsed since the [`RetryContext`] was created.
    elapsed: Duration,
    /// Retry time budget taken from the [`RetryContext`].
    retry_timeout: Duration,
    /// The underlying failure.
    inner: RequestError,
}
49
50impl std::fmt::Display for RetryError {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(f, "Error performing {} ", self.0.method)?;
53 match &self.0.uri {
54 Some(uri) => write!(f, "{uri} ")?,
55 None => write!(f, "REDACTED ")?,
56 }
57 write!(f, "in {:?}", self.0.elapsed)?;
58 if self.0.retries != 0 {
59 write!(
60 f,
61 ", after {} retries, max_retries: {}, retry_timeout: {:?} ",
62 self.0.retries, self.0.max_retries, self.0.retry_timeout
63 )?;
64 }
65 write!(f, " - {}", self.0.inner)
66 }
67}
68
69impl std::error::Error for RetryError {
70 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
71 Some(&self.0.inner)
72 }
73}
74
/// Mutable bookkeeping shared by the attempts of one logical request.
pub(crate) struct RetryContext {
    /// Source of the delay to wait before each retry.
    backoff: Backoff,
    /// Number of retries performed so far (incremented by [`Self::backoff`]).
    retries: usize,
    /// Retry limit copied from [`RetryConfig::max_retries`].
    max_retries: usize,
    /// Time budget copied from [`RetryConfig::retry_timeout`].
    retry_timeout: Duration,
    /// Instant at which this context was created.
    start: Instant,
}
86
87impl RetryContext {
88 pub(crate) fn new(config: &RetryConfig) -> Self {
89 Self {
90 max_retries: config.max_retries,
91 retry_timeout: config.retry_timeout,
92 backoff: Backoff::new(&config.backoff),
93 retries: 0,
94 start: Instant::now(),
95 }
96 }
97
98 pub(crate) fn exhausted(&self) -> bool {
99 self.retries >= self.max_retries || self.start.elapsed() > self.retry_timeout
100 }
101
102 pub(crate) fn backoff(&mut self) -> Duration {
103 self.retries += 1;
104 self.backoff.next()
105 }
106}
107
/// The underlying failure carried by a [`RetryError`].
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
    /// A redirect response arrived without a `LOCATION` header.
    #[error(
        "Received redirect without LOCATION, this normally indicates an incorrectly configured region"
    )]
    BareRedirect,

    /// The server answered with a non-success status.
    #[error("Server returned non-2xx status code: {status}: {}", body.as_deref().unwrap_or(""))]
    Status {
        /// Status code of the response.
        status: StatusCode,
        /// Response body, when it was read.
        body: Option<String>,
    },

    /// The server answered with a success status, but the body was judged to
    /// contain an error.
    #[error("Server returned error response: {body}")]
    Response {
        /// Status code of the response.
        status: StatusCode,
        /// Response body that contained the error.
        body: String,
    },

    /// An error reported by the HTTP client itself, either while executing the
    /// request or while reading a response body.
    #[error("{0}")]
    Http(#[from] HttpError),
}
131
132impl RetryError {
133 pub fn inner(&self) -> &RequestError {
135 &self.0.inner
136 }
137
138 pub fn status(&self) -> Option<StatusCode> {
140 match self.inner() {
141 RequestError::Status { status, .. } | RequestError::Response { status, .. } => {
142 Some(*status)
143 }
144 RequestError::BareRedirect | RequestError::Http(_) => None,
145 }
146 }
147
148 pub fn body(&self) -> Option<&str> {
150 match self.inner() {
151 RequestError::Status { body, .. } => body.as_deref(),
152 RequestError::Response { body, .. } => Some(body),
153 RequestError::BareRedirect | RequestError::Http(_) => None,
154 }
155 }
156
157 pub fn error(self, store: &'static str, path: String) -> crate::Error {
158 match self.status() {
159 Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
160 path,
161 source: Box::new(self),
162 },
163 Some(StatusCode::NOT_MODIFIED) => crate::Error::NotModified {
164 path,
165 source: Box::new(self),
166 },
167 Some(StatusCode::PRECONDITION_FAILED) => crate::Error::Precondition {
168 path,
169 source: Box::new(self),
170 },
171 Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
172 path,
173 source: Box::new(self),
174 },
175 Some(StatusCode::FORBIDDEN) => crate::Error::PermissionDenied {
176 path,
177 source: Box::new(self),
178 },
179 Some(StatusCode::UNAUTHORIZED) => crate::Error::Unauthenticated {
180 path,
181 source: Box::new(self),
182 },
183 _ => crate::Error::Generic {
184 store,
185 source: Box::new(self),
186 },
187 }
188 }
189}
190
191impl From<RetryError> for std::io::Error {
192 fn from(err: RetryError) -> Self {
193 use std::io::ErrorKind;
194 let kind = match err.status() {
195 Some(StatusCode::NOT_FOUND) => ErrorKind::NotFound,
196 Some(StatusCode::BAD_REQUEST) => ErrorKind::InvalidInput,
197 Some(StatusCode::UNAUTHORIZED) | Some(StatusCode::FORBIDDEN) => {
198 ErrorKind::PermissionDenied
199 }
200 _ => match err.inner() {
201 RequestError::Http(h) => match h.kind() {
202 HttpErrorKind::Timeout => ErrorKind::TimedOut,
203 HttpErrorKind::Connect => ErrorKind::NotConnected,
204 _ => ErrorKind::Other,
205 },
206 _ => ErrorKind::Other,
207 },
208 };
209 Self::new(kind, err)
210 }
211}
212
/// `Result` alias used in this module; the error type defaults to [`RetryError`].
pub(crate) type Result<T, E = RetryError> = std::result::Result<T, E>;
214
/// Settings that control how failed requests are retried.
///
/// A request stops being retried as soon as either `max_retries` retries have
/// been made or more than `retry_timeout` has elapsed since its
/// [`RetryContext`] was created. Both limits are checked when deciding whether
/// to schedule another attempt.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Configuration of the backoff used to space out retries.
    pub backoff: BackoffConfig,

    /// Maximum number of retries for a single request.
    pub max_retries: usize,

    /// Time budget after which no further retry is started.
    pub retry_timeout: Duration,
}
251
252impl Default for RetryConfig {
253 fn default() -> Self {
254 Self {
255 backoff: Default::default(),
256 max_retries: 10,
257 retry_timeout: Duration::from_secs(3 * 60),
258 }
259 }
260}
261
/// Returns `true` when `response_body` mentions one of the error codes
/// (`InternalError`, `SlowDown`) that mark a response as failed even though it
/// may have arrived with a success status. The match is a case-sensitive
/// substring search.
fn body_contains_error(response_body: &str) -> bool {
    const ERROR_MARKERS: [&str; 2] = ["InternalError", "SlowDown"];
    ERROR_MARKERS
        .iter()
        .any(|marker| response_body.contains(*marker))
}
265
/// A [`RetryableRequest`] paired with the [`RetryContext`] it will run under,
/// with builder-style setters for the retry options.
pub(crate) struct RetryableRequestBuilder {
    /// The request being configured.
    request: RetryableRequest,
    /// Retry state used when the request is sent.
    context: RetryContext,
}
271
272impl RetryableRequestBuilder {
273 pub(crate) fn idempotent(mut self, idempotent: bool) -> Self {
278 self.request.idempotent = Some(idempotent);
279 self
280 }
281
282 #[cfg(feature = "aws")]
284 pub(crate) fn retry_on_conflict(mut self, retry_on_conflict: bool) -> Self {
285 self.request.retry_on_conflict = retry_on_conflict;
286 self
287 }
288
289 #[allow(unused)]
293 pub(crate) fn sensitive(mut self, sensitive: bool) -> Self {
294 self.request.sensitive = sensitive;
295 self
296 }
297
298 pub(crate) fn payload(mut self, payload: Option<PutPayload>) -> Self {
300 self.request.payload = payload;
301 self
302 }
303
304 #[allow(unused)]
305 pub(crate) fn retry_error_body(mut self, retry_error_body: bool) -> Self {
306 self.request.retry_error_body = retry_error_body;
307 self
308 }
309
310 pub(crate) async fn send(mut self) -> Result<HttpResponse> {
311 self.request.send(&mut self.context).await
312 }
313}
314
/// An HTTP request together with the options that govern how it is retried.
pub(crate) struct RetryableRequest {
    /// Client used to execute each attempt.
    client: HttpClient,
    /// Template request; it is cloned for every attempt.
    http: HttpRequest,

    /// When `true`, the URI is omitted from errors produced for this request.
    sensitive: bool,
    /// Explicit idempotency override; `None` means "derive from the method".
    idempotent: Option<bool>,
    /// When `true`, a `409 Conflict` response is retried.
    retry_on_conflict: bool,
    /// Optional payload installed as the body of every attempt.
    payload: Option<PutPayload>,

    /// When `true`, successful responses are read in full and retried if the
    /// body contains an error marker.
    retry_error_body: bool,
}
327
328impl RetryableRequest {
329 #[allow(unused)]
330 pub(crate) fn sensitive(self, sensitive: bool) -> Self {
331 Self { sensitive, ..self }
332 }
333
334 fn err(&self, error: RequestError, ctx: &RetryContext) -> RetryError {
335 RetryError(Box::new(RetryErrorImpl {
336 uri: (!self.sensitive).then(|| self.http.uri().clone()),
337 method: self.http.method().clone(),
338 retries: ctx.retries,
339 max_retries: ctx.max_retries,
340 elapsed: ctx.start.elapsed(),
341 retry_timeout: ctx.retry_timeout,
342 inner: error,
343 }))
344 }
345
346 pub(crate) async fn send(self, ctx: &mut RetryContext) -> Result<HttpResponse> {
347 loop {
348 let mut request = self.http.clone();
349
350 if let Some(payload) = &self.payload {
351 *request.body_mut() = payload.clone().into();
352 }
353
354 match self.client.execute(request).await {
355 Ok(r) => {
356 let status = r.status();
357 if status.is_success() {
358 if !self.retry_error_body {
363 return Ok(r);
364 }
365
366 let (parts, body) = r.into_parts();
367 let body = match body.text().await {
368 Ok(body) => body,
369 Err(e) => return Err(self.err(RequestError::Http(e), ctx)),
370 };
371
372 if !body_contains_error(&body) {
373 return Ok(HttpResponse::from_parts(parts, body.into()));
375 } else {
376 if ctx.exhausted() {
378 return Err(self.err(RequestError::Response { body, status }, ctx));
379 }
380
381 let sleep = ctx.backoff();
382 info!(
383 "Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}",
384 status,
385 sleep.as_secs_f32(),
386 ctx.retries,
387 ctx.max_retries,
388 );
389 tokio::time::sleep(sleep).await;
390 }
391 } else if status == StatusCode::NOT_MODIFIED {
392 return Err(self.err(RequestError::Status { status, body: None }, ctx));
393 } else if status.is_redirection() {
394 let is_bare_redirect = !r.headers().contains_key(LOCATION);
395 return match is_bare_redirect {
396 true => Err(self.err(RequestError::BareRedirect, ctx)),
397 false => Err(self.err(
398 RequestError::Status {
399 body: None,
400 status: r.status(),
401 },
402 ctx,
403 )),
404 };
405 } else {
406 let status = r.status();
407 if ctx.exhausted()
408 || !(status.is_server_error()
409 || status == StatusCode::TOO_MANY_REQUESTS
410 || status == StatusCode::REQUEST_TIMEOUT
411 || (self.retry_on_conflict && status == StatusCode::CONFLICT))
412 {
413 let source = match r.into_body().text().await {
414 Ok(body) => RequestError::Status {
415 status,
416 body: Some(body),
417 },
418 Err(e) => RequestError::Http(e),
419 };
420 return Err(self.err(source, ctx));
421 };
422
423 let sleep = ctx.backoff();
424 info!(
425 "Encountered server error with status {}, backing off for {} seconds, retry {} of {}",
426 status,
427 sleep.as_secs_f32(),
428 ctx.retries,
429 ctx.max_retries,
430 );
431 tokio::time::sleep(sleep).await;
432 }
433 }
434 Err(e) => {
435 let is_idempotent = self
436 .idempotent
437 .unwrap_or_else(|| self.http.method().is_safe());
438
439 let do_retry = match e.kind() {
440 HttpErrorKind::Connect | HttpErrorKind::Request => true, HttpErrorKind::Timeout | HttpErrorKind::Interrupted => is_idempotent,
442 HttpErrorKind::Unknown | HttpErrorKind::Decode => false,
443 };
444
445 if ctx.exhausted() || !do_retry {
446 return Err(self.err(RequestError::Http(e), ctx));
447 }
448 let sleep = ctx.backoff();
449 info!(
450 "Encountered transport error of kind {:?}, backing off for {} seconds, retry {} of {}: {}",
451 e.kind(),
452 sleep.as_secs_f32(),
453 ctx.retries,
454 ctx.max_retries,
455 e,
456 );
457 tokio::time::sleep(sleep).await;
458 }
459 }
460 }
461 }
462}
463
/// Extension methods that turn a request builder into a retrying request.
pub(crate) trait RetryExt {
    /// Returns a [`RetryableRequestBuilder`] whose retry state is initialised
    /// from `config`.
    fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder;

    /// Returns a [`RetryableRequest`] with default retry options and no
    /// attached retry state; the caller supplies a [`RetryContext`] on send.
    fn retryable_request(self) -> RetryableRequest;

    /// Sends the request with retries governed by `config`, returning a boxed
    /// future that resolves to the response or a [`RetryError`].
    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<HttpResponse>>;
}
478
479impl RetryExt for HttpRequestBuilder {
480 fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder {
481 RetryableRequestBuilder {
482 request: self.retryable_request(),
483 context: RetryContext::new(config),
484 }
485 }
486
487 fn retryable_request(self) -> RetryableRequest {
488 let (client, request) = self.into_parts();
489 let request = request.expect("request must be valid");
490
491 RetryableRequest {
492 client,
493 http: request,
494 idempotent: None,
495 payload: None,
496 sensitive: false,
497 retry_on_conflict: false,
498 retry_error_body: false,
499 }
500 }
501
502 fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<HttpResponse>> {
503 let request = self.retryable(config);
504 Box::pin(async move { request.send().await })
505 }
506}
507
508#[cfg(not(target_arch = "wasm32"))]
509#[cfg(test)]
510mod tests {
511 use crate::RetryConfig;
512 use crate::client::mock_server::MockServer;
513 use crate::client::retry::{RequestError, RetryContext, RetryExt, body_contains_error};
514 use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpResponse};
515 use http::StatusCode;
516 use hyper::Response;
517 use hyper::header::LOCATION;
518 use hyper::server::conn::http1;
519 use hyper::service::service_fn;
520 use hyper_util::rt::TokioIo;
521 use reqwest::{Client, Method};
522 use std::convert::Infallible;
523 use std::error::Error;
524 use std::time::Duration;
525 use tokio::net::TcpListener;
526 use tokio::time::timeout;
527
528 #[test]
529 fn test_body_contains_error() {
530 let error_response = "AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200; Error Code: InternalError; Request ID: 0EXAMPLE9AAEB265)";
532 assert!(body_contains_error(error_response));
533
534 let error_response_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code><Message>Please reduce your request rate.</Message><RequestId>123</RequestId><HostId>456</HostId></Error>";
535 assert!(body_contains_error(error_response_2));
536
537 let success_response = "<CopyObjectResult><LastModified>2009-10-12T17:50:30.000Z</LastModified><ETag>\"9b2cf535f27731c974343645a3985328\"</ETag></CopyObjectResult>";
539 assert!(!body_contains_error(success_response));
540 }
541
    /// End-to-end walk through the retry behaviour against a mock server.
    ///
    /// Responses pushed onto the mock are consumed in order; the assertions
    /// below rely on the mock answering `200 OK` when nothing is queued.
    #[tokio::test]
    async fn test_retry() {
        let mock = MockServer::new().await;

        let retry = RetryConfig {
            backoff: Default::default(),
            max_retries: 2,
            retry_timeout: Duration::from_secs(1000),
        };

        // Short client timeout so the slow-handler cases below time out quickly.
        let client = HttpClient::new(
            Client::builder()
                .timeout(Duration::from_millis(100))
                .build()
                .unwrap(),
        );

        let do_request = || client.request(Method::GET, mock.url()).send_retry(&retry);

        // A plain request succeeds.
        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Client errors are returned immediately, with status and body.
        mock.push(
            Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body("cupcakes".to_string())
                .unwrap(),
        );

        let e = do_request().await.unwrap_err();
        assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
        assert_eq!(e.body(), Some("cupcakes"));
        assert_eq!(
            e.inner().to_string(),
            "Server returned non-2xx status code: 400 Bad Request: cupcakes"
        );

        // Same again with a different body.
        mock.push(
            Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body("NAUGHTY NAUGHTY".to_string())
                .unwrap(),
        );

        let e = do_request().await.unwrap_err();
        assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
        assert_eq!(e.body(), Some("NAUGHTY NAUGHTY"));
        assert_eq!(
            e.inner().to_string(),
            "Server returned non-2xx status code: 400 Bad Request: NAUGHTY NAUGHTY"
        );

        // A server error is retried and the follow-up attempt succeeds.
        mock.push(
            Response::builder()
                .status(StatusCode::BAD_GATEWAY)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // 429 Too Many Requests is retried.
        mock.push(
            Response::builder()
                .status(StatusCode::TOO_MANY_REQUESTS)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // 408 Request Timeout is retried.
        mock.push(
            Response::builder()
                .status(StatusCode::REQUEST_TIMEOUT)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // 204 No Content counts as success.
        mock.push(
            Response::builder()
                .status(StatusCode::NO_CONTENT)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::NO_CONTENT);

        // A redirect with a LOCATION header ends in a successful response.
        mock.push(
            Response::builder()
                .status(StatusCode::FOUND)
                .header(LOCATION, "/foo")
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Same with a different redirect target.
        mock.push(
            Response::builder()
                .status(StatusCode::FOUND)
                .header(LOCATION, "/bar")
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // A long chain of redirects is reported as a redirect error
        // (presumably the HTTP client's redirect limit — TODO confirm).
        for _ in 0..11 {
            mock.push(
                Response::builder()
                    .status(StatusCode::FOUND)
                    .header(LOCATION, "/bar")
                    .body(String::new())
                    .unwrap(),
            );
        }

        let e = do_request().await.unwrap_err().to_string();
        assert!(e.contains("error following redirect"), "{}", e);

        // A redirect without LOCATION becomes `BareRedirect`.
        mock.push(
            Response::builder()
                .status(StatusCode::FOUND)
                .body(String::new())
                .unwrap(),
        );

        let e = do_request().await.unwrap_err();
        assert!(matches!(e.inner(), RequestError::BareRedirect));
        assert_eq!(
            e.inner().to_string(),
            "Received redirect without LOCATION, this normally indicates an incorrectly configured region"
        );

        // Gives up once the initial attempt and every retry hit a server error.
        for _ in 0..=retry.max_retries {
            mock.push(
                Response::builder()
                    .status(StatusCode::BAD_GATEWAY)
                    .body("ignored".to_string())
                    .unwrap(),
            );
        }

        let e = do_request().await.unwrap_err();
        assert!(
            e.to_string().contains(" after 2 retries, max_retries: 2, retry_timeout: 1000s - Server returned non-2xx status code: 502 Bad Gateway: ignored"),
            "{e}"
        );
        // The source of the error is the inner `RequestError`.
        assert_eq!(
            e.source().unwrap().to_string(),
            "Server returned non-2xx status code: 502 Bad Gateway: ignored",
        );
        assert_eq!(e.body(), Some("ignored"));

        // A handler that panics once is retried and the request then succeeds.
        mock.push_fn::<_, String>(|_| panic!());
        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Gives up when every attempt hits a panicking handler.
        for _ in 0..=retry.max_retries {
            mock.push_fn::<_, String>(|_| panic!());
        }
        let e = do_request().await.unwrap_err();
        assert!(
            e.to_string().contains("after 2 retries, max_retries: 2, retry_timeout: 1000s - HTTP error: error sending request"),
            "{e}"
        );
        assert_eq!(
            e.source().unwrap().to_string(),
            "HTTP error: error sending request",
        );
        // Walk the source chain to find the `HttpError` and check its kind.
        let mut err: &dyn Error = &e;
        let mut found = false;
        while let Some(source) = err.source() {
            err = source;
            if let Some(http_err) = err.downcast_ref::<HttpError>() {
                assert_eq!(http_err.kind(), HttpErrorKind::Request);
                found = true;
                break;
            }
        }
        assert!(found, "HttpError not found in source chain");

        // The handler sleeps far longer than the 100ms client timeout; the GET
        // is retried and succeeds.
        mock.push_async_fn(|_| async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            panic!()
        });
        do_request().await.unwrap();

        // The same slow handler with a PUT: the error must not mention retries.
        mock.push_async_fn(|_| async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            panic!()
        });
        let res = client.request(Method::PUT, mock.url()).send_retry(&retry);
        let e = res.await.unwrap_err().to_string();
        assert!(
            !e.contains("retries") && e.contains("error sending request"),
            "{e}"
        );

        // Without the sensitive flag, the URL appears in the error message.
        let url = format!("{}/SENSITIVE", mock.url());
        for _ in 0..=retry.max_retries {
            mock.push(
                Response::builder()
                    .status(StatusCode::BAD_GATEWAY)
                    .body("ignored".to_string())
                    .unwrap(),
            );
        }
        let res = client.request(Method::GET, url).send_retry(&retry).await;
        let err = res.unwrap_err().to_string();
        assert!(err.contains("SENSITIVE"), "{err}");

        // With the sensitive flag, the URL is kept out of a status error.
        let url = format!("{}/SENSITIVE", mock.url());
        for _ in 0..=retry.max_retries {
            mock.push(
                Response::builder()
                    .status(StatusCode::BAD_GATEWAY)
                    .body("ignored".to_string())
                    .unwrap(),
            );
        }

        let req = client
            .request(Method::GET, &url)
            .retryable(&retry)
            .sensitive(true);
        let err = req.send().await.unwrap_err().to_string();
        assert!(!err.contains("SENSITIVE"), "{err}");

        // ...and also out of a transport error.
        for _ in 0..=retry.max_retries {
            mock.push_fn::<_, String>(|_| panic!());
        }

        let req = client
            .request(Method::GET, &url)
            .retryable(&retry)
            .sensitive(true);
        let err = req.send().await.unwrap_err().to_string();
        assert!(!err.contains("SENSITIVE"), "{err}");

        // A 200 whose body contains an error marker is retried when
        // `retry_error_body` is enabled.
        mock.push(
            Response::builder()
                .status(StatusCode::OK)
                .body("InternalError".to_string())
                .unwrap(),
        );
        let req = client
            .request(Method::PUT, &url)
            .retryable(&retry)
            .idempotent(true)
            .retry_error_body(true);
        let r = req.send().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);
        let b = r.into_body().text().await.unwrap();
        // The body returned to the caller is not the error body.
        assert!(!b.contains("InternalError"));

        // A 200 with a clean body is returned with its body intact.
        mock.push(
            Response::builder()
                .status(StatusCode::OK)
                .body("success".to_string())
                .unwrap(),
        );
        let req = client
            .request(Method::PUT, &url)
            .retryable(&retry)
            .idempotent(true)
            .retry_error_body(true);
        let r = req.send().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);
        let b = r.into_body().text().await.unwrap();
        assert!(b.contains("success"));

        mock.shutdown().await
    }
848
849 #[tokio::test]
850 async fn test_503_error_body_captured() {
851 let mock = MockServer::new().await;
852
853 let retry = RetryConfig {
854 backoff: Default::default(),
855 max_retries: 0,
856 retry_timeout: Duration::from_secs(1000),
857 };
858
859 let client = HttpClient::new(Client::builder().build().unwrap());
860
861 let slowdown_body = r#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SlowDown</Code><Message>Please reduce your request rate.</Message></Error>"#;
863 mock.push(
864 Response::builder()
865 .status(StatusCode::SERVICE_UNAVAILABLE)
866 .body(slowdown_body.to_string())
867 .unwrap(),
868 );
869
870 let e = client
871 .request(Method::GET, mock.url())
872 .send_retry(&retry)
873 .await
874 .unwrap_err();
875
876 assert_eq!(e.status().unwrap(), StatusCode::SERVICE_UNAVAILABLE);
877 assert_eq!(e.body(), Some(slowdown_body));
878 assert!(e.body().unwrap().contains("SlowDown"));
879
880 mock.shutdown().await
881 }
882
    /// A request succeeds after `max_retries` connections have been accepted
    /// and dropped by the server without a response, provided the following
    /// connection is served normally.
    #[tokio::test]
    #[expect(
        deprecated,
        reason = "SO_LINGER w/ zero timeout doesn't block, see https://github.com/tokio-rs/tokio/issues/7751#issuecomment-3709831265"
    )]
    async fn test_connection_reset_is_retried() {
        let retry = RetryConfig {
            backoff: Default::default(),
            max_retries: 2,
            retry_timeout: Duration::from_secs(1),
        };
        assert!(retry.max_retries > 0);

        // Bind to port 0 so the OS picks a free port.
        let listener = TcpListener::bind("::1:0").await.unwrap();
        let url = format!("http://{}", listener.local_addr().unwrap());
        let handle = tokio::spawn(async move {
            for _ in 0..retry.max_retries {
                // Accept, set a zero linger timeout and drop the stream without
                // answering (presumably the client sees a connection reset, as
                // the test name suggests — TODO confirm).
                let (stream, _) = listener.accept().await.unwrap();
                stream.set_linger(Some(Duration::from_secs(0))).unwrap();
            }
            // The next connection is served a real response.
            let (stream, _) = listener.accept().await.unwrap();
            http1::Builder::new()
                // Keep-alive is disabled; presumably so the connection closes
                // after one response and `serve_connection` returns — TODO confirm.
                .keep_alive(false)
                .serve_connection(
                    TokioIo::new(stream),
                    service_fn(move |_req| async {
                        Ok::<_, Infallible>(HttpResponse::new("Success!".to_string().into()))
                    }),
                )
                .await
                .unwrap();
        });

        let client = HttpClient::new(reqwest::Client::new());
        // Drive the request with an explicit context so it can be inspected below.
        let ctx = &mut RetryContext::new(&retry);
        let res = client
            .get(url)
            .retryable_request()
            .send(ctx)
            .await
            .expect("request should eventually succeed");
        assert_eq!(res.status(), StatusCode::OK);
        // The retry budget is fully consumed once the request has succeeded.
        assert!(ctx.exhausted());

        // The server task must finish promptly once the response was served.
        let _ = timeout(Duration::from_secs(1), handle)
            .await
            .expect("shutdown shouldn't hang");
    }
937}