Skip to main content

object_store/client/
retry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`RetryConfig`] connection retry policy
19
20use crate::PutPayload;
21use crate::client::backoff::{Backoff, BackoffConfig};
22use crate::client::builder::HttpRequestBuilder;
23use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse};
24use futures_util::future::BoxFuture;
25use http::{Method, Uri};
26use reqwest::StatusCode;
27use reqwest::header::LOCATION;
28#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
29use std::time::{Duration, Instant};
30use tracing::info;
31#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
32use web_time::{Duration, Instant};
33
/// Error returned when a request has failed and will not be retried again
///
/// Besides the underlying [`RequestError`], this records the request method,
/// the URI (omitted for requests flagged as sensitive) and the retry statistics
/// at the time of failure. The payload is boxed to keep the error value small.
#[derive(Debug)]
pub struct RetryError(Box<RetryErrorImpl>);
37
/// Payload of [`RetryError`], boxed to avoid a large error variant
#[derive(Debug)]
struct RetryErrorImpl {
    /// HTTP method of the failed request
    method: Method,
    /// URI of the failed request, or `None` when the request was flagged as sensitive
    uri: Option<Uri>,
    /// Number of retries performed before the error was produced
    retries: usize,
    /// Retry limit in force for the request
    max_retries: usize,
    /// Time elapsed since the retry context was created
    elapsed: Duration,
    /// Retry timeout in force for the request
    retry_timeout: Duration,
    /// The underlying reason for the failure
    inner: RequestError,
}
49
50impl std::fmt::Display for RetryError {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "Error performing {} ", self.0.method)?;
53        match &self.0.uri {
54            Some(uri) => write!(f, "{uri} ")?,
55            None => write!(f, "REDACTED ")?,
56        }
57        write!(f, "in {:?}", self.0.elapsed)?;
58        if self.0.retries != 0 {
59            write!(
60                f,
61                ", after {} retries, max_retries: {}, retry_timeout: {:?} ",
62                self.0.retries, self.0.max_retries, self.0.retry_timeout
63            )?;
64        }
65        write!(f, " - {}", self.0.inner)
66    }
67}
68
69impl std::error::Error for RetryError {
70    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
71        Some(&self.0.inner)
72    }
73}
74
/// Context of the retry loop
///
/// Most use-cases should use [`RetryExt`] and [`RetryableRequestBuilder`], however,
/// [`RetryContext`] allows preserving retry state across multiple [`RetryableRequest`]
pub(crate) struct RetryContext {
    /// Source of the delay to wait before each retry
    backoff: Backoff,
    /// Number of retries performed so far
    retries: usize,
    /// Retry limit, copied from [`RetryConfig::max_retries`]
    max_retries: usize,
    /// Retry timeout, copied from [`RetryConfig::retry_timeout`]
    retry_timeout: Duration,
    /// Instant at which this context was created
    start: Instant,
}
86
87impl RetryContext {
88    pub(crate) fn new(config: &RetryConfig) -> Self {
89        Self {
90            max_retries: config.max_retries,
91            retry_timeout: config.retry_timeout,
92            backoff: Backoff::new(&config.backoff),
93            retries: 0,
94            start: Instant::now(),
95        }
96    }
97
98    pub(crate) fn exhausted(&self) -> bool {
99        self.retries >= self.max_retries || self.start.elapsed() > self.retry_timeout
100    }
101
102    pub(crate) fn backoff(&mut self) -> Duration {
103        self.retries += 1;
104        self.backoff.next()
105    }
106}
107
/// The reason a request failed
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
    /// A redirect response was received that carried no `Location` header
    #[error(
        "Received redirect without LOCATION, this normally indicates an incorrectly configured region"
    )]
    BareRedirect,

    /// The server responded with a non-success status code
    ///
    /// `body` holds the response body when it was read, and is `None` otherwise
    #[error("Server returned non-2xx status code: {status}: {}", body.as_deref().unwrap_or(""))]
    Status {
        status: StatusCode,
        body: Option<String>,
    },

    /// The server responded with a success status code, but the body contained an error
    #[error("Server returned error response: {body}")]
    Response { status: StatusCode, body: String },

    /// The request failed at the HTTP client level
    // We need to use `{0}` here rather than `error(transparent)` to ensure that
    // `Error::source` returns `HttpError` rather than the underlying
    // `reqwest::Error` which would happen with `error(transparent)`.
    #[error("{0}")]
    Http(#[from] HttpError),
}
131
132impl RetryError {
133    /// Returns the underlying [`RequestError`]
134    pub fn inner(&self) -> &RequestError {
135        &self.0.inner
136    }
137
138    /// Returns the status code associated with this error if any
139    pub fn status(&self) -> Option<StatusCode> {
140        match self.inner() {
141            RequestError::Status { status, .. } | RequestError::Response { status, .. } => {
142                Some(*status)
143            }
144            RequestError::BareRedirect | RequestError::Http(_) => None,
145        }
146    }
147
148    /// Returns the error body if any
149    pub fn body(&self) -> Option<&str> {
150        match self.inner() {
151            RequestError::Status { body, .. } => body.as_deref(),
152            RequestError::Response { body, .. } => Some(body),
153            RequestError::BareRedirect | RequestError::Http(_) => None,
154        }
155    }
156
157    pub fn error(self, store: &'static str, path: String) -> crate::Error {
158        match self.status() {
159            Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
160                path,
161                source: Box::new(self),
162            },
163            Some(StatusCode::NOT_MODIFIED) => crate::Error::NotModified {
164                path,
165                source: Box::new(self),
166            },
167            Some(StatusCode::PRECONDITION_FAILED) => crate::Error::Precondition {
168                path,
169                source: Box::new(self),
170            },
171            Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
172                path,
173                source: Box::new(self),
174            },
175            Some(StatusCode::FORBIDDEN) => crate::Error::PermissionDenied {
176                path,
177                source: Box::new(self),
178            },
179            Some(StatusCode::UNAUTHORIZED) => crate::Error::Unauthenticated {
180                path,
181                source: Box::new(self),
182            },
183            _ => crate::Error::Generic {
184                store,
185                source: Box::new(self),
186            },
187        }
188    }
189}
190
191impl From<RetryError> for std::io::Error {
192    fn from(err: RetryError) -> Self {
193        use std::io::ErrorKind;
194        let kind = match err.status() {
195            Some(StatusCode::NOT_FOUND) => ErrorKind::NotFound,
196            Some(StatusCode::BAD_REQUEST) => ErrorKind::InvalidInput,
197            Some(StatusCode::UNAUTHORIZED) | Some(StatusCode::FORBIDDEN) => {
198                ErrorKind::PermissionDenied
199            }
200            _ => match err.inner() {
201                RequestError::Http(h) => match h.kind() {
202                    HttpErrorKind::Timeout => ErrorKind::TimedOut,
203                    HttpErrorKind::Connect => ErrorKind::NotConnected,
204                    _ => ErrorKind::Other,
205                },
206                _ => ErrorKind::Other,
207            },
208        };
209        Self::new(kind, err)
210    }
211}
212
/// Result alias whose error type defaults to [`RetryError`]
pub(crate) type Result<T, E = RetryError> = std::result::Result<T, E>;
214
/// The configuration for how to respond to request errors
///
/// The following categories of error will be retried:
///
/// * 5xx server errors
/// * 429 Too Many Requests and 408 Request Timeout responses
/// * Connection errors
/// * Dropped connections
/// * Timeouts for [safe] / read-only requests, or for requests
///   explicitly marked as idempotent
///
/// Requests will be retried up to some limit, using exponential
/// backoff with jitter. See [`BackoffConfig`] for more information
///
/// [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// The backoff configuration
    pub backoff: BackoffConfig,

    /// The maximum number of times to retry a request
    ///
    /// The initial attempt is not counted, so up to `max_retries + 1`
    /// requests may be sent in total.
    ///
    /// Set to 0 to disable retries
    pub max_retries: usize,

    /// The maximum length of time from the initial request
    /// after which no further retries will be attempted
    ///
    /// This not only bounds the length of time before a server
    /// error will be surfaced to the application, but also bounds
    /// the length of time a request's credentials must remain valid.
    ///
    /// As requests are retried without renewing credentials or
    /// regenerating request payloads, this number should be kept
    /// below 5 minutes to avoid errors due to expired credentials
    /// and/or request payloads
    pub retry_timeout: Duration,
}
251
252impl Default for RetryConfig {
253    fn default() -> Self {
254        Self {
255            backoff: Default::default(),
256            max_retries: 10,
257            retry_timeout: Duration::from_secs(3 * 60),
258        }
259    }
260}
261
/// Returns `true` if `response_body` mentions one of the error codes
/// (`InternalError` or `SlowDown`) that mark a response as a failure
/// even when it was delivered with a success status
///
/// This is a plain, case-sensitive substring check.
fn body_contains_error(response_body: &str) -> bool {
    const ERROR_MARKERS: [&str; 2] = ["InternalError", "SlowDown"];

    ERROR_MARKERS
        .iter()
        .any(|marker| response_body.contains(*marker))
}
265
/// Combines a [`RetryableRequest`] with a [`RetryContext`]
///
/// The builder methods adjust the wrapped request, and `send` dispatches it
/// using the wrapped context.
pub(crate) struct RetryableRequestBuilder {
    /// The request to dispatch
    request: RetryableRequest,
    /// Retry state used when dispatching `request`
    context: RetryContext,
}
271
272impl RetryableRequestBuilder {
273    /// Set whether this request is idempotent
274    ///
275    /// An idempotent request will be retried on timeout even if the request
276    /// method is not [safe](https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1)
277    pub(crate) fn idempotent(mut self, idempotent: bool) -> Self {
278        self.request.idempotent = Some(idempotent);
279        self
280    }
281
282    /// Set whether this request should be retried on a 409 Conflict response.
283    #[cfg(feature = "aws")]
284    pub(crate) fn retry_on_conflict(mut self, retry_on_conflict: bool) -> Self {
285        self.request.retry_on_conflict = retry_on_conflict;
286        self
287    }
288
289    /// Set whether this request contains sensitive data
290    ///
291    /// This will avoid printing out the URL in error messages
292    #[allow(unused)]
293    pub(crate) fn sensitive(mut self, sensitive: bool) -> Self {
294        self.request.sensitive = sensitive;
295        self
296    }
297
298    /// Provide a [`PutPayload`]
299    pub(crate) fn payload(mut self, payload: Option<PutPayload>) -> Self {
300        self.request.payload = payload;
301        self
302    }
303
304    #[allow(unused)]
305    pub(crate) fn retry_error_body(mut self, retry_error_body: bool) -> Self {
306        self.request.retry_error_body = retry_error_body;
307        self
308    }
309
310    pub(crate) async fn send(mut self) -> Result<HttpResponse> {
311        self.request.send(&mut self.context).await
312    }
313}
314
/// A retryable request
pub(crate) struct RetryableRequest {
    /// Client used to execute each attempt
    client: HttpClient,
    /// Template request, cloned for every attempt
    http: HttpRequest,

