Skip to main content

object_store/
registry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Map object URLs to [`ObjectStore`]
19
20use crate::path::{InvalidPart, Path, PathPart};
21use crate::{ObjectStore, parse_url_opts};
22use parking_lot::RwLock;
23use std::collections::HashMap;
24use std::sync::Arc;
25use url::Url;
26
27/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance
28pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
29    /// Register a new store for the provided store URL
30    ///
31    /// If a store with the same URL existed before, it is replaced and returned
32    fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>>;
33
34    /// Resolve an object URL
35    ///
36    /// If [`ObjectStoreRegistry::register`] has been called with a URL with the same
37    /// scheme, and authority as the object URL, and a path that is a prefix of the object
38    /// URL's, it should be returned along with the trailing path. Paths should be matched
39    /// on a path segment basis, and in the event of multiple possibilities the longest
40    /// path match should be returned.
41    ///
42    /// If a store hasn't been registered, an [`ObjectStoreRegistry`] may lazily create
43    /// one if the URL is understood
44    ///
45    /// For example
46    ///
47    /// ```
48    /// # use std::sync::Arc;
49    /// # use url::Url;
50    /// # use object_store::memory::InMemory;
51    /// # use object_store::ObjectStore;
52    /// # use object_store::prefix::PrefixStore;
53    /// # use object_store::registry::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
54    /// #
55    /// let registry = DefaultObjectStoreRegistry::new();
56    ///
57    /// let bucket1 = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
58    /// let base = Url::parse("s3://bucket1/").unwrap();
59    /// registry.register(base, bucket1.clone());
60    ///
61    /// let url = Url::parse("s3://bucket1/path/to/object").unwrap();
62    /// let (ret, path) = registry.resolve(&url).unwrap();
63    /// assert_eq!(path.as_ref(), "path/to/object");
64    /// assert!(Arc::ptr_eq(&ret, &bucket1));
65    ///
66    /// let bucket2 = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
67    /// let base = Url::parse("https://s3.region.amazonaws.com/bucket").unwrap();
68    /// registry.register(base, bucket2.clone());
69    ///
70    /// let url = Url::parse("https://s3.region.amazonaws.com/bucket/path/to/object").unwrap();
71    /// let (ret, path) = registry.resolve(&url).unwrap();
72    /// assert_eq!(path.as_ref(), "path/to/object");
73    /// assert!(Arc::ptr_eq(&ret, &bucket2));
74    ///
75    /// let bucket3 = Arc::new(PrefixStore::new(InMemory::new(), "path")) as Arc<dyn ObjectStore>;
76    /// let base = Url::parse("https://s3.region.amazonaws.com/bucket/path").unwrap();
77    /// registry.register(base, bucket3.clone());
78    ///
79    /// let url = Url::parse("https://s3.region.amazonaws.com/bucket/path/to/object").unwrap();
80    /// let (ret, path) = registry.resolve(&url).unwrap();
81    /// assert_eq!(path.as_ref(), "to/object");
82    /// assert!(Arc::ptr_eq(&ret, &bucket3));
83    /// ```
84    fn resolve(&self, url: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)>;
85}
86
87/// Error type for [`DefaultObjectStoreRegistry`]
88///
89/// Crate private/opaque type to make the error handling code more ergonomic.
90/// Always converted into `crate::Error` when reported externally.
91#[derive(Debug, thiserror::Error)]
92#[non_exhaustive]
93enum Error {
94    #[error("ObjectStore not found")]
95    NotFound,
96
97    #[error("Error parsing URL path segment")]
98    InvalidPart(#[from] InvalidPart),
99}
100
101impl From<Error> for crate::Error {
102    fn from(value: Error) -> Self {
103        Self::Generic {
104            store: "ObjectStoreRegistry",
105            source: Box::new(value),
106        }
107    }
108}
109
110/// An [`ObjectStoreRegistry`] that uses [`parse_url_opts`] to create stores based on the environment
111#[derive(Debug, Default)]
112pub struct DefaultObjectStoreRegistry {
113    /// Mapping from [`url_key`] to [`PathEntry`]
114    map: RwLock<HashMap<String, PathEntry>>,
115}
116
117/// [`PathEntry`] construct a tree of path segments starting from the root
118///
119/// For example the following paths
120///
121/// * `/` => store1
122/// * `/foo/bar` => store2
123///
124/// Would be represented by
125///
126/// ```yaml
127/// store: Some(store1)
128/// children:
129///   foo:
130///     store: None
131///     children:
132///       bar:
133///         store: Some(store2)
134/// ```
135///
136#[derive(Debug, Default)]
137struct PathEntry {
138    /// Store, if defined at this path
139    store: Option<Arc<dyn ObjectStore>>,
140    /// Child [`PathEntry`], keyed by the next path segment in their path
141    children: HashMap<String, Self>,
142}
143
144impl PathEntry {
145    /// Lookup a store based on URL path
146    ///
147    /// Returns the store and its path segment depth
148    fn lookup(&self, to_resolve: &Url) -> Option<(&Arc<dyn ObjectStore>, usize)> {
149        let mut current = self;
150        let mut ret = self.store.as_ref().map(|store| (store, 0));
151        let mut depth = 0;
152        // Traverse the PathEntry tree to find the longest match
153        for segment in path_segments(to_resolve.path()) {
154            match current.children.get(segment) {
155                Some(e) => {
156                    current = e;
157                    depth += 1;
158                    if let Some(store) = &current.store {
159                        ret = Some((store, depth))
160                    }
161                }
162                None => break,
163            }
164        }
165        ret
166    }
167}
168
169impl DefaultObjectStoreRegistry {
170    /// Create a new [`DefaultObjectStoreRegistry`]
171    pub fn new() -> Self {
172        Self::default()
173    }
174}
175
176impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
177    fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
178        let mut map = self.map.write();
179        let key = url_key(&url);
180        let mut entry = map.entry(key.to_string()).or_default();
181
182        for segment in path_segments(url.path()) {
183            entry = entry.children.entry(segment.to_string()).or_default();
184        }
185        entry.store.replace(store)
186    }
187
188    fn resolve(&self, to_resolve: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)> {
189        let key = url_key(to_resolve);
190        {
191            let map = self.map.read();
192
193            if let Some((store, depth)) = map.get(key).and_then(|entry| entry.lookup(to_resolve)) {
194                let path = path_suffix(to_resolve, depth)?;
195                return Ok((Arc::clone(store), path));
196            }
197        }
198
199        if let Ok((store, path)) = parse_url_opts(to_resolve, std::env::vars()) {
200            let depth = num_segments(to_resolve.path()) - num_segments(path.as_ref());
201
202            let mut map = self.map.write();
203            let mut entry = map.entry(key.to_string()).or_default();
204            for segment in path_segments(to_resolve.path()).take(depth) {
205                entry = entry.children.entry(segment.to_string()).or_default();
206            }
207            let store = Arc::clone(match &entry.store {
208                None => entry.store.insert(Arc::from(store)),
209                Some(x) => x, // Racing creation - use existing
210            });
211
212            let path = path_suffix(to_resolve, depth)?;
213            return Ok((store, path));
214        }
215
216        Err(Error::NotFound.into())
217    }
218}
219
220/// Extracts the scheme and authority of a URL (components before the Path)
221fn url_key(url: &Url) -> &str {
222    &url[..url::Position::AfterPort]
223}
224
/// Splits a path on `/`, yielding only the segments that are not empty
///
/// Note: [`Url::path_segments`] is not used here because it also yields empty
/// segments, which we want to skip
fn path_segments(s: &str) -> impl Iterator<Item = &str> {
    s.split('/')
        .filter_map(|segment| (!segment.is_empty()).then_some(segment))
}
231
232/// Returns the number of non-empty path segments in a path
233fn num_segments(s: &str) -> usize {
234    path_segments(s).count()
235}
236
237/// Returns the path of `url` skipping the first `depth` segments
238fn path_suffix(url: &Url, depth: usize) -> Result<Path, Error> {
239    let segments = path_segments(url.path()).skip(depth);
240    let path = segments.map(PathPart::parse).collect::<Result<_, _>>()?;
241    Ok(path)
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemory;
    use crate::prefix::PrefixStore;

    /// Parses `s` as a [`Url`], panicking if it is malformed
    fn parse(s: &str) -> Url {
        Url::parse(s).unwrap()
    }

    /// Creates a new, empty in-memory store
    fn memory() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Creates a new in-memory store wrapped in a [`PrefixStore`] built with `prefix`
    fn prefixed(prefix: &str) -> Arc<dyn ObjectStore> {
        Arc::new(PrefixStore::new(InMemory::new(), prefix))
    }

    /// Asserts that `url` resolves to exactly `store`, leaving `path` as the remaining path
    fn assert_resolves(
        registry: &DefaultObjectStoreRegistry,
        url: &Url,
        store: &Arc<dyn ObjectStore>,
        path: &str,
    ) {
        let (resolved, suffix) = registry.resolve(url).unwrap();
        assert_eq!(suffix.as_ref(), path);
        assert!(Arc::ptr_eq(&resolved, store));
    }

    #[test]
    fn test_num_segments() {
        let cases = [
            ("", 0),
            ("/", 0),
            ("/banana", 1),
            ("banana", 1),
            ("/banana/crumble", 2),
            ("banana/crumble", 2),
        ];
        for (input, expected) in cases {
            assert_eq!(num_segments(input), expected);
        }
    }

    #[test]
    fn test_default_registry() {
        let registry = DefaultObjectStoreRegistry::new();

        // An unregistered memory URL is resolved by lazily creating a store
        let banana_url = parse("memory:///banana");
        let (lazy, suffix) = registry.resolve(&banana_url).unwrap();
        assert_eq!(suffix.as_ref(), "banana");

        // Registering at the root replaces, and hands back, the lazily created store
        let root = memory();
        let previous = registry
            .register(parse("memory:///"), Arc::clone(&root))
            .unwrap();
        assert!(Arc::ptr_eq(&lazy, &previous));

        // Nothing is registered at `/banana` yet, so nothing is replaced
        let banana = prefixed("banana");
        let previous = registry.register(banana_url.clone(), Arc::clone(&banana));
        assert!(previous.is_none());

        // The banana URL now resolves to the banana store
        assert_resolves(&registry, &banana_url, &banana, "");

        // Registering an unrelated sibling store replaces nothing...
        let apples = prefixed("apples");
        let previous = registry.register(parse("memory:///apples"), Arc::clone(&apples));
        assert!(previous.is_none());

        // ...and does not affect how the banana URL resolves
        assert_resolves(&registry, &banana_url, &banana, "");

        // Matching is per path segment: `banana_muffins` is not under `banana`
        let banana_muffins_url = parse("memory:///banana_muffins");
        assert_resolves(&registry, &banana_muffins_url, &root, "banana_muffins");

        // A registered segment appearing later in the path does not match either
        let to_resolve = parse("memory:///foo/banana");
        assert_resolves(&registry, &to_resolve, &root, "foo/banana");

        // A store nested beneath an existing one wins for paths underneath it
        let nested = prefixed("apples/bananas");
        let previous = registry.register(parse("memory:///apples/bananas"), Arc::clone(&nested));
        assert!(previous.is_none());

        let to_resolve = parse("memory:///apples/bananas/muffins/cupcakes");
        assert_resolves(&registry, &to_resolve, &nested, "muffins/cupcakes");

        // Intermediate segments need not have stores of their own
        let nested2 = prefixed("1/2/3");
        let previous = registry.register(parse("memory:///1/2/3"), Arc::clone(&nested2));
        assert!(previous.is_none());

        let to_resolve = parse("memory:///1/2/3/4/5/6");
        assert_resolves(&registry, &to_resolve, &nested2, "4/5/6");

        // Stores can be registered and resolved under a custom scheme
        let custom_scheme = memory();
        let previous = registry.register(parse("custom:///"), Arc::clone(&custom_scheme));
        assert!(previous.is_none());

        let to_resolve = parse("custom:///6/7");
        assert_resolves(&registry, &to_resolve, &custom_scheme, "6/7");
    }
}