Skip to main content

object_store/
local.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for a local filesystem
19use std::fs::{File, Metadata, OpenOptions, metadata, symlink_metadata};
20use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
21use std::ops::Range;
22#[cfg(target_family = "unix")]
23use std::os::unix::fs::FileExt;
24#[cfg(target_family = "windows")]
25use std::os::windows::fs::FileExt;
26use std::sync::Arc;
27use std::time::SystemTime;
28use std::{collections::BTreeSet, io};
29use std::{collections::VecDeque, path::PathBuf};
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use chrono::{DateTime, Utc};
34use futures_util::{FutureExt, TryStreamExt};
35use futures_util::{StreamExt, stream::BoxStream};
36use parking_lot::Mutex;
37use url::Url;
38use walkdir::{DirEntry, WalkDir};
39
40use crate::{
41    Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
42    ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
43    UploadPart, maybe_spawn_blocking,
44    path::{Path, absolute_path_to_url},
45    util::InvalidGetRange,
46};
47use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};
48
/// A specialized `Error` for filesystem object store-related errors
///
/// Converted into the crate-level error by the `From<Error> for super::Error` impl:
/// [`Error::NotFound`] and [`Error::AlreadyExists`] map to the dedicated crate variants,
/// every other variant is wrapped as a generic `LocalFileSystem` error.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// Traversing a directory tree with `walkdir` failed.
    #[error("Unable to walk dir: {}", source)]
    UnableToWalkDir { source: walkdir::Error },

    /// Reading the filesystem metadata of `path` failed.
    #[error("Unable to access metadata for {}: {}", path, source)]
    Metadata {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        path: String,
    },

    /// Writing payload bytes into a staging file failed.
    #[error("Unable to copy data to file: {}", source)]
    UnableToCopyDataToFile { source: io::Error },

    /// Moving a staging file into its final location failed.
    #[error("Unable to rename file: {}", source)]
    UnableToRenameFile { source: io::Error },

    /// Creating a (parent) directory at `path` failed.
    #[error("Unable to create dir {}: {}", path.display(), source)]
    UnableToCreateDir { source: io::Error, path: PathBuf },

    /// A file could not be created at `path`, e.g. because it has no parent directory
    /// that could be created for it.
    #[error("Unable to create file {}: {}", path.display(), source)]
    UnableToCreateFile { source: io::Error, path: PathBuf },

    /// Removing the file at `path` failed for a reason other than it being absent.
    #[error("Unable to delete file {}: {}", path.display(), source)]
    UnableToDeleteFile { source: io::Error, path: PathBuf },

    /// Opening (or exclusively creating) the file at `path` failed.
    #[error("Unable to open file {}: {}", path.display(), source)]
    UnableToOpenFile { source: io::Error, path: PathBuf },

    /// Reading data from the file at `path` failed.
    #[error("Unable to read data from file {}: {}", path.display(), source)]
    UnableToReadBytes { source: io::Error, path: PathBuf },

    /// A read went outside the bounds of the file at `path`.
    /// NOTE(review): presumably `expected` is the requested length and `actual` the
    /// number of bytes available — confirm against `read_range`.
    #[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)]
    OutOfRange {
        path: PathBuf,
        expected: u64,
        actual: u64,
    },

    /// A requested byte range could not be resolved against the object size.
    #[error("Requested range was invalid")]
    InvalidRange { source: InvalidGetRange },

    /// Hard-linking, copying or renaming `from` to `to` failed.
    #[error("Unable to copy file from {} to {}: {}", from.display(), to.display(), source)]
    UnableToCopyFile {
        from: PathBuf,
        to: PathBuf,
        source: io::Error,
    },

    /// The file at `path` does not exist.
    #[error("NotFound")]
    NotFound { path: PathBuf, source: io::Error },

    /// Seeking within the file at `path` failed.
    #[error("Error seeking file {}: {}", path.display(), source)]
    Seek { source: io::Error, path: PathBuf },

    /// A `file://` URL could not be converted into a filesystem path.
    #[error("Unable to convert URL \"{}\" to filesystem path", url)]
    InvalidUrl { url: Url },

    /// The destination `path` already exists (create-only writes and copies).
    #[error("AlreadyExists")]
    AlreadyExists { path: String, source: io::Error },

    /// The prefix passed to [`LocalFileSystem::new_with_prefix`] could not be canonicalized.
    #[error("Unable to canonicalize filesystem root: {}", path.display())]
    UnableToCanonicalize { path: PathBuf, source: io::Error },

    /// The location's filename is reserved for staging files (a `#` followed by digits),
    /// or the location has no filename at all.
    #[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)]
    InvalidPath { path: String },

    /// The upload was aborted.
    /// NOTE(review): presumably returned by multipart uploads used after `abort` — confirm.
    #[error("Upload aborted")]
    Aborted,
}
120
121impl From<Error> for super::Error {
122    fn from(source: Error) -> Self {
123        match source {
124            Error::NotFound { path, source } => Self::NotFound {
125                path: path.to_string_lossy().to_string(),
126                source: source.into(),
127            },
128            Error::AlreadyExists { path, source } => Self::AlreadyExists {
129                path,
130                source: source.into(),
131            },
132            _ => Self::Generic {
133                store: "LocalFileSystem",
134                source: Box::new(source),
135            },
136        }
137    }
138}
139
/// Local filesystem storage providing an [`ObjectStore`] interface to files on
/// local disk. Can optionally be created with a directory prefix
///
/// # URI Semantics
///
/// This implementation follows the [file URI] scheme outlined in [RFC 3986]. In
/// particular paths are delimited by `/`
///
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
/// [RFC 3986]: https://www.rfc-editor.org/rfc/rfc3986
///
/// # Path Semantics
///
/// [`LocalFileSystem`] will expose the path semantics of the underlying filesystem, which may
/// have additional restrictions beyond those enforced by [`Path`].
///
/// For example:
///
/// * Windows forbids certain filenames, e.g. `COM0`,
/// * Windows forbids folders with trailing `.`
/// * Windows forbids certain ASCII characters, e.g. `<` or `|`
/// * OS X forbids filenames containing `:`
/// * Leading `-` are discouraged on Unix systems where they may be interpreted as CLI flags
/// * Filesystems may have restrictions on the maximum path or path segment length
/// * Filesystem support for non-ASCII characters is inconsistent
///
/// Additionally some filesystems, such as NTFS, are case-insensitive, whilst others like
/// FAT don't preserve case at all. Further some filesystems support non-unicode character
/// sequences, such as unpaired UTF-16 surrogates, and [`LocalFileSystem`] will error on
/// encountering such sequences.
///
/// Finally, filenames matching the regex `/.*#\d+/`, e.g. `foo.parquet#123`, are not supported
/// by [`LocalFileSystem`] as they are used to provide atomic writes. Such files will be ignored
/// for listing operations, and attempting to address such a file will error.
///
/// # Tokio Compatibility
///
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
/// no major operating systems have stable async file APIs. Therefore if called from
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
///
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
///
/// # Symlinks
///
/// [`LocalFileSystem`] will follow symlinks as normal, however, it is worth noting:
///
/// * Broken symlinks will be silently ignored by listing operations
/// * No effort is made to prevent breaking symlinks when deleting files
/// * Symlinks that resolve to paths outside the root **will** be followed
/// * Mutating a file through one or more symlinks will mutate the underlying file
/// * Deleting a path that resolves to a symlink will only delete the symlink
///
/// # Cross-Filesystem Copy
///
/// [`LocalFileSystem::copy_opts`] is implemented using [`std::fs::hard_link`], and therefore
/// does not support copying across filesystem boundaries. The same limitation applies to
/// [`LocalFileSystem::rename_opts`], which uses [`std::fs::rename`] when overwriting and a
/// hard-link copy followed by a delete for create-only renames.
///
#[derive(Clone, Debug)]
pub struct LocalFileSystem {
    config: Arc<Config>,
    // When true, a successful delete also removes each parent directory left empty by it,
    // walking upwards until the root is reached or a directory cannot be removed.
    automatic_cleanup: bool,
}

