Skip to main content

object_store/
payload.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19use std::sync::Arc;
20
21/// A cheaply cloneable, ordered collection of [`Bytes`]
22#[derive(Debug, Clone)]
23pub struct PutPayload(Arc<[Bytes]>);
24
25impl Default for PutPayload {
26    fn default() -> Self {
27        Self(Arc::new([]))
28    }
29}
30
31impl PutPayload {
32    /// Create a new empty [`PutPayload`]
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Creates a [`PutPayload`] from a static slice
38    pub fn from_static(s: &'static [u8]) -> Self {
39        s.into()
40    }
41
42    /// Creates a [`PutPayload`] from a [`Bytes`]
43    pub fn from_bytes(s: Bytes) -> Self {
44        s.into()
45    }
46
47    /// Returns the total length of the [`Bytes`] in this payload
48    pub fn content_length(&self) -> usize {
49        self.0.iter().map(|b| b.len()).sum()
50    }
51
52    /// Returns an iterator over the [`Bytes`] in this payload
53    pub fn iter(&self) -> PutPayloadIter<'_> {
54        PutPayloadIter(self.0.iter())
55    }
56}
57
58impl AsRef<[Bytes]> for PutPayload {
59    fn as_ref(&self) -> &[Bytes] {
60        self.0.as_ref()
61    }
62}
63
64impl<'a> IntoIterator for &'a PutPayload {
65    type Item = &'a Bytes;
66    type IntoIter = PutPayloadIter<'a>;
67
68    fn into_iter(self) -> Self::IntoIter {
69        self.iter()
70    }
71}
72
73impl IntoIterator for PutPayload {
74    type Item = Bytes;
75    type IntoIter = PutPayloadIntoIter;
76
77    fn into_iter(self) -> Self::IntoIter {
78        PutPayloadIntoIter {
79            payload: self,
80            idx: 0,
81        }
82    }
83}
84
85/// An iterator over [`PutPayload`]
86#[derive(Debug)]
87pub struct PutPayloadIter<'a>(std::slice::Iter<'a, Bytes>);
88
89impl<'a> Iterator for PutPayloadIter<'a> {
90    type Item = &'a Bytes;
91
92    fn next(&mut self) -> Option<Self::Item> {
93        self.0.next()
94    }
95
96    fn size_hint(&self) -> (usize, Option<usize>) {
97        self.0.size_hint()
98    }
99}
100
101/// An owning iterator of [`PutPayload`]
102#[derive(Debug)]
103pub struct PutPayloadIntoIter {
104    payload: PutPayload,
105    idx: usize,
106}
107
108impl Iterator for PutPayloadIntoIter {
109    type Item = Bytes;
110
111    fn next(&mut self) -> Option<Self::Item> {
112        let p = self.payload.0.get(self.idx)?.clone();
113        self.idx += 1;
114        Some(p)
115    }
116
117    fn size_hint(&self) -> (usize, Option<usize>) {
118        let l = self.payload.0.len() - self.idx;
119        (l, Some(l))
120    }
121}
122
123impl From<Bytes> for PutPayload {
124    fn from(value: Bytes) -> Self {
125        Self(Arc::new([value]))
126    }
127}
128
129impl From<Vec<u8>> for PutPayload {
130    fn from(value: Vec<u8>) -> Self {
131        Self(Arc::new([value.into()]))
132    }
133}
134
135impl From<&'static str> for PutPayload {
136    fn from(value: &'static str) -> Self {
137        Bytes::from(value).into()
138    }
139}
140
141impl From<&'static [u8]> for PutPayload {
142    fn from(value: &'static [u8]) -> Self {
143        Bytes::from(value).into()
144    }
145}
146
147impl From<String> for PutPayload {
148    fn from(value: String) -> Self {
149        Bytes::from(value).into()
150    }
151}
152
153impl FromIterator<u8> for PutPayload {
154    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
155        Bytes::from_iter(iter).into()
156    }
157}
158
159impl FromIterator<Bytes> for PutPayload {
160    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
161        Self(iter.into_iter().collect())
162    }
163}
164
165impl From<PutPayload> for Bytes {
166    fn from(value: PutPayload) -> Self {
167        match value.0.len() {
168            0 => Self::new(),
169            1 => value.0[0].clone(),
170            _ => {
171                let mut buf = Vec::with_capacity(value.content_length());
172                value.iter().for_each(|x| buf.extend_from_slice(x));
173                buf.into()
174            }
175        }
176    }
177}
178
179/// A builder for [`PutPayload`] that avoids reallocating memory
180///
181/// Data is allocated in fixed blocks, which are flushed to [`Bytes`] once full.
182/// Unlike [`Vec`] this avoids needing to repeatedly reallocate blocks of memory,
183/// which typically involves copying all the previously written data to a new
184/// contiguous memory region.
185#[derive(Debug)]
186pub struct PutPayloadMut {
187    len: usize,
188    completed: Vec<Bytes>,
189    in_progress: Vec<u8>,
190    block_size: usize,
191}
192
193impl Default for PutPayloadMut {
194    fn default() -> Self {
195        Self {
196            len: 0,
197            completed: vec![],
198            in_progress: vec![],
199
200            block_size: 8 * 1024,
201        }
202    }
203}
204
205impl PutPayloadMut {
206    /// Create a new [`PutPayloadMut`]
207    pub fn new() -> Self {
208        Self::default()
209    }
210
211    /// Configures the minimum allocation size
212    ///
213    /// Defaults to 8KB
214    pub fn with_block_size(self, block_size: usize) -> Self {
215        Self { block_size, ..self }
216    }
217
218    /// Write bytes into this [`PutPayloadMut`]
219    ///
220    /// If there is an in-progress block, data will be first written to it, flushing
221    /// it to [`Bytes`] once full. If data remains to be written, a new block of memory
222    /// of at least the configured block size will be allocated, to hold the remaining data.
223    pub fn extend_from_slice(&mut self, slice: &[u8]) {
224        let remaining = self.in_progress.capacity() - self.in_progress.len();
225        let to_copy = remaining.min(slice.len());
226
227        self.in_progress.extend_from_slice(&slice[..to_copy]);
228        if self.in_progress.capacity() == self.in_progress.len() {
229            let new_cap = self.block_size.max(slice.len() - to_copy);
230            let completed = std::mem::replace(&mut self.in_progress, Vec::with_capacity(new_cap));
231            if !completed.is_empty() {
232                self.completed.push(completed.into())
233            }
234            self.in_progress.extend_from_slice(&slice[to_copy..])
235        }
236        self.len += slice.len();
237    }
238
239    /// Append a [`Bytes`] to this [`PutPayloadMut`] without copying
240    ///
241    /// This will close any currently buffered block populated by [`Self::extend_from_slice`],
242    /// and append `bytes` to this payload without copying.
243    pub fn push(&mut self, bytes: Bytes) {
244        if !self.in_progress.is_empty() {
245            let completed = std::mem::take(&mut self.in_progress);
246            self.completed.push(completed.into())
247        }
248        self.len += bytes.len();
249        self.completed.push(bytes);
250    }
251
252    /// Returns `true` if this [`PutPayloadMut`] contains no bytes
253    #[inline]
254    pub fn is_empty(&self) -> bool {
255        self.len == 0
256    }
257
258    /// Returns the total length of the [`Bytes`] in this payload
259    #[inline]
260    pub fn content_length(&self) -> usize {
261        self.len
262    }
263
264    /// Convert into [`PutPayload`]
265    pub fn freeze(mut self) -> PutPayload {
266        if !self.in_progress.is_empty() {
267            let completed = std::mem::take(&mut self.in_progress).into();
268            self.completed.push(completed);
269        }
270        PutPayload(self.completed.into())
271    }
272}
273
274impl From<PutPayloadMut> for PutPayload {
275    fn from(value: PutPayloadMut) -> Self {
276        value.freeze()
277    }
278}
279
#[cfg(test)]
mod test {
    use crate::PutPayloadMut;

