1use actix_web::web;
3use anyhow::{Context as _, Result};
4use futures_util::future::FutureExt as _;
5
6use core::convert::Infallible;
7use std::ops::{Deref, DerefMut};
8use std::rc::Rc;
9
10use crate::common::elgamal;
11
12use crate::client;
13
14use crate::api::OpenError;
15use crate::api::{
16 self, ApiResultExt as _, DiscoveryRunResp, EndpointDetails, NoPayload, ResultExt as _,
17};
18use crate::servers::{self, Config, Constellation, Handle};
19
/// Identifies which of the three servers is meant.
///
/// The `serde` renames fix the serialized spelling of each variant; the
/// `Display` implementation yields the human-readable name.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash, clap::ValueEnum,
)]
pub enum Name {
    // Serialized as "phc"; displayed as "PubHubs Central".
    #[serde(rename = "phc")]
    PubhubsCentral,

    // Serialized as "transcryptor"; displayed as "Transcryptor".
    #[serde(rename = "transcryptor")]
    Transcryptor,

    // Serialized as "auths"; displayed as "Authentication Server".
    #[serde(rename = "auths")]
    AuthenticationServer,
}
34
35impl std::fmt::Display for Name {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
37 write!(
38 f,
39 "{}",
40 match self {
41 Name::PubhubsCentral => "PubHubs Central",
42 Name::Transcryptor => "Transcryptor",
43 Name::AuthenticationServer => "Authentication Server",
44 }
45 )
46 }
47}
48
49pub trait Server: DerefMut<Target = Self::AppCreatorT> + Sized + 'static {
61 type AppT: App<Self>;
62
63 const NAME: Name;
64
65 fn default_port() -> u16 {
67 match Self::NAME {
68 Name::PubhubsCentral => 5050,
71 Name::Transcryptor => 7070,
72 Name::AuthenticationServer => 6060,
73 }
74 }
75
76 type AppCreatorT: AppCreator<Self>;
78
79 type ExtraConfig;
80
81 type ExtraRunningState: Clone + core::fmt::Debug;
83
84 type ExtraSharedState;
86
87 type ObjectStoreT: Sync;
89
90 fn new(config: &crate::servers::Config) -> Result<Self>;
91
92 fn config(&self) -> &crate::servers::Config;
93
94 fn server_config(&self) -> &servers::config::ServerConfig<Self::ExtraConfig> {
95 Self::server_config_from(self.config())
96 }
97
98 fn server_config_from(
99 config: &servers::Config,
100 ) -> &servers::config::ServerConfig<Self::ExtraConfig>;
101
102 fn create_running_state(
103 &self,
104 constellation: &Constellation,
105 ) -> Result<Self::ExtraRunningState>;
106
107 fn create_extra_shared_state(config: &servers::Config) -> Result<Self::ExtraSharedState>;
108
109 #[expect(async_fn_in_trait)]
128 async fn run_until_modifier(
129 self: Rc<Self>,
130 shutdown_receiver: tokio::sync::oneshot::Receiver<Infallible>,
131 app: Rc<Self::AppT>,
132 ) -> Result<Option<BoxModifier<Self>>>;
133
134 fn cors() -> actix_cors::Cors {
136 actix_cors::Cors::default()
137 .allow_any_origin()
138 .allowed_methods(["GET", "POST"])
139 .allowed_header(actix_web::http::header::CONTENT_TYPE)
140 .allowed_header(actix_web::http::header::AUTHORIZATION)
141 }
142}
143
/// Generic [`Server`] implementation whose server-specific parts are supplied
/// by `D` (see [`Details`]).
pub struct ServerImpl<D: Details> {
    /// Configuration this server was created from (cloned in `Server::new`).
    config: servers::Config,
    /// The app creator; `ServerImpl` dereferences to it.
    app_creator: D::AppCreatorT,
}
149
150impl<D: Details> Deref for ServerImpl<D> {
151 type Target = D::AppCreatorT;
152
153 #[inline]
154 fn deref(&self) -> &Self::Target {
155 &self.app_creator
156 }
157}
158
159impl<D: Details> DerefMut for ServerImpl<D> {
160 #[inline]
161 fn deref_mut(&mut self) -> &mut Self::Target {
162 &mut self.app_creator
163 }
164}
165
/// Supplies the server-specific pieces [`ServerImpl`] needs to implement [`Server`].
///
/// The constant, the associated types and the two functions are forwarded to
/// the same-named items of `Server` by `impl Server for ServerImpl<D>`.
pub trait Details: crate::servers::config::GetServerConfig + 'static + Sized {
    /// Which server this is.
    const NAME: Name;
    /// Becomes `Server::AppCreatorT`.
    type AppCreatorT;
    /// Becomes `Server::AppT`.
    type AppT;
    /// Becomes `Server::ExtraRunningState`.
    type ExtraRunningState: Clone + core::fmt::Debug;
    /// Becomes `Server::ExtraSharedState`.
    type ExtraSharedState;
    /// Becomes `Server::ObjectStoreT`.
    type ObjectStoreT;

    /// Creates the server-specific running state for the given constellation.
    fn create_running_state(
        server: &ServerImpl<Self>,
        constellation: &Constellation,
    ) -> Result<Self::ExtraRunningState>;

    /// Creates the server-specific shared state from the configuration.
    fn create_extra_shared_state(config: &servers::Config) -> Result<Self::ExtraSharedState>;
}
182
183impl<D: Details> Server for ServerImpl<D>
184where
185 D::AppT: App<Self>,
186 D::AppCreatorT: AppCreator<Self>,
187 D::ObjectStoreT: Sync,
188{
189 const NAME: Name = D::NAME;
190
191 type AppCreatorT = D::AppCreatorT;
192 type AppT = D::AppT;
193
194 type ExtraConfig = D::Extra;
195 type ExtraRunningState = D::ExtraRunningState;
196 type ExtraSharedState = D::ExtraSharedState;
197
198 type ObjectStoreT = D::ObjectStoreT;
199
200 fn new(config: &servers::Config) -> Result<Self> {
201 Ok(Self {
202 app_creator: Self::AppCreatorT::new(config)?,
203 config: config.clone(),
204 })
205 }
206
207 fn config(&self) -> &servers::Config {
208 &self.config
209 }
210
211 fn server_config_from(
212 config: &servers::Config,
213 ) -> &servers::config::ServerConfig<Self::ExtraConfig> {
214 D::server_config(config)
215 }
216
217 fn create_running_state(
218 &self,
219 constellation: &Constellation,
220 ) -> Result<Self::ExtraRunningState> {
221 D::create_running_state(self, constellation)
222 }
223
224 fn create_extra_shared_state(config: &servers::Config) -> Result<Self::ExtraSharedState> {
225 D::create_extra_shared_state(config)
226 }
227
228 async fn run_until_modifier(
229 self: Rc<Self>,
230 shutdown_receiver: tokio::sync::oneshot::Receiver<Infallible>,
231 app: Rc<Self::AppT>,
232 ) -> Result<Option<crate::servers::server::BoxModifier<Self>>> {
233 tokio::select! {
234 res = shutdown_receiver => {
235 res.expect_err("got instance of Infallible");
236 #[expect(clippy::needless_return)] return Ok(None);
238 }
239
240 res = self.run_discovery_and_then_wait_forever(app) => {
241 #[expect(clippy::needless_return)] return Err(res.expect_err("got instance of Infallible"));
243 }
244 }
245 }
246}
247
248impl<D: Details> ServerImpl<D>
249where
250 D::AppT: App<Self>,
251 D::AppCreatorT: AppCreator<Self>,
252 D::ObjectStoreT: Sync,
253{
254 async fn run_discovery_and_then_wait_forever(&self, app: Rc<D::AppT>) -> Result<Infallible> {
255 self.run_discovery(app.clone()).await?;
256
257 D::AppT::global_task(app).await }
259
260 async fn run_discovery(&self, app: Rc<D::AppT>) -> Result<()> {
261 crate::misc::task::retry(|| async {
262 (match
263 AppBase::<Self>::handle_discovery_run(app.clone()).await.retryable()?
264 {
265 Some(DiscoveryRunResp::Restarting | DiscoveryRunResp::UpToDate) => Ok(Some(())),
266 None => Ok(None),
267 }) as Result<Option<()>>
268 })
269 .await?
270 .ok_or_else(|| anyhow::anyhow!("timeout waiting for discovery of {server_name}", server_name = D::NAME))
271 }
272}
273
/// Creates the [`App`] instances of a [`Server`].
///
/// An app creator dereferences (mutably) to the [`AppCreatorBase`] holding the
/// data common to all servers, and must be `Send` and `Clone`.
pub trait AppCreator<ServerT: Server>:
    DerefMut<Target = AppCreatorBase<ServerT>> + Send + Clone + 'static
{
    /// Extra context handed to [`AppCreator::into_app`].
    type ContextT: Default + Send + Clone + 'static;

    /// Creates the app creator from the configuration.
    fn new(config: &servers::Config) -> Result<Self>;

    /// Turns this creator into an app.
    ///
    /// `generation` ends up in `AppBase::generation` when the app is built
    /// via `AppBase::new`.
    fn into_app(
        self,
        handle: &Handle<ServerT>,
        context: &Self::ContextT,
        generation: usize,
    ) -> ServerT::AppT;
}
299
/// A one-shot modification of a server that can be sent across threads.
pub trait Modifier<ServerT: Server>: Send + 'static {
    /// Applies the modification to `server`, consuming the modifier.
    ///
    /// NOTE(review): the returned flag presumably tells whether the server
    /// should keep running / be restarted (`true`) or exit (`false`) -- `Exiter`
    /// returns `false`; confirm against the caller.
    fn modify(self: Box<Self>, server: &mut ServerT) -> bool;

    /// Writes a description of this modifier; used by `Display for BoxModifier`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>;
}
313
314impl<S: Server, F: FnOnce(&mut S) -> bool + Send + 'static, D: std::fmt::Display + Send + 'static>
315 Modifier<S> for (F, D)
316{
317 fn modify(self: Box<Self>, server: &mut S) -> bool {
318 self.0(server)
319 }
320
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
322 self.1.fmt(f)
323 }
324}
325
/// A [`Modifier`] that leaves the server untouched and returns `false`.
pub struct Exiter;
328
329impl<S: Server> Modifier<S> for Exiter {
330 fn modify(self: Box<Self>, _server: &mut S) -> bool {
331 false
332 }
333
334 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
335 f.write_str("exiter")
336 }
337}
338
/// Owned, type-erased [`Modifier`].
pub type BoxModifier<S> = Box<dyn Modifier<S>>;
341
342impl<S: Server> std::fmt::Display for BoxModifier<S> {
343 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
344 Modifier::fmt(&**self, f)
345 }
346}
347
/// A one-shot read-only inspection of a server that can be sent across threads.
pub(crate) trait Inspector<ServerT: Server>: Send + 'static {
    /// Inspects `server`, consuming the inspector.
    fn inspect(self: Box<Self>, server: &ServerT);

    /// Writes a description of this inspector; used by `Display for BoxInspector`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>;
}
355
356impl<S: Server, F: FnOnce(&S) + Send + 'static, D: std::fmt::Display + Send + 'static> Inspector<S>
357 for (F, D)
358{
359 fn inspect(self: Box<Self>, server: &S) {
360 self.0(server)
361 }
362
363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
364 self.1.fmt(f)
365 }
366}
367
/// Owned, type-erased [`Inspector`].
pub type BoxInspector<S> = Box<dyn Inspector<S>>;
370
371impl<S: Server> std::fmt::Display for BoxInspector<S> {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
373 Inspector::fmt(&**self, f)
374 }
375}
376
/// A command for a server: modify it, inspect it, or exit.
///
/// NOTE(review): presumably sent to the server via its `Handle`; confirm
/// where commands are consumed.
pub(crate) enum Command<S: Server> {
    /// Apply the given modifier to the server.
    Modify(BoxModifier<S>),

