Skip to main content

pubhubs/servers/
server.rs

1//! What's common between PubHubs servers
2use actix_web::web;
3use anyhow::{Context as _, Result};
4use futures_util::future::FutureExt as _;
5
6use core::convert::Infallible;
7use std::ops::{Deref, DerefMut};
8use std::rc::Rc;
9
10use crate::common::elgamal;
11
12use crate::client;
13
14use crate::api::OpenError;
15use crate::api::{
16    self, ApiResultExt as _, DiscoveryRunResp, EndpointDetails, NoPayload, ResultExt as _,
17};
18use crate::servers::{self, Config, Constellation, Handle};
19
/// Enumerates the names of the different PubHubs servers.
///
/// Each variant is (de)serialized under the short name given by its `serde(rename = ...)`
/// attribute, and is displayed using the human-readable name from its
/// [`std::fmt::Display`] implementation.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash, clap::ValueEnum,
)]
pub enum Name {
    // PubHubs Central; (de)serialized as "phc".
    #[serde(rename = "phc")]
    PubhubsCentral,

    // The transcryptor; (de)serialized as "transcryptor".
    #[serde(rename = "transcryptor")]
    Transcryptor,

    // The authentication server; (de)serialized as "auths".
    #[serde(rename = "auths")]
    AuthenticationServer,
}
34
35impl std::fmt::Display for Name {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
37        write!(
38            f,
39            "{}",
40            match self {
41                Name::PubhubsCentral => "PubHubs Central",
42                Name::Transcryptor => "Transcryptor",
43                Name::AuthenticationServer => "Authentication Server",
44            }
45        )
46    }
47}
48
/// Common API to the different PubHubs servers.
///
/// A single instance of the [`ServerImpl`] implementation of [`Server`] is created
/// for each server that's being run, and it's mainly responsible for creating
/// immutable [`App`] instances to be sent to the individual threads.
///
/// For efficiency's sake, only the [`App`] instances are available to each thread,
/// and are mostly immutable. To change the server's state, generally all apps must be restarted.
///
/// An exception to this no-shared-mutable-state is the shared state in [`Handle`], for example the
/// `crate::servers::run::DiscoveryLimiter` and the object store
pub trait Server: DerefMut<Target = Self::AppCreatorT> + Sized + 'static {
    /// The [`App`] implementation used by this server.
    type AppT: App<Self>;

    /// Which of the PubHubs servers this is.
    const NAME: Name;

    /// Returns the default TCP port this server binds to.
    fn default_port() -> u16 {
        match Self::NAME {
            // we've changed phc's port from 8080 to 5050
            // so that the old and new phc can be run simultaneously.
            Name::PubhubsCentral => 5050,
            Name::Transcryptor => 7070,
            Name::AuthenticationServer => 6060,
        }
    }

    /// Is moved across threads to create the [`App`]s.
    type AppCreatorT: AppCreator<Self>;

    /// Type parameter of this server's [`servers::config::ServerConfig`],
    /// see [`Server::server_config`].
    type ExtraConfig;

    /// Additional state when the server is running
    type ExtraRunningState: Clone + core::fmt::Debug;

    /// Additional shared state
    type ExtraSharedState;

    /// Type of this server's object store, usually an [`object_store::ObjectStore`], or `()`.
    type ObjectStoreT: Sync;

    /// Creates a new server from the given configuration.
    fn new(config: &crate::servers::Config) -> Result<Self>;

    /// Returns the configuration held by this server.
    fn config(&self) -> &crate::servers::Config;

    /// Returns the part of [`Server::config`] selected by [`Server::server_config_from`].
    fn server_config(&self) -> &servers::config::ServerConfig<Self::ExtraConfig> {
        Self::server_config_from(self.config())
    }

    /// Selects this server's [`servers::config::ServerConfig`] from the given `config`.
    fn server_config_from(
        config: &servers::Config,
    ) -> &servers::config::ServerConfig<Self::ExtraConfig>;

    /// Creates the [`Server::ExtraRunningState`] for the given `constellation`.
    fn create_running_state(
        &self,
        constellation: &Constellation,
    ) -> Result<Self::ExtraRunningState>;

    /// Creates the [`Server::ExtraSharedState`] from the given `config`.
    fn create_extra_shared_state(config: &servers::Config) -> Result<Self::ExtraSharedState>;

    /// This function is called when the server is started to run discovery.
    ///
    /// It is only passed a shared (and thus immutable) reference to itself to prevent any modifications
    /// going unnoticed by [`App`] instances.
    ///
    /// It can be ordered to stop via the `shutdown_receiver`, in which case
    /// it should return Ok(None).
    ///
    /// It can also return on its own to modify itself via the returned [`BoxModifier`].
    ///
    /// If it returns an error, the whole binary crashes.
    ///
    /// Before this function's future finishes, it should relinquish all references to `self`.
    /// Otherwise the modification following it will panic.
    ///
    /// It is given its own [`App`] instance.
    ///
    /// TODO: remove returning BoxModifier since that can be achieved via App instance?
    #[expect(async_fn_in_trait)]
    async fn run_until_modifier(
        self: Rc<Self>,
        shutdown_receiver: tokio::sync::oneshot::Receiver<Infallible>,
        app: Rc<Self::AppT>,
    ) -> Result<Option<BoxModifier<Self>>>;

    /// Creates cross-origin resource sharing middleware for this server.
    ///
    /// By default any origin is allowed, restricted to the `GET` and `POST` methods and the
    /// `Content-Type` and `Authorization` request headers.
    fn cors() -> actix_cors::Cors {
        actix_cors::Cors::default()
            .allow_any_origin()
            .allowed_methods(["GET", "POST"])
            .allowed_header(actix_web::http::header::CONTENT_TYPE)
            .allowed_header(actix_web::http::header::AUTHORIZATION)
    }
}
143
/// Basic implementation of [Server].
///
/// Dereferences to its [`Details::AppCreatorT`].
pub struct ServerImpl<D: Details> {
    /// Copy of the configuration this server was created from.
    config: servers::Config,
    /// Creator of this server's apps; the target of `Deref`/`DerefMut`.
    app_creator: D::AppCreatorT,
}
149
/// Gives shared access to the app creator held by the [`ServerImpl`].
impl<D: Details> Deref for ServerImpl<D> {
    type Target = D::AppCreatorT;