    /// Checks how a mix of writes and a push is split into chunks
    #[test]
    fn test_put_payload() {
        let mut chunk = PutPayloadMut::new().with_block_size(23);
        let writes: [&[u8]; 6] = [&[1; 16], &[2; 32], &[2; 5], &[2; 21], &[2; 40], &[0; 0]];
        for data in writes {
            chunk.extend_from_slice(data);
        }
        chunk.push("foobar".into());

        let payload = chunk.freeze();
        assert_eq!(payload.content_length(), 120);

        let lens: Vec<usize> = payload.as_ref().iter().map(|b| b.len()).collect();
        // The second chunk holds the remainder of the 32 byte write: 32 - (23 - 16)
        assert_eq!(lens, [23, 25, 23, 23, 20, 6]);
    }

    /// Checks that the running length is maintained by both pushes and writes
    #[test]
    fn test_content_length() {
        let mut chunk = PutPayloadMut::new();
        let mut expected = 0;

        chunk.push(vec![0; 23].into());
        expected += 23;
        assert_eq!(chunk.content_length(), expected);

        chunk.extend_from_slice(&[0; 4]);
        expected += 4;
        assert_eq!(chunk.content_length(), expected);

        chunk.push(vec![0; 121].into());
        expected += 121;
        assert_eq!(chunk.content_length(), expected);

        assert_eq!(expected, 148);
        let payload = chunk.freeze();
        assert_eq!(payload.content_length(), 148);
    }
}