1use async_trait::async_trait;
32use futures_util::stream::BoxStream;
33use futures_util::{StreamExt, TryStreamExt};
34use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
35use reqwest::{Method, StatusCode};
36use std::{sync::Arc, time::Duration};
37use url::Url;
38
39use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client};
40use crate::client::CredentialProvider;
41use crate::client::get::GetClientExt;
42use crate::client::list::{ListClient, ListClientExt};
43use crate::multipart::{MultipartStore, PartId};
44use crate::signer::Signer;
45use crate::util::STRICT_ENCODE_SET;
46use crate::{
47 CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
48 ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult,
49 Result, UploadPart,
50};
51
/// The `x-amz-tagging` header name.
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
/// The `x-amz-copy-source` header name.
static COPY_SOURCE_HEADER: HeaderName = HeaderName::from_static("x-amz-copy-source");
54
55mod builder;
56mod checksum;
57mod client;
58mod credential;
59mod precondition;
60
61#[cfg(not(target_arch = "wasm32"))]
62mod resolve;
63
64pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
65pub use checksum::Checksum;
66pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
67
68#[cfg(not(target_arch = "wasm32"))]
69pub use resolve::resolve_bucket_region;
70
/// [`STRICT_ENCODE_SET`] with `/` removed, so `/` is left unescaped when
/// percent-encoding with this set.
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');

/// Store name reported in the `store` field of [`Error::Generic`] raised by this module.
const STORE: &str = "S3";