    /// When set, the URI is left out of any error produced for this request
    sensitive: bool,
    /// Whether the request may be retried after a timeout or interruption;
    /// `None` falls back to whether the HTTP method is safe
    idempotent: Option<bool>,
    /// Whether a 409 Conflict response should be retried
    retry_on_conflict: bool,
    /// Payload installed as the request body on every attempt, if any
    payload: Option<PutPayload>,

    /// Whether the body of a success response is inspected for errors
    retry_error_body: bool,
}
327
328impl RetryableRequest {
329    #[allow(unused)]
330    pub(crate) fn sensitive(self, sensitive: bool) -> Self {
331        Self { sensitive, ..self }
332    }
333
334    fn err(&self, error: RequestError, ctx: &RetryContext) -> RetryError {
335        RetryError(Box::new(RetryErrorImpl {
336            uri: (!self.sensitive).then(|| self.http.uri().clone()),
337            method: self.http.method().clone(),
338            retries: ctx.retries,
339            max_retries: ctx.max_retries,
340            elapsed: ctx.start.elapsed(),
341            retry_timeout: ctx.retry_timeout,
342            inner: error,
343        }))
344    }
345
346    pub(crate) async fn send(self, ctx: &mut RetryContext) -> Result<HttpResponse> {
347        loop {
348            let mut request = self.http.clone();
349
350            if let Some(payload) = &self.payload {
351                *request.body_mut() = payload.clone().into();
352            }
353
354            match self.client.execute(request).await {
355                Ok(r) => {
356                    let status = r.status();
357                    if status.is_success() {
358                        // For certain S3 requests, 200 response may contain `InternalError` or
359                        // `SlowDown` in the message. These responses should be handled similarly
360                        // to r5xx errors.
361                        // More info here: https://repost.aws/knowledge-center/s3-resolve-200-internalerror
362                        if !self.retry_error_body {
363                            return Ok(r);
364                        }
365
366                        let (parts, body) = r.into_parts();
367                        let body = match body.text().await {
368                            Ok(body) => body,
369                            Err(e) => return Err(self.err(RequestError::Http(e), ctx)),
370                        };
371
372                        if !body_contains_error(&body) {
373                            // Success response and no error, clone and return response
374                            return Ok(HttpResponse::from_parts(parts, body.into()));
375                        } else {
376                            // Retry as if this was a 5xx response
377                            if ctx.exhausted() {
378                                return Err(self.err(RequestError::Response { body, status }, ctx));
379                            }
380
381                            let sleep = ctx.backoff();
382                            info!(
383                                "Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}",
384                                status,
385                                sleep.as_secs_f32(),
386                                ctx.retries,
387                                ctx.max_retries,
388                            );
389                            tokio::time::sleep(sleep).await;
390                        }
391                    } else if status == StatusCode::NOT_MODIFIED {
392                        return Err(self.err(RequestError::Status { status, body: None }, ctx));
393                    } else if status.is_redirection() {
394                        let is_bare_redirect = !r.headers().contains_key(LOCATION);
395                        return match is_bare_redirect {
396                            true => Err(self.err(RequestError::BareRedirect, ctx)),
397                            false => Err(self.err(
398                                RequestError::Status {
399                                    body: None,
400                                    status: r.status(),
401                                },
402                                ctx,
403                            )),
404                        };
405                    } else {
406                        let status = r.status();
407                        if ctx.exhausted()
408                            || !(status.is_server_error()
409                                || status == StatusCode::TOO_MANY_REQUESTS
410                                || status == StatusCode::REQUEST_TIMEOUT
411                                || (self.retry_on_conflict && status == StatusCode::CONFLICT))
412                        {
413                            let source = match r.into_body().text().await {
414                                Ok(body) => RequestError::Status {
415                                    status,
416                                    body: Some(body),
417                                },
418                                Err(e) => RequestError::Http(e),
419                            };
420                            return Err(self.err(source, ctx));
421                        };
422
423                        let sleep = ctx.backoff();
424                        info!(
425                            "Encountered server error with status {}, backing off for {} seconds, retry {} of {}",
426                            status,
427                            sleep.as_secs_f32(),
428                            ctx.retries,
429                            ctx.max_retries,
430                        );
431                        tokio::time::sleep(sleep).await;
432                    }
433                }
434                Err(e) => {
435                    let is_idempotent = self
436                        .idempotent
437                        .unwrap_or_else(|| self.http.method().is_safe());
438
439                    let do_retry = match e.kind() {
440                        HttpErrorKind::Connect | HttpErrorKind::Request => true, // Request not sent, can retry
441                        HttpErrorKind::Timeout | HttpErrorKind::Interrupted => is_idempotent,
442                        HttpErrorKind::Unknown | HttpErrorKind::Decode => false,
443                    };
444
445                    if ctx.exhausted() || !do_retry {
446                        return Err(self.err(RequestError::Http(e), ctx));
447                    }
448                    let sleep = ctx.backoff();
449                    info!(
450                        "Encountered transport error of kind {:?}, backing off for {} seconds, retry {} of {}: {}",
451                        e.kind(),
452                        sleep.as_secs_f32(),
453                        ctx.retries,
454                        ctx.max_retries,
455                        e,
456                    );
457                    tokio::time::sleep(sleep).await;
458                }
459            }
460        }
461    }
462}
463
/// Extension trait adding retry support to a request builder
pub(crate) trait RetryExt {
    /// Return a [`RetryableRequestBuilder`]
    fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder;