    /// Returns a reference to the `app_creator` field.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.app_creator
    }
}
158
/// Gives mutable access to the app creator held by the [`ServerImpl`].
impl<D: Details> DerefMut for ServerImpl<D> {
    /// Returns a mutable reference to the `app_creator` field.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.app_creator
    }
}
165
/// Details needed to create a [ServerImpl] type.
///
/// The items below are forwarded one-to-one to the equally named items of the [`Server`]
/// implementation of [`ServerImpl`].
pub trait Details: crate::servers::config::GetServerConfig + 'static + Sized {
    /// Becomes [`Server::NAME`].
    const NAME: Name;
    /// Becomes [`Server::AppCreatorT`].
    type AppCreatorT;
    /// Becomes [`Server::AppT`].
    type AppT;
    /// Becomes [`Server::ExtraRunningState`].
    type ExtraRunningState: Clone + core::fmt::Debug;
    /// Becomes [`Server::ExtraSharedState`].
    type ExtraSharedState;
    /// Becomes [`Server::ObjectStoreT`].
    type ObjectStoreT;

    /// Implements [`Server::create_running_state`] for `server`.
    fn create_running_state(
        server: &ServerImpl<Self>,
        constellation: &Constellation,
    ) -> Result<Self::ExtraRunningState>;

    /// Implements [`Server::create_extra_shared_state`].
    fn create_extra_shared_state(config: &servers::Config) -> Result<Self::ExtraSharedState>;
}
182
/// Implements [`Server`] by forwarding to the [`Details`] `D`.
impl<D: Details> Server for ServerImpl<D>
where
    D::AppT: App<Self>,
    D::AppCreatorT: AppCreator<Self>,
    D::ObjectStoreT: Sync,
{
    const NAME: Name = D::NAME;

    type AppCreatorT = D::AppCreatorT;
    type AppT = D::AppT;

    type ExtraConfig = D::Extra;
    type ExtraRunningState = D::ExtraRunningState;
    type ExtraSharedState = D::ExtraSharedState;

    type ObjectStoreT = D::ObjectStoreT;

    /// Creates the app creator from `config`, and stores it together with a clone of `config`.
    fn new(config: &servers::Config) -> Result<Self> {
        Ok(Self {
            app_creator: Self::AppCreatorT::new(config)?,
            config: config.clone(),
        })
    }

    /// Returns the configuration stored by [`Server::new`].
    fn config(&self) -> &servers::Config {
        &self.config
    }

    /// Delegates to `D::server_config`.
    fn server_config_from(
        config: &servers::Config,
    ) -> &servers::config::ServerConfig<Self::ExtraConfig> {
        D::server_config(config)
    }

    /// Delegates to [`Details::create_running_state`].
    fn create_running_state(
        &self,
        constellation: &Constellation,
    ) -> Result<Self::ExtraRunningState> {
        D::create_running_state(self, constellation)
    }

    /// Delegates to [`Details::create_extra_shared_state`].
    fn create_extra_shared_state(config: &servers::Config) -> Result<Self::ExtraSharedState> {
        D::create_extra_shared_state(config)
    }

    /// Races the `shutdown_receiver` against running discovery followed by the global task.
    ///
    /// Returns `Ok(None)` when the `shutdown_receiver` resolves first, and the error
    /// of the discovery/global task when that finishes first.  This implementation
    /// never returns a [`BoxModifier`].
    async fn run_until_modifier(
        self: Rc<Self>,
        shutdown_receiver: tokio::sync::oneshot::Receiver<Infallible>,
        app: Rc<Self::AppT>,
    ) -> Result<Option<crate::servers::server::BoxModifier<Self>>> {
        tokio::select! {
            res = shutdown_receiver => {
               // No `Infallible` value can ever be sent, so the receiver can only
               // resolve to an error.
               res.expect_err("got instance of Infallible");
               #[expect(clippy::needless_return)] // It's more clear this way
               return Ok(None);
            }

            res = self.run_discovery_and_then_wait_forever(app) => {
               // The `Ok` type is `Infallible`, so only an error can come out of this.
               #[expect(clippy::needless_return)] // It's more clear this way
                return Err(res.expect_err("got instance of Infallible"));
            }
        }
    }
}
247
/// Discovery helpers used by [`Server::run_until_modifier`].
impl<D: Details> ServerImpl<D>
where
    D::AppT: App<Self>,
    D::AppCreatorT: AppCreator<Self>,
    D::ObjectStoreT: Sync,
{
    /// Runs discovery using `app`, and, when that succeeds, awaits [`App::global_task`].
    ///
    /// Since the `Ok` type is [`Infallible`], this future can only ever complete with an error.
    async fn run_discovery_and_then_wait_forever(&self, app: Rc<D::AppT>) -> Result<Infallible> {
        self.run_discovery(app.clone()).await?;

        D::AppT::global_task(app).await // waits forever
    }

    /// Invokes [`AppBase::handle_discovery_run`] via `crate::misc::task::retry`.
    ///
    /// The closure yields `Ok(Some(()))` when a [`DiscoveryRunResp`] was obtained,
    /// `Ok(None)` on a retryable error, and `Err(..)` otherwise.
    /// NOTE(review): presumably `retry` calls the closure again while it yields `Ok(None)`,
    /// and eventually gives up returning `Ok(None)` itself - confirm against `retry`.
    ///
    /// When `retry` yields `None`, a "timeout waiting for discovery" error is returned.
    async fn run_discovery(&self, app: Rc<D::AppT>) -> Result<()> {
        crate::misc::task::retry(|| async {
            (match
                AppBase::<Self>::handle_discovery_run(app.clone()).await.retryable()/* <- turns retryable error Err(err) into Ok(None) */?
            {
                Some(DiscoveryRunResp::Restarting | DiscoveryRunResp::UpToDate) => Ok(Some(())),
                None => Ok(None),
            }) as Result<Option<()>>
        })
        .await?
        .ok_or_else(|| anyhow::anyhow!("timeout waiting for discovery of {server_name}", server_name = D::NAME))
    }
}
273
/// What's cloned and moved across threads by a [`Server`] to create its [`App`] instances.
///
/// Dereferences to the [`AppCreatorBase`] holding the parts common to all servers.
pub trait AppCreator<ServerT: Server>:
    DerefMut<Target = AppCreatorBase<ServerT>> + Send + Clone + 'static
{
    /// When [`App`]s are created a new [`AppCreator::ContextT`] is created,
    /// and a reference to it is passed to [`AppCreator::into_app`].
    type ContextT: Default + Send + Clone + 'static;

    /// Creates a new instance of this [`AppCreator`] based on the given configuration.
    fn new(config: &servers::Config) -> Result<Self>;

    /// Create an [`App`] instance.
    ///
    /// The `handle` [`Handle`] can be used to restart the server.  It's
    /// up to the implementor to clone it.
    ///
    /// `generation` indicates how many times the server has been restarted while running this
    /// binary
    fn into_app(
        self,
        handle: &Handle<ServerT>,
        context: &Self::ContextT,
        generation: usize,
    ) -> ServerT::AppT;
}
299
/// What modifies a [Server] via [Command::Modify].
///
/// It is [Send] and `'static` because it's moved across threads, from an [App] to the task
/// running the [Server].
///
/// We do not use a trait like `(FnOnce(&mut ServerT)) + Send + 'static`,
/// because it can not (yet) be implemented by users.
pub trait Modifier<ServerT: Server>: Send + 'static {
    /// Performs the modification on the (stopped) server.  The server is restarted
    /// if and only if `true` is returned.
    fn modify(self: Box<Self>, server: &mut ServerT) -> bool;

    /// Writes a description of this modifier to `f`; used to implement
    /// [`std::fmt::Display`] for [`BoxModifier`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>;
}
313
314impl<S: Server, F: FnOnce(&mut S) -> bool + Send + 'static, D: std::fmt::Display + Send + 'static>
315    Modifier<S> for (F, D)
316{
317    fn modify(self: Box<Self>, server: &mut S) -> bool {
318        self.0(server)
319    }
320
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
322        self.1.fmt(f)
323    }
324}
325
/// [Modifier] that stops the server
pub struct Exiter;