    /// Run the given inspector on the server.
    Inspect(BoxInspector<S>),

    /// Displayed as "exit".
    Exit,
}
392impl<S: Server> std::fmt::Display for Command<S> {
393 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
394 match self {
395 Command::Inspect(inspector) => write!(f, "inspector {inspector}"),
396 Command::Modify(modifier) => write!(f, "modifier {modifier}"),
397 Command::Exit => write!(f, "exit"),
398 }
399 }
400}
401
/// Outcome of a successful `App::discover` call.
pub enum DiscoverVerdict {
    /// The constellation in the running state has the same id as the one
    /// reported by PubHubs Central.
    Alright,

    /// The running state is unset or holds a constellation with a different
    /// id; `new_constellation` is the constellation reported by PubHubs Central.
    ConstellationOutdated {
        new_constellation: Box<Constellation>,
    },

    /// The version of this server is lower than the version of PubHubs Central.
    BinaryOutdated,
}
415
416#[allow(async_fn_in_trait)]
420pub trait App<S: Server>: Deref<Target = AppBase<S>> + 'static {
    /// Configures the given actix service configuration for this app.
    ///
    /// NOTE(review): presumably registers the app's endpoints, like
    /// `AppBase::configure_actix_app` does -- confirm with the implementors.
    fn configure_actix_app(self: &Rc<Self>, sc: &mut web::ServiceConfig);