/// Immutable configuration shared (via `Arc`) between clones of a [`LocalFileSystem`].
#[derive(Debug)]
struct Config {
    // `file://` URL every location is resolved against: `file:///` for
    // `LocalFileSystem::new`, the canonicalized prefix for `new_with_prefix`.
    root: Url,
}
211
212impl std::fmt::Display for LocalFileSystem {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        write!(f, "LocalFileSystem({})", self.config.root)
215    }
216}
217
218impl Default for LocalFileSystem {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224impl LocalFileSystem {
225    /// Create new filesystem storage with no prefix
226    pub fn new() -> Self {
227        Self {
228            config: Arc::new(Config {
229                root: Url::parse("file:///").unwrap(),
230            }),
231            automatic_cleanup: false,
232        }
233    }
234
235    /// Create new filesystem storage with `prefix` applied to all paths
236    ///
237    /// Returns an error if the path does not exist
238    ///
239    pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
240        let path = std::fs::canonicalize(&prefix).map_err(|source| {
241            let path = prefix.as_ref().into();
242            Error::UnableToCanonicalize { source, path }
243        })?;
244
245        Ok(Self {
246            config: Arc::new(Config {
247                root: absolute_path_to_url(path)?,
248            }),
249            automatic_cleanup: false,
250        })
251    }
252
253    /// Return an absolute filesystem path of the given file location
254    pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
255        self.config.path_to_filesystem(location)
256    }
257
258    /// Enable automatic cleanup of empty directories when deleting files
259    pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self {
260        self.automatic_cleanup = automatic_cleanup;
261        self
262    }
263}
264
265impl Config {
266    /// Return an absolute filesystem path of the given location
267    fn prefix_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
268        let mut url = self.root.clone();
269        url.path_segments_mut()
270            .expect("url path")
271            // technically not necessary as Path ignores empty segments
272            // but avoids creating paths with "//" which look odd in error messages.
273            .pop_if_empty()
274            .extend(location.parts());
275
276        url.to_file_path()
277            .map_err(|_| Error::InvalidUrl { url }.into())
278    }
279
280    /// Return an absolute filesystem path of the given file location
281    fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
282        if !is_valid_file_path(location) {
283            let path = location.as_ref().into();
284            let error = Error::InvalidPath { path };
285            return Err(error.into());
286        }
287
288        let path = self.prefix_to_filesystem(location)?;
289
290        #[cfg(target_os = "windows")]
291        let path = {
292            let path = path.to_string_lossy();
293
294            // Assume the first char is the drive letter and the next is a colon.
295            let mut out = String::new();
296            let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
297            let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
298            out.push_str(drive);
299            out.push_str(filepath);
300            PathBuf::from(out)
301        };
302
303        Ok(path)
304    }
305
306    /// Resolves the provided absolute filesystem path to a [`Path`] prefix
307    fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
308        Ok(Path::from_absolute_path_with_base(
309            location,
310            Some(&self.root),
311        )?)
312    }
313}
314
315fn is_valid_file_path(path: &Path) -> bool {
316    match path.filename() {
317        Some(p) => match p.split_once('#') {
318            Some((_, suffix)) if !suffix.is_empty() => {
319                // Valid if contains non-digits
320                !suffix.as_bytes().iter().all(|x| x.is_ascii_digit())
321            }
322            _ => true,
323        },
324        None => false,
325    }
326}
327
328#[async_trait]
329impl ObjectStore for LocalFileSystem {
330    async fn put_opts(
331        &self,
332        location: &Path,
333        payload: PutPayload,
334        opts: PutOptions,
335    ) -> Result<PutResult> {
336        if matches!(opts.mode, PutMode::Update(_)) {
337            return Err(crate::Error::NotImplemented {
338                operation: "`put_opts` with mode `PutMode::Update`".into(),
339                implementer: self.to_string(),
340            });
341        }
342
343        if !opts.attributes.is_empty() {
344            return Err(crate::Error::NotImplemented {
345                operation: "`put_opts` with `opts.attributes` specified".into(),
346                implementer: self.to_string(),
347            });
348        }
349
350        let path = self.path_to_filesystem(location)?;
351        maybe_spawn_blocking(move || {
352            let (mut file, staging_path) = new_staged_upload(&path)?;
353            let mut e_tag = None;
354
355            let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
356                Ok(_) => {
357                    let metadata = file.metadata().map_err(|e| Error::Metadata {
358                        source: e.into(),
359                        path: path.to_string_lossy().to_string(),
360                    })?;
361                    e_tag = Some(get_etag(&metadata));
362                    match opts.mode {
363                        PutMode::Overwrite => {
364                            // For some fuse types of file systems, the file must be closed first
365                            // to trigger the upload operation, and then renamed, such as Blobfuse
366                            std::mem::drop(file);
367                            match std::fs::rename(&staging_path, &path) {
368                                Ok(_) => None,
369                                Err(source) => Some(Error::UnableToRenameFile { source }),
370                            }
371                        }
372                        PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
373                            Ok(_) => {
374                                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
375                                None
376                            }
377                            Err(source) => match source.kind() {
378                                ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
379                                    path: path.to_str().unwrap().to_string(),
380                                    source,
381                                }),
382                                _ => Some(Error::UnableToRenameFile { source }),
383                            },
384                        },
385                        PutMode::Update(_) => unreachable!(),
386                    }
387                }
388                Err(source) => Some(Error::UnableToCopyDataToFile { source }),
389            };
390
391            if let Some(err) = err {
392                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
393                return Err(err.into());
394            }
395
396            Ok(PutResult {
397                e_tag,
398                version: None,
399            })
400        })
401        .await
402    }
403
404    async fn put_multipart_opts(
405        &self,
406        location: &Path,
407        opts: PutMultipartOptions,
408    ) -> Result<Box<dyn MultipartUpload>> {
409        if !opts.attributes.is_empty() {
410            return Err(crate::Error::NotImplemented {
411                operation: "`put_multipart_opts` with `opts.attributes` specified".into(),
412                implementer: self.to_string(),
413            });
414        }
415
416        let dest = self.path_to_filesystem(location)?;
417        let (file, src) = new_staged_upload(&dest)?;
418        Ok(Box::new(LocalUpload::new(src, dest, file)))
419    }
420
421    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
422        let location = location.clone();
423        let path = self.path_to_filesystem(&location)?;
424        maybe_spawn_blocking(move || {
425            let file = open_file(&path)?;
426            let metadata = open_metadata(&file, &path)?;
427            let meta = convert_metadata(metadata, location);
428            options.check_preconditions(&meta)?;
429
430            let range = match options.range {
431                Some(r) => r
432                    .as_range(meta.size)
433                    .map_err(|source| Error::InvalidRange { source })?,
434                None => 0..meta.size,
435            };
436
437            Ok(GetResult {
438                payload: GetResultPayload::File(file, path),
439                attributes: Attributes::default(),
440                range,
441                meta,
442            })
443        })
444        .await
445    }
446
447    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
448        let path = self.path_to_filesystem(location)?;
449        let ranges = ranges.to_vec();
450        maybe_spawn_blocking(move || {
451            // Vectored IO might be faster
452            // We do not read the metadata here, but error in `read_range` if necessary
453            let mut file = File::open(&path).map_err(|e| map_open_error(e, &path))?;
454            ranges
455                .into_iter()
456                .map(|r| read_range(&mut file, &path, r))
457                .collect()
458        })
459        .await
460    }
461
462    fn delete_stream(
463        &self,
464        locations: BoxStream<'static, Result<Path>>,
465    ) -> BoxStream<'static, Result<Path>> {
466        let config = Arc::clone(&self.config);
467        let automatic_cleanup = self.automatic_cleanup;
468        locations
469            .map(move |location| {
470                let config = Arc::clone(&config);
471                maybe_spawn_blocking(move || {
472                    let location = location?;
473                    Self::delete_location(config, automatic_cleanup, &location)?;
474                    Ok(location)
475                })
476            })
477            .buffered(10)
478            .boxed()
479    }
480
481    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
482        Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, None)
483    }
484
485    fn list_with_offset(
486        &self,
487        prefix: Option<&Path>,
488        offset: &Path,
489    ) -> BoxStream<'static, Result<ObjectMeta>> {
490        Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, Some(offset))
491    }
492
493    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
494        let config = Arc::clone(&self.config);
495
496        let prefix = prefix.cloned().unwrap_or_default();
497        let resolved_prefix = config.prefix_to_filesystem(&prefix)?;
498
499        maybe_spawn_blocking(move || {
500            let walkdir = WalkDir::new(&resolved_prefix)
501                .min_depth(1)
502                .max_depth(1)
503                .follow_links(true);
504
505            let mut common_prefixes = BTreeSet::new();
506            let mut objects = Vec::new();
507
508            for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
509                if let Some(entry) = entry_res? {
510                    let is_directory = entry.file_type().is_dir();
511                    let entry_location = config.filesystem_to_path(entry.path())?;
512                    if !is_directory && !is_valid_file_path(&entry_location) {
513                        continue;
514                    }
515
516                    let mut parts = match entry_location.prefix_match(&prefix) {
517                        Some(parts) => parts,
518                        None => continue,
519                    };
520
521                    let common_prefix = match parts.next() {
522                        Some(p) => p,
523                        None => continue,
524                    };
525
526                    drop(parts);
527
528                    if is_directory {
529                        common_prefixes.insert(prefix.clone().join(common_prefix));
530                    } else if let Some(metadata) = convert_entry(entry, entry_location)? {
531                        objects.push(metadata);
532                    }
533                }
534            }
535
536            Ok(ListResult {
537                common_prefixes: common_prefixes.into_iter().collect(),
538                objects,
539            })
540        })
541        .await
542    }
543
544    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
545        let CopyOptions {
546            mode,
547            extensions: _,
548        } = options;
549
550        let from = self.path_to_filesystem(from)?;
551        let to = self.path_to_filesystem(to)?;
552
553        match mode {
554            CopyMode::Overwrite => {
555                let mut id = 0;
556                // In order to make this atomic we:
557                //
558                // - hard link to a hidden temporary file
559                // - atomically rename this temporary file into place
560                //
561                // This is necessary because hard_link returns an error if the destination already exists
562                maybe_spawn_blocking(move || {
563                    loop {
564                        let staged = staged_upload_path(&to, &id.to_string());
565                        match std::fs::hard_link(&from, &staged) {
566                            Ok(_) => {
567                                return std::fs::rename(&staged, &to).map_err(|source| {
568                                    let _ = std::fs::remove_file(&staged); // Attempt to clean up
569                                    Error::UnableToCopyFile { from, to, source }.into()
570                                });
571                            }
572                            Err(source) => match source.kind() {
573                                ErrorKind::AlreadyExists => id += 1,
574                                ErrorKind::NotFound => match from.exists() {
575                                    true => create_parent_dirs(&to, source)?,
576                                    false => {
577                                        return Err(Error::NotFound { path: from, source }.into());
578                                    }
579                                },
580                                _ => {
581                                    return Err(Error::UnableToCopyFile { from, to, source }.into());
582                                }
583                            },
584                        }
585                    }
586                })
587                .await
588            }
589            CopyMode::Create => {
590                maybe_spawn_blocking(move || {
591                    loop {
592                        match std::fs::hard_link(&from, &to) {
593                            Ok(_) => return Ok(()),
594                            Err(source) => match source.kind() {
595                                ErrorKind::AlreadyExists => {
596                                    return Err(Error::AlreadyExists {
597                                        path: to.to_str().unwrap().to_string(),
598                                        source,
599                                    }
600                                    .into());
601                                }
602                                ErrorKind::NotFound => match from.exists() {
603                                    true => create_parent_dirs(&to, source)?,
604                                    false => {
605                                        return Err(Error::NotFound { path: from, source }.into());
606                                    }
607                                },
608                                _ => {
609                                    return Err(Error::UnableToCopyFile { from, to, source }.into());
610                                }
611                            },
612                        }
613                    }
614                })
615                .await
616            }
617        }
618    }
619
620    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
621        let RenameOptions {
622            target_mode,
623            extensions,
624        } = options;
625
626        match target_mode {
627            // optimized implementation
628            RenameTargetMode::Overwrite => {
629                let from = self.path_to_filesystem(from)?;
630                let to = self.path_to_filesystem(to)?;
631                maybe_spawn_blocking(move || {
632                    loop {
633                        match std::fs::rename(&from, &to) {
634                            Ok(_) => return Ok(()),
635                            Err(source) => match source.kind() {
636                                ErrorKind::NotFound => match from.exists() {
637                                    true => create_parent_dirs(&to, source)?,
638                                    false => {
639                                        return Err(Error::NotFound { path: from, source }.into());
640                                    }
641                                },
642                                _ => {
643                                    return Err(Error::UnableToCopyFile { from, to, source }.into());
644                                }
645                            },
646                        }
647                    }
648                })
649                .await
650            }
651            // fall-back to copy & delete
652            RenameTargetMode::Create => {
653                self.copy_opts(
654                    from,
655                    to,
656                    CopyOptions {
657                        mode: CopyMode::Create,
658                        extensions,
659                    },
660                )
661                .await?;
662                self.delete(from).await?;
663                Ok(())
664            }
665        }
666    }
667}
668
669impl LocalFileSystem {
670    fn delete_location(
671        config: Arc<Config>,
672        automatic_cleanup: bool,
673        location: &Path,
674    ) -> Result<()> {
675        let path = config.path_to_filesystem(location)?;
676        if let Err(e) = std::fs::remove_file(&path) {
677            Err(match e.kind() {
678                ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
679                _ => Error::UnableToDeleteFile { path, source: e }.into(),
680            })
681        } else if automatic_cleanup {
682            let root = &config.root;
683            let root = root
684                .to_file_path()
685                .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
686
687            // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
688            let mut parent = path.parent();
689
690            while let Some(loc) = parent {
691                if loc != root && std::fs::remove_dir(loc).is_ok() {
692                    parent = loc.parent();
693                } else {
694                    break;
695                }
696            }
697
698            Ok(())
699        } else {
700            Ok(())
701        }
702    }
703
704    fn list_with_maybe_offset(
705        config: Arc<Config>,
706        prefix: Option<&Path>,
707        maybe_offset: Option<&Path>,
708    ) -> BoxStream<'static, Result<ObjectMeta>> {
709        let root_path = match prefix {
710            Some(prefix) => match config.prefix_to_filesystem(prefix) {
711                Ok(path) => path,
712                Err(e) => return futures_util::future::ready(Err(e)).into_stream().boxed(),
713            },
714            None => config.root.to_file_path().unwrap(),
715        };
716
717        let walkdir = WalkDir::new(root_path)
718            // Don't include the root directory itself
719            .min_depth(1)
720            .follow_links(true);
721
722        let maybe_offset = maybe_offset.cloned();
723
724        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
725            // Apply offset filter before proceeding, to reduce statx file system calls
726            // This matters for NFS mounts
727            if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
728                let location = config.filesystem_to_path(entry.path());
729                match location {
730                    Ok(path) if path <= *offset => return None,
731                    Err(e) => return Some(Err(e)),
732                    _ => {}
733                }
734            }
735
736            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
737                Ok(entry) => entry,
738                Err(e) => return Some(Err(e)),
739            };
740
741            if !entry.path().is_file() {
742                return None;
743            }
744
745            match config.filesystem_to_path(entry.path()) {
746                Ok(path) => match is_valid_file_path(&path) {
747                    true => convert_entry(entry, path).transpose(),
748                    false => None,
749                },
750                Err(e) => Some(Err(e)),
751            }
752        });
753
754        // If no tokio context, return iterator directly as no
755        // need to perform chunked spawn_blocking reads
756        if tokio::runtime::Handle::try_current().is_err() {
757            return futures_util::stream::iter(s).boxed();
758        }
759
760        // Otherwise list in batches of CHUNK_SIZE
761        const CHUNK_SIZE: usize = 1024;
762
763        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
764        futures_util::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
765            if buffer.is_empty() {
766                (s, buffer) = tokio::task::spawn_blocking(move || {
767                    for _ in 0..CHUNK_SIZE {
768                        match s.next() {
769                            Some(r) => buffer.push_back(r),
770                            None => break,
771                        }
772                    }
773                    (s, buffer)
774                })
775                .await?;
776            }
777
778            match buffer.pop_front() {
779                Some(Err(e)) => Err(e),
780                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
781                None => Ok(None),
782            }
783        })
784        .boxed()
785    }
786}
787
788/// Creates the parent directories of `path` or returns an error based on `source` if no parent
789fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
790    let parent = path.parent().ok_or_else(|| {
791        let path = path.to_path_buf();
792        Error::UnableToCreateFile { path, source }
793    })?;
794
795    std::fs::create_dir_all(parent).map_err(|source| {
796        let path = parent.into();
797        Error::UnableToCreateDir { source, path }
798    })?;
799    Ok(())
800}
801
802/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path`
803///
804/// Creates any directories if necessary
805fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
806    let mut multipart_id = 1;
807    loop {
808        let suffix = multipart_id.to_string();
809        let path = staged_upload_path(base, &suffix);
810        let mut options = OpenOptions::new();
811        match options.read(true).write(true).create_new(true).open(&path) {
812            Ok(f) => return Ok((f, path)),
813            Err(source) => match source.kind() {
814                ErrorKind::AlreadyExists => multipart_id += 1,
815                ErrorKind::NotFound => create_parent_dirs(&path, source)?,
816                _ => return Err(Error::UnableToOpenFile { source, path }.into()),
817            },
818        }
819    }
820}
821
/// Builds the staging path for `dest` by appending `#` followed by `suffix`.
///
/// For example `dir/file.parquet` with suffix `3` becomes `dir/file.parquet#3`.
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    let mut staged = dest.as_os_str().to_os_string();
    for piece in ["#", suffix] {
        staged.push(piece);
    }
    PathBuf::from(staged)
}
829
/// A multipart upload to the local filesystem.
///
/// Parts are written into a temporary file at `src`, which `complete` renames to
/// `state.dest`. `complete` and `abort` take `src` (leaving `None`). If it is still set
/// when the upload is dropped, the temporary file is removed on a best-effort basis.
#[derive(Debug)]
struct LocalUpload {
    /// The upload state, shared with in-flight `put_part` tasks
    state: Arc<UploadState>,
    /// The location of the temporary file, or `None` once completed or aborted
    src: Option<PathBuf>,
    /// The next offset to write into the file; advanced when a part is submitted
    offset: u64,
}

