Skip to main content

object_store/
util.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Common logic for interacting with remote object stores
19use std::{
20    fmt::Display,
21    ops::{Range, RangeBounds},
22};
23
24use super::Result;
25use bytes::Bytes;
26use futures_util::{Stream, TryStreamExt, stream::StreamExt};
27
/// `strftime`-style pattern for RFC 1123 timestamps, e.g. `Sun, 06 Nov 1994 08:49:37 GMT`
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";

/// Serde helper that reads a string formatted per [`RFC1123_FMT`] as a UTC timestamp
///
/// The string is parsed as a naive date-time (the literal `GMT` suffix must be
/// present) and then interpreted as UTC. A string that does not match the
/// pattern is reported as a custom deserializer error.
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) fn deserialize_rfc1123<'de, D>(
    deserializer: D,
) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use chrono::{NaiveDateTime, TimeZone, Utc};

    let raw = <String as serde::Deserialize>::deserialize(deserializer)?;
    NaiveDateTime::parse_from_str(&raw, RFC1123_FMT)
        .map(|naive| Utc.from_utc_datetime(&naive))
        .map_err(serde::de::Error::custom)
}
44
/// Computes the HMAC-SHA256 tag of `bytes` using `secret` as the key
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) fn hmac_sha256(secret: impl AsRef<[u8]>, bytes: impl AsRef<[u8]>) -> ring::hmac::Tag {
    use ring::hmac;

    let signing_key = hmac::Key::new(hmac::HMAC_SHA256, secret.as_ref());
    hmac::sign(&signing_key, bytes.as_ref())
}
50
51/// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk
52pub async fn collect_bytes<S, E>(mut stream: S, size_hint: Option<u64>) -> Result<Bytes, E>
53where
54    E: Send,
55    S: Stream<Item = Result<Bytes, E>> + Send + Unpin,
56{
57    let first = stream.next().await.transpose()?.unwrap_or_default();
58
59    // Avoid copying if single response
60    match stream.next().await.transpose()? {
61        None => Ok(first),
62        Some(second) => {
63            let size_hint = size_hint.unwrap_or_else(|| first.len() as u64 + second.len() as u64);
64
65            let mut buf = Vec::with_capacity(size_hint as usize);
66            buf.extend_from_slice(&first);
67            buf.extend_from_slice(&second);
68            while let Some(maybe_bytes) = stream.next().await {
69                buf.extend_from_slice(&maybe_bytes?);
70            }
71
72            Ok(buf.into())
73        }
74    }
75}
76
/// Runs `f` on the tokio blocking pool when called from inside a tokio runtime,
/// otherwise invokes it directly on the current thread
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
pub(crate) async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    use tokio::runtime::Handle;

    if let Ok(handle) = Handle::try_current() {
        // `?` propagates a failure to join the blocking task; the task's own
        // `Result` becomes the return value
        handle.spawn_blocking(f).await?
    } else {
        // No runtime available: run inline
        f()
    }
}
89
/// Default gap threshold (1 MiB) for [`coalesce_ranges`]: ranges separated by
/// at most this many bytes are merged into a single request
pub const OBJECT_STORE_COALESCE_DEFAULT: u64 = 1024 * 1024;

