Skip to main content

object_store/aws/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for S3
19//!
20//! ## Multipart uploads
21//!
22//! Multipart uploads can be initiated with the [`ObjectStore::put_multipart_opts`] method.
23//!
24//! If the writer fails for any reason, you may have parts uploaded to AWS but not
25//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
26//! these unneeded parts, however, it is recommended that you consider implementing
27//! [automatic cleanup] of unused parts that are older than some threshold.
28//!
29//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
30
31use async_trait::async_trait;
32use futures_util::stream::BoxStream;
33use futures_util::{StreamExt, TryStreamExt};
34use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
35use reqwest::{Method, StatusCode};
36use std::{sync::Arc, time::Duration};
37use url::Url;
38
39use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client};
40use crate::client::CredentialProvider;
41use crate::client::get::GetClientExt;
42use crate::client::list::{ListClient, ListClientExt};
43use crate::multipart::{MultipartStore, PartId};
44use crate::signer::Signer;
45use crate::util::STRICT_ENCODE_SET;
46use crate::{
47    CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
48    ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult,
49    Result, UploadPart,
50};
51
52static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
53static COPY_SOURCE_HEADER: HeaderName = HeaderName::from_static("x-amz-copy-source");
54
55mod builder;
56mod checksum;
57mod client;
58mod credential;
59mod precondition;
60
61#[cfg(not(target_arch = "wasm32"))]
62mod resolve;
63
64pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
65pub use checksum::Checksum;
66pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
67
68#[cfg(not(target_arch = "wasm32"))]
69pub use resolve::resolve_bucket_region;
70
71/// This struct is used to maintain the URI path encoding
72const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
73
74const STORE: &str = "S3";
75
76/// [`CredentialProvider`] for [`AmazonS3`]
77pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
78use crate::client::parts::Parts;
79use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
80pub use credential::{AwsAuthorizer, AwsCredential};
81
82/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
83#[derive(Debug, Clone)]
84pub struct AmazonS3 {
85    client: Arc<S3Client>,
86}
87
88impl std::fmt::Display for AmazonS3 {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        write!(f, "AmazonS3({})", self.client.config.bucket)
91    }
92}
93
94impl AmazonS3 {
95    /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`]
96    pub fn credentials(&self) -> &AwsCredentialProvider {
97        &self.client.config.credentials
98    }
99
100    /// Create a full URL to the resource specified by `path` with this instance's configuration.
101    fn path_url(&self, path: &Path) -> String {
102        self.client.config.path_url(path)
103    }
104}
105
106#[async_trait]
107impl Signer for AmazonS3 {
108    /// Create a URL containing the relevant [AWS SigV4] query parameters that authorize a request
109    /// via `method` to the resource at `path` valid for the duration specified in `expires_in`.
110    ///
111    /// [AWS SigV4]: https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
112    ///
113    /// # Example
114    ///
115    /// This example returns a URL that will enable a user to upload a file to
116    /// "some-folder/some-file.txt" in the next hour.
117    ///
118    /// ```
119    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
120    /// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
121    /// # use reqwest::Method;
122    /// # use std::time::Duration;
123    /// #
124    /// let region = "us-east-1";
125    /// let s3 = AmazonS3Builder::new()
126    ///     .with_region(region)
127    ///     .with_bucket_name("my-bucket")
128    ///     .with_access_key_id("my-access-key-id")
129    ///     .with_secret_access_key("my-secret-access-key")
130    ///     .build()?;
131    ///
132    /// let url = s3.signed_url(
133    ///     Method::PUT,
134    ///     &Path::from("some-folder/some-file.txt"),
135    ///     Duration::from_secs(60 * 60)
136    /// ).await?;
137    /// #     Ok(())
138    /// # }
139    /// ```
140    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
141        let credential = self.credentials().get_credential().await?;
142        let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region)
143            .with_request_payer(self.client.config.request_payer);
144
145        let path_url = self.path_url(path);
146        let mut url = path_url.parse().map_err(|e| Error::Generic {
147            store: STORE,
148            source: format!("Unable to parse url {path_url}: {e}").into(),
149        })?;
150
151        authorizer.sign(method, &mut url, expires_in);
152
153        Ok(url)
154    }
155}
156
157#[async_trait]
158impl ObjectStore for AmazonS3 {
159    async fn put_opts(
160        &self,
161        location: &Path,
162        payload: PutPayload,
163        opts: PutOptions,
164    ) -> Result<PutResult> {
165        let PutOptions {
166            mode,
167            tags,
168            attributes,
169            extensions,
170        } = opts;
171
172        let request = self
173            .client
174            .request(Method::PUT, location)
175            .with_payload(payload)
176            .with_attributes(attributes)
177            .with_tags(tags)
178            .with_extensions(extensions)
179            .with_encryption_headers();
180
181        match (mode, &self.client.config.conditional_put) {
182            (PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
183            (PutMode::Create, S3ConditionalPut::Disabled) => Err(Error::NotImplemented {
184                operation:
185                    "`put_opts` with mode `PutMode::Create` when conditional put is disabled".into(),
186                implementer: self.to_string(),
187            }),
188            (PutMode::Create, S3ConditionalPut::ETagMatch) => {
189                match request.header(&IF_NONE_MATCH, "*").do_put().await {
190                    // Technically If-None-Match should return NotModified but some stores,
191                    // such as R2, instead return PreconditionFailed
192                    // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject
193                    Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => {
194                        Err(Error::AlreadyExists {
195                            path: location.to_string(),
196                            source: Box::new(e),
197                        })
198                    }
199                    r => r,
200                }
201            }
202            (PutMode::Update(v), put) => {
203                let etag = v.e_tag.ok_or_else(|| Error::Generic {
204                    store: STORE,
205                    source: "ETag required for conditional put".to_string().into(),
206                })?;
207                match put {
208                    S3ConditionalPut::ETagMatch => {
209                        match request
210                            .header(&IF_MATCH, etag.as_str())
211                            // Real S3 will occasionally report 409 Conflict
212                            // if there are concurrent `If-Match` requests
213                            // in flight, so we need to be prepared to retry
214                            // 409 responses.
215                            .retry_on_conflict(true)
216                            .do_put()
217                            .await
218                        {
219                            // Real S3 reports NotFound rather than PreconditionFailed when the
220                            // object doesn't exist. Convert to PreconditionFailed for
221                            // consistency with R2. This also matches what the HTTP spec
222                            // says the behavior should be.
223                            Err(Error::NotFound { path, source }) => {
224                                Err(Error::Precondition { path, source })
225                            }
226                            r => r,
227                        }
228                    }
229                    S3ConditionalPut::Disabled => Err(Error::NotImplemented {
230                        operation:
231                            "`put_opts` with mode `PutMode::Update` when conditional put is disabled"
232                                .into(),
233                        implementer: self.to_string(),
234                    }),
235                }
236            }
237        }
238    }
239
240    async fn put_multipart_opts(
241        &self,
242        location: &Path,
243        opts: PutMultipartOptions,
244    ) -> Result<Box<dyn MultipartUpload>> {
245        let upload_id = self.client.create_multipart(location, opts).await?;
246
247        Ok(Box::new(S3MultiPartUpload {
248            part_idx: 0,
249            state: Arc::new(UploadState {
250                client: Arc::clone(&self.client),
251                location: location.clone(),
252                upload_id: upload_id.clone(),
253                parts: Default::default(),
254            }),
255        }))
256    }
257
258    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
259        self.client.get_opts(location, options).await
260    }
261
262    fn delete_stream(
263        &self,
264        locations: BoxStream<'static, Result<Path>>,
265    ) -> BoxStream<'static, Result<Path>> {
266        let client = Arc::clone(&self.client);
267        locations
268            .try_chunks(1_000)
269            .map(move |locations| {
270                let client = Arc::clone(&client);
271                async move {
272                    // Early return the error. We ignore the paths that have already been
273                    // collected into the chunk.
274                    let locations = locations.map_err(|e| e.1)?;
275                    client
276                        .bulk_delete_request(locations)
277                        .await
278                        .map(futures_util::stream::iter)
279                }
280            })
281            .buffered(20)
282            .try_flatten()
283            .boxed()
284    }
285
286    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
287        self.client.list(prefix)
288    }
289
290    fn list_with_offset(
291        &self,
292        prefix: Option<&Path>,
293        offset: &Path,
294    ) -> BoxStream<'static, Result<ObjectMeta>> {
295        if self.client.config.is_s3_express() {
296            let offset = offset.clone();
297            // S3 Express does not support start-after
298            return self
299                .client
300                .list(prefix)
301                .try_filter(move |f| futures_util::future::ready(f.location > offset))
302                .boxed();
303        }
304
305        self.client.list_with_offset(prefix, offset)
306    }
307
308    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
309        self.client.list_with_delimiter(prefix).await
310    }
311
312    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
313        let CopyOptions {
314            mode,
315            extensions: _,
316        } = options;
317
318        match mode {
319            CopyMode::Overwrite => {
320                self.client
321                    .copy_request(from, to)
322                    .idempotent(true)
323                    .send()
324                    .await?;
325                Ok(())
326            }
327            CopyMode::Create => {
328                let (k, v, status) = match &self.client.config.copy_if_not_exists {
329                    Some(S3CopyIfNotExists::Header(k, v)) => {
330                        (k, v, StatusCode::PRECONDITION_FAILED)
331                    }
332                    Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
333                    Some(S3CopyIfNotExists::Multipart) => {
334                        let upload_id = self
335                            .client
336                            .create_multipart(to, PutMultipartOptions::default())
337                            .await?;
338
339                        let res = async {
340                            let part_id = self
341                                .client
342                                .put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
343                                .await?;
344                            match self
345                                .client
346                                .complete_multipart(
347                                    to,
348                                    &upload_id,
349                                    vec![part_id],
350                                    CompleteMultipartMode::Create,
351                                )
352                                .await
353                            {
354                                Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
355                                    path: to.to_string(),
356                                    source: Box::new(e),
357                                }),
358                                Ok(_) => Ok(()),
359                                Err(e) => Err(e),
360                            }
361                        }
362                        .await;
363
364                        // If the multipart upload failed, make a best effort attempt to
365                        // clean it up. It's the caller's responsibility to add a
366                        // lifecycle rule if guaranteed cleanup is required, as we
367                        // cannot protect against an ill-timed process crash.
368                        if res.is_err() {
369                            let _ = self.client.abort_multipart(to, &upload_id).await;
370                        }
371
372                        return res;
373                    }
374                    None => {
375                        return Err(Error::NotSupported {
376                            source: "S3 does not support copy-if-not-exists".to_string().into(),
377                        });
378                    }
379                };
380
381                let req = self.client.copy_request(from, to);
382                match req.header(k, v).send().await {
383                    Err(RequestError::Retry { source, path })
384                        if source.status() == Some(status) =>
385                    {
386                        Err(Error::AlreadyExists {
387                            source: Box::new(source),
388                            path,
389                        })
390                    }
391                    Err(e) => Err(e.into()),
392                    Ok(_) => Ok(()),
393                }
394            }
395        }
396    }
397}
398
399#[derive(Debug)]
400struct S3MultiPartUpload {
401    part_idx: usize,
402    state: Arc<UploadState>,
403}
404
405#[derive(Debug)]
406struct UploadState {
407    parts: Parts,
408    location: Path,
409    upload_id: String,
410    client: Arc<S3Client>,
411}
412
413#[async_trait]
414impl MultipartUpload for S3MultiPartUpload {
415    fn put_part(&mut self, data: PutPayload) -> UploadPart {
416        let idx = self.part_idx;
417        self.part_idx += 1;
418        let state = Arc::clone(&self.state);
419        Box::pin(async move {
420            let part = state
421                .client
422                .put_part(
423                    &state.location,
424                    &state.upload_id,
425                    idx,
426                    PutPartPayload::Part(data),
427                )
428                .await?;
429            state.parts.put(idx, part);
430            Ok(())
431        })
432    }
433
434    async fn complete(&mut self) -> Result<PutResult> {
435        let parts = self.state.parts.finish(self.part_idx)?;
436
437        self.state
438            .client
439            .complete_multipart(
440                &self.state.location,
441                &self.state.upload_id,
442                parts,
443                CompleteMultipartMode::Overwrite,
444            )
445            .await
446    }
447
448    async fn abort(&mut self) -> Result<()> {
449        self.state
450            .client
451            .request(Method::DELETE, &self.state.location)
452            .query(&[("uploadId", &self.state.upload_id)])
453            .idempotent(true)
454            .send()
455            .await?;
456
457        Ok(())
458    }
459}
460
461#[async_trait]
462impl MultipartStore for AmazonS3 {
463    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
464        self.client
465            .create_multipart(path, PutMultipartOptions::default())
466            .await
467    }
468
469    async fn put_part(
470        &self,
471        path: &Path,
472        id: &MultipartId,
473        part_idx: usize,
474        data: PutPayload,
475    ) -> Result<PartId> {
476        self.client
477            .put_part(path, id, part_idx, PutPartPayload::Part(data))
478            .await
479    }
480
481    async fn complete_multipart(
482        &self,
483        path: &Path,
484        id: &MultipartId,
485        parts: Vec<PartId>,
486    ) -> Result<PutResult> {
487        self.client
488            .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
489            .await
490    }
491
492    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
493        self.client
494            .request(Method::DELETE, path)
495            .query(&[("uploadId", id)])
496            .send()
497            .await?;
498        Ok(())
499    }
500}
501
502#[async_trait]
503impl PaginatedListStore for AmazonS3 {
504    async fn list_paginated(
505        &self,
506        prefix: Option<&str>,
507        opts: PaginatedListOptions,
508    ) -> Result<PaginatedListResult> {
509        self.client.list_request(prefix, opts).await
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516    use crate::ClientOptions;
517    use crate::ObjectStoreExt;
518    use crate::client::SpawnedReqwestConnector;
519    use crate::client::get::GetClient;
520    use crate::client::retry::RetryContext;
521    use crate::integration::*;
522    use crate::tests::*;
523    use base64::Engine;
524    use base64::prelude::BASE64_STANDARD;
525    use http::HeaderMap;
526
527    const NON_EXISTENT_NAME: &str = "nonexistentname";
528
529    #[tokio::test]
530    async fn write_multipart_file_with_signature() {
531        maybe_skip_integration!();
532
533        let bucket = "test-bucket-for-checksum";
534        let store = AmazonS3Builder::from_env()
535            .with_bucket_name(bucket)
536            .with_checksum_algorithm(Checksum::SHA256)
537            .build()
538            .unwrap();
539
540        let str = "test.bin";
541        let path = Path::parse(str).unwrap();
542        let opts = PutMultipartOptions::default();
543        let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
544
545        upload
546            .put_part(PutPayload::from(vec![0u8; 10_000_000]))
547            .await
548            .unwrap();
549        upload
550            .put_part(PutPayload::from(vec![0u8; 5_000_000]))
551            .await
552            .unwrap();
553
554        let res = upload.complete().await.unwrap();
555        assert!(res.e_tag.is_some(), "Should have valid etag");
556
557        store.delete(&path).await.unwrap();
558    }
559
560    #[tokio::test]
561    async fn copy_multipart_file_with_signature() {
562        maybe_skip_integration!();
563
564        let bucket = "test-bucket-for-copy-if-not-exists";
565        let store = AmazonS3Builder::from_env()
566            .with_bucket_name(bucket)
567            .with_checksum_algorithm(Checksum::SHA256)
568            .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
569            .build()
570            .unwrap();
571
572        let src = Path::parse("src.bin").unwrap();
573        let dst = Path::parse("dst.bin").unwrap();
574        store
575            .put(&src, PutPayload::from(vec![0u8; 100_000]))
576            .await
577            .unwrap();
578        if store.head(&dst).await.is_ok() {
579            store.delete(&dst).await.unwrap();
580        }
581        store.copy_if_not_exists(&src, &dst).await.unwrap();
582        store.delete(&src).await.unwrap();
583        store.delete(&dst).await.unwrap();
584    }
585
586    #[tokio::test]
587    async fn write_multipart_file_with_signature_object_lock() {
588        maybe_skip_integration!();
589
590        let bucket = "test-object-lock";
591        let store = AmazonS3Builder::from_env()
592            .with_bucket_name(bucket)
593            .with_checksum_algorithm(Checksum::SHA256)
594            .build()
595            .unwrap();
596
597        let str = "test.bin";
598        let path = Path::parse(str).unwrap();
599        let opts = PutMultipartOptions::default();
600        let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
601
602        upload
603            .put_part(PutPayload::from(vec![0u8; 10_000_000]))
604            .await
605            .unwrap();
606        upload
607            .put_part(PutPayload::from(vec![0u8; 5_000_000]))
608            .await
609            .unwrap();
610
611        let res = upload.complete().await.unwrap();
612        assert!(res.e_tag.is_some(), "Should have valid etag");
613
614        store.delete(&path).await.unwrap();
615    }
616
617    #[tokio::test]
618    async fn s3_test() {
619        maybe_skip_integration!();
620        let config = AmazonS3Builder::from_env();
621
622        let integration = config.build().unwrap();
623        let config = &integration.client.config;
624        let test_not_exists = config.copy_if_not_exists.is_some();
625        let test_conditional_put = config.conditional_put != S3ConditionalPut::Disabled;
626
627        put_get_delete_list(&integration).await;
628        list_with_offset_exclusivity(&integration).await;
629        get_opts(&integration).await;
630        list_uses_directories_correctly(&integration).await;
631        list_with_delimiter(&integration).await;
632        rename_and_copy(&integration).await;
633        stream_get(&integration).await;
634        multipart(&integration, &integration).await;
635        multipart_race_condition(&integration, true).await;
636        multipart_out_of_order(&integration).await;
637        signing(&integration).await;
638        s3_encryption(&integration).await;
639        put_get_attributes(&integration).await;
640        list_paginated(&integration, &integration).await;
641
642        // Object tagging is not supported by S3 Express One Zone
643        if config.session_provider.is_none() {
644            tagging(
645                Arc::new(AmazonS3 {
646                    client: Arc::clone(&integration.client),
647                }),
648                !config.disable_tagging,
649                |p| {
650                    let client = Arc::clone(&integration.client);
651                    async move { client.get_object_tagging(&p).await }
652                },
653            )
654            .await;
655        }
656
657        if test_not_exists {
658            copy_if_not_exists(&integration).await;
659        }
660        if test_conditional_put {
661            put_opts(&integration, true).await;
662        }
663
664        // run integration test with unsigned payload enabled
665        let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
666        let integration = builder.build().unwrap();
667        put_get_delete_list(&integration).await;
668
669        // run integration test with checksum set to sha256
670        let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
671        let integration = builder.build().unwrap();
672        put_get_delete_list(&integration).await;
673    }
674
675    #[tokio::test]
676    async fn s3_test_get_nonexistent_location() {
677        maybe_skip_integration!();
678        let integration = AmazonS3Builder::from_env().build().unwrap();
679
680        let location = Path::from_iter([NON_EXISTENT_NAME]);
681
682        let err = get_nonexistent_object(&integration, Some(location))
683            .await
684            .unwrap_err();
685        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
686    }
687
688    #[tokio::test]
689    async fn s3_test_get_nonexistent_bucket() {
690        maybe_skip_integration!();
691        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
692        let integration = config.build().unwrap();
693
694        let location = Path::from_iter([NON_EXISTENT_NAME]);
695
696        let err = integration.get(&location).await.unwrap_err();
697        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
698    }
699
700    #[tokio::test]
701    async fn s3_test_put_nonexistent_bucket() {
702        maybe_skip_integration!();
703        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
704        let integration = config.build().unwrap();
705
706        let location = Path::from_iter([NON_EXISTENT_NAME]);
707        let data = PutPayload::from("arbitrary data");
708
709        let err = integration.put(&location, data).await.unwrap_err();
710        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
711    }
712
713    #[tokio::test]
714    async fn s3_test_delete_nonexistent_location() {
715        maybe_skip_integration!();
716        let integration = AmazonS3Builder::from_env().build().unwrap();
717
718        let location = Path::from_iter([NON_EXISTENT_NAME]);
719
720        integration.delete(&location).await.unwrap();
721    }
722
723    #[tokio::test]
724    async fn s3_test_delete_nonexistent_bucket() {
725        maybe_skip_integration!();
726        let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
727        let integration = config.build().unwrap();
728
729        let location = Path::from_iter([NON_EXISTENT_NAME]);
730
731        let err = integration.delete(&location).await.unwrap_err();
732        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
733    }
734
735    #[tokio::test]
736    #[ignore = "Tests shouldn't call use remote services by default"]
737    async fn test_disable_creds() {
738        // https://registry.opendata.aws/daylight-osm/
739        let v1 = AmazonS3Builder::new()
740            .with_bucket_name("daylight-map-distribution")
741            .with_region("us-west-1")
742            .with_access_key_id("local")
743            .with_secret_access_key("development")
744            .build()
745            .unwrap();
746
747        let prefix = Path::from("release");
748
749        v1.list_with_delimiter(Some(&prefix)).await.unwrap_err();
750
751        let v2 = AmazonS3Builder::new()
752            .with_bucket_name("daylight-map-distribution")
753            .with_region("us-west-1")
754            .with_skip_signature(true)
755            .build()
756            .unwrap();
757
758        v2.list_with_delimiter(Some(&prefix)).await.unwrap();
759    }
760
761    async fn s3_encryption(store: &AmazonS3) {
762        maybe_skip_integration!();
763
764        let data = PutPayload::from(vec![3u8; 1024]);
765
766        let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into();
767        let expected_encryption =
768            if let Some(encryption_type) = encryption_headers.get("x-amz-server-side-encryption") {
769                encryption_type
770            } else {
771                eprintln!("Skipping S3 encryption test - encryption not configured");
772                return;
773            };
774
775        let locations = [
776            Path::from("test-encryption-1"),
777            Path::from("test-encryption-2"),
778            Path::from("test-encryption-3"),
779        ];
780
781        store.put(&locations[0], data.clone()).await.unwrap();
782        store.copy(&locations[0], &locations[1]).await.unwrap();
783
784        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
785        upload.put_part(data.clone()).await.unwrap();
786        upload.complete().await.unwrap();
787
788        for location in &locations {
789            let mut context = RetryContext::new(&store.client.config.retry_config);
790
791            let res = store
792                .client
793                .get_request(&mut context, location, GetOptions::default())
794                .await
795                .unwrap();
796
797            let headers = res.headers();
798            assert_eq!(
799                headers
800                    .get("x-amz-server-side-encryption")
801                    .expect("object is not encrypted"),
802                expected_encryption
803            );
804
805            store.delete(location).await.unwrap();
806        }
807    }
808
809    /// See CONTRIBUTING.md for the MinIO setup for this test.
810    #[tokio::test]
811    async fn test_s3_ssec_encryption_with_minio() {
812        if std::env::var("TEST_S3_SSEC_ENCRYPTION").is_err() {
813            eprintln!("Skipping S3 SSE-C encryption test");
814            return;
815        }
816        eprintln!("Running S3 SSE-C encryption test");
817
818        let customer_key = "1234567890abcdef1234567890abcdef";
819        let expected_md5 = "JMwgiexXqwuPqIPjYFmIZQ==";
820
821        let store = AmazonS3Builder::from_env()
822            .with_ssec_encryption(BASE64_STANDARD.encode(customer_key))
823            .with_client_options(ClientOptions::default().with_allow_invalid_certificates(true))
824            .build()
825            .unwrap();
826
827        let data = PutPayload::from(vec![3u8; 1024]);
828
829        let locations = [
830            Path::from("test-encryption-1"),
831            Path::from("test-encryption-2"),
832            Path::from("test-encryption-3"),
833        ];
834
835        // Test put with sse-c.
836        store.put(&locations[0], data.clone()).await.unwrap();
837
838        // Test copy with sse-c.
839        store.copy(&locations[0], &locations[1]).await.unwrap();
840
841        // Test multipart upload with sse-c.
842        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
843        upload.put_part(data.clone()).await.unwrap();
844        upload.complete().await.unwrap();
845
846        // Test get with sse-c.
847        for location in &locations {
848            let mut context = RetryContext::new(&store.client.config.retry_config);
849
850            let res = store
851                .client
852                .get_request(&mut context, location, GetOptions::default())
853                .await
854                .unwrap();
855
856            let headers = res.headers();
857            assert_eq!(
858                headers
859                    .get("x-amz-server-side-encryption-customer-algorithm")
860                    .expect("object is not encrypted with SSE-C"),
861                "AES256"
862            );
863
864            assert_eq!(
865                headers
866                    .get("x-amz-server-side-encryption-customer-key-MD5")
867                    .expect("object is not encrypted with SSE-C"),
868                expected_md5
869            );
870
871            store.delete(location).await.unwrap();
872        }
873    }
874
875    /// Integration test that ensures I/O is done on an alternate threadpool
876    /// when using the `SpawnedReqwestConnector`.
877    #[test]
878    fn s3_alternate_threadpool_spawned_request_connector() {
879        maybe_skip_integration!();
880        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
881
882        // Runtime with I/O enabled
883        let io_runtime = tokio::runtime::Builder::new_current_thread()
884            .enable_all() // <-- turns on IO
885            .build()
886            .unwrap();
887
888        // Runtime without I/O enabled
889        let non_io_runtime = tokio::runtime::Builder::new_current_thread()
890            // note: no call to enable_all
891            .build()
892            .unwrap();
893
894        // run the io runtime in a different thread
895        let io_handle = io_runtime.handle().clone();
896        let thread_handle = std::thread::spawn(move || {
897            io_runtime.block_on(async move {
898                shutdown_rx.await.unwrap();
899            });
900        });
901
902        let store = AmazonS3Builder::from_env()
903            // use different bucket to avoid collisions with other tests
904            .with_bucket_name("test-bucket-for-spawn")
905            .with_http_connector(SpawnedReqwestConnector::new(io_handle))
906            .build()
907            .unwrap();
908
909        // run a request on the non io runtime -- will fail if the connector
910        // does not spawn the request to the io runtime
911        non_io_runtime
912            .block_on(async move {
913                let path = Path::from("alternate_threadpool/test.txt");
914                store.delete(&path).await.ok(); // remove the file if it exists from prior runs
915                store.put(&path, "foo".into()).await?;
916                let res = store.get(&path).await?.bytes().await?;
917                assert_eq!(res.as_ref(), b"foo");
918                store.delete(&path).await?; // cleanup
919                Ok(()) as Result<()>
920            })
921            .expect("failed to run request on non io runtime");
922
923        // shutdown the io runtime and thread
924        shutdown_tx.send(()).ok();
925        thread_handle.join().expect("runtime thread panicked");
926    }
927}