424
    /// Checks the given constellation from this server's point of view.
    ///
    /// When this returns `false` for the constellation reported by PubHubs
    /// Central, `discover` treats that constellation as out-of-date and asks
    /// PubHubs Central to run discovery again.
    fn check_constellation(&self, constellation: &Constellation) -> bool;
427
    /// Default discovery routine for servers other than PubHubs Central.
    ///
    /// Given the discovery info `phc_inf` obtained from PubHubs Central (PHC), this
    ///
    ///  1. compares versions, when both are known: returns
    ///     [`DiscoverVerdict::BinaryOutdated`] when PHC is newer, and asks PHC to
    ///     rerun discovery (returning `PleaseRetry`) when PHC is older;
    ///  2. asks PHC to rerun discovery (returning `PleaseRetry`) when
    ///     `check_constellation` rejects PHC's constellation;
    ///  3. returns [`DiscoverVerdict::Alright`] when the running state already
    ///     holds a constellation with the same id;
    ///  4. otherwise fetches and checks this server's own discovery info via the
    ///     url listed in PHC's constellation, and returns
    ///     [`DiscoverVerdict::ConstellationOutdated`].
    ///
    /// # Errors
    ///
    /// `InternalError` when called for PHC itself, when a version cannot be
    /// parsed, or when PHC returned only a constellation id; `PleaseRetry` as
    /// described above; and whatever errors the queries and the check yield.
    ///
    /// # Panics
    ///
    /// When `phc_inf.constellation_or_id` is `None`.
    async fn discover(
        self: &Rc<Self>,
        phc_inf: api::DiscoveryInfoResp,
    ) -> api::Result<DiscoverVerdict> {
        log::debug!("{server_name}: running discovery", server_name = S::NAME);

        // This default implementation is not meant for PHC.
        if S::NAME == Name::PubhubsCentral {
            log::error!(
                "{} should implement discovery itself!",
                Name::PubhubsCentral
            );
            return Err(api::ErrorCode::InternalError);
        }

        // Versions are only compared when both PHC's and ours are known.
        if let Some(ref phc_version) = phc_inf.version
            && let Some(my_version) = &self.version
        {
            let phc_version = crate::servers::version::to_semver(phc_version).map_err(|err| {
                log::error!(
                    "could not parse semantic version returned by PHC: {phc_version}: {err}"
                );
                api::ErrorCode::InternalError
            })?;

            let my_version = crate::servers::version::to_semver(my_version).map_err(|err| {
                log::error!("could not parse my semantic version {my_version}: {err}");
                api::ErrorCode::InternalError
            })?;

            // PHC is ahead of us: report that our binary is outdated.
            if my_version < phc_version {
                log::warn!(
                    "{server_name}: {phc}'s version ({phc_version}) > my version ({my_version})",
                    server_name = S::NAME,
                    phc = Name::PubhubsCentral
                );
                return Ok(DiscoverVerdict::BinaryOutdated);
            }

            // We are ahead of PHC: ask PHC to run discovery, and have our caller retry.
            if my_version > phc_version {
                log::warn!(
                    "{server_name}: {phc}'s version {phc_version} is out-of-date - requesting rediscovery",
                    server_name = S::NAME,
                    phc = Name::PubhubsCentral
                );

                let _drr = self
                    .client
                    .query::<api::DiscoveryRun>(&phc_inf.phc_url, NoPayload)
                    .await
                    .into_server_result()?;
                return Err(api::ErrorCode::PleaseRetry);
            }
        } else {
            // Note: this moves `phc_inf.version` out of `phc_inf`.
            log::warn!(
                "not checking my version ({my_version}) against phc's version ({phc_version})",
                my_version = crate::servers::version::VERSION,
                phc_version = phc_inf.version.unwrap_or_else(|| "n/a".to_string())
            );
        }

        // Precondition on the caller; the `unwrap` below relies on it.
        assert!(
            phc_inf.constellation_or_id.is_some(),
            "this `discover` method should only be run when phc_inf.constellation is some"
        );

        // We need the full constellation, not just its id.
        let Some(phc_inf_constellation) = phc_inf.constellation_or_id.unwrap().into_constellation()
        else {
            log::warn!(
                "{server_name}: {phc} returned only its contellation id",
                phc = Name::PubhubsCentral,
                server_name = S::NAME
            );
            return Err(api::ErrorCode::InternalError);
        };

        // PHC's constellation does not pass our check: ask PHC to run discovery,
        // and have our caller retry.
        if !self.check_constellation(&phc_inf_constellation) {
            log::warn!(
                "{server_name}: {phc}'s constellation seems to be out-of-date - requesting rediscovery",
                server_name = S::NAME,
                phc = Name::PubhubsCentral
            );

            let _drr = self
                .client
                .query::<api::DiscoveryRun>(&phc_inf.phc_url, NoPayload)
                .await
                .into_server_result()?;

            return Err(api::ErrorCode::PleaseRetry);
        }

        log::trace!(
            "{server_name}: {phc}'s constellation looks alright! ",
            server_name = S::NAME,
            phc = Name::PubhubsCentral
        );

        // Nothing to do when we already run with a constellation with the same id.
        if let Some(rs) = self.running_state.as_ref()
            && phc_inf_constellation.id == rs.constellation.id
        {
            log::info!(
                "{server_name}: my constellation is up-to-date!",
                server_name = S::NAME,
            );

            return Ok(DiscoverVerdict::Alright);
        }

        log::info!(
            "{}: my constellation is {}",
            S::NAME,
            if self.running_state.is_some() {
                "out of date"
            } else {
                "not yet set"
            }
        );

        // The url PHC's constellation lists for this server.
        let url = phc_inf_constellation.url(S::NAME);

        // Retrieve our own discovery info via that url ...
        let di = self
            .client
            .query::<api::DiscoveryInfo>(url, NoPayload)
            .await
            .into_server_result()?;

        // ... and check it against our name, PHC url and self check code.
        let _di_again = client::discovery::DiscoveryInfoCheck {
            name: S::NAME,
            phc_url: &self.phc_url,
            self_check_code: Some(&self.self_check_code),
            constellation: None,
        }
        .check(di, url)?;

        Ok(DiscoverVerdict::ConstellationOutdated {
            new_constellation: Box::new(phc_inf_constellation),
        })
    }
