pubhubs/servers/phc/
user_object_store.rs1use std::rc::Rc;
3
4use anyhow::Context as _;
5use bytes::BufMut as _;
6use sha2::digest::Digest as _;
7
8use super::user::UserState;
9use crate::id::Id;
10
11use crate::api;
12use crate::handle;
13use crate::phcrypto;
14
15use super::server::*;
16use crate::api::phc::user::*;
17
18impl App {
19 pub(super) async fn handle_user_new_object(
21 app: Rc<Self>,
22 payload: bytes::Bytes,
23 path: actix_web::web::Path<(handle::Handle,)>,
24 actix_web::web::Header(auth_token): actix_web::web::Header<AuthToken>,
25 ) -> api::Result<StoreObjectResp> {
26 let (handle,) = path.into_inner();
27
28 app.handle_user_store_object(payload, handle, None, auth_token)
29 .await
30 }
31
32 pub(super) async fn handle_user_overwrite_object(
34 app: Rc<Self>,
35 payload: bytes::Bytes,
36 path: actix_web::web::Path<(handle::Handle, Id)>,
37 actix_web::web::Header(auth_token): actix_web::web::Header<AuthToken>,
38 ) -> api::Result<StoreObjectResp> {
39 let (handle, overwrite_hash) = path.into_inner();
40
41 app.handle_user_store_object(payload, handle, Some(overwrite_hash), auth_token)
42 .await
43 }
44
45 async fn handle_user_store_object(
58 &self,
59 payload: bytes::Bytes,
60 handle: handle::Handle,
61 overwrite_hash: Option<Id>,
62 auth_token: AuthToken,
63 ) -> api::Result<StoreObjectResp> {
64 let user_id = if let Ok(user_id) = self.open_auth_token(auth_token) {
65 user_id
66 } else {
67 return Ok(StoreObjectResp::RetryWithNewAuthToken);
68 };
69
70 let (mut user_state, user_state_version) = self
71 .get_object::<UserState>(&user_id)
72 .await?
73 .ok_or_else(|| {
74 log::error!(
75 "auth token refers to non- (or no longer) existing user with id {user_id}",
76 );
77 api::ErrorCode::InternalError
78 })?;
79
80 let obj = UserObject::new(payload, user_id);
81
82 if let Some(overwrite_hash) = overwrite_hash {
83 let existing_obj_details =
85 if let Some(existing_obj_details) = user_state.stored_objects.get(&handle) {
86 existing_obj_details
87 } else {
88 return Ok(StoreObjectResp::NotFound);
89 };
90
91 if existing_obj_details.id != overwrite_hash {
92 return Ok(StoreObjectResp::HashDidNotMatch);
93 }
94
95 if existing_obj_details.id == obj.object_id {
96 return Ok(StoreObjectResp::NoChanges);
97 }
98 } else {
99 if user_state.stored_objects.contains_key(&handle) {
101 return Ok(StoreObjectResp::MissingHash);
102 }
103 }
104
105 let object_details: UserObjectDetails = UserObjectDetails {
106 size: obj.payload.len() as u32,
107 id: obj.object_id,
108 };
109
110 let mut existing_object_id: Option<Id> = None;
112
113 user_state
114 .stored_objects
115 .entry(handle)
116 .and_modify(|e| existing_object_id = Some(e.id))
117 .insert_entry(object_details.clone());
118
119 let _quota = match user_state.update_quota(self.quota.clone()) {
121 Ok(quota) => quota,
122 Err(quotum_name) => {
123 return Ok(StoreObjectResp::QuotumReached(quotum_name));
124 }
125 };
126
127 if self.put_object(&obj, None).await?.is_none() {
129 log::debug!("user object {} already exists", obj.object_id);
130 }
132
133 if self
134 .put_object(&user_state, Some(user_state_version))
135 .await?
136 .is_none()
137 {
138 return Ok(StoreObjectResp::PleaseRetry);
140 }
141
142 if let Some(existing_object_id) = existing_object_id {
144 match self.delete_object::<UserObject>(existing_object_id).await {
145 Err(err) => {
146 log::warn!(
147 "failed to delete user object {existing_object_id} that is replaced by {}: {err:#}",
148 obj.object_id
149 );
150 }
151 Ok(false) => {
152 log::warn!("expected to delete {existing_object_id}, but it is already gone");
153 }
154 Ok(true) => { }
155 }
156 }
157
158 Ok(StoreObjectResp::Stored {
159 stored_objects: user_state
160 .stored_objects
161 .into_iter()
162 .map(|(handle, uod)| (handle, uod.into_user_version(&self.user_object_hmac_secret)))
163 .collect(),
164 })
165 }
166
167 pub(crate) async fn handle_user_get_object(
169 app: Rc<Self>,
170 path: actix_web::web::Path<(Id, Id)>,
171 ) -> api::Payload<api::Result<GetObjectResp>> {
172 let (hash, hmac) = path.into_inner();
173
174 if phcrypto::phc_user_object_hmac(hash, &*app.user_object_hmac_secret) != hmac {
175 return api::Payload::Json(Ok(GetObjectResp::RetryWithNewHmac));
176 }
177
178 let (obj, _) = match app.get_object::<UserObject>(&hash).await {
179 Ok(Some(obj)) => obj,
180 Ok(None) => {
181 log::debug!("user object {hash} was requested (with valid hmac), but not found");
182 return api::Payload::Json(Ok(GetObjectResp::NotFound));
183 }
184 Err(err) => {
185 return api::Payload::Json(Err(err));
186 }
187 };
188
189 if obj.object_id != hash {
190 log::error!(
191 "user object {} submitted by user {} is corrupted!",
192 hash,
193 obj.user_id
194 );
195 return api::Payload::Json(Err(api::ErrorCode::InternalError));
196 }
197
198 api::Payload::Octets(obj.payload)
199 }
200}
201
202pub(crate) struct UserObject {
204 version: u8,
206
207 user_id: Id,
209
210 payload: bytes::Bytes,
212
213 object_id: Id,
215}
216
217impl UserObject {
218 fn new(payload: bytes::Bytes, user_id: Id) -> Self {
219 let version = 0;
220 let object_id = Self::derive_id(version, user_id, &payload);
221
222 Self {
223 version,
224 user_id,
225 payload,
226 object_id,
227 }
228 }
229
230 fn derive_id(version: u8, user_id: Id, payload: &bytes::Bytes) -> Id {
231 let mut hasher = sha2::Sha256::new();
232
233 hasher.update([version]);
234 hasher.update(user_id.as_slice());
235 hasher.update(payload);
236
237 <[u8; 32]>::from(hasher.finalize()).into()
238 }
239}
240
241impl crate::servers::object_store::ObjectDetails for UserObject {
242 type Identifier = Id;
243 const PREFIX: &str = "user-obj";
244
245 fn object_id(&self) -> &Id {
246 &self.object_id
247 }
248
249 fn from_bytes(mut bytes: bytes::Bytes) -> anyhow::Result<Self> {
250 let object_id: Id = <[u8; 32]>::from(sha2::Sha256::digest(&bytes)).into();
251
252 let version: u8 = *bytes
253 .first()
254 .context("missing version number byte in stored user object")?;
255
256 if version != 0 {
257 anyhow::bail!("unsupported stored user object version, {version}");
258 }
259
260 if bytes.len() < 1 + 32 {
261 anyhow::bail!("stored user object header too small");
262 }
263
264 let payload = bytes.split_off(1 + 32);
265 let user_id = Id::from(<[u8; 32]>::try_from(&bytes[1..33]).unwrap());
266
267 Ok(Self {
268 version,
269 user_id,
270 payload,
271 object_id,
272 })
273 }
274
275 fn to_put_payload(&self) -> anyhow::Result<object_store::PutPayload> {
276 let mut header = bytes::BytesMut::with_capacity(1 + 32);
277 header.put_u8(self.version);
278 header.put(self.user_id.as_slice());
279
280 Ok(vec![header.freeze(), self.payload.clone()]
281 .into_iter()
282 .collect())
283 }
284}
285
286#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
288pub struct UserObjectDetails {
289 pub size: u32,
291
292 pub id: Id,
294}
295
296impl UserObjectDetails {
297 pub(crate) fn into_user_version(self, hmac_secret: &[u8]) -> api::phc::user::UserObjectDetails {
299 api::phc::user::UserObjectDetails {
300 hash: self.id,
301 hmac: phcrypto::phc_user_object_hmac(self.id, hmac_secret),
302 size: self.size,
303 }
304 }
305}