/// Maximum number of range requests [`coalesce_ranges`] keeps in flight at once
pub(crate) const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
96
97/// Takes a function `fetch` that can fetch a range of bytes and uses this to
98/// fetch the provided byte `ranges`
99///
100/// To improve performance it will:
101///
102/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
103/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
104///
105pub async fn coalesce_ranges<F, E, Fut>(
106    ranges: &[Range<u64>],
107    fetch: F,
108    coalesce: u64,
109) -> Result<Vec<Bytes>, E>
110where
111    F: Send + FnMut(Range<u64>) -> Fut,
112    E: Send,
113    Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
114{
115    let fetch_ranges = merge_ranges(ranges, coalesce);
116
117    let fetched: Vec<_> = futures_util::stream::iter(fetch_ranges.iter().cloned())
118        .map(fetch)
119        .buffered(OBJECT_STORE_COALESCE_PARALLEL)
120        .try_collect()
121        .await?;
122
123    Ok(ranges
124        .iter()
125        .map(|range| {
126            let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
127            let fetch_range = &fetch_ranges[idx];
128            let fetch_bytes = &fetched[idx];
129
130            let start = range.start - fetch_range.start;
131            let end = range.end - fetch_range.start;
132            let range = (start as usize)..(end as usize).min(fetch_bytes.len());
133            fetch_bytes.slice(range)
134        })
135        .collect())
136}
137
/// Returns a list of ranges, sorted by start, that together cover every range in `ranges`
///
/// Input ranges that overlap, or whose gap to the preceding merged range is at
/// most `coalesce` bytes, are folded into one output range.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for next in sorted {
        if let Some(current) = merged.last_mut() {
            // A failed subtraction means `next` starts before `current` ends,
            // i.e. the two overlap
            let close_enough = match next.start.checked_sub(current.end) {
                Some(gap) => gap <= coalesce,
                None => true,
            };
            if close_enough {
                current.end = current.end.max(next.end);
                continue;
            }
        }
        merged.push(next);
    }

    merged
}
175
/// A request for only part of an object's bytes
///
/// Values can be built from ranges over `u64`, for example
///
/// ```rust
/// # use object_store::GetRange;
/// let range1: GetRange = (50..150).into();
/// let range2: GetRange = (50..=150).into();
/// let range3: GetRange = (50..).into();
/// let range4: GetRange = (..150).into();
/// ```
///
/// Implementations may wish to inspect [`GetResult`] for the exact byte
/// range returned.
///
/// [`GetResult`]: crate::GetResult
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GetRange {
    /// Request a specific, end-exclusive range of bytes
    ///
    /// A zero-length range, or one starting after the end of the object, is an
    /// error. A range that ends after the end of the object yields the entire
    /// remainder of the object. Otherwise exactly the requested range is
    /// returned.
    ///
    /// Note that the range is `u64` (i.e., not `usize`),
    /// as `object_store` supports 32-bit architectures such as WASM
    Bounded(Range<u64>),
    /// Request all bytes from the given byte offset to the end of the object
    Offset(u64),
    /// Request up to the last `n` bytes of the object
    Suffix(u64),
}
209
210#[derive(Debug, thiserror::Error)]
211#[non_exhaustive]
212pub enum InvalidGetRange {
213    #[error("Wanted range starting at {requested}, but object was only {length} bytes long")]
214    StartTooLarge { requested: u64, length: u64 },
215
216    #[error("Range started at {start} and ended at {end}")]
217    Inconsistent { start: u64, end: u64 },
218
219    #[error("Range {requested} is larger than system memory limit {max}")]
220    TooLarge { requested: u64, max: u64 },
221}
222
223impl GetRange {
224    /// Check if the range is valid.
225    pub fn is_valid(&self) -> Result<(), InvalidGetRange> {
226        if let Self::Bounded(r) = self {
227            if r.end <= r.start {
228                return Err(InvalidGetRange::Inconsistent {
229                    start: r.start,
230                    end: r.end,
231                });
232            }
233            if (r.end - r.start) > usize::MAX as u64 {
234                return Err(InvalidGetRange::TooLarge {
235                    requested: r.start,
236                    max: usize::MAX as u64,
237                });
238            }
239        }
240        Ok(())
241    }
242
243    /// Convert to a [`Range`] if [valid](Self::is_valid).
244    pub fn as_range(&self, len: u64) -> Result<Range<u64>, InvalidGetRange> {
245        self.is_valid()?;
246        match self {
247            Self::Bounded(r) => {
248                if r.start >= len {
249                    Err(InvalidGetRange::StartTooLarge {
250                        requested: r.start,
251                        length: len,
252                    })
253                } else if r.end > len {
254                    Ok(r.start..len)
255                } else {
256                    Ok(r.clone())
257                }
258            }
259            Self::Offset(o) => {
260                if *o >= len {
261                    Err(InvalidGetRange::StartTooLarge {
262                        requested: *o,
263                        length: len,
264                    })
265                } else {
266                    Ok(*o..len)
267                }
268            }
269            Self::Suffix(n) => Ok(len.saturating_sub(*n)..len),
270        }
271    }
272}
273
274impl Display for GetRange {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            Self::Bounded(r) => write!(f, "bytes={}-{}", r.start, r.end - 1),
278            Self::Offset(o) => write!(f, "bytes={o}-"),
279            Self::Suffix(n) => write!(f, "bytes=-{n}"),
280        }
281    }
282}
283
284impl<T: RangeBounds<u64>> From<T> for GetRange {
285    fn from(value: T) -> Self {
286        use std::ops::Bound::*;
287        let first = match value.start_bound() {
288            Included(i) => *i,
289            Excluded(i) => i + 1,
290            Unbounded => 0,
291        };
292        match value.end_bound() {
293            Included(i) => Self::Bounded(first..(i + 1)),
294            Excluded(i) => Self::Bounded(first..*i),
295            Unbounded => Self::Offset(first),
296        }
297    }
298}
/// Set of bytes to percent-encode: every non-alphanumeric byte except the
/// remaining RFC 3986 unreserved characters
///
/// From <http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>:
///
/// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
/// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ).
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
    .remove(b'-')
    .remove(b'_')
    .remove(b'.')
    .remove(b'~');
