1use std::num::NonZero;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use actix_web::web;
7use anyhow::{Context as _, Result, bail};
8use core::convert::Infallible;
9use tokio::sync::mpsc;
10
11use crate::api;
12use crate::misc::defer;
13use crate::servers::{
14 App, AppBase, AppCreator, Command, DiscoverVerdict, Name, Server, for_all_servers,
15 server::RunningState,
16};
17
/// A set of running PubHubs servers.
///
/// Created via [`Set::new`]; must be consumed via [`Set::wait`].
pub struct Set {
    // Join handle of the task driving [`SetInner::wait`]; resolves to the
    // number of servers that exited erroneously.
    wait_jh: tokio::task::JoinHandle<usize>,
}
23
24impl Set {
25 pub fn new(
28 config: &crate::servers::Config,
29 ) -> Result<(Self, tokio::sync::oneshot::Sender<Infallible>)> {
30 let (inner, shutdown_sender) = SetInner::new(config)?;
31
32 let wait_jh = tokio::task::spawn(inner.wait());
33
34 Ok((Self { wait_jh }, shutdown_sender))
35 }
36
37 pub async fn wait(self) -> usize {
44 match self.wait_jh.await {
45 Ok(nr) => nr,
46 Err(join_error) => {
47 panic!("task waiting on servers to exit was cancelled or panicked: {join_error}");
48 }
49 }
50 }
51}
52
/// Owns the tasks running the individual servers; driven by the task spawned
/// in [`Set::new`].
struct SetInner {
    // One task per enabled server, each yielding that server's final result.
    joinset: tokio::task::JoinSet<Result<()>>,

    // Dropping this broadcast sender closes all subscribed receivers, which
    // the servers take as the signal to shut down.  `None` once shutdown has
    // been initiated; inspected by `Drop` to detect that the set was properly
    // consumed via `wait()`/`shutdown()`.
    shutdown_sender: Option<tokio::sync::broadcast::Sender<Infallible>>,

    // Since `Infallible` can never be sent, this receiver only ever completes
    // with an error — namely when the external sender returned by `new()` is
    // dropped, signalling an externally requested shutdown.
    shutdown_receiver: tokio::sync::oneshot::Receiver<Infallible>,
}
66
67impl Drop for SetInner {
68 fn drop(&mut self) {
69 if self.shutdown_sender.is_some() {
70 log::error!(
71 "the completion of all pubhubs servers was not awaited - please consume SetInner using wait() or shutdown()"
72 )
73 }
74 }
75}
76
impl SetInner {
    /// Creates a [`SetInner`], spawning one blocking task per server enabled
    /// in `config`.
    ///
    /// Returns it together with a oneshot sender whose drop requests a
    /// graceful shutdown of all servers.
    pub fn new(
        config: &crate::servers::Config,
    ) -> Result<(Self, tokio::sync::oneshot::Sender<Infallible>)> {
        let rt_handle: tokio::runtime::Handle = tokio::runtime::Handle::current();
        let mut joinset = tokio::task::JoinSet::<Result<()>>::new();

        // The broadcast channel is used only for its close-notification:
        // no value of type `Infallible` can ever be sent, so subscribers
        // only ever observe `RecvError::Closed` (when the sender is dropped).
        let (shutdown_sender, _) = tokio::sync::broadcast::channel(1); let server_count: usize = {
            let mut counter: usize = 0;

            // Count how many servers are enabled in the configuration.
            macro_rules! count_server {
                ($server:ident) => {
                    if config.$server.is_some() {
                        counter += 1;
                    }
                };
            }

            for_all_servers!(count_server);

            counter
        };

        // Split the available cores evenly over the servers, but give each
        // server at least one actix worker.  `None` when the available
        // parallelism cannot be determined.
        let worker_count: Option<NonZero<usize>> =
            std::thread::available_parallelism()
                .ok()
                .map(|parallelism: NonZero<usize>| {
                    if server_count == 0 {
                        return NonZero::<usize>::new(1).unwrap();
                    }

                    NonZero::<usize>::try_from(parallelism.get() / server_count)
                        .unwrap_or(NonZero::<usize>::new(1).unwrap()) });

        // Spawn each enabled server on its own blocking thread; see
        // `run_server` for what happens on that thread.
        macro_rules! run_server {
            ($server:ident) => {
                if config.$server.is_some() {
                    let config = config.clone();
                    let rt_handle = rt_handle.clone();
                    let shutdown_receiver = shutdown_sender.subscribe();

                    joinset.spawn_blocking(move || -> Result<()> {
                        Self::run_server::<crate::servers::$server::Server>(
                            config,
                            rt_handle,
                            shutdown_receiver,
                            worker_count,
                        )
                    });
                }
            };
        }

        for_all_servers!(run_server);

        let (external_shutdown_sender, external_shutdown_receiver) =
            tokio::sync::oneshot::channel();

        Ok((
            Self {
                joinset,
                shutdown_sender: Some(shutdown_sender),
                shutdown_receiver: external_shutdown_receiver,
            },
            external_shutdown_sender,
        ))
    }

