1use std::{
20 fmt::Display,
21 ops::{Range, RangeBounds},
22};
23
24use super::Result;
25use bytes::Bytes;
26use futures_util::{Stream, TryStreamExt, stream::StreamExt};
27
/// `chrono` format string for RFC 1123 timestamps such as `Sun, 06 Nov 1994 08:49:37 GMT`.
///
/// `%h` is the abbreviated month name and `%T` is `%H:%M:%S`; the trailing `GMT` is matched
/// literally. Used by `deserialize_rfc1123` to parse such strings.
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
30
/// Deserializes a string in [`RFC1123_FMT`] form into a UTC `DateTime`.
///
/// The signature matches what `#[serde(deserialize_with = "...")]` expects. The parsed
/// wall-clock time is interpreted as UTC.
///
/// # Errors
///
/// Returns the deserializer's error if the input is not a string, or a custom error wrapping
/// the `chrono` parse failure if the string does not match [`RFC1123_FMT`].
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) fn deserialize_rfc1123<'de, D>(
    deserializer: D,
) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let text = <String as serde::Deserialize>::deserialize(deserializer)?;
    chrono::NaiveDateTime::parse_from_str(&text, RFC1123_FMT)
        .map(|naive| chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive))
        .map_err(serde::de::Error::custom)
}
44
/// Computes the HMAC-SHA256 of `bytes` keyed with `secret`.
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) fn hmac_sha256(secret: impl AsRef<[u8]>, bytes: impl AsRef<[u8]>) -> ring::hmac::Tag {
    use ring::hmac::{sign, Key, HMAC_SHA256};

    let signing_key = Key::new(HMAC_SHA256, secret.as_ref());
    sign(&signing_key, bytes.as_ref())
}
50
51pub async fn collect_bytes<S, E>(mut stream: S, size_hint: Option<u64>) -> Result<Bytes, E>
53where
54 E: Send,
55 S: Stream<Item = Result<Bytes, E>> + Send + Unpin,
56{
57 let first = stream.next().await.transpose()?.unwrap_or_default();
58
59 match stream.next().await.transpose()? {
61 None => Ok(first),
62 Some(second) => {
63 let size_hint = size_hint.unwrap_or_else(|| first.len() as u64 + second.len() as u64);
64
65 let mut buf = Vec::with_capacity(size_hint as usize);
66 buf.extend_from_slice(&first);
67 buf.extend_from_slice(&second);
68 while let Some(maybe_bytes) = stream.next().await {
69 buf.extend_from_slice(&maybe_bytes?);
70 }
71
72 Ok(buf.into())
73 }
74 }
75}
76
/// Runs the blocking closure `f`, offloading it to Tokio's blocking pool when possible.
///
/// When called inside a Tokio runtime, `f` runs via `spawn_blocking` and is awaited. If that
/// task fails to join (e.g. because `f` panicked), the `JoinError` is converted into the crate
/// error by `?`. Outside a runtime, `f` is simply called on the current thread.
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
pub(crate) async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn_blocking(f).await?
    } else {
        f()
    }
}
89
/// 1 MiB (`1024 * 1024` bytes).
///
/// NOTE(review): judging by its name this is the default `coalesce` threshold passed to
/// `coalesce_ranges` — the largest gap between two ranges that still lets them be merged into
/// one request. Confirm against callers.
pub const OBJECT_STORE_COALESCE_DEFAULT: u64 = 1024 * 1024;

