1use std::fs::{File, Metadata, OpenOptions, metadata, symlink_metadata};
20use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
21use std::ops::Range;
22#[cfg(target_family = "unix")]
23use std::os::unix::fs::FileExt;
24#[cfg(target_family = "windows")]
25use std::os::windows::fs::FileExt;
26use std::sync::Arc;
27use std::time::SystemTime;
28use std::{collections::BTreeSet, io};
29use std::{collections::VecDeque, path::PathBuf};
30
31use async_trait::async_trait;
32use bytes::Bytes;
33use chrono::{DateTime, Utc};
34use futures_util::{FutureExt, TryStreamExt};
35use futures_util::{StreamExt, stream::BoxStream};
36use parking_lot::Mutex;
37use url::Url;
38use walkdir::{DirEntry, WalkDir};
39
40use crate::{
41 Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
42 ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
43 UploadPart, maybe_spawn_blocking,
44 path::{Path, absolute_path_to_url},
45 util::InvalidGetRange,
46};
47use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};
48
/// Errors raised by the local filesystem store.
///
/// These are converted into the crate-level error type by the
/// `From<Error> for super::Error` implementation in this file.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// A directory traversal performed with `walkdir` failed.
    #[error("Unable to walk dir: {}", source)]
    UnableToWalkDir { source: walkdir::Error },

    /// Metadata for `path` could not be read.
    #[error("Unable to access metadata for {}: {}", path, source)]
    Metadata {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        path: String,
    },

    /// Writing payload bytes into a file failed.
    #[error("Unable to copy data to file: {}", source)]
    UnableToCopyDataToFile { source: io::Error },

    /// Moving a file into its final location failed.
    #[error("Unable to rename file: {}", source)]
    UnableToRenameFile { source: io::Error },

    /// The directory `path` could not be created.
    #[error("Unable to create dir {}: {}", path.display(), source)]
    UnableToCreateDir { source: io::Error, path: PathBuf },

    /// The file `path` could not be created.
    #[error("Unable to create file {}: {}", path.display(), source)]
    UnableToCreateFile { source: io::Error, path: PathBuf },

    /// The file `path` could not be deleted.
    #[error("Unable to delete file {}: {}", path.display(), source)]
    UnableToDeleteFile { source: io::Error, path: PathBuf },

    /// The file `path` could not be opened.
    #[error("Unable to open file {}: {}", path.display(), source)]
    UnableToOpenFile { source: io::Error, path: PathBuf },

    /// Reading from the file `path` failed.
    #[error("Unable to read data from file {}: {}", path.display(), source)]
    UnableToReadBytes { source: io::Error, path: PathBuf },

    /// Fewer (or more) bytes than expected could be read from `path`.
    #[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)]
    OutOfRange {
        path: PathBuf,
        expected: u64,
        actual: u64,
    },

    /// The requested byte range could not be satisfied.
    #[error("Requested range was invalid")]
    InvalidRange { source: InvalidGetRange },

    /// Copying (hard-linking or renaming) `from` to `to` failed.
    #[error("Unable to copy file from {} to {}: {}", from.display(), to.display(), source)]
    UnableToCopyFile {
        from: PathBuf,
        to: PathBuf,
        source: io::Error,
    },

    /// `path` does not exist; mapped to the crate-level `NotFound` error.
    #[error("NotFound")]
    NotFound { path: PathBuf, source: io::Error },

    /// Seeking within the file `path` failed.
    #[error("Error seeking file {}: {}", path.display(), source)]
    Seek { source: io::Error, path: PathBuf },

    /// `url` could not be turned into a filesystem path.
    #[error("Unable to convert URL \"{}\" to filesystem path", url)]
    InvalidUrl { url: Url },

    /// `path` already exists; mapped to the crate-level `AlreadyExists` error.
    #[error("AlreadyExists")]
    AlreadyExists { path: String, source: io::Error },

    /// The filesystem root `path` could not be canonicalized.
    #[error("Unable to canonicalize filesystem root: {}", path.display())]
    UnableToCanonicalize { path: PathBuf, source: io::Error },

    /// The file name ends in `#<digits>`, which is reserved for staged uploads.
    #[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)]
    InvalidPath { path: String },

    /// The multipart upload was already completed or aborted.
    #[error("Upload aborted")]
    Aborted,
}
120
121impl From<Error> for super::Error {
122 fn from(source: Error) -> Self {
123 match source {
124 Error::NotFound { path, source } => Self::NotFound {
125 path: path.to_string_lossy().to_string(),
126 source: source.into(),
127 },
128 Error::AlreadyExists { path, source } => Self::AlreadyExists {
129 path,
130 source: source.into(),
131 },
132 _ => Self::Generic {
133 store: "LocalFileSystem",
134 source: Box::new(source),
135 },
136 }
137 }
138}
139
/// An `ObjectStore` implementation backed by the local filesystem.
///
/// Object locations are resolved relative to the root URL held in the shared
/// [`Config`].
#[derive(Clone, Debug)]
pub struct LocalFileSystem {
    /// Shared configuration holding the root URL of the store.
    config: Arc<Config>,
    /// When `true`, deleting a file also removes parent directories that
    /// become empty, stopping at the store root.
    automatic_cleanup: bool,
}
206
/// Configuration shared between clones of [`LocalFileSystem`].
#[derive(Debug)]
struct Config {
    /// `file://` URL of the directory that all object paths are resolved against.
    root: Url,
}
211
212impl std::fmt::Display for LocalFileSystem {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 write!(f, "LocalFileSystem({})", self.config.root)
215 }
216}
217
218impl Default for LocalFileSystem {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224impl LocalFileSystem {
225 pub fn new() -> Self {
227 Self {
228 config: Arc::new(Config {
229 root: Url::parse("file:///").unwrap(),
230 }),
231 automatic_cleanup: false,
232 }
233 }
234
235 pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
240 let path = std::fs::canonicalize(&prefix).map_err(|source| {
241 let path = prefix.as_ref().into();
242 Error::UnableToCanonicalize { source, path }
243 })?;
244
245 Ok(Self {
246 config: Arc::new(Config {
247 root: absolute_path_to_url(path)?,
248 }),
249 automatic_cleanup: false,
250 })
251 }
252
253 pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
255 self.config.path_to_filesystem(location)
256 }
257
258 pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self {
260 self.automatic_cleanup = automatic_cleanup;
261 self
262 }
263}
264
265impl Config {
266 fn prefix_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
268 let mut url = self.root.clone();
269 url.path_segments_mut()
270 .expect("url path")
271 .pop_if_empty()
274 .extend(location.parts());
275
276 url.to_file_path()
277 .map_err(|_| Error::InvalidUrl { url }.into())
278 }
279
280 fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
282 if !is_valid_file_path(location) {
283 let path = location.as_ref().into();
284 let error = Error::InvalidPath { path };
285 return Err(error.into());
286 }
287
288 let path = self.prefix_to_filesystem(location)?;
289
290 #[cfg(target_os = "windows")]
291 let path = {
292 let path = path.to_string_lossy();
293
294 let mut out = String::new();
296 let drive = &path[..2]; let filepath = &path[2..].replace(':', "%3A"); out.push_str(drive);
299 out.push_str(filepath);
300 PathBuf::from(out)
301 };
302
303 Ok(path)
304 }
305
306 fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
308 Ok(Path::from_absolute_path_with_base(
309 location,
310 Some(&self.root),
311 )?)
312 }
313}
314
315fn is_valid_file_path(path: &Path) -> bool {
316 match path.filename() {
317 Some(p) => match p.split_once('#') {
318 Some((_, suffix)) if !suffix.is_empty() => {
319 !suffix.as_bytes().iter().all(|x| x.is_ascii_digit())
321 }
322 _ => true,
323 },
324 None => false,
325 }
326}
327
328#[async_trait]
329impl ObjectStore for LocalFileSystem {
330 async fn put_opts(
331 &self,
332 location: &Path,
333 payload: PutPayload,
334 opts: PutOptions,
335 ) -> Result<PutResult> {
336 if matches!(opts.mode, PutMode::Update(_)) {
337 return Err(crate::Error::NotImplemented {
338 operation: "`put_opts` with mode `PutMode::Update`".into(),
339 implementer: self.to_string(),
340 });
341 }
342
343 if !opts.attributes.is_empty() {
344 return Err(crate::Error::NotImplemented {
345 operation: "`put_opts` with `opts.attributes` specified".into(),
346 implementer: self.to_string(),
347 });
348 }
349
350 let path = self.path_to_filesystem(location)?;
351 maybe_spawn_blocking(move || {
352 let (mut file, staging_path) = new_staged_upload(&path)?;
353 let mut e_tag = None;
354
355 let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
356 Ok(_) => {
357 let metadata = file.metadata().map_err(|e| Error::Metadata {
358 source: e.into(),
359 path: path.to_string_lossy().to_string(),
360 })?;
361 e_tag = Some(get_etag(&metadata));
362 match opts.mode {
363 PutMode::Overwrite => {
364 std::mem::drop(file);
367 match std::fs::rename(&staging_path, &path) {
368 Ok(_) => None,
369 Err(source) => Some(Error::UnableToRenameFile { source }),
370 }
371 }
372 PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
373 Ok(_) => {
374 let _ = std::fs::remove_file(&staging_path); None
376 }
377 Err(source) => match source.kind() {
378 ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
379 path: path.to_str().unwrap().to_string(),
380 source,
381 }),
382 _ => Some(Error::UnableToRenameFile { source }),
383 },
384 },
385 PutMode::Update(_) => unreachable!(),
386 }
387 }
388 Err(source) => Some(Error::UnableToCopyDataToFile { source }),
389 };
390
391 if let Some(err) = err {
392 let _ = std::fs::remove_file(&staging_path); return Err(err.into());
394 }
395
396 Ok(PutResult {
397 e_tag,
398 version: None,
399 })
400 })
401 .await
402 }
403
404 async fn put_multipart_opts(
405 &self,
406 location: &Path,
407 opts: PutMultipartOptions,
408 ) -> Result<Box<dyn MultipartUpload>> {
409 if !opts.attributes.is_empty() {
410 return Err(crate::Error::NotImplemented {
411 operation: "`put_multipart_opts` with `opts.attributes` specified".into(),
412 implementer: self.to_string(),
413 });
414 }
415
416 let dest = self.path_to_filesystem(location)?;
417 let (file, src) = new_staged_upload(&dest)?;
418 Ok(Box::new(LocalUpload::new(src, dest, file)))
419 }
420
421 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
422 let location = location.clone();
423 let path = self.path_to_filesystem(&location)?;
424 maybe_spawn_blocking(move || {
425 let file = open_file(&path)?;
426 let metadata = open_metadata(&file, &path)?;
427 let meta = convert_metadata(metadata, location);
428 options.check_preconditions(&meta)?;
429
430 let range = match options.range {
431 Some(r) => r
432 .as_range(meta.size)
433 .map_err(|source| Error::InvalidRange { source })?,
434 None => 0..meta.size,
435 };
436
437 Ok(GetResult {
438 payload: GetResultPayload::File(file, path),
439 attributes: Attributes::default(),
440 range,
441 meta,
442 })
443 })
444 .await
445 }
446
447 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
448 let path = self.path_to_filesystem(location)?;
449 let ranges = ranges.to_vec();
450 maybe_spawn_blocking(move || {
451 let mut file = File::open(&path).map_err(|e| map_open_error(e, &path))?;
454 ranges
455 .into_iter()
456 .map(|r| read_range(&mut file, &path, r))
457 .collect()
458 })
459 .await
460 }
461
462 fn delete_stream(
463 &self,
464 locations: BoxStream<'static, Result<Path>>,
465 ) -> BoxStream<'static, Result<Path>> {
466 let config = Arc::clone(&self.config);
467 let automatic_cleanup = self.automatic_cleanup;
468 locations
469 .map(move |location| {
470 let config = Arc::clone(&config);
471 maybe_spawn_blocking(move || {
472 let location = location?;
473 Self::delete_location(config, automatic_cleanup, &location)?;
474 Ok(location)
475 })
476 })
477 .buffered(10)
478 .boxed()
479 }
480
481 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
482 Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, None)
483 }
484
485 fn list_with_offset(
486 &self,
487 prefix: Option<&Path>,
488 offset: &Path,
489 ) -> BoxStream<'static, Result<ObjectMeta>> {
490 Self::list_with_maybe_offset(Arc::clone(&self.config), prefix, Some(offset))
491 }
492
493 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
494 let config = Arc::clone(&self.config);
495
496 let prefix = prefix.cloned().unwrap_or_default();
497 let resolved_prefix = config.prefix_to_filesystem(&prefix)?;
498
499 maybe_spawn_blocking(move || {
500 let walkdir = WalkDir::new(&resolved_prefix)
501 .min_depth(1)
502 .max_depth(1)
503 .follow_links(true);
504
505 let mut common_prefixes = BTreeSet::new();
506 let mut objects = Vec::new();
507
508 for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
509 if let Some(entry) = entry_res? {
510 let is_directory = entry.file_type().is_dir();
511 let entry_location = config.filesystem_to_path(entry.path())?;
512 if !is_directory && !is_valid_file_path(&entry_location) {
513 continue;
514 }
515
516 let mut parts = match entry_location.prefix_match(&prefix) {
517 Some(parts) => parts,
518 None => continue,
519 };
520
521 let common_prefix = match parts.next() {
522 Some(p) => p,
523 None => continue,
524 };
525
526 drop(parts);
527
528 if is_directory {
529 common_prefixes.insert(prefix.clone().join(common_prefix));
530 } else if let Some(metadata) = convert_entry(entry, entry_location)? {
531 objects.push(metadata);
532 }
533 }
534 }
535
536 Ok(ListResult {
537 common_prefixes: common_prefixes.into_iter().collect(),
538 objects,
539 })
540 })
541 .await
542 }
543
544 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
545 let CopyOptions {
546 mode,
547 extensions: _,
548 } = options;
549
550 let from = self.path_to_filesystem(from)?;
551 let to = self.path_to_filesystem(to)?;
552
553 match mode {
554 CopyMode::Overwrite => {
555 let mut id = 0;
556 maybe_spawn_blocking(move || {
563 loop {
564 let staged = staged_upload_path(&to, &id.to_string());
565 match std::fs::hard_link(&from, &staged) {
566 Ok(_) => {
567 return std::fs::rename(&staged, &to).map_err(|source| {
568 let _ = std::fs::remove_file(&staged); Error::UnableToCopyFile { from, to, source }.into()
570 });
571 }
572 Err(source) => match source.kind() {
573 ErrorKind::AlreadyExists => id += 1,
574 ErrorKind::NotFound => match from.exists() {
575 true => create_parent_dirs(&to, source)?,
576 false => {
577 return Err(Error::NotFound { path: from, source }.into());
578 }
579 },
580 _ => {
581 return Err(Error::UnableToCopyFile { from, to, source }.into());
582 }
583 },
584 }
585 }
586 })
587 .await
588 }
589 CopyMode::Create => {
590 maybe_spawn_blocking(move || {
591 loop {
592 match std::fs::hard_link(&from, &to) {
593 Ok(_) => return Ok(()),
594 Err(source) => match source.kind() {
595 ErrorKind::AlreadyExists => {
596 return Err(Error::AlreadyExists {
597 path: to.to_str().unwrap().to_string(),
598 source,
599 }
600 .into());
601 }
602 ErrorKind::NotFound => match from.exists() {
603 true => create_parent_dirs(&to, source)?,
604 false => {
605 return Err(Error::NotFound { path: from, source }.into());
606 }
607 },
608 _ => {
609 return Err(Error::UnableToCopyFile { from, to, source }.into());
610 }
611 },
612 }
613 }
614 })
615 .await
616 }
617 }
618 }
619
620 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
621 let RenameOptions {
622 target_mode,
623 extensions,
624 } = options;
625
626 match target_mode {
627 RenameTargetMode::Overwrite => {
629 let from = self.path_to_filesystem(from)?;
630 let to = self.path_to_filesystem(to)?;
631 maybe_spawn_blocking(move || {
632 loop {
633 match std::fs::rename(&from, &to) {
634 Ok(_) => return Ok(()),
635 Err(source) => match source.kind() {
636 ErrorKind::NotFound => match from.exists() {
637 true => create_parent_dirs(&to, source)?,
638 false => {
639 return Err(Error::NotFound { path: from, source }.into());
640 }
641 },
642 _ => {
643 return Err(Error::UnableToCopyFile { from, to, source }.into());
644 }
645 },
646 }
647 }
648 })
649 .await
650 }
651 RenameTargetMode::Create => {
653 self.copy_opts(
654 from,
655 to,
656 CopyOptions {
657 mode: CopyMode::Create,
658 extensions,
659 },
660 )
661 .await?;
662 self.delete(from).await?;
663 Ok(())
664 }
665 }
666 }
667}
668
669impl LocalFileSystem {
670 fn delete_location(
671 config: Arc<Config>,
672 automatic_cleanup: bool,
673 location: &Path,
674 ) -> Result<()> {
675 let path = config.path_to_filesystem(location)?;
676 if let Err(e) = std::fs::remove_file(&path) {
677 Err(match e.kind() {
678 ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
679 _ => Error::UnableToDeleteFile { path, source: e }.into(),
680 })
681 } else if automatic_cleanup {
682 let root = &config.root;
683 let root = root
684 .to_file_path()
685 .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
686
687 let mut parent = path.parent();
689
690 while let Some(loc) = parent {
691 if loc != root && std::fs::remove_dir(loc).is_ok() {
692 parent = loc.parent();
693 } else {
694 break;
695 }
696 }
697
698 Ok(())
699 } else {
700 Ok(())
701 }
702 }
703
    /// Recursively lists the files below `prefix` (or below the store root when
    /// `prefix` is `None`) as a stream of [`ObjectMeta`].
    ///
    /// When `maybe_offset` is given, entries whose object path is less than or
    /// equal to the offset are skipped. Non-file entries and file names rejected
    /// by `is_valid_file_path` are skipped as well.
    ///
    /// Inside a tokio runtime the directory walk is driven on blocking tasks in
    /// batches of `CHUNK_SIZE` entries; otherwise the walk iterator is wrapped
    /// directly in a stream.
    fn list_with_maybe_offset(
        config: Arc<Config>,
        prefix: Option<&Path>,
        maybe_offset: Option<&Path>,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        let root_path = match prefix {
            Some(prefix) => match config.prefix_to_filesystem(prefix) {
                Ok(path) => path,
                // Surface the resolution failure as a single-item error stream.
                Err(e) => return futures_util::future::ready(Err(e)).into_stream().boxed(),
            },
            None => config.root.to_file_path().unwrap(),
        };

        // `min_depth(1)` excludes the root directory itself.
        let walkdir = WalkDir::new(root_path)
            .min_depth(1)
            .follow_links(true);

        let maybe_offset = maybe_offset.cloned();

        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
            // Apply the offset filter first so that skipped entries never reach
            // the metadata lookups performed by `convert_walkdir_result`.
            if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
                let location = config.filesystem_to_path(entry.path());
                match location {
                    Ok(path) if path <= *offset => return None,
                    Err(e) => return Some(Err(e)),
                    _ => {}
                }
            }

            // `transpose()?` returns `None` from the closure for skipped entries.
            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
                Ok(entry) => entry,
                Err(e) => return Some(Err(e)),
            };

            if !entry.path().is_file() {
                return None;
            }

            match config.filesystem_to_path(entry.path()) {
                Ok(path) => match is_valid_file_path(&path) {
                    true => convert_entry(entry, path).transpose(),
                    false => None,
                },
                Err(e) => Some(Err(e)),
            }
        });

        // Without a tokio runtime there is nothing to offload to: expose the
        // iterator directly.
        if tokio::runtime::Handle::try_current().is_err() {
            return futures_util::stream::iter(s).boxed();
        }

        // Number of entries pulled from the walk per blocking task.
        const CHUNK_SIZE: usize = 1024;

        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
        futures_util::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
            if buffer.is_empty() {
                // Refill the buffer on a blocking task, then take back ownership
                // of both the iterator and the buffer.
                (s, buffer) = tokio::task::spawn_blocking(move || {
                    for _ in 0..CHUNK_SIZE {
                        match s.next() {
                            Some(r) => buffer.push_back(r),
                            None => break,
                        }
                    }
                    (s, buffer)
                })
                .await?;
            }

            match buffer.pop_front() {
                Some(Err(e)) => Err(e),
                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
                // Buffer still empty after a refill: the walk is exhausted.
                None => Ok(None),
            }
        })
        .boxed()
    }