impl<S: Server> Modifier<S> for Exiter {
    /// Leaves the server untouched, and returns `false` so that it is not restarted.
    fn modify(self: Box<Self>, _server: &mut S) -> bool {
        false
    }

    /// Writes the fixed description `exiter`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        f.write_str("exiter")
    }
}
338
/// Owned dynamically typed [Modifier].
pub type BoxModifier<S> = Box<dyn Modifier<S>>;

/// Displays a [`BoxModifier`] using the [`Modifier::fmt`] of the boxed modifier.
impl<S: Server> std::fmt::Display for BoxModifier<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        // `&**self` goes from `&Box<dyn Modifier<S>>` to `&dyn Modifier<S>`
        Modifier::fmt(&**self, f)
    }
}
347
/// What inspects a server via [Command::Inspect].
///
/// Unlike a [`Modifier`] it only gets a shared reference to the server.
pub(crate) trait Inspector<ServerT: Server>: Send + 'static {
    /// Calls this function with server as argument
    fn inspect(self: Box<Self>, server: &ServerT);

    /// Writes a description of this inspector to `f`; used to implement
    /// [`std::fmt::Display`] for [`BoxInspector`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>;
}
355
356impl<S: Server, F: FnOnce(&S) + Send + 'static, D: std::fmt::Display + Send + 'static> Inspector<S>
357    for (F, D)
358{
359    fn inspect(self: Box<Self>, server: &S) {
360        self.0(server)
361    }
362
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
364        self.1.fmt(f)
365    }
366}
367
/// Owned dynamically typed [Inspector].
pub type BoxInspector<S> = Box<dyn Inspector<S>>;

/// Displays a [`BoxInspector`] using the [`Inspector::fmt`] of the boxed inspector.
impl<S: Server> std::fmt::Display for BoxInspector<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        // `&**self` goes from `&Box<dyn Inspector<S>>` to `&dyn Inspector<S>`
        Inspector::fmt(&**self, f)
    }
}
376
/// Commands an [App] can issue to its runner.
///
/// Displayed as `modifier <description>`, `inspector <description>` or `exit`.
pub(crate) enum Command<S: Server> {
    /// Stop the server, apply the enclosed modification, and, depending on the result, restart
    /// the server.
    ///
    /// Server restarts should be performed sparingly, and may take seconds to minutes (because
    /// actix waits for workers to shutdown gracefully.)
    Modify(BoxModifier<S>),

    /// Calls the enclosed function on the server
    Inspect(BoxInspector<S>),

    /// Stops the server
    Exit,
}
392impl<S: Server> std::fmt::Display for Command<S> {
393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
394        match self {
395            Command::Inspect(inspector) => write!(f, "inspector {inspector}"),
396            Command::Modify(modifier) => write!(f, "modifier {modifier}"),
397            Command::Exit => write!(f, "exit"),
398        }
399    }
400}
401
/// Result of [`App::discover`].
///
/// Tells the caller what, if anything, should happen to this server after having compared
/// its version and constellation with those of PubHubs Central.
pub enum DiscoverVerdict {
    /// My and PHC's constellation seem up-to-date
    Alright,