309
/// Returns the SHA-256 digest of `bytes` as a lower-case hex encoded string
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) fn hex_digest(bytes: &[u8]) -> String {
    use ring::digest::{SHA256, digest};

    hex_encode(digest(&SHA256, bytes).as_ref())
}
316
317/// Returns `bytes` as a lower-case hex encoded string
318#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) fn hex_encode(bytes: &[u8]) -> String {
    // Lower-case digits, indexed by nibble value
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    // Every input byte expands to exactly two output characters
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
328
#[cfg(test)]
mod tests {
    use crate::Error;

    use super::*;
    use rand::{RngExt, rng};
    use std::ops::Range;

    /// Calls coalesce_ranges against an in-memory object and validates that
    /// the data returned for every requested range is correct
    ///
    /// Returns the ranges that were actually passed to `fetch`, in call order
    async fn do_fetch(ranges: Vec<Range<u64>>, coalesce: u64) -> Vec<Range<u64>> {
        // Synthetic object where each byte's value is its offset (mod 256)
        let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
        let src: Vec<_> = (0..max).map(|x| x as u8).collect();

        let mut fetches = vec![];
        let coalesced = coalesce_ranges::<_, Error, _>(
            &ranges,
            |range| {
                fetches.push(range.clone());
                let start = usize::try_from(range.start).unwrap();
                let end = usize::try_from(range.end).unwrap();
                futures_util::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
            },
            coalesce,
        )
        .await
        .unwrap();

        assert_eq!(ranges.len(), coalesced.len());
        for (range, bytes) in ranges.iter().zip(coalesced) {
            assert_eq!(
                bytes.as_ref(),
                &src[usize::try_from(range.start).unwrap()..usize::try_from(range.end).unwrap()]
            );
        }
        fetches
    }

    #[tokio::test]
    async fn test_coalesce_ranges() {
        let fetches = do_fetch(vec![], 0).await;
        assert!(fetches.is_empty());

        let fetches = do_fetch(vec![0..3; 1], 0).await;
        assert_eq!(fetches, vec![0..3]);

        let fetches = do_fetch(vec![0..2, 3..5], 0).await;
        assert_eq!(fetches, vec![0..2, 3..5]);

        let fetches = do_fetch(vec![0..1, 1..2], 0).await;
        assert_eq!(fetches, vec![0..2]);

        let fetches = do_fetch(vec![0..1, 2..72], 1).await;
        assert_eq!(fetches, vec![0..72]);

        let fetches = do_fetch(vec![0..1, 56..72, 73..75], 1).await;
        assert_eq!(fetches, vec![0..1, 56..75]);

        // Unsorted input with overlapping ranges
        let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
        assert_eq!(fetches, vec![0..9]);

        // A range fully contained within another
        let fetches = do_fetch(vec![0..10, 2..4], 0).await;
        assert_eq!(fetches, vec![0..10]);

        let fetches = do_fetch(vec![0..1, 6..7, 8..9, 10..14, 9..10], 4).await;
        assert_eq!(fetches, vec![0..1, 6..14]);
    }