786}
787
788fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
790 let parent = path.parent().ok_or_else(|| {
791 let path = path.to_path_buf();
792 Error::UnableToCreateFile { path, source }
793 })?;
794
795 std::fs::create_dir_all(parent).map_err(|source| {
796 let path = parent.into();
797 Error::UnableToCreateDir { source, path }
798 })?;
799 Ok(())
800}
801
802fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
806 let mut multipart_id = 1;
807 loop {
808 let suffix = multipart_id.to_string();
809 let path = staged_upload_path(base, &suffix);
810 let mut options = OpenOptions::new();
811 match options.read(true).write(true).create_new(true).open(&path) {
812 Ok(f) => return Ok((f, path)),
813 Err(source) => match source.kind() {
814 ErrorKind::AlreadyExists => multipart_id += 1,
815 ErrorKind::NotFound => create_parent_dirs(&path, source)?,
816 _ => return Err(Error::UnableToOpenFile { source, path }.into()),
817 },
818 }
819 }
820}
821
/// Returns the staging path for `dest`: `dest` with `#` and `suffix` appended
/// to its final component.
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    let mut raw = dest.as_os_str().to_os_string();
    for piece in ["#", suffix] {
        raw.push(piece);
    }
    PathBuf::from(raw)
}
829
/// An in-progress multipart upload writing into a staged file.
#[derive(Debug)]
struct LocalUpload {
    /// State shared with the blocking tasks that perform the writes.
    state: Arc<UploadState>,
    /// Path of the staged file; `None` once the upload was completed or aborted.
    src: Option<PathBuf>,
    /// Byte offset at which the next part will be written.
    offset: u64,
}
839
/// State of a [`LocalUpload`] shared with its blocking write tasks.
#[derive(Debug)]
struct UploadState {
    /// Final destination the staged file is renamed to on completion.
    dest: PathBuf,
    /// Handle to the staged file; the mutex serializes part writes.
    file: Mutex<File>,
}
845
846impl LocalUpload {
847 pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
848 Self {
849 state: Arc::new(UploadState {
850 dest,
851 file: Mutex::new(file),
852 }),
853 src: Some(src),
854 offset: 0,
855 }
856 }
857}
858
#[async_trait]
impl MultipartUpload for LocalUpload {
    /// Schedules `data` to be written at the next sequential offset of the
    /// staged file.
    ///
    /// The offset is reserved before the write is scheduled, so each part's
    /// position in the file is fixed by the order of `put_part` calls.
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let offset = self.offset;
        self.offset += data.content_length() as u64;