/// Maximum number of merged range fetches `coalesce_ranges` keeps in flight at once.
pub(crate) const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
96
97pub async fn coalesce_ranges<F, E, Fut>(
106 ranges: &[Range<u64>],
107 fetch: F,
108 coalesce: u64,
109) -> Result<Vec<Bytes>, E>
110where
111 F: Send + FnMut(Range<u64>) -> Fut,
112 E: Send,
113 Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
114{
115 let fetch_ranges = merge_ranges(ranges, coalesce);
116
117 let fetched: Vec<_> = futures_util::stream::iter(fetch_ranges.iter().cloned())
118 .map(fetch)
119 .buffered(OBJECT_STORE_COALESCE_PARALLEL)
120 .try_collect()
121 .await?;
122
123 Ok(ranges
124 .iter()
125 .map(|range| {
126 let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
127 let fetch_range = &fetch_ranges[idx];
128 let fetch_bytes = &fetched[idx];
129
130 let start = range.start - fetch_range.start;
131 let end = range.end - fetch_range.start;
132 let range = (start as usize)..(end as usize).min(fetch_bytes.len());
133 fetch_bytes.slice(range)
134 })
135 .collect())
136}
137
/// Sorts `ranges` by start offset and merges those that overlap or whose gap is at most
/// `coalesce` bytes.
///
/// The output is sorted by start. Each merged range runs from the smallest start to the largest
/// end of the input ranges it absorbed. An empty input yields an empty output.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for next in sorted {
        if let Some(current) = merged.last_mut() {
            // Overlapping ranges and gaps of up to `coalesce` bytes are absorbed. If
            // `end + coalesce` would overflow, saturating keeps every later start in reach.
            if next.start <= current.end.saturating_add(coalesce) {
                current.end = current.end.max(next.end);
                continue;
            }
        }
        merged.push(next);
    }
    merged
}
175
/// A byte range to request from an object.
///
/// Use [`GetRange::as_range`] to resolve it against a known object length.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum GetRange {
    /// The bytes in `start..end`: `start` is inclusive, `end` is exclusive.
    ///
    /// The `Display` impl renders it with the inclusive last byte `end - 1`. Must satisfy
    /// `start < end` to pass [`GetRange::is_valid`].
    Bounded(Range<u64>),
    /// All bytes from the given offset (inclusive) to the end of the object.
    Offset(u64),
    /// The last `n` bytes of the object, or the whole object if it is shorter than `n`.
    Suffix(u64),
}
209
/// Error returned when a [`GetRange`] is malformed or cannot be satisfied for an object.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum InvalidGetRange {
    /// The range starts at or beyond the end of the object.
    #[error("Wanted range starting at {requested}, but object was only {length} bytes long")]
    StartTooLarge { requested: u64, length: u64 },

    /// A bounded range whose end is not after its start (empty or inverted).
    #[error("Range started at {start} and ended at {end}")]
    Inconsistent { start: u64, end: u64 },

    /// A bounded range spanning more bytes than `usize` can represent on this platform;
    /// `max` is `usize::MAX`.
    #[error("Range {requested} is larger than system memory limit {max}")]
    TooLarge { requested: u64, max: u64 },
}
222
223impl GetRange {
224 pub fn is_valid(&self) -> Result<(), InvalidGetRange> {
226 if let Self::Bounded(r) = self {
227 if r.end <= r.start {
228 return Err(InvalidGetRange::Inconsistent {
229 start: r.start,
230 end: r.end,
231 });
232 }
233 if (r.end - r.start) > usize::MAX as u64 {
234 return Err(InvalidGetRange::TooLarge {
235 requested: r.start,
236 max: usize::MAX as u64,
237 });
238 }
239 }
240 Ok(())
241 }
242
243 pub fn as_range(&self, len: u64) -> Result<Range<u64>, InvalidGetRange> {
245 self.is_valid()?;
246 match self {
247 Self::Bounded(r) => {
248 if r.start >= len {
249 Err(InvalidGetRange::StartTooLarge {
250 requested: r.start,
251 length: len,
252 })
253 } else if r.end > len {
254 Ok(r.start..len)
255 } else {
256 Ok(r.clone())
257 }
258 }
259 Self::Offset(o) => {
260 if *o >= len {
261 Err(InvalidGetRange::StartTooLarge {
262 requested: *o,
263 length: len,
264 })
265 } else {
266 Ok(*o..len)
267 }
268 }
269 Self::Suffix(n) => Ok(len.saturating_sub(*n)..len),
270 }
271 }
272}
273
274impl Display for GetRange {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Self::Bounded(r) => write!(f, "bytes={}-{}", r.start, r.end - 1),
278 Self::Offset(o) => write!(f, "bytes={o}-"),
279 Self::Suffix(n) => write!(f, "bytes=-{n}"),
280 }
281 }
282}
283
284impl<T: RangeBounds<u64>> From<T> for GetRange {
285 fn from(value: T) -> Self {
286 use std::ops::Bound::*;
287 let first = match value.start_bound() {
288 Included(i) => *i,
289 Excluded(i) => i + 1,
290 Unbounded => 0,
291 };
292 match value.end_bound() {
293 Included(i) => Self::Bounded(first..(i + 1)),
294 Excluded(i) => Self::Bounded(first..*i),
295 Unbounded => Self::Offset(first),
296 }
297 }
298}
/// Percent-encoding set that escapes every byte except ASCII alphanumerics and `-`, `.`, `_`,
/// `~` — i.e. everything outside the RFC 3986 "unreserved" characters.
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
    .remove(b'-')
    .remove(b'.')
    .remove(b'_')
    .remove(b'~');
