1use crate::path::{InvalidPart, Path, PathPart};
21use crate::{ObjectStore, parse_url_opts};
22use parking_lot::RwLock;
23use std::collections::HashMap;
24use std::sync::Arc;
25use url::Url;
26
27pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
29 fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>>;
33
34 fn resolve(&self, url: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)>;
85}
86
87#[derive(Debug, thiserror::Error)]
92#[non_exhaustive]
93enum Error {
94 #[error("ObjectStore not found")]
95 NotFound,
96
97 #[error("Error parsing URL path segment")]
98 InvalidPart(#[from] InvalidPart),
99}
100
101impl From<Error> for crate::Error {
102 fn from(value: Error) -> Self {
103 Self::Generic {
104 store: "ObjectStoreRegistry",
105 source: Box::new(value),
106 }
107 }
108}
109
110#[derive(Debug, Default)]
112pub struct DefaultObjectStoreRegistry {
113 map: RwLock<HashMap<String, PathEntry>>,
115}
116
117#[derive(Debug, Default)]
137struct PathEntry {
138 store: Option<Arc<dyn ObjectStore>>,
140 children: HashMap<String, Self>,
142}
143
144impl PathEntry {
145 fn lookup(&self, to_resolve: &Url) -> Option<(&Arc<dyn ObjectStore>, usize)> {
149 let mut current = self;
150 let mut ret = self.store.as_ref().map(|store| (store, 0));
151 let mut depth = 0;
152 for segment in path_segments(to_resolve.path()) {
154 match current.children.get(segment) {
155 Some(e) => {
156 current = e;
157 depth += 1;
158 if let Some(store) = ¤t.store {
159 ret = Some((store, depth))
160 }
161 }
162 None => break,
163 }
164 }
165 ret
166 }
167}
168
169impl DefaultObjectStoreRegistry {
170 pub fn new() -> Self {
172 Self::default()
173 }
174}
175
176impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
177 fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
178 let mut map = self.map.write();
179 let key = url_key(&url);
180 let mut entry = map.entry(key.to_string()).or_default();
181
182 for segment in path_segments(url.path()) {
183 entry = entry.children.entry(segment.to_string()).or_default();
184 }
185 entry.store.replace(store)
186 }
187
188 fn resolve(&self, to_resolve: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)> {
189 let key = url_key(to_resolve);
190 {
191 let map = self.map.read();
192
193 if let Some((store, depth)) = map.get(key).and_then(|entry| entry.lookup(to_resolve)) {
194 let path = path_suffix(to_resolve, depth)?;
195 return Ok((Arc::clone(store), path));
196 }
197 }
198
199 if let Ok((store, path)) = parse_url_opts(to_resolve, std::env::vars()) {
200 let depth = num_segments(to_resolve.path()) - num_segments(path.as_ref());
201
202 let mut map = self.map.write();
203 let mut entry = map.entry(key.to_string()).or_default();
204 for segment in path_segments(to_resolve.path()).take(depth) {
205 entry = entry.children.entry(segment.to_string()).or_default();
206 }
207 let store = Arc::clone(match &entry.store {
208 None => entry.store.insert(Arc::from(store)),
209 Some(x) => x, });
211
212 let path = path_suffix(to_resolve, depth)?;
213 return Ok((store, path));
214 }
215
216 Err(Error::NotFound.into())
217 }
218}
219
220fn url_key(url: &Url) -> &str {
222 &url[..url::Position::AfterPort]
223}
224
225fn path_segments(s: &str) -> impl Iterator<Item = &str> {
229 s.split('/').filter(|x| !x.is_empty())
230}
231
232fn num_segments(s: &str) -> usize {
234 path_segments(s).count()
235}
236
237fn path_suffix(url: &Url, depth: usize) -> Result<Path, Error> {
239 let segments = path_segments(url.path()).skip(depth);
240 let path = segments.map(PathPart::parse).collect::<Result<_, _>>()?;
241 Ok(path)
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247 use crate::memory::InMemory;
248 use crate::prefix::PrefixStore;
249
250 #[test]
251 fn test_num_segments() {
252 assert_eq!(num_segments(""), 0);
253 assert_eq!(num_segments("/"), 0);
254 assert_eq!(num_segments("/banana"), 1);
255 assert_eq!(num_segments("banana"), 1);
256 assert_eq!(num_segments("/banana/crumble"), 2);
257 assert_eq!(num_segments("banana/crumble"), 2);
258 }
259
260 #[test]
261 fn test_default_registry() {
262 let registry = DefaultObjectStoreRegistry::new();
263
264 let banana_url = Url::parse("memory:///banana").unwrap();
266 let (resolved, path) = registry.resolve(&banana_url).unwrap();
267 assert_eq!(path.as_ref(), "banana");
268
269 let url = Url::parse("memory:///").unwrap();
271 let root = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
272 let replaced = registry.register(url, Arc::clone(&root)).unwrap();
273 assert!(Arc::ptr_eq(&resolved, &replaced));
274
275 let banana = Arc::new(PrefixStore::new(InMemory::new(), "banana")) as Arc<dyn ObjectStore>;
277 assert!(
278 registry
279 .register(banana_url.clone(), Arc::clone(&banana))
280 .is_none()
281 );
282
283 let (resolved, path) = registry.resolve(&banana_url).unwrap();
285 assert_eq!(path.as_ref(), "");
286 assert!(Arc::ptr_eq(&resolved, &banana));
287
288 let apples_url = Url::parse("memory:///apples").unwrap();
290 let apples = Arc::new(PrefixStore::new(InMemory::new(), "apples")) as Arc<dyn ObjectStore>;
291 assert!(registry.register(apples_url, Arc::clone(&apples)).is_none());
292
293 let (resolved, path) = registry.resolve(&banana_url).unwrap();
295 assert_eq!(path.as_ref(), "");
296 assert!(Arc::ptr_eq(&resolved, &banana));
297
298 let banana_muffins_url = Url::parse("memory:///banana_muffins").unwrap();
300 let (resolved, path) = registry.resolve(&banana_muffins_url).unwrap();
301 assert_eq!(path.as_ref(), "banana_muffins");
302 assert!(Arc::ptr_eq(&resolved, &root));
303
304 let to_resolve = Url::parse("memory:///foo/banana").unwrap();
306 let (resolved, path) = registry.resolve(&to_resolve).unwrap();
307 assert_eq!(path.as_ref(), "foo/banana");
308 assert!(Arc::ptr_eq(&resolved, &root));
309
310 let nested_url = Url::parse("memory:///apples/bananas").unwrap();
311 let nested =
312 Arc::new(PrefixStore::new(InMemory::new(), "apples/bananas")) as Arc<dyn ObjectStore>;
313 assert!(registry.register(nested_url, Arc::clone(&nested)).is_none());
314
315 let to_resolve = Url::parse("memory:///apples/bananas/muffins/cupcakes").unwrap();
316 let (resolved, path) = registry.resolve(&to_resolve).unwrap();
317 assert_eq!(path.as_ref(), "muffins/cupcakes");
318 assert!(Arc::ptr_eq(&resolved, &nested));
319
320 let nested_url2 = Url::parse("memory:///1/2/3").unwrap();
321 let nested2 = Arc::new(PrefixStore::new(InMemory::new(), "1/2/3")) as Arc<dyn ObjectStore>;
322 assert!(
323 registry
324 .register(nested_url2, Arc::clone(&nested2))
325 .is_none()
326 );
327
328 let to_resolve = Url::parse("memory:///1/2/3/4/5/6").unwrap();
329 let (resolved, path) = registry.resolve(&to_resolve).unwrap();
330 assert_eq!(path.as_ref(), "4/5/6");
331 assert!(Arc::ptr_eq(&resolved, &nested2));
332
333 let custom_scheme_url = Url::parse("custom:///").unwrap();
334 let custom_scheme = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
335 assert!(
336 registry
337 .register(custom_scheme_url, Arc::clone(&custom_scheme))
338 .is_none()
339 );
340
341 let to_resolve = Url::parse("custom:///6/7").unwrap();
342 let (resolved, path) = registry.resolve(&to_resolve).unwrap();
343 assert_eq!(path.as_ref(), "6/7");
344 assert!(Arc::ptr_eq(&resolved, &custom_scheme));
345 }
346}