Skip to main content

pubhubs/servers/phc/
user_object_store.rs

1//! User object store endpoints
2use std::rc::Rc;
3
4use anyhow::Context as _;
5use bytes::BufMut as _;
6use sha2::digest::Digest as _;
7
8use super::user::UserState;
9use crate::id::Id;
10
11use crate::api;
12use crate::handle;
13use crate::phcrypto;
14
15use super::server::*;
16use crate::api::phc::user::*;
17
18impl App {
19    /// Implements [`NewObjectEP`]
20    pub(super) async fn handle_user_new_object(
21        app: Rc<Self>,
22        payload: bytes::Bytes,
23        path: actix_web::web::Path<(handle::Handle,)>,
24        actix_web::web::Header(auth_token): actix_web::web::Header<AuthToken>,
25    ) -> api::Result<StoreObjectResp> {
26        let (handle,) = path.into_inner();
27
28        app.handle_user_store_object(payload, handle, None, auth_token)
29            .await
30    }
31
32    /// Implements [`api::phc::user::OverwriteObjectEP`]
33    pub(super) async fn handle_user_overwrite_object(
34        app: Rc<Self>,
35        payload: bytes::Bytes,
36        path: actix_web::web::Path<(handle::Handle, Id)>,
37        actix_web::web::Header(auth_token): actix_web::web::Header<AuthToken>,
38    ) -> api::Result<StoreObjectResp> {
39        let (handle, overwrite_hash) = path.into_inner();
40
41        app.handle_user_store_object(payload, handle, Some(overwrite_hash), auth_token)
42            .await
43    }
44
45    /// Called by [`Self::handle_user_new_object`] and [`Self::handle_user_overwrite_object`].
46    ///
47    /// # Note on the implementation
48    ///
49    /// To add a user object we:
50    ///   (1) First add the [`UserObject`] to the object store, if its not there already;
51    ///   (2) Add a reference to that object in [`UserState`]; and
52    ///   (3) Delete the old [`UserObject`] object, if there is any.
53    ///
54    /// This way, if the process fails between steps (1) and (2), the client will simply retry,
55    /// and if the process fails between steps (2) and (3) we are only left with an orphaned
56    /// object. (Which is not a big deal.)
57    async fn handle_user_store_object(
58        &self,
59        payload: bytes::Bytes,
60        handle: handle::Handle,
61        overwrite_hash: Option<Id>,
62        auth_token: AuthToken,
63    ) -> api::Result<StoreObjectResp> {
64        let user_id = if let Ok(user_id) = self.open_auth_token(auth_token) {
65            user_id
66        } else {
67            return Ok(StoreObjectResp::RetryWithNewAuthToken);
68        };
69
70        let (mut user_state, user_state_version) = self
71            .get_object::<UserState>(&user_id)
72            .await?
73            .ok_or_else(|| {
74                log::error!(
75                    "auth token refers to non- (or no longer) existing user with id {user_id}",
76                );
77                api::ErrorCode::InternalError
78            })?;
79
80        let obj = UserObject::new(payload, user_id);
81
82        if let Some(overwrite_hash) = overwrite_hash {
83            // client expects an object with `overwrite_hash` to exist; let's check this
84            let existing_obj_details =
85                if let Some(existing_obj_details) = user_state.stored_objects.get(&handle) {
86                    existing_obj_details
87                } else {
88                    return Ok(StoreObjectResp::NotFound);
89                };
90
91            if existing_obj_details.id != overwrite_hash {
92                return Ok(StoreObjectResp::HashDidNotMatch);
93            }
94
95            if existing_obj_details.id == obj.object_id {
96                return Ok(StoreObjectResp::NoChanges);
97            }
98        } else {
99            // client expects no object to exist
100            if user_state.stored_objects.contains_key(&handle) {
101                return Ok(StoreObjectResp::MissingHash);
102            }
103        }
104
105        let object_details: UserObjectDetails = UserObjectDetails {
106            size: obj.payload.len() as u32,
107            id: obj.object_id,
108        };
109
110        // modify `user_state` locally to check quotum
111        let mut existing_object_id: Option<Id> = None;
112
113        user_state
114            .stored_objects
115            .entry(handle)
116            .and_modify(|e| existing_object_id = Some(e.id))
117            .insert_entry(object_details.clone());
118
119        // check quota
120        let _quota = match user_state.update_quota(self.quota.clone()) {
121            Ok(quota) => quota,
122            Err(quotum_name) => {
123                return Ok(StoreObjectResp::QuotumReached(quotum_name));
124            }
125        };
126
127        // Ok, everything if fine; start by putting object
128        if self.put_object(&obj, None).await?.is_none() {
129            log::debug!("user object {} already exists", obj.object_id);
130            // might happen when updating `user_state` below fails
131        }
132
133        if self
134            .put_object(&user_state, Some(user_state_version))
135            .await?
136            .is_none()
137        {
138            // someone else is changing `user_state` too
139            return Ok(StoreObjectResp::PleaseRetry);
140        }
141
142        // remove previous object, if there is any
143        if let Some(existing_object_id) = existing_object_id {
144            match self.delete_object::<UserObject>(existing_object_id).await {
145                Err(err) => {
146                    log::warn!(
147                        "failed to delete user object {existing_object_id} that is replaced by {}: {err:#}",
148                        obj.object_id
149                    );
150                }
151                Ok(false) => {
152                    log::warn!("expected to delete {existing_object_id}, but it is already gone");
153                }
154                Ok(true) => { /* ok */ }
155            }
156        }
157
158        Ok(StoreObjectResp::Stored {
159            stored_objects: user_state
160                .stored_objects
161                .into_iter()
162                .map(|(handle, uod)| (handle, uod.into_user_version(&self.user_object_hmac_secret)))
163                .collect(),
164        })
165    }
166
167    /// Implements [`GetObjectEP`].
168    pub(crate) async fn handle_user_get_object(
169        app: Rc<Self>,
170        path: actix_web::web::Path<(Id, Id)>,
171    ) -> api::Payload<api::Result<GetObjectResp>> {
172        let (hash, hmac) = path.into_inner();
173
174        if phcrypto::phc_user_object_hmac(hash, &*app.user_object_hmac_secret) != hmac {
175            return api::Payload::Json(Ok(GetObjectResp::RetryWithNewHmac));
176        }
177
178        let (obj, _) = match app.get_object::<UserObject>(&hash).await {
179            Ok(Some(obj)) => obj,
180            Ok(None) => {
181                log::debug!("user object {hash} was requested (with valid hmac), but not found");
182                return api::Payload::Json(Ok(GetObjectResp::NotFound));
183            }
184            Err(err) => {
185                return api::Payload::Json(Err(err));
186            }
187        };
188
189        if obj.object_id != hash {
190            log::error!(
191                "user object {} submitted by user {} is corrupted!",
192                hash,
193                obj.user_id
194            );
195            return api::Payload::Json(Err(api::ErrorCode::InternalError));
196        }
197
198        api::Payload::Octets(obj.payload)
199    }
200}
201
/// Represents how an object stored for a user is stored in our object store
///
/// In the object store the object is laid out as one version byte, followed by the 32 bytes
/// of the user id, followed by the payload.
pub(crate) struct UserObject {
    /// So that we can change the format in the future; currently always `0`
    version: u8,