        let s = Arc::clone(&self.state);
        maybe_spawn_blocking(move || {
            // The lock keeps the seek + write pair atomic with respect to other parts.
            let mut file = s.file.lock();
            file.seek(SeekFrom::Start(offset)).map_err(|source| {
                let path = s.dest.clone();
                Error::Seek { source, path }
            })?;

            data.iter()
                .try_for_each(|x| file.write_all(x))
                .map_err(|source| Error::UnableToCopyDataToFile { source })?;

            Ok(())
        })
        .boxed()
    }

    /// Renames the staged file to its destination and returns the resulting ETag.
    ///
    /// Returns `Error::Aborted` if the upload was already completed or aborted.
    async fn complete(&mut self) -> Result<PutResult> {
        let src = self.src.take().ok_or(Error::Aborted)?;
        let s = Arc::clone(&self.state);
        maybe_spawn_blocking(move || {
            // Holding the lock ensures no part write is in flight during the rename.
            let file = s.file.lock();
            std::fs::rename(&src, &s.dest)
                .map_err(|source| Error::UnableToRenameFile { source })?;
            // Metadata is read from the still-open handle after the rename.
            let metadata = file.metadata().map_err(|e| Error::Metadata {
                source: e.into(),
                path: src.to_string_lossy().to_string(),
            })?;

            Ok(PutResult {
                e_tag: Some(get_etag(&metadata)),
                version: None,
            })
        })
        .await
    }

    /// Deletes the staged file.
    ///
    /// Returns `Error::Aborted` if the upload was already completed or aborted.
    async fn abort(&mut self) -> Result<()> {
        let src = self.src.take().ok_or(Error::Aborted)?;
        maybe_spawn_blocking(move || {
            std::fs::remove_file(&src)
                .map_err(|source| Error::UnableToDeleteFile { source, path: src })?;
            Ok(())
        })
        .await
    }
}
913
914impl Drop for LocalUpload {
915 fn drop(&mut self) {
916 if let Some(src) = self.src.take() {
917 match tokio::runtime::Handle::try_current() {
919 Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))),
920 Err(_) => drop(std::fs::remove_file(src)),
921 };
922 }
923 }
924}
925
926pub(crate) fn chunked_stream(
927 mut file: File,
928 path: PathBuf,
929 range: Range<u64>,
930 chunk_size: usize,
931) -> BoxStream<'static, Result<Bytes, super::Error>> {
932 futures_util::stream::once(async move {
933 let requested = range.end - range.start;
934
935 let (file, path) = maybe_spawn_blocking(move || {
936 file.seek(SeekFrom::Start(range.start as _))
937 .map_err(|err| map_seek_error(err, &file, &path, range.start))?;
938 Ok((file, path))
939 })
940 .await?;
941
942 let stream = futures_util::stream::try_unfold(
943 (file, path, requested),
944 move |(mut file, path, remaining)| {
945 maybe_spawn_blocking(move || {
946 if remaining == 0 {
947 return Ok(None);
948 }
949
950 let to_read = remaining.min(chunk_size as u64);
951 let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
952 source: InvalidGetRange::TooLarge {
953 requested: to_read,
954 max: usize::MAX as u64,
955 },
956 })?;
957 let mut buffer = Vec::with_capacity(cap);
958 let read = (&mut file)
959 .take(to_read)
960 .read_to_end(&mut buffer)
961 .map_err(|e| Error::UnableToReadBytes {
962 source: e,
963 path: path.clone(),
964 })?;
965
966 Ok(Some((buffer.into(), (file, path, remaining - read as u64))))
967 })
968 },
969 );
970 Ok::<_, super::Error>(stream)
971 })
972 .try_flatten()
973 .boxed()
974}
975
976pub(crate) fn read_range(
977 file: &mut File,
978 path: &std::path::Path,
979 range: Range<u64>,
980) -> Result<Bytes> {
981 let requested = range.end - range.start;
982
983 let mut buf = Vec::with_capacity(requested as usize);
984
985 #[cfg(any(target_family = "unix", target_family = "windows"))]
986 {
987 buf.resize(requested as usize, 0_u8);
988
989 let mut buf_slice = &mut buf[..];
990 let mut offset = range.start;
991
992 while !buf_slice.is_empty() {
993 #[cfg(target_family = "unix")]
994 let read_result = file.read_at(buf_slice, offset);
995
996 #[cfg(target_family = "windows")]
997 let read_result = file.seek_read(buf_slice, offset);
998
999 match read_result {
1000 Ok(0) => break,
1001 Ok(n) => {
1002 let tmp = buf_slice;
1003 buf_slice = &mut tmp[n..];
1004 offset += n as u64;
1005 }
1006 Err(e) if e.kind() == ErrorKind::Interrupted => {}
1008 Err(source) => {
1009 let error = Error::UnableToReadBytes {
1010 source,
1011 path: path.into(),
1012 };
1013
1014 return Err(error.into());
1015 }
1016 }
1017 }
1018
1019 if !buf_slice.is_empty() {
1021 let metadata = open_metadata(file, path)?;
1022 let file_len = metadata.len();
1023
1024 if range.start >= file_len {
1028 return Err(Error::InvalidRange {
1029 source: InvalidGetRange::StartTooLarge {
1030 requested: range.start,
1031 length: file_len,
1032 },
1033 }
1034 .into());
1035 }
1036
1037 let expected = range.end.min(file_len) - range.start;
1038
1039 let error = Error::OutOfRange {
1040 path: path.into(),
1041 expected,
1042 actual: offset - range.start,
1043 };
1044
1045 return Err(error.into());
1046 }
1047 }
1048 #[cfg(all(not(windows), not(unix)))]
1049 {
1050 file.seek(SeekFrom::Start(range.start))
1051 .map_err(|err| map_seek_error(err, file, path, range.start))?;
1052
1053 let read = file.take(requested).read_to_end(&mut buf).map_err(|err| {
1054 if let Err(e) = open_metadata(file, path) {
1056 return e;
1057 }
1058 Error::UnableToReadBytes {
1059 source: err,
1060 path: path.to_path_buf(),
1061 }
1062 })? as u64;
1063
1064 if read != requested {
1065 let metadata = open_metadata(file, path)?;
1066 let file_len = metadata.len();
1067
1068 if range.start >= file_len {
1069 return Err(Error::InvalidRange {
1070 source: InvalidGetRange::StartTooLarge {
1071 requested: range.start,
1072 length: file_len,
1073 },
1074 }
1075 .into());
1076 }
1077
1078 let expected = range.end.min(file_len) - range.start;
1079 if read != expected {
1080 return Err(Error::OutOfRange {
1081 path: path.to_path_buf(),
1082 expected,
1083 actual: read,
1084 }
1085 .into());
1086 }
1087 }
1088 }
1089
1090 Ok(buf.into())
1091}
1092
1093fn open_file(path: &std::path::Path) -> Result<File, Error> {
1094 File::open(path).map_err(|e| map_open_error(e, path))
1095}
1096
1097fn open_metadata(file: &File, path: &std::path::Path) -> Result<Metadata, Error> {
1098 let metadata = file.metadata().map_err(|e| map_open_error(e, path))?;
1099 if metadata.is_dir() {
1100 Err(Error::NotFound {
1101 path: PathBuf::from(path),
1102 source: io::Error::new(ErrorKind::NotFound, "is directory"),
1103 })
1104 } else {
1105 Ok(metadata)
1106 }
1107}
1108
1109fn map_open_error(source: io::Error, path: &std::path::Path) -> Error {
1111 let path = PathBuf::from(path);
1112 match source.kind() {
1113 ErrorKind::NotFound => Error::NotFound { path, source },
1114 _ => Error::UnableToOpenFile { path, source },
1115 }
1116}
1117
1118fn map_seek_error(source: io::Error, file: &File, path: &std::path::Path, requested: u64) -> Error {
1120 let m = match open_metadata(file, path) {
1124 Err(e) => return e,
1125 Ok(m) => m,
1126 };
1127 if requested >= m.len() {
1128 return Error::InvalidRange {
1129 source: InvalidGetRange::StartTooLarge {
1130 requested,
1131 length: m.len(),
1132 },
1133 };
1134 }
1135 Error::Seek {
1136 source,
1137 path: PathBuf::from(path),
1138 }
1139}
1140
1141fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
1142 match entry.metadata() {
1143 Ok(metadata) => Ok(Some(convert_metadata(metadata, location))),
1144 Err(e) => {
1145 if let Some(io_err) = e.io_error() {
1146 if io_err.kind() == ErrorKind::NotFound {
1147 return Ok(None);
1148 }
1149 }
1150 Err(Error::Metadata {
1151 source: e.into(),
1152 path: location.to_string(),
1153 })?
1154 }
1155 }
1156}
1157
1158fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
1159 metadata
1160 .modified()
1161 .expect("Modified file time should be supported on this platform")
1162 .into()
1163}
1164
1165fn get_etag(metadata: &Metadata) -> String {
1166 let inode = get_inode(metadata);
1167 let size = metadata.len();
1168 let mtime = metadata
1169 .modified()
1170 .ok()
1171 .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
1172 .unwrap_or_default()
1173 .as_micros();
1174
1175 format!("{inode:x}-{mtime:x}-{size:x}")
1179}
1180
1181fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta {
1182 let last_modified = last_modified(&metadata);
1183
1184 ObjectMeta {
1185 location,
1186 last_modified,
1187 size: metadata.len(),
1188 e_tag: Some(get_etag(&metadata)),
1189 version: None,
1190 }
1191}
1192
/// Returns the inode number recorded in `metadata`.
#[cfg(unix)]
fn get_inode(metadata: &Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;

    metadata.ino()
}