309
/// Returns the SHA-256 digest of `bytes` as a lowercase hex string (via `hex_encode`).
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) fn hex_digest(bytes: &[u8]) -> String {
    use ring::digest::{digest, SHA256};

    let hashed = digest(&SHA256, bytes);
    hex_encode(hashed.as_ref())
}
316
317#[cfg(any(feature = "aws", feature = "gcp"))]
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
pub(crate) fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;

    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut out, byte| {
            // Writing into a `String` cannot fail, so the `fmt::Result` is discarded.
            let _ = write!(out, "{byte:02x}");
            out
        })
}
328
#[cfg(test)]
mod tests {
    use crate::Error;

    use super::*;
    use rand::{RngExt, rng};
    use std::ops::Range;

    /// Runs `coalesce_ranges` over a synthetic object whose byte at offset `i` is `i as u8`.
    ///
    /// Checks that every returned slice matches the requested range. Returns the ranges
    /// actually passed to the fetch callback, in call order.
    async fn do_fetch(ranges: Vec<Range<u64>>, coalesce: u64) -> Vec<Range<u64>> {
        let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
        let src: Vec<_> = (0..max).map(|x| x as u8).collect();

        let mut fetches = vec![];
        let coalesced = coalesce_ranges::<_, Error, _>(
            &ranges,
            |range| {
                fetches.push(range.clone());
                let start = usize::try_from(range.start).unwrap();
                let end = usize::try_from(range.end).unwrap();
                futures_util::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
            },
            coalesce,
        )
        .await
        .unwrap();

        assert_eq!(ranges.len(), coalesced.len());
        for (range, bytes) in ranges.iter().zip(coalesced) {
            assert_eq!(
                bytes.as_ref(),
                &src[usize::try_from(range.start).unwrap()..usize::try_from(range.end).unwrap()]
            );
        }
        fetches
    }

    #[tokio::test]
    async fn test_coalesce_ranges() {
        let fetches = do_fetch(vec![], 0).await;
        assert!(fetches.is_empty());

        let fetches = do_fetch(vec![0..3; 1], 0).await;
        assert_eq!(fetches, vec![0..3]);

        let fetches = do_fetch(vec![0..2, 3..5], 0).await;
        assert_eq!(fetches, vec![0..2, 3..5]);

        let fetches = do_fetch(vec![0..1, 1..2], 0).await;
        assert_eq!(fetches, vec![0..2]);

        let fetches = do_fetch(vec![0..1, 2..72], 1).await;
        assert_eq!(fetches, vec![0..72]);

        let fetches = do_fetch(vec![0..1, 56..72, 73..75], 1).await;
        assert_eq!(fetches, vec![0..1, 56..75]);

        let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
        assert_eq!(fetches, vec![0..9]);

        // Overlapping and nested ranges collapse into a single fetch.
        let fetches = do_fetch(vec![0..10, 2..5, 4..12], 0).await;
        assert_eq!(fetches, vec![0..12]);

        let fetches = do_fetch(vec![0..1, 6..7, 8..9, 10..14, 9..10], 4).await;
        assert_eq!(fetches, vec![0..1, 6..14]);
    }