    /// Runs server `S` on the current (blocking) thread until it exits.
    ///
    /// Prepares the preliminary `config` for `S`, then drives a
    /// [`Runner`] to completion on a fresh `LocalSet` via `rt_handle`.
    fn run_server<S: Server>(
        config: crate::servers::Config,
        rt_handle: tokio::runtime::Handle,
        shutdown_receiver: tokio::sync::broadcast::Receiver<Infallible>,
        worker_count: Option<NonZero<usize>>,
    ) -> Result<()> {
        // Each server expects to start from the preliminary configuration.
        assert!(config.preparation_state == crate::servers::config::PreparationState::Preliminary);

        let localset = tokio::task::LocalSet::new();

        let fut = localset.run_until(async {
            let config = config.prepare_for(S::NAME).await?;

            crate::servers::run::Runner::<S>::new(&config, shutdown_receiver, worker_count)?
                .run()
                .await
        });

        let result = rt_handle.block_on(fut);

        // Drain tasks still pending on the local set before returning,
        // so nothing spawned locally outlives this thread.
        rt_handle.block_on(localset);

        log::debug!("{} stopped with {:?}", S::NAME, result);

        result
    }

    /// Waits until one of the servers exits or an external shutdown is
    /// requested, then shuts down the remaining servers.
    ///
    /// Returns the total number of servers that exited erroneously.
    pub async fn wait(mut self) -> usize {
        log::trace!("waiting for one of the servers to exit...");
        let err_count: usize = tokio::select! {
            result_maybe = self
                .joinset
                .join_next() => {
                let result = result_maybe.expect("no servers to wait on");
                // Either the task itself failed (Err) or the server returned
                // an error (Ok(Err)).
                let is_err : bool = matches!(result, Err(_) | Ok(Err(_)));

                log::log!( if is_err { log::Level::Error } else { log::Level::Debug },
                    "one of the servers exited with {result:?}; stopping all servers.."
                );

                if is_err {
                    1
                } else {
                    0
                }
            },

            result = &mut self.shutdown_receiver => {
                // Nothing can be sent over this channel (`Infallible`), so
                // completion always means the external sender was dropped,
                // i.e. a shutdown was requested.
                result.expect_err("received Infallible");
                log::debug!("shutdown requested"); 0
            }
        };

        self.shutdown().await + err_count
    }

    /// Orders all servers to stop and waits for them to do so.
    ///
    /// Returns the number of servers that exited erroneously.
    pub async fn shutdown(mut self) -> usize {
        assert!(
            self.shutdown_sender.is_some(),
            "only signal_shutdown should take shutdown_sender"
        );

        // Dropping the broadcast sender closes every subscribed receiver;
        // the servers take that as the signal to exit.
        drop(self.shutdown_sender.take());

        let mut err_count = 0usize;

        while let Some(result) = self.joinset.join_next().await {
            err_count += if matches!(result, Err(_) | Ok(Err(_))) {
                1
            } else {
                0
            };
        }

        err_count
    }
}
250
/// Runs a single server `ServerT`: hosts its actix server, executes commands
/// issued via [`Handle`]s, and restarts the server when it is modified.
struct Runner<ServerT: Server> {
    // The server's state; mutably borrowed (via `Rc::get_mut`) only between
    // incarnations, when no apps hold a reference anymore.
    pubhubs_server: Rc<ServerT>,
    // Closes (sender dropped) when a global shutdown is requested.
    shutdown_receiver: tokio::sync::broadcast::Receiver<Infallible>,
    // Number of actix workers to use, when it could be determined.
    worker_count: Option<NonZero<usize>>,

