Skip to main content

pubhubs/servers/
object_store.rs

1//! Storage backend for pubhubs servers
2use std::borrow::Cow;
3
4use anyhow::Context as _;
5use object_store::{ObjectStore as _, ObjectStoreExt as _};
6
7use crate::api;
8use crate::id::Id;
9use crate::servers;
10
11use crate::servers::config::ObjectStoreConfig;
12
13/// Don't use an object store.
14pub struct UseNone;
15
16impl<'a> TryFrom<&'a Option<ObjectStoreConfig>> for UseNone {
17    type Error = anyhow::Error;
18
19    fn try_from(c: &'a Option<ObjectStoreConfig>) -> anyhow::Result<Self> {
20        if c.is_none() {
21            return Ok(UseNone);
22        }
23
24        anyhow::bail!("Object store configured, but this server does not use one.");
25    }
26}
27
28/// A type that can provide an [`object_store::ObjectStore`] implementation.
29pub trait AsObjectStore {
30    type ObjectStoreT: object_store::ObjectStore + ?Sized;
31
32    fn as_object_store(&self) -> &Self::ObjectStoreT;
33}
34
35/// The default object store we use.
36pub struct DefaultObjectStore(Box<object_store::DynObjectStore>);
37
38impl AsObjectStore for DefaultObjectStore {
39    type ObjectStoreT = object_store::DynObjectStore;
40
41    fn as_object_store(&self) -> &Self::ObjectStoreT {
42        &self.0
43    }
44}
45
46impl std::ops::Deref for DefaultObjectStore {
47    type Target = object_store::DynObjectStore;
48
49    fn deref(&self) -> &Self::Target {
50        &*self.0
51    }
52}
53
54impl<'a> TryFrom<&'a Option<ObjectStoreConfig>> for DefaultObjectStore {
55    type Error = anyhow::Error;
56
57    fn try_from(c_maybe: &'a Option<ObjectStoreConfig>) -> anyhow::Result<Self> {
58        // Turn &Option<ObjectStoreConfig> into &ObjectStoreConfig,
59        // by using the default value of ObjectStoreConfig if necessary
60        let c: Cow<'a, ObjectStoreConfig> = match c_maybe {
61            None => Cow::<'a, ObjectStoreConfig>::Owned(Default::default()),
62            Some(c) => Cow::<'a, ObjectStoreConfig>::Borrowed(c),
63        };
64
65        let (os, path) = object_store::parse_url_opts(c.url.as_ref(), c.options.iter())
66            .with_context(|| {
67                format!(
68                    "creating object store from url {} and options {}",
69                    c.url,
70                    serde_json::to_string(&c.options)
71                        .context("error while formatting error")
72                        .unwrap_or("<failed to format>".to_string())
73                )
74            })?;
75
76        Ok(Self(Box::new(object_store::prefix::PrefixStore::new(
77            os, path,
78        ))))
79    }
80}
81
82/// Details on how to store this type in the object store.
83///
84/// You probably want to implement this trait via [`JsonObjectDetails`].
85pub trait ObjectDetails: std::marker::Sized {
86    type Identifier: std::fmt::Display;
87
88    const PREFIX: &'static str;
89
90    fn object_id(&self) -> &Self::Identifier;
91
92    fn path_for(id: &Self::Identifier) -> object_store::path::Path {
93        std::format!("{}/{id}", Self::PREFIX).into()
94    }
95
96    fn from_bytes(bytes: bytes::Bytes) -> anyhow::Result<Self>;
97
98    /// Turn this object into one (or more) [`bytes::Bytes`]
99    fn to_put_payload(&self) -> anyhow::Result<object_store::PutPayload>;
100}
101
102/// Default way to implement [`ObjectDetails`], via json serialization.
103pub trait JsonObjectDetails: serde::Serialize + serde::de::DeserializeOwned {
104    type Identifier: std::fmt::Display;
105
106    const PREFIX: &'static str;
107
108    fn object_id(&self) -> &Self::Identifier;
109}
110
111impl<T: JsonObjectDetails> ObjectDetails for T {
112    type Identifier = <T as JsonObjectDetails>::Identifier;
113
114    const PREFIX: &str = <T as JsonObjectDetails>::PREFIX;
115
116    fn object_id(&self) -> &Self::Identifier {
117        <T as JsonObjectDetails>::object_id(self)
118    }
119
120    fn from_bytes(bytes: bytes::Bytes) -> anyhow::Result<Self> {
121        Ok(serde_json::from_slice(&bytes)?)
122    }
123
124    fn to_put_payload(&self) -> anyhow::Result<object_store::PutPayload> {
125        Ok(object_store::PutPayload::from_bytes(
126            serde_json::to_vec(&self)?.into(),
127        ))
128    }
129}
130
131impl<S> crate::servers::AppBase<S>
132where
133    S::ObjectStoreT: AsObjectStore,
134    S: servers::Server,
135{
136    /// Tries to retrieve an object of type `T` from this server's object store with the given
137    /// `id`, returning `Ok(None)` if no such object exists.
138    pub async fn get_object<T>(
139        &self,
140        id: &T::Identifier,
141    ) -> api::Result<Option<(T, object_store::UpdateVersion)>>
142    where
143        T: ObjectDetails,
144    {
145        let os = self.shared.object_store.as_object_store();
146
147        let path = T::path_for(id);
148
149        log::debug!("getting {path}");
150
151        match os.get(&path).await {
152            Ok(get_result) => {
153                let version = object_store::UpdateVersion {
154                    e_tag: get_result.meta.e_tag.clone(),
155                    version: get_result.meta.version.clone(),
156                };
157
158                let bytes: bytes::Bytes = get_result.bytes().await.map_err(|err| {
159                    log::error!(
160                        "{}'s object store: unexpected error getting body of {path}: {err:#}",
161                        S::NAME
162                    );
163                    api::ErrorCode::InternalError
164                })?;
165
166                log::debug!("got {path}");
167
168                Ok(Some((
169                    T::from_bytes(bytes).map_err(|err| {
170                        log::error!(
171                            "{}'s object store: unexpected error parsing object at {path}: {err:#}",
172                            S::NAME
173                        );
174                        api::ErrorCode::InternalError
175                    })?,
176                    version,
177                )))
178            }
179            Err(object_store::Error::NotFound { .. }) => {
180                log::debug!("did not get {path}: not found");
181                Ok(None)
182            }
183            // TODO: deal with timeouts
184            Err(err) => Err({
185                log::error!(
186                    "{}'s object store: unexpected error getting {path}: {err:#}",
187                    S::NAME
188                );
189                api::ErrorCode::InternalError
190            }),
191        }
192    }
193
194    /// Attempts to put an object of type `T` into the object store, only overwriting the object that
195    /// is already present when the version of the to-be-overwritten object is passed via `update`.
196    ///
197    /// Returs `Ok(None)` when there is already an object present in the store with that id and
198    /// type, but its version was not specified in `update`.
199    ///
200    /// [`get_object`]: Self::get_object
201    pub async fn put_object<T>(
202        &self,
203        obj: &T,
204        update: Option<object_store::UpdateVersion>,
205    ) -> api::Result<Option<object_store::UpdateVersion>>
206    where
207        T: ObjectDetails,
208    {
209        let os = self.shared.object_store.as_object_store();
210
211        let path = T::path_for(obj.object_id());
212
213        log::debug!("putting {path}");
214
215        let put_payload: object_store::PutPayload = obj.to_put_payload().map_err(|err| {
216            log::error!(
217                "{}'s object store: unexpected error encoding object to be put at {path}: {err:#}",
218                S::NAME
219            );
220            api::ErrorCode::InternalError
221        })?;
222
223        match os
224            .put_opts(
225                &path,
226                put_payload,
227                object_store::PutOptions {
228                    mode: if let Some(ref version) = update {
229                        object_store::PutMode::Update(version.clone())
230                    } else {
231                        object_store::PutMode::Create
232                    },
233                    tags: Default::default(),
234                    attributes: Default::default(),
235                    extensions: Default::default(),
236                },
237            )
238            .await
239        {
240            Ok(put_result) => {
241                log::debug!("putting {path} succeeded");
242
243                Ok(Some(object_store::UpdateVersion {
244                    e_tag: put_result.e_tag,
245                    version: put_result.version,
246                }))
247            }
248            Err(object_store::Error::Precondition { .. }) => {
249                if update.is_some() {
250                    return Ok(None);
251                }
252                log::error!("object store create put mode caused unexpected 'precondition' error");
253                Err(api::ErrorCode::InternalError)
254            }
255            Err(object_store::Error::AlreadyExists { .. }) => {
256                if update.is_none() {
257                    return Ok(None);
258                }
259                log::error!(
260                    "object store update put mode caused unexpected 'already exists' error"
261                );
262                Err(api::ErrorCode::InternalError)
263            }
264            Err(err) => Err({
265                log::error!(
266                    "{}'s object store: unexpected error putting {path}: {err:#}",
267                    S::NAME
268                );
269                api::ErrorCode::InternalError
270            }),
271        }
272    }
273
274    /// Attempts to delete an object with the given [`Id`]; returns `true` when an object was
275    /// deleted, and false when no object with the given `id` was found.
276    pub async fn delete_object<T>(&self, id: T::Identifier) -> api::Result<bool>
277    where
278        T: ObjectDetails,
279    {
280        let os = self.shared.object_store.as_object_store();
281
282        let path = T::path_for(&id);
283
284        log::debug!("deleting {path}");
285
286        match os.delete(&path).await {
287            Ok(()) => {
288                log::debug!("deleted {path}");
289                Ok(true)
290            }
291            Err(object_store::Error::NotFound { .. }) => {
292                log::info!("deleting {path} failed: not found");
293                Ok(false)
294            }
295            Err(err) => Err({
296                log::error!(
297                    "{}'s object store: failed to delete {path}: {err:#}",
298                    S::NAME
299                );
300                api::ErrorCode::InternalError
301            }),
302        }
303    }
304}
305
306impl JsonObjectDetails for crate::attr::AttrState {
307    type Identifier = Id;
308    const PREFIX: &str = "attr";
309
310    fn object_id(&self) -> &Id {
311        &self.attr
312    }
313}
314
315impl JsonObjectDetails for crate::servers::phc::UserState {
316    type Identifier = Id;
317    const PREFIX: &str = "user";
318
319    fn object_id(&self) -> &Id {
320        &self.id
321    }
322}