pubhubs/servers/
object_store.rs1use std::borrow::Cow;
3
4use anyhow::Context as _;
5use object_store::{ObjectStore as _, ObjectStoreExt as _};
6
7use crate::api;
8use crate::id::Id;
9use crate::servers;
10
11use crate::servers::config::ObjectStoreConfig;
12
13pub struct UseNone;
15
16impl<'a> TryFrom<&'a Option<ObjectStoreConfig>> for UseNone {
17 type Error = anyhow::Error;
18
19 fn try_from(c: &'a Option<ObjectStoreConfig>) -> anyhow::Result<Self> {
20 if c.is_none() {
21 return Ok(UseNone);
22 }
23
24 anyhow::bail!("Object store configured, but this server does not use one.");
25 }
26}
27
28pub trait AsObjectStore {
30 type ObjectStoreT: object_store::ObjectStore + ?Sized;
31
32 fn as_object_store(&self) -> &Self::ObjectStoreT;
33}
34
35pub struct DefaultObjectStore(Box<object_store::DynObjectStore>);
37
38impl AsObjectStore for DefaultObjectStore {
39 type ObjectStoreT = object_store::DynObjectStore;
40
41 fn as_object_store(&self) -> &Self::ObjectStoreT {
42 &self.0
43 }
44}
45
46impl std::ops::Deref for DefaultObjectStore {
47 type Target = object_store::DynObjectStore;
48
49 fn deref(&self) -> &Self::Target {
50 &*self.0
51 }
52}
53
54impl<'a> TryFrom<&'a Option<ObjectStoreConfig>> for DefaultObjectStore {
55 type Error = anyhow::Error;
56
57 fn try_from(c_maybe: &'a Option<ObjectStoreConfig>) -> anyhow::Result<Self> {
58 let c: Cow<'a, ObjectStoreConfig> = match c_maybe {
61 None => Cow::<'a, ObjectStoreConfig>::Owned(Default::default()),
62 Some(c) => Cow::<'a, ObjectStoreConfig>::Borrowed(c),
63 };
64
65 let (os, path) = object_store::parse_url_opts(c.url.as_ref(), c.options.iter())
66 .with_context(|| {
67 format!(
68 "creating object store from url {} and options {}",
69 c.url,
70 serde_json::to_string(&c.options)
71 .context("error while formatting error")
72 .unwrap_or("<failed to format>".to_string())
73 )
74 })?;
75
76 Ok(Self(Box::new(object_store::prefix::PrefixStore::new(
77 os, path,
78 ))))
79 }
80}
81
82pub trait ObjectDetails: std::marker::Sized {
86 type Identifier: std::fmt::Display;
87
88 const PREFIX: &'static str;
89
90 fn object_id(&self) -> &Self::Identifier;
91
92 fn path_for(id: &Self::Identifier) -> object_store::path::Path {
93 std::format!("{}/{id}", Self::PREFIX).into()
94 }
95
96 fn from_bytes(bytes: bytes::Bytes) -> anyhow::Result<Self>;
97
98 fn to_put_payload(&self) -> anyhow::Result<object_store::PutPayload>;
100}
101
102pub trait JsonObjectDetails: serde::Serialize + serde::de::DeserializeOwned {
104 type Identifier: std::fmt::Display;
105
106 const PREFIX: &'static str;
107
108 fn object_id(&self) -> &Self::Identifier;
109}
110
111impl<T: JsonObjectDetails> ObjectDetails for T {
112 type Identifier = <T as JsonObjectDetails>::Identifier;
113
114 const PREFIX: &str = <T as JsonObjectDetails>::PREFIX;
115
116 fn object_id(&self) -> &Self::Identifier {
117 <T as JsonObjectDetails>::object_id(self)
118 }
119
120 fn from_bytes(bytes: bytes::Bytes) -> anyhow::Result<Self> {
121 Ok(serde_json::from_slice(&bytes)?)
122 }
123
124 fn to_put_payload(&self) -> anyhow::Result<object_store::PutPayload> {
125 Ok(object_store::PutPayload::from_bytes(
126 serde_json::to_vec(&self)?.into(),
127 ))
128 }
129}
130
131impl<S> crate::servers::AppBase<S>
132where
133 S::ObjectStoreT: AsObjectStore,
134 S: servers::Server,
135{
136 pub async fn get_object<T>(
139 &self,
140 id: &T::Identifier,
141 ) -> api::Result<Option<(T, object_store::UpdateVersion)>>
142 where
143 T: ObjectDetails,
144 {
145 let os = self.shared.object_store.as_object_store();
146
147 let path = T::path_for(id);
148
149 log::debug!("getting {path}");
150
151 match os.get(&path).await {
152 Ok(get_result) => {
153 let version = object_store::UpdateVersion {
154 e_tag: get_result.meta.e_tag.clone(),
155 version: get_result.meta.version.clone(),
156 };
157
158 let bytes: bytes::Bytes = get_result.bytes().await.map_err(|err| {
159 log::error!(
160 "{}'s object store: unexpected error getting body of {path}: {err:#}",
161 S::NAME
162 );
163 api::ErrorCode::InternalError
164 })?;
165
166 log::debug!("got {path}");
167
168 Ok(Some((
169 T::from_bytes(bytes).map_err(|err| {
170 log::error!(
171 "{}'s object store: unexpected error parsing object at {path}: {err:#}",
172 S::NAME
173 );
174 api::ErrorCode::InternalError
175 })?,
176 version,
177 )))
178 }
179 Err(object_store::Error::NotFound { .. }) => {
180 log::debug!("did not get {path}: not found");
181 Ok(None)
182 }
183 Err(err) => Err({
185 log::error!(
186 "{}'s object store: unexpected error getting {path}: {err:#}",
187 S::NAME
188 );
189 api::ErrorCode::InternalError
190 }),
191 }
192 }
193
194 pub async fn put_object<T>(
202 &self,
203 obj: &T,
204 update: Option<object_store::UpdateVersion>,
205 ) -> api::Result<Option<object_store::UpdateVersion>>
206 where
207 T: ObjectDetails,
208 {
209 let os = self.shared.object_store.as_object_store();
210
211 let path = T::path_for(obj.object_id());
212
213 log::debug!("putting {path}");
214
215 let put_payload: object_store::PutPayload = obj.to_put_payload().map_err(|err| {
216 log::error!(
217 "{}'s object store: unexpected error encoding object to be put at {path}: {err:#}",
218 S::NAME
219 );
220 api::ErrorCode::InternalError
221 })?;
222
223 match os
224 .put_opts(
225 &path,
226 put_payload,
227 object_store::PutOptions {
228 mode: if let Some(ref version) = update {
229 object_store::PutMode::Update(version.clone())
230 } else {
231 object_store::PutMode::Create
232 },
233 tags: Default::default(),
234 attributes: Default::default(),
235 extensions: Default::default(),
236 },
237 )
238 .await
239 {
240 Ok(put_result) => {
241 log::debug!("putting {path} succeeded");
242
243 Ok(Some(object_store::UpdateVersion {
244 e_tag: put_result.e_tag,
245 version: put_result.version,
246 }))
247 }
248 Err(object_store::Error::Precondition { .. }) => {
249 if update.is_some() {
250 return Ok(None);
251 }
252 log::error!("object store create put mode caused unexpected 'precondition' error");
253 Err(api::ErrorCode::InternalError)
254 }
255 Err(object_store::Error::AlreadyExists { .. }) => {
256 if update.is_none() {
257 return Ok(None);
258 }
259 log::error!(
260 "object store update put mode caused unexpected 'already exists' error"
261 );
262 Err(api::ErrorCode::InternalError)
263 }
264 Err(err) => Err({
265 log::error!(
266 "{}'s object store: unexpected error putting {path}: {err:#}",
267 S::NAME
268 );
269 api::ErrorCode::InternalError
270 }),
271 }
272 }
273
274 pub async fn delete_object<T>(&self, id: T::Identifier) -> api::Result<bool>
277 where
278 T: ObjectDetails,
279 {
280 let os = self.shared.object_store.as_object_store();
281
282 let path = T::path_for(&id);
283
284 log::debug!("deleting {path}");
285
286 match os.delete(&path).await {
287 Ok(()) => {
288 log::debug!("deleted {path}");
289 Ok(true)
290 }
291 Err(object_store::Error::NotFound { .. }) => {
292 log::info!("deleting {path} failed: not found");
293 Ok(false)
294 }
295 Err(err) => Err({
296 log::error!(
297 "{}'s object store: failed to delete {path}: {err:#}",
298 S::NAME
299 );
300 api::ErrorCode::InternalError
301 }),
302 }
303 }
304}
305
306impl JsonObjectDetails for crate::attr::AttrState {
307 type Identifier = Id;
308 const PREFIX: &str = "attr";
309
310 fn object_id(&self) -> &Id {
311 &self.attr
312 }
313}
314
315impl JsonObjectDetails for crate::servers::phc::UserState {
316 type Identifier = Id;
317 const PREFIX: &str = "user";
318
319 fn object_id(&self) -> &Id {
320 &self.id
321 }
322}