    // Incremented on every restart; passed to apps so stale ones can be told
    // apart from the current incarnation.
    generation: usize,
}
260
/// Handles to the tasks belonging to one incarnation of server `S`;
/// created by [`Runner::create_actix_server`] and consumed by
/// [`Handles::shutdown`].
struct Handles<S: Server> {
    // Used to stop the actix HTTP server.
    actix_server_handle: actix_web::dev::ServerHandle,

    // Join handle of the task driving the actix server.
    actix_join_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,

    // Join handle of the pubhubs task; on a graceful stop it may yield a
    // modifier that still needs to be applied.
    ph_join_handle: tokio::task::JoinHandle<Result<Option<crate::servers::server::BoxModifier<S>>>>,

    // Dropping this sender asks the pubhubs task to stop; taken (exactly
    // once) by `shutdown`.
    ph_shutdown_sender: Option<tokio::sync::oneshot::Sender<Infallible>>,

    // Receives command requests issued through [`Handle`]s.
    command_receiver: mpsc::Receiver<CommandRequest<S>>,

    // Guards against dropping these handles without a proper shutdown;
    // diffused at the end of [`Handles::shutdown`].
    drop_bomb: crate::misc::drop_ext::Bomb,
}
282
impl<S: Server> Handles<S> {
    /// Runs this incarnation of the server until a [`Command`] must be
    /// handled by the caller.
    ///
    /// Returns an error when one of the underlying tasks stopped
    /// unexpectedly.
    async fn run_until_command(&mut self, runner: &mut Runner<S>) -> anyhow::Result<Command<S>> {
        tokio::select! {

            // A command was issued via a `Handle`.
            command_request_maybe = self.command_receiver.recv() => {
                if let Some(command_request) = command_request_maybe {
                    return Ok(command_request.accept());
                }

                log::error!("{}'s command receiver is unexpectedly closed", S::NAME);
                bail!("{}'s command receiver is unexpectedly closed", S::NAME);
            },

            // The pubhubs task stopped; only OK when it yields a modifier.
            res = &mut self.ph_join_handle => {
                let modifier : crate::servers::server::BoxModifier<S> =
                    res.with_context(|| format!("{}'s pubhubs task joined unexpectedly", S::NAME))?
                    .with_context(|| format!("{}'s pubhubs task crashed", S::NAME))?
                    .with_context(|| format!("{}'s pubhubs task stopped without being asked to", S::NAME))?;


                #[expect(clippy::needless_return)] return Ok(Command::Modify(modifier));
            },

            // The shutdown broadcast channel never carries a value
            // (`Infallible`), so recv() only ever returns an error.
            Err(err) = runner.shutdown_receiver.recv() => {
                match err {
                    tokio::sync::broadcast::error::RecvError::Lagged(_) => {
                        // Lagging requires a value to have been sent, which
                        // is impossible for `Infallible`.
                        panic!("got impossible `Lagged` error from shutdown sender");
                    },
                    tokio::sync::broadcast::error::RecvError::Closed => {
                        // Sender dropped: global shutdown requested.
                        #[expect(clippy::needless_return)] return Ok(Command::Exit);
                    },
                }
            },

            // The actix server stopped on its own — always an error here.
            res = &mut self.actix_join_handle => {
                res.inspect_err(|err| log::error!("{}'s actix task joined unexpectedly: {}", S::NAME, err) )
                    .with_context(|| format!("{}'s actix task joined unexpectedly", S::NAME))?
                    .inspect_err(|err| log::error!("{}'s http server crashed: {}", S::NAME, err) )
                    .with_context(|| format!("{}'s http server crashed", S::NAME))?;

                log::error!("{}'s actix server stopped unexpectedly", S::NAME);
                bail!("{}'s actix server stopped unexpectedly", S::NAME);
            },
        };
    }

