1use crate::path::Path;
21use crate::{
22 Attributes, Extensions, ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions,
23 PutOptions, PutPayloadMut, TagSet, WriteMultipart,
24};
25use bytes::Bytes;
26use futures_util::future::{BoxFuture, FutureExt};
27use futures_util::ready;
28use std::cmp::Ordering;
29use std::io::{Error, ErrorKind, SeekFrom};
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
34
/// Default read-ahead capacity, in bytes, used by [`BufReader::new`] (1 MiB).
pub const DEFAULT_BUFFER_SIZE: usize = 1 << 20;
37
38pub struct BufReader {
57 store: Arc<dyn ObjectStore>,
59 size: u64,
61 path: Path,
63 cursor: u64,
65 capacity: usize,
67 buffer: Buffer,
69}
70
71impl std::fmt::Debug for BufReader {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("BufReader")
74 .field("path", &self.path)
75 .field("size", &self.size)
76 .field("capacity", &self.capacity)
77 .finish()
78 }
79}
80
81enum Buffer {
82 Empty,
83 Pending(BoxFuture<'static, std::io::Result<Bytes>>),
84 Ready(Bytes),
85}
86
87impl BufReader {
88 pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
90 Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
91 }
92
93 pub fn with_capacity(store: Arc<dyn ObjectStore>, meta: &ObjectMeta, capacity: usize) -> Self {
95 Self {
96 path: meta.location.clone(),
97 size: meta.size as _,
98 store,
99 capacity,
100 cursor: 0,
101 buffer: Buffer::Empty,
102 }
103 }
104
105 fn poll_fill_buf_impl(
106 &mut self,
107 cx: &mut Context<'_>,
108 amnt: usize,
109 ) -> Poll<std::io::Result<&[u8]>> {
110 let buf = &mut self.buffer;
111 loop {
112 match buf {
113 Buffer::Empty => {
114 let store = Arc::clone(&self.store);
115 let path = self.path.clone();
116 let start = self.cursor.min(self.size) as _;
117 let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;
118
119 if start == end {
120 return Poll::Ready(Ok(&[]));
121 }
122
123 *buf = Buffer::Pending(Box::pin(async move {
124 Ok(store.get_range(&path, start..end).await?)
125 }))
126 }
127 Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
128 Ok(b) => *buf = Buffer::Ready(b),
129 Err(e) => return Poll::Ready(Err(e)),
130 },
131 Buffer::Ready(r) => return Poll::Ready(Ok(r)),
132 }
133 }
134 }
135}
136
137impl AsyncSeek for BufReader {
138 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
139 self.cursor = match position {
140 SeekFrom::Start(offset) => offset,
141 SeekFrom::End(offset) => checked_add_signed(self.size, offset).ok_or_else(|| {
142 Error::new(
143 ErrorKind::InvalidInput,
144 format!(
145 "Seeking {offset} from end of {} byte file would result in overflow",
146 self.size
147 ),
148 )
149 })?,
150 SeekFrom::Current(offset) => {
151 checked_add_signed(self.cursor, offset).ok_or_else(|| {
152 Error::new(
153 ErrorKind::InvalidInput,
154 format!(
155 "Seeking {offset} from current offset of {} would result in overflow",
156 self.cursor
157 ),
158 )
159 })?
160 }
161 };
162 self.buffer = Buffer::Empty;
163 Ok(())
164 }
165
166 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
167 Poll::Ready(Ok(self.cursor))
168 }
169}
170
171impl AsyncRead for BufReader {
172 fn poll_read(
173 mut self: Pin<&mut Self>,
174 cx: &mut Context<'_>,
175 out: &mut ReadBuf<'_>,
176 ) -> Poll<std::io::Result<()>> {
177 let to_read = out.remaining().max(self.capacity);
179 let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) {
180 Ok(buf) => {
181 let to_consume = out.remaining().min(buf.len());
182 out.put_slice(&buf[..to_consume]);
183 self.consume(to_consume);
184 Ok(())
185 }
186 Err(e) => Err(e),
187 };
188 Poll::Ready(r)
189 }
190}
191
192impl AsyncBufRead for BufReader {
193 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
194 let capacity = self.capacity;
195 self.get_mut().poll_fill_buf_impl(cx, capacity)
196 }
197
198 fn consume(mut self: Pin<&mut Self>, amt: usize) {
199 match &mut self.buffer {
200 Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty buffer"),
201 Buffer::Ready(b) => match b.len().cmp(&amt) {
202 Ordering::Less => panic!("{amt} exceeds buffer sized of {}", b.len()),
203 Ordering::Greater => *b = b.slice(amt..),
204 Ordering::Equal => self.buffer = Buffer::Empty,
205 },
206 Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
207 }
208 self.cursor += amt as u64;
209 }
210}
211
212pub struct BufWriter {
222 capacity: usize,
223 max_concurrency: usize,
224 attributes: Option<Attributes>,
225 tags: Option<TagSet>,
226 extensions: Option<Extensions>,
227 state: BufWriterState,
228 store: Arc<dyn ObjectStore>,
229}
230
231impl std::fmt::Debug for BufWriter {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 f.debug_struct("BufWriter")
234 .field("capacity", &self.capacity)
235 .finish()
236 }
237}
238
239enum BufWriterState {
240 Buffer(Path, PutPayloadMut),
242 Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
244 Write(Option<WriteMultipart>),
246 Flush(BoxFuture<'static, crate::Result<()>>),
248}
249
250impl BufWriter {
251 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
253 Self::with_capacity(store, path, 10 * 1024 * 1024)
254 }
255
256 pub fn with_capacity(store: Arc<dyn ObjectStore>, path: Path, capacity: usize) -> Self {
258 Self {
259 capacity,
260 store,
261 max_concurrency: 8,
262 attributes: None,
263 tags: None,
264 extensions: None,
265 state: BufWriterState::Buffer(path, PutPayloadMut::new()),
266 }
267 }
268
269 pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
273 Self {
274 max_concurrency,
275 ..self
276 }
277 }
278
279 pub fn with_attributes(self, attributes: Attributes) -> Self {
281 Self {
282 attributes: Some(attributes),
283 ..self
284 }
285 }
286
287 pub fn with_tags(self, tags: TagSet) -> Self {
289 Self {
290 tags: Some(tags),
291 ..self
292 }
293 }
294
295 pub fn with_extensions(self, extensions: Extensions) -> Self {
302 Self {
303 extensions: Some(extensions),
304 ..self
305 }
306 }
307
308 pub async fn put(&mut self, bytes: Bytes) -> crate::Result<()> {
314 loop {
315 return match &mut self.state {
316 BufWriterState::Write(Some(write)) => {
317 write.wait_for_capacity(self.max_concurrency).await?;
318 write.put(bytes);
319 Ok(())
320 }
321 BufWriterState::Write(None) | BufWriterState::Flush(_) => {
322 panic!("Already shut down")
323 }
324 BufWriterState::Prepare(f) => {
331 self.state = BufWriterState::Write(f.await?.into());
332 continue;
333 }
334 BufWriterState::Buffer(path, b) => {
335 if b.content_length().saturating_add(bytes.len()) < self.capacity {
336 b.push(bytes);
337 Ok(())
338 } else {
339 let buffer = std::mem::take(b);
340 let path = std::mem::take(path);
341 let opts = PutMultipartOptions {
342 attributes: self.attributes.take().unwrap_or_default(),
343 tags: self.tags.take().unwrap_or_default(),
344 extensions: self.extensions.take().unwrap_or_default(),
345 };
346 let upload = self.store.put_multipart_opts(&path, opts).await?;
347 let mut chunked =
348 WriteMultipart::new_with_chunk_size(upload, self.capacity);
349 for chunk in buffer.freeze() {
350 chunked.put(chunk);
351 }
352 chunked.put(bytes);
353 self.state = BufWriterState::Write(Some(chunked));
354 Ok(())
355 }
356 }
357 };
358 }
359 }
360
361 pub async fn abort(&mut self) -> crate::Result<()> {
367 match &mut self.state {
368 BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) => Ok(()),
369 BufWriterState::Flush(_) => panic!("Already shut down"),
370 BufWriterState::Write(x) => x.take().unwrap().abort().await,
371 }
372 }
373}
374
375impl AsyncWrite for BufWriter {
376 fn poll_write(
377 mut self: Pin<&mut Self>,
378 cx: &mut Context<'_>,
379 buf: &[u8],
380 ) -> Poll<Result<usize, Error>> {
381 let cap = self.capacity;
382 let max_concurrency = self.max_concurrency;
383 loop {
384 return match &mut self.state {
385 BufWriterState::Write(Some(write)) => {
386 ready!(write.poll_for_capacity(cx, max_concurrency))?;
387 write.write(buf);
388 Poll::Ready(Ok(buf.len()))
389 }
390 BufWriterState::Write(None) | BufWriterState::Flush(_) => {
391 panic!("Already shut down")
392 }
393 BufWriterState::Prepare(f) => {
394 self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
395 continue;
396 }
397 BufWriterState::Buffer(path, b) => {
398 if b.content_length().saturating_add(buf.len()) >= cap {
399 let buffer = std::mem::take(b);
400 let path = std::mem::take(path);
401 let opts = PutMultipartOptions {
402 attributes: self.attributes.take().unwrap_or_default(),
403 tags: self.tags.take().unwrap_or_default(),
404 extensions: self.extensions.take().unwrap_or_default(),
405 };
406 let store = Arc::clone(&self.store);
407 self.state = BufWriterState::Prepare(Box::pin(async move {
408 let upload = store.put_multipart_opts(&path, opts).await?;
409 let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
410 for chunk in buffer.freeze() {
411 chunked.put(chunk);
412 }
413 Ok(chunked)
414 }));
415 continue;
416 }
417 b.extend_from_slice(buf);
418 Poll::Ready(Ok(buf.len()))
419 }
420 };
421 }
422 }
423
424 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
425 loop {
426 return match &mut self.state {
427 BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
428 BufWriterState::Flush(_) => panic!("Already shut down"),
429 BufWriterState::Prepare(f) => {
430 self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
431 continue;
432 }
433 };
434 }
435 }
436
437 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
438 loop {
439 match &mut self.state {
440 BufWriterState::Prepare(f) => {
441 self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
442 }
443 BufWriterState::Buffer(p, b) => {
444 let buf = std::mem::take(b);
445 let path = std::mem::take(p);
446 let opts = PutOptions {
447 attributes: self.attributes.take().unwrap_or_default(),
448 tags: self.tags.take().unwrap_or_default(),
449 ..Default::default()
450 };
451 let store = Arc::clone(&self.store);
452 self.state = BufWriterState::Flush(Box::pin(async move {
453 store.put_opts(&path, buf.into(), opts).await?;
454 Ok(())
455 }));
456 }
457 BufWriterState::Flush(f) => return f.poll_unpin(cx).map_err(std::io::Error::from),
458 BufWriterState::Write(x) => {
459 let upload = x.take().ok_or_else(|| {
460 std::io::Error::new(
461 ErrorKind::InvalidInput,
462 "Cannot shutdown a writer that has already been shut down",
463 )
464 })?;
465 self.state = BufWriterState::Flush(
466 async move {
467 upload.finish().await?;
468 Ok(())
469 }
470 .boxed(),
471 )
472 }
473 }
474 }
475 }
476}
477
/// Add a signed offset to an unsigned position.
///
/// Returns `None` if the result would be negative or would exceed `u64::MAX`; this
/// matches the semantics of the standard library's `u64::checked_add_signed`.
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
    if rhs < 0 {
        // `unsigned_abs` handles `i64::MIN` without overflow.
        a.checked_sub(rhs.unsigned_abs())
    } else {
        a.checked_add(rhs as u64)
    }
}
487
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemory;
    use crate::path::Path;
    use crate::{Attribute, GetOptions, ObjectStoreExt};
    use itertools::Itertools;
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

    /// Exercises `BufReader` against an in-memory object: a full read, seek overflow
    /// errors, seeking past the end, and `fill_buf`/`consume`/`seek` behaviour across
    /// several buffer capacities.
    #[tokio::test]
    async fn test_buf_reader() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let existent = Path::from("exists.txt");
        const BYTES: usize = 4096;

        // 4096 bytes of the repeating 8-byte pattern "12345678".
        let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
        store.put(&existent, data.clone().into()).await.unwrap();

        let meta = store.head(&existent).await.unwrap();

        // A full read with the default capacity returns the whole object.
        let mut reader = BufReader::new(Arc::clone(&store), &meta);
        let mut out = Vec::with_capacity(BYTES);
        let read = reader.read_to_end(&mut out).await.unwrap();

        assert_eq!(read, BYTES);
        assert_eq!(&out, &data);

        // Relative seeks that would move the cursor below zero are rejected.
        let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -9223372036854775808 from current offset of 4096 would result in overflow"
        );

        reader.rewind().await.unwrap();

        let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -1 from current offset of 0 would result in overflow"
        );

        // Seeking far past the end is allowed; the buffer then reads as empty.
        reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
        let buf = reader.fill_buf().await.unwrap();
        assert!(buf.is_empty());

        // Relative seeks that would move the cursor above `u64::MAX` are rejected.
        let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking 1 from current offset of 18446744073709551615 would result in overflow"
        );

        for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
            let store = Arc::clone(&store);
            let mut reader = BufReader::with_capacity(store, &meta, capacity);

            // Walk the object in 8-byte steps. Every tested capacity is a multiple of 8,
            // so each buffer window starts on a pattern boundary.
            let mut bytes_read = 0;
            loop {
                let buf = reader.fill_buf().await.unwrap();
                if buf.is_empty() {
                    assert_eq!(bytes_read, BYTES);
                    break;
                }
                assert!(buf.starts_with(b"12345678"));
                bytes_read += 8;
                reader.consume(8);
            }

            // Seek back from the end of the object and re-read the tail.
            let mut buf = Vec::with_capacity(76);
            reader.seek(SeekFrom::Current(-76)).await.unwrap();
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, &data[BYTES - 76..]);

            // A fill after rewinding returns at most `capacity` bytes from the start.
            reader.rewind().await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[..capacity.min(BYTES)]);

            // A fill after an absolute seek is clamped to the end of the object.
            reader.seek(SeekFrom::Start(325)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

            reader.seek(SeekFrom::End(0)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert!(buffer.is_empty());
        }
    }

    /// With a capacity of 30, 25 bytes stay under the threshold (single put on
    /// shutdown), while 40 bytes cross it (multipart upload). Both paths must keep the
    /// configured attributes.
    #[tokio::test]
    async fn test_buf_writer() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");
        let attributes = Attributes::from_iter([
            (Attribute::ContentType, "text/html"),
            (Attribute::CacheControl, "max-age=604800"),
        ]);

        // 20 + 5 bytes: below capacity, written by `put_opts` on shutdown.
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 5]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.attributes, attributes);

        // 20 + 20 bytes: reaches capacity, so a multipart upload is used.
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
            .with_attributes(attributes.clone());
        writer.write_all(&[0; 20]).await.unwrap();
        writer.flush().await.unwrap();
        writer.write_all(&[0; 20]).await.unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.attributes, attributes);
    }

    /// Same two scenarios as `test_buf_writer`, but driven through `BufWriter::put`,
    /// and also checking that the uploaded bytes round-trip in order.
    #[tokio::test]
    async fn test_buf_writer_with_put() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let path = Path::from("file.txt");

        // 20 + 5 bytes: stays buffered until shutdown.
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..25).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 25);
        assert_eq!(response.bytes().await.unwrap(), (0..25).collect_vec());

        // 20 + 20 bytes: the second put triggers the multipart upload.
        let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
        writer
            .put(Bytes::from((0..20).collect_vec()))
            .await
            .unwrap();
        writer
            .put(Bytes::from((20..40).collect_vec()))
            .await
            .unwrap();
        writer.shutdown().await.unwrap();
        let response = store
            .get_opts(&path, GetOptions::new().with_head(true))
            .await
            .unwrap();
        assert_eq!(response.meta.size, 40);
        assert_eq!(response.bytes().await.unwrap(), (0..40).collect_vec());
    }
}