1use super::server::*;
3
4use crate::api;
5use crate::id;
6use crate::misc;
7use crate::misc::jwt;
8use crate::misc::stream_ext::StreamExt as _;
9use crate::servers::yivi;
10
11use super::server::YiviCtx;
12
13use std::collections::{HashMap, VecDeque};
14use std::rc::Rc;
15
16use actix_web::web;
17use futures::future::FutureExt as _;
18use futures::stream::StreamExt as _;
19
20impl App {
21 pub fn chained_sessions_ctl_or_bad_request(&self) -> api::Result<&ChainedSessionsCtl> {
22 self.chained_sessions_ctl.as_ref().ok_or_else(|| {
23 log::debug!("chained sessions control requested, but not available");
24 api::ErrorCode::BadRequest
25 })
26 }
27
28 pub async fn handle_yivi_wait_for_result(
30 app: Rc<Self>,
31 req: web::Json<api::auths::YiviWaitForResultReq>,
32 ) -> api::Result<api::auths::YiviWaitForResultResp> {
33 let csc = app.chained_sessions_ctl_or_bad_request()?;
34
35 let api::auths::YiviWaitForResultReq { state } = req.into_inner();
36
37 let Some(state) = AuthState::unseal(&state, &app.auth_state_secret) else {
38 return Ok(api::auths::YiviWaitForResultResp::PleaseRestartAuth);
39 };
40
41 let Some(ChainedSessionSetup { id: session_id, .. }) = state.yivi_chained_session else {
42 log::debug!(
43 "yivi-wait-for-result endpoint called on a authentication session without a yivi chained session"
44 );
45 return Err(api::ErrorCode::BadRequest);
46 };
47
48 csc.wait_for_result(session_id).await
49 }
50
51 pub async fn handle_yivi_next_session(
53 app: web::Data<std::rc::Rc<App>>,
54 query: web::Query<api::auths::YiviNextSessionQuery>,
55 result_jwt: String,
56 ) -> impl actix_web::Responder {
57 use actix_web::Either::{Left, Right};
58
59 let app = app.into_inner();
60 let api::auths::YiviNextSessionQuery { state } = query.into_inner();
61
62 log::trace!(
63 "yivi server (or imposter) submits next sessions request; jwt: {result_jwt:?}; auth state: {}",
64 state
65 );
66
67 let result_jwt = jwt::JWT::from(result_jwt);
68
69 let Some(state) = AuthState::unseal(&state, &app.auth_state_secret) else {
70 log::debug!(
71 "yivi server (or an imposter) submitted invalid (or expired) auth state to next-session endpoint"
72 );
73 return Left(actix_web::HttpResponse::BadRequest().finish());
74 };
75
76 let Some(ChainedSessionSetup {
77 id: chained_session_id,
78 drip,
79 }) = state.yivi_chained_session
80 else {
81 log::warn!(
82 "yivi server submitted auth state to next-session endpoint without chained session id"
83 );
84 return Left(actix_web::HttpResponse::BadRequest().finish());
85 };
86
87 let Some(csc) = app.chained_sessions_ctl.clone() else {
89 log::warn!("next-session endpoint invoked, but chained sessions are not supported");
90 return Left(actix_web::HttpResponse::BadRequest().finish());
91 };
92
93 let Some(yivi) = app.yivi.as_ref() else {
94 log::warn!("next-session endpoint invoked, but yivi is not supported");
95 return Left(actix_web::HttpResponse::BadRequest().finish());
96 };
97
98 let Ok(..) = yivi::SessionResult::open_signed(&result_jwt, &yivi.server_creds) else {
99 log::debug!(
100 "invalid yivi signed session result submitted by yivi server (or imposter)",
101 );
102 log::trace!("invalid signed session result jwt: {result_jwt}");
103 return Left(actix_web::HttpResponse::BadRequest().finish());
104 };
105
106 log::trace!(
107 "yivi server submitted disclosure and is waiting for chained session {chained_session_id}"
108 );
109
110 let request_id = id::Id::random();
111
112 let wfns_fut =
113 csc.clone()
114 .wait_for_next_session(chained_session_id, request_id, result_jwt);
115
116 if drip {
117 let on_drop = move || {
120 tokio::task::spawn_local(async move {
121 let _ = csc
122 .send_command(CscCommand::AbortWaitForNextSession {
123 chained_session_id,
124 request_id,
125 })
126 .await;
127 });
128 };
129
130 let wfns_fut = async move {
131 let _deferred = misc::defer(on_drop);
132 wfns_fut.await
133 };
134
135 Right(Left(Self::dripping_wfns_responder(wfns_fut)))
136 } else {
137 Right(Right(Self::regular_wfns_responder(wfns_fut).await))
138 }
139 }
140
141 async fn regular_wfns_responder(
142 wfns_fut: impl Future<Output = api::Result<NextSession>>,
143 ) -> impl actix_web::Responder {
144 match wfns_fut.await {
145 Ok(None) => actix_web::HttpResponse::NoContent().finish(),
146 Ok(Some(session_request)) => {
147 log::debug!("sent chained session to yivi server");
148 actix_web::HttpResponse::Ok().json(session_request)
149 }
150 Err(api::ErrorCode::InternalError) => {
151 actix_web::HttpResponse::InternalServerError().finish()
152 }
153 Err(api::ErrorCode::BadRequest) => actix_web::HttpResponse::BadRequest().finish(),
154 Err(api::ErrorCode::PleaseRetry) => panic!("not expecting 'please retry' here"),
155 }
156 }
157
158 fn dripping_wfns_responder(
159 wfns_fut: impl Future<Output = api::Result<NextSession>> + 'static,
160 ) -> impl actix_web::Responder {
161 let drips = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
162 core::time::Duration::from_millis(100),
163 ))
164 .map(|_| Ok(bytes::Bytes::from_static(b" ")));
165
166 let result_bytes_fut = wfns_fut.map(|result| match result {
167 Ok(Some(session_request)) => {
168 log::debug!("sent chained session to yivi server: {session_request:?}");
169 Ok(serde_json::to_vec_pretty(&session_request)
170 .map_err(|err| {
171 log::error!("failed to serialize session_request to json: {err:#}");
172 api::ErrorCode::InternalError
173 })?
174 .into())
175 }
176 Ok(None) => {
177 log::error!("bug: `None` session_request reached dripping chained session");
178 Err(api::ErrorCode::InternalError)
179 }
180 Err(err) => {
181 log::warn!("failed to release session_request to yivi server: {err:#}");
182 Err(err)
183 }
184 });
185
186 let stream = drips.until_overridden_by(result_bytes_fut.into_stream());
187
188 actix_web::HttpResponse::Ok()
189 .content_type(mime::APPLICATION_JSON)
191 .streaming(stream)
192 }
193
194 pub async fn handle_yivi_release_next_session(
196 app: Rc<Self>,
197 req: web::Json<api::auths::YiviReleaseNextSessionReq>,
198 ) -> api::Result<api::auths::YiviReleaseNextSessionResp> {
199 let csc = app.chained_sessions_ctl_or_bad_request()?;
200 let yivi = app.get_yivi()?;
201
202 let api::auths::YiviReleaseNextSessionReq {
203 state,
204 next_session,
205 stale_after,
206 } = req.into_inner();
207
208 let Some(state) = AuthState::unseal(&state, &app.auth_state_secret) else {
209 return Ok(api::auths::YiviReleaseNextSessionResp::PleaseRestartAuth);
210 };
211
212 let Some(ChainedSessionSetup {
213 id: session_id,
214 drip,
215 }) = state.yivi_chained_session
216 else {
217 log::debug!(
218 "yivi-release-next-session endpoint called on a authentication session without a yivi chained session"
219 );
220 return Err(api::ErrorCode::BadRequest);
221 };
222
223 if next_session.is_none() && drip {
224 log::debug!(
225 "yivi-release-next-session endpoint on a dripping chained session, but with empty next session."
226 );
227 return Err(api::ErrorCode::BadRequest);
228 }
229
230 let esr = if let Some(jwt) = next_session {
231 Some(
232 yivi::ExtendedSessionRequest::open_signed(
233 &jwt,
234 &yivi.requestor_creds.to_verifying_credentials(),
235 )
236 .map_err(|err| {
237 log::debug!("failed to open signed extended session request: {}", err);
238 api::ErrorCode::BadRequest
239 })?,
240 )
241 } else {
242 None
243 };
244
245 csc.release_next_session(session_id, esr, stale_after).await
246 }
247}
248
249#[derive(Clone)]
253pub struct ChainedSessionsCtl {
254 sender: tokio::sync::mpsc::Sender<CscCommand>,
255}
256
257#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
258pub struct ChainedSessionsConfig {
259 #[serde(with = "crate::misc::time_ext::human_duration")]
260 #[serde(default = "default_chained_session_validity")]
261 pub session_validity: core::time::Duration,
262}
263
264fn default_chained_session_validity() -> core::time::Duration {
265 core::time::Duration::from_secs(10 * 60) }
267
268impl Default for ChainedSessionsConfig {
269 fn default() -> Self {
270 crate::misc::serde_ext::default_object()
271 }
272}
273
274type NextSession = Option<yivi::ExtendedSessionRequest>;
275
276type CreateSessionCrs = tokio::sync::oneshot::Sender<api::Result<id::Id>>;
277type WaitForResultCrs =
278 tokio::sync::oneshot::Sender<api::Result<api::auths::YiviWaitForResultResp>>;
279type WaitForNextSessionCrs = tokio::sync::oneshot::Sender<api::Result<NextSession>>;
280type ReleaseNextSessionCrs =
281 tokio::sync::oneshot::Sender<api::Result<api::auths::YiviReleaseNextSessionResp>>;
282
283enum CscCommand {
284 CreateSession {
285 resp_sender: CreateSessionCrs,
286 },
287 WaitForResult {
288 chained_session_id: id::Id,
289 resp_sender: WaitForResultCrs,
290 },
291 WaitForNextSession {
292 chained_session_id: id::Id,
293 request_id: id::Id,
295 disclosure: jwt::JWT,
296 resp_sender: WaitForNextSessionCrs,
297 },
298 AbortWaitForNextSession {
299 chained_session_id: id::Id,
300 request_id: id::Id,
301 },
302 ReleaseNextSession {
303 chained_session_id: id::Id,
304 next_session_request: NextSession,
305 stale_after: Option<u16>,
306 resp_sender: ReleaseNextSessionCrs,
307 },
308}
309
310impl ChainedSessionsCtl {
311 pub async fn create_session(&self) -> api::Result<id::Id> {
313 let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
314
315 self.send_command(CscCommand::CreateSession { resp_sender })
316 .await?;
317
318 let Ok(resp) = resp_receiver.await else {
319 log::warn!("chained session control create-session response channel closed early");
320 return Err(api::ErrorCode::InternalError);
321 };
322
323 resp
324 }
325
326 pub async fn wait_for_result(
328 &self,
329 chained_session_id: id::Id,
330 ) -> api::Result<api::auths::YiviWaitForResultResp> {
331 let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
332
333 self.send_command(CscCommand::WaitForResult {
334 chained_session_id,
335 resp_sender,
336 })
337 .await?;
338
339 let Ok(resp) = resp_receiver.await else {
340 log::warn!("chained session control wait-for-result response channel closed early");
341 return Err(api::ErrorCode::InternalError);
342 };
343
344 resp
345 }
346
347 pub async fn wait_for_next_session(
352 self,
353 chained_session_id: id::Id,
354 request_id: id::Id,
355 disclosure: jwt::JWT,
356 ) -> api::Result<NextSession> {
357 let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
358
359 self.send_command(CscCommand::WaitForNextSession {
360 chained_session_id,
361 request_id,
362 disclosure,
363 resp_sender,
364 })
365 .await?;
366
367 let Ok(resp) = resp_receiver.await else {
368 log::warn!(
369 "chained session control wait-for-next-session response channel closed early"
370 );
371 return Err(api::ErrorCode::InternalError);
372 };
373
374 resp
375 }
376
377 pub async fn release_next_session(
379 &self,
380 chained_session_id: id::Id,
381 next_session_request: NextSession,
382 stale_after: Option<u16>,
383 ) -> api::Result<api::auths::YiviReleaseNextSessionResp> {
384 let (resp_sender, resp_receiver) = tokio::sync::oneshot::channel();
385
386 self.send_command(CscCommand::ReleaseNextSession {
387 chained_session_id,
388 next_session_request,
389 stale_after,
390 resp_sender,
391 })
392 .await?;
393
394 let Ok(resp) = resp_receiver.await else {
395 log::warn!(
396 "chained session control release-next-session response channel closed early"
397 );
398 return Err(api::ErrorCode::InternalError);
399 };
400
401 resp
402 }
403
404 async fn send_command(&self, cmd: CscCommand) -> api::Result<()> {
405 self.sender.send(cmd).await.map_err(|_| {
406 log::warn!("chained session control command channel closed early");
407 api::ErrorCode::InternalError
408 })
409 }
410
411 pub fn new(ctx: YiviCtx) -> Self {
413 let (sender, receiver) = tokio::sync::mpsc::channel(10);
414
415 tokio::spawn(async {
416 log::trace!("spawned chained sessions control task");
417
418 ChainedSessionsCtl::drive(ctx, receiver).await;
419
420 log::trace!("chained sessions control task is about to complete");
421 });
422
423 Self { sender }
424 }
425
426 async fn drive(ctx: YiviCtx, mut receiver: tokio::sync::mpsc::Receiver<CscCommand>) {
427 let mut backend = ChainedSessionsBackend::new(ctx);
428
429 loop {
430 tokio::select! {
431 cmd_maybe = receiver.recv() => {
432 let Some(cmd) = cmd_maybe else {
433 return
435 };
436
437 backend.handle_cmd(cmd).await;
438 }
439 _ = backend.sleep_until_next_expiry() => {
440 backend.expire_next();
441 }
442 };
443 }
444 }
445}
446
447enum ChainedSessionState {
448 WaitingForYiviServer {
449 waiters: Vec<WaitForResultCrs>,
450 },
451 YiviServerWaiting {
452 disclosure: jwt::JWT,
453 waiters: HashMap<id::Id, WaitForNextSessionCrs>,
455 first_arrived_at: std::time::Instant,
456 },
457}
458
459struct ChainedSessionsBackend {
461 ctx: YiviCtx,
462 sessions: HashMap<id::Id, ChainedSessionState>,
463
464 expiry_queue: VecDeque<(tokio::time::Instant, id::Id)>,
468}
469
470impl ChainedSessionsBackend {
471 fn new(ctx: YiviCtx) -> Self {
472 Self {
473 ctx,
474 sessions: Default::default(),
475 expiry_queue: Default::default(),
476 }
477 }
478
479 async fn handle_cmd(&mut self, cmd: CscCommand) {
480 match cmd {
481 CscCommand::WaitForResult {
482 chained_session_id,
483 resp_sender,
484 } => {
485 self.handle_wait_for_result(chained_session_id, resp_sender)
486 .await
487 }
488 CscCommand::WaitForNextSession {
489 chained_session_id,
490 request_id,
491 disclosure,
492 resp_sender,
493 } => {
494 self.handle_wait_for_next_session(
495 chained_session_id,
496 request_id,
497 disclosure,
498 resp_sender,
499 )
500 .await
501 }
502 CscCommand::AbortWaitForNextSession {
503 chained_session_id,
504 request_id,
505 } => {
506 self.handle_abort_wait_for_next_session(chained_session_id, request_id)
507 .await
508 }
509 CscCommand::CreateSession { resp_sender } => {
510 self.handle_create_session(resp_sender).await
511 }
512 CscCommand::ReleaseNextSession {
513 chained_session_id,
514 next_session_request,
515 stale_after,
516 resp_sender,
517 } => {
518 self.handle_release_next_session(
519 chained_session_id,
520 next_session_request,
521 stale_after,
522 resp_sender,
523 )
524 .await
525 }
526 }
527 }
528
529 fn respond_to<T>(
530 resp_sender: tokio::sync::oneshot::Sender<T>,
531 resp: T,
532 chained_session_id: id::Id,
533 ) {
534 if resp_sender.send(resp).is_err() {
535 log::warn!(
536 "response channel for chained session {chained_session_id} was closed before response could be sent"
537 );
538 }
539 }
540
541 async fn handle_create_session(&mut self, resp_sender: CreateSessionCrs) {
542 let chained_session_id = id::Id::random();
543
544 assert!(
545 self.sessions
546 .insert(
547 chained_session_id,
548 ChainedSessionState::WaitingForYiviServer { waiters: vec![] }
549 )
550 .is_none(),
551 "against all odds, 256-bit random ids collided!"
552 );
553
554 let expiry =
555 tokio::time::Instant::now() + self.ctx.chained_sessions_config.session_validity;
556 self.expiry_queue.push_back((expiry, chained_session_id));
557
558 log::trace!("chained session {chained_session_id} created");
559
560 Self::respond_to(resp_sender, Ok(chained_session_id), chained_session_id);
561 }
562
563 async fn handle_wait_for_result(
564 &mut self,
565 chained_session_id: id::Id,
566 resp_sender: WaitForResultCrs,
567 ) {
568 let Some(session) = self.sessions.get_mut(&chained_session_id) else {
569 Self::respond_to(
570 resp_sender,
571 Ok(api::auths::YiviWaitForResultResp::SessionGone),
572 chained_session_id,
573 );
574 return;
575 };
576
577 match session {
578 ChainedSessionState::WaitingForYiviServer { waiters } => {
579 log::trace!(
580 "registered waiter for the result of chained session {chained_session_id}",
581 );
582 waiters.push(resp_sender)
583 }
584 ChainedSessionState::YiviServerWaiting { disclosure, .. } => {
585 log::trace!(
586 "result for chained session {chained_session_id} requested and immediately available"
587 );
588 Self::respond_to(
589 resp_sender,
590 Ok(api::auths::YiviWaitForResultResp::Success {
591 disclosure: disclosure.clone(),
592 }),
593 chained_session_id,
594 )
595 }
596 }
597 }
598
599 async fn handle_wait_for_next_session(
600 &mut self,
601 chained_session_id: id::Id,
602 request_id: id::Id,
603 disclosure: jwt::JWT,
604 resp_sender: WaitForNextSessionCrs,
605 ) {
606 let Some(session) = self.sessions.get_mut(&chained_session_id) else {
607 log::warn!(
608 "yivi server submitted disclosure for a chained session {chained_session_id} that cannot be found - was the yivi server too slow?"
609 );
610 Self::respond_to(resp_sender, Ok(None), chained_session_id);
611 return;
612 };
613
614 match session {
616 ChainedSessionState::WaitingForYiviServer { .. } => {
617 }
619 ChainedSessionState::YiviServerWaiting {
620 disclosure: stored_disclosure,
621 waiters,
622 ..
623 } => {
624 if &disclosure != stored_disclosure {
625 log::warn!(
626 "second yivi server submitted a different disclosure for chained session {chained_session_id}"
627 );
628 Self::respond_to(
629 resp_sender,
630 Err(api::ErrorCode::BadRequest),
631 chained_session_id,
632 );
633 return;
634 }
635 log::trace!(
636 "an additional yivi server ({request_id}) is waiting for chained session {chained_session_id}"
637 );
638 let should_be_none = waiters.insert(request_id, resp_sender);
639 if should_be_none.is_some() {
640 log::error!("bug: 256-bit random `request_id` collided - was it random?");
641 panic!("bug: random `request_id`s collided");
642 }
643 return;
644 }
645 }
646 let old_session = std::mem::replace(
647 session,
648 ChainedSessionState::YiviServerWaiting {
649 disclosure: disclosure.clone(),
650 waiters: [(request_id, resp_sender)].into(),
651 first_arrived_at: std::time::Instant::now(),
652 },
653 );
654
655 match old_session {
656 ChainedSessionState::WaitingForYiviServer { waiters } => {
657 log::trace!(
659 "releasing {} waiter(s) on the result of chained session {chained_session_id}",
660 waiters.len()
661 );
662 for waiter in waiters.into_iter() {
663 Self::respond_to(
664 waiter,
665 Ok(api::auths::YiviWaitForResultResp::Success {
666 disclosure: disclosure.clone(),
667 }),
668 chained_session_id,
669 )
670 }
671 }
672 ChainedSessionState::YiviServerWaiting { .. } => {
673 panic!("session changed unexpectedly");
674 }
675 }
676 }
677
678 async fn handle_abort_wait_for_next_session(
679 &mut self,
680 chained_session_id: id::Id,
681 request_id: id::Id,
682 ) {
683 let Some(session) = self.sessions.get_mut(&chained_session_id) else {
684 log::trace!(
685 "wanting to abort yivi server request {request_id} of chained session \
686 {chained_session_id}, but this session is not there (anymore)"
687 );
688 return;
689 };
690
691 match session {
692 ChainedSessionState::WaitingForYiviServer { .. } => {
693 log::warn!(
694 "to be aborted yivi server request {request_id} of chained session \
695 {chained_session_id} is not (yet) recorded in chained session state \
696 (nor any other yivi server request)"
697 );
698 }
699 ChainedSessionState::YiviServerWaiting { waiters, .. } => {
700 let removed = waiters.remove(&request_id);
701
702 if removed.is_none() {
703 log::warn!(
704 "to be aborted yivi server request {request_id} of chained session \
705 {chained_session_id} is not (yet) recorded in chained session state"
706 );
707 }
708
709 }
712 }
713 }
714
715 async fn handle_release_next_session(
716 &mut self,
717 chained_session_id: id::Id,
718 next_session_request: NextSession,
719 stale_after: Option<u16>,
720 resp_sender: ReleaseNextSessionCrs,
721 ) {
722 let Some(session) = self.sessions.get_mut(&chained_session_id) else {
723 log::debug!(
724 "request to release chained session {chained_session_id} that cannot be found "
725 );
726 Self::respond_to(
727 resp_sender,
728 Ok(api::auths::YiviReleaseNextSessionResp::SessionGone),
729 chained_session_id,
730 );
731 return;
732 };
733
734 match session {
736 ChainedSessionState::WaitingForYiviServer { .. } => {
737 log::debug!(
738 "request to release a yivi server that's not there yet, \
739 in chained session {chained_session_id}"
740 );
741 Self::respond_to(
742 resp_sender,
743 Ok(api::auths::YiviReleaseNextSessionResp::TooEarly),
744 chained_session_id,
745 );
746 return;
747 }
748 ChainedSessionState::YiviServerWaiting { .. } => {
749 }
751 }
752
753 log::trace!(
754 "yivi server is about to be released from chained session {chained_session_id}"
755 );
756 let Some(ChainedSessionState::YiviServerWaiting {
757 waiters,
758 first_arrived_at,
759 ..
760 }) = self.sessions.remove(&chained_session_id)
761 else {
762 panic!("chained session state changed unexpectedly");
763 };
764
765 if waiters.is_empty() {
766 Self::respond_to(
767 resp_sender,
768 Ok(api::auths::YiviReleaseNextSessionResp::YiviServerGone),
769 chained_session_id,
770 );
771 return;
772 }
773
774 if let Some(stale_after) = stale_after
775 && std::time::Instant::now()
776 .duration_since(first_arrived_at)
777 .as_millis()
778 > stale_after.into()
779 {
780 Self::respond_to(
781 resp_sender,
782 Ok(api::auths::YiviReleaseNextSessionResp::YiviServerGone),
783 chained_session_id,
784 );
785 return;
786 }
787
788 for waiter in waiters.into_values() {
789 Self::respond_to(waiter, Ok(next_session_request.clone()), chained_session_id);
790 }
791
792 Self::respond_to(
793 resp_sender,
794 Ok(api::auths::YiviReleaseNextSessionResp::Success {}),
795 chained_session_id,
796 );
797 }
798
799 async fn sleep_until_next_expiry(&self) {
800 match self.expiry_queue.front() {
801 Some((expiry, _)) => tokio::time::sleep_until(*expiry).await,
802 None => std::future::pending().await,
803 }
804 }
805
806 fn expire_next(&mut self) {
807 let now = tokio::time::Instant::now();
808
809 while let Some((_, id)) = self.expiry_queue.pop_front_if(|(expiry, _)| *expiry <= now) {
810 let Some(session) = self.sessions.remove(&id) else {
811 continue; };
813
814 log::debug!("chained session {id} expired");
815 match session {
816 ChainedSessionState::WaitingForYiviServer { waiters } => {
817 for waiter in waiters {
818 Self::respond_to(
819 waiter,
820 Ok(api::auths::YiviWaitForResultResp::SessionGone),
821 id,
822 );
823 }
824 }
825 ChainedSessionState::YiviServerWaiting { waiters, .. } => {
826 for waiter in waiters.into_values() {
827 Self::respond_to(waiter, Ok(None), id);
828 }
829 }
830 }
831 }
832 }
833}