    /// Gracefully stops this incarnation: asks the pubhubs task to stop,
    /// stops the actix server, and awaits the pubhubs task.
    async fn shutdown(mut self) -> anyhow::Result<()> {
        log::debug!("Shut down of {} started", S::NAME);

        anyhow::ensure!(
            self.ph_shutdown_sender.is_some(),
            "shutdown of ph task already ordered"
        );

        // Dropping the sender signals the pubhubs task to stop.
        drop(self.ph_shutdown_sender.take());

        // `false`: do not wait for existing connections to finish.
        self.actix_server_handle.stop(false).await;

        let maybe_modifier = self
            .ph_join_handle
            .await
            .with_context(|| {
                format!(
                    "{}'s pubhubs task did not join gracefully after being asked to stop",
                    S::NAME
                )
            })?
            .with_context(|| {
                format!(
                    "{}'s pubhubs task crashed after being asked to stop",
                    S::NAME
                )
            })?;

        // A modifier produced at this point can no longer be applied, since
        // another cause already stopped this incarnation.
        if let Some(ph_modifier) = maybe_modifier {
            log::error!(
                "Woops! {}'s pubhubs task's modifier {} was ignored, because another modifier was first",
                S::NAME,
                ph_modifier
            );
        }

        // Everything was torn down properly; disarm the drop guard.
        self.drop_bomb.diffuse();

        log::debug!("Shut down of {} completed", S::NAME);

        Ok(())
    }
}
385
/// Prevents discovery from initiating more than one restart of the server
/// per incarnation.
struct DiscoveryLimiter {
    // `true` once a restart has been initiated; shared between all clones of
    // this limiter.
    restart_imminent_lock: Arc<tokio::sync::RwLock<bool>>,

    // Per-clone cache: once set, a restart is known to be imminent and the
    // lock need not be consulted again.  Deliberately reset on `clone()`.
    restart_imminent_cached: std::cell::OnceCell<()>,
}
397
398impl Clone for DiscoveryLimiter {
399 fn clone(&self) -> Self {
400 Self {
401 restart_imminent_lock: self.restart_imminent_lock.clone(),
402 restart_imminent_cached: std::cell::OnceCell::<()>::new(),
404 }
405 }
406}
407
impl DiscoveryLimiter {
    /// Creates a fresh limiter: no restart imminent, empty cache.
    fn new() -> Self {
        DiscoveryLimiter {
            restart_imminent_lock: Arc::new(tokio::sync::RwLock::new(false)),
            restart_imminent_cached: std::cell::OnceCell::<()>::new(),
        }
    }

    /// Runs discovery for server `S`, restarting the server at most once:
    /// when a restart is already underway the request is answered with
    /// `Restarting` without doing any work.
    async fn request_discovery<S: Server>(
        &self,
        app: Rc<S::AppT>,
    ) -> api::Result<api::DiscoveryRunResp> {
        log::debug!(
            "{server_name}: discovery is requested",
            server_name = S::NAME
        );

        // Holding the write guard for the remainder of this function makes
        // concurrent discovery requests wait, so at most one can initiate
        // the restart.
        let mut restart_imminent_guard = match self.obtain_lock().await {
            Some(guard) => guard,
            None => {
                log::debug!(
                    "{server_name}: discovery aborted because the server is already restarting",
                    server_name = S::NAME
                );
                return Ok(api::DiscoveryRunResp::Restarting);
            }
        };

        let phc_discovery_info = AppBase::<S>::discover_phc(app.clone()).await?;

        // Non-PHC servers cannot complete discovery before PHC has a
        // constellation; ask the caller to retry later.
        if phc_discovery_info.constellation_or_id.is_none() && S::NAME != Name::PubhubsCentral {
            log::info!(
                "Discovery of {} is run but {} has no constellation yet",
                S::NAME,
                Name::PubhubsCentral,
            );
            return Err(api::ErrorCode::PleaseRetry);
        }

        // `Some(..)`: restart with a new constellation;
        // `None`: restart hoping a newer binary takes over.
        let new_constellation_maybe = match app.discover(phc_discovery_info).await? {
            DiscoverVerdict::ConstellationOutdated { new_constellation } => Some(new_constellation),
            DiscoverVerdict::BinaryOutdated => None,
            DiscoverVerdict::Alright => return Ok(api::DiscoveryRunResp::UpToDate),
        };

        let result = app
            .handle
            .modify(
                if new_constellation_maybe.is_some() {
                    "updated constellation after discovery"
                } else {
                    "restarting binary hoping to update version"
                },
                // Returns whether the server should be restarted.
                |server: &mut S| -> bool {
                    let Some(new_constellation) = new_constellation_maybe else {
                        return false; };

                    let extra = match server.create_running_state(&new_constellation) {
                        Ok(extra) => extra,
                        Err(err) => {
                            log::error!(
                                "Error while restarting {} after discovery: {}",
                                S::NAME,
                                err
                            );
                            return false; }
                    };

                    let new_url = new_constellation.url(S::NAME).clone();

                    let old_running_state = server
                        .running_state
                        .replace(RunningState::new(*new_constellation, extra));

                    // Only announce the URL when it is new or changed.
                    if old_running_state.is_none_or(|rs| rs.constellation.url(S::NAME) != &new_url)
                    {
                        log::info!("{}: at {}", S::NAME, new_url);
                    }

                    true },
            )
            .await;

        if let Err(()) = result {
            log::warn!(
                "failed to initiate restart of {} for discovery, probably because the server is already shutting down",
                S::NAME,
            );
            return Err(api::ErrorCode::PleaseRetry);
        }

        // Record — both in the shared flag and the local cache — that a
        // restart is now underway.
        log::trace!(
            "{server_name}: registering imminent restart",
            server_name = S::NAME
        );
        *restart_imminent_guard = true;
        let _ = self.restart_imminent_cached.set(());

        Ok(api::DiscoveryRunResp::Restarting)
    }