581
582 fn master_enc_key_part(&self) -> Option<&elgamal::PrivateKey> {
584 if matches!(S::NAME, Name::PubhubsCentral | Name::Transcryptor) {
585 panic!("this default impl should have been overriden for PHC and T")
586 }
587 None
588 }
589
    /// Task associated with one app instance, which receives only a weak
    /// reference to that app.  The default implementation does nothing.
    ///
    /// NOTE(review): presumably spawned once per app; confirm against the caller.
    async fn local_task(_weak: std::rc::Weak<Self>) {}
592
593 async fn global_task(_app: std::rc::Rc<Self>) -> Result<Infallible> {
595 Ok(std::future::pending::<Infallible>().await)
596 }
597}
598
/// Data common to the app creators of all servers; moved field by field into
/// an [`AppBase`] by `AppBase::new`.
pub struct AppCreatorBase<S: Server> {
    /// `None` when created by `AppCreatorBase::new`.
    pub running_state: Option<RunningState<S::ExtraRunningState>>,
    /// Url of PubHubs Central, taken from the configuration.
    pub phc_url: url::Url,
    /// Published in the discovery info and used in discovery info checks.
    pub self_check_code: String,
    /// Signing key whose verifying key is published in the discovery info.
    pub jwt_key: api::SigningKey,
    /// Private key whose public key is published in the discovery info.
    pub enc_key: elgamal::PrivateKey,
    /// Key against which signed admin requests are opened.
    pub admin_key: api::VerifyingKey,
    /// State shared between all clones of this creator and the apps made from them.
    pub shared: SharedState<S>,
    /// Version of this server, if configured.
    pub version: Option<String>,
}
610
611impl<S: Server> Clone for AppCreatorBase<S> {
613 fn clone(&self) -> Self {
614 Self {
615 running_state: self.running_state.clone(),
616 phc_url: self.phc_url.clone(),
617 self_check_code: self.self_check_code.clone(),
618 jwt_key: self.jwt_key.clone(),
619 enc_key: self.enc_key.clone(),
620 admin_key: self.admin_key.clone(),
621 shared: self.shared.clone(),
622 version: self.version.clone(),
623 }
624 }
625}
626
627impl<S: Server> AppCreatorBase<S>
628where
629 S::ObjectStoreT: for<'a> TryFrom<&'a Option<servers::config::ObjectStoreConfig>, Error = anyhow::Error>
630 + Sync,
631{
632 pub fn new(config: &crate::servers::Config) -> Result<Self> {
633 assert_eq!(
634 config.preparation_state,
635 crate::servers::config::PreparationState::Complete
636 );
637
638 let server_config = S::server_config_from(config);
639
640 Ok(Self {
641 running_state: None,
642 self_check_code: server_config
643 .self_check_code
644 .clone()
645 .expect("self_check_code was not set nor generated"),
646 jwt_key: server_config
647 .jwt_key
648 .clone()
649 .expect("jwt_key was not set nor generated"),
650 enc_key: server_config
651 .enc_key
652 .clone()
653 .expect("enc_key was not set nor generated"),
654 phc_url: config.phc_url.as_ref().clone(),
655 admin_key: server_config
656 .admin_key
657 .clone()
658 .expect("admin_key was not set nor generated"),
659 shared: SharedState::new(SharedStateInner {
660 object_store: TryFrom::try_from(&server_config.object_store)
661 .with_context(|| format!("Creating object store for {}", S::NAME))?,
662 extra: S::create_extra_shared_state(config)?,
663 }),
664 version: server_config.version.clone(),
665 })
666 }
667}
668
/// Data and functionality common to the apps of all servers; the deref target
/// of every [`App`].
pub struct AppBase<S: Server> {
    /// Set once a constellation is known -- see `running_state_or_please_retry`.
    pub running_state: Option<RunningState<S::ExtraRunningState>>,
    /// Handle used to inspect and modify the server, and to request discovery.
    pub handle: Handle<S>,
    /// Published in the discovery info and used in discovery info checks.
    pub self_check_code: String,
    /// Url of PubHubs Central.
    pub phc_url: url::Url,
    /// Signing key whose verifying key is published in the discovery info.
    pub jwt_key: api::SigningKey,
    /// Private key whose public key is published in the discovery info.
    pub enc_key: elgamal::PrivateKey,
    /// Key against which signed admin requests are opened.
    pub admin_key: api::VerifyingKey,
    /// State shared with the app creator this app was made from.
    pub shared: SharedState<S>,
    /// Client used for requests to other servers.
    pub client: client::Client,
    /// Version of this server, if configured.
    pub version: Option<String>,
    /// Id of the thread on which `AppBase::new` was called.
    pub thread_id: std::thread::ThreadId,
    /// Generation passed to `AppBase::new`; used in log messages.
    pub generation: usize,
}
686
687impl<S: Server> Drop for AppBase<S> {
688 fn drop(&mut self) {
689 log::trace!(
690 "{}: app for generation {} that started on {:?} dropped",
691 S::NAME,
692 self.generation,
693 self.thread_id
694 );
695 }
696}
697
698impl<S: Server> AppBase<S> {
699 pub fn new(creator_base: AppCreatorBase<S>, handle: &Handle<S>, generation: usize) -> Self {
700 let thread_id = std::thread::current().id();
701
702 log::trace!(
703 "{}: app for generation {} started on {:?}",
704 S::NAME,
705 generation,
706 thread_id
707 );
708
709 Self {
710 running_state: creator_base.running_state,
711 handle: handle.clone(),
712 phc_url: creator_base.phc_url,
713 self_check_code: creator_base.self_check_code,
714 jwt_key: creator_base.jwt_key,
715 enc_key: creator_base.enc_key,
716 admin_key: creator_base.admin_key,
717 shared: creator_base.shared,
718 client: client::Client::builder()
719 .agent(client::Agent::Server(S::NAME))
720 .finish(),
721 version: creator_base.version,
722 thread_id,
723 generation,
724 }
725 }
726
727 pub fn running_state_or_please_retry(
730 &self,
731 ) -> Result<&RunningState<S::ExtraRunningState>, api::ErrorCode> {
732 self.running_state
733 .as_ref()
734 .ok_or(api::ErrorCode::PleaseRetry)
735 }
736
737 pub fn running_state_or_internal_error(
740 &self,
741 ) -> Result<&RunningState<S::ExtraRunningState>, api::ErrorCode> {
742 self.running_state.as_ref().ok_or_else(|| {
743 log::error!(
744 "{}: expected running state to be available, but it was not",
745 S::NAME
746 );
747 api::ErrorCode::InternalError
748 })
749 }
750
    /// Registers the endpoints common to all servers: the discovery run and
    /// discovery info endpoints, and the admin endpoints for updating the
    /// configuration and retrieving information.
    pub fn configure_actix_app(app: &Rc<S::AppT>, sc: &mut web::ServiceConfig) {
        api::DiscoveryRun::add_to(app, sc, Self::handle_discovery_run);
        // Registered via `caching_add_to`, with a handler that takes `&S::AppT`.
        api::DiscoveryInfo::caching_add_to(app, sc, Self::cached_handle_discovery_info);

        api::admin::UpdateConfigEP::add_to(app, sc, Self::handle_admin_post_config);
        api::admin::InfoEP::add_to(app, sc, Self::handle_admin_info);
    }
759
760 pub async fn handle_hub_ping(
763 app: Rc<S::AppT>,
764 signed_req: web::Json<api::phc::hub::TicketSigned<api::server::PingReq>>,
765 ) -> api::Result<api::server::PingResp> {
766 let running_state = app.running_state_or_please_retry()?;
767
768 let ts_req = signed_req.into_inner();
769
770 let (req, hub_handle) = match ts_req.open(&running_state.constellation.phc_jwt_key) {
771 Ok(opened) => opened,
772 Err(toe) => return toe.default_verdict(api::server::PingResp::RetryWithNewTicket),
773 };
774
775 Ok(api::server::PingResp::Success {
776 hub_handle,
777 nonce: req.nonce,
778 served_by: S::NAME,
779 })
780 }
781
782 async fn handle_admin_post_config(
784 app: Rc<S::AppT>,
785 signed_req: web::Json<api::Signed<api::admin::UpdateConfigReq>>,
786 ) -> api::Result<api::admin::UpdateConfigResp> {
787 let signed_req = signed_req.into_inner();
788
789 let req = match signed_req.open(&*app.admin_key, None) {
790 Ok(req) => req,
791 Err(OpenError::OtherConstellation(..)) | Err(OpenError::InternalError) => {
792 return Err(api::ErrorCode::InternalError);
793 }
794 Err(OpenError::OtherwiseInvalid) => return Err(api::ErrorCode::BadRequest),
795 Err(OpenError::Expired) => return Ok(api::admin::UpdateConfigResp::ResignRequest),
796 Err(OpenError::InvalidSignature) => {
797 return Ok(api::admin::UpdateConfigResp::InvalidAdminKey);
798 }
799 };
800
801 let config = app
806 .handle
807 .inspect(
808 "admin's retrieval of current configuration",
809 |server: &S| -> Config { server.config().clone() },
810 )
811 .await
812 .into_ec(|_| {
813 log::warn!("{}: failed to retrieve configuration from server", S::NAME,);
814 api::ErrorCode::PleaseRetry })?;
816
817 let mut new_config: Config = config
818 .json_updated(&req.pointer, req.new_value.clone())
819 .into_ec(|err| {
820 log::warn!(
821 "{}: failed to modify configuration at {} to {}: {err:#}",
822 S::NAME,
823 req.pointer,
824 req.new_value
825 );
826 api::ErrorCode::BadRequest
827 })?;
828
829 drop(config);
830
831 new_config.preliminary_prep().into_ec(|err| {
833 log::warn!(
834 "{}: failed to reprepare (preliminary step) modified configuration: {err}",
835 S::NAME
836 );
837 api::ErrorCode::BadRequest
838 })?;
839 new_config.prepare().await.into_ec(|err| {
840 log::warn!(
841 "{}: failed to reprepare modified configuration: {err}",
842 S::NAME
843 );
844 api::ErrorCode::BadRequest
845 })?;
846
847 app
849 .handle
850 .modify(
851 "admin update of current in-memory configuration",
852 move |server: &mut S| {
853
854 let new_server_maybe = S::new(&new_config);
855
856 if let Err(err) = new_server_maybe {
857 log::error!("Could not create new {} with changed configuration: {}. Restarting old server.", S::NAME, err);
858 return true; }
860
861 *server = new_server_maybe.unwrap();
862
863 true }
865 )
866 .await
867 .into_ec(|_| {
868 log::warn!("{}: failed to enqueue modification", S::NAME);
869 api::ErrorCode::PleaseRetry
870 })?;
871
872 Ok(api::admin::UpdateConfigResp::Success)
873 }
874
875 async fn handle_admin_info(
877 app: Rc<S::AppT>,
878 signed_req: web::Json<api::Signed<api::admin::InfoReq>>,
879 ) -> api::Result<api::admin::InfoResp> {
880 let signed_req = signed_req.into_inner();
881
882 let _req = match signed_req.open(&*app.admin_key, None) {
883 Ok(req) => req,
884 Err(OpenError::OtherConstellation(..)) | Err(OpenError::InternalError) => {
885 return Err(api::ErrorCode::InternalError);
886 }
887 Err(OpenError::OtherwiseInvalid) => return Err(api::ErrorCode::BadRequest),
888 Err(OpenError::Expired) => return Ok(api::admin::InfoResp::ResignRequest),
889 Err(OpenError::InvalidSignature) => return Ok(api::admin::InfoResp::InvalidAdminKey),
890 };
891
892 let config = app
893 .handle
894 .inspect(
895 "admin's retrieval of current configuration",
896 |server: &S| -> Config { server.config().clone() },
897 )
898 .await
899 .into_ec(|_| {
900 log::warn!("{}: failed to retrieve configuration from server", S::NAME,);
901 api::ErrorCode::PleaseRetry })?;
903
904 Ok(api::admin::InfoResp::Success {
905 config: Box::new(config),
906 })
907 }
908
909 async fn handle_discovery_run(app: Rc<S::AppT>) -> api::Result<api::DiscoveryRunResp> {
912 app.handle.request_discovery(app.clone()).await
913 }
914
915 pub(super) async fn discover_phc(app: Rc<S::AppT>) -> api::Result<api::DiscoveryInfoResp> {
916 let pdi = app
917 .client
918 .query::<api::DiscoveryInfo>(&app.phc_url, NoPayload)
919 .await
920 .into_server_result()?;
921
922 client::discovery::DiscoveryInfoCheck {
923 phc_url: &app.phc_url,
924 name: Name::PubhubsCentral,
925 self_check_code: if S::NAME == Name::PubhubsCentral {
926 Some(&app.self_check_code)
927 } else {
928 None
929 },
930 constellation: None,
931 }
935 .check(pdi, &app.phc_url)
936 }
937
938 fn cached_handle_discovery_info(app: &S::AppT) -> api::Result<api::DiscoveryInfoResp> {
939 use super::constellation::ConstellationOrId;
940
941 let constellation_or_id = app.running_state.as_ref().map(|rs| match S::NAME {
943 Name::PubhubsCentral => ConstellationOrId::Constellation(
944 AsRef::<Constellation>::as_ref(&rs.constellation)
945 .clone()
946 .into(),
947 ),
948 _ => ConstellationOrId::Id {
949 id: rs.constellation.id,
950 },
951 });
952
953 Ok(api::DiscoveryInfoResp {
954 name: S::NAME,
955 version: app.version.clone(),
956 self_check_code: app.self_check_code.clone(),
957 phc_url: app.phc_url.clone(),
958 jwt_key: app.jwt_key.verifying_key().into(),
963 enc_key: app.enc_key.public_key().clone(),
964 master_enc_key_part: app
965 .master_enc_key_part()
966 .map(|privk| privk.public_key().clone()),
967 constellation_or_id,
968 })
969 }
970}
971
/// Pairs a shared app with a function `f` to be called with that app, so that
/// together they can serve as actix handler for the endpoint `EP` (see the
/// `factory_tuple!` macro).
pub struct AppMethod<App, F, EP: ?Sized> {
    /// The app passed as first argument to `f`.
    app: Rc<App>,
    /// The function implementing the endpoint.
    f: F,
    /// Ties the endpoint type `EP` to this value without storing one.
    phantom: std::marker::PhantomData<EP>,
}
979
980impl<App, F: Clone, EP> Clone for AppMethod<App, F, EP> {
983 fn clone(&self) -> Self {
984 Self {
985 app: self.app.clone(),
986 f: self.f.clone(),
987 phantom: std::marker::PhantomData,
988 }
989 }
990}
991
992impl<App, F, EP: ?Sized> AppMethod<App, F, EP> {
993 pub fn new(app: &Rc<App>, f: F) -> Self {
995 AppMethod {
996 app: app.clone(),
997 f,
998 phantom: std::marker::PhantomData,
999 }
1000 }
1001}
1002
// Implements `actix_web::Handler` for `AppMethod<App, Func, EP>` where `Func`
// takes the app (as `Rc<App>`) followed by the arguments named by `$param`.
//
// The output of the future returned by `Func` is converted to
// `EP::ResponseType` and wrapped in an `api::Responder<EP>`.
macro_rules! factory_tuple ({ $($param:ident)* } => {
    impl<Func, Fut, App, EP, $($param,)*> actix_web::Handler<($($param,)*)> for AppMethod<App, Func, EP>
    where
        Func: Fn(Rc<App>, $($param),*) -> Fut + Clone + 'static,
        Fut: core::future::Future,
        App: 'static,
        EP : EndpointDetails + 'static,
        Fut::Output : Into<EP::ResponseType>,
    {
        type Output = api::Responder<EP>;
        // A function pointer is used so that the type of the mapped future can be named.
        type Future = futures::future::Map<Fut, fn(Fut::Output)->api::Responder<EP>>;

        #[inline]
        // The type parameters `$param` double as names for the arguments.
        #[allow(non_snake_case)]
        fn call(&self, ($($param,)*): ($($param,)*)) -> Self::Future {
            (self.f)(self.app.clone(), $($param,)*).map(response_type_to_responder)
        }
    }
});
1027
1028fn response_type_to_responder<EP: EndpointDetails, T: Into<EP::ResponseType>>(
1030 output: T,
1031) -> api::Responder<EP> {
1032 api::Responder(output.into())
1033}
1034
// Handler implementations for functions taking, besides the app, zero up to
// and including sixteen arguments.
factory_tuple! {}
factory_tuple! { A }
factory_tuple! { A B }
factory_tuple! { A B C }
factory_tuple! { A B C D }
factory_tuple! { A B C D E }
factory_tuple! { A B C D E F }
factory_tuple! { A B C D E F G }
factory_tuple! { A B C D E F G H }
factory_tuple! { A B C D E F G H I }
factory_tuple! { A B C D E F G H I J }
factory_tuple! { A B C D E F G H I J K }
factory_tuple! { A B C D E F G H I J K L }
factory_tuple! { A B C D E F G H I J K L M }
factory_tuple! { A B C D E F G H I J K L M N }
factory_tuple! { A B C D E F G H I J K L M N O }
factory_tuple! { A B C D E F G H I J K L M N O P }
1052
/// State of a server that knows its constellation: the constellation itself
/// plus server-specific `extra` data, which is reachable via `Deref`.
#[derive(Clone, Debug)]
pub struct RunningState<Extra: Clone + core::fmt::Debug> {
    /// The constellation this state was created with.
    pub constellation: Box<Constellation>,

