Skip to main content

object_store/
throttle.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A throttling object store wrapper
19use parking_lot::Mutex;
20use std::ops::Range;
21use std::{convert::TryInto, sync::Arc};
22
23use crate::multipart::{MultipartStore, PartId};
24use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart};
25use crate::{
26    GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
27    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
28};
29use async_trait::async_trait;
30use bytes::Bytes;
31use futures_util::{FutureExt, StreamExt, stream::BoxStream};
32use std::time::Duration;
33
/// Configuration settings for throttled store
///
/// Every field is a sleep duration; the [`Default`] value is all-zero, i.e. no throttling.
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
    /// Sleep duration for every call to [`delete`], or every element in [`delete_stream`].
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation.
    ///
    /// [`delete`]: crate::ObjectStoreExt::delete
    /// [`delete_stream`]: ThrottledStore::delete_stream
    pub wait_delete_per_call: Duration,

    /// Sleep duration for every byte received during [`get_opts`](ThrottledStore::get_opts).
    ///
    /// Sleeping is performed after the underlying store returned and only for successful gets. The
    /// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
    ///
    /// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
    /// there be an intermediate failure (i.e. after partly consuming the output bytes), the
    /// resulting sleep time will be partial as well.
    ///
    /// [`get_ranges`](ThrottledStore::get_ranges) also uses this value, but there the per-byte
    /// sleep is computed from the total length of the requested ranges and performed up front,
    /// before the underlying store is called.
    pub wait_get_per_byte: Duration,

    /// Sleep duration for every call to [`get_opts`](ThrottledStore::get_opts) or
    /// [`get_ranges`](ThrottledStore::get_ranges).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation. The sleep duration is additive to
    /// [`wait_get_per_byte`](Self::wait_get_per_byte).
    pub wait_get_per_call: Duration,

    /// Sleep duration for every call to [`list`](ThrottledStore::list) or
    /// [`list_with_offset`](ThrottledStore::list_with_offset).
    ///
    /// Sleeping is done when the returned stream is first polled, before any entry is yielded,
    /// and independently of the success of the operation. The sleep duration is additive to
    /// [`wait_list_per_entry`](Self::wait_list_per_entry).
    pub wait_list_per_call: Duration,

    /// Sleep duration for every entry received during [`list`](ThrottledStore::list) or
    /// [`list_with_offset`](ThrottledStore::list_with_offset).
    ///
    /// Sleeping is performed after the underlying store returned and only for successful lists.
    /// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
    ///
    /// Note that the per-entry sleep only happens as the user consumes the output entries. Should
    /// there be an intermediate failure (i.e. after partly consuming the output entries), the
    /// resulting sleep time will be partial as well.
    pub wait_list_per_entry: Duration,

    /// Sleep duration for every call to
    /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation. The sleep duration is additive to
    /// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
    pub wait_list_with_delimiter_per_call: Duration,

    /// Sleep duration for every entry received during
    /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
    ///
    /// Sleeping is performed after the underlying store returned and only for successful lists.
    /// The sleep duration is additive to
    /// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
    ///
    /// Only the returned objects are counted as entries (common prefixes are not), and the whole
    /// per-entry sleep is performed in one go before the result is handed back.
    pub wait_list_with_delimiter_per_entry: Duration,

    /// Sleep duration for every call to [`put_opts`](ThrottledStore::put_opts).
    ///
    /// Sleeping is done before the underlying store is called and independently of the success of
    /// the operation.
    ///
    /// The same duration is also applied before every `copy_opts` and `rename_opts` call and
    /// before every part uploaded through a multipart upload.
    pub wait_put_per_call: Duration,
}
102
103/// Sleep only if non-zero duration
104async fn sleep(duration: Duration) {
105    if !duration.is_zero() {
106        tokio::time::sleep(duration).await
107    }
108}
109
110/// Store wrapper that wraps an inner store with some `sleep` calls.
111///
112/// This can be used for performance testing.
113///
114/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
115/// conditions!**
116#[derive(Debug)]
117pub struct ThrottledStore<T> {
118    inner: T,
119    config: Arc<Mutex<ThrottleConfig>>,
120}
121
122impl<T> ThrottledStore<T> {
123    /// Create new wrapper with zero waiting times.
124    pub fn new(inner: T, config: ThrottleConfig) -> Self {
125        Self {
126            inner,
127            config: Arc::new(Mutex::new(config)),
128        }
129    }
130
131    /// Mutate config.
132    pub fn config_mut<F>(&self, f: F)
133    where
134        F: Fn(&mut ThrottleConfig),
135    {
136        let mut guard = self.config.lock();
137        f(&mut guard)
138    }
139
140    /// Return copy of current config.
141    pub fn config(&self) -> ThrottleConfig {
142        *self.config.lock()
143    }
144}
145
146impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        write!(f, "ThrottledStore({})", self.inner)
149    }
150}
151
152#[async_trait]
153#[deny(clippy::missing_trait_methods)]
154impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
155    async fn put_opts(
156        &self,
157        location: &Path,
158        payload: PutPayload,
159        opts: PutOptions,
160    ) -> Result<PutResult> {
161        sleep(self.config().wait_put_per_call).await;
162        self.inner.put_opts(location, payload, opts).await
163    }
164
165    async fn put_multipart_opts(
166        &self,
167        location: &Path,
168        opts: PutMultipartOptions,
169    ) -> Result<Box<dyn MultipartUpload>> {
170        let upload = self.inner.put_multipart_opts(location, opts).await?;
171        Ok(Box::new(ThrottledUpload {
172            upload,
173            sleep: self.config().wait_put_per_call,
174        }))
175    }
176
177    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
178        sleep(self.config().wait_get_per_call).await;
179
180        // need to copy to avoid moving / referencing `self`
181        let wait_get_per_byte = self.config().wait_get_per_byte;
182
183        let result = self.inner.get_opts(location, options).await?;
184        Ok(throttle_get(result, wait_get_per_byte))
185    }
186
187    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
188        let config = self.config();
189
190        let total_bytes: u64 = ranges.iter().map(|range| range.end - range.start).sum();
191        let sleep_duration =
192            config.wait_get_per_call + config.wait_get_per_byte * total_bytes as u32;
193
194        sleep(sleep_duration).await;
195
196        self.inner.get_ranges(location, ranges).await
197    }
198
199    fn delete_stream(
200        &self,
201        locations: BoxStream<'static, Result<Path>>,
202    ) -> BoxStream<'static, Result<Path>> {
203        // We wait for a certain duration before each delete location.
204        // This may be suboptimal if the inner store implements batch deletes.
205        // But there is no way around unnecessary waits since we do not know
206        // how the inner store implements `delete_stream`.
207        let wait_delete_per_call = self.config().wait_delete_per_call;
208        let locations = throttle_stream(locations, move |_| wait_delete_per_call);
209        self.inner.delete_stream(locations)
210    }
211
212    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
213        let stream = self.inner.list(prefix);
214        let config = Arc::clone(&self.config);
215        futures_util::stream::once(async move {
216            let config = *config.lock();
217            let wait_list_per_entry = config.wait_list_per_entry;
218            sleep(config.wait_list_per_call).await;
219            throttle_stream(stream, move |_| wait_list_per_entry)
220        })
221        .flatten()
222        .boxed()
223    }
224
225    fn list_with_offset(
226        &self,
227        prefix: Option<&Path>,
228        offset: &Path,
229    ) -> BoxStream<'static, Result<ObjectMeta>> {
230        let stream = self.inner.list_with_offset(prefix, offset);
231        let config = Arc::clone(&self.config);
232        futures_util::stream::once(async move {
233            let config = *config.lock();
234            let wait_list_per_entry = config.wait_list_per_entry;
235            sleep(config.wait_list_per_call).await;
236            throttle_stream(stream, move |_| wait_list_per_entry)
237        })
238        .flatten()
239        .boxed()
240    }
241
242    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
243        sleep(self.config().wait_list_with_delimiter_per_call).await;
244
245        match self.inner.list_with_delimiter(prefix).await {
246            Ok(list_result) => {
247                let entries_len = usize_to_u32_saturate(list_result.objects.len());
248                sleep(self.config().wait_list_with_delimiter_per_entry * entries_len).await;
249                Ok(list_result)
250            }
251            Err(err) => Err(err),
252        }
253    }
254
255    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
256        sleep(self.config().wait_put_per_call).await;
257
258        self.inner.copy_opts(from, to, options).await
259    }
260
261    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
262        sleep(self.config().wait_put_per_call).await;
263
264        self.inner.rename_opts(from, to, options).await
265    }
266}
267
/// Converts a `usize` to `u32`, clamping values that do not fit to `u32::MAX`.
fn usize_to_u32_saturate(x: usize) -> u32 {
    match x.try_into() {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
272
273fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
274    #[allow(clippy::infallible_destructuring_match)]
275    let s = match result.payload {
276        GetResultPayload::Stream(s) => s,
277        #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
278        GetResultPayload::File(_, _) => unimplemented!(),
279    };
280
281    let stream = throttle_stream(s, move |bytes| {
282        let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
283        wait_get_per_byte * bytes_len
284    });
285
286    GetResult {
287        payload: GetResultPayload::Stream(stream),
288        ..result
289    }
290}
291
292fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
293    stream: BoxStream<'_, Result<T, E>>,
294    delay: F,
295) -> BoxStream<'_, Result<T, E>>
296where
297    F: Fn(&T) -> Duration + Send + Sync + 'static,
298{
299    stream
300        .then(move |result| {
301            let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
302            sleep(delay).then(|_| futures_util::future::ready(result))
303        })
304        .boxed()
305}
306
307#[async_trait]
308impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
309    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
310        self.inner.create_multipart(path).await
311    }
312
313    async fn put_part(
314        &self,
315        path: &Path,
316        id: &MultipartId,
317        part_idx: usize,
318        data: PutPayload,
319    ) -> Result<PartId> {
320        sleep(self.config().wait_put_per_call).await;
321        self.inner.put_part(path, id, part_idx, data).await
322    }
323
324    async fn complete_multipart(
325        &self,
326        path: &Path,
327        id: &MultipartId,
328        parts: Vec<PartId>,
329    ) -> Result<PutResult> {
330        self.inner.complete_multipart(path, id, parts).await
331    }
332
333    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
334        self.inner.abort_multipart(path, id).await
335    }
336}
337
338#[derive(Debug)]
339struct ThrottledUpload {
340    upload: Box<dyn MultipartUpload>,
341    sleep: Duration,
342}
343
344#[async_trait]
345impl MultipartUpload for ThrottledUpload {
346    fn put_part(&mut self, data: PutPayload) -> UploadPart {
347        let duration = self.sleep;
348        let put = self.upload.put_part(data);
349        Box::pin(async move {
350            sleep(duration).await;
351            put.await
352        })
353    }
354
355    async fn complete(&mut self) -> Result<PutResult> {
356        self.upload.complete().await
357    }
358
359    async fn abort(&mut self) -> Result<()> {
360        self.upload.abort().await
361    }
362}
363
/// Timing-based tests for [`ThrottledStore`].
///
/// Most tests measure wall-clock time against multiples of `WAIT_TIME`; the ones that need tight
/// upper bounds only run on Linux because slower CI runners cannot meet them.
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(target_os = "linux")]
    use crate::GetResultPayload;
    use crate::ObjectStoreExt;
    use crate::{integration::*, memory::InMemory};
    use futures_util::TryStreamExt;
    use tokio::time::Duration;
    use tokio::time::Instant;

    /// Unit of throttling used by all timing assertions.
    const WAIT_TIME: Duration = Duration::from_millis(100);
    const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant

    /// Asserts that a measured duration lies in `[lower * WAIT_TIME, upper * WAIT_TIME)`.
    ///
    /// The two-argument form uses `lower + 2` as the upper bound.
    macro_rules! assert_bounds {
        ($d:expr, $lower:expr) => {
            assert_bounds!($d, $lower, $lower + 2);
        };
        ($d:expr, $lower:expr, $upper:expr) => {
            let d = $d;
            let lower = $lower * WAIT_TIME;
            let upper = $upper * WAIT_TIME;
            assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
            assert!(d < upper, "{:?} must be < than {:?}", d, upper);
        };
    }

    /// Runs the generic integration suite against an unthrottled wrapper.
    #[tokio::test]
    async fn throttle_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        multipart(&store, &store).await;
    }

    #[tokio::test]
    async fn delete_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_delete(&store, None).await, 0);
        assert_bounds!(measure_delete(&store, Some(0)).await, 0);
        assert_bounds!(measure_delete(&store, Some(10)).await, 0);

        store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
        assert_bounds!(measure_delete(&store, None).await, 1);
        assert_bounds!(measure_delete(&store, Some(0)).await, 1);
        assert_bounds!(measure_delete(&store, Some(10)).await, 1);
    }

    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn delete_stream_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_delete_stream(&store, 0).await, 0);
        assert_bounds!(measure_delete_stream(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
        assert_bounds!(measure_delete_stream(&store, 0).await, 0);
        assert_bounds!(measure_delete_stream(&store, 10).await, 10);
    }

    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn get_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_get(&store, None).await, 0);
        assert_bounds!(measure_get(&store, Some(0)).await, 0);
        assert_bounds!(measure_get(&store, Some(10)).await, 0);

        store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
        assert_bounds!(measure_get(&store, None).await, 1);
        assert_bounds!(measure_get(&store, Some(0)).await, 1);
        assert_bounds!(measure_get(&store, Some(10)).await, 1);

        store.config_mut(|cfg| {
            cfg.wait_get_per_call = ZERO;
            cfg.wait_get_per_byte = WAIT_TIME;
        });
        assert_bounds!(measure_get(&store, Some(2)).await, 2);

        store.config_mut(|cfg| {
            cfg.wait_get_per_call = WAIT_TIME;
            cfg.wait_get_per_byte = WAIT_TIME;
        });
        assert_bounds!(measure_get(&store, Some(2)).await, 3);
    }

    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn list_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_list(&store, 0).await, 0);
        assert_bounds!(measure_list(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
        assert_bounds!(measure_list(&store, 0).await, 1);
        assert_bounds!(measure_list(&store, 10).await, 1);

        store.config_mut(|cfg| {
            cfg.wait_list_per_call = ZERO;
            cfg.wait_list_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list(&store, 2).await, 2);

        store.config_mut(|cfg| {
            cfg.wait_list_per_call = WAIT_TIME;
            cfg.wait_list_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list(&store, 2).await, 3);
    }

    #[tokio::test]
    // macos github runner is so slow it can't complete within WAIT_TIME*2
    #[cfg(target_os = "linux")]
    async fn list_with_delimiter_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);

        store.config_mut(|cfg| {
            cfg.wait_list_with_delimiter_per_call = ZERO;
            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);

        store.config_mut(|cfg| {
            cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
        });
        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
    }

    #[tokio::test]
    async fn put_test() {
        let inner = InMemory::new();
        let store = ThrottledStore::new(inner, ThrottleConfig::default());

        assert_bounds!(measure_put(&store, 0).await, 0);
        assert_bounds!(measure_put(&store, 10).await, 0);

        store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
        assert_bounds!(measure_put(&store, 0).await, 1);
        assert_bounds!(measure_put(&store, 10).await, 1);

        store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
        assert_bounds!(measure_put(&store, 0).await, 0);
    }

    /// Ensures the object `foo` either holds `n_bytes` bytes (`Some`) or was deleted through the
    /// store (`None`), and returns its path.
    async fn place_test_object(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Path {
        let path = Path::from("foo");

        if let Some(n_bytes) = n_bytes {
            let data: Vec<_> = std::iter::repeat_n(1u8, n_bytes).collect();
            store.put(&path, data.into()).await.unwrap();
        } else {
            // ensure object is absent
            store.delete(&path).await.unwrap();
        }

        path
    }

    /// Replaces everything below the prefix `foo` with `n_entries` fresh objects and returns the
    /// prefix.
    // Only used by the Linux-only tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn place_test_objects(store: &ThrottledStore<InMemory>, n_entries: usize) -> Path {
        let prefix = Path::from("foo");

        // clean up store
        let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();

        for entry in entries {
            store.delete(&entry.location).await.unwrap();
        }

        // create new entries
        for i in 0..n_entries {
            let path = prefix.clone().join(i.to_string().as_str());
            store.put(&path, "bar".into()).await.unwrap();
        }

        prefix
    }

    /// Measures how long a single `delete` of the test object takes.
    async fn measure_delete(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
        let path = place_test_object(store, n_bytes).await;

        let t0 = Instant::now();
        store.delete(&path).await.unwrap();

        t0.elapsed()
    }

    /// Measures how long deleting `n_entries` objects through `delete_stream` takes.
    // Only used by the Linux-only tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn measure_delete_stream(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        // materialize the paths so that the throttle time for listing is not counted
        let paths = store.list(Some(&prefix)).collect::<Vec<_>>().await;
        let paths = futures_util::stream::iter(paths)
            .map(|x| x.map(|m| m.location))
            .boxed();

        let t0 = Instant::now();
        store
            .delete_stream(paths)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        t0.elapsed()
    }

    /// Measures how long a `get` of the test object takes, including consuming all its bytes.
    ///
    /// With `n_bytes == None` the object is absent and the get is expected to fail.
    // Only used by the Linux-only tests.
    #[allow(dead_code)]
    #[cfg(target_os = "linux")]
    async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
        let path = place_test_object(store, n_bytes).await;

        let t0 = Instant::now();
        let res = store.get(&path).await;
        if n_bytes.is_some() {
            // need to consume bytes to provoke sleep times
            //
            // The `File` arm is gated exactly like in `throttle_get`: without the gate this
            // module would not compile when the `fs` feature is disabled, and with only the
            // `Stream` arm left clippy would otherwise flag the match.
            #[allow(clippy::infallible_destructuring_match)]
            let s = match res.unwrap().payload {
                GetResultPayload::Stream(s) => s,
                #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
                GetResultPayload::File(_, _) => unimplemented!(),
            };

            s.map_ok(|b| bytes::BytesMut::from(&b[..]))
                .try_concat()
                .await
                .unwrap();
        } else {
            assert!(res.is_err());
        }

        t0.elapsed()
    }

    /// Measures how long listing `n_entries` objects takes, including consuming the stream.
    // Only used by the Linux-only tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        let t0 = Instant::now();
        store
            .list(Some(&prefix))
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        t0.elapsed()
    }

    /// Measures how long `list_with_delimiter` takes for a prefix holding `n_entries` objects.
    // Only used by the Linux-only tests, hence unused on other targets.
    #[allow(dead_code)]
    async fn measure_list_with_delimiter(
        store: &ThrottledStore<InMemory>,
        n_entries: usize,
    ) -> Duration {
        let prefix = place_test_objects(store, n_entries).await;

        let t0 = Instant::now();
        store.list_with_delimiter(Some(&prefix)).await.unwrap();

        t0.elapsed()
    }

    /// Measures how long a single `put` of `n_bytes` bytes takes.
    async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
        let data: Vec<_> = std::iter::repeat_n(1u8, n_bytes).collect();

        let t0 = Instant::now();
        store.put(&Path::from("foo"), data.into()).await.unwrap();

        t0.elapsed()
    }
}