    /// Obtains the write guard on the restart flag, or `None` when a restart
    /// is already imminent.
    ///
    /// Checks cheaply first (local cache, then read lock) before taking the
    /// write lock; the flag is re-checked after acquiring the write lock
    /// since it may have been set in the meantime.
    async fn obtain_lock(&self) -> Option<tokio::sync::RwLockWriteGuard<'_, bool>> {
        if self.restart_imminent_cached.get().is_some() {
            log::trace!("restart imminent: cached");
            return None;
        }

        if *self.restart_imminent_lock.read().await {
            log::trace!("restart imminent: discovered after obtaining read lock");
            let _ = self.restart_imminent_cached.set(());
            return None;
        }

        let restart_imminent_guard = self.restart_imminent_lock.write().await;

        if *restart_imminent_guard {
            log::trace!("restart imminent: discovered after obtaining write lock");
            let _ = self.restart_imminent_cached.set(());
            return None;
        }

        Some(restart_imminent_guard)
    }
}
589
/// Cheaply clonable handle through which apps issue commands to the
/// [`Runner`] of server `S`.
pub struct Handle<S: Server> {
    // Delivers command requests to the runner's command loop.
    sender: mpsc::Sender<CommandRequest<S>>,

    // Limits discovery-triggered restarts; clones share the underlying flag.
    discovery_limiter: DiscoveryLimiter,
}
601
/// A [`Command`] paired with a channel over which its acceptance is
/// acknowledged to the issuing app.
struct CommandRequest<S: Server> {
    // The command to be executed by the runner.
    command: Command<S>,
    // Completed (with `()`) just before the command is executed.
    feedback_sender: tokio::sync::oneshot::Sender<()>,
}
608
609impl<S: Server> CommandRequest<S> {
610 fn accept(self) -> Command<S> {
612 if self.feedback_sender.send(()).is_err() {
613 log::warn!(
614 "The app issuing command '{}' that is about to execute has already dropped.",
615 &self.command
616 );
617 }
618 self.command
619 }
620}
621
622impl<S: Server> Clone for Handle<S> {
624 fn clone(&self) -> Self {
625 Handle {
626 sender: self.sender.clone(),
627 discovery_limiter: self.discovery_limiter.clone(),
628 }
629 }
630}
631
632impl<S: Server> Handle<S> {
633 pub(crate) async fn issue_command(&self, command: Command<S>) -> Result<(), ()> {
641 let (feedback_sender, feedback_receiver) = tokio::sync::oneshot::channel();
642
643 let result = self
644 .sender
645 .send(CommandRequest {
646 command,
647 feedback_sender,
648 })
649 .await;
650
651 if let Err(send_error) = result {
652 log::warn!(
653 "{server_name}: since the command receiver is closed (probably because the server is shutting down/restarting) we could not issue the command {cmd:?}",
654 server_name = S::NAME,
655 cmd = send_error.0.command.to_string(),
656 );
657 return Err(());
658 };
659
660 feedback_receiver.await.map_err(|_| ())
664 }
665
666 pub async fn modify(
667 &self,
668 display: impl std::fmt::Display + Send + 'static,
669 modifier: impl FnOnce(&mut S) -> bool + Send + 'static,
670 ) -> Result<(), ()> {
671 self.issue_command(Command::Modify(Box::new((modifier, display))))
672 .await
673 }
674
675 pub async fn inspect<T: Send + 'static>(
680 &self,
681 display: impl std::fmt::Display + Send + 'static,
682 inspector: impl FnOnce(&S) -> T + Send + 'static,
683 ) -> Result<T, ()> {
684 let (sender, receiver) = tokio::sync::oneshot::channel::<T>();
685
686 self.issue_command(Command::Inspect(Box::new((
687 |server: &S| {
688 if sender.send(inspector(server)).is_err() {
689 log::warn!(
690 "{}: could not return result of inspection because receiver was already closed",
691 S::NAME
692 );
693 }
695 },
696 display,
697 ))))
698 .await?;
699
700 receiver.await.map_err(|_| {
701 log::warn!(
702 "{server_name}: could receive result of inspector",
703 server_name = S::NAME,
704 );
705 })
706 }
707
708 pub async fn request_discovery(&self, app: Rc<S::AppT>) -> api::Result<api::DiscoveryRunResp> {
709 self.discovery_limiter.request_discovery::<S>(app).await
710 }
711}
712
impl<S: Server> Runner<S> {
    /// Creates a runner for server `S` from the (already prepared) global
    /// configuration.
    pub fn new(
        global_config: &crate::servers::Config,
        shutdown_receiver: tokio::sync::broadcast::Receiver<Infallible>,
        worker_count: Option<NonZero<usize>>,
    ) -> Result<Self> {
        log::trace!("{}: creating runner...", S::NAME);

        let pubhubs_server = Rc::new(S::new(global_config)?);

        log::trace!("{}: created runner", S::NAME);

        Ok(Runner {
            pubhubs_server,
            shutdown_receiver,
            worker_count,
            generation: 0,
        })
    }

