1use parking_lot::Mutex;
20use std::ops::Range;
21use std::{convert::TryInto, sync::Arc};
22
23use crate::multipart::{MultipartStore, PartId};
24use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart};
25use crate::{
26 GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
27 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
28};
29use async_trait::async_trait;
30use bytes::Bytes;
31use futures_util::{FutureExt, StreamExt, stream::BoxStream};
32use std::time::Duration;
33
34#[derive(Debug, Default, Clone, Copy)]
36pub struct ThrottleConfig {
37 pub wait_delete_per_call: Duration,
45
46 pub wait_get_per_byte: Duration,
55
56 pub wait_get_per_call: Duration,
62
63 pub wait_list_per_call: Duration,
69
70 pub wait_list_per_entry: Duration,
79
80 pub wait_list_with_delimiter_per_call: Duration,
87
88 pub wait_list_with_delimiter_per_entry: Duration,
95
96 pub wait_put_per_call: Duration,
101}
102
103async fn sleep(duration: Duration) {
105 if !duration.is_zero() {
106 tokio::time::sleep(duration).await
107 }
108}
109
110#[derive(Debug)]
117pub struct ThrottledStore<T> {
118 inner: T,
119 config: Arc<Mutex<ThrottleConfig>>,
120}
121
122impl<T> ThrottledStore<T> {
123 pub fn new(inner: T, config: ThrottleConfig) -> Self {
125 Self {
126 inner,
127 config: Arc::new(Mutex::new(config)),
128 }
129 }
130
131 pub fn config_mut<F>(&self, f: F)
133 where
134 F: Fn(&mut ThrottleConfig),
135 {
136 let mut guard = self.config.lock();
137 f(&mut guard)
138 }
139
140 pub fn config(&self) -> ThrottleConfig {
142 *self.config.lock()
143 }
144}
145
146impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 write!(f, "ThrottledStore({})", self.inner)
149 }
150}
151
152#[async_trait]
153#[deny(clippy::missing_trait_methods)]
154impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
155 async fn put_opts(
156 &self,
157 location: &Path,
158 payload: PutPayload,
159 opts: PutOptions,
160 ) -> Result<PutResult> {
161 sleep(self.config().wait_put_per_call).await;
162 self.inner.put_opts(location, payload, opts).await
163 }
164
165 async fn put_multipart_opts(
166 &self,
167 location: &Path,
168 opts: PutMultipartOptions,
169 ) -> Result<Box<dyn MultipartUpload>> {
170 let upload = self.inner.put_multipart_opts(location, opts).await?;
171 Ok(Box::new(ThrottledUpload {
172 upload,
173 sleep: self.config().wait_put_per_call,
174 }))
175 }
176
177 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
178 sleep(self.config().wait_get_per_call).await;
179
180 let wait_get_per_byte = self.config().wait_get_per_byte;
182
183 let result = self.inner.get_opts(location, options).await?;
184 Ok(throttle_get(result, wait_get_per_byte))
185 }
186
187 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
188 let config = self.config();
189
190 let total_bytes: u64 = ranges.iter().map(|range| range.end - range.start).sum();
191 let sleep_duration =
192 config.wait_get_per_call + config.wait_get_per_byte * total_bytes as u32;
193
194 sleep(sleep_duration).await;
195
196 self.inner.get_ranges(location, ranges).await
197 }
198
199 fn delete_stream(
200 &self,
201 locations: BoxStream<'static, Result<Path>>,
202 ) -> BoxStream<'static, Result<Path>> {
203 let wait_delete_per_call = self.config().wait_delete_per_call;
208 let locations = throttle_stream(locations, move |_| wait_delete_per_call);
209 self.inner.delete_stream(locations)
210 }
211
212 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
213 let stream = self.inner.list(prefix);
214 let config = Arc::clone(&self.config);
215 futures_util::stream::once(async move {
216 let config = *config.lock();
217 let wait_list_per_entry = config.wait_list_per_entry;
218 sleep(config.wait_list_per_call).await;
219 throttle_stream(stream, move |_| wait_list_per_entry)
220 })
221 .flatten()
222 .boxed()
223 }
224
225 fn list_with_offset(
226 &self,
227 prefix: Option<&Path>,
228 offset: &Path,
229 ) -> BoxStream<'static, Result<ObjectMeta>> {
230 let stream = self.inner.list_with_offset(prefix, offset);
231 let config = Arc::clone(&self.config);
232 futures_util::stream::once(async move {
233 let config = *config.lock();
234 let wait_list_per_entry = config.wait_list_per_entry;
235 sleep(config.wait_list_per_call).await;
236 throttle_stream(stream, move |_| wait_list_per_entry)
237 })
238 .flatten()
239 .boxed()
240 }
241
242 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
243 sleep(self.config().wait_list_with_delimiter_per_call).await;
244
245 match self.inner.list_with_delimiter(prefix).await {
246 Ok(list_result) => {
247 let entries_len = usize_to_u32_saturate(list_result.objects.len());
248 sleep(self.config().wait_list_with_delimiter_per_entry * entries_len).await;
249 Ok(list_result)
250 }
251 Err(err) => Err(err),
252 }
253 }
254
255 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
256 sleep(self.config().wait_put_per_call).await;
257
258 self.inner.copy_opts(from, to, options).await
259 }
260
261 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
262 sleep(self.config().wait_put_per_call).await;
263
264 self.inner.rename_opts(from, to, options).await
265 }
266}
267
268fn usize_to_u32_saturate(x: usize) -> u32 {
270 x.try_into().unwrap_or(u32::MAX)
271}
272
273fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
274 #[allow(clippy::infallible_destructuring_match)]
275 let s = match result.payload {
276 GetResultPayload::Stream(s) => s,
277 #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
278 GetResultPayload::File(_, _) => unimplemented!(),
279 };
280
281 let stream = throttle_stream(s, move |bytes| {
282 let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
283 wait_get_per_byte * bytes_len
284 });
285
286 GetResult {
287 payload: GetResultPayload::Stream(stream),
288 ..result
289 }
290}
291
292fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
293 stream: BoxStream<'_, Result<T, E>>,
294 delay: F,
295) -> BoxStream<'_, Result<T, E>>
296where
297 F: Fn(&T) -> Duration + Send + Sync + 'static,
298{
299 stream
300 .then(move |result| {
301 let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
302 sleep(delay).then(|_| futures_util::future::ready(result))
303 })
304 .boxed()
305}
306
307#[async_trait]
308impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
309 async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
310 self.inner.create_multipart(path).await
311 }
312
313 async fn put_part(
314 &self,
315 path: &Path,
316 id: &MultipartId,
317 part_idx: usize,
318 data: PutPayload,
319 ) -> Result<PartId> {
320 sleep(self.config().wait_put_per_call).await;
321 self.inner.put_part(path, id, part_idx, data).await
322 }
323
324 async fn complete_multipart(
325 &self,
326 path: &Path,
327 id: &MultipartId,
328 parts: Vec<PartId>,
329 ) -> Result<PutResult> {
330 self.inner.complete_multipart(path, id, parts).await
331 }
332
333 async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
334 self.inner.abort_multipart(path, id).await
335 }
336}
337
338#[derive(Debug)]
339struct ThrottledUpload {
340 upload: Box<dyn MultipartUpload>,
341 sleep: Duration,
342}
343
344#[async_trait]
345impl MultipartUpload for ThrottledUpload {
346 fn put_part(&mut self, data: PutPayload) -> UploadPart {
347 let duration = self.sleep;
348 let put = self.upload.put_part(data);
349 Box::pin(async move {
350 sleep(duration).await;
351 put.await
352 })
353 }
354
355 async fn complete(&mut self) -> Result<PutResult> {
356 self.upload.complete().await
357 }
358
359 async fn abort(&mut self) -> Result<()> {
360 self.upload.abort().await
361 }
362}
363
364#[cfg(test)]
365mod tests {
366 use super::*;
367 #[cfg(target_os = "linux")]
368 use crate::GetResultPayload;
369 use crate::ObjectStoreExt;
370 use crate::{integration::*, memory::InMemory};
371 use futures_util::TryStreamExt;
372 use tokio::time::Duration;
373 use tokio::time::Instant;
374
375 const WAIT_TIME: Duration = Duration::from_millis(100);
376 const ZERO: Duration = Duration::from_millis(0); macro_rules! assert_bounds {
379 ($d:expr, $lower:expr) => {
380 assert_bounds!($d, $lower, $lower + 2);
381 };
382 ($d:expr, $lower:expr, $upper:expr) => {
383 let d = $d;
384 let lower = $lower * WAIT_TIME;
385 let upper = $upper * WAIT_TIME;
386 assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
387 assert!(d < upper, "{:?} must be < than {:?}", d, upper);
388 };
389 }
390
391 #[tokio::test]
392 async fn throttle_test() {
393 let inner = InMemory::new();
394 let store = ThrottledStore::new(inner, ThrottleConfig::default());
395
396 put_get_delete_list(&store).await;
397 list_uses_directories_correctly(&store).await;
398 list_with_delimiter(&store).await;
399 rename_and_copy(&store).await;
400 copy_if_not_exists(&store).await;
401 stream_get(&store).await;
402 multipart(&store, &store).await;
403 }
404
405 #[tokio::test]
406 async fn delete_test() {
407 let inner = InMemory::new();
408 let store = ThrottledStore::new(inner, ThrottleConfig::default());
409
410 assert_bounds!(measure_delete(&store, None).await, 0);
411 assert_bounds!(measure_delete(&store, Some(0)).await, 0);
412 assert_bounds!(measure_delete(&store, Some(10)).await, 0);
413
414 store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
415 assert_bounds!(measure_delete(&store, None).await, 1);
416 assert_bounds!(measure_delete(&store, Some(0)).await, 1);
417 assert_bounds!(measure_delete(&store, Some(10)).await, 1);
418 }
419
420 #[tokio::test]
421 #[cfg(target_os = "linux")]
423 async fn delete_stream_test() {
424 let inner = InMemory::new();
425 let store = ThrottledStore::new(inner, ThrottleConfig::default());
426
427 assert_bounds!(measure_delete_stream(&store, 0).await, 0);
428 assert_bounds!(measure_delete_stream(&store, 10).await, 0);
429
430 store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
431 assert_bounds!(measure_delete_stream(&store, 0).await, 0);
432 assert_bounds!(measure_delete_stream(&store, 10).await, 10);
433 }
434
435 #[tokio::test]
436 #[cfg(target_os = "linux")]
438 async fn get_test() {
439 let inner = InMemory::new();
440 let store = ThrottledStore::new(inner, ThrottleConfig::default());
441
442 assert_bounds!(measure_get(&store, None).await, 0);
443 assert_bounds!(measure_get(&store, Some(0)).await, 0);
444 assert_bounds!(measure_get(&store, Some(10)).await, 0);
445
446 store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
447 assert_bounds!(measure_get(&store, None).await, 1);
448 assert_bounds!(measure_get(&store, Some(0)).await, 1);
449 assert_bounds!(measure_get(&store, Some(10)).await, 1);
450
451 store.config_mut(|cfg| {
452 cfg.wait_get_per_call = ZERO;
453 cfg.wait_get_per_byte = WAIT_TIME;
454 });
455 assert_bounds!(measure_get(&store, Some(2)).await, 2);
456
457 store.config_mut(|cfg| {
458 cfg.wait_get_per_call = WAIT_TIME;
459 cfg.wait_get_per_byte = WAIT_TIME;
460 });
461 assert_bounds!(measure_get(&store, Some(2)).await, 3);
462 }
463
464 #[tokio::test]
465 #[cfg(target_os = "linux")]
467 async fn list_test() {
468 let inner = InMemory::new();
469 let store = ThrottledStore::new(inner, ThrottleConfig::default());
470
471 assert_bounds!(measure_list(&store, 0).await, 0);
472 assert_bounds!(measure_list(&store, 10).await, 0);
473
474 store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
475 assert_bounds!(measure_list(&store, 0).await, 1);
476 assert_bounds!(measure_list(&store, 10).await, 1);
477
478 store.config_mut(|cfg| {
479 cfg.wait_list_per_call = ZERO;
480 cfg.wait_list_per_entry = WAIT_TIME;
481 });
482 assert_bounds!(measure_list(&store, 2).await, 2);
483
484 store.config_mut(|cfg| {
485 cfg.wait_list_per_call = WAIT_TIME;
486 cfg.wait_list_per_entry = WAIT_TIME;
487 });
488 assert_bounds!(measure_list(&store, 2).await, 3);
489 }
490
491 #[tokio::test]
492 #[cfg(target_os = "linux")]
494 async fn list_with_delimiter_test() {
495 let inner = InMemory::new();
496 let store = ThrottledStore::new(inner, ThrottleConfig::default());
497
498 assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
499 assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);
500
501 store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
502 assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
503 assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);
504
505 store.config_mut(|cfg| {
506 cfg.wait_list_with_delimiter_per_call = ZERO;
507 cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
508 });
509 assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);
510
511 store.config_mut(|cfg| {
512 cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
513 cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
514 });
515 assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
516 }
517
518 #[tokio::test]
519 async fn put_test() {
520 let inner = InMemory::new();
521 let store = ThrottledStore::new(inner, ThrottleConfig::default());
522
523 assert_bounds!(measure_put(&store, 0).await, 0);
524 assert_bounds!(measure_put(&store, 10).await, 0);
525
526 store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
527 assert_bounds!(measure_put(&store, 0).await, 1);
528 assert_bounds!(measure_put(&store, 10).await, 1);
529
530 store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
531 assert_bounds!(measure_put(&store, 0).await, 0);
532 }
533
534 async fn place_test_object(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Path {
535 let path = Path::from("foo");
536
537 if let Some(n_bytes) = n_bytes {
538 let data: Vec<_> = std::iter::repeat_n(1u8, n_bytes).collect();
539 store.put(&path, data.into()).await.unwrap();
540 } else {
541 store.delete(&path).await.unwrap();
543 }
544
545 path
546 }
547
548 #[allow(dead_code)]
549 async fn place_test_objects(store: &ThrottledStore<InMemory>, n_entries: usize) -> Path {
550 let prefix = Path::from("foo");
551
552 let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();
554
555 for entry in entries {
556 store.delete(&entry.location).await.unwrap();
557 }
558
559 for i in 0..n_entries {
561 let path = prefix.clone().join(i.to_string().as_str());
562 store.put(&path, "bar".into()).await.unwrap();
563 }
564
565 prefix
566 }
567
568 async fn measure_delete(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
569 let path = place_test_object(store, n_bytes).await;
570
571 let t0 = Instant::now();
572 store.delete(&path).await.unwrap();
573
574 t0.elapsed()
575 }
576
577 #[allow(dead_code)]
578 async fn measure_delete_stream(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
579 let prefix = place_test_objects(store, n_entries).await;
580
581 let paths = store.list(Some(&prefix)).collect::<Vec<_>>().await;
583 let paths = futures_util::stream::iter(paths)
584 .map(|x| x.map(|m| m.location))
585 .boxed();
586
587 let t0 = Instant::now();
588 store
589 .delete_stream(paths)
590 .try_collect::<Vec<_>>()
591 .await
592 .unwrap();
593
594 t0.elapsed()
595 }
596
597 #[allow(dead_code)]
598 #[cfg(target_os = "linux")]
599 async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
600 let path = place_test_object(store, n_bytes).await;
601
602 let t0 = Instant::now();
603 let res = store.get(&path).await;
604 if n_bytes.is_some() {
605 let s = match res.unwrap().payload {
607 GetResultPayload::Stream(s) => s,
608 GetResultPayload::File(_, _) => unimplemented!(),
609 };
610
611 s.map_ok(|b| bytes::BytesMut::from(&b[..]))
612 .try_concat()
613 .await
614 .unwrap();
615 } else {
616 assert!(res.is_err());
617 }
618
619 t0.elapsed()
620 }
621
622 #[allow(dead_code)]
623 async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
624 let prefix = place_test_objects(store, n_entries).await;
625
626 let t0 = Instant::now();
627 store
628 .list(Some(&prefix))
629 .try_collect::<Vec<_>>()
630 .await
631 .unwrap();
632
633 t0.elapsed()
634 }
635
636 #[allow(dead_code)]
637 async fn measure_list_with_delimiter(
638 store: &ThrottledStore<InMemory>,
639 n_entries: usize,
640 ) -> Duration {
641 let prefix = place_test_objects(store, n_entries).await;
642
643 let t0 = Instant::now();
644 store.list_with_delimiter(Some(&prefix)).await.unwrap();
645
646 t0.elapsed()
647 }
648
649 async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
650 let data: Vec<_> = std::iter::repeat_n(1u8, n_bytes).collect();
651
652 let t0 = Instant::now();
653 store.put(&Path::from("foo"), data.into()).await.unwrap();
654
655 t0.elapsed()
656 }
657}