/// Non-unix targets: always returns 0 in place of an inode number.
#[cfg(not(unix))]
fn get_inode(_metadata: &Metadata) -> u64 {
    const NO_INODE: u64 = 0;
    NO_INODE
}
1205
1206fn convert_walkdir_result(
1209 res: std::result::Result<DirEntry, walkdir::Error>,
1210) -> Result<Option<DirEntry>> {
1211 match res {
1212 Ok(entry) => {
1213 match symlink_metadata(entry.path()) {
1216 Ok(attr) => {
1217 if attr.is_symlink() {
1218 let target_metadata = metadata(entry.path());
1219 match target_metadata {
1220 Ok(_) => {
1221 Ok(Some(entry))
1223 }
1224 Err(_) => {
1225 Ok(None)
1227 }
1228 }
1229 } else {
1230 Ok(Some(entry))
1231 }
1232 }
1233 Err(_) => Ok(None),
1234 }
1235 }
1236
1237 Err(walkdir_err) => match walkdir_err.io_error() {
1238 Some(io_err) => match io_err.kind() {
1239 ErrorKind::NotFound => Ok(None),
1240 _ => Err(Error::UnableToWalkDir {
1241 source: walkdir_err,
1242 }
1243 .into()),
1244 },
1245 None => Err(Error::UnableToWalkDir {
1246 source: walkdir_err,
1247 }
1248 .into()),
1249 },
1250 }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255 use std::fs;
1256
1257 use futures_util::TryStreamExt;
1258 use tempfile::TempDir;
1259
1260 #[cfg(target_family = "unix")]
1261 use tempfile::NamedTempFile;
1262
1263 use crate::{ObjectStoreExt, integration::*};
1264
1265 use super::*;
1266
    /// Runs the shared integration suite against a store rooted in a fresh
    /// temporary directory (unix only).
    ///
    /// NOTE(review): the helpers share one store, so their order may matter.
    #[tokio::test]
    #[cfg(target_family = "unix")]
    async fn file_test() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        put_get_delete_list(&integration).await;
        list_with_offset_exclusivity(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        copy_rename_nonexistent_object(&integration).await;
        stream_get(&integration).await;
        put_opts(&integration, false).await;
    }
1284
    /// Exercises the store outside a tokio runtime, driving the futures with
    /// `futures_executor::block_on` (unix only).
    #[test]
    #[cfg(target_family = "unix")]
    fn test_non_tokio() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        futures_executor::block_on(async move {
            put_get_delete_list(&integration).await;
            list_uses_directories_correctly(&integration).await;
            list_with_delimiter(&integration).await;

            // A multipart upload without a runtime: two parts, then complete.
            let p = Path::from("manual_upload");
            let mut upload = integration.put_multipart(&p).await.unwrap();
            upload.put_part("123".into()).await.unwrap();
            upload.put_part("45678".into()).await.unwrap();
            let r = upload.complete().await.unwrap();

            // The ETag reported on completion must match a subsequent `get`.
            let get = integration.get(&p).await.unwrap();
            assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
            let actual = get.bytes().await.unwrap();
            assert_eq!(actual.as_ref(), b"12345678");
        });
    }
