1use bytes::Bytes;
20use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
21use std::ops::Range;
22
23use crate::multipart::{MultipartStore, PartId};
24use crate::path::Path;
25use crate::{
26 CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
27 ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
28};
29
30#[derive(Debug, Clone)]
32pub struct PrefixStore<T> {
33 prefix: Path,
34 inner: T,
35}
36
37impl<T> std::fmt::Display for PrefixStore<T> {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
40 }
41}
42
43impl<T> PrefixStore<T> {
44 pub fn new(store: T, prefix: impl Into<Path>) -> Self {
46 Self {
47 prefix: prefix.into(),
48 inner: store,
49 }
50 }
51
52 fn full_path(&self, location: &Path) -> Path {
54 full_path(&self.prefix, location)
55 }
56
57 fn strip_prefix(&self, path: Path) -> Path {
59 strip_prefix(&self.prefix, path)
60 }
61
62 fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
64 strip_meta(&self.prefix, meta)
65 }
66}
67
68fn full_path(prefix: &Path, path: &Path) -> Path {
73 prefix.parts().chain(path.parts()).collect()
74}
75
76fn strip_prefix(prefix: &Path, path: Path) -> Path {
78 if let Some(suffix) = path.prefix_match(prefix) {
80 return suffix.collect();
81 }
82 path
83}
84
85fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
87 ObjectMeta {
88 last_modified: meta.last_modified,
89 size: meta.size,
90 location: strip_prefix(prefix, meta.location),
91 e_tag: meta.e_tag,
92 version: None,
93 }
94}
95
96#[async_trait::async_trait]
97#[deny(clippy::missing_trait_methods)]
98impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
99 async fn put_opts(
100 &self,
101 location: &Path,
102 payload: PutPayload,
103 opts: PutOptions,
104 ) -> Result<PutResult> {
105 let full_path = self.full_path(location);
106 self.inner.put_opts(&full_path, payload, opts).await
107 }
108
109 async fn put_multipart_opts(
110 &self,
111 location: &Path,
112 opts: PutMultipartOptions,
113 ) -> Result<Box<dyn MultipartUpload>> {
114 let full_path = self.full_path(location);
115 self.inner.put_multipart_opts(&full_path, opts).await
116 }
117
118 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
119 let full_path = self.full_path(location);
120 self.inner.get_opts(&full_path, options).await
121 }
122
123 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
124 let full_path = self.full_path(location);
125 self.inner.get_ranges(&full_path, ranges).await
126 }
127
128 fn delete_stream(
129 &self,
130 locations: BoxStream<'static, Result<Path>>,
131 ) -> BoxStream<'static, Result<Path>> {
132 let prefix = self.prefix.clone();
133 let locations = locations
134 .map(move |location| location.map(|loc| full_path(&prefix, &loc)))
135 .boxed();
136 let prefix = self.prefix.clone();
137 self.inner
138 .delete_stream(locations)
139 .map(move |location| location.map(|loc| strip_prefix(&prefix, loc)))
140 .boxed()
141 }
142
143 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
144 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
145 let s = self.inner.list(Some(&prefix));
146 let slf_prefix = self.prefix.clone();
147 s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
148 }
149
150 fn list_with_offset(
151 &self,
152 prefix: Option<&Path>,
153 offset: &Path,
154 ) -> BoxStream<'static, Result<ObjectMeta>> {
155 let offset = self.full_path(offset);
156 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
157 let s = self.inner.list_with_offset(Some(&prefix), &offset);
158 let slf_prefix = self.prefix.clone();
159 s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
160 }
161
162 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
163 let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
164 self.inner
165 .list_with_delimiter(Some(&prefix))
166 .await
167 .map(|lst| ListResult {
168 common_prefixes: lst
169 .common_prefixes
170 .into_iter()
171 .map(|p| self.strip_prefix(p))
172 .collect(),
173 objects: lst
174 .objects
175 .into_iter()
176 .map(|meta| self.strip_meta(meta))
177 .collect(),
178 })
179 }
180
181 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
182 let full_from = self.full_path(from);
183 let full_to = self.full_path(to);
184 self.inner.copy_opts(&full_from, &full_to, options).await
185 }
186
187 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
188 let full_from = self.full_path(from);
189 let full_to = self.full_path(to);
190 self.inner.rename_opts(&full_from, &full_to, options).await
191 }
192}
193
194#[async_trait::async_trait]
195impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
196 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
197 let full_path = self.full_path(path);
198 self.inner.create_multipart(&full_path).await
199 }
200
201 async fn put_part(
202 &self,
203 path: &Path,
204 id: &MultipartId,
205 part_idx: usize,
206 data: PutPayload,
207 ) -> Result<PartId> {
208 let full_path = self.full_path(path);
209 self.inner.put_part(&full_path, id, part_idx, data).await
210 }
211
212 async fn complete_multipart(
213 &self,
214 path: &Path,
215 id: &MultipartId,
216 parts: Vec<PartId>,
217 ) -> Result<PutResult> {
218 let full_path = self.full_path(path);
219 self.inner.complete_multipart(&full_path, id, parts).await
220 }
221
222 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
223 let full_path = self.full_path(path);
224 self.inner.abort_multipart(&full_path, id).await
225 }
226}
227
228#[cfg(not(target_arch = "wasm32"))]
229#[cfg(test)]
230mod tests {
231 use std::slice;
232
233 use super::*;
234 use crate::local::LocalFileSystem;
235 use crate::memory::InMemory;
236 use crate::{ObjectStoreExt, integration::*};
237
238 use tempfile::TempDir;
239
240 #[tokio::test]
241 async fn prefix_test() {
242 let root = TempDir::new().unwrap();
243 let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
244 let integration = PrefixStore::new(inner, "prefix");
245
246 put_get_delete_list(&integration).await;
247 get_opts(&integration).await;
248 list_uses_directories_correctly(&integration).await;
249 list_with_delimiter(&integration).await;
250 rename_and_copy(&integration).await;
251 copy_if_not_exists(&integration).await;
252 stream_get(&integration).await;
253 }
254
255 #[tokio::test]
256 async fn prefix_test_applies_prefix() {
257 let tmpdir = TempDir::new().unwrap();
258 let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
259
260 let location = Path::from("prefix/test_file.json");
261 let data = Bytes::from("arbitrary data");
262
263 local.put(&location, data.clone().into()).await.unwrap();
264
265 let prefix = PrefixStore::new(local, "prefix");
266 let location_prefix = Path::from("test_file.json");
267
268 let content_list = flatten_list_stream(&prefix, None).await.unwrap();
269 assert_eq!(content_list, slice::from_ref(&location_prefix));
270
271 let root = Path::from("/");
272 let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
273 assert_eq!(content_list, slice::from_ref(&location_prefix));
274
275 let read_data = prefix
276 .get(&location_prefix)
277 .await
278 .unwrap()
279 .bytes()
280 .await
281 .unwrap();
282 assert_eq!(&*read_data, data);
283
284 let target_prefix = Path::from("/test_written.json");
285 prefix
286 .put(&target_prefix, data.clone().into())
287 .await
288 .unwrap();
289
290 prefix.delete(&location_prefix).await.unwrap();
291
292 let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
293
294 let err = local.get(&location).await.unwrap_err();
295 assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
296
297 let location = Path::from("prefix/test_written.json");
298 let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
299 assert_eq!(&*read_data, data)
300 }
301
302 #[tokio::test]
303 async fn prefix_multipart() {
304 let store = PrefixStore::new(InMemory::new(), "prefix");
305
306 multipart(&store, &store).await;
307 multipart_out_of_order(&store).await;
308 multipart_race_condition(&store, true).await;
309 }
310}