    /// My constellation is out-of-date and must be replaced with this constellation
    ConstellationOutdated {
        /// The constellation obtained from PubHubs Central.
        new_constellation: Box<Constellation>,
    },

    /// My binary is out-of-date.  Exit this binary, and hope the binary is updated.
    BinaryOutdated,
}
415
416/// What's common between the [`actix_web::App`]s used by the different PubHubs servers.
417///
418/// Each [`actix_web::App`] gets access to an instance of the appropriate implementation of [`App`]..
419#[allow(async_fn_in_trait)]
420pub trait App<S: Server>: Deref<Target = AppBase<S>> + 'static {
421    /// Allows [`App`] to add server-specific endpoints.  Non-server specific endpoints are added by
422    /// [`AppBase::configure_actix_app`].
423    fn configure_actix_app(self: &Rc<Self>, sc: &mut web::ServiceConfig);
424
425    /// Checks whether the given constellation properly reflects this server's configuration.
426    fn check_constellation(&self, constellation: &Constellation) -> bool;
427
428    /// Runs the discovery routine for this server given [`api::DiscoveryInfoResp`] already
429    /// obtained from Pubhubs Central.  If the server is not PHC itself, the [`Constellation`]
430    /// in this [`api::DiscoveryInfoResp`] must be set.
431    ///
432    /// If one of the other servers is not up-to-date
433    /// according to this server, discovery of that server is invoked and
434    /// [`api::ErrorCode::PleaseRetry`] is returned.
435    async fn discover(
436        self: &Rc<Self>,
437        phc_inf: api::DiscoveryInfoResp,
438    ) -> api::Result<DiscoverVerdict> {
439        log::debug!("{server_name}: running discovery", server_name = S::NAME);
440
441        if S::NAME == Name::PubhubsCentral {
442            log::error!(
443                "{} should implement discovery itself!",
444                Name::PubhubsCentral
445            );
446            return Err(api::ErrorCode::InternalError);
447        }
448
449        if let Some(ref phc_version) = phc_inf.version
450            && let Some(my_version) = &self.version
451        {
452            let phc_version = crate::servers::version::to_semver(phc_version).map_err(|err| {
453                log::error!(
454                    "could not parse semantic version returned by PHC: {phc_version}: {err}"
455                );
456                api::ErrorCode::InternalError
457            })?;
458
459            let my_version = crate::servers::version::to_semver(my_version).map_err(|err| {
460                log::error!("could not parse my semantic version {my_version}: {err}");
461                api::ErrorCode::InternalError
462            })?;
463
464            if my_version < phc_version {
465                log::warn!(
466                    "{server_name}: {phc}'s version ({phc_version}) > my version ({my_version})",
467                    server_name = S::NAME,
468                    phc = Name::PubhubsCentral
469                );
470                return Ok(DiscoverVerdict::BinaryOutdated);
471            }
472
473            if my_version > phc_version {
474                log::warn!(
475                    "{server_name}: {phc}'s version {phc_version} is out-of-date - requesting rediscovery",
476                    server_name = S::NAME,
477                    phc = Name::PubhubsCentral
478                );
479
480                let _drr = self
481                    .client
482                    .query::<api::DiscoveryRun>(&phc_inf.phc_url, NoPayload)
483                    .await
484                    .into_server_result()?;
485                return Err(api::ErrorCode::PleaseRetry);
486            }
487        } else {
488            log::warn!(
489                "not checking my version ({my_version}) against phc's version ({phc_version})",
490                my_version = crate::servers::version::VERSION,
491                phc_version = phc_inf.version.unwrap_or_else(|| "n/a".to_string())
492            );
493        }
494
495        assert!(
496            phc_inf.constellation_or_id.is_some(),
497            "this `discover` method should only be run when phc_inf.constellation is some"
498        );
499
500        let Some(phc_inf_constellation) = phc_inf.constellation_or_id.unwrap().into_constellation()
501        else {
502            log::warn!(
503                "{server_name}: {phc} returned only its contellation id",
504                phc = Name::PubhubsCentral,
505                server_name = S::NAME
506            );
507            return Err(api::ErrorCode::InternalError);
508        };
509
510        if !self.check_constellation(&phc_inf_constellation) {
511            log::warn!(
512                "{server_name}: {phc}'s constellation seems to be out-of-date - requesting rediscovery",
513                server_name = S::NAME,
514                phc = Name::PubhubsCentral
515            );
516
517            // PHC's discovery is out of date; invoke discovery and return
518            let _drr = self
519                .client
520                .query::<api::DiscoveryRun>(&phc_inf.phc_url, NoPayload)
521                .await
522                .into_server_result()?;
523
524            // We don't do anything with _drr: whether or not PHC has been updated in the
525            // meantime, we want to start discovery again from the start.
526
527            return Err(api::ErrorCode::PleaseRetry);
528        }
529
530        log::trace!(
531            "{server_name}: {phc}'s constellation looks alright! ",
532            server_name = S::NAME,
533            phc = Name::PubhubsCentral
534        );
535
536        if let Some(rs) = self.running_state.as_ref()
537            && phc_inf_constellation.id == rs.constellation.id
538        {
539            log::info!(
540                "{server_name}: my constellation is up-to-date!",
541                server_name = S::NAME,
542            );
543
544            return Ok(DiscoverVerdict::Alright);
545        }
546
547        log::info!(
548            "{}: my constellation is {}",
549            S::NAME,
550            if self.running_state.is_some() {
551                "out of date"
552            } else {
553                "not yet set"
554            }
555        );
556
557        // NOTE: phc_inf has already been (partially) checked
558        let url = phc_inf_constellation.url(S::NAME);
559
560        // obtain DiscoveryInfo from oneself
561        let di = self
562            .client
563            .query::<api::DiscoveryInfo>(url, NoPayload)
564            .await
565            .into_server_result()?;
566
567        let _di_again = client::discovery::DiscoveryInfoCheck {
568            name: S::NAME,
569            phc_url: &self.phc_url,
570            self_check_code: Some(&self.self_check_code),
571            constellation: None,
572            // NOTE: we're not checking whether our own constellation is up-to-date,
573            // because it likely is not - why would we run discovery otherwise?
574        }
575        .check(di, url)?;
576
577        Ok(DiscoverVerdict::ConstellationOutdated {
578            new_constellation: Box::new(phc_inf_constellation),
579        })
580    }
581
582    /// Should return the master encryption key part for PHC and the transcryption.
583    fn master_enc_key_part(&self) -> Option<&elgamal::PrivateKey> {
584        if matches!(S::NAME, Name::PubhubsCentral | Name::Transcryptor) {
585            panic!("this default impl should have been  overriden for PHC and T")
586        }
587        None
588    }
589
590    /// Will be invoked for each instance of [`App`] that is created.
591    async fn local_task(_weak: std::rc::Weak<Self>) {}
592
593    /// Will be invoked once for each server, after discovery
594    async fn global_task(_app: std::rc::Rc<Self>) -> Result<Infallible> {
595        Ok(std::future::pending::<Infallible>().await)
596    }
597}
598
/// What's internally common between PubHubs [`AppCreator`]s.
///
/// The fields are moved into an [`AppBase`] by [`AppBase::new`].
pub struct AppCreatorBase<S: Server> {
    /// State of the running server; `None` upon creation.
    pub running_state: Option<RunningState<S::ExtraRunningState>>,
    /// Url of PubHubs Central, taken from the configuration.
    pub phc_url: url::Url,
    /// Taken from this server's `self_check_code` configuration field.
    pub self_check_code: String,
    /// Taken from this server's `jwt_key` configuration field.
    pub jwt_key: api::SigningKey,
    /// Taken from this server's `enc_key` configuration field.
    pub enc_key: elgamal::PrivateKey,
    /// Taken from this server's `admin_key` configuration field; used to open
    /// signed requests to the admin endpoints.
    pub admin_key: api::VerifyingKey,
    /// State holding the object store and the server's extra shared state.
    pub shared: SharedState<S>,
    /// Taken from this server's `version` configuration field.
    pub version: Option<String>,
}
610
611// need to implement this manually, because we do not want `Server` to implement `Clone`
612impl<S: Server> Clone for AppCreatorBase<S> {
613    fn clone(&self) -> Self {
614        Self {
615            running_state: self.running_state.clone(),
616            phc_url: self.phc_url.clone(),
617            self_check_code: self.self_check_code.clone(),
618            jwt_key: self.jwt_key.clone(),
619            enc_key: self.enc_key.clone(),
620            admin_key: self.admin_key.clone(),
621            shared: self.shared.clone(),
622            version: self.version.clone(),
623        }
624    }
625}
626
627impl<S: Server> AppCreatorBase<S>
628where
629    S::ObjectStoreT: for<'a> TryFrom<&'a Option<servers::config::ObjectStoreConfig>, Error = anyhow::Error>
630        + Sync,
631{
632    pub fn new(config: &crate::servers::Config) -> Result<Self> {
633        assert_eq!(
634            config.preparation_state,
635            crate::servers::config::PreparationState::Complete
636        );
637
638        let server_config = S::server_config_from(config);
639
640        Ok(Self {
641            running_state: None,
642            self_check_code: server_config
643                .self_check_code
644                .clone()
645                .expect("self_check_code was not set nor generated"),
646            jwt_key: server_config
647                .jwt_key
648                .clone()
649                .expect("jwt_key was not set nor generated"),
650            enc_key: server_config
651                .enc_key
652                .clone()
653                .expect("enc_key was not set nor generated"),
654            phc_url: config.phc_url.as_ref().clone(),
655            admin_key: server_config
656                .admin_key
657                .clone()
658                .expect("admin_key was not set nor generated"),
659            shared: SharedState::new(SharedStateInner {
660                object_store: TryFrom::try_from(&server_config.object_store)
661                    .with_context(|| format!("Creating object store for {}", S::NAME))?,
662                extra: S::create_extra_shared_state(config)?,
663            }),
664            version: server_config.version.clone(),
665        })
666    }
667}
668
/// What's internally common between PubHubs [`App`]s.
///
/// Should *NOT* be cloned.
pub struct AppBase<S: Server> {
    /// State of the running server, if available; see
    /// [`AppBase::running_state_or_please_retry`].
    pub running_state: Option<RunningState<S::ExtraRunningState>>,
    /// Used to inspect and modify the server.
    pub handle: Handle<S>,
    /// Passed along when checking discovery info obtained from this server itself.
    pub self_check_code: String,
    /// Url of PubHubs Central.
    pub phc_url: url::Url,
    /// Copied from the [`AppCreatorBase`].
    pub jwt_key: api::SigningKey,
    /// Copied from the [`AppCreatorBase`].
    pub enc_key: elgamal::PrivateKey,
    /// Used to open signed requests to the admin endpoints.
    pub admin_key: api::VerifyingKey,
    /// Copied from the [`AppCreatorBase`].
    pub shared: SharedState<S>,
    /// Client used to query other servers, identifying itself as server `S`.
    pub client: client::Client,
    /// Version of this server, if known; compared to PHC's version during discovery.
    pub version: Option<String>,
    /// Id of the thread on which this [`AppBase`] was created.
    pub thread_id: std::thread::ThreadId,
    /// The `generation` passed to [`AppBase::new`].
    pub generation: usize,
}
686
/// Logs the dropping of an [`AppBase`].
impl<S: Server> Drop for AppBase<S> {
    /// Emits a trace message mentioning the server's name, and the generation and the
    /// thread this [`AppBase`] was created with.
    fn drop(&mut self) {
        log::trace!(
            "{}: app for generation {} that started on {:?} dropped",
            S::NAME,
            self.generation,
            self.thread_id
        );
    }
}
697
698impl<S: Server> AppBase<S> {
699    pub fn new(creator_base: AppCreatorBase<S>, handle: &Handle<S>, generation: usize) -> Self {
700        let thread_id = std::thread::current().id();
701
702        log::trace!(
703            "{}: app for generation {} started on {:?}",
704            S::NAME,
705            generation,
706            thread_id
707        );
708
709        Self {
710            running_state: creator_base.running_state,
711            handle: handle.clone(),
712            phc_url: creator_base.phc_url,
713            self_check_code: creator_base.self_check_code,
714            jwt_key: creator_base.jwt_key,
715            enc_key: creator_base.enc_key,
716            admin_key: creator_base.admin_key,
717            shared: creator_base.shared,
718            client: client::Client::builder()
719                .agent(client::Agent::Server(S::NAME))
720                .finish(),
721            version: creator_base.version,
722            thread_id,
723            generation,
724        }
725    }
726
727    /// Returns the current [`RunningState`] of this server when available.
728    /// Otherwise returns [`api::ErrorCode::PleaseRetry`].
729    pub fn running_state_or_please_retry(
730        &self,
731    ) -> Result<&RunningState<S::ExtraRunningState>, api::ErrorCode> {
732        self.running_state
733            .as_ref()
734            .ok_or(api::ErrorCode::PleaseRetry)
735    }
736
737    /// Returns the current [`RunningState`] of this server when available.
738    /// Otherwise returns [`api::ErrorCode::InternalError`].
739    pub fn running_state_or_internal_error(
740        &self,
741    ) -> Result<&RunningState<S::ExtraRunningState>, api::ErrorCode> {
742        self.running_state.as_ref().ok_or_else(|| {
743            log::error!(
744                "{}: expected running state to be available, but it was not",
745                S::NAME
746            );
747            api::ErrorCode::InternalError
748        })
749    }
750
    /// Configures common endpoints
    ///
    /// Registers the handlers for the discovery endpoints ([`api::DiscoveryRun`] and
    /// [`api::DiscoveryInfo`]) and the admin endpoints ([`api::admin::UpdateConfigEP`] and
    /// [`api::admin::InfoEP`]) with `sc`.
    pub fn configure_actix_app(app: &Rc<S::AppT>, sc: &mut web::ServiceConfig) {
        api::DiscoveryRun::add_to(app, sc, Self::handle_discovery_run);
        api::DiscoveryInfo::caching_add_to(app, sc, Self::cached_handle_discovery_info);

        api::admin::UpdateConfigEP::add_to(app, sc, Self::handle_admin_post_config);
        api::admin::InfoEP::add_to(app, sc, Self::handle_admin_info);
    }
759
760    /// Shared body of [`api::server::HubPingEP`].  Each server has its own `handle_hub_ping`
761    /// method that delegates here.
762    pub async fn handle_hub_ping(
763        app: Rc<S::AppT>,
764        signed_req: web::Json<api::phc::hub::TicketSigned<api::server::PingReq>>,
765    ) -> api::Result<api::server::PingResp> {
766        let running_state = app.running_state_or_please_retry()?;
767
768        let ts_req = signed_req.into_inner();
769
770        let (req, hub_handle) = match ts_req.open(&running_state.constellation.phc_jwt_key) {
771            Ok(opened) => opened,
772            Err(toe) => return toe.default_verdict(api::server::PingResp::RetryWithNewTicket),
773        };
774
775        Ok(api::server::PingResp::Success {
776            hub_handle,
777            nonce: req.nonce,
778            served_by: S::NAME,
779        })
780    }
781
782    /// Changes server config, and restarts server
783    async fn handle_admin_post_config(
784        app: Rc<S::AppT>,
785        signed_req: web::Json<api::Signed<api::admin::UpdateConfigReq>>,
786    ) -> api::Result<api::admin::UpdateConfigResp> {
787        let signed_req = signed_req.into_inner();
788
789        let req = match signed_req.open(&*app.admin_key, None) {
790            Ok(req) => req,
791            Err(OpenError::OtherConstellation(..)) | Err(OpenError::InternalError) => {
792                return Err(api::ErrorCode::InternalError);
793            }
794            Err(OpenError::OtherwiseInvalid) => return Err(api::ErrorCode::BadRequest),
795            Err(OpenError::Expired) => return Ok(api::admin::UpdateConfigResp::ResignRequest),
796            Err(OpenError::InvalidSignature) => {
797                return Ok(api::admin::UpdateConfigResp::InvalidAdminKey);
798            }
799        };
800
801        // Before restarting the server, check that the modification would work,
802        // so we can return an error to the requestor.  Once we issue a modification command
803        // the present connection is severed, and so no error can be returned.
804
805        let config = app
806            .handle
807            .inspect(
808                "admin's retrieval of current configuration",
809                |server: &S| -> Config { server.config().clone() },
810            )
811            .await
812            .into_ec(|_| {
813                log::warn!("{}: failed to retrieve configuration from server", S::NAME,);
814                api::ErrorCode::PleaseRetry // probably the server is restarting
815            })?;
816
817        let mut new_config: Config = config
818            .json_updated(&req.pointer, req.new_value.clone())
819            .into_ec(|err| {
820                log::warn!(
821                    "{}: failed to modify configuration at {} to {}: {err:#}",
822                    S::NAME,
823                    req.pointer,
824                    req.new_value
825                );
826                api::ErrorCode::BadRequest
827            })?;
828
829        drop(config);
830
831        // reprepare config...
832        new_config.preliminary_prep().into_ec(|err| {
833            log::warn!(
834                "{}: failed to reprepare (preliminary step) modified configuration: {err}",
835                S::NAME
836            );
837            api::ErrorCode::BadRequest
838        })?;
839        new_config.prepare().await.into_ec(|err| {
840            log::warn!(
841                "{}: failed to reprepare modified configuration: {err}",
842                S::NAME
843            );
844            api::ErrorCode::BadRequest
845        })?;
846
847        // All is well - let's restart the server with the new configuration
848        app
849        .handle
850        .modify(
851            "admin update of current in-memory configuration",
852            move |server: &mut S| {
853
854                let new_server_maybe = S::new(&new_config);
855
856                if let Err(err) = new_server_maybe {
857                    log::error!("Could not create new {} with changed configuration: {}. Restarting old server.", S::NAME, err);
858                    return true; // restart
859                }
860
861                *server = new_server_maybe.unwrap();
862
863                true // restart
864            }
865        )
866        .await
867        .into_ec(|_| {
868            log::warn!("{}: failed to enqueue modification", S::NAME);
869            api::ErrorCode::PleaseRetry
870        })?;
871
872        Ok(api::admin::UpdateConfigResp::Success)
873    }
874
875    /// Retrieve non-public information about the server
876    async fn handle_admin_info(
877        app: Rc<S::AppT>,
878        signed_req: web::Json<api::Signed<api::admin::InfoReq>>,
879    ) -> api::Result<api::admin::InfoResp> {
880        let signed_req = signed_req.into_inner();
881
882        let _req = match signed_req.open(&*app.admin_key, None) {
883            Ok(req) => req,
884            Err(OpenError::OtherConstellation(..)) | Err(OpenError::InternalError) => {
885                return Err(api::ErrorCode::InternalError);
886            }
887            Err(OpenError::OtherwiseInvalid) => return Err(api::ErrorCode::BadRequest),
888            Err(OpenError::Expired) => return Ok(api::admin::InfoResp::ResignRequest),
889            Err(OpenError::InvalidSignature) => return Ok(api::admin::InfoResp::InvalidAdminKey),
890        };
891
892        let config = app
893            .handle
894            .inspect(
895                "admin's retrieval of current configuration",
896                |server: &S| -> Config { server.config().clone() },
897            )
898            .await
899            .into_ec(|_| {
900                log::warn!("{}: failed to retrieve configuration from server", S::NAME,);
901                api::ErrorCode::PleaseRetry // probably the server is restarting
902            })?;
903
904        Ok(api::admin::InfoResp::Success {
905            config: Box::new(config),
906        })
907    }
908
909    /// Run the discovery process, and restarts server if necessary.  Returns when
910    /// the discovery process is completed, but before a possible restart.
911    async fn handle_discovery_run(app: Rc<S::AppT>) -> api::Result<api::DiscoveryRunResp> {
912        app.handle.request_discovery(app.clone()).await
913    }
914
915    pub(super) async fn discover_phc(app: Rc<S::AppT>) -> api::Result<api::DiscoveryInfoResp> {
916        let pdi = app
917            .client
918            .query::<api::DiscoveryInfo>(&app.phc_url, NoPayload)
919            .await
920            .into_server_result()?;
921
922        client::discovery::DiscoveryInfoCheck {
923            phc_url: &app.phc_url,
924            name: Name::PubhubsCentral,
925            self_check_code: if S::NAME == Name::PubhubsCentral {
926                Some(&app.self_check_code)
927            } else {
928                None
929            },
930            constellation: None,
931            // NOTE: don't check whether our constellation coincides with PHC's constellation here,
932            // because if they're not the same that will cause an error to be returned, while
933            // we want to initiate a restart instead.
934        }
935        .check(pdi, &app.phc_url)
936    }
937
938    fn cached_handle_discovery_info(app: &S::AppT) -> api::Result<api::DiscoveryInfoResp> {
939        use super::constellation::ConstellationOrId;
940
941        // Return our constellation (if we have one), but only its id if we're not PHC.
942        let constellation_or_id = app.running_state.as_ref().map(|rs| match S::NAME {
943            Name::PubhubsCentral => ConstellationOrId::Constellation(
944                AsRef::<Constellation>::as_ref(&rs.constellation)
945                    .clone()
946                    .into(),
947            ),
948            _ => ConstellationOrId::Id {
949                id: rs.constellation.id,
950            },
951        });
952
953        Ok(api::DiscoveryInfoResp {
954            name: S::NAME,
955            version: app.version.clone(),
956            self_check_code: app.self_check_code.clone(),
957            phc_url: app.phc_url.clone(),
958            // NOTE on efficiency:  the ed25519_dalek::SigningKey contains a precomputed
959            // ed25519_dalek::VerifyingKey, which contains a precomputed compressed (=serialized)
960            // form.  So no expensive cryptographic operations like finite field inversion
961            // or scalar multiplication are performed here.
962            jwt_key: app.jwt_key.verifying_key().into(),
963            enc_key: app.enc_key.public_key().clone(),
964            master_enc_key_part: app
965                .master_enc_key_part()
966                .map(|privk| privk.public_key().clone()),
967            constellation_or_id,
968        })
969    }
970}
971
/// An [`App`] together with a method on it.  Used to pass [`App`]s to [`actix_web::Handler`]s
/// as first argument. See [`api::EndpointDetails::add_to`].
///
/// `EP` is a compile-time marker only: no value of type `EP` is stored, which is why it
/// may be unsized.
pub struct AppMethod<App, F, EP: ?Sized> {
    /// Shared pointer to the app that is handed to `f` as its first argument.
    app: Rc<App>,
    /// The method (function or closure) to invoke on the app.
    f: F,
    /// Ties this value to the endpoint type `EP` without storing one.
    phantom: std::marker::PhantomData<EP>,
}