/// State shared between a [`LocalUpload`] and the blocking tasks writing its parts.
#[derive(Debug)]
struct UploadState {
    /// Final location the temporary file is renamed to on completion
    dest: PathBuf,
    /// Handle to the temporary file. The lock serialises the seek+write of each part
    /// and lets `complete` wait for in-flight writes before renaming.
    file: Mutex<File>,
}
845
846impl LocalUpload {
847    pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
848        Self {
849            state: Arc::new(UploadState {
850                dest,
851                file: Mutex::new(file),
852            }),
853            src: Some(src),
854            offset: 0,
855        }
856    }
857}
858
859#[async_trait]
860impl MultipartUpload for LocalUpload {
861    fn put_part(&mut self, data: PutPayload) -> UploadPart {
862        let offset = self.offset;
863        self.offset += data.content_length() as u64;
864
865        let s = Arc::clone(&self.state);
866        maybe_spawn_blocking(move || {
867            let mut file = s.file.lock();
868            file.seek(SeekFrom::Start(offset)).map_err(|source| {
869                let path = s.dest.clone();
870                Error::Seek { source, path }
871            })?;
872
873            data.iter()
874                .try_for_each(|x| file.write_all(x))
875                .map_err(|source| Error::UnableToCopyDataToFile { source })?;
876
877            Ok(())
878        })
879        .boxed()
880    }
881
882    async fn complete(&mut self) -> Result<PutResult> {
883        let src = self.src.take().ok_or(Error::Aborted)?;
884        let s = Arc::clone(&self.state);
885        maybe_spawn_blocking(move || {
886            // Ensure no inflight writes
887            let file = s.file.lock();
888            std::fs::rename(&src, &s.dest)
889                .map_err(|source| Error::UnableToRenameFile { source })?;
890            let metadata = file.metadata().map_err(|e| Error::Metadata {
891                source: e.into(),
892                path: src.to_string_lossy().to_string(),
893            })?;
894
895            Ok(PutResult {
896                e_tag: Some(get_etag(&metadata)),
897                version: None,
898            })
899        })
900        .await
901    }
902
903    async fn abort(&mut self) -> Result<()> {
904        let src = self.src.take().ok_or(Error::Aborted)?;
905        maybe_spawn_blocking(move || {
906            std::fs::remove_file(&src)
907                .map_err(|source| Error::UnableToDeleteFile { source, path: src })?;
908            Ok(())
909        })
910        .await
911    }
912}
913
914impl Drop for LocalUpload {
915    fn drop(&mut self) {
916        if let Some(src) = self.src.take() {
917            // Try to clean up intermediate file ignoring any error
918            match tokio::runtime::Handle::try_current() {
919                Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))),
920                Err(_) => drop(std::fs::remove_file(src)),
921            };
922        }
923    }
924}
925
926pub(crate) fn chunked_stream(
927    mut file: File,
928    path: PathBuf,
929    range: Range<u64>,
930    chunk_size: usize,
931) -> BoxStream<'static, Result<Bytes, super::Error>> {
932    futures_util::stream::once(async move {
933        let requested = range.end - range.start;
934
935        let (file, path) = maybe_spawn_blocking(move || {
936            file.seek(SeekFrom::Start(range.start as _))
937                .map_err(|err| map_seek_error(err, &file, &path, range.start))?;
938            Ok((file, path))
939        })
940        .await?;
941
942        let stream = futures_util::stream::try_unfold(
943            (file, path, requested),
944            move |(mut file, path, remaining)| {
945                maybe_spawn_blocking(move || {
946                    if remaining == 0 {
947                        return Ok(None);
948                    }
949
950                    let to_read = remaining.min(chunk_size as u64);
951                    let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
952                        source: InvalidGetRange::TooLarge {
953                            requested: to_read,
954                            max: usize::MAX as u64,
955                        },
956                    })?;
957                    let mut buffer = Vec::with_capacity(cap);
958                    let read = (&mut file)
959                        .take(to_read)
960                        .read_to_end(&mut buffer)
961                        .map_err(|e| Error::UnableToReadBytes {
962                            source: e,
963                            path: path.clone(),
964                        })?;
965
966                    Ok(Some((buffer.into(), (file, path, remaining - read as u64))))
967                })
968            },
969        );
970        Ok::<_, super::Error>(stream)
971    })
972    .try_flatten()
973    .boxed()
974}
975
976pub(crate) fn read_range(
977    file: &mut File,
978    path: &std::path::Path,
979    range: Range<u64>,
980) -> Result<Bytes> {
981    let requested = range.end - range.start;
982
983    let mut buf = Vec::with_capacity(requested as usize);
984
985    #[cfg(any(target_family = "unix", target_family = "windows"))]
986    {
987        buf.resize(requested as usize, 0_u8);
988
989        let mut buf_slice = &mut buf[..];
990        let mut offset = range.start;
991
992        while !buf_slice.is_empty() {
993            #[cfg(target_family = "unix")]
994            let read_result = file.read_at(buf_slice, offset);
995
996            #[cfg(target_family = "windows")]
997            let read_result = file.seek_read(buf_slice, offset);
998
999            match read_result {
1000                Ok(0) => break,
1001                Ok(n) => {
1002                    let tmp = buf_slice;
1003                    buf_slice = &mut tmp[n..];
1004                    offset += n as u64;
1005                }
1006                // This error is recoverable
1007                Err(e) if e.kind() == ErrorKind::Interrupted => {}
1008                Err(source) => {
1009                    let error = Error::UnableToReadBytes {
1010                        source,
1011                        path: path.into(),
1012                    };
1013
1014                    return Err(error.into());
1015                }
1016            }
1017        }
1018
1019        // If we reached EOF before filling the buffer
1020        if !buf_slice.is_empty() {
1021            let metadata = open_metadata(file, path)?;
1022            let file_len = metadata.len();
1023
1024            // If none of the range is satisfiable we should error, e.g. if the start offset is beyond the
1025            // extents of the file, or if its at the end of the file and wants to read a non-empty range.
1026            // if range.start > file_len || (range.start == file_len && !range.is_empty()) {
1027            if range.start >= file_len {
1028                return Err(Error::InvalidRange {
1029                    source: InvalidGetRange::StartTooLarge {
1030                        requested: range.start,
1031                        length: file_len,
1032                    },
1033                }
1034                .into());
1035            }
1036
1037            let expected = range.end.min(file_len) - range.start;
1038
1039            let error = Error::OutOfRange {
1040                path: path.into(),
1041                expected,
1042                actual: offset - range.start,
1043            };
1044
1045            return Err(error.into());
1046        }
1047    }
1048    #[cfg(all(not(windows), not(unix)))]
1049    {
1050        file.seek(SeekFrom::Start(range.start))
1051            .map_err(|err| map_seek_error(err, file, path, range.start))?;
1052
1053        let read = file.take(requested).read_to_end(&mut buf).map_err(|err| {
1054            // try to read metadata to give a better error in case of directory
1055            if let Err(e) = open_metadata(file, path) {
1056                return e;
1057            }
1058            Error::UnableToReadBytes {
1059                source: err,
1060                path: path.to_path_buf(),
1061            }
1062        })? as u64;
1063
1064        if read != requested {
1065            let metadata = open_metadata(file, path)?;
1066            let file_len = metadata.len();
1067
1068            if range.start >= file_len {
1069                return Err(Error::InvalidRange {
1070                    source: InvalidGetRange::StartTooLarge {
1071                        requested: range.start,
1072                        length: file_len,
1073                    },
1074                }
1075                .into());
1076            }
1077
1078            let expected = range.end.min(file_len) - range.start;
1079            if read != expected {
1080                return Err(Error::OutOfRange {
1081                    path: path.to_path_buf(),
1082                    expected,
1083                    actual: read,
1084                }
1085                .into());
1086            }
1087        }
1088    }
1089
1090    Ok(buf.into())
1091}
1092
1093fn open_file(path: &std::path::Path) -> Result<File, Error> {
1094    File::open(path).map_err(|e| map_open_error(e, path))
1095}
1096
1097fn open_metadata(file: &File, path: &std::path::Path) -> Result<Metadata, Error> {
1098    let metadata = file.metadata().map_err(|e| map_open_error(e, path))?;
1099    if metadata.is_dir() {
1100        Err(Error::NotFound {
1101            path: PathBuf::from(path),
1102            source: io::Error::new(ErrorKind::NotFound, "is directory"),
1103        })
1104    } else {
1105        Ok(metadata)
1106    }
1107}
1108
1109/// Translates errors from opening a file into a more specific [`Error`] when possible
1110fn map_open_error(source: io::Error, path: &std::path::Path) -> Error {
1111    let path = PathBuf::from(path);
1112    match source.kind() {
1113        ErrorKind::NotFound => Error::NotFound { path, source },
1114        _ => Error::UnableToOpenFile { path, source },
1115    }
1116}
1117
1118/// Translates errors from attempting to a file into a more specific [`Error`] when possible
1119fn map_seek_error(source: io::Error, file: &File, path: &std::path::Path, requested: u64) -> Error {
1120    // if we can't seek, check if start is out of bounds to give
1121    // a better error. Don't read metadata before to avoid
1122    // an extra syscall in the common case
1123    let m = match open_metadata(file, path) {
1124        Err(e) => return e,
1125        Ok(m) => m,
1126    };
1127    if requested >= m.len() {
1128        return Error::InvalidRange {
1129            source: InvalidGetRange::StartTooLarge {
1130                requested,
1131                length: m.len(),
1132            },
1133        };
1134    }
1135    Error::Seek {
1136        source,
1137        path: PathBuf::from(path),
1138    }
1139}
1140
1141fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
1142    match entry.metadata() {
1143        Ok(metadata) => Ok(Some(convert_metadata(metadata, location))),
1144        Err(e) => {
1145            if let Some(io_err) = e.io_error() {
1146                if io_err.kind() == ErrorKind::NotFound {
1147                    return Ok(None);
1148                }
1149            }
1150            Err(Error::Metadata {
1151                source: e.into(),
1152                path: location.to_string(),
1153            })?
1154        }
1155    }
1156}
1157
1158fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
1159    metadata
1160        .modified()
1161        .expect("Modified file time should be supported on this platform")
1162        .into()
1163}
1164
1165fn get_etag(metadata: &Metadata) -> String {
1166    let inode = get_inode(metadata);
1167    let size = metadata.len();
1168    let mtime = metadata
1169        .modified()
1170        .ok()
1171        .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
1172        .unwrap_or_default()
1173        .as_micros();
1174
1175    // Use an ETag scheme based on that used by many popular HTTP servers
1176    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
1177    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
1178    format!("{inode:x}-{mtime:x}-{size:x}")
1179}
1180
1181fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta {
1182    let last_modified = last_modified(&metadata);
1183
1184    ObjectMeta {
1185        location,
1186        last_modified,
1187        size: metadata.len(),
1188        e_tag: Some(get_etag(&metadata)),
1189        version: None,
1190    }
1191}
1192
#[cfg(unix)]
/// Returns the inode number of the file described by `metadata`.
///
/// Including the inode makes the ETag more resistant to collisions, as done by popular
/// web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
fn get_inode(metadata: &Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}

