
pubhubs/servers/run.rs

//! Running PubHubs [`Server`]s
use std::num::NonZero;
use std::rc::Rc;
use std::sync::Arc;

use actix_web::web;
use anyhow::{Context as _, Result, bail};
use core::convert::Infallible;
use tokio::sync::mpsc;

use crate::api;
use crate::misc::defer;
use crate::servers::{
    App, AppBase, AppCreator, Command, DiscoverVerdict, Name, Server, for_all_servers,
    server::RunningState,
};

/// A set of running PubHubs servers.
pub struct Set {
    /// Handle to the task waiting on [SetInner::wait].
    wait_jh: tokio::task::JoinHandle<usize>,
}

impl Set {
    /// Creates a new set of PubHubs servers from the given config.
    /// To signal shutdown of these servers, drop the returned `sender`.
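    ///
    /// A minimal usage sketch (not a compiled doctest; `load_config` is a hypothetical
    /// stand-in for however the caller obtains a [`crate::servers::Config`]):
    ///
    /// ```ignore
    /// let config = load_config()?; // hypothetical helper
    /// let (set, shutdown_sender) = crate::servers::run::Set::new(&config)?;
    ///
    /// // ... later: dropping the sender asks all servers to shut down ...
    /// drop(shutdown_sender);
    ///
    /// // `wait` returns the number of servers that did *not* shut down cleanly.
    /// let err_count: usize = set.wait().await;
    /// log::info!("{err_count} server(s) did not shut down cleanly");
    /// ```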
    pub fn new(
        config: &crate::servers::Config,
    ) -> Result<(Self, tokio::sync::oneshot::Sender<Infallible>)> {
        let (inner, shutdown_sender) = SetInner::new(config)?;

        let wait_jh = tokio::task::spawn(inner.wait());

        Ok((Self { wait_jh }, shutdown_sender))
    }

    /// Waits for one of the servers to return, panic, or be cancelled.
    /// If that happens, the other servers are directed to shut down as well.
    ///
    /// Returns the number of servers that did *not* shut down cleanly.
    ///
    /// Panics when the tokio runtime is shut down.
    pub async fn wait(self) -> usize {
        match self.wait_jh.await {
            Ok(nr) => nr,
            Err(join_error) => {
                panic!("task waiting on servers to exit was cancelled or panicked: {join_error}");
            }
        }
    }
}

/// A set of running PubHubs servers.
struct SetInner {
    /// The servers' tasks
    joinset: tokio::task::JoinSet<Result<()>>,

    /// Via `shutdown_sender` [`Set`] broadcasts the instruction to shut down to all servers
    /// running in the `joinset`.  It does so not by `send`ing a message, but by dropping
    /// the `shutdown_sender`.
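    ///
    /// A minimal illustration of this close-by-drop pattern using plain tokio types
    /// (independent of PubHubs; `Infallible` guarantees no value can ever be sent):
    ///
    /// ```ignore
    /// use core::convert::Infallible;
    ///
    /// let (tx, mut rx) = tokio::sync::broadcast::channel::<Infallible>(1);
    /// drop(tx); // closing the channel *is* the signal
    ///
    /// // Every receiver now observes `RecvError::Closed`.
    /// assert!(matches!(
    ///     rx.recv().await,
    ///     Err(tokio::sync::broadcast::error::RecvError::Closed)
    /// ));
    /// ```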
    shutdown_sender: Option<tokio::sync::broadcast::Sender<Infallible>>,

    /// Via `shutdown_receiver`, the [Set] receives the instruction to shut down.
    shutdown_receiver: tokio::sync::oneshot::Receiver<Infallible>,
}

impl Drop for SetInner {
    fn drop(&mut self) {
        if self.shutdown_sender.is_some() {
            log::error!(
                "the completion of all pubhubs servers was not awaited - please consume SetInner using wait() or shutdown()"
            )
        }
    }
}

impl SetInner {
    /// Creates a new set of PubHubs servers from the given config.
    ///
    /// Returns not only the [`SetInner`] instance, but also a [`tokio::sync::oneshot::Sender<Infallible>`]
    /// that can be dropped to signal that the [`SetInner`] should shut down.
    pub fn new(
        config: &crate::servers::Config,
    ) -> Result<(Self, tokio::sync::oneshot::Sender<Infallible>)> {
        let rt_handle: tokio::runtime::Handle = tokio::runtime::Handle::current();
        let mut joinset = tokio::task::JoinSet::<Result<()>>::new();

        let (shutdown_sender, _) = tokio::sync::broadcast::channel(1); // NB capacity of 0 is not allowed

        // count the number of servers
        let server_count: usize = {
            let mut counter: usize = 0;

            macro_rules! count_server {
                ($server:ident) => {
                    if config.$server.is_some() {
                        counter += 1;
                    }
                };
            }

            for_all_servers!(count_server);

            counter
        };

        // Don't give each server one worker thread per core - this speeds up testing.
        // (E.g. with 8 cores and 4 servers, each server gets 8/4 = 2 workers.)
        let worker_count: Option<NonZero<usize>> =
            std::thread::available_parallelism()
                .ok()
                .map(|parallelism: NonZero<usize>| {
                    if server_count == 0 {
                        return NonZero::<usize>::new(1).unwrap();
                    }

                    NonZero::<usize>::try_from(parallelism.get() / server_count)
                        .unwrap_or(NonZero::<usize>::new(1).unwrap()) // more servers than cores
                });

        macro_rules! run_server {
            ($server:ident) => {
                if config.$server.is_some() {
                    let config = config.clone();
                    let rt_handle = rt_handle.clone();
                    let shutdown_receiver = shutdown_sender.subscribe();

                    // We use spawn_blocking instead of spawn, because we want a separate thread
                    // for each server to run on
                    joinset.spawn_blocking(move || -> Result<()> {
                        Self::run_server::<crate::servers::$server::Server>(
                            config,
                            rt_handle,
                            shutdown_receiver,
                            worker_count,
                        )
                    });
                }
            };
        }

        for_all_servers!(run_server);

        let (external_shutdown_sender, external_shutdown_receiver) =
            tokio::sync::oneshot::channel();

        Ok((
            Self {
                joinset,
                shutdown_sender: Some(shutdown_sender),
                shutdown_receiver: external_shutdown_receiver,
            },
            external_shutdown_sender,
        ))
    }

    // Creates a server from the given `config` and runs it on the given tokio runtime.
    //
    // Aborts when the `shutdown_receiver` channel is closed.
    fn run_server<S: Server>(
        config: crate::servers::Config,
        rt_handle: tokio::runtime::Handle,
        shutdown_receiver: tokio::sync::broadcast::Receiver<Infallible>,
        worker_count: Option<NonZero<usize>>,
    ) -> Result<()> {
        assert!(config.preparation_state == crate::servers::config::PreparationState::Preliminary);

        let localset = tokio::task::LocalSet::new();

        let fut = localset.run_until(async {
            let config = config.prepare_for(S::NAME).await?;

            crate::servers::run::Runner::<S>::new(&config, shutdown_receiver, worker_count)?
                .run()
                .await
        });

        let result = rt_handle.block_on(fut);

        rt_handle.block_on(localset);

        log::debug!("{} stopped with {:?}", S::NAME, result);

        result
    }

    /// Waits for one of the servers to return, panic, or be cancelled.
    /// If that happens, the other servers are directed to shut down as well.
    ///
    /// If this function is not called, servers can fail silently.
    ///
    /// Returns the number of servers that did *not* shut down cleanly.
    pub async fn wait(mut self) -> usize {
        log::trace!("waiting for one of the servers to exit...");
        let err_count: usize = tokio::select! {
            // either one of the servers exits
            result_maybe = self
                .joinset
                .join_next() => {
                    let result = result_maybe.expect("no servers to wait on");
                    let is_err: bool = matches!(result, Err(_) | Ok(Err(_)));

                    log::log!(
                        if is_err { log::Level::Error } else { log::Level::Debug },
                        "one of the servers exited with {result:?};  stopping all servers.."
                    );

                    if is_err {
                        1
                    } else {
                        0
                    }
                },

            // or we get the command to shut down from higher up
            result = &mut self.shutdown_receiver => {
                result.expect_err("received Infallible");
                log::debug!("shutdown requested");
                0
            }
        };

        self.shutdown().await + err_count
    }

    /// Requests shutdown of all servers, and waits for it to complete.
    /// Returns the number of servers that did *not* shut down cleanly.
    pub async fn shutdown(mut self) -> usize {
        assert!(
            self.shutdown_sender.is_some(),
            "only signal_shutdown should take shutdown_sender"
        );

        // This causes the shutdown_receivers at the different servers to be closed,
        // which in turn should cause those servers' threads to join.
        drop(self.shutdown_sender.take());

        let mut err_count = 0usize;

        while let Some(result) = self.joinset.join_next().await {
            err_count += if matches!(result, Err(_) | Ok(Err(_))) {
                1
            } else {
                0
            };
        }

        // join_next returned None, which means the JoinSet is empty

        err_count
    }
}

/// Runs a [`Server`].
struct Runner<ServerT: Server> {
    pubhubs_server: Rc<ServerT>,
    shutdown_receiver: tokio::sync::broadcast::Receiver<Infallible>,
    worker_count: Option<NonZero<usize>>,

    /// Number of restarts (i.e. modifications applied)
    generation: usize,
}

/// The handles to control an [actix_web::dev::Server] running a pubhubs [Server].
struct Handles<S: Server> {
    /// Handle to the actual actix TCP server.  The [actix_web::dev::Server] is owned
    /// by the task driving it.
    actix_server_handle: actix_web::dev::ServerHandle,

    /// Handle to the task driving the actix TCP server
    actix_join_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,

    /// Handle to the task running (discovery for) the PubHubs server.
    ph_join_handle: tokio::task::JoinHandle<Result<Option<crate::servers::server::BoxModifier<S>>>>,

    /// Dropped to order `ph_join_handle` to shut down.  [None] when used.
    ph_shutdown_sender: Option<tokio::sync::oneshot::Sender<Infallible>>,

    /// Receives commands from the [App]s
    command_receiver: mpsc::Receiver<CommandRequest<S>>,

    /// To check whether [Handles::shutdown] was completed before being dropped
    drop_bomb: crate::misc::drop_ext::Bomb,
}

impl<S: Server> Handles<S> {
    /// Drives the actix server until a [Command] is received, which is then returned.
    async fn run_until_command(&mut self, runner: &mut Runner<S>) -> anyhow::Result<Command<S>> {
        tokio::select! {

            // received command from running pubhubs/actix server
            command_request_maybe = self.command_receiver.recv() => {
                if let Some(command_request) = command_request_maybe {
                    return Ok(command_request.accept());
                }

                log::error!("{}'s command receiver is unexpectedly closed", S::NAME);
                bail!("{}'s command receiver is unexpectedly closed", S::NAME);
            },

            // pubhubs server exited, returning a modification request
            res = &mut self.ph_join_handle => {
                let modifier: crate::servers::server::BoxModifier<S> = res
                    .with_context(|| format!("{}'s pubhubs task joined unexpectedly", S::NAME))?
                    .with_context(|| format!("{}'s pubhubs task crashed", S::NAME))?
                    .with_context(|| format!("{}'s pubhubs task stopped without being asked to", S::NAME))?;

                #[expect(clippy::needless_return)] // "return" makes the code more readable here
                return Ok(Command::Modify(modifier));
            },

            // the thread running this server wants us to quit
            Err(err) = runner.shutdown_receiver.recv() => {
                match err {
                    tokio::sync::broadcast::error::RecvError::Lagged(_) => {
                        panic!("got impossible `Lagged` error from shutdown sender");
                    },
                    tokio::sync::broadcast::error::RecvError::Closed => {
                        #[expect(clippy::needless_return)] // "return" is more readable here
                        return Ok(Command::Exit);
                    },
                }
            },

            // the actix server exited unexpectedly
            res = &mut self.actix_join_handle => {
                res.inspect_err(|err| log::error!("{}'s actix task joined unexpectedly: {}", S::NAME, err))
                    .with_context(|| format!("{}'s actix task joined unexpectedly", S::NAME))?
                    .inspect_err(|err| log::error!("{}'s http server crashed: {}", S::NAME, err))
                    .with_context(|| format!("{}'s http server crashed", S::NAME))?;

                log::error!("{}'s actix server stopped unexpectedly", S::NAME);
                bail!("{}'s actix server stopped unexpectedly", S::NAME);
            },
        };
    }

    /// Consumes this [`Handles`], shutting down the actix server and pubhubs tasks.
    async fn shutdown(mut self) -> anyhow::Result<()> {
        log::debug!("Shut down of {} started", S::NAME);

        anyhow::ensure!(
            self.ph_shutdown_sender.is_some(),
            "shutdown of ph task already ordered"
        );

        drop(self.ph_shutdown_sender.take());

        // This is a noop if the actix server is already stopped.
        //
        // We do not use graceful shutdown, because in practice the persistent connections
        // delay shutdown for the maximum shutdown timeout, which causes more disruption
        // than the graceful shutdown aims to prevent.
        self.actix_server_handle.stop(false).await;

        let maybe_modifier = self
            .ph_join_handle
            .await
            .with_context(|| {
                format!(
                    "{}'s pubhubs task did not join gracefully after being asked to stop",
                    S::NAME
                )
            })?
            .with_context(|| {
                format!(
                    "{}'s pubhubs task crashed after being asked to stop",
                    S::NAME
                )
            })?;

        if let Some(ph_modifier) = maybe_modifier {
            log::error!(
                "Woops! {}'s pubhubs task's modifier {} was ignored, because another modifier was first",
                S::NAME,
                ph_modifier
            );
        }

        self.drop_bomb.diffuse();

        log::debug!("Shut down of {} completed", S::NAME);

        Ok(())
    }
}

/// Encapsulates the handling of running just one discovery process per server
struct DiscoveryLimiter {
    /// Lock that makes sure only one discovery task is running at a time.
    ///
    /// The protected value is true when a restart due to a changed constellation is imminent.
    restart_imminent_lock: Arc<tokio::sync::RwLock<bool>>,

    /// Set when the contents of `restart_imminent_lock` were observed to be true,
    /// reducing the load on this lock.
    restart_imminent_cached: std::cell::OnceCell<()>,
}

impl Clone for DiscoveryLimiter {
    fn clone(&self) -> Self {
        Self {
            restart_imminent_lock: self.restart_imminent_lock.clone(),
            // The DiscoveryLimiter is never cloned as part of an `AppBase`.
            restart_imminent_cached: std::cell::OnceCell::<()>::new(),
        }
    }
}

impl DiscoveryLimiter {
    fn new() -> Self {
        DiscoveryLimiter {
            restart_imminent_lock: Arc::new(tokio::sync::RwLock::new(false)),
            restart_imminent_cached: std::cell::OnceCell::<()>::new(),
        }
    }

    /// This function contains the discovery logic that's shared between servers.
    ///
    /// Discovery can be invoked for two reasons:
    ///
    ///  1. This server has just been restarted and is not aware of the current constellation.
    ///     Perhaps part of our configuration has changed, making the current constellation
    ///     obsolete.
    ///
    ///  2. The `.ph/discovery/run` endpoint was triggered.  This should happen when another
    ///     server detects that our constellation is out-of-date, but since the `.ph/discovery/run`
    ///     endpoint is unprotected, anyone can invoke it at any time.
    ///
    ///     The non-PHC servers check during their discovery whether the constellation PHC advertises
    ///     is up-to-date with respect to their own configuration.  If it isn't, the non-PHC server
    ///     triggers PHC to run discovery.
    ///
    ///     PHC checks during its discovery (after it has obtained recent details from each of the
    ///     other servers) whether the constellations of the other servers are up-to-date,
    ///     and will trigger their discovery routine when they aren't.
    ///
    /// Thus the procedure for discovery is as follows.
    ///
    /// Non-PHC:
    ///
    ///  1. Obtain the constellation from PHC.  Return if it coincides with the constellation
    ///     we already have - if we have one.
    ///  2. Check the constellation against our own configuration.  If it's up-to-date, restart
    ///     this server with the new constellation.
    ///  3. Otherwise, invoke discovery on PHC, and return a retryable error - effectively go back to step 1.
    ///
    /// PHC:
    ///
    ///  1. Obtain discovery info from ourselves - i.e. check whether `phc_url` is configured
    ///     correctly.
    ///  2. Retrieve discovery info from the other servers and construct a constellation from it.
    ///  3. If the constellation has changed, restart to update it.
    ///  4. Invoke discovery on those servers that have no or outdated constellations,
    ///     and return a retryable error - effectively go back to step 1.
    ///
    async fn request_discovery<S: Server>(
        &self,
        app: Rc<S::AppT>,
    ) -> api::Result<api::DiscoveryRunResp> {
        log::debug!(
            "{server_name}: discovery is requested",
            server_name = S::NAME
        );

        let mut restart_imminent_guard = match self.obtain_lock().await {
            Some(guard) => guard,
            None => {
                log::debug!(
                    "{server_name}: discovery aborted because the server is already restarting",
                    server_name = S::NAME
                );
                return Ok(api::DiscoveryRunResp::Restarting);
            }
        };

        // Obtain discovery info from PHC (even when we are PHC ourselves, since the phc_url
        // might be misconfigured) and perform some basic checks.
        // Should not return an error when our constellation is out of sync.
        let phc_discovery_info = AppBase::<S>::discover_phc(app.clone()).await?;

        if phc_discovery_info.constellation_or_id.is_none() && S::NAME != Name::PubhubsCentral {
            // PubHubs Central is not yet ready - make the caller retry
            log::info!(
                "Discovery of {} is run but {} has no constellation yet",
                S::NAME,
                Name::PubhubsCentral,
            );
            return Err(api::ErrorCode::PleaseRetry);
        }

        let new_constellation_maybe = match app.discover(phc_discovery_info).await? {
            DiscoverVerdict::ConstellationOutdated { new_constellation } => Some(new_constellation),
            DiscoverVerdict::BinaryOutdated => None,
            DiscoverVerdict::Alright => return Ok(api::DiscoveryRunResp::UpToDate),
        };

        // modify server, and restart (to modify all Apps)

        let result = app
            .handle
            .modify(
                if new_constellation_maybe.is_some() {
                    "updated constellation after discovery"
                } else {
                    "restarting binary hoping to update version"
                },
                |server: &mut S| -> bool {
                    let Some(new_constellation) = new_constellation_maybe else {
                        return false; // no, don't restart the server, but exit the binary so that
                        // - hopefully - a new version of the binary will be started
                        // by e.g. systemd
                    };

                    let extra = match server.create_running_state(&new_constellation) {
                        Ok(extra) => extra,
                        Err(err) => {
                            log::error!(
                                "Error while restarting {} after discovery: {}",
                                S::NAME,
                                err
                            );
                            return false; // do not restart
                        }
                    };

                    let new_url = new_constellation.url(S::NAME).clone();

                    let old_running_state = server
                        .running_state
                        .replace(RunningState::new(*new_constellation, extra));

                    // See if our url has changed
                    if old_running_state.is_none_or(|rs| rs.constellation.url(S::NAME) != &new_url)
                    {
                        log::info!("{}: at {}", S::NAME, new_url);
                    }

                    true // yes, restart this server
                },
            )
            .await;

        if let Err(()) = result {
            log::warn!(
                "failed to initiate restart of {} for discovery, probably because the server is already shutting down",
                S::NAME,
            );
            return Err(api::ErrorCode::PleaseRetry);
        }

        log::trace!(
            "{server_name}: registering imminent restart",
            server_name = S::NAME
        );
        *restart_imminent_guard = true;
        let _ = self.restart_imminent_cached.set(());

        Ok(api::DiscoveryRunResp::Restarting)
    }

    /// Obtains a write lock on `self.restart_imminent_lock` when a restart is not imminent.
    async fn obtain_lock(&self) -> Option<tokio::sync::RwLockWriteGuard<'_, bool>> {
        if self.restart_imminent_cached.get().is_some() {
            log::trace!("restart imminent: cached");
            return None;
        }

        if *self.restart_imminent_lock.read().await {
            log::trace!("restart imminent: discovered after obtaining read lock");
            let _ = self.restart_imminent_cached.set(());
            return None;
        }

        let restart_imminent_guard = self.restart_imminent_lock.write().await;

        if *restart_imminent_guard {
            // between releasing the read lock and obtaining the write lock, discovery completed
            log::trace!("restart imminent: discovered after obtaining write lock");
            let _ = self.restart_imminent_cached.set(());
            return None;
        }

        Some(restart_imminent_guard)
    }
}

/// Handle to a [`Server`] passed to [`App`]s.
///
/// Used to issue commands to the server.  Since discovery is requested often, a separate
/// struct is used to deal with discovery requests.
pub struct Handle<S: Server> {
    /// To send commands to the server
    sender: mpsc::Sender<CommandRequest<S>>,

    /// To coordinate the handling of discovery requests
    discovery_limiter: DiscoveryLimiter,
}

struct CommandRequest<S: Server> {
    /// The actual command
    command: Command<S>,
    /// A way for the [`Server`] to inform the [`App`] that the command is about to be executed.
    feedback_sender: tokio::sync::oneshot::Sender<()>,
}

impl<S: Server> CommandRequest<S> {
    /// Lets the issuer of the command know that the command is about to be fulfilled
    fn accept(self) -> Command<S> {
        if self.feedback_sender.send(()).is_err() {
            log::warn!(
                "The app issuing command '{}' (which is about to execute) has already been dropped.",
                &self.command
            );
        }
        self.command
    }
}

// We cannot use "derive(Clone)", because Server is not Clone.
impl<S: Server> Clone for Handle<S> {
    fn clone(&self) -> Self {
        Handle {
            sender: self.sender.clone(),
            discovery_limiter: self.discovery_limiter.clone(),
        }
    }
}

impl<S: Server> Handle<S> {
    /// Issues a command to the [Runner].  Waits for the command to be next in line,
    /// but does not wait for the command to be completed.
    ///
    /// May return `Err(())` when another command shut down the server before this
    /// command could be executed.
    ///
    /// When `Ok(())` is returned, the command is guaranteed to be executed momentarily.
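    ///
    /// A hedged sketch of issuing the simplest command (crate-internal use only):
    ///
    /// ```ignore
    /// // Ask the server to exit; `Ok(())` means the command will run momentarily,
    /// // `Err(())` means the server is already shutting down.
    /// if handle.issue_command(Command::Exit).await.is_err() {
    ///     log::warn!("server was already shutting down");
    /// }
    /// ```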
    pub(crate) async fn issue_command(&self, command: Command<S>) -> Result<(), ()> {
        let (feedback_sender, feedback_receiver) = tokio::sync::oneshot::channel();

        let result = self
            .sender
            .send(CommandRequest {
                command,
                feedback_sender,
            })
            .await;

        if let Err(send_error) = result {
            log::warn!(
                "{server_name}: since the command receiver is closed (probably because the server is shutting down/restarting) we could not issue the command {cmd:?}",
                server_name = S::NAME,
                cmd = send_error.0.command.to_string(),
            );
            return Err(());
        };

        // Wait for the command to be the next in line.
        //
        // If the feedback receiver returns an error, the command might not have executed.
        feedback_receiver.await.map_err(|_| ())
    }

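    /// Applies `modifier` to the [`Server`] instance; when it returns `true` the server is
    /// restarted (recreating its [`App`]s), and when it returns `false` the server stops.
    ///
    /// A hedged usage sketch (the closure body is illustrative only):
    ///
    /// ```ignore
    /// handle
    ///     .modify("example modification", |server: &mut S| {
    ///         // ... mutate `server` here ...
    ///         true // restart the server so the change is picked up by all `App`s
    ///     })
    ///     .await
    ///     .map_err(|()| anyhow::anyhow!("server is already shutting down"))?;
    /// ```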
    pub async fn modify(
        &self,
        display: impl std::fmt::Display + Send + 'static,
        modifier: impl FnOnce(&mut S) -> bool + Send + 'static,
    ) -> Result<(), ()> {
        self.issue_command(Command::Modify(Box::new((modifier, display))))
            .await
    }

    /// Executes `inspector` on the server instance, returning its result.
    ///
    /// Returns `Err(())` when the command or its result could not be sent, probably because the
    /// server was shutting down.
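    ///
    /// A hedged usage sketch; what the closure reads is illustrative (`running_state` is only
    /// shown because this file itself touches that field during discovery - whether it is
    /// accessible from your calling context is an assumption):
    ///
    /// ```ignore
    /// let has_constellation: bool = handle
    ///     .inspect("check running state", |server: &S| server.running_state.is_some())
    ///     .await
    ///     .map_err(|()| anyhow::anyhow!("server is shutting down"))?;
    /// ```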
    pub async fn inspect<T: Send + 'static>(
        &self,
        display: impl std::fmt::Display + Send + 'static,
        inspector: impl FnOnce(&S) -> T + Send + 'static,
    ) -> Result<T, ()> {
        let (sender, receiver) = tokio::sync::oneshot::channel::<T>();

        self.issue_command(Command::Inspect(Box::new((
            |server: &S| {
                if sender.send(inspector(server)).is_err() {
                    log::warn!(
                        "{}: could not return result of inspection because receiver was already closed",
                        S::NAME
                    );
                    // Might happen when the server is restarted ungracefully - so don't panic here.
                }
            },
            display,
        ))))
        .await?;

        receiver.await.map_err(|_| {
            log::warn!(
                "{server_name}: could not receive the result of the inspector",
                server_name = S::NAME,
            );
        })
    }

    pub async fn request_discovery(&self, app: Rc<S::AppT>) -> api::Result<api::DiscoveryRunResp> {
        self.discovery_limiter.request_discovery::<S>(app).await
    }
}

impl<S: Server> Runner<S> {
    pub fn new(
        global_config: &crate::servers::Config,
        shutdown_receiver: tokio::sync::broadcast::Receiver<Infallible>,
        worker_count: Option<NonZero<usize>>,
    ) -> Result<Self> {
        log::trace!("{}: creating runner...", S::NAME);

        let pubhubs_server = Rc::new(S::new(global_config)?);

        log::trace!("{}: created runner", S::NAME);

        Ok(Runner {
            pubhubs_server,
            shutdown_receiver,
            worker_count,
            generation: 0,
        })
    }

    fn create_actix_server(&self) -> Result<Handles<S>> {
        let app_creator: S::AppCreatorT = self.pubhubs_server.deref().clone();

        let (command_sender, command_receiver) = mpsc::channel(1);

        let handle = Handle::<S> {
            sender: command_sender,
            discovery_limiter: DiscoveryLimiter::new(),
        };

        let ac_context: <S::AppCreatorT as AppCreator<S>>::ContextT = Default::default();

        let server_config = self.pubhubs_server.server_config();

        log::info!(
            "{}:  binding actix server to {}, port {}, running on {:?}",
            S::NAME,
            server_config
                .ips
                .iter()
                .map(|ip| ip.to_string())
                .collect::<Box<[String]>>()
                .join(", "),
            server_config.port,
            std::thread::current().id()
        );

        let app_creator2 = app_creator.clone();
        let handle2 = handle.clone();
        let ac_context2 = ac_context.clone();
        let generation: usize = self.generation;

        let actual_actix_server: actix_web::dev::Server = {
            // Build actix server
            let mut builder: actix_web::HttpServer<_, _, _, _> =
                actix_web::HttpServer::new(move || {
                    let app: Rc<S::AppT> = Rc::new(app_creator2.clone().into_app(
                        &handle2,
                        &ac_context2,
                        generation,
                    ));

                    actix_web::App::new().wrap(S::cors()).configure(
                        |sc: &mut web::ServiceConfig| {
                            // first configure endpoints common to all servers
                            AppBase::<S>::configure_actix_app(&app, sc);

                            // and then server-specific endpoints
                            app.configure_actix_app(sc);

                            let weak = Rc::downgrade(&app);

                            tokio::task::spawn_local(async move {
                                let thread = std::thread::current();

                                log::debug!(
                                    "{}: spawned local task on thread {:?}",
                                    S::NAME,
                                    thread.id()
                                );

                                let _deferred = defer(|| {
                                    log::debug!(
                                        "{}: local task on thread {:?} stopped",
                                        S::NAME,
                                        thread.id()
                                    );
                                });

                                S::AppT::local_task(weak).await;
                            });
                        },
                    )
                })
                .disable_signals() // we handle signals ourselves
                .bind(server_config)?;

            if let Some(worker_count) = self.worker_count {
                builder = builder.workers(worker_count.get());
            }

            builder.run()
        };

        // start actix server
        let actix_server_handle = actual_actix_server.handle().clone();
        let actix_join_handle = tokio::task::spawn(actual_actix_server);

        // start PH server task (doing discovery)
        let (ph_shutdown_sender, ph_shutdown_receiver) = tokio::sync::oneshot::channel();
        let ph_join_handle =
            tokio::task::spawn_local(self.pubhubs_server.clone().run_until_modifier(
                ph_shutdown_receiver,
                Rc::new(app_creator.into_app(&handle, &ac_context, generation)),
            ));

        Ok(Handles {
            actix_server_handle,
            actix_join_handle,
            command_receiver,
            ph_join_handle,
            ph_shutdown_sender: Some(ph_shutdown_sender),
            drop_bomb: crate::misc::drop_ext::Bomb::new(|| {
                format!("Part of {} was not shut down properly", S::NAME)
            }),
        })
    }

    pub async fn run(mut self) -> Result<()> {
        loop {
            let modifier = self.run_until_modifier().await?;

            let pubhubs_server_mutref: &mut S =
                Rc::get_mut(&mut self.pubhubs_server).expect("pubhubs_server is still borrowed");

            let modifier_fmt = format!("{modifier}"); // so modifier can be consumed

            log::info!("{}: applying modification {:?}", S::NAME, modifier_fmt);

            if !modifier.modify(pubhubs_server_mutref) {
                log::info!(
                    "{}: not restarting upon request of {:?}",
                    S::NAME,
                    modifier_fmt
                );
                return Ok(());
            }

            self.generation += 1;
            log::info!("{}: restarting...", S::NAME);
        }
    }

    pub async fn run_until_modifier(
        &mut self,
    ) -> Result<crate::servers::server::BoxModifier<S>, anyhow::Error> {
        let mut handles = self.create_actix_server()?;

        let result = Self::run_until_modifier_inner(&mut handles, self).await;

        handles.shutdown().await?;

        result
    }

    async fn run_until_modifier_inner(
        handles: &mut Handles<S>,
        runner: &mut Runner<S>,
    ) -> Result<crate::servers::server::BoxModifier<S>, anyhow::Error> {
        loop {
            match handles.run_until_command(runner).await? {
                Command::Modify(modifier) => {
                    log::debug!(
                        "Stopping {} for modification {:?}...",
                        S::NAME,
                        modifier.to_string()
                    );

                    return Ok::<_, anyhow::Error>(modifier);
                }
                Command::Inspect(inspector) => {
                    log::debug!(
                        "{}: applying inspection {:?}",
                        S::NAME,
                        inspector.to_string()
                    );
                    inspector.inspect(&*runner.pubhubs_server);
                }
                Command::Exit => {
                    log::debug!("Stopping {}, as requested", S::NAME);

                    return Ok::<_, anyhow::Error>(Box::new(crate::servers::server::Exiter));
                }
            }
        }
    }
}