/// Implement [`Clone`] manually so we don't have to require `EP` (or `App`) to implement
/// [`Clone`].
///
/// Like the struct itself, this impl accepts unsized `EP`, so every [`AppMethod`] whose
/// `F` is [`Clone`] can be cloned.  Cloning shares the same app through the [`Rc`].
impl<App, F: Clone, EP: ?Sized> Clone for AppMethod<App, F, EP> {
    fn clone(&self) -> Self {
        Self {
            app: Rc::clone(&self.app),
            f: self.f.clone(),
            phantom: std::marker::PhantomData,
        }
    }
}

impl<App, F, EP: ?Sized> AppMethod<App, F, EP> {
    /// Creates a new [`AppMethod`], cloning the [`Rc`] pointing to the [`App`]
    /// (not the [`App`] itself).
    pub fn new(app: &Rc<App>, f: F) -> Self {
        AppMethod {
            app: Rc::clone(app),
            f,
            phantom: std::marker::PhantomData,
        }
    }
}
1002
/// Implements [`actix_web::Handler`] for an [`AppMethod`] with the given number of arguments.
///
/// Each identifier passed to the macro is used twice: as a generic type parameter of the
/// generated impl, and as the name of the matching element when the argument tuple is
/// destructured in `call`.
///
/// The generated `call` invokes the wrapped function with a clone of the `Rc<App>` followed
/// by the handler arguments, and maps the resulting future's output through
/// [`response_type_to_responder`].
///
/// Based on [`actix_web`]'s implementation of [`actix_web::Handler`] for [`Fn`]s.
macro_rules! factory_tuple ({ $($param:ident)* } => {
    impl<Func, Fut, App, EP, $($param,)*> actix_web::Handler<($($param,)*)> for AppMethod<App, Func, EP>
    where
        Func: Fn(Rc<App>, $($param),*) -> Fut + Clone + 'static,
        Fut: core::future::Future,
        App: 'static,
        EP : EndpointDetails + 'static,
        Fut::Output : Into<EP::ResponseType>,
    {
        type Output = api::Responder<EP>;
        // The mapping function is a plain `fn` pointer, so this associated type can be
        // written out in full.
        type Future = futures::future::Map<Fut, fn(Fut::Output)->api::Responder<EP>>;

        #[inline]
        // allow(non_snake_case), because the signature will be:  call(&self, A: A, B: B, ...)
        // not expect(...), because this macro definition does not fulfill this condition
        #[allow(non_snake_case)]
        fn call(&self, ($($param,)*): ($($param,)*)) -> Self::Future {
            // Every call gets its own `Rc` to the app; the function's output is converted
            // into the endpoint's responder once the future resolves.
            (self.f)(self.app.clone(), $($param,)*).map(response_type_to_responder)
        }
    }
});
1027
1028/// Helper method for [`factory_tuple`] macro.
1029fn response_type_to_responder<EP: EndpointDetails, T: Into<EP::ResponseType>>(
1030    output: T,
1031) -> api::Responder<EP> {
1032    api::Responder(output.into())
1033}
1034
// Generate the `actix_web::Handler` implementations for `AppMethod`s whose wrapped function
// takes, besides the leading `Rc<App>`, anywhere from zero up to sixteen arguments.
factory_tuple! {}
factory_tuple! { A }
factory_tuple! { A B }
factory_tuple! { A B C }
factory_tuple! { A B C D }
factory_tuple! { A B C D E }
factory_tuple! { A B C D E F }
factory_tuple! { A B C D E F G }
factory_tuple! { A B C D E F G H }
factory_tuple! { A B C D E F G H I }
factory_tuple! { A B C D E F G H I J }
factory_tuple! { A B C D E F G H I J K }
factory_tuple! { A B C D E F G H I J K L }
factory_tuple! { A B C D E F G H I J K L M }
factory_tuple! { A B C D E F G H I J K L M N }
factory_tuple! { A B C D E F G H I J K L M N O }
factory_tuple! { A B C D E F G H I J K L M N O P }
1052
1053/// Additional state when discovery has been completed.  Derefs to `Extra`.
1054#[derive(Clone, Debug)]
1055pub struct RunningState<Extra: Clone + core::fmt::Debug> {
1056    pub constellation: Box<Constellation>,
1057
1058    /// Accessible via [`Deref`].
1059    extra: Extra,
1060}
1061
1062impl<Extra: Clone + core::fmt::Debug> RunningState<Extra> {
1063    pub(crate) fn new(constellation: Constellation, extra: Extra) -> Self {
1064        RunningState {
1065            constellation: Box::new(constellation),
1066            extra,
1067        }
1068    }
1069}
1070
1071impl<Extra: Clone + core::fmt::Debug> Deref for RunningState<Extra> {
1072    type Target = Extra;
1073
1074    #[inline]
1075    fn deref(&self) -> &Extra {
1076        &self.extra
1077    }
1078}
1079
1080/// Shared state between [`App`]s.  Use sparingly!
1081pub struct SharedState<S: Server> {
1082    inner: std::sync::Arc<SharedStateInner<S>>,
1083}
1084
1085impl<S: Server> Clone for SharedState<S> {
1086    fn clone(&self) -> Self {
1087        Self {
1088            inner: self.inner.clone(),
1089        }
1090    }
1091}
1092
1093impl<S: Server> std::ops::Deref for SharedState<S> {
1094    type Target = SharedStateInner<S>;
1095
1096    #[inline]
1097    fn deref(&self) -> &Self::Target {
1098        &self.inner
1099    }
1100}
1101
1102impl<S: Server> SharedState<S> {
1103    fn new(inner: SharedStateInner<S>) -> Self {
1104        Self {
1105            inner: std::sync::Arc::new(inner),
1106        }
1107    }
1108}
1109
1110pub struct SharedStateInner<S: Server> {
1111    pub object_store: S::ObjectStoreT,
1112    pub extra: S::ExtraSharedState,
1113}
1114
1115impl<S: Server> std::ops::Deref for SharedStateInner<S> {
1116    type Target = S::ExtraSharedState;
1117
1118    #[inline]
1119    fn deref(&self) -> &Self::Target {
1120        &self.extra
1121    }
1122}