#[cfg(not(unix))]
/// Without inodes every file contributes the same constant, so the ETag relies on size and mtime
fn get_inode(_metadata: &Metadata) -> u64 {
    u64::MIN
}
1205
1206/// Convert walkdir results and converts not-found errors into `None`.
1207/// Convert broken symlinks to `None`.
1208fn convert_walkdir_result(
1209    res: std::result::Result<DirEntry, walkdir::Error>,
1210) -> Result<Option<DirEntry>> {
1211    match res {
1212        Ok(entry) => {
1213            // To check for broken symlink: call symlink_metadata() - it does not traverse symlinks);
1214            // if ok: check if entry is symlink; and try to read it by calling metadata().
1215            match symlink_metadata(entry.path()) {
1216                Ok(attr) => {
1217                    if attr.is_symlink() {
1218                        let target_metadata = metadata(entry.path());
1219                        match target_metadata {
1220                            Ok(_) => {
1221                                // symlink is valid
1222                                Ok(Some(entry))
1223                            }
1224                            Err(_) => {
1225                                // this is a broken symlink, return None
1226                                Ok(None)
1227                            }
1228                        }
1229                    } else {
1230                        Ok(Some(entry))
1231                    }
1232                }
1233                Err(_) => Ok(None),
1234            }
1235        }
1236
1237        Err(walkdir_err) => match walkdir_err.io_error() {
1238            Some(io_err) => match io_err.kind() {
1239                ErrorKind::NotFound => Ok(None),
1240                _ => Err(Error::UnableToWalkDir {
1241                    source: walkdir_err,
1242                }
1243                .into()),
1244            },
1245            None => Err(Error::UnableToWalkDir {
1246                source: walkdir_err,
1247            }
1248            .into()),
1249        },
1250    }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use std::fs;
1256
1257    use futures_util::TryStreamExt;
1258    use tempfile::TempDir;
1259
1260    #[cfg(target_family = "unix")]
1261    use tempfile::NamedTempFile;
1262
1263    use crate::{ObjectStoreExt, integration::*};
1264
1265    use super::*;
1266
1267    #[tokio::test]
1268    #[cfg(target_family = "unix")]
1269    async fn file_test() {
1270        let root = TempDir::new().unwrap();
1271        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1272
1273        put_get_delete_list(&integration).await;
1274        list_with_offset_exclusivity(&integration).await;
1275        get_opts(&integration).await;
1276        list_uses_directories_correctly(&integration).await;
1277        list_with_delimiter(&integration).await;
1278        rename_and_copy(&integration).await;
1279        copy_if_not_exists(&integration).await;
1280        copy_rename_nonexistent_object(&integration).await;
1281        stream_get(&integration).await;
1282        put_opts(&integration, false).await;
1283    }
1284
1285    #[test]
1286    #[cfg(target_family = "unix")]
1287    fn test_non_tokio() {
1288        let root = TempDir::new().unwrap();
1289        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1290        futures_executor::block_on(async move {
1291            put_get_delete_list(&integration).await;
1292            list_uses_directories_correctly(&integration).await;
1293            list_with_delimiter(&integration).await;
1294
1295            // Can't use stream_get test as WriteMultipart uses a tokio JoinSet
1296            let p = Path::from("manual_upload");
1297            let mut upload = integration.put_multipart(&p).await.unwrap();
1298            upload.put_part("123".into()).await.unwrap();
1299            upload.put_part("45678".into()).await.unwrap();
1300            let r = upload.complete().await.unwrap();
1301
1302            let get = integration.get(&p).await.unwrap();
1303            assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
1304            let actual = get.bytes().await.unwrap();
1305            assert_eq!(actual.as_ref(), b"12345678");
1306        });
1307    }
1308
1309    #[tokio::test]
1310    async fn creates_dir_if_not_present() {
1311        let root = TempDir::new().unwrap();
1312        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1313
1314        let location = Path::from("nested/file/test_file");
1315
1316        let data = Bytes::from("arbitrary data");
1317
1318        integration
1319            .put(&location, data.clone().into())
1320            .await
1321            .unwrap();
1322
1323        let read_data = integration
1324            .get(&location)
1325            .await
1326            .unwrap()
1327            .bytes()
1328            .await
1329            .unwrap();
1330        assert_eq!(&*read_data, data);
1331    }
1332
1333    #[tokio::test]
1334    async fn unknown_length() {
1335        let root = TempDir::new().unwrap();
1336        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1337
1338        let location = Path::from("some_file");
1339
1340        let data = Bytes::from("arbitrary data");
1341
1342        integration
1343            .put(&location, data.clone().into())
1344            .await
1345            .unwrap();
1346
1347        let read_data = integration
1348            .get(&location)
1349            .await
1350            .unwrap()
1351            .bytes()
1352            .await
1353            .unwrap();
1354        assert_eq!(&*read_data, data);
1355    }
1356
1357    #[tokio::test]
1358    async fn range_request_start_beyond_end_of_file() {
1359        let root = TempDir::new().unwrap();
1360        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1361
1362        let location = Path::from("some_file");
1363
1364        let data = Bytes::from("arbitrary data");
1365
1366        integration
1367            .put(&location, data.clone().into())
1368            .await
1369            .unwrap();
1370
1371        integration
1372            .get_range(&location, 100..200)
1373            .await
1374            .expect_err("Should error with start range beyond end of file");
1375    }
1376
1377    #[tokio::test]
1378    async fn range_request_beyond_end_of_file() {
1379        let root = TempDir::new().unwrap();
1380        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1381
1382        let location = Path::from("some_file");
1383
1384        let data = Bytes::from("arbitrary data");
1385
1386        integration
1387            .put(&location, data.clone().into())
1388            .await
1389            .unwrap();
1390
1391        let read_data = integration.get_range(&location, 0..100).await.unwrap();
1392        assert_eq!(&*read_data, data);
1393    }
1394
1395    #[tokio::test]
1396    #[cfg(target_family = "unix")]
1397    // Fails on github actions runner (which runs the tests as root)
1398    #[ignore]
1399    async fn bubble_up_io_errors() {
1400        use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
1401
1402        let root = TempDir::new().unwrap();
1403
1404        // make non-readable
1405        let metadata = root.path().metadata().unwrap();
1406        let mut permissions = metadata.permissions();
1407        permissions.set_mode(0o000);
1408        set_permissions(root.path(), permissions).unwrap();
1409
1410        let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1411
1412        let mut stream = store.list(None);
1413        let mut any_err = false;
1414        while let Some(res) = stream.next().await {
1415            if res.is_err() {
1416                any_err = true;
1417            }
1418        }
1419        assert!(any_err);
1420
1421        // `list_with_delimiter
1422        assert!(store.list_with_delimiter(None).await.is_err());
1423    }
1424
1425    const NON_EXISTENT_NAME: &str = "nonexistentname";
1426
1427    #[tokio::test]
1428    async fn get_nonexistent_location() {
1429        let root = TempDir::new().unwrap();
1430        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1431
1432        let location = Path::from(NON_EXISTENT_NAME);
1433
1434        let err = get_nonexistent_object(&integration, Some(location))
1435            .await
1436            .unwrap_err();
1437        if let crate::Error::NotFound { path, source } = err {
1438            let source_variant = source.downcast_ref::<std::io::Error>();
1439            assert!(
1440                matches!(source_variant, Some(std::io::Error { .. }),),
1441                "got: {source_variant:?}"
1442            );
1443            assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
1444        } else {
1445            panic!("unexpected error type: {err:?}");
1446        }
1447    }
1448
1449    #[tokio::test]
1450    async fn root() {
1451        let integration = LocalFileSystem::new();
1452
1453        let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
1454        let url = Url::from_directory_path(&canonical).unwrap();
1455        let path = Path::parse(url.path()).unwrap();
1456
1457        let roundtrip = integration.path_to_filesystem(&path).unwrap();
1458
1459        // Needed as on Windows canonicalize returns extended length path syntax
1460        // C:\Users\circleci -> \\?\C:\Users\circleci
1461        let roundtrip = roundtrip.canonicalize().unwrap();
1462
1463        assert_eq!(roundtrip, canonical);
1464
1465        integration.head(&path).await.unwrap();
1466    }
1467
1468    #[tokio::test]
1469    #[cfg(target_family = "windows")]
1470    async fn test_list_root() {
1471        let fs = LocalFileSystem::new();
1472        let r = fs.list_with_delimiter(None).await.unwrap_err().to_string();
1473
1474        assert!(
1475            r.contains("Unable to convert URL \"file:///\" to filesystem path"),
1476            "{}",
1477            r
1478        );
1479    }
1480
1481    #[tokio::test]
1482    #[cfg(target_os = "linux")]
1483    async fn test_list_root() {
1484        let fs = LocalFileSystem::new();
1485        fs.list_with_delimiter(None).await.unwrap();
1486    }
1487
1488    #[cfg(target_family = "unix")]
1489    async fn check_list(integration: &LocalFileSystem, prefix: Option<&Path>, expected: &[&str]) {
1490        let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();
1491
1492        let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
1493        strings.sort_unstable();
1494        assert_eq!(&strings, expected)
1495    }
1496
1497    #[tokio::test]
1498    #[cfg(target_family = "unix")]
1499    async fn test_symlink() {
1500        let root = TempDir::new().unwrap();
1501        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1502
1503        let subdir = root.path().join("a");
1504        std::fs::create_dir(&subdir).unwrap();
1505        let file = subdir.join("file.parquet");
1506        std::fs::write(file, "test").unwrap();
1507
1508        check_list(&integration, None, &["a/file.parquet"]).await;
1509        integration
1510            .head(&Path::from("a/file.parquet"))
1511            .await
1512            .unwrap();
1513
1514        // Follow out of tree symlink
1515        let other = NamedTempFile::new().unwrap();
1516        std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet")).unwrap();
1517
1518        // Should return test.parquet even though out of tree
1519        check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;
1520
1521        // Can fetch test.parquet
1522        integration.head(&Path::from("test.parquet")).await.unwrap();
1523
1524        // Follow in tree symlink
1525        std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
1526        check_list(
1527            &integration,
1528            None,
1529            &["a/file.parquet", "b/file.parquet", "test.parquet"],
1530        )
1531        .await;
1532        check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;
1533
1534        // Can fetch through symlink
1535        integration
1536            .head(&Path::from("b/file.parquet"))
1537            .await
1538            .unwrap();
1539
1540        // Ignore broken symlink
1541        std::os::unix::fs::symlink(root.path().join("foo.parquet"), root.path().join("c")).unwrap();
1542
1543        check_list(
1544            &integration,
1545            None,
1546            &["a/file.parquet", "b/file.parquet", "test.parquet"],
1547        )
1548        .await;
1549
1550        let mut r = integration.list_with_delimiter(None).await.unwrap();
1551        r.common_prefixes.sort_unstable();
1552        assert_eq!(r.common_prefixes.len(), 2);
1553        assert_eq!(r.common_prefixes[0].as_ref(), "a");
1554        assert_eq!(r.common_prefixes[1].as_ref(), "b");
1555        assert_eq!(r.objects.len(), 1);
1556        assert_eq!(r.objects[0].location.as_ref(), "test.parquet");
1557
1558        let r = integration
1559            .list_with_delimiter(Some(&Path::from("a")))
1560            .await
1561            .unwrap();
1562        assert_eq!(r.common_prefixes.len(), 0);
1563        assert_eq!(r.objects.len(), 1);
1564        assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");
1565
1566        // Deleting a symlink doesn't delete the source file
1567        integration
1568            .delete(&Path::from("test.parquet"))
1569            .await
1570            .unwrap();
1571        assert!(other.path().exists());
1572
1573        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
1574
1575        // Deleting through a symlink deletes both files
1576        integration
1577            .delete(&Path::from("b/file.parquet"))
1578            .await
1579            .unwrap();
1580
1581        check_list(&integration, None, &[]).await;
1582
1583        // Adding a file through a symlink creates in both paths
1584        integration
1585            .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
1586            .await
1587            .unwrap();
1588
1589        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
1590    }
1591
1592    #[tokio::test]
1593    async fn invalid_path() {
1594        let root = TempDir::new().unwrap();
1595        let root = root.path().join("🙀");
1596        std::fs::create_dir(root.clone()).unwrap();
1597
1598        // Invalid paths supported above root of store
1599        let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
1600
1601        let directory = Path::from("directory");
1602        let object = directory.clone().join("child.txt");
1603        let data = Bytes::from("arbitrary");
1604        integration.put(&object, data.clone().into()).await.unwrap();
1605        integration.head(&object).await.unwrap();
1606        let result = integration.get(&object).await.unwrap();
1607        assert_eq!(result.bytes().await.unwrap(), data);
1608
1609        flatten_list_stream(&integration, None).await.unwrap();
1610        flatten_list_stream(&integration, Some(&directory))
1611            .await
1612            .unwrap();
1613
1614        let result = integration
1615            .list_with_delimiter(Some(&directory))
1616            .await
1617            .unwrap();
1618        assert_eq!(result.objects.len(), 1);
1619        assert!(result.common_prefixes.is_empty());
1620        assert_eq!(result.objects[0].location, object);
1621
1622        let emoji = root.join("💀");
1623        std::fs::write(emoji, "foo").unwrap();
1624
1625        // Can list illegal file
1626        let mut paths = flatten_list_stream(&integration, None).await.unwrap();
1627        paths.sort_unstable();
1628
1629        assert_eq!(
1630            paths,
1631            vec![
1632                Path::parse("directory/child.txt").unwrap(),
1633                Path::parse("💀").unwrap()
1634            ]
1635        );
1636    }
1637
1638    #[tokio::test]
1639    async fn list_hides_incomplete_uploads() {
1640        let root = TempDir::new().unwrap();
1641        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1642        let location = Path::from("some_file");
1643
1644        let data = PutPayload::from("arbitrary data");
1645        let mut u1 = integration.put_multipart(&location).await.unwrap();
1646        u1.put_part(data.clone()).await.unwrap();
1647
1648        let mut u2 = integration.put_multipart(&location).await.unwrap();
1649        u2.put_part(data).await.unwrap();
1650
1651        let list = flatten_list_stream(&integration, None).await.unwrap();
1652        assert_eq!(list.len(), 0);
1653
1654        assert_eq!(
1655            integration
1656                .list_with_delimiter(None)
1657                .await
1658                .unwrap()
1659                .objects
1660                .len(),
1661            0
1662        );
1663    }
1664
1665    #[tokio::test]
1666    async fn test_path_with_offset() {
1667        let root = TempDir::new().unwrap();
1668        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1669
1670        let root_path = root.path();
1671        for i in 0..5 {
1672            let filename = format!("test{i}.parquet");
1673            let file = root_path.join(filename);
1674            std::fs::write(file, "test").unwrap();
1675        }
1676        let filter_str = "test";
1677        let filter = String::from(filter_str);
1678        let offset_str = filter + "1";
1679        let offset = Path::from(offset_str.clone());
1680
1681        // Use list_with_offset to retrieve files
1682        let res = integration.list_with_offset(None, &offset);
1683        let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
1684        let mut offset_files: Vec<_> = offset_paths
1685            .iter()
1686            .map(|x| String::from(x.filename().unwrap()))
1687            .collect();
1688
1689        // Check result with direct filesystem read
1690        let files = fs::read_dir(root_path).unwrap();
1691        let filtered_files = files
1692            .filter_map(Result::ok)
1693            .filter_map(|d| {
1694                d.file_name().to_str().and_then(|f| {
1695                    if f.contains(filter_str) {
1696                        Some(String::from(f))
1697                    } else {
1698                        None
1699                    }
1700                })
1701            })
1702            .collect::<Vec<_>>();
1703
1704        let mut expected_offset_files: Vec<_> = filtered_files
1705            .iter()
1706            .filter(|s| **s > offset_str)
1707            .cloned()
1708            .collect();
1709
1710        fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
1711            let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count();
1712            matching == a.len() && matching == b.len()
1713        }
1714
1715        offset_files.sort();
1716        expected_offset_files.sort();
1717
1718        // println!("Expected Offset Files: {:?}", expected_offset_files);
1719        // println!("Actual Offset Files: {:?}", offset_files);
1720
1721        assert_eq!(offset_files.len(), expected_offset_files.len());
1722        assert!(do_vecs_match(&expected_offset_files, &offset_files));
1723    }
1724
1725    #[tokio::test]
1726    async fn filesystem_filename_with_percent() {
1727        let temp_dir = TempDir::new().unwrap();
1728        let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1729        let filename = "L%3ABC.parquet";
1730
1731        std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
1732
1733        let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
1734        assert_eq!(res.len(), 1);
1735        assert_eq!(res[0].location.as_ref(), filename);
1736
1737        let res = integration.list_with_delimiter(None).await.unwrap();
1738        assert_eq!(res.objects.len(), 1);
1739        assert_eq!(res.objects[0].location.as_ref(), filename);
1740    }
1741
1742    #[tokio::test]
1743    async fn relative_paths() {
1744        LocalFileSystem::new_with_prefix(".").unwrap();
1745        LocalFileSystem::new_with_prefix("..").unwrap();
1746        LocalFileSystem::new_with_prefix("../..").unwrap();
1747
1748        let integration = LocalFileSystem::new();
1749        let path = Path::from_filesystem_path(".").unwrap();
1750        integration.list_with_delimiter(Some(&path)).await.unwrap();
1751    }
1752
1753    #[test]
1754    fn test_valid_path() {
1755        let cases = [
1756            ("foo#123/test.txt", true),
1757            ("foo#123/test#23.txt", true),
1758            ("foo#123/test#34", false),
1759            ("foo😁/test#34", false),
1760            ("foo/test#😁34", true),
1761        ];
1762
1763        for (case, expected) in cases {
1764            let path = Path::parse(case).unwrap();
1765            assert_eq!(is_valid_file_path(&path), expected);
1766        }
1767    }
1768
1769    #[tokio::test]
1770    async fn test_intermediate_files() {
1771        let root = TempDir::new().unwrap();
1772        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1773
1774        let a = Path::parse("foo#123/test.txt").unwrap();
1775        integration.put(&a, "test".into()).await.unwrap();
1776
1777        let list = flatten_list_stream(&integration, None).await.unwrap();
1778        assert_eq!(list, vec![a.clone()]);
1779
1780        std::fs::write(root.path().join("bar#123"), "test").unwrap();
1781
1782        // Should ignore file
1783        let list = flatten_list_stream(&integration, None).await.unwrap();
1784        assert_eq!(list, vec![a.clone()]);
1785
1786        let b = Path::parse("bar#123").unwrap();
1787        let err = integration.get(&b).await.unwrap_err().to_string();
1788        assert_eq!(
1789            err,
1790            "Generic LocalFileSystem error: Filenames containing trailing '/#\\d+/' are not supported: bar#123"
1791        );
1792
1793        let c = Path::parse("foo#123.txt").unwrap();
1794        integration.put(&c, "test".into()).await.unwrap();
1795
1796        let mut list = flatten_list_stream(&integration, None).await.unwrap();
1797        list.sort_unstable();
1798        assert_eq!(list, vec![c, a]);
1799    }
1800
1801    #[tokio::test]
1802    #[cfg(target_os = "windows")]
1803    async fn filesystem_filename_with_colon() {
1804        let root = TempDir::new().unwrap();
1805        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1806        let path = Path::parse("file%3Aname.parquet").unwrap();
1807        let location = Path::parse("file:name.parquet").unwrap();
1808
1809        integration.put(&location, "test".into()).await.unwrap();
1810        let list = flatten_list_stream(&integration, None).await.unwrap();
1811        assert_eq!(list, vec![path.clone()]);
1812
1813        let result = integration
1814            .get(&location)
1815            .await
1816            .unwrap()
1817            .bytes()
1818            .await
1819            .unwrap();
1820        assert_eq!(result, Bytes::from("test"));
1821    }
1822
1823    #[tokio::test]
1824    async fn delete_dirs_automatically() {
1825        let root = TempDir::new().unwrap();
1826        let integration = LocalFileSystem::new_with_prefix(root.path())
1827            .unwrap()
1828            .with_automatic_cleanup(true);
1829        let location = Path::from("nested/file/test_file");
1830        let data = Bytes::from("arbitrary data");
1831
1832        integration
1833            .put(&location, data.clone().into())
1834            .await
1835            .unwrap();
1836
1837        let read_data = integration
1838            .get(&location)
1839            .await
1840            .unwrap()
1841            .bytes()
1842            .await
1843            .unwrap();
1844
1845        assert_eq!(&*read_data, data);
1846        assert!(fs::read_dir(root.path()).unwrap().count() > 0);
1847        integration.delete(&location).await.unwrap();
1848        assert!(fs::read_dir(root.path()).unwrap().count() == 0);
1849    }
1850}
1851
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStoreExt, Path, PutPayload};

    /// Dropping an unfinished multipart upload must eventually remove its
    /// on-disk file from the store root. The test polls up to 100 times,
    /// sleeping 1 ms before each check, before giving up.
    #[tokio::test]
    async fn test_cleanup_intermediate_files() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        let entries_in_root = || std::fs::read_dir(root.path()).unwrap().count();

        let location = Path::from("some_file");
        let mut upload = integration.put_multipart(&location).await.unwrap();
        upload
            .put_part(PutPayload::from_static(b"hello"))
            .await
            .unwrap();

        // While the upload is in progress, exactly one entry exists.
        assert_eq!(entries_in_root(), 1);
        drop(upload);

        let mut attempts_left = 100;
        while attempts_left > 0 {
            tokio::time::sleep(Duration::from_millis(1)).await;
            if entries_in_root() == 0 {
                return;
            }
            attempts_left -= 1;
        }
        panic!("Failed to cleanup file in 100ms")
    }
}
1885
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
    use std::fs::OpenOptions;

    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStoreExt, Path};

    /// `head` and `get` succeed on a named pipe (FIFO) in the store root while
    /// a writer holds the other end open.
    #[tokio::test]
    async fn test_fifo() {
        let filename = "some_file";
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let fifo_path = root.path().join(filename);
        unistd::mkfifo(&fifo_path, stat::Mode::S_IRWXU).unwrap();

        // Opening one end of a FIFO blocks until the other end is opened.
        // The write side is therefore opened on a blocking thread while the
        // store opens the read side.
        let writer = tokio::task::spawn_blocking(move || {
            OpenOptions::new().write(true).open(fifo_path).unwrap()
        });

        let location = Path::from(filename);
        integration.head(&location).await.unwrap();
        integration.get(&location).await.unwrap();

        writer.await.unwrap();
    }
}