Skip to main content

pubhubs/servers/auths/
yivi.rs

1//! Implementation of the `/yivi/...` endpoints
2use super::server::*;
3
4use crate::api;
5use crate::id;
6use crate::misc;
7use crate::misc::jwt;
8use crate::misc::stream_ext::StreamExt as _;
9use crate::servers::yivi;
10
11use super::server::YiviCtx;
12
13use std::collections::{HashMap, VecDeque};
14use std::rc::Rc;
15
16use actix_web::web;
17use futures::future::FutureExt as _;
18use futures::stream::StreamExt as _;
19
20impl App {
21    pub fn chained_sessions_ctl_or_bad_request(&self) -> api::Result<&ChainedSessionsCtl> {
22        self.chained_sessions_ctl.as_ref().ok_or_else(|| {
23            log::debug!("chained sessions control requested, but not available");
24            api::ErrorCode::BadRequest
25        })
26    }
27
28    /// Implements the [`api::auths::YiviWaitForResultEP`] endpoint.
29    pub async fn handle_yivi_wait_for_result(
30        app: Rc<Self>,
31        req: web::Json<api::auths::YiviWaitForResultReq>,
32    ) -> api::Result<api::auths::YiviWaitForResultResp> {
33        let csc = app.chained_sessions_ctl_or_bad_request()?;
34
35        let api::auths::YiviWaitForResultReq { state } = req.into_inner();
36
37        let Some(state) = AuthState::unseal(&state, &app.auth_state_secret) else {
38            return Ok(api::auths::YiviWaitForResultResp::PleaseRestartAuth);
39        };
40
41        let Some(ChainedSessionSetup { id: session_id, .. }) = state.yivi_chained_session else {
42            log::debug!(
43                "yivi-wait-for-result endpoint called on a authentication session without a yivi chained session"
44            );
45            return Err(api::ErrorCode::BadRequest);
46        };
47
48        csc.wait_for_result(session_id).await
49    }
50
51    /// Implements the [`api::auths::YIVI_NEXT_SESSION_PATH`] endpoint.
52    pub async fn handle_yivi_next_session(
53        app: web::Data<std::rc::Rc<App>>,
54        query: web::Query<api::auths::YiviNextSessionQuery>,
55        result_jwt: String,
56    ) -> impl actix_web::Responder {
57        use actix_web::Either::{Left, Right};
58
59        let app = app.into_inner();
60        let api::auths::YiviNextSessionQuery { state } = query.into_inner();
61
62        log::trace!(
63            "yivi server (or imposter) submits next sessions request; jwt: {result_jwt:?}; auth state: {}",
64            state
65        );
66
67        let result_jwt = jwt::JWT::from(result_jwt);
68
69        let Some(state) = AuthState::unseal(&state, &app.auth_state_secret) else {
70            log::debug!(
71                "yivi server (or an imposter) submitted invalid (or expired) auth state to next-session endpoint"
72            );
73            return Left(actix_web::HttpResponse::BadRequest().finish());
74        };
75
76        let Some(ChainedSessionSetup {
77            id: chained_session_id,
78            drip,
79        }) = state.yivi_chained_session
80        else {
81            log::warn!(
82                "yivi server submitted auth state to next-session endpoint without chained session id"
83            );
84            return Left(actix_web::HttpResponse::BadRequest().finish());
85        };
86
87        // NOTE: ChainedSessionsCtl is cheaply cloneable
88        let Some(csc) = app.chained_sessions_ctl.clone() else {
89            log::warn!("next-session endpoint invoked, but chained sessions are not supported");
90            return Left(actix_web::HttpResponse::BadRequest().finish());
91        };
92
93        let Some(yivi) = app.yivi.as_ref() else {
94            log::warn!("next-session endpoint invoked, but yivi is not supported");
95            return Left(actix_web::HttpResponse::BadRequest().finish());
96        };
97
98        let Ok(..) = yivi::SessionResult::open_signed(&result_jwt, &yivi.server_creds) else {
99            log::debug!(
100                "invalid yivi signed session result submitted by yivi server (or imposter)",
101            );
102            log::trace!("invalid signed session result jwt: {result_jwt}");
103            return Left(actix_web::HttpResponse::BadRequest().finish());
104        };
105
106        log::trace!(
107            "yivi server submitted disclosure and is waiting for chained session {chained_session_id}"
108        );
109
110        let request_id = id::Id::random();
111
112        let wfns_fut =
113            csc.clone()
114                .wait_for_next_session(chained_session_id, request_id, result_jwt);
115
116        if drip {
117            // When wfns_fut is dropped (because actix has detected the yivi server has
118            // disconnected), we want to abort the WaitForNextSession via AbortWaitForNextSession.
119            let on_drop = move || {
120                tokio::task::spawn_local(async move {
121                    let _ = csc
122                        .send_command(CscCommand::AbortWaitForNextSession {
123                            chained_session_id,
124                            request_id,
125                        })
126                        .await;
127                });
128            };
129
130            let wfns_fut = async move {
131                let _deferred = misc::defer(on_drop);
132                wfns_fut.await
133            };
134
135            Right(Left(Self::dripping_wfns_responder(wfns_fut)))
136        } else {
137            Right(Right(Self::regular_wfns_responder(wfns_fut).await))
138        }
139    }
140
141    async fn regular_wfns_responder(
142        wfns_fut: impl Future<Output = api::Result<NextSession>>,
143    ) -> impl actix_web::Responder {
144        match wfns_fut.await {
145            Ok(None) => actix_web::HttpResponse::NoContent().finish(),
146            Ok(Some(session_request)) => {
147                log::debug!("sent chained session to yivi server");
148                actix_web::HttpResponse::Ok().json(session_request)
149            }
150            Err(api::ErrorCode::InternalError) => {
151                actix_web::HttpResponse::InternalServerError().finish()
152            }
153            Err(api::ErrorCode::BadRequest) => actix_web::HttpResponse::BadRequest().finish(),
154            Err(api::ErrorCode::PleaseRetry) => panic!("not expecting 'please retry' here"),
155        }
156    }
157
158    fn dripping_wfns_responder(
159        wfns_fut: impl Future<Output = api::Result<NextSession>> + 'static,
160    ) -> impl actix_web::Responder {
161        let drips = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
162            core::time::Duration::from_millis(100),
163        ))
164        .map(|_| Ok(bytes::Bytes::from_static(b" ")));
165
166        let result_bytes_fut = wfns_fut.map(|result| match result {
167            Ok(Some(session_request)) => {
168                log::debug!("sent chained session to yivi server: {session_request:?}");
169                Ok(serde_json::to_vec_pretty(&session_request)
170                    .map_err(|err| {
171                        log::error!("failed to serialize session_request to json: {err:#}");
172                        api::ErrorCode::InternalError
173                    })?
174                    .into())
175            }
176            Ok(None) => {
177                log::error!("bug: `None` session_request reached dripping chained session");
178                Err(api::ErrorCode::InternalError)
179            }
180            Err(err) => {
181                log::warn!("failed to release session_request to yivi server: {err:#}");
182                Err(err)
183            }
184        });
185
186        let stream = drips.until_overridden_by(result_bytes_fut.into_stream());
187
188        actix_web::HttpResponse::Ok()
189            // NB: Content-Type is not checked by irmago at the moment
190            .content_type(mime::APPLICATION_JSON)
191            .streaming(stream)
192    }
193
194    /// Implements the [`api::auths::YiviReleaseNextSessionEP`] endpoint.
195    pub async fn handle_yivi_release_next_session(
196        app: Rc<Self>,
197        req: web::Json<api::auths::YiviReleaseNextSessionReq>,
198    ) -> api::Result<api::auths::YiviReleaseNextSessionResp> {
199        let csc = app.chained_sessions_ctl_or_bad_request()?;
200        let yivi = app.get_yivi()?;
201
202        let api::auths::YiviReleaseNextSessionReq {
203            state,
204            next_session,
205            stale_after,
206        } = req.into_inner();
207
208        let Some(state) = AuthState::unseal(&state, &app.auth_state_secret) else {
209            return Ok(api::auths::YiviReleaseNextSessionResp::PleaseRestartAuth);
210        };
211
212        let Some(ChainedSessionSetup {
213            id: session_id,
214            drip,
215        }) = state.yivi_chained_session
216        else {
217            log::debug!(
218                "yivi-release-next-session endpoint called on a authentication session without a yivi chained session"
219            );
220            return Err(api::ErrorCode::BadRequest);
221        };
222
223        if next_session.is_none() && drip {
224            log::debug!(
225                "yivi-release-next-session endpoint on a dripping chained session, but with empty next session."
226            );
227            return Err(api::ErrorCode::BadRequest);
228        }
229
230        let esr = if let Some(jwt) = next_session {
231            Some(
232                yivi::ExtendedSessionRequest::open_signed(
233                    &jwt,
234                    &yivi.requestor_creds.to_verifying_credentials(),
235                )
236                .map_err(|err| {
237                    log::debug!("failed to open signed extended session request: {}", err);
238                    api::ErrorCode::BadRequest
239                })?,
240            )
241        } else {
242            None
243        };
244
245        csc.release_next_session(session_id, esr, stale_after).await
246    }
247}
248
249/// Keeps track of chained sessions
250///
251/// Create using [`Self::new`].  Cheaply cloneable.
252#[derive(Clone)]
253pub struct ChainedSessionsCtl {
254    sender: tokio::sync::mpsc::Sender<CscCommand>,
255}
256
257#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
258pub struct ChainedSessionsConfig {
259    #[serde(with = "crate::misc::time_ext::human_duration")]
260    #[serde(default = "default_chained_session_validity")]
261    pub session_validity: core::time::Duration,
262}
263
264fn default_chained_session_validity() -> core::time::Duration {
265    core::time::Duration::from_secs(10 * 60) // 10 minutes
266}
267
268impl Default for ChainedSessionsConfig {
269    fn default() -> Self {
270        crate::misc::serde_ext::default_object()
271    }
272}
273
274type NextSession = Option<yivi::ExtendedSessionRequest>;
275
276type CreateSessionCrs = tokio::sync::oneshot::Sender<api::Result<id::Id>>;
277type WaitForResultCrs =
278    tokio::sync::oneshot::Sender<api::Result<api::auths::YiviWaitForResultResp>>;
279type WaitForNextSessionCrs = tokio::sync::oneshot::Sender<api::Result<NextSession>>;
280type ReleaseNextSessionCrs =
281    tokio::sync::oneshot::Sender<api::Result<api::auths::YiviReleaseNextSessionResp>>;
282
283enum CscCommand {
284    CreateSession {
285        resp_sender: CreateSessionCrs,
286    },
287    WaitForResult {
288        chained_session_id: id::Id,
289        resp_sender: WaitForResultCrs,
290    },
291    WaitForNextSession {
292        chained_session_id: id::Id,
293        /// `request_id` should be random and is passed in `Self::AbortWaitForNextSession` to abort
294        request_id: id::Id,
295        disclosure: jwt::JWT,
296        resp_sender: WaitForNextSessionCrs,
297    },
298    AbortWaitForNextSession {
299        chained_session_id: id::Id,
300        request_id: id::Id,
301    },
302    ReleaseNextSession {
303        chained_session_id: id::Id,
304        next_session_request: NextSession,
305        stale_after: Option<u16>,
306        resp_sender: ReleaseNextSessionCrs,
307    },
308}
309
310impl ChainedSessionsCtl {
311    /// Creates a new chained session, returning its [`id::Id`]
312    pub async fn create_session(&self) -> api::Result<id::Id> {
313        let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
314
315        self.send_command(CscCommand::CreateSession { resp_sender })
316            .await?;
317
318        let Ok(resp) = resp_receiver.await else {
319            log::warn!("chained session control create-session response channel closed early");
320            return Err(api::ErrorCode::InternalError);
321        };
322
323        resp
324    }
325
326    /// Wait for the disclosure to arrive for the given chained session
327    pub async fn wait_for_result(
328        &self,
329        chained_session_id: id::Id,
330    ) -> api::Result<api::auths::YiviWaitForResultResp> {
331        let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
332
333        self.send_command(CscCommand::WaitForResult {
334            chained_session_id,
335            resp_sender,
336        })
337        .await?;
338
339        let Ok(resp) = resp_receiver.await else {
340            log::warn!("chained session control wait-for-result response channel closed early");
341            return Err(api::ErrorCode::InternalError);
342        };
343
344        resp
345    }
346
347    /// Registers incoming disclosure and waits for the next session.
348    ///
349    /// Returns `None` if the yivi session is to be ended normally, without starting a next
350    /// session.
351    pub async fn wait_for_next_session(
352        self,
353        chained_session_id: id::Id,
354        request_id: id::Id,
355        disclosure: jwt::JWT,
356    ) -> api::Result<NextSession> {
357        let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
358
359        self.send_command(CscCommand::WaitForNextSession {
360            chained_session_id,
361            request_id,
362            disclosure,
363            resp_sender,
364        })
365        .await?;
366
367        let Ok(resp) = resp_receiver.await else {
368            log::warn!(
369                "chained session control wait-for-next-session response channel closed early"
370            );
371            return Err(api::ErrorCode::InternalError);
372        };
373
374        resp
375    }
376
377    /// Hands the next session request (if any) to the waiting yivi server
378    pub async fn release_next_session(
379        &self,
380        chained_session_id: id::Id,
381        next_session_request: NextSession,
382        stale_after: Option<u16>,
383    ) -> api::Result<api::auths::YiviReleaseNextSessionResp> {
384        let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
385
386        self.send_command(CscCommand::ReleaseNextSession {
387            chained_session_id,
388            next_session_request,
389            stale_after,
390            resp_sender,
391        })
392        .await?;
393
394        let Ok(resp) = resp_receiver.await else {
395            log::warn!(
396                "chained session control release-next-session response channel closed early"
397            );
398            return Err(api::ErrorCode::InternalError);
399        };
400
401        resp
402    }
403
404    async fn send_command(&self, cmd: CscCommand) -> api::Result<()> {
405        self.sender.send(cmd).await.map_err(|_| {
406            log::warn!("chained session control command channel closed early");
407            api::ErrorCode::InternalError
408        })
409    }
410
411    /// Creates a new [`ChainedSessionsCtl`] instance, and spawns a background task to drive it.
412    pub fn new(ctx: YiviCtx) -> Self {
413        let (sender, receiver) = tokio::sync::mpsc::channel(10);
414
415        tokio::spawn(async {
416            log::trace!("spawned chained sessions control task");
417
418            ChainedSessionsCtl::drive(ctx, receiver).await;
419
420            log::trace!("chained sessions control task is about to complete");
421        });
422
423        Self { sender }
424    }
425
426    async fn drive(ctx: YiviCtx, mut receiver: tokio::sync::mpsc::Receiver<CscCommand>) {
427        let mut backend = ChainedSessionsBackend::new(ctx);
428
429        loop {
430            tokio::select! {
431                cmd_maybe = receiver.recv() => {
432                    let Some(cmd) = cmd_maybe else {
433                        // channel is closed, no more commands are coming, so we can abort
434                        return
435                    };
436
437                    backend.handle_cmd(cmd).await;
438                }
439                _ = backend.sleep_until_next_expiry() => {
440                    backend.expire_next();
441                }
442            };
443        }
444    }
445}
446
447enum ChainedSessionState {
448    WaitingForYiviServer {
449        waiters: Vec<WaitForResultCrs>,
450    },
451    YiviServerWaiting {
452        disclosure: jwt::JWT,
453        /// Yivi servers waiting to be released.  The `Id` refers to the yivi server.
454        waiters: HashMap<id::Id, WaitForNextSessionCrs>,
455        first_arrived_at: std::time::Instant,
456    },
457}
458
459/// Backend to [`ChainedSessionsCtl`].
460struct ChainedSessionsBackend {
461    ctx: YiviCtx,
462    sessions: HashMap<id::Id, ChainedSessionState>,
463
464    /// Session ids ordered by expiry instant.  The session that will expire soonest
465    /// is in the front.  May contain ids already removed from [`Self::sessions`]
466    /// (namely, sessions that completed normally before expiry).
467    expiry_queue: VecDeque<(tokio::time::Instant, id::Id)>,
468}
469
470impl ChainedSessionsBackend {
471    fn new(ctx: YiviCtx) -> Self {
472        Self {
473            ctx,
474            sessions: Default::default(),
475            expiry_queue: Default::default(),
476        }
477    }
478
479    async fn handle_cmd(&mut self, cmd: CscCommand) {
480        match cmd {
481            CscCommand::WaitForResult {
482                chained_session_id,
483                resp_sender,
484            } => {
485                self.handle_wait_for_result(chained_session_id, resp_sender)
486                    .await
487            }
488            CscCommand::WaitForNextSession {
489                chained_session_id,
490                request_id,
491                disclosure,
492                resp_sender,
493            } => {
494                self.handle_wait_for_next_session(
495                    chained_session_id,
496                    request_id,
497                    disclosure,
498                    resp_sender,
499                )
500                .await
501            }
502            CscCommand::AbortWaitForNextSession {
503                chained_session_id,
504                request_id,
505            } => {
506                self.handle_abort_wait_for_next_session(chained_session_id, request_id)
507                    .await
508            }
509            CscCommand::CreateSession { resp_sender } => {
510                self.handle_create_session(resp_sender).await
511            }
512            CscCommand::ReleaseNextSession {
513                chained_session_id,
514                next_session_request,
515                stale_after,
516                resp_sender,
517            } => {
518                self.handle_release_next_session(
519                    chained_session_id,
520                    next_session_request,
521                    stale_after,
522                    resp_sender,
523                )
524                .await
525            }
526        }
527    }
528
529    fn respond_to<T>(
530        resp_sender: tokio::sync::oneshot::Sender<T>,
531        resp: T,
532        chained_session_id: id::Id,
533    ) {
534        if resp_sender.send(resp).is_err() {
535            log::warn!(
536                "response channel for chained session {chained_session_id} was closed before response could be sent"
537            );
538        }
539    }
540
541    async fn handle_create_session(&mut self, resp_sender: CreateSessionCrs) {
542        let chained_session_id = id::Id::random();
543
544        assert!(
545            self.sessions
546                .insert(
547                    chained_session_id,
548                    ChainedSessionState::WaitingForYiviServer { waiters: vec![] }
549                )
550                .is_none(),
551            "against all odds, 256-bit random ids collided!"
552        );
553
554        let expiry =
555            tokio::time::Instant::now() + self.ctx.chained_sessions_config.session_validity;
556        self.expiry_queue.push_back((expiry, chained_session_id));
557
558        log::trace!("chained session {chained_session_id} created");
559
560        Self::respond_to(resp_sender, Ok(chained_session_id), chained_session_id);
561    }
562
563    async fn handle_wait_for_result(
564        &mut self,
565        chained_session_id: id::Id,
566        resp_sender: WaitForResultCrs,
567    ) {
568        let Some(session) = self.sessions.get_mut(&chained_session_id) else {
569            Self::respond_to(
570                resp_sender,
571                Ok(api::auths::YiviWaitForResultResp::SessionGone),
572                chained_session_id,
573            );
574            return;
575        };
576
577        match session {
578            ChainedSessionState::WaitingForYiviServer { waiters } => {
579                log::trace!(
580                    "registered waiter for the result of chained session {chained_session_id}",
581                );
582                waiters.push(resp_sender)
583            }
584            ChainedSessionState::YiviServerWaiting { disclosure, .. } => {
585                log::trace!(
586                    "result for chained session {chained_session_id} requested and immediately available"
587                );
588                Self::respond_to(
589                    resp_sender,
590                    Ok(api::auths::YiviWaitForResultResp::Success {
591                        disclosure: disclosure.clone(),
592                    }),
593                    chained_session_id,
594                )
595            }
596        }
597    }
598
599    async fn handle_wait_for_next_session(
600        &mut self,
601        chained_session_id: id::Id,
602        request_id: id::Id,
603        disclosure: jwt::JWT,
604        resp_sender: WaitForNextSessionCrs,
605    ) {
606        let Some(session) = self.sessions.get_mut(&chained_session_id) else {
607            log::warn!(
608                "yivi server submitted disclosure for a chained session {chained_session_id} that cannot be found - was the yivi server too slow?"
609            );
610            Self::respond_to(resp_sender, Ok(None), chained_session_id);
611            return;
612        };
613
614        // check session is ready for the yivi server
615        match session {
616            ChainedSessionState::WaitingForYiviServer { .. } => {
617                // this is what we want; handled below
618            }
619            ChainedSessionState::YiviServerWaiting {
620                disclosure: stored_disclosure,
621                waiters,
622                ..
623            } => {
624                if &disclosure != stored_disclosure {
625                    log::warn!(
626                        "second yivi server submitted a different disclosure for chained session {chained_session_id}"
627                    );
628                    Self::respond_to(
629                        resp_sender,
630                        Err(api::ErrorCode::BadRequest),
631                        chained_session_id,
632                    );
633                    return;
634                }
635                log::trace!(
636                    "an additional yivi server ({request_id}) is waiting for chained session {chained_session_id}"
637                );
638                let should_be_none = waiters.insert(request_id, resp_sender);
639                if should_be_none.is_some() {
640                    log::error!("bug: 256-bit random `request_id` collided - was it random?");
641                    panic!("bug: random `request_id`s collided");
642                }
643                return;
644            }
645        }
646        let old_session = std::mem::replace(
647            session,
648            ChainedSessionState::YiviServerWaiting {
649                disclosure: disclosure.clone(),
650                waiters: [(request_id, resp_sender)].into(),
651                first_arrived_at: std::time::Instant::now(),
652            },
653        );
654
655        match old_session {
656            ChainedSessionState::WaitingForYiviServer { waiters } => {
657                // release waiters
658                log::trace!(
659                    "releasing {} waiter(s) on the result of chained session {chained_session_id}",
660                    waiters.len()
661                );
662                for waiter in waiters.into_iter() {
663                    Self::respond_to(
664                        waiter,
665                        Ok(api::auths::YiviWaitForResultResp::Success {
666                            disclosure: disclosure.clone(),
667                        }),
668                        chained_session_id,
669                    )
670                }
671            }
672            ChainedSessionState::YiviServerWaiting { .. } => {
673                panic!("session changed unexpectedly");
674            }
675        }
676    }
677
678    async fn handle_abort_wait_for_next_session(
679        &mut self,
680        chained_session_id: id::Id,
681        request_id: id::Id,
682    ) {
683        let Some(session) = self.sessions.get_mut(&chained_session_id) else {
684            log::trace!(
685                "wanting to abort yivi server request {request_id} of chained session \
686                {chained_session_id}, but this session is not there (anymore)"
687            );
688            return;
689        };
690
691        match session {
692            ChainedSessionState::WaitingForYiviServer { .. } => {
693                log::warn!(
694                    "to be aborted yivi server request {request_id} of chained session \
695                    {chained_session_id} is not (yet) recorded in chained session state \
696                    (nor any other yivi server request)"
697                );
698            }
699            ChainedSessionState::YiviServerWaiting { waiters, .. } => {
700                let removed = waiters.remove(&request_id);
701
702                if removed.is_none() {
703                    log::warn!(
704                        "to be aborted yivi server request {request_id} of chained session \
705                        {chained_session_id} is not (yet) recorded in chained session state"
706                    );
707                }
708
709                // NOTE: there is no need to actually send anything over the response sender in
710                // `removed` back to the waiting yivi server, because it has aborted.
711            }
712        }
713    }
714
715    async fn handle_release_next_session(
716        &mut self,
717        chained_session_id: id::Id,
718        next_session_request: NextSession,
719        stale_after: Option<u16>,
720        resp_sender: ReleaseNextSessionCrs,
721    ) {
722        let Some(session) = self.sessions.get_mut(&chained_session_id) else {
723            log::debug!(
724                "request to release chained session {chained_session_id} that cannot be found "
725            );
726            Self::respond_to(
727                resp_sender,
728                Ok(api::auths::YiviReleaseNextSessionResp::SessionGone),
729                chained_session_id,
730            );
731            return;
732        };
733
734        // check session is ready for the yivi server
735        match session {
736            ChainedSessionState::WaitingForYiviServer { .. } => {
737                log::debug!(
738                    "request to release a yivi server that's not there yet, \
739                    in chained session {chained_session_id}"
740                );
741                Self::respond_to(
742                    resp_sender,
743                    Ok(api::auths::YiviReleaseNextSessionResp::TooEarly),
744                    chained_session_id,
745                );
746                return;
747            }
748            ChainedSessionState::YiviServerWaiting { .. } => {
749                // this is what we want
750            }
751        }
752
753        log::trace!(
754            "yivi server is about to be released from chained session {chained_session_id}"
755        );
756        let Some(ChainedSessionState::YiviServerWaiting {
757            waiters,
758            first_arrived_at,
759            ..
760        }) = self.sessions.remove(&chained_session_id)
761        else {
762            panic!("chained session state changed unexpectedly");
763        };
764
765        if waiters.is_empty() {
766            Self::respond_to(
767                resp_sender,
768                Ok(api::auths::YiviReleaseNextSessionResp::YiviServerGone),
769                chained_session_id,
770            );
771            return;
772        }
773
774        if let Some(stale_after) = stale_after
775            && std::time::Instant::now()
776                .duration_since(first_arrived_at)
777                .as_millis()
778                > stale_after.into()
779        {
780            Self::respond_to(
781                resp_sender,
782                Ok(api::auths::YiviReleaseNextSessionResp::YiviServerGone),
783                chained_session_id,
784            );
785            return;
786        }
787
788        for waiter in waiters.into_values() {
789            Self::respond_to(waiter, Ok(next_session_request.clone()), chained_session_id);
790        }
791
792        Self::respond_to(
793            resp_sender,
794            Ok(api::auths::YiviReleaseNextSessionResp::Success {}),
795            chained_session_id,
796        );
797    }
798
799    async fn sleep_until_next_expiry(&self) {
800        match self.expiry_queue.front() {
801            Some((expiry, _)) => tokio::time::sleep_until(*expiry).await,
802            None => std::future::pending().await,
803        }
804    }
805
806    fn expire_next(&mut self) {
807        let now = tokio::time::Instant::now();
808
809        while let Some((_, id)) = self.expiry_queue.pop_front_if(|(expiry, _)| *expiry <= now) {
810            let Some(session) = self.sessions.remove(&id) else {
811                continue; // already completed normally
812            };
813
814            log::debug!("chained session {id} expired");
815            match session {
816                ChainedSessionState::WaitingForYiviServer { waiters } => {
817                    for waiter in waiters {
818                        Self::respond_to(
819                            waiter,
820                            Ok(api::auths::YiviWaitForResultResp::SessionGone),
821                            id,
822                        );
823                    }
824                }
825                ChainedSessionState::YiviServerWaiting { waiters, .. } => {
826                    for waiter in waiters.into_values() {
827                        Self::respond_to(waiter, Ok(None), id);
828                    }
829                }
830            }
831        }
832    }
833}