    /// Creates and starts one incarnation of the server: the actix HTTP
    /// server plus the accompanying pubhubs task; returns the [`Handles`]
    /// to both.
    fn create_actix_server(&self) -> Result<Handles<S>> {
        let app_creator: S::AppCreatorT = self.pubhubs_server.deref().clone();

        // Capacity 1: commands are issued one at a time and acknowledged.
        let (command_sender, command_receiver) = mpsc::channel(1);

        let handle = Handle::<S> {
            sender: command_sender,
            discovery_limiter: DiscoveryLimiter::new(),
        };

        let ac_context: <S::AppCreatorT as AppCreator<S>>::ContextT = Default::default();

        let server_config = self.pubhubs_server.server_config();

        log::info!(
            "{}: binding actix server to {}, port {}, running on {:?}",
            S::NAME,
            server_config
                .ips
                .iter()
                .map(|ip| ip.to_string())
                .collect::<Box<[String]>>()
                .join(", "),
            server_config.port,
            std::thread::current().id()
        );

        // Clones moved into the `HttpServer::new` factory closure below;
        // the originals are used for the pubhubs task further down.
        let app_creator2 = app_creator.clone();
        let handle2 = handle.clone();
        let ac_context2 = ac_context.clone();
        let generation: usize = self.generation;

        let actual_actix_server: actix_web::dev::Server = {
            // NOTE: this factory closure runs once per actix worker, so each
            // worker gets its own app instance and its own local task.
            let mut builder: actix_web::HttpServer<_, _, _, _> =
                actix_web::HttpServer::new(move || {
                    let app: Rc<S::AppT> = Rc::new(app_creator2.clone().into_app(
                        &handle2,
                        &ac_context2,
                        generation,
                    ));

                    actix_web::App::new().wrap(S::cors()).configure(
                        |sc: &mut web::ServiceConfig| {
                            AppBase::<S>::configure_actix_app(&app, sc);

                            app.configure_actix_app(sc);

                            // Weak reference: the local task must not keep
                            // the app alive past its incarnation.
                            let weak = Rc::downgrade(&app);

                            tokio::task::spawn_local(async move {
                                let thread = std::thread::current();

                                log::debug!(
                                    "{}: spawned local task on thread {:?}",
                                    S::NAME,
                                    thread.id()
                                );

                                let _deferred = defer(|| {
                                    log::debug!(
                                        "{}: local task on thread {:?} stopped",
                                        S::NAME,
                                        thread.id()
                                    );
                                });

                                S::AppT::local_task(weak).await;
                            });
                        },
                    )
                })
                .disable_signals() .bind(server_config)?;

            if let Some(worker_count) = self.worker_count {
                builder = builder.workers(worker_count.get());
            }

            builder.run()
        };

        let actix_server_handle = actual_actix_server.handle().clone();
        let actix_join_handle = tokio::task::spawn(actual_actix_server);

        // The pubhubs task runs on the current thread's local set and gets
        // its own app instance.
        let (ph_shutdown_sender, ph_shutdown_receiver) = tokio::sync::oneshot::channel();
        let ph_join_handle =
            tokio::task::spawn_local(self.pubhubs_server.clone().run_until_modifier(
                ph_shutdown_receiver,
                Rc::new(app_creator.into_app(&handle, &ac_context, generation)),
            ));

        Ok(Handles {
            actix_server_handle,
            actix_join_handle,
            command_receiver,
            ph_join_handle,
            ph_shutdown_sender: Some(ph_shutdown_sender),
            drop_bomb: crate::misc::drop_ext::Bomb::new(|| {
                format!("Part of {} was not shut down properly", S::NAME)
            }),
        })
    }