    /// Return a [`RetryableRequest`]
    fn retryable_request(self) -> RetryableRequest;

    /// Dispatch a request with the given retry configuration
    ///
    /// # Panics
    ///
    /// This will panic if the request body is a stream
    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<HttpResponse>>;
}
478
479impl RetryExt for HttpRequestBuilder {
480    fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder {
481        RetryableRequestBuilder {
482            request: self.retryable_request(),
483            context: RetryContext::new(config),
484        }
485    }
486
487    fn retryable_request(self) -> RetryableRequest {
488        let (client, request) = self.into_parts();
489        let request = request.expect("request must be valid");
490
491        RetryableRequest {
492            client,
493            http: request,
494            idempotent: None,
495            payload: None,
496            sensitive: false,
497            retry_on_conflict: false,
498            retry_error_body: false,
499        }
500    }
501
502    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<HttpResponse>> {
503        let request = self.retryable(config);
504        Box::pin(async move { request.send().await })
505    }
506}
507
508#[cfg(not(target_arch = "wasm32"))]
509#[cfg(test)]
510mod tests {
511    use crate::RetryConfig;
512    use crate::client::mock_server::MockServer;
513    use crate::client::retry::{RequestError, RetryContext, RetryExt, body_contains_error};
514    use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpResponse};
515    use http::StatusCode;
516    use hyper::Response;
517    use hyper::header::LOCATION;
518    use hyper::server::conn::http1;
519    use hyper::service::service_fn;
520    use hyper_util::rt::TokioIo;
521    use reqwest::{Client, Method};
522    use std::convert::Infallible;
523    use std::error::Error;
524    use std::time::Duration;
525    use tokio::net::TcpListener;
526    use tokio::time::timeout;
527
528    #[test]
529    fn test_body_contains_error() {
530        // Example error message provided by https://repost.aws/knowledge-center/s3-resolve-200-internalerror
531        let error_response = "AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200; Error Code: InternalError; Request ID: 0EXAMPLE9AAEB265)";
532        assert!(body_contains_error(error_response));
533
534        let error_response_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>SlowDown</Code><Message>Please reduce your request rate.</Message><RequestId>123</RequestId><HostId>456</HostId></Error>";
535        assert!(body_contains_error(error_response_2));
536
537        // Example success response from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
538        let success_response = "<CopyObjectResult><LastModified>2009-10-12T17:50:30.000Z</LastModified><ETag>\"9b2cf535f27731c974343645a3985328\"</ETag></CopyObjectResult>";
539        assert!(!body_contains_error(success_response));
540    }
541
    #[tokio::test]
    async fn test_retry() {
        let mock = MockServer::new().await;

        let retry = RetryConfig {
            backoff: Default::default(),
            max_retries: 2,
            retry_timeout: Duration::from_secs(1000),
        };

        let client = HttpClient::new(
            Client::builder()
                .timeout(Duration::from_millis(100))
                .build()
                .unwrap(),
        );

        let do_request = || client.request(Method::GET, mock.url()).send_retry(&retry);

        // Simple request should work
        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Returns client errors immediately with a status message
        mock.push(
            Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body("cupcakes".to_string())
                .unwrap(),
        );

        let e = do_request().await.unwrap_err();
        assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
        assert_eq!(e.body(), Some("cupcakes"));
        assert_eq!(
            e.inner().to_string(),
            "Server returned non-2xx status code: 400 Bad Request: cupcakes"
        );

        // Handles client errors with a different payload
        mock.push(
            Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body("NAUGHTY NAUGHTY".to_string())
                .unwrap(),
        );

        let e = do_request().await.unwrap_err();
        assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
        assert_eq!(e.body(), Some("NAUGHTY NAUGHTY"));
        assert_eq!(
            e.inner().to_string(),
            "Server returned non-2xx status code: 400 Bad Request: NAUGHTY NAUGHTY"
        );

        // Should retry server error request
        mock.push(
            Response::builder()
                .status(StatusCode::BAD_GATEWAY)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Should retry 429 Too Many Requests
        mock.push(
            Response::builder()
                .status(StatusCode::TOO_MANY_REQUESTS)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Should retry 408 Request Timeout
        mock.push(
            Response::builder()
                .status(StatusCode::REQUEST_TIMEOUT)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Accepts 204 status code
        mock.push(
            Response::builder()
                .status(StatusCode::NO_CONTENT)
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::NO_CONTENT);

        // Follows 302 redirects
        mock.push(
            Response::builder()
                .status(StatusCode::FOUND)
                .header(LOCATION, "/foo")
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Follows a second 302 redirect, to a different location
        mock.push(
            Response::builder()
                .status(StatusCode::FOUND)
                .header(LOCATION, "/bar")
                .body(String::new())
                .unwrap(),
        );

        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Handles redirect loop
        for _ in 0..11 {
            mock.push(
                Response::builder()
                    .status(StatusCode::FOUND)
                    .header(LOCATION, "/bar")
                    .body(String::new())
                    .unwrap(),
            );
        }

        let e = do_request().await.unwrap_err().to_string();
        assert!(e.contains("error following redirect"), "{}", e);

        // Handles redirect missing location
        mock.push(
            Response::builder()
                .status(StatusCode::FOUND)
                .body(String::new())
                .unwrap(),
        );

        let e = do_request().await.unwrap_err();
        assert!(matches!(e.inner(), RequestError::BareRedirect));
        assert_eq!(
            e.inner().to_string(),
            "Received redirect without LOCATION, this normally indicates an incorrectly configured region"
        );

        // Gives up after retrying the specified number of times
        // (one failure for the initial attempt plus one per retry)
        for _ in 0..=retry.max_retries {
            mock.push(
                Response::builder()
                    .status(StatusCode::BAD_GATEWAY)
                    .body("ignored".to_string())
                    .unwrap(),
            );
        }

        let e = do_request().await.unwrap_err();
        assert!(
            e.to_string().contains(" after 2 retries, max_retries: 2, retry_timeout: 1000s  - Server returned non-2xx status code: 502 Bad Gateway: ignored"),
            "{e}"
        );
        // verify e.source() is available as well for users who need programmatic access
        assert_eq!(
            e.source().unwrap().to_string(),
            "Server returned non-2xx status code: 502 Bad Gateway: ignored",
        );
        // verify body is accessible
        assert_eq!(e.body(), Some("ignored"));

        // Panic results in an incomplete message error in the client
        mock.push_fn::<_, String>(|_| panic!());
        let r = do_request().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);

        // Gives up after retrying multiple panics
        for _ in 0..=retry.max_retries {
            mock.push_fn::<_, String>(|_| panic!());
        }
        let e = do_request().await.unwrap_err();
        assert!(
            e.to_string().contains("after 2 retries, max_retries: 2, retry_timeout: 1000s  - HTTP error: error sending request"),
            "{e}"
        );
        // verify e.source() is available as well for users who need programmatic access
        assert_eq!(
            e.source().unwrap().to_string(),
            "HTTP error: error sending request",
        );
        // the HttpError type needs to be directly accessible in the source chain
        let mut err: &dyn Error = &e;
        let mut found = false;
        while let Some(source) = err.source() {
            err = source;
            if let Some(http_err) = err.downcast_ref::<HttpError>() {
                assert_eq!(http_err.kind(), HttpErrorKind::Request);
                found = true;
                break;
            }
        }
        assert!(found, "HttpError not found in source chain");

        // Retries on client timeout
        mock.push_async_fn(|_| async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            panic!()
        });
        do_request().await.unwrap();

        // Does not retry PUT request
        mock.push_async_fn(|_| async move {
            tokio::time::sleep(Duration::from_secs(10)).await;
            panic!()
        });
        let res = client.request(Method::PUT, mock.url()).send_retry(&retry);
        let e = res.await.unwrap_err().to_string();
        assert!(
            !e.contains("retries") && e.contains("error sending request"),
            "{e}"
        );

        // Requests not flagged as sensitive keep the URL in the error
        let url = format!("{}/SENSITIVE", mock.url());
        for _ in 0..=retry.max_retries {
            mock.push(
                Response::builder()
                    .status(StatusCode::BAD_GATEWAY)
                    .body("ignored".to_string())
                    .unwrap(),
            );
        }
        let res = client.request(Method::GET, url).send_retry(&retry).await;
        let err = res.unwrap_err().to_string();
        assert!(err.contains("SENSITIVE"), "{err}");

        let url = format!("{}/SENSITIVE", mock.url());
        for _ in 0..=retry.max_retries {
            mock.push(
                Response::builder()
                    .status(StatusCode::BAD_GATEWAY)
                    .body("ignored".to_string())
                    .unwrap(),
            );
        }

        // Sensitive requests should strip URL from error
        let req = client
            .request(Method::GET, &url)
            .retryable(&retry)
            .sensitive(true);
        let err = req.send().await.unwrap_err().to_string();
        assert!(!err.contains("SENSITIVE"), "{err}");

        // Sensitive requests should also strip URL from transport errors
        for _ in 0..=retry.max_retries {
            mock.push_fn::<_, String>(|_| panic!());
        }

        let req = client
            .request(Method::GET, &url)
            .retryable(&retry)
            .sensitive(true);
        let err = req.send().await.unwrap_err().to_string();
        assert!(!err.contains("SENSITIVE"), "{err}");

        // Success response with error in body is retried
        mock.push(
            Response::builder()
                .status(StatusCode::OK)
                .body("InternalError".to_string())
                .unwrap(),
        );
        let req = client
            .request(Method::PUT, &url)
            .retryable(&retry)
            .idempotent(true)
            .retry_error_body(true);
        let r = req.send().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);
        // Response with InternalError should have been retried
        let b = r.into_body().text().await.unwrap();
        assert!(!b.contains("InternalError"));

        // Should not retry success response with no error in body
        mock.push(
            Response::builder()
                .status(StatusCode::OK)
                .body("success".to_string())
                .unwrap(),
        );
        let req = client
            .request(Method::PUT, &url)
            .retryable(&retry)
            .idempotent(true)
            .retry_error_body(true);
        let r = req.send().await.unwrap();
        assert_eq!(r.status(), StatusCode::OK);
        let b = r.into_body().text().await.unwrap();
        assert!(b.contains("success"));

        // Shutdown
        mock.shutdown().await
    }
848
849    #[tokio::test]
850    async fn test_503_error_body_captured() {
851        let mock = MockServer::new().await;
852
853        let retry = RetryConfig {
854            backoff: Default::default(),
855            max_retries: 0,
856            retry_timeout: Duration::from_secs(1000),
857        };
858
859        let client = HttpClient::new(Client::builder().build().unwrap());
860
861        // Test that 503 SlowDown body is captured for throttling detection
862        let slowdown_body = r#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SlowDown</Code><Message>Please reduce your request rate.</Message></Error>"#;
863        mock.push(
864            Response::builder()
865                .status(StatusCode::SERVICE_UNAVAILABLE)
866                .body(slowdown_body.to_string())
867                .unwrap(),
868        );
869
870        let e = client
871            .request(Method::GET, mock.url())
872            .send_retry(&retry)
873            .await
874            .unwrap_err();
875
876        assert_eq!(e.status().unwrap(), StatusCode::SERVICE_UNAVAILABLE);
877        assert_eq!(e.body(), Some(slowdown_body));
878        assert!(e.body().unwrap().contains("SlowDown"));
879
880        mock.shutdown().await
881    }
882
883    #[tokio::test]
884    #[expect(
885        deprecated,
886        reason = "SO_LINGER w/ zero timeout doesn't block, see https://github.com/tokio-rs/tokio/issues/7751#issuecomment-3709831265"
887    )]
888    async fn test_connection_reset_is_retried() {
889        let retry = RetryConfig {
890            backoff: Default::default(),
891            max_retries: 2,
892            retry_timeout: Duration::from_secs(1),
893        };
894        assert!(retry.max_retries > 0);
895
896        // Setup server which resets a connection and then quits
897        let listener = TcpListener::bind("::1:0").await.unwrap();
898        let url = format!("http://{}", listener.local_addr().unwrap());
899        let handle = tokio::spawn(async move {
900            // Reset the connection on the first n-1 attempts
901            for _ in 0..retry.max_retries {
902                let (stream, _) = listener.accept().await.unwrap();
903                stream.set_linger(Some(Duration::from_secs(0))).unwrap();
904            }
905            // Succeed on the last attempt
906            let (stream, _) = listener.accept().await.unwrap();
907            http1::Builder::new()
908                // we want the connection to end after responding
909                .keep_alive(false)
910                .serve_connection(
911                    TokioIo::new(stream),
912                    service_fn(move |_req| async {
913                        Ok::<_, Infallible>(HttpResponse::new("Success!".to_string().into()))
914                    }),
915                )
916                .await
917                .unwrap();
918        });
919
920        // Perform the request
921        let client = HttpClient::new(reqwest::Client::new());
922        let ctx = &mut RetryContext::new(&retry);
923        let res = client
924            .get(url)
925            .retryable_request()
926            .send(ctx)
927            .await
928            .expect("request should eventually succeed");
929        assert_eq!(res.status(), StatusCode::OK);
930        assert!(ctx.exhausted());
931
932        // Wait for server to shutdown
933        let _ = timeout(Duration::from_secs(1), handle)
934            .await
935            .expect("shutdown shouldn't hang");
936    }
937}