Skip to main content

object_store/
prefix.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store wrapper handling a constant path prefix
19use bytes::Bytes;
20use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
21use std::ops::Range;
22
23use crate::multipart::{MultipartStore, PartId};
24use crate::path::Path;
25use crate::{
26    CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
27    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
28};
29
30/// Store wrapper that applies a constant prefix to all paths handled by the store.
31#[derive(Debug, Clone)]
32pub struct PrefixStore<T> {
33    prefix: Path,
34    inner: T,
35}
36
37impl<T> std::fmt::Display for PrefixStore<T> {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "PrefixObjectStore({})", self.prefix.as_ref())
40    }
41}
42
43impl<T> PrefixStore<T> {
44    /// Create a new instance of [`PrefixStore`]
45    pub fn new(store: T, prefix: impl Into<Path>) -> Self {
46        Self {
47            prefix: prefix.into(),
48            inner: store,
49        }
50    }
51
52    /// Create the full path from a path relative to prefix
53    fn full_path(&self, location: &Path) -> Path {
54        full_path(&self.prefix, location)
55    }
56
57    /// Strip the constant prefix from a given path
58    fn strip_prefix(&self, path: Path) -> Path {
59        strip_prefix(&self.prefix, path)
60    }
61
62    /// Strip the constant prefix from a given ObjectMeta
63    fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta {
64        strip_meta(&self.prefix, meta)
65    }
66}
67
// Note: the helpers below are free functions rather than methods so that the closures
// inside the `'static` streams returned by this store can call them with a cloned prefix
// instead of borrowing `self`.
70
71/// Create the full path from a path relative to prefix
72fn full_path(prefix: &Path, path: &Path) -> Path {
73    prefix.parts().chain(path.parts()).collect()
74}
75
76/// Strip the constant prefix from a given path
77fn strip_prefix(prefix: &Path, path: Path) -> Path {
78    // Note cannot use match because of borrow checker
79    if let Some(suffix) = path.prefix_match(prefix) {
80        return suffix.collect();
81    }
82    path
83}
84
85/// Strip the constant prefix from a given ObjectMeta
86fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
87    ObjectMeta {
88        last_modified: meta.last_modified,
89        size: meta.size,
90        location: strip_prefix(prefix, meta.location),
91        e_tag: meta.e_tag,
92        version: None,
93    }
94}
95
96#[async_trait::async_trait]
97#[deny(clippy::missing_trait_methods)]
98impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
99    async fn put_opts(
100        &self,
101        location: &Path,
102        payload: PutPayload,
103        opts: PutOptions,
104    ) -> Result<PutResult> {
105        let full_path = self.full_path(location);
106        self.inner.put_opts(&full_path, payload, opts).await
107    }
108
109    async fn put_multipart_opts(
110        &self,
111        location: &Path,
112        opts: PutMultipartOptions,
113    ) -> Result<Box<dyn MultipartUpload>> {
114        let full_path = self.full_path(location);
115        self.inner.put_multipart_opts(&full_path, opts).await
116    }
117
118    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
119        let full_path = self.full_path(location);
120        self.inner.get_opts(&full_path, options).await
121    }
122
123    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
124        let full_path = self.full_path(location);
125        self.inner.get_ranges(&full_path, ranges).await
126    }
127
128    fn delete_stream(
129        &self,
130        locations: BoxStream<'static, Result<Path>>,
131    ) -> BoxStream<'static, Result<Path>> {
132        let prefix = self.prefix.clone();
133        let locations = locations
134            .map(move |location| location.map(|loc| full_path(&prefix, &loc)))
135            .boxed();
136        let prefix = self.prefix.clone();
137        self.inner
138            .delete_stream(locations)
139            .map(move |location| location.map(|loc| strip_prefix(&prefix, loc)))
140            .boxed()
141    }
142
143    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
144        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
145        let s = self.inner.list(Some(&prefix));
146        let slf_prefix = self.prefix.clone();
147        s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
148    }
149
150    fn list_with_offset(
151        &self,
152        prefix: Option<&Path>,
153        offset: &Path,
154    ) -> BoxStream<'static, Result<ObjectMeta>> {
155        let offset = self.full_path(offset);
156        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
157        let s = self.inner.list_with_offset(Some(&prefix), &offset);
158        let slf_prefix = self.prefix.clone();
159        s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
160    }
161
162    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
163        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
164        self.inner
165            .list_with_delimiter(Some(&prefix))
166            .await
167            .map(|lst| ListResult {
168                common_prefixes: lst
169                    .common_prefixes
170                    .into_iter()
171                    .map(|p| self.strip_prefix(p))
172                    .collect(),
173                objects: lst
174                    .objects
175                    .into_iter()
176                    .map(|meta| self.strip_meta(meta))
177                    .collect(),
178            })
179    }
180
181    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
182        let full_from = self.full_path(from);
183        let full_to = self.full_path(to);
184        self.inner.copy_opts(&full_from, &full_to, options).await
185    }
186
187    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
188        let full_from = self.full_path(from);
189        let full_to = self.full_path(to);
190        self.inner.rename_opts(&full_from, &full_to, options).await
191    }
192}
193
194#[async_trait::async_trait]
195impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
196    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
197        let full_path = self.full_path(path);
198        self.inner.create_multipart(&full_path).await
199    }
200
201    async fn put_part(
202        &self,
203        path: &Path,
204        id: &MultipartId,
205        part_idx: usize,
206        data: PutPayload,
207    ) -> Result<PartId> {
208        let full_path = self.full_path(path);
209        self.inner.put_part(&full_path, id, part_idx, data).await
210    }
211
212    async fn complete_multipart(
213        &self,
214        path: &Path,
215        id: &MultipartId,
216        parts: Vec<PartId>,
217    ) -> Result<PutResult> {
218        let full_path = self.full_path(path);
219        self.inner.complete_multipart(&full_path, id, parts).await
220    }
221
222    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
223        let full_path = self.full_path(path);
224        self.inner.abort_multipart(&full_path, id).await
225    }
226}
227
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
    use std::slice;

    use super::*;
    use crate::local::LocalFileSystem;
    use crate::memory::InMemory;
    use crate::{ObjectStoreExt, integration::*};

    use tempfile::TempDir;

    /// Runs the shared integration checks against a prefixed local filesystem store.
    #[tokio::test]
    async fn prefix_test() {
        let tmp = TempDir::new().unwrap();
        let backing = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
        let store = PrefixStore::new(backing, "prefix");

        put_get_delete_list(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// Checks that reads, writes, listings and deletes through the wrapper hit the
    /// prefixed location in the underlying store.
    #[tokio::test]
    async fn prefix_test_applies_prefix() {
        let tmpdir = TempDir::new().unwrap();
        let backing = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();

        // Seed the backing store directly, below the prefix directory.
        let absolute = Path::from("prefix/test_file.json");
        let payload = Bytes::from("arbitrary data");

        backing.put(&absolute, payload.clone().into()).await.unwrap();

        let wrapped = PrefixStore::new(backing, "prefix");
        let relative = Path::from("test_file.json");

        // Listing without a prefix reports the file relative to the store prefix.
        let listed = flatten_list_stream(&wrapped, None).await.unwrap();
        assert_eq!(listed, slice::from_ref(&relative));

        // Listing with the root path gives the same result.
        let root = Path::from("/");
        let listed = flatten_list_stream(&wrapped, Some(&root)).await.unwrap();
        assert_eq!(listed, slice::from_ref(&relative));

        // Reading through the wrapper returns the seeded bytes.
        let fetched = wrapped.get(&relative).await.unwrap();
        let read_data = fetched.bytes().await.unwrap();
        assert_eq!(&*read_data, payload);

        // Write a second file through the wrapper, then delete the first one.
        let written = Path::from("/test_written.json");
        wrapped
            .put(&written, payload.clone().into())
            .await
            .unwrap();

        wrapped.delete(&relative).await.unwrap();

        // Inspect the results through a fresh, unprefixed view of the same directory.
        let backing = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();

        let err = backing.get(&absolute).await.unwrap_err();
        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);

        let absolute = Path::from("prefix/test_written.json");
        let fetched = backing.get(&absolute).await.unwrap();
        let read_data = fetched.bytes().await.unwrap();
        assert_eq!(&*read_data, payload)
    }

    /// Runs the shared multipart checks against a prefixed in-memory store.
    #[tokio::test]
    async fn prefix_multipart() {
        let prefixed = PrefixStore::new(InMemory::new(), "prefix");

        multipart(&prefixed, &prefixed).await;
        multipart_out_of_order(&prefixed).await;
        multipart_race_condition(&prefixed, true).await;
    }
}