    /// Runs the server, restarting it after every modification, until a
    /// modifier declines a restart or an error occurs.
    pub async fn run(mut self) -> Result<()> {
        loop {
            let modifier = self.run_until_modifier().await?;

            // By now all apps of the previous incarnation have been dropped,
            // so we hold the only strong reference.
            let pubhubs_server_mutref: &mut S =
                Rc::get_mut(&mut self.pubhubs_server).expect("pubhubs_server is still borrowed");

            let modifier_fmt = format!("{modifier}"); log::info!("{}: applying modification {:?}", S::NAME, modifier_fmt);

            // A modifier returning `false` means: do not restart, exit.
            if !modifier.modify(pubhubs_server_mutref) {
                log::info!(
                    "{}: not restarting upon request of {:?}",
                    S::NAME,
                    modifier_fmt
                );
                return Ok(());
            }

            self.generation += 1;
            log::info!("{}: restarting...", S::NAME);
        }
    }

    /// Runs one incarnation of the server until a modifier must be applied,
    /// then shuts that incarnation down.
    pub async fn run_until_modifier(
        &mut self,
    ) -> Result<crate::servers::server::BoxModifier<S>, anyhow::Error> {
        let mut handles = self.create_actix_server()?;

        let result = Self::run_until_modifier_inner(&mut handles, self).await;

        // Always shut down the incarnation, even when an error occurred.
        handles.shutdown().await?;

        result
    }

    /// Executes commands until one requires stopping this incarnation;
    /// returns the modifier to apply (an `Exiter` for plain exits).
    async fn run_until_modifier_inner(
        handles: &mut Handles<S>,
        runner: &mut Runner<S>,
    ) -> Result<crate::servers::server::BoxModifier<S>, anyhow::Error> {
        loop {
            match handles.run_until_command(runner).await? {
                Command::Modify(modifier) => {
                    log::debug!(
                        "Stopping {} for modification {:?}...",
                        S::NAME,
                        modifier.to_string()
                    );

                    return Ok::<_, anyhow::Error>(modifier);
                }
                Command::Inspect(inspector) => {
                    log::debug!(
                        "{}: applying inspection {:?}",
                        S::NAME,
                        inspector.to_string()
                    );
                    inspector.inspect(&*runner.pubhubs_server);
                }
                Command::Exit => {
                    log::debug!("Stopping {}, as requested", S::NAME);

                    return Ok::<_, anyhow::Error>(Box::new(crate::servers::server::Exiter));
                }
            }
        }
    }
}