Skip to main content

object_store/
buffered.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for performing tokio-style buffered IO
19
20use crate::path::Path;
21use crate::{
22    Attributes, Extensions, ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions,
23    PutOptions, PutPayloadMut, TagSet, WriteMultipart,
24};
25use bytes::Bytes;
26use futures_util::future::{BoxFuture, FutureExt};
27use futures_util::ready;
28use std::cmp::Ordering;
29use std::io::{Error, ErrorKind, SeekFrom};
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
34
/// The default buffer size used by [`BufReader`], in bytes (1 MiB)
///
/// [`BufReader::new`] passes this value as the `capacity` of the reader it creates.
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
37
/// An async-buffered reader compatible with the tokio IO traits
///
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStoreExt::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on seek.
///
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`]
/// methods that better map to the network APIs. This is because most object stores have
/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using [`ObjectStoreExt::get`],
/// or [`ObjectStore::get_opts`], or [`ObjectStoreExt::get_range`] to read a particular range.
///
/// Systems looking to read multiple ranges of a file should instead consider using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
///
/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
/// [`ObjectStoreExt::get`]: crate::ObjectStoreExt::get
pub struct BufReader {
    /// The object store to fetch data from
    store: Arc<dyn ObjectStore>,
    /// The size of the object in bytes, taken from the [`ObjectMeta`] supplied at construction
    ///
    /// Range requests are clamped to this value, so reads at or past it yield no data.
    size: u64,
    /// The path to the object
    path: Path,
    /// The current position in the object, as a byte offset from its start
    ///
    /// Advanced by `consume` and replaced by `start_seek`; it may point past `size`.
    cursor: u64,
    /// The number of bytes to read in a single request
    ///
    /// `poll_read` requests more than this when the caller's output buffer is larger.
    capacity: usize,
    /// The buffered data if any
    buffer: Buffer,
}
70
71impl std::fmt::Debug for BufReader {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("BufReader")
74            .field("path", &self.path)
75            .field("size", &self.size)
76            .field("capacity", &self.capacity)
77            .finish()
78    }
79}
80
/// The state of the internal buffer of a [`BufReader`]
enum Buffer {
    /// No data is buffered and no request is in flight
    Empty,
    /// A range request is in flight; it resolves to the fetched bytes or an IO error
    Pending(BoxFuture<'static, std::io::Result<Bytes>>),
    /// Fetched data that has not yet been fully consumed
    Ready(Bytes),
}
86
87impl BufReader {
88    /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`]
89    pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
90        Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
91    }
92
93    /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity`
94    pub fn with_capacity(store: Arc<dyn ObjectStore>, meta: &ObjectMeta, capacity: usize) -> Self {
95        Self {
96            path: meta.location.clone(),
97            size: meta.size as _,
98            store,
99            capacity,
100            cursor: 0,
101            buffer: Buffer::Empty,
102        }
103    }
104
105    fn poll_fill_buf_impl(
106        &mut self,
107        cx: &mut Context<'_>,
108        amnt: usize,
109    ) -> Poll<std::io::Result<&[u8]>> {
110        let buf = &mut self.buffer;
111        loop {
112            match buf {
113                Buffer::Empty => {
114                    let store = Arc::clone(&self.store);
115                    let path = self.path.clone();
116                    let start = self.cursor.min(self.size) as _;
117                    let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;
118
119                    if start == end {
120                        return Poll::Ready(Ok(&[]));
121                    }
122
123                    *buf = Buffer::Pending(Box::pin(async move {
124                        Ok(store.get_range(&path, start..end).await?)
125                    }))
126                }
127                Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
128                    Ok(b) => *buf = Buffer::Ready(b),
129                    Err(e) => return Poll::Ready(Err(e)),
130                },
131                Buffer::Ready(r) => return Poll::Ready(Ok(r)),
132            }
133        }
134    }
135}
136
137impl AsyncSeek for BufReader {
138    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
139        self.cursor = match position {
140            SeekFrom::Start(offset) => offset,
141            SeekFrom::End(offset) => checked_add_signed(self.size, offset).ok_or_else(|| {
142                Error::new(
143                    ErrorKind::InvalidInput,
144                    format!(
145                        "Seeking {offset} from end of {} byte file would result in overflow",
146                        self.size
147                    ),
148                )
149            })?,
150            SeekFrom::Current(offset) => {
151                checked_add_signed(self.cursor, offset).ok_or_else(|| {
152                    Error::new(
153                        ErrorKind::InvalidInput,
154                        format!(
155                            "Seeking {offset} from current offset of {} would result in overflow",
156                            self.cursor
157                        ),
158                    )
159                })?
160            }
161        };
162        self.buffer = Buffer::Empty;
163        Ok(())
164    }
165
166    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
167        Poll::Ready(Ok(self.cursor))
168    }
169}
170
171impl AsyncRead for BufReader {
172    fn poll_read(
173        mut self: Pin<&mut Self>,
174        cx: &mut Context<'_>,
175        out: &mut ReadBuf<'_>,
176    ) -> Poll<std::io::Result<()>> {
177        // Read the maximum of the internal buffer and `out`
178        let to_read = out.remaining().max(self.capacity);
179        let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) {
180            Ok(buf) => {
181                let to_consume = out.remaining().min(buf.len());
182                out.put_slice(&buf[..to_consume]);
183                self.consume(to_consume);
184                Ok(())
185            }
186            Err(e) => Err(e),
187        };
188        Poll::Ready(r)
189    }
190}
191
192impl AsyncBufRead for BufReader {
193    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
194        let capacity = self.capacity;
195        self.get_mut().poll_fill_buf_impl(cx, capacity)
196    }
197
198    fn consume(mut self: Pin<&mut Self>, amt: usize) {
199        match &mut self.buffer {
200            Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"),
201            Buffer::Ready(b) => match b.len().cmp(&amt) {
202                Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()),
203                Ordering::Greater => *b = b.slice(amt..),
204                Ordering::Equal => self.buffer = Buffer::Empty,
205            },
206            Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
207        }
208        self.cursor += amt as u64;
209    }
210}
211
/// An async buffered writer compatible with the tokio IO traits
///
/// This writer adaptively uses [`ObjectStore::put_opts`] or
/// [`ObjectStore::put_multipart_opts`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart_opts`].
///
/// More precisely, a write that would bring the buffered total to `capacity` bytes or
/// more triggers the switch to a multipart upload.
pub struct BufWriter {
    /// Buffering threshold in bytes; also used as the chunk size of the multipart upload
    capacity: usize,
    /// Limit passed to the multipart upload when waiting for capacity before a write
    max_concurrency: usize,
    /// Attributes for the uploaded object; taken (left as `None`) once an upload is started
    attributes: Option<Attributes>,
    /// Tags for the uploaded object; taken (left as `None`) once an upload is started
    tags: Option<TagSet>,
    /// Extensions for the upload request; taken (left as `None`) once a multipart upload is started
    extensions: Option<Extensions>,
    /// The current stage of the write, see [`BufWriterState`]
    state: BufWriterState,
    /// The object store to write to
    store: Arc<dyn ObjectStore>,
}
230
231impl std::fmt::Debug for BufWriter {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        f.debug_struct("BufWriter")
234            .field("capacity", &self.capacity)
235            .finish()
236    }
237}
238
/// The stage a [`BufWriter`] is currently in
enum BufWriterState {
    /// Buffer up to capacity bytes
    ///
    /// Holds the destination path and the data written so far.
    Buffer(Path, PutPayloadMut),
    /// [`ObjectStore::put_multipart_opts`]
    ///
    /// A multipart upload is being created; the future resolves to the upload with the
    /// previously buffered data already queued on it.
    Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
    /// Write to a multipart upload
    ///
    /// `None` once the upload has been taken by shutdown or abort.
    Write(Option<WriteMultipart>),
    /// [`ObjectStore::put_opts`]
    ///
    /// The final request is in flight: either the single put of the buffered data or the
    /// completion of the multipart upload.
    Flush(BoxFuture<'static, crate::Result<()>>),
}
249
250impl BufWriter {
251    /// Create a new [`BufWriter`] from the provided [`ObjectStore`] and [`Path`]
252    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
253        Self::with_capacity(store, path, 10 * 1024 * 1024)
254    }
255
256    /// Create a new [`BufWriter`] from the provided [`ObjectStore`], [`Path`] and `capacity`
257    pub fn with_capacity(store: Arc<dyn ObjectStore>, path: Path, capacity: usize) -> Self {
258        Self {
259            capacity,
260            store,
261            max_concurrency: 8,
262            attributes: None,
263            tags: None,
264            extensions: None,
265            state: BufWriterState::Buffer(path, PutPayloadMut::new()),
266        }
267    }
268
269    /// Override the maximum number of in-flight requests for this writer
270    ///
271    /// Defaults to 8
272    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
273        Self {
274            max_concurrency,
275            ..self
276        }
277    }
278
279    /// Set the attributes of the uploaded object
280    pub fn with_attributes(self, attributes: Attributes) -> Self {
281        Self {
282            attributes: Some(attributes),
283            ..self
284        }
285    }
286
287    /// Set the tags of the uploaded object
288    pub fn with_tags(self, tags: TagSet) -> Self {
289        Self {
290            tags: Some(tags),
291            ..self
292        }
293    }
294
295    /// Set the extensions of the uploaded object
296    ///
297    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
298    /// that need to pass context-specific information (like tracing spans) via trait methods.
299    ///
300    /// These extensions are ignored entirely by backends offered through this crate.
301    pub fn with_extensions(self, extensions: Extensions) -> Self {
302        Self {
303            extensions: Some(extensions),
304            ..self
305        }
306    }
307
308    /// Write data to the writer in [`Bytes`].
309    ///
310    /// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying.
311    ///
312    /// This API is recommended while the data source generates [`Bytes`].
313    pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> {
314        loop {
315            return match &mut self.state {
316                BufWriterState::Write(Some(write)) => {
317                    write.wait_for_capacity(self.max_concurrency).await?;
318                    write.put(bytes);
319                    Ok(())
320                }
321                BufWriterState::Write(None) | BufWriterState::Flush(_) => {
322                    panic!("Already shut down")
323                }
324                // NOTE
325                //
326                // This case should never happen in practice, but rust async API does
327                // make it possible for users to call `put` before `poll_write` returns `Ready`.
328                //
329                // We allow such usage by `await` the future and continue the loop.
330                BufWriterState::Prepare(f) => {
331                    self.state = BufWriterState::Write(f.await?.into());
332                    continue;
333                }
334                BufWriterState::Buffer(path, b) => {
335                    if b.content_length().saturating_add(bytes.len()) < self.capacity {
336                        b.push(bytes);
337                        Ok(())
338                    } else {
339                        let buffer = std::mem::take(b);
340                        let path = std::mem::take(path);
341                        let opts = PutMultipartOptions {
342                            attributes: self.attributes.take().unwrap_or_default(),
343                            tags: self.tags.take().unwrap_or_default(),
344                            extensions: self.extensions.take().unwrap_or_default(),
345                        };
346                        let upload = self.store.put_multipart_opts(&path, opts).await?;
347                        let mut chunked =
348                            WriteMultipart::new_with_chunk_size(upload, self.capacity);
349                        for chunk in buffer.freeze() {
350                            chunked.put(chunk);
351                        }
352                        chunked.put(bytes);
353                        self.state = BufWriterState::Write(Some(chunked));
354                        Ok(())
355                    }
356                }
357            };
358        }
359    }
360
361    /// Abort this writer, cleaning up any partially uploaded state
362    ///
363    /// # Panic
364    ///
365    /// Panics if this writer has already been shutdown or aborted
366    pub async fn abort(&mut self) -> crate::Result<()> {
367        match &mut self.state {
368            BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()),
369            BufWriterState::Flush(_) => panic!("Already shut down"),
370            BufWriterState::Write(x) => x.take().unwrap().abort().await,
371        }
372    }
373}
374
375impl AsyncWrite for BufWriter {
376    fn poll_write(
377        mut self: Pin<&mut Self>,
378        cx: &mut Context<'_>,
379        buf: &[u8],
380    ) -> Poll<Result<usize, Error>> {
381        let cap = self.capacity;
382        let max_concurrency = self.max_concurrency;
383        loop {
384            return match &mut self.state {
385                BufWriterState::Write(Some(write)) => {
386                    ready!(write.poll_for_capacity(cx, max_concurrency))?;
387                    write.write(buf);
388                    Poll::Ready(Ok(buf.len()))
389                }
390                BufWriterState::Write(None) | BufWriterState::Flush(_) => {
391                    panic!("Already shut down")
392                }
393                BufWriterState::Prepare(f) => {
394                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
395                    continue;
396                }
397                BufWriterState::Buffer(path, b) => {
398                    if b.content_length().saturating_add(buf.len()) >= cap {
399                        let buffer = std::mem::take(b);
400                        let path = std::mem::take(path);
401                        let opts = PutMultipartOptions {
402                            attributes: self.attributes.take().unwrap_or_default(),
403                            tags: self.tags.take().unwrap_or_default(),
404                            extensions: self.extensions.take().unwrap_or_default(),
405                        };
406                        let store = Arc::clone(&self.store);
407                        self.state = BufWriterState::Prepare(Box::pin(async move {
408                            let upload = store.put_multipart_opts(&path, opts).await?;
409                            let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
410                            for chunk in buffer.freeze() {
411                                chunked.put(chunk);
412                            }
413                            Ok(chunked)
414                        }));
415                        continue;
416                    }
417                    b.extend_from_slice(buf);
418                    Poll::Ready(Ok(buf.len()))
419                }
420            };
421        }
422    }
423
424    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
425        loop {
426            return match &mut self.state {
427                BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
428                BufWriterState::Flush(_) => panic!("Already shut down"),
429                BufWriterState::Prepare(f) => {
430                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
431                    continue;
432                }
433            };
434        }
435    }
436
437    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
438        loop {
439            match &mut self.state {
440                BufWriterState::Prepare(f) => {
441                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
442                }
443                BufWriterState::Buffer(p, b) => {
444                    let buf = std::mem::take(b);
445                    let path = std::mem::take(p);
446                    let opts = PutOptions {
447                        attributes: self.attributes.take().unwrap_or_default(),
448                        tags: self.tags.take().unwrap_or_default(),
449                        ..Default::default()
450                    };
451                    let store = Arc::clone(&self.store);
452                    self.state = BufWriterState::Flush(Box::pin(async move {
453                        store.put_opts(&path, buf.into(), opts).await?;
454                        Ok(())
455                    }));
456                }
457                BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from),
458                BufWriterState::Write(x) => {
459                    let upload = x.take().ok_or_else(|| {
460                        std::io::Error::new(
461                            ErrorKind::InvalidInput,
462                            "Cannot shutdown a writer that has already been shut down",
463                        )
464                    })?;
465                    self.state = BufWriterState::Flush(
466                        async move {
467                            upload.finish().await?;
468                            Ok(())
469                        }
470                        .boxed(),
471                    )
472                }
473            }
474        }
475    }
476}
477
/// Adds the signed offset `rhs` to `a`, returning `None` if the result does not fit in a `u64`
///
/// Local equivalent of the standard library's `u64::checked_add_signed`, which requires Rust 1.66
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
    if rhs.is_negative() {
        // Subtract the magnitude; `unsigned_abs` is well-defined even for `i64::MIN`
        a.checked_sub(rhs.unsigned_abs())
    } else {
        a.checked_add(rhs as u64)
    }
}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491    use crate::memory::InMemory;
492    use crate::path::Path;
493    use crate::{Attribute, GetOptions, ObjectStoreExt};
494    use itertools::Itertools;
495    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
496
    // Exercises `BufReader` against an in-memory store: a full read, seek error
    // reporting, seeking past the end of the object, and buffered reads with a range
    // of capacities.
    #[tokio::test]
    async fn test_buf_reader() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let existent = Path::from("exists.txt");
        const BYTES: usize = 4096;

        // The object is the 8 byte pattern "12345678" repeated to fill `BYTES` bytes
        let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
        store.put(&existent, data.clone().into()).await.unwrap();

        let meta = store.head(&existent).await.unwrap();

        let mut reader = BufReader::new(Arc::clone(&store), &meta);
        let mut out = Vec::with_capacity(BYTES);
        let read = reader.read_to_end(&mut out).await.unwrap();

        assert_eq!(read, BYTES);
        assert_eq!(&out, &data);

        // The cursor now sits at the end of the object (4096); seeking back by more
        // than that must fail
        let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"
        );

        reader.rewind().await.unwrap();

        // Seeking before the start of the object is rejected
        let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -1 from current offset of 0 would result in overflow"
        );

        // Seeking beyond the bounds of the file is permitted but should return no data
        reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
        let buf = reader.fill_buf().await.unwrap();
        assert!(buf.is_empty());

        // Moving forward from `u64::MAX` cannot be represented
        let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking 1 from current offset of 18446744073709551615 would result in overflow"
        );

        // Capacities below, equal to and above the object size
        for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
            let store = Arc::clone(&store);
            let mut reader = BufReader::with_capacity(store, &meta, capacity);

            // Consume the object 8 bytes at a time; every buffer must start on a
            // pattern boundary
            let mut bytes_read = 0;
            loop {
                let buf = reader.fill_buf().await.unwrap();
                if buf.is_empty() {
                    assert_eq!(bytes_read, BYTES);
                    break;
                }
                assert!(buf.starts_with(b"12345678"));
                bytes_read += 8;
                reader.consume(8);
            }

            // Seek backwards from the end and read the tail
            let mut buf = Vec::with_capacity(76);
            reader.seek(SeekFrom::Current(-76)).await.unwrap();
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, &data[BYTES - 76..]);

            // A fill after rewinding returns at most `capacity` bytes from the start
            reader.rewind().await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[..capacity.min(BYTES)]);

            // The same holds from an arbitrary offset
            reader.seek(SeekFrom::Start(325)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

            // Nothing is left to read at the end of the object
            reader.seek(SeekFrom::End(0)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert!(buffer.is_empty());
        }
    }
575
    // Writes through the `AsyncWrite` interface with a capacity of 30 bytes, checking
    // that size and attributes are preserved both when the data stays below the
    // capacity and when it reaches it.
    //
    // Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging`
    #[tokio::test]
    async fn test_buf_writer() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");
        let attributes = Attributes::from_iter([
            (Attribute::ContentType, "text/html"),
            (Attribute::CacheControl, "max-age=604800"),
        ]);

        // Test put
        // 20 + 5 = 25 bytes stays below the capacity of 30
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 5]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.attributes, attributes);

        // Test multipart
        // 20 + 20 = 40 bytes reaches the capacity of 30 on the second write
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 20]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.attributes, attributes);
    }
614
    // Writes through `BufWriter::put` with a capacity of 30 bytes, checking that the
    // stored bytes match what was written both when the data stays below the capacity
    // and when it reaches it.
    #[tokio::test]
    async fn test_buf_writer_with_put() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");

        // Test put
        // 20 + 5 = 25 bytes stays below the capacity of 30
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..25).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec());

        // Test multipart
        // 20 + 20 = 40 bytes reaches the capacity of 30 on the second put
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..40).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.bytes().await.unwrap(), (0..40).collect_vec());
    }
656}