1use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::convert::Infallible;
5use std::ops::{Deref, DerefMut};
6use std::rc::Rc;
7
8use actix_web::web;
9use sha2::digest::Digest as _;
10
11use crate::api::{self, ApiResultExt as _, EndpointDetails as _, NoPayload};
12use crate::client;
13use crate::common::secret::DigestibleSecret as _;
14use crate::handle;
15use crate::misc::crypto;
16use crate::misc::jwt;
17use crate::misc::serde_ext;
18use crate::misc::time_ext;
19use crate::phcrypto;
20use crate::servers::{
21 self, AppBase, AppCreatorBase, Constellation, DiscoverVerdict, Handle, Server as _,
22 constellation,
23};
24
25use crate::{common::elgamal, hub};
26
27pub type Server = servers::ServerImpl<Details>;
29
30pub struct Details;
31impl servers::Details for Details {
32 const NAME: servers::Name = servers::Name::PubhubsCentral;
33 type AppT = App;
34 type AppCreatorT = AppCreator;
35 type ExtraRunningState = ExtraRunningState;
36 type ExtraSharedState = ExtraSharedState;
37 type ObjectStoreT = servers::object_store::DefaultObjectStore;
38
39 fn create_running_state(
40 server: &Server,
41 constellation: &Constellation,
42 ) -> anyhow::Result<Self::ExtraRunningState> {
43 let auths_ss = server.enc_key.shared_secret(&constellation.auths_enc_key);
44 let t_ss = server
45 .enc_key
46 .shared_secret(&constellation.transcryptor_enc_key);
47 Ok(ExtraRunningState {
48 attr_signing_key: phcrypto::attr_signing_key(&auths_ss),
49 t_sealing_secret: phcrypto::sealing_secret(&t_ss),
50 auths_sealing_secret: phcrypto::sealing_secret(&auths_ss),
51 auths_ss,
52 t_ss,
53 })
54 }
55
56 fn create_extra_shared_state(_config: &servers::Config) -> anyhow::Result<ExtraSharedState> {
57 Ok(ExtraSharedState {})
58 }
59}
60
61pub struct ExtraSharedState {}
62
63pub struct App {
64 pub base: AppBase<Server>,
65 pub transcryptor_url: url::Url,
66 pub auths_url: url::Url,
67 pub global_client_url: url::Url,
68 pub hubs: crate::map::Map<hub::BasicInfo>,
69 pub master_enc_key_part: elgamal::PrivateKey,
70 pub attr_id_secret: Box<[u8]>,
71 pub auth_token_secret: crypto::SealingKey,
72 pub auth_token_validity: core::time::Duration,
73 pub pp_nonce_secret: crypto::SealingKey,
74 pub pp_nonce_validity: core::time::Duration,
75 pub user_object_hmac_secret: Box<[u8]>,
76 pub quota: api::phc::user::Quota,
77 pub card_pseud_validity: core::time::Duration,
78
79 pub broadcast: tokio::sync::broadcast::Sender<InterAppMsg>,
81
82 pub cached_hub_info: std::cell::RefCell<api::CachedResponse<api::phc::user::CachedHubInfoEP>>,
83 pub hub_cache_config: HubCacheConfig,
84}
85
86impl Deref for App {
87 type Target = AppBase<Server>;
88
89 fn deref(&self) -> &Self::Target {
90 &self.base
91 }
92}
93
94#[derive(Clone, Debug)]
95pub struct ExtraRunningState {
96 #[expect(dead_code)]
98 pub(super) t_ss: elgamal::SharedSecret,
99
100 #[expect(dead_code)]
102 pub(super) auths_ss: elgamal::SharedSecret,
103
104 pub(super) attr_signing_key: jwt::HS256,
108
109 pub(super) t_sealing_secret: crypto::SealingKey,
111
112 #[expect(dead_code)]
114 pub(super) auths_sealing_secret: crypto::SealingKey,
115}
116
117impl crate::servers::App<Server> for App {
118 fn configure_actix_app(self: &Rc<Self>, sc: &mut web::ServiceConfig) {
119 api::phc::hub::TicketEP::add_to(self, sc, App::handle_hub_ticket);
120 api::server::HubPingEP::add_to(self, sc, App::handle_hub_ping);
121
122 api::phc::user::WelcomeEP::caching_add_to(self, sc, App::cached_handle_user_welcome);
123 api::phc::user::EnterEP::add_to(self, sc, App::handle_user_enter);
124 api::phc::user::RefreshEP::add_to(self, sc, App::handle_user_refresh);
125 api::phc::user::StateEP::add_to(self, sc, App::handle_user_state);
126
127 api::phc::user::NewObjectEP::add_to(self, sc, App::handle_user_new_object);
128 api::phc::user::OverwriteObjectEP::add_to(self, sc, App::handle_user_overwrite_object);
129 api::phc::user::GetObjectEP::add_to(self, sc, App::handle_user_get_object);
130
131 api::phc::user::PppEP::add_to(self, sc, App::handle_user_ppp);
132 api::phc::user::HhppEP::add_to(self, sc, App::handle_user_hhpp);
133
134 api::phc::user::CardPseudEP::add_to(self, sc, App::handle_user_card_pseud);
135
136 sc.app_data(web::Data::new(self.clone())).route(
138 api::phc::user::CachedHubInfoEP::PATH,
139 web::method(api::phc::user::CachedHubInfoEP::METHOD).to(App::handle_cached_hub_info),
140 );
141 }
142
143 fn check_constellation(&self, _constellation: &Constellation) -> bool {
144 panic!("PHC creates the constellation; it has no need to check it")
145 }
146
147 async fn discover(
148 self: &Rc<Self>,
149 _phc_di: api::DiscoveryInfoResp,
150 ) -> api::Result<DiscoverVerdict> {
151 let (tdi_res, asdi_res) = tokio::join!(
152 self.discovery_info_of(servers::Name::Transcryptor, &self.transcryptor_url),
153 self.discovery_info_of(servers::Name::AuthenticationServer, &self.auths_url)
154 );
155
156 let tdi = tdi_res?;
157 let asdi = asdi_res?;
158
159 for (odi, other_server_name) in [
160 (&tdi, servers::Name::Transcryptor),
161 (&asdi, servers::Name::AuthenticationServer),
162 ] {
163 if let Some(ref other_version) = odi.version
164 && let Some(my_version) = &self.version
165 {
166 let other_version = crate::servers::version::to_semver(other_version).map_err(|err| {
167 log::error!(
168 "{my_server_name}: could not parse semantic version returned by {other_server_name}: {other_version}: {err}",
169 my_server_name = Server::NAME
170 );
171 api::ErrorCode::InternalError
172 })?;
173
174 let my_version = crate::servers::version::to_semver(my_version).map_err(|err| {
175 log::error!(
176 "{my_server_name}: could not parse my semantic version {my_version}: {err}",
177 my_server_name = Server::NAME
178 );
179 api::ErrorCode::InternalError
180 })?;
181
182 if my_version < other_version {
183 log::warn!(
184 "{my_server_name}: {other_server_name}'s version ({other_version}) > my version ({my_version})",
185 my_server_name = Server::NAME,
186 );
187 return Ok(DiscoverVerdict::BinaryOutdated);
188 }
189 } else {
190 log::warn!(
191 "{my_server_name}: not checking my version ({my_version}) against {other_server_name}'s version ({other_version})",
192 my_server_name = Server::NAME,
193 my_version = crate::servers::version::VERSION,
194 other_version = odi.version.as_deref().unwrap_or("n/a")
195 );
196 }
197 }
198
199 let transcryptor_master_enc_key_part = tdi
200 .master_enc_key_part
201 .expect("should already have been checked to be some by discovery_info_of");
202 let new_constellation_inner = constellation::Inner {
203 master_enc_key: phcrypto::combine_master_enc_key_parts(
205 &transcryptor_master_enc_key_part,
206 &self.master_enc_key_part,
207 ),
208 transcryptor_master_enc_key_part,
209 global_client_url: self.global_client_url.clone(),
210 phc_url: self.phc_url.clone(),
211 phc_jwt_key: self.jwt_key.verifying_key().into(),
212 phc_enc_key: self.enc_key.public_key().clone(),
213 transcryptor_url: self.transcryptor_url.clone(),
214 transcryptor_jwt_key: tdi.jwt_key,
215 transcryptor_enc_key: tdi.enc_key,
216 auths_url: self.auths_url.clone(),
217 auths_jwt_key: asdi.jwt_key,
218 auths_enc_key: asdi.enc_key,
219 ph_version: self.version.clone(),
220 };
221
222 if self.running_state.is_none()
223 || self.running_state.as_ref().unwrap().constellation.inner != new_constellation_inner
224 {
225 let new_constellation_id = constellation::Inner::derive_id(&new_constellation_inner);
226
227 if let Some(ref running_state) = self.running_state {
228 log::info!(
229 "Detected change in constellation {} -> {}",
230 running_state.constellation.id,
231 new_constellation_id
232 );
233 } else {
234 log::info!("Computed constellation {new_constellation_id}");
235 }
236
237 return Ok(DiscoverVerdict::ConstellationOutdated {
238 new_constellation: Box::new(Constellation {
239 id: new_constellation_id,
240 created_at: api::NumericDate::now(),
241 inner: new_constellation_inner,
242 }),
243 });
244 }
245
246 let constellation = &self.running_state.as_ref().unwrap().constellation;
247
248 log::info!("My own constellation is up-to-date");
249
250 let mut js = tokio::task::JoinSet::new();
253
254 if tdi
255 .constellation_or_id
256 .as_ref()
257 .is_some_and(|c| *c.id() != constellation.id)
258 {
259 log::info!(
261 "{phc}: {t}'s constellation is out of date - invoking its discovery..",
262 phc = servers::Name::PubhubsCentral,
263 t = servers::Name::Transcryptor
264 );
265 let url = self.transcryptor_url.clone();
266 js.spawn_local(
267 self.client
268 .query::<api::DiscoveryRun>(&url, NoPayload)
269 .into_future(),
270 );
271 }
272
273 if asdi
274 .constellation_or_id
275 .as_ref()
276 .is_some_and(|c| *c.id() != constellation.id)
277 {
278 log::info!(
280 "{phc}: {auths}'s constellation is out of date - invoking its discovery..",
281 phc = servers::Name::PubhubsCentral,
282 auths = servers::Name::AuthenticationServer
283 );
284 let url = self.auths_url.clone();
285 js.spawn_local(
286 self.client
287 .query::<api::DiscoveryRun>(&url, NoPayload)
288 .into_future(),
289 );
290 }
291
292 let result_maybe = js.join_next().await;
293
294 js.detach_all();
297
298 match result_maybe {
299 None => {
301 if tdi.constellation_or_id.is_some() && asdi.constellation_or_id.is_some() {
302 log::info!("Constellation of all servers up to date!");
303 Ok(DiscoverVerdict::Alright)
304 } else {
305 log::info!("Waiting for the other servers to update their constellation.");
306 Err(api::ErrorCode::PleaseRetry)
307 }
308 }
309 Some(Err(join_err)) => {
311 log::error!("discovery run task joined unexpectedly: {join_err}");
312 Err(api::ErrorCode::InternalError)
313 }
314 Some(Ok(res)) => {
316 match res.retryable() {
317 Ok(_) => {
318 Err(api::ErrorCode::PleaseRetry)
322 }
323 Err(err) => {
324 log::error!("Failed to run discovery of other server: {err}",);
325 Err(api::ErrorCode::InternalError)
326 }
327 }
328 }
329 }
330 }
331
332 fn master_enc_key_part(&self) -> Option<&elgamal::PrivateKey> {
333 Some(&self.master_enc_key_part)
334 }
335
336 async fn local_task(weak: std::rc::Weak<Self>) {
337 use tokio::sync::broadcast::error::RecvError;
338
339 let mut receiver: tokio::sync::broadcast::Receiver<InterAppMsg>;
340
341 {
342 let Some(app) = weak.upgrade() else {
343 log::debug!("App is gone before local task started");
344 return;
345 };
346
347 receiver = app.broadcast.subscribe();
348 }
349
350 loop {
351 let recv_result = receiver.recv().await;
352 let Ok(msg) = recv_result else {
353 match recv_result.unwrap_err() {
354 RecvError::Closed => {
355 return;
356 }
357 RecvError::Lagged(skipped) => {
358 log::error!(
359 "PHC local task on {:?} is lagging behind, \
360 and has skipped processing {skipped} messages!",
361 std::thread::current().id()
362 );
363 continue;
364 }
365 }
366 };
367
368 let Some(app) = weak.upgrade() else {
369 log::warn!("Inter app message dropped because app is gone");
370 return;
371 };
372
373 match msg {
374 InterAppMsg::UpdatedHubInfo(cached_hub_info) => {
375 app.cached_hub_info.replace(cached_hub_info);
376 }
377 }
378 }
379 }
380
381 async fn global_task(app: Rc<Self>) -> anyhow::Result<Infallible> {
382 let localset = tokio::task::LocalSet::new();
383 let _hcu = HubCacheUpdater::new(app, &localset);
384
385 localset.await;
386
387 log::error!("bug: PHC global task exits prematurely");
388 anyhow::bail!("bug: PHC global task exits prematurely")
389 }
390}
391
392#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
394pub struct HubCacheConfig {
395 #[serde(with = "time_ext::human_duration")]
397 #[serde(default = "default_hub_cache_request_interval")]
398 request_interval: core::time::Duration,
399
400 #[serde(with = "time_ext::human_duration")]
402 #[serde(default = "default_hub_cache_request_timeout")]
403 request_timeout: core::time::Duration,
404
405 #[serde(with = "time_ext::human_duration")]
407 #[serde(default = "default_hub_cache_push_interval")]
408 push_interval: core::time::Duration,
409}
410
411fn default_hub_cache_request_interval() -> core::time::Duration {
412 core::time::Duration::from_secs(60)
413}
414
415fn default_hub_cache_request_timeout() -> core::time::Duration {
416 core::time::Duration::from_secs(10)
417}
418
419fn default_hub_cache_push_interval() -> core::time::Duration {
420 core::time::Duration::from_secs(5)
421}
422
423impl Default for HubCacheConfig {
424 fn default() -> Self {
425 serde_ext::default_object()
426 }
427}
428
429struct HubCacheUpdater {
430 app: Rc<App>,
431 hub_info: RefCell<HashMap<handle::Handle, Option<api::hub::InfoResp>>>,
432 unpublished_updates: Cell<bool>,
433}
434
435impl HubCacheUpdater {
436 fn new(app: Rc<App>, localset: &tokio::task::LocalSet) -> Rc<Self> {
437 let hcu = Rc::new(Self {
438 app: app.clone(),
439 hub_info: RefCell::new(Default::default()),
440 unpublished_updates: Cell::new(false),
441 });
442
443 for basic_hub_info in app.hubs.values() {
444 localset.spawn_local(hcu.clone().handle_hub(basic_hub_info.clone()));
445 }
446
447 localset.spawn_local(hcu.clone().push_updates());
448
449 hcu
450 }
451
452 async fn handle_hub(self: Rc<Self>, basic_hub_info: hub::BasicInfo) {
453 let hub_handle = basic_hub_info.handles.preferred();
454
455 self.hub_info.borrow_mut().insert(hub_handle.clone(), None);
456
457 let mut interval = tokio::time::interval(self.app.hub_cache_config.request_interval);
458 let mut failure_since: Option<std::time::SystemTime> = None;
459
460 loop {
461 interval.tick().await;
462
463 let hir = self
464 .app
465 .client
466 .query::<api::hub::InfoEP>(&basic_hub_info.url, api::NoPayload)
467 .quiet()
468 .timeout(self.app.hub_cache_config.request_timeout)
469 .await;
470
471 let Ok(hi) = hir else {
472 if failure_since.is_none() {
473 log::warn!("hub {hub_handle} not reachable");
474 failure_since = Some(std::time::SystemTime::now());
475 }
476
477 continue;
478 };
479
480 if let Some(time) = failure_since.take() {
481 log::info!(
482 "hub {hub_handle} is reachable again; it was unreachable since {}.)",
483 time_ext::format_time(time)
484 );
485 }
486
487 use std::collections::hash_map::Entry;
488
489 {
490 let mut hub_info = self.hub_info.borrow_mut();
491
492 let Entry::Occupied(mut oe) = hub_info.entry(hub_handle.clone()) else {
493 panic!("bug: hub info cache entry for hub {hub_handle} disappeared");
494 };
495
496 if oe.get().as_ref() == Some(&hi) {
497 continue;
498 }
499
500 oe.insert(Some(hi));
501 }
502
503 if !self.unpublished_updates.replace(true) {
504 log::trace!("new hub info on {hub_handle} will be pushed to app soon");
505 }
506 }
507 }
508
509 async fn push_updates(self: Rc<Self>) {
510 let mut interval = tokio::time::interval(self.app.hub_cache_config.push_interval);
511
512 loop {
513 interval.tick().await;
514
515 if !self.unpublished_updates.get() {
516 continue;
517 }
518
519 let chir = api::phc::user::CachedHubInfoResp {
520 hubs: self.hub_info.borrow().clone(),
521 };
522
523 let cr = api::Responder(Ok(chir)).into_cached();
524
525 log::trace!("pushing updated cached hub info to apps");
526 if self
527 .app
528 .broadcast
529 .send(InterAppMsg::UpdatedHubInfo(cr))
530 .is_err()
531 {
532 log::error!("failed to internally broadcast updated hub information");
533 continue;
534 };
535
536 self.unpublished_updates.set(false);
537 }
538 }
539}
540
541impl App {
542 async fn discovery_info_of(
544 &self,
545 name: servers::Name,
546 url: &url::Url,
547 ) -> api::Result<api::DiscoveryInfoResp> {
548 let tdi = self
549 .client
550 .query::<api::DiscoveryInfo>(url, NoPayload)
551 .await
552 .into_server_result()?;
553
554 client::discovery::DiscoveryInfoCheck {
555 phc_url: &self.phc_url,
556 name,
557 self_check_code: None,
558 constellation: None,
559 }
560 .check(tdi, url)
561 }
562}
563
564#[derive(Clone)]
565pub struct AppCreator {
566 pub base: AppCreatorBase<Server>,
567 pub transcryptor_url: url::Url,
568 pub auths_url: url::Url,
569 pub global_client_url: url::Url,
570 pub hubs: crate::map::Map<hub::BasicInfo>,
571 pub master_enc_key_part: elgamal::PrivateKey,
572 pub attr_id_secret: Box<[u8]>,
573 pub auth_token_secret: crypto::SealingKey,
574 pub auth_token_validity: core::time::Duration,
575 pub pp_nonce_secret: crypto::SealingKey,
576 pub pp_nonce_validity: core::time::Duration,
577 pub user_object_hmac_secret: Box<[u8]>,
578 pub quota: api::phc::user::Quota,
579 pub card_pseud_validity: core::time::Duration,
580 pub hub_cache_config: HubCacheConfig,
581}
582
583impl Deref for AppCreator {
584 type Target = AppCreatorBase<Server>;
585
586 fn deref(&self) -> &Self::Target {
587 &self.base
588 }
589}
590
591impl DerefMut for AppCreator {
592 fn deref_mut(&mut self) -> &mut Self::Target {
593 &mut self.base
594 }
595}
596
597#[derive(Clone)]
598pub struct AppCreatorContext {
599 broadcast: tokio::sync::broadcast::Sender<InterAppMsg>,
600}
601
602impl Default for AppCreatorContext {
603 fn default() -> Self {
604 Self {
605 broadcast: tokio::sync::broadcast::Sender::<InterAppMsg>::new(10),
606 }
607 }
608}
609
610impl crate::servers::AppCreator<Server> for AppCreator {
611 type ContextT = AppCreatorContext;
612
613 fn into_app(self, handle: &Handle<Server>, context: &Self::ContextT, generation: usize) -> App {
614 App {
615 base: AppBase::new(self.base, handle, generation),
616 transcryptor_url: self.transcryptor_url,
617 auths_url: self.auths_url,
618 global_client_url: self.global_client_url,
619 hubs: self.hubs,
620 master_enc_key_part: self.master_enc_key_part,
621 attr_id_secret: self.attr_id_secret,
622 auth_token_secret: self.auth_token_secret,
623 auth_token_validity: self.auth_token_validity,
624 pp_nonce_secret: self.pp_nonce_secret,
625 pp_nonce_validity: self.pp_nonce_validity,
626 user_object_hmac_secret: self.user_object_hmac_secret,
627 quota: self.quota,
628 card_pseud_validity: self.card_pseud_validity,
629 broadcast: context.broadcast.clone(),
630 cached_hub_info: std::cell::RefCell::new(
632 api::Responder(Err(api::ErrorCode::PleaseRetry)).into_cached(),
633 ),
634 hub_cache_config: self.hub_cache_config,
635 }
636 }
637
638 fn new(config: &servers::Config) -> anyhow::Result<Self> {
639 let mut hubs: crate::map::Map<hub::BasicInfo> = Default::default();
640
641 let xconf = &config.phc.as_ref().unwrap();
642
643 for basic_hub_info in xconf.hubs.iter() {
644 if let Some(hub_or_id) = hubs.insert_new(basic_hub_info.clone().into()) {
645 anyhow::bail!("two hubs are known as {hub_or_id}");
646 }
647 }
648
649 let master_enc_key_part: elgamal::PrivateKey = xconf
650 .master_enc_key_part
651 .clone()
652 .expect("master_enc_key_part not generated");
653
654 let base = AppCreatorBase::<Server>::new(config)?;
655
656 let auth_token_secret: crypto::SealingKey = base
657 .enc_key
658 .derive_sealing_key(sha2::Sha256::new(), "pubhubs-phc-auth-token-secret");
659
660 let pp_nonce_secret: crypto::SealingKey = base
661 .enc_key
662 .derive_sealing_key(sha2::Sha256::new(), "pubhubs-pp-nonce-secret");
663
664 Ok(Self {
665 base,
666 transcryptor_url: xconf.transcryptor_url.as_ref().clone(),
667 auths_url: xconf.auths_url.as_ref().clone(),
668 global_client_url: xconf.global_client_url.as_ref().clone(),
669 hubs,
670 master_enc_key_part,
671 attr_id_secret: <serde_bytes::ByteBuf as Clone>::clone(
672 xconf
673 .attr_id_secret
674 .as_ref()
675 .expect("attr_id_secret was not initialized"),
676 )
677 .into_vec()
678 .into_boxed_slice(),
679 auth_token_secret,
680 auth_token_validity: xconf.auth_token_validity,
681 pp_nonce_secret,
682 pp_nonce_validity: xconf.pp_nonce_validity,
683 user_object_hmac_secret: <serde_bytes::ByteBuf as Clone>::clone(
684 xconf
685 .user_object_hmac_secret
686 .as_ref()
687 .expect("user_object_hmac_secret was not initialized"),
688 )
689 .into_vec()
690 .into_boxed_slice(),
691 quota: xconf.user_quota.clone(),
692 card_pseud_validity: xconf.card_pseud_validity,
693 hub_cache_config: xconf.hub_cache.clone(),
694 })
695 }
696}
697
698#[derive(Clone, Debug)]
700pub(crate) enum InterAppMsg {
701 UpdatedHubInfo(api::CachedResponse<api::phc::user::CachedHubInfoEP>),
702}