    #[tokio::test]
    async fn test_coalesce_fuzz() {
        let mut rand = rng();
        for _ in 0..100 {
            let object_len = rand.random_range(10..250);
            let range_count = rand.random_range(0..10);
            let ranges: Vec<_> = (0..range_count)
                .map(|_| {
                    let start = rand.random_range(0..object_len);
                    let max_len = 20.min(object_len - start);
                    let len = rand.random_range(0..max_len);
                    start..start + len
                })
                .collect();

            let coalesce = rand.random_range(1..5);
            let fetches = do_fetch(ranges.clone(), coalesce).await;

            for fetch in fetches.windows(2) {
                assert!(
                    fetch[0].start <= fetch[1].start,
                    "fetches should be sorted, {:?} vs {:?}",
                    fetch[0],
                    fetch[1]
                );

                // Consecutive fetches must be separated by a gap strictly larger than
                // `coalesce`; otherwise they should have been merged into one request.
                let gap = fetch[1].start.checked_sub(fetch[0].end);
                assert!(
                    matches!(gap, Some(gap) if gap > coalesce),
                    "fetches should be more than {} bytes apart, {:?} vs {:?} for {:?}",
                    coalesce,
                    fetch[0],
                    fetch[1],
                    ranges
                );
            }
        }
    }

    #[test]
    fn getrange_str() {
        assert_eq!(GetRange::Offset(0).to_string(), "bytes=0-");
        assert_eq!(GetRange::Bounded(10..19).to_string(), "bytes=10-18");
        assert_eq!(GetRange::Suffix(10).to_string(), "bytes=-10");
    }

    #[test]
    fn getrange_from() {
        assert_eq!(Into::<GetRange>::into(10..15), GetRange::Bounded(10..15),);
        assert_eq!(Into::<GetRange>::into(10..=15), GetRange::Bounded(10..16),);
        assert_eq!(Into::<GetRange>::into(10..), GetRange::Offset(10),);
        assert_eq!(Into::<GetRange>::into(..=15), GetRange::Bounded(0..16));
    }

    #[test]
    fn test_as_range() {
        let range = GetRange::Bounded(2..5);
        assert_eq!(range.as_range(5).unwrap(), 2..5);

        let range = range.as_range(4).unwrap();
        assert_eq!(range, 2..4);

        let range = GetRange::Bounded(3..3);
        let err = range.as_range(2).unwrap_err().to_string();
        assert_eq!(err, "Range started at 3 and ended at 3");

        let range = GetRange::Bounded(2..2);
        let err = range.as_range(3).unwrap_err().to_string();
        assert_eq!(err, "Range started at 2 and ended at 2");

        let range = GetRange::Suffix(3);
        assert_eq!(range.as_range(3).unwrap(), 0..3);
        assert_eq!(range.as_range(2).unwrap(), 0..2);

        let range = GetRange::Suffix(0);
        assert_eq!(range.as_range(0).unwrap(), 0..0);

        let range = GetRange::Offset(2);
        let err = range.as_range(2).unwrap_err().to_string();
        assert_eq!(
            err,
            "Wanted range starting at 2, but object was only 2 bytes long"
        );

        let err = range.as_range(1).unwrap_err().to_string();
        assert_eq!(
            err,
            "Wanted range starting at 2, but object was only 1 bytes long"
        );

        let range = GetRange::Offset(1);
        assert_eq!(range.as_range(2).unwrap(), 1..2);
    }
}