/// Shared, dynamically dispatched [`CredentialProvider`] that yields [`AwsCredential`]s.
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
78use crate::client::parts::Parts;
79use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
80pub use credential::{AwsAuthorizer, AwsCredential};
81
/// Object store backed by an Amazon S3 bucket.
///
/// Cloning is cheap: clones share the same underlying [`S3Client`] through an [`Arc`].
#[derive(Debug, Clone)]
pub struct AmazonS3 {
    /// Shared client; its `config` supplies the bucket, region and credentials used here.
    client: Arc<S3Client>,
}
87
88impl std::fmt::Display for AmazonS3 {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 write!(f, "AmazonS3({})", self.client.config.bucket)
91 }
92}
93
94impl AmazonS3 {
95 pub fn credentials(&self) -> &AwsCredentialProvider {
97 &self.client.config.credentials
98 }
99
100 fn path_url(&self, path: &Path) -> String {
102 self.client.config.path_url(path)
103 }
104}
105
106#[async_trait]
107impl Signer for AmazonS3 {
108 async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
141 let credential = self.credentials().get_credential().await?;
142 let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region)
143 .with_request_payer(self.client.config.request_payer);
144
145 let path_url = self.path_url(path);
146 let mut url = path_url.parse().map_err(|e| Error::Generic {
147 store: STORE,
148 source: format!("Unable to parse url {path_url}: {e}").into(),
149 })?;
150
151 authorizer.sign(method, &mut url, expires_in);
152
153 Ok(url)
154 }
155}
156
157#[async_trait]
158impl ObjectStore for AmazonS3 {
159 async fn put_opts(
160 &self,
161 location: &Path,
162 payload: PutPayload,
163 opts: PutOptions,
164 ) -> Result<PutResult> {
165 let PutOptions {
166 mode,
167 tags,
168 attributes,
169 extensions,
170 } = opts;
171
172 let request = self
173 .client
174 .request(Method::PUT, location)
175 .with_payload(payload)
176 .with_attributes(attributes)
177 .with_tags(tags)
178 .with_extensions(extensions)
179 .with_encryption_headers();
180
181 match (mode, &self.client.config.conditional_put) {
182 (PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
183 (PutMode::Create, S3ConditionalPut::Disabled) => Err(Error::NotImplemented {
184 operation:
185 "`put_opts` with mode `PutMode::Create` when conditional put is disabled".into(),
186 implementer: self.to_string(),
187 }),
188 (PutMode::Create, S3ConditionalPut::ETagMatch) => {
189 match request.header(&IF_NONE_MATCH, "*").do_put().await {
190 Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => {
194 Err(Error::AlreadyExists {
195 path: location.to_string(),
196 source: Box::new(e),
197 })
198 }
199 r => r,
200 }
201 }
202 (PutMode::Update(v), put) => {
203 let etag = v.e_tag.ok_or_else(|| Error::Generic {
204 store: STORE,
205 source: "ETag required for conditional put".to_string().into(),
206 })?;
207 match put {
208 S3ConditionalPut::ETagMatch => {
209 match request
210 .header(&IF_MATCH, etag.as_str())
211 .retry_on_conflict(true)
216 .do_put()
217 .await
218 {
219 Err(Error::NotFound { path, source }) => {
224 Err(Error::Precondition { path, source })
225 }
226 r => r,
227 }
228 }
229 S3ConditionalPut::Disabled => Err(Error::NotImplemented {
230 operation:
231 "`put_opts` with mode `PutMode::Update` when conditional put is disabled"
232 .into(),
233 implementer: self.to_string(),
234 }),
235 }
236 }
237 }
238 }
239
240 async fn put_multipart_opts(
241 &self,
242 location: &Path,
243 opts: PutMultipartOptions,
244 ) -> Result<Box<dyn MultipartUpload>> {
245 let upload_id = self.client.create_multipart(location, opts).await?;
246
247 Ok(Box::new(S3MultiPartUpload {
248 part_idx: 0,
249 state: Arc::new(UploadState {
250 client: Arc::clone(&self.client),
251 location: location.clone(),
252 upload_id: upload_id.clone(),
253 parts: Default::default(),
254 }),
255 }))
256 }
257
258 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
259 self.client.get_opts(location, options).await
260 }
261
262 fn delete_stream(
263 &self,
264 locations: BoxStream<'static, Result<Path>>,
265 ) -> BoxStream<'static, Result<Path>> {
266 let client = Arc::clone(&self.client);
267 locations
268 .try_chunks(1_000)
269 .map(move |locations| {
270 let client = Arc::clone(&client);
271 async move {
272 let locations = locations.map_err(|e| e.1)?;
275 client
276 .bulk_delete_request(locations)
277 .await
278 .map(futures_util::stream::iter)
279 }
280 })
281 .buffered(20)
282 .try_flatten()
283 .boxed()
284 }
285
286 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
287 self.client.list(prefix)
288 }
289
290 fn list_with_offset(
291 &self,
292 prefix: Option<&Path>,
293 offset: &Path,
294 ) -> BoxStream<'static, Result<ObjectMeta>> {
295 if self.client.config.is_s3_express() {
296 let offset = offset.clone();
297 return self
299 .client
300 .list(prefix)
301 .try_filter(move |f| futures_util::future::ready(f.location > offset))
302 .boxed();
303 }
304
305 self.client.list_with_offset(prefix, offset)
306 }
307
308 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
309 self.client.list_with_delimiter(prefix).await
310 }
311
312 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
313 let CopyOptions {
314 mode,
315 extensions: _,
316 } = options;
317
318 match mode {
319 CopyMode::Overwrite => {
320 self.client
321 .copy_request(from, to)
322 .idempotent(true)
323 .send()
324 .await?;
325 Ok(())
326 }
327 CopyMode::Create => {
328 let (k, v, status) = match &self.client.config.copy_if_not_exists {
329 Some(S3CopyIfNotExists::Header(k, v)) => {
330 (k, v, StatusCode::PRECONDITION_FAILED)
331 }
332 Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
333 Some(S3CopyIfNotExists::Multipart) => {
334 let upload_id = self
335 .client
336 .create_multipart(to, PutMultipartOptions::default())
337 .await?;
338
339 let res = async {
340 let part_id = self
341 .client
342 .put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
343 .await?;
344 match self
345 .client
346 .complete_multipart(
347 to,
348 &upload_id,
349 vec![part_id],
350 CompleteMultipartMode::Create,
351 )
352 .await
353 {
354 Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
355 path: to.to_string(),
356 source: Box::new(e),
357 }),
358 Ok(_) => Ok(()),
359 Err(e) => Err(e),
360 }
361 }
362 .await;
363
364 if res.is_err() {
369 let _ = self.client.abort_multipart(to, &upload_id).await;
370 }
371
372 return res;
373 }
374 None => {
375 return Err(Error::NotSupported {
376 source: "S3 does not support copy-if-not-exists".to_string().into(),
377 });
378 }
379 };
380
381 let req = self.client.copy_request(from, to);
382 match req.header(k, v).send().await {
383 Err(RequestError::Retry { source, path })
384 if source.status() == Some(status) =>
385 {
386 Err(Error::AlreadyExists {
387 source: Box::new(source),
388 path,
389 })
390 }
391 Err(e) => Err(e.into()),
392 Ok(_) => Ok(()),
393 }
394 }
395 }
396 }
397}
398
/// Handle for an in-progress multipart upload, created by `AmazonS3::put_multipart_opts`.
#[derive(Debug)]
struct S3MultiPartUpload {
    /// Index given to the next part; incremented on every `put_part` call.
    part_idx: usize,
    /// State shared with the futures returned by `put_part`.
    state: Arc<UploadState>,
}
404
/// State shared between an `S3MultiPartUpload` and its in-flight part uploads.
#[derive(Debug)]
struct UploadState {
    /// Uploaded parts, recorded by index as each `put_part` future completes.
    parts: Parts,
    /// Destination object path.
    location: Path,
    /// Upload id returned by the client's `create_multipart`.
    upload_id: String,
    /// Client used to issue the part, complete and abort requests.
    client: Arc<S3Client>,
}
412
413#[async_trait]
414impl MultipartUpload for S3MultiPartUpload {
415 fn put_part(&mut self, data: PutPayload) -> UploadPart {
416 let idx = self.part_idx;
417 self.part_idx += 1;
418 let state = Arc::clone(&self.state);
419 Box::pin(async move {
420 let part = state
421 .client
422 .put_part(
423 &state.location,
424 &state.upload_id,
425 idx,
426 PutPartPayload::Part(data),
427 )
428 .await?;
429 state.parts.put(idx, part);
430 Ok(())
431 })
432 }
433
434 async fn complete(&mut self) -> Result<PutResult> {
435 let parts = self.state.parts.finish(self.part_idx)?;
436
437 self.state
438 .client
439 .complete_multipart(
440 &self.state.location,
441 &self.state.upload_id,
442 parts,
443 CompleteMultipartMode::Overwrite,
444 )
445 .await
446 }
447
448 async fn abort(&mut self) -> Result<()> {
449 self.state
450 .client
451 .request(Method::DELETE, &self.state.location)
452 .query(&[("uploadId", &self.state.upload_id)])
453 .idempotent(true)
454 .send()
455 .await?;
456
457 Ok(())
458 }
459}
460
461#[async_trait]
462impl MultipartStore for AmazonS3 {
463 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
464 self.client
465 .create_multipart(path, PutMultipartOptions::default())
466 .await
467 }
468
469 async fn put_part(
470 &self,
471 path: &Path,
472 id: &MultipartId,
473 part_idx: usize,
474 data: PutPayload,
475 ) -> Result<PartId> {
476 self.client
477 .put_part(path, id, part_idx, PutPartPayload::Part(data))
478 .await
479 }
480
481 async fn complete_multipart(
482 &self,
483 path: &Path,
484 id: &MultipartId,
485 parts: Vec<PartId>,
486 ) -> Result<PutResult> {
487 self.client
488 .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
489 .await
490 }
491
492 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
493 self.client
494 .request(Method::DELETE, path)
495 .query(&[("uploadId", id)])
496 .send()
497 .await?;
498 Ok(())
499 }
500}
501
502#[async_trait]
503impl PaginatedListStore for AmazonS3 {
504 async fn list_paginated(
505 &self,
506 prefix: Option<&str>,
507 opts: PaginatedListOptions,
508 ) -> Result<PaginatedListResult> {
509 self.client.list_request(prefix, opts).await
510 }
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ClientOptions;
    use crate::ObjectStoreExt;
    use crate::client::SpawnedReqwestConnector;
    use crate::client::get::GetClient;
    use crate::client::retry::RetryContext;
    use crate::integration::*;
    use crate::tests::*;
    use base64::Engine;
    use base64::prelude::BASE64_STANDARD;
    use http::HeaderMap;

    /// Name used for both a bucket and an object that the tests expect not to exist.
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Uploads a two-part object (10,000,000 then 5,000,000 zero bytes) to `test.bin`
    /// in `bucket` with SHA256 checksums enabled, checks that the result carries an
    /// ETag, then deletes the object.
    async fn sha256_multipart_roundtrip(bucket: &str) {
        let store = AmazonS3Builder::from_env()
            .with_bucket_name(bucket)
            .with_checksum_algorithm(Checksum::SHA256)
            .build()
            .unwrap();

        let path = Path::parse("test.bin").unwrap();
        let mut upload = store
            .put_multipart_opts(&path, PutMultipartOptions::default())
            .await
            .unwrap();

        for part_len in [10_000_000, 5_000_000] {
            upload
                .put_part(PutPayload::from(vec![0u8; part_len]))
                .await
                .unwrap();
        }

        let result = upload.complete().await.unwrap();
        assert!(result.e_tag.is_some(), "Should have valid etag");

        store.delete(&path).await.unwrap();
    }

    /// Builds a store from the environment that targets the nonexistent bucket.
    fn store_for_missing_bucket() -> AmazonS3 {
        AmazonS3Builder::from_env()
            .with_bucket_name(NON_EXISTENT_NAME)
            .build()
            .unwrap()
    }

    /// Path of the object that is expected not to exist.
    fn missing_location() -> Path {
        Path::from_iter([NON_EXISTENT_NAME])
    }

    #[tokio::test]
    async fn write_multipart_file_with_signature() {
        maybe_skip_integration!();

        sha256_multipart_roundtrip("test-bucket-for-checksum").await;
    }

    #[tokio::test]
    async fn copy_multipart_file_with_signature() {
        maybe_skip_integration!();

        let store = AmazonS3Builder::from_env()
            .with_bucket_name("test-bucket-for-copy-if-not-exists")
            .with_checksum_algorithm(Checksum::SHA256)
            .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
            .build()
            .unwrap();

        let source = Path::parse("src.bin").unwrap();
        let target = Path::parse("dst.bin").unwrap();

        store
            .put(&source, PutPayload::from(vec![0u8; 100_000]))
            .await
            .unwrap();

        // Remove a leftover destination so the copy-if-not-exists below can succeed.
        let target_exists = store.head(&target).await.is_ok();
        if target_exists {
            store.delete(&target).await.unwrap();
        }

        store.copy_if_not_exists(&source, &target).await.unwrap();
        store.delete(&source).await.unwrap();
        store.delete(&target).await.unwrap();
    }

    #[tokio::test]
    async fn write_multipart_file_with_signature_object_lock() {
        maybe_skip_integration!();

        sha256_multipart_roundtrip("test-object-lock").await;
    }

    /// Runs the shared integration suite against the environment-configured store, then
    /// repeats the basic put/get/delete/list checks with unsigned payloads and with
    /// SHA256 checksums.
    #[tokio::test]
    async fn s3_test() {
        maybe_skip_integration!();

        let integration = AmazonS3Builder::from_env().build().unwrap();
        let config = &integration.client.config;
        let test_not_exists = config.copy_if_not_exists.is_some();
        let test_conditional_put = !matches!(config.conditional_put, S3ConditionalPut::Disabled);

        put_get_delete_list(&integration).await;
        list_with_offset_exclusivity(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        stream_get(&integration).await;
        multipart(&integration, &integration).await;
        multipart_race_condition(&integration, true).await;
        multipart_out_of_order(&integration).await;
        signing(&integration).await;
        s3_encryption(&integration).await;
        put_get_attributes(&integration).await;
        list_paginated(&integration, &integration).await;

        // Tagging is only exercised when no session provider is configured.
        if config.session_provider.is_none() {
            tagging(
                Arc::new(AmazonS3 {
                    client: Arc::clone(&integration.client),
                }),
                !config.disable_tagging,
                |p| {
                    let client = Arc::clone(&integration.client);
                    async move { client.get_object_tagging(&p).await }
                },
            )
            .await;
        }

        if test_not_exists {
            copy_if_not_exists(&integration).await;
        }
        if test_conditional_put {
            put_opts(&integration, true).await;
        }

        let unsigned = AmazonS3Builder::from_env()
            .with_unsigned_payload(true)
            .build()
            .unwrap();
        put_get_delete_list(&unsigned).await;

        let checksummed = AmazonS3Builder::from_env()
            .with_checksum_algorithm(Checksum::SHA256)
            .build()
            .unwrap();
        put_get_delete_list(&checksummed).await;
    }

    #[tokio::test]
    async fn s3_test_get_nonexistent_location() {
        maybe_skip_integration!();
        let store = AmazonS3Builder::from_env().build().unwrap();

        let err = get_nonexistent_object(&store, Some(missing_location()))
            .await
            .unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{err}");
    }

    #[tokio::test]
    async fn s3_test_get_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = store_for_missing_bucket();
        let location = missing_location();

        let err = store.get(&location).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{err}");
    }

    #[tokio::test]
    async fn s3_test_put_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = store_for_missing_bucket();
        let location = missing_location();
        let data = PutPayload::from("arbitrary data");

        let err = store.put(&location, data).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{err}");
    }

    #[tokio::test]
    async fn s3_test_delete_nonexistent_location() {
        maybe_skip_integration!();
        let store = AmazonS3Builder::from_env().build().unwrap();
        let location = missing_location();

        // Deleting an object that does not exist is expected to succeed.
        store.delete(&location).await.unwrap();
    }

    #[tokio::test]
    async fn s3_test_delete_nonexistent_bucket() {
        maybe_skip_integration!();
        let store = store_for_missing_bucket();
        let location = missing_location();

        let err = store.delete(&location).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{err}");
    }

    /// Listing with made-up credentials is expected to fail, while the same listing
    /// with signing skipped is expected to succeed.
    #[tokio::test]
    #[ignore = "Tests shouldn't call use remote services by default"]
    async fn test_disable_creds() {
        let prefix = Path::from("release");

        let with_bad_creds = AmazonS3Builder::new()
            .with_bucket_name("daylight-map-distribution")
            .with_region("us-west-1")
            .with_access_key_id("local")
            .with_secret_access_key("development")
            .build()
            .unwrap();
        with_bad_creds
            .list_with_delimiter(Some(&prefix))
            .await
            .unwrap_err();

        let unsigned = AmazonS3Builder::new()
            .with_bucket_name("daylight-map-distribution")
            .with_region("us-west-1")
            .with_skip_signature(true)
            .build()
            .unwrap();
        unsigned.list_with_delimiter(Some(&prefix)).await.unwrap();
    }

    /// Writes objects via put, copy and multipart upload, then checks that each one is
    /// served with the `x-amz-server-side-encryption` value configured on `store`.
    /// Returns early when the store has no such header configured.
    async fn s3_encryption(store: &AmazonS3) {
        maybe_skip_integration!();

        let data = PutPayload::from(vec![3u8; 1024]);

        let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into();
        let expected_encryption = match encryption_headers.get("x-amz-server-side-encryption") {
            Some(encryption_type) => encryption_type,
            None => {
                eprintln!("Skipping S3 encryption test - encryption not configured");
                return;
            }
        };

        let locations = [
            Path::from("test-encryption-1"),
            Path::from("test-encryption-2"),
            Path::from("test-encryption-3"),
        ];

        store.put(&locations[0], data.clone()).await.unwrap();
        store.copy(&locations[0], &locations[1]).await.unwrap();

        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
        upload.put_part(data.clone()).await.unwrap();
        upload.complete().await.unwrap();

        for location in &locations {
            let mut context = RetryContext::new(&store.client.config.retry_config);

            let response = store
                .client
                .get_request(&mut context, location, GetOptions::default())
                .await
                .unwrap();

            let actual = response
                .headers()
                .get("x-amz-server-side-encryption")
                .expect("object is not encrypted");
            assert_eq!(actual, expected_encryption);

            store.delete(location).await.unwrap();
        }
    }

    /// Runs only when `TEST_S3_SSEC_ENCRYPTION` is set. Writes objects via put, copy and
    /// multipart upload with an SSE-C key, then checks the SSE-C response headers.
    #[tokio::test]
    async fn test_s3_ssec_encryption_with_minio() {
        if std::env::var("TEST_S3_SSEC_ENCRYPTION").is_err() {
            eprintln!("Skipping S3 SSE-C encryption test");
            return;
        }
        eprintln!("Running S3 SSE-C encryption test");

        let customer_key = "1234567890abcdef1234567890abcdef";
        let expected_md5 = "JMwgiexXqwuPqIPjYFmIZQ==";

        let client_options = ClientOptions::default().with_allow_invalid_certificates(true);
        let store = AmazonS3Builder::from_env()
            .with_ssec_encryption(BASE64_STANDARD.encode(customer_key))
            .with_client_options(client_options)
            .build()
            .unwrap();

        let data = PutPayload::from(vec![3u8; 1024]);

        let locations = [
            Path::from("test-encryption-1"),
            Path::from("test-encryption-2"),
            Path::from("test-encryption-3"),
        ];

        store.put(&locations[0], data.clone()).await.unwrap();
        store.copy(&locations[0], &locations[1]).await.unwrap();

        let mut upload = store.put_multipart(&locations[2]).await.unwrap();
        upload.put_part(data.clone()).await.unwrap();
        upload.complete().await.unwrap();

        for location in &locations {
            let mut context = RetryContext::new(&store.client.config.retry_config);

            let response = store
                .client
                .get_request(&mut context, location, GetOptions::default())
                .await
                .unwrap();
            let headers = response.headers();

            let algorithm = headers
                .get("x-amz-server-side-encryption-customer-algorithm")
                .expect("object is not encrypted with SSE-C");
            assert_eq!(algorithm, "AES256");

            let key_md5 = headers
                .get("x-amz-server-side-encryption-customer-key-MD5")
                .expect("object is not encrypted with SSE-C");
            assert_eq!(key_md5, expected_md5);

            store.delete(location).await.unwrap();
        }
    }

    /// Drives store requests from a runtime built without `enable_all`, while the HTTP
    /// connector spawns its work onto a separate runtime that has it enabled.
    #[test]
    fn s3_alternate_threadpool_spawned_request_connector() {
        maybe_skip_integration!();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

        // Runtime that performs the connector's work.
        let io_runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        // Runtime that drives the store futures; built without `enable_all`.
        let non_io_runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();

        // Keep the IO runtime running on its own thread until shutdown is signalled.
        let io_handle = io_runtime.handle().clone();
        let thread_handle = std::thread::spawn(move || {
            io_runtime.block_on(async move {
                shutdown_rx.await.unwrap();
            });
        });

        let connector = SpawnedReqwestConnector::new(io_handle);
        let store = AmazonS3Builder::from_env()
            .with_bucket_name("test-bucket-for-spawn")
            .with_http_connector(connector)
            .build()
            .unwrap();

        non_io_runtime
            .block_on(async move {
                let path = Path::from("alternate_threadpool/test.txt");

                // Best-effort removal of a leftover object; the result is ignored.
                store.delete(&path).await.ok();

                store.put(&path, "foo".into()).await?;
                let body = store.get(&path).await?.bytes().await?;
                assert_eq!(body.as_ref(), b"foo");
                store.delete(&path).await?;

                Ok(()) as Result<()>
            })
            .expect("failed to run request on non io runtime");

        shutdown_tx.send(()).ok();
        thread_handle.join().expect("runtime thread panicked");
    }
}