1use std::fmt::{Debug, Display, Formatter};
21use std::ops::Range;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use bytes::{BufMut, Bytes, BytesMut};
26use futures_util::StreamExt;
27use futures_util::stream::BoxStream;
28
29use crate::path::Path;
30use crate::{
31 CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
32 ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions,
33};
34use crate::{PutPayload, Result};
35
36#[derive(Debug)]
45pub struct ChunkedStore {
46 inner: Arc<dyn ObjectStore>,
47 chunk_size: usize, }
49
50impl ChunkedStore {
51 pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
53 Self { inner, chunk_size }
54 }
55}
56
57impl Display for ChunkedStore {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 write!(f, "ChunkedStore({})", self.inner)
60 }
61}
62
63#[async_trait]
64#[deny(clippy::missing_trait_methods)]
65impl ObjectStore for ChunkedStore {
66 async fn put_opts(
67 &self,
68 location: &Path,
69 payload: PutPayload,
70 opts: PutOptions,
71 ) -> Result<PutResult> {
72 self.inner.put_opts(location, payload, opts).await
73 }
74
75 async fn put_multipart_opts(
76 &self,
77 location: &Path,
78 opts: PutMultipartOptions,
79 ) -> Result<Box<dyn MultipartUpload>> {
80 self.inner.put_multipart_opts(location, opts).await
81 }
82
83 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
84 let r = self.inner.get_opts(location, options).await?;
85 let stream = match r.payload {
86 #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
87 GetResultPayload::File(file, path) => {
88 crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
89 }
90 GetResultPayload::Stream(stream) => {
91 let buffer = BytesMut::new();
92 futures_util::stream::unfold(
93 (stream, buffer, false, self.chunk_size),
94 |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
95 if exhausted {
98 return None;
99 }
100 while buffer.len() < chunk_size {
101 match stream.next().await {
102 None => {
103 exhausted = true;
104 let slice = buffer.split_off(0).freeze();
105 return Some((
106 Ok(slice),
107 (stream, buffer, exhausted, chunk_size),
108 ));
109 }
110 Some(Ok(bytes)) => {
111 buffer.put(bytes);
112 }
113 Some(Err(e)) => {
114 return Some((
115 Err(crate::Error::Generic {
116 store: "ChunkedStore",
117 source: Box::new(e),
118 }),
119 (stream, buffer, exhausted, chunk_size),
120 ));
121 }
122 };
123 }
124 let slice = buffer.split_to(chunk_size).freeze();
126 Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
127 },
128 )
129 .boxed()
130 }
131 };
132 Ok(GetResult {
133 payload: GetResultPayload::Stream(stream),
134 ..r
135 })
136 }
137
138 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
139 self.inner.get_ranges(location, ranges).await
140 }
141
142 fn delete_stream(
143 &self,
144 locations: BoxStream<'static, Result<Path>>,
145 ) -> BoxStream<'static, Result<Path>> {
146 self.inner.delete_stream(locations)
147 }
148
149 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
150 self.inner.list(prefix)
151 }
152
153 fn list_with_offset(
154 &self,
155 prefix: Option<&Path>,
156 offset: &Path,
157 ) -> BoxStream<'static, Result<ObjectMeta>> {
158 self.inner.list_with_offset(prefix, offset)
159 }
160
161 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
162 self.inner.list_with_delimiter(prefix).await
163 }
164
165 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
166 self.inner.copy_opts(from, to, options).await
167 }
168
169 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
170 self.inner.rename_opts(from, to, options).await
171 }
172}
173
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use crate::ObjectStoreExt;
    #[cfg(feature = "fs")]
    use crate::integration::*;
    #[cfg(feature = "fs")]
    use crate::local::LocalFileSystem;
    use crate::memory::InMemory;
    use crate::path::Path;

    use super::*;

    /// Streamed chunks are `chunk_size` bytes long until the tail, and their
    /// lengths add up to the size of the stored object.
    #[tokio::test]
    async fn test_chunked_basic() {
        let object_len: u64 = 1001;
        let location = Path::parse("test").unwrap();
        let backing: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        backing
            .put(&location, vec![0; object_len as usize].into())
            .await
            .unwrap();

        for chunk_size in [10, 20, 31] {
            let chunked = ChunkedStore::new(Arc::clone(&backing), chunk_size);
            let payload = chunked.get(&location).await.unwrap().payload;
            let mut chunks = match payload {
                GetResultPayload::Stream(inner) => inner,
                _ => unreachable!(),
            };

            let mut left = object_len;
            while let Some(chunk) = chunks.next().await {
                let chunk_len = chunk.unwrap().len() as u64;
                let want = left.min(chunk_size as u64);
                assert_eq!(chunk_len, want);
                left -= want;
            }
            assert_eq!(left, 0);
        }
    }

    /// Runs the shared integration suite against a `ChunkedStore` wrapping both
    /// an in-memory and a local-filesystem backend.
    #[cfg(feature = "fs")]
    #[tokio::test]
    async fn test_chunked() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let backends: &[Arc<dyn ObjectStore>] = &[
            Arc::new(InMemory::new()),
            Arc::new(LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap()),
        ];

        for backend in backends {
            let store = ChunkedStore::new(Arc::clone(backend), 100);

            put_get_delete_list(&store).await;
            get_opts(&store).await;
            list_uses_directories_correctly(&store).await;
            list_with_delimiter(&store).await;
            rename_and_copy(&store).await;
            copy_if_not_exists(&store).await;
            stream_get(&store).await;
        }
    }
}