    /// The user that uploaded this object
    user_id: Id,

    /// Actual contents of the object
    payload: bytes::Bytes,

    /// Digest of this object - not actually stored in the object itself, but cached here
    ///
    /// This is the sha256 digest of the version byte, the user id and the payload, in that
    /// order.
    object_id: Id,
}
216
217impl UserObject {
218    fn new(payload: bytes::Bytes, user_id: Id) -> Self {
219        let version = 0;
220        let object_id = Self::derive_id(version, user_id, &payload);
221
222        Self {
223            version,
224            user_id,
225            payload,
226            object_id,
227        }
228    }
229
230    fn derive_id(version: u8, user_id: Id, payload: &bytes::Bytes) -> Id {
231        let mut hasher = sha2::Sha256::new();
232
233        hasher.update([version]);
234        hasher.update(user_id.as_slice());
235        hasher.update(payload);
236
237        <[u8; 32]>::from(hasher.finalize()).into()
238    }
239}
240
241impl crate::servers::object_store::ObjectDetails for UserObject {
242    type Identifier = Id;
243    const PREFIX: &str = "user-obj";
244
245    fn object_id(&self) -> &Id {
246        &self.object_id
247    }
248
249    fn from_bytes(mut bytes: bytes::Bytes) -> anyhow::Result<Self> {
250        let object_id: Id = <[u8; 32]>::from(sha2::Sha256::digest(&bytes)).into();
251
252        let version: u8 = *bytes
253            .first()
254            .context("missing version number byte in stored user object")?;
255
256        if version != 0 {
257            anyhow::bail!("unsupported stored user object version, {version}");
258        }
259
260        if bytes.len() < 1 + 32 {
261            anyhow::bail!("stored user object header too small");
262        }
263
264        let payload = bytes.split_off(1 + 32);
265        let user_id = Id::from(<[u8; 32]>::try_from(&bytes[1..33]).unwrap());
266
267        Ok(Self {
268            version,
269            user_id,
270            payload,
271            object_id,
272        })
273    }
274
275    fn to_put_payload(&self) -> anyhow::Result<object_store::PutPayload> {
276        let mut header = bytes::BytesMut::with_capacity(1 + 32);
277        header.put_u8(self.version);
278        header.put(self.user_id.as_slice());
279
280        Ok(vec![header.freeze(), self.payload.clone()]
281            .into_iter()
282            .collect())
283    }
284}
285
/// Details contained in [`UserState`] about an object stored by a user at pubhubs central
///
/// The user-facing counterpart, which additionally carries an hmac, is
/// [`api::phc::user::UserObjectDetails`]; see [`UserObjectDetails::into_user_version`].
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct UserObjectDetails {
    /// To make sure a user does not exceed their quotum
    ///
    /// This is the length of the payload in bytes.
    pub size: u32,

    /// The sha256 digest of the stored object
    pub id: Id,
}
295
296impl UserObjectDetails {
297    /// Turns this [`UserObjectDetails`] into a [`api::phc::user::UserObjectDetails`].
298    pub(crate) fn into_user_version(self, hmac_secret: &[u8]) -> api::phc::user::UserObjectDetails {
299        api::phc::user::UserObjectDetails {
300            hash: self.id,
301            hmac: phcrypto::phc_user_object_hmac(self.id, hmac_secret),
302            size: self.size,
303        }
304    }
305}