1308
1309 #[tokio::test]
1310 async fn creates_dir_if_not_present() {
1311 let root = TempDir::new().unwrap();
1312 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1313
1314 let location = Path::from("nested/file/test_file");
1315
1316 let data = Bytes::from("arbitrary data");
1317
1318 integration
1319 .put(&location, data.clone().into())
1320 .await
1321 .unwrap();
1322
1323 let read_data = integration
1324 .get(&location)
1325 .await
1326 .unwrap()
1327 .bytes()
1328 .await
1329 .unwrap();
1330 assert_eq!(&*read_data, data);
1331 }
1332
1333 #[tokio::test]
1334 async fn unknown_length() {
1335 let root = TempDir::new().unwrap();
1336 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1337
1338 let location = Path::from("some_file");
1339
1340 let data = Bytes::from("arbitrary data");
1341
1342 integration
1343 .put(&location, data.clone().into())
1344 .await
1345 .unwrap();
1346
1347 let read_data = integration
1348 .get(&location)
1349 .await
1350 .unwrap()
1351 .bytes()
1352 .await
1353 .unwrap();
1354 assert_eq!(&*read_data, data);
1355 }
1356
1357 #[tokio::test]
1358 async fn range_request_start_beyond_end_of_file() {
1359 let root = TempDir::new().unwrap();
1360 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1361
1362 let location = Path::from("some_file");
1363
1364 let data = Bytes::from("arbitrary data");
1365
1366 integration
1367 .put(&location, data.clone().into())
1368 .await
1369 .unwrap();
1370
1371 integration
1372 .get_range(&location, 100..200)
1373 .await
1374 .expect_err("Should error with start range beyond end of file");
1375 }
1376
1377 #[tokio::test]
1378 async fn range_request_beyond_end_of_file() {
1379 let root = TempDir::new().unwrap();
1380 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1381
1382 let location = Path::from("some_file");
1383
1384 let data = Bytes::from("arbitrary data");
1385
1386 integration
1387 .put(&location, data.clone().into())
1388 .await
1389 .unwrap();
1390
1391 let read_data = integration.get_range(&location, 0..100).await.unwrap();
1392 assert_eq!(&*read_data, data);
1393 }
1394
1395 #[tokio::test]
1396 #[cfg(target_family = "unix")]
1397 #[ignore]
1399 async fn bubble_up_io_errors() {
1400 use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
1401
1402 let root = TempDir::new().unwrap();
1403
1404 let metadata = root.path().metadata().unwrap();
1406 let mut permissions = metadata.permissions();
1407 permissions.set_mode(0o000);
1408 set_permissions(root.path(), permissions).unwrap();
1409
1410 let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1411
1412 let mut stream = store.list(None);
1413 let mut any_err = false;
1414 while let Some(res) = stream.next().await {
1415 if res.is_err() {
1416 any_err = true;
1417 }
1418 }
1419 assert!(any_err);
1420
1421 assert!(store.list_with_delimiter(None).await.is_err());
1423 }
1424
1425 const NON_EXISTENT_NAME: &str = "nonexistentname";
1426
1427 #[tokio::test]
1428 async fn get_nonexistent_location() {
1429 let root = TempDir::new().unwrap();
1430 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1431
1432 let location = Path::from(NON_EXISTENT_NAME);
1433
1434 let err = get_nonexistent_object(&integration, Some(location))
1435 .await
1436 .unwrap_err();
1437 if let crate::Error::NotFound { path, source } = err {
1438 let source_variant = source.downcast_ref::<std::io::Error>();
1439 assert!(
1440 matches!(source_variant, Some(std::io::Error { .. }),),
1441 "got: {source_variant:?}"
1442 );
1443 assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
1444 } else {
1445 panic!("unexpected error type: {err:?}");
1446 }
1447 }
1448
1449 #[tokio::test]
1450 async fn root() {
1451 let integration = LocalFileSystem::new();
1452
1453 let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
1454 let url = Url::from_directory_path(&canonical).unwrap();
1455 let path = Path::parse(url.path()).unwrap();
1456
1457 let roundtrip = integration.path_to_filesystem(&path).unwrap();
1458
1459 let roundtrip = roundtrip.canonicalize().unwrap();
1462
1463 assert_eq!(roundtrip, canonical);
1464
1465 integration.head(&path).await.unwrap();
1466 }
1467
/// On Windows, listing the root of an un-prefixed store must fail with the
/// URL-to-path conversion error for `file:///`.
#[tokio::test]
#[cfg(target_family = "windows")]
async fn test_list_root() {
    let store = LocalFileSystem::new();
    let message = store
        .list_with_delimiter(None)
        .await
        .unwrap_err()
        .to_string();

    let wanted = "Unable to convert URL \"file:///\" to filesystem path";
    assert!(message.contains(wanted), "{message}");
}
1480
1481 #[tokio::test]
1482 #[cfg(target_os = "linux")]
1483 async fn test_list_root() {
1484 let fs = LocalFileSystem::new();
1485 fs.list_with_delimiter(None).await.unwrap();
1486 }
1487
1488 #[cfg(target_family = "unix")]
1489 async fn check_list(integration: &LocalFileSystem, prefix: Option<&Path>, expected: &[&str]) {
1490 let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();
1491
1492 let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
1493 strings.sort_unstable();
1494 assert_eq!(&strings, expected)
1495 }
1496
1497 #[tokio::test]
1498 #[cfg(target_family = "unix")]
1499 async fn test_symlink() {
1500 let root = TempDir::new().unwrap();
1501 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1502
1503 let subdir = root.path().join("a");
1504 std::fs::create_dir(&subdir).unwrap();
1505 let file = subdir.join("file.parquet");
1506 std::fs::write(file, "test").unwrap();
1507
1508 check_list(&integration, None, &["a/file.parquet"]).await;
1509 integration
1510 .head(&Path::from("a/file.parquet"))
1511 .await
1512 .unwrap();
1513
1514 let other = NamedTempFile::new().unwrap();
1516 std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet")).unwrap();
1517
1518 check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;
1520
1521 integration.head(&Path::from("test.parquet")).await.unwrap();
1523
1524 std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
1526 check_list(
1527 &integration,
1528 None,
1529 &["a/file.parquet", "b/file.parquet", "test.parquet"],
1530 )
1531 .await;
1532 check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;
1533
1534 integration
1536 .head(&Path::from("b/file.parquet"))
1537 .await
1538 .unwrap();
1539
1540 std::os::unix::fs::symlink(root.path().join("foo.parquet"), root.path().join("c")).unwrap();
1542
1543 check_list(
1544 &integration,
1545 None,
1546 &["a/file.parquet", "b/file.parquet", "test.parquet"],
1547 )
1548 .await;
1549
1550 let mut r = integration.list_with_delimiter(None).await.unwrap();
1551 r.common_prefixes.sort_unstable();
1552 assert_eq!(r.common_prefixes.len(), 2);
1553 assert_eq!(r.common_prefixes[0].as_ref(), "a");
1554 assert_eq!(r.common_prefixes[1].as_ref(), "b");
1555 assert_eq!(r.objects.len(), 1);
1556 assert_eq!(r.objects[0].location.as_ref(), "test.parquet");
1557
1558 let r = integration
1559 .list_with_delimiter(Some(&Path::from("a")))
1560 .await
1561 .unwrap();
1562 assert_eq!(r.common_prefixes.len(), 0);
1563 assert_eq!(r.objects.len(), 1);
1564 assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");
1565
1566 integration
1568 .delete(&Path::from("test.parquet"))
1569 .await
1570 .unwrap();
1571 assert!(other.path().exists());
1572
1573 check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
1574
1575 integration
1577 .delete(&Path::from("b/file.parquet"))
1578 .await
1579 .unwrap();
1580
1581 check_list(&integration, None, &[]).await;
1582
1583 integration
1585 .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
1586 .await
1587 .unwrap();
1588
1589 check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
1590 }
1591
1592 #[tokio::test]
1593 async fn invalid_path() {
1594 let root = TempDir::new().unwrap();
1595 let root = root.path().join("🙀");
1596 std::fs::create_dir(root.clone()).unwrap();
1597
1598 let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
1600
1601 let directory = Path::from("directory");
1602 let object = directory.clone().join("child.txt");
1603 let data = Bytes::from("arbitrary");
1604 integration.put(&object, data.clone().into()).await.unwrap();
1605 integration.head(&object).await.unwrap();
1606 let result = integration.get(&object).await.unwrap();
1607 assert_eq!(result.bytes().await.unwrap(), data);
1608
1609 flatten_list_stream(&integration, None).await.unwrap();
1610 flatten_list_stream(&integration, Some(&directory))
1611 .await
1612 .unwrap();
1613
1614 let result = integration
1615 .list_with_delimiter(Some(&directory))
1616 .await
1617 .unwrap();
1618 assert_eq!(result.objects.len(), 1);
1619 assert!(result.common_prefixes.is_empty());
1620 assert_eq!(result.objects[0].location, object);
1621
1622 let emoji = root.join("💀");
1623 std::fs::write(emoji, "foo").unwrap();
1624
1625 let mut paths = flatten_list_stream(&integration, None).await.unwrap();
1627 paths.sort_unstable();
1628
1629 assert_eq!(
1630 paths,
1631 vec![
1632 Path::parse("directory/child.txt").unwrap(),
1633 Path::parse("💀").unwrap()
1634 ]
1635 );
1636 }
1637
1638 #[tokio::test]
1639 async fn list_hides_incomplete_uploads() {
1640 let root = TempDir::new().unwrap();
1641 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1642 let location = Path::from("some_file");
1643
1644 let data = PutPayload::from("arbitrary data");
1645 let mut u1 = integration.put_multipart(&location).await.unwrap();
1646 u1.put_part(data.clone()).await.unwrap();
1647
1648 let mut u2 = integration.put_multipart(&location).await.unwrap();
1649 u2.put_part(data).await.unwrap();
1650
1651 let list = flatten_list_stream(&integration, None).await.unwrap();
1652 assert_eq!(list.len(), 0);
1653
1654 assert_eq!(
1655 integration
1656 .list_with_delimiter(None)
1657 .await
1658 .unwrap()
1659 .objects
1660 .len(),
1661 0
1662 );
1663 }
1664
1665 #[tokio::test]
1666 async fn test_path_with_offset() {
1667 let root = TempDir::new().unwrap();
1668 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1669
1670 let root_path = root.path();
1671 for i in 0..5 {
1672 let filename = format!("test{i}.parquet");
1673 let file = root_path.join(filename);
1674 std::fs::write(file, "test").unwrap();
1675 }
1676 let filter_str = "test";
1677 let filter = String::from(filter_str);
1678 let offset_str = filter + "1";
1679 let offset = Path::from(offset_str.clone());
1680
1681 let res = integration.list_with_offset(None, &offset);
1683 let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
1684 let mut offset_files: Vec<_> = offset_paths
1685 .iter()
1686 .map(|x| String::from(x.filename().unwrap()))
1687 .collect();
1688
1689 let files = fs::read_dir(root_path).unwrap();
1691 let filtered_files = files
1692 .filter_map(Result::ok)
1693 .filter_map(|d| {
1694 d.file_name().to_str().and_then(|f| {
1695 if f.contains(filter_str) {
1696 Some(String::from(f))
1697 } else {
1698 None
1699 }
1700 })
1701 })
1702 .collect::<Vec<_>>();
1703
1704 let mut expected_offset_files: Vec<_> = filtered_files
1705 .iter()
1706 .filter(|s| **s > offset_str)
1707 .cloned()
1708 .collect();
1709
1710 fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
1711 let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count();
1712 matching == a.len() && matching == b.len()
1713 }
1714
1715 offset_files.sort();
1716 expected_offset_files.sort();
1717
1718 assert_eq!(offset_files.len(), expected_offset_files.len());
1722 assert!(do_vecs_match(&expected_offset_files, &offset_files));
1723 }
1724
1725 #[tokio::test]
1726 async fn filesystem_filename_with_percent() {
1727 let temp_dir = TempDir::new().unwrap();
1728 let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1729 let filename = "L%3ABC.parquet";
1730
1731 std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
1732
1733 let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
1734 assert_eq!(res.len(), 1);
1735 assert_eq!(res[0].location.as_ref(), filename);
1736
1737 let res = integration.list_with_delimiter(None).await.unwrap();
1738 assert_eq!(res.objects.len(), 1);
1739 assert_eq!(res.objects[0].location.as_ref(), filename);
1740 }
1741
1742 #[tokio::test]
1743 async fn relative_paths() {
1744 LocalFileSystem::new_with_prefix(".").unwrap();
1745 LocalFileSystem::new_with_prefix("..").unwrap();
1746 LocalFileSystem::new_with_prefix("../..").unwrap();
1747
1748 let integration = LocalFileSystem::new();
1749 let path = Path::from_filesystem_path(".").unwrap();
1750 integration.list_with_delimiter(Some(&path)).await.unwrap();
1751 }
1752
/// Table-driven check of `is_valid_file_path`.
///
/// Each entry pairs a raw path with the expected result. The failing entry
/// is included in the assertion message so a regression points at the
/// offending case.
#[test]
fn test_valid_path() {
    let cases = [
        ("foo#123/test.txt", true),
        ("foo#123/test#23.txt", true),
        ("foo#123/test#34", false),
        ("foo😁/test#34", false),
        ("foo/test#😁34", true),
    ];

    for (case, expected) in cases {
        let path = Path::parse(case).unwrap();
        assert_eq!(
            is_valid_file_path(&path),
            expected,
            "unexpected is_valid_file_path result for {case:?}"
        );
    }
}
1768
1769 #[tokio::test]
1770 async fn test_intermediate_files() {
1771 let root = TempDir::new().unwrap();
1772 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1773
1774 let a = Path::parse("foo#123/test.txt").unwrap();
1775 integration.put(&a, "test".into()).await.unwrap();
1776
1777 let list = flatten_list_stream(&integration, None).await.unwrap();
1778 assert_eq!(list, vec![a.clone()]);
1779
1780 std::fs::write(root.path().join("bar#123"), "test").unwrap();
1781
1782 let list = flatten_list_stream(&integration, None).await.unwrap();
1784 assert_eq!(list, vec![a.clone()]);
1785
1786 let b = Path::parse("bar#123").unwrap();
1787 let err = integration.get(&b).await.unwrap_err().to_string();
1788 assert_eq!(
1789 err,
1790 "Generic LocalFileSystem error: Filenames containing trailing '/#\\d+/' are not supported: bar#123"
1791 );
1792
1793 let c = Path::parse("foo#123.txt").unwrap();
1794 integration.put(&c, "test".into()).await.unwrap();
1795
1796 let mut list = flatten_list_stream(&integration, None).await.unwrap();
1797 list.sort_unstable();
1798 assert_eq!(list, vec![c, a]);
1799 }
1800
/// On Windows, an object whose name contains `:` is listed back under the
/// percent-encoded name and can be read through the original location.
#[tokio::test]
#[cfg(target_os = "windows")]
async fn filesystem_filename_with_colon() {
    let tmp = TempDir::new().unwrap();
    let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();

    let encoded = Path::parse("file%3Aname.parquet").unwrap();
    let location = Path::parse("file:name.parquet").unwrap();

    store.put(&location, "test".into()).await.unwrap();

    let listed = flatten_list_stream(&store, None).await.unwrap();
    assert_eq!(listed, vec![encoded]);

    let fetched = store.get(&location).await.unwrap();
    let contents = fetched.bytes().await.unwrap();
    assert_eq!(contents, Bytes::from("test"));
}
1822
1823 #[tokio::test]
1824 async fn delete_dirs_automatically() {
1825 let root = TempDir::new().unwrap();
1826 let integration = LocalFileSystem::new_with_prefix(root.path())
1827 .unwrap()
1828 .with_automatic_cleanup(true);
1829 let location = Path::from("nested/file/test_file");
1830 let data = Bytes::from("arbitrary data");
1831
1832 integration
1833 .put(&location, data.clone().into())
1834 .await
1835 .unwrap();
1836
1837 let read_data = integration
1838 .get(&location)
1839 .await
1840 .unwrap()
1841 .bytes()
1842 .await
1843 .unwrap();
1844
1845 assert_eq!(&*read_data, data);
1846 assert!(fs::read_dir(root.path()).unwrap().count() > 0);
1847 integration.delete(&location).await.unwrap();
1848 assert!(fs::read_dir(root.path()).unwrap().count() == 0);
1849 }
1850}
1851
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStoreExt, Path, PutPayload};

    /// Dropping a multipart upload that was never completed must remove the
    /// file it left in the store root within the polling window.
    #[tokio::test]
    async fn test_cleanup_intermediate_files() {
        let tmp = TempDir::new().unwrap();
        let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();

        // Number of direct entries currently present in the store root.
        let entries_on_disk = || std::fs::read_dir(tmp.path()).unwrap().count();

        let target = Path::from("some_file");
        let chunk = PutPayload::from_static(b"hello");
        let mut pending = store.put_multipart(&target).await.unwrap();
        pending.put_part(chunk).await.unwrap();

        // Exactly one file exists while the upload is in flight.
        assert_eq!(entries_on_disk(), 1);
        drop(pending);

        // Poll up to 100 times, 1ms apart, until the root is empty.
        let mut polls_left = 100;
        while polls_left > 0 {
            tokio::time::sleep(Duration::from_millis(1)).await;
            if entries_on_disk() == 0 {
                return;
            }
            polls_left -= 1;
        }
        panic!("Failed to cleanup file in 100ms")
    }
}
1885
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
    use std::fs::OpenOptions;

    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStoreExt, Path};

    /// `head` and `get` must both succeed on a FIFO created inside the store
    /// root.
    #[tokio::test]
    async fn test_fifo() {
        let fifo_name = "some_file";
        let tmp = TempDir::new().unwrap();
        let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();

        let fifo_path = tmp.path().join(fifo_name);
        unistd::mkfifo(&fifo_path, stat::Mode::S_IRWXU).unwrap();

        // Open the write end on the blocking pool.
        // NOTE(review): presumably because opening a FIFO for writing blocks
        // until a reader opens it; confirm.
        let writer = tokio::task::spawn_blocking(move || {
            OpenOptions::new().write(true).open(fifo_path).unwrap()
        });

        let location = Path::from(fifo_name);
        store.head(&location).await.unwrap();
        store.get(&location).await.unwrap();

        writer.await.unwrap();
    }
}