Skip to main content

object_store/
chunked.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A [`ChunkedStore`] that can be used to test streaming behaviour
19
20use std::fmt::{Debug, Display, Formatter};
21use std::ops::Range;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use bytes::{BufMut, Bytes, BytesMut};
26use futures_util::StreamExt;
27use futures_util::stream::BoxStream;
28
29use crate::path::Path;
30use crate::{
31    CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
32    ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions,
33};
34use crate::{PutPayload, Result};
35
36/// Wraps a [`ObjectStore`] and makes its get response return chunks
37/// in a controllable manner.
38///
39/// A `ChunkedStore` makes the memory consumption and performance of
40/// the wrapped [`ObjectStore`] worse. It is intended for use within
41/// tests, to control the chunks in the produced output streams. For
42/// example, it is used to verify the delimiting logic in
43/// newline_delimited_stream.
44#[derive(Debug)]
45pub struct ChunkedStore {
46    inner: Arc<dyn ObjectStore>,
47    chunk_size: usize, // chunks are in memory, so we use usize not u64
48}
49
50impl ChunkedStore {
51    /// Creates a new [`ChunkedStore`] with the specified chunk_size
52    pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
53        Self { inner, chunk_size }
54    }
55}
56
57impl Display for ChunkedStore {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        write!(f, "ChunkedStore({})", self.inner)
60    }
61}
62
63#[async_trait]
64#[deny(clippy::missing_trait_methods)]
65impl ObjectStore for ChunkedStore {
66    async fn put_opts(
67        &self,
68        location: &Path,
69        payload: PutPayload,
70        opts: PutOptions,
71    ) -> Result<PutResult> {
72        self.inner.put_opts(location, payload, opts).await
73    }
74
75    async fn put_multipart_opts(
76        &self,
77        location: &Path,
78        opts: PutMultipartOptions,
79    ) -> Result<Box<dyn MultipartUpload>> {
80        self.inner.put_multipart_opts(location, opts).await
81    }
82
83    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
84        let r = self.inner.get_opts(location, options).await?;
85        let stream = match r.payload {
86            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
87            GetResultPayload::File(file, path) => {
88                crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
89            }
90            GetResultPayload::Stream(stream) => {
91                let buffer = BytesMut::new();
92                futures_util::stream::unfold(
93                    (stream, buffer, false, self.chunk_size),
94                    |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
95                        // Keep accumulating bytes until we reach capacity as long as
96                        // the stream can provide them:
97                        if exhausted {
98                            return None;
99                        }
100                        while buffer.len() < chunk_size {
101                            match stream.next().await {
102                                None => {
103                                    exhausted = true;
104                                    let slice = buffer.split_off(0).freeze();
105                                    return Some((
106                                        Ok(slice),
107                                        (stream, buffer, exhausted, chunk_size),
108                                    ));
109                                }
110                                Some(Ok(bytes)) => {
111                                    buffer.put(bytes);
112                                }
113                                Some(Err(e)) => {
114                                    return Some((
115                                        Err(crate::Error::Generic {
116                                            store: "ChunkedStore",
117                                            source: Box::new(e),
118                                        }),
119                                        (stream, buffer, exhausted, chunk_size),
120                                    ));
121                                }
122                            };
123                        }
124                        // Return the chunked values as the next value in the stream
125                        let slice = buffer.split_to(chunk_size).freeze();
126                        Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
127                    },
128                )
129                .boxed()
130            }
131        };
132        Ok(GetResult {
133            payload: GetResultPayload::Stream(stream),
134            ..r
135        })
136    }
137
138    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
139        self.inner.get_ranges(location, ranges).await
140    }
141
142    fn delete_stream(
143        &self,
144        locations: BoxStream<'static, Result<Path>>,
145    ) -> BoxStream<'static, Result<Path>> {
146        self.inner.delete_stream(locations)
147    }
148
149    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
150        self.inner.list(prefix)
151    }
152
153    fn list_with_offset(
154        &self,
155        prefix: Option<&Path>,
156        offset: &Path,
157    ) -> BoxStream<'static, Result<ObjectMeta>> {
158        self.inner.list_with_offset(prefix, offset)
159    }
160
161    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
162        self.inner.list_with_delimiter(prefix).await
163    }
164
165    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
166        self.inner.copy_opts(from, to, options).await
167    }
168
169    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
170        self.inner.rename_opts(from, to, options).await
171    }
172}
173
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;

    use crate::ObjectStoreExt;
    #[cfg(feature = "fs")]
    use crate::integration::*;
    #[cfg(feature = "fs")]
    use crate::local::LocalFileSystem;
    use crate::memory::InMemory;
    use crate::path::Path;

    use super::*;

    /// Reads a 1001-byte object through several chunk sizes.
    ///
    /// Checks that every chunk has the expected length and that, together,
    /// the chunks account for the whole object.
    #[tokio::test]
    async fn test_chunked_basic() {
        let location = Path::parse("test").unwrap();
        let backing: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        backing.put(&location, vec![0; 1001].into()).await.unwrap();

        for chunk_size in [10, 20, 31] {
            let chunked = ChunkedStore::new(Arc::clone(&backing), chunk_size);
            let payload = chunked.get(&location).await.unwrap().payload;
            let mut stream = match payload {
                GetResultPayload::Stream(stream) => stream,
                _ => unreachable!(),
            };

            // Bytes of the object not yet accounted for by a received chunk.
            let mut remaining: u64 = 1001;
            while let Some(chunk) = stream.next().await {
                let expected = remaining.min(chunk_size as u64);
                assert_eq!(chunk.unwrap().len() as u64, expected);
                remaining -= expected;
            }
            assert_eq!(remaining, 0);
        }
    }

    /// Runs the shared integration suite with 100-byte chunks.
    ///
    /// `ChunkedStore` wraps both an in-memory store and a local-filesystem store.
    #[cfg(feature = "fs")]
    #[tokio::test]
    async fn test_chunked() {
        let temporary = tempfile::tempdir().unwrap();
        let backings: &[Arc<dyn ObjectStore>] = &[
            Arc::new(InMemory::new()),
            Arc::new(LocalFileSystem::new_with_prefix(temporary.path()).unwrap()),
        ];

        for backing in backings.iter() {
            let store = ChunkedStore::new(Arc::clone(backing), 100);

            put_get_delete_list(&store).await;
            get_opts(&store).await;
            list_uses_directories_correctly(&store).await;
            list_with_delimiter(&store).await;
            rename_and_copy(&store).await;
            copy_if_not_exists(&store).await;
            stream_get(&store).await;
        }
    }
}