    /// Server-specific data; the deref target of `RunningState`.
    extra: Extra,
}
1061
1062impl<Extra: Clone + core::fmt::Debug> RunningState<Extra> {
1063 pub(crate) fn new(constellation: Constellation, extra: Extra) -> Self {
1064 RunningState {
1065 constellation: Box::new(constellation),
1066 extra,
1067 }
1068 }
1069}
1070
1071impl<Extra: Clone + core::fmt::Debug> Deref for RunningState<Extra> {
1072 type Target = Extra;
1073
1074 #[inline]
1075 fn deref(&self) -> &Extra {
1076 &self.extra
1077 }
1078}
1079
/// Reference-counted handle to a [`SharedStateInner`]: cloning it yields
/// another handle to the same inner state.
pub struct SharedState<S: Server> {
    inner: std::sync::Arc<SharedStateInner<S>>,
}
1084
1085impl<S: Server> Clone for SharedState<S> {
1086 fn clone(&self) -> Self {
1087 Self {
1088 inner: self.inner.clone(),
1089 }
1090 }
1091}
1092
1093impl<S: Server> std::ops::Deref for SharedState<S> {
1094 type Target = SharedStateInner<S>;
1095
1096 #[inline]
1097 fn deref(&self) -> &Self::Target {
1098 &self.inner
1099 }
1100}
1101
1102impl<S: Server> SharedState<S> {
1103 fn new(inner: SharedStateInner<S>) -> Self {
1104 Self {
1105 inner: std::sync::Arc::new(inner),
1106 }
1107 }
1108}
1109
/// The state behind a [`SharedState`]: the object store and the
/// server-specific extra shared state, which is reachable via `Deref`.
pub struct SharedStateInner<S: Server> {
    /// Object store, created from the server's configuration by `AppCreatorBase::new`.
    pub object_store: S::ObjectStoreT,
    /// Server-specific shared state, created by `Server::create_extra_shared_state`.
    pub extra: S::ExtraSharedState,
}
1114
1115impl<S: Server> std::ops::Deref for SharedStateInner<S> {
1116 type Target = S::ExtraSharedState;
1117
1118 #[inline]
1119 fn deref(&self) -> &Self::Target {
1120 &self.extra
1121 }
1122}