    #[tokio::test]
    async fn test_coalesce_fuzz() {
        let mut rand = rng();
        for _ in 0..100 {
            let object_len = rand.random_range(10..250);
            let range_count = rand.random_range(0..10);
            let ranges: Vec<_> = (0..range_count)
                .map(|_| {
                    let start = rand.random_range(0..object_len);
                    let max_len = 20.min(object_len - start);
                    let len = rand.random_range(0..max_len);
                    start..start + len
                })
                .collect();

            let coalesce = rand.random_range(1..5);
            let fetches = do_fetch(ranges.clone(), coalesce).await;

            for fetch in fetches.windows(2) {
                assert!(
                    fetch[0].start <= fetch[1].start,
                    "fetches should be sorted, {:?} vs {:?}",
                    fetch[0],
                    fetch[1]
                );

                // Adjacent fetches must be separated by more than `coalesce`
                // bytes, otherwise they should have been merged. The gap runs
                // from the end of one fetch to the start of the next.
                let gap = fetch[1].start - fetch[0].end;
                assert!(
                    gap > coalesce,
                    "fetches should not overlap by {}, {:?} vs {:?} for {:?}",
                    coalesce,
                    fetch[0],
                    fetch[1],
                    ranges
                );
            }
        }
    }

    #[test]
    fn getrange_str() {
        assert_eq!(GetRange::Offset(0).to_string(), "bytes=0-");
        assert_eq!(GetRange::Bounded(10..19).to_string(), "bytes=10-18");
        assert_eq!(GetRange::Suffix(10).to_string(), "bytes=-10");
    }

    #[test]
    fn getrange_from() {
        assert_eq!(Into::<GetRange>::into(10..15), GetRange::Bounded(10..15),);
        assert_eq!(Into::<GetRange>::into(10..=15), GetRange::Bounded(10..16),);
        assert_eq!(Into::<GetRange>::into(10..), GetRange::Offset(10),);
        assert_eq!(Into::<GetRange>::into(..=15), GetRange::Bounded(0..16));
        assert_eq!(Into::<GetRange>::into(..15), GetRange::Bounded(0..15));
    }

    #[test]
    fn test_as_range() {
        let range = GetRange::Bounded(2..5);
        assert_eq!(range.as_range(5).unwrap(), 2..5);

        // Range ending past the object is truncated
        let range = range.as_range(4).unwrap();
        assert_eq!(range, 2..4);

        // Empty ranges are rejected before the object length is considered
        let range = GetRange::Bounded(3..3);
        let err = range.as_range(2).unwrap_err().to_string();
        assert_eq!(err, "Range started at 3 and ended at 3");

        let range = GetRange::Bounded(2..2);
        let err = range.as_range(3).unwrap_err().to_string();
        assert_eq!(err, "Range started at 2 and ended at 2");

        let range = GetRange::Suffix(3);
        assert_eq!(range.as_range(3).unwrap(), 0..3);
        assert_eq!(range.as_range(2).unwrap(), 0..2);

        let range = GetRange::Suffix(0);
        assert_eq!(range.as_range(0).unwrap(), 0..0);

        let range = GetRange::Offset(2);
        let err = range.as_range(2).unwrap_err().to_string();
        assert_eq!(
            err,
            "Wanted range starting at 2, but object was only 2 bytes long"
        );

        let err = range.as_range(1).unwrap_err().to_string();
        assert_eq!(
            err,
            "Wanted range starting at 2, but object was only 1 bytes long"
        );

        let range = GetRange::Offset(1);
        assert_eq!(range.as_range(2).unwrap(), 1..2);
    }
}