Skip to main content

pubhubs/cli/
admin.rs

1use anyhow::{Context as _, Result};
2
3use crate::api::{self};
4use crate::cli;
5use crate::client;
6use crate::servers::{self, Config};
7
8#[derive(clap::Args, Debug)]
9pub struct AdminArgs {
10    #[command(flatten)]
11    common: cli::CommonArgs,
12
13    #[arg(value_name = "SERVER")]
14    server: servers::Name,
15
16    /// Admin secret key, hex encoded.
17    #[arg(value_name = "ADMIN_KEY")]
18    admin_key: api::SigningKey,
19
20    #[command(subcommand)]
21    command: Commands,
22}
23
24impl AdminArgs {
25    pub fn run(self, _spec: &mut clap::Command) -> Result<()> {
26        env_logger::init();
27        let config = self.common.load_config()?;
28
29        match self.command {
30            Commands::Config(args) => args.run(AdminContext {
31                config,
32                server: self.server,
33                admin_key: self.admin_key,
34                url: tokio::sync::OnceCell::const_new(),
35                client: client::Client::builder().agent(client::Agent::Cli).finish(),
36            }),
37        }
38    }
39}
40
41struct AdminContext {
42    config: Config,
43    server: servers::Name,
44    admin_key: api::SigningKey,
45    url: tokio::sync::OnceCell<url::Url>,
46    client: client::Client,
47}
48
49impl AdminContext {
50    async fn retrieve_config(&self) -> anyhow::Result<Config> {
51        let resp = self
52            .client
53            .query_with_retry::<api::admin::InfoEP, _, _>(
54                &self.get_url().await?.clone(),
55                &api::Signed::<api::admin::InfoReq>::new(
56                    &*self.admin_key,
57                    &api::admin::InfoReq {},
58                    std::time::Duration::from_secs(10),
59                )
60                .unwrap(),
61            )
62            .await?;
63
64        match resp {
65            api::admin::InfoResp::Success { config } => Ok(*config),
66            api::admin::InfoResp::ResignRequest => {
67                anyhow::bail!("request expired unexpectedly quickly")
68            }
69            api::admin::InfoResp::InvalidAdminKey => anyhow::bail!("invalid admin key"),
70        }
71    }
72
73    async fn get_url(&self) -> Result<&url::Url> {
74        self.url
75            .get_or_try_init(|| async {
76                if self.server == servers::Name::PubhubsCentral {
77                    return Ok(self.config.phc_url.as_ref().clone());
78                }
79
80                log::info!(
81                    "retrieving constellation from {phc_url} to get url of {server_name}",
82                    phc_url = self.config.phc_url.as_ref(),
83                    server_name = self.server
84                );
85
86                let constellation_or_id = self
87                    .client
88                    .get_constellation(self.config.phc_url.as_ref())
89                    .await?;
90
91                let constellation = constellation_or_id
92                    .constellation()
93                    .context("phc did not return a constellation; just an id")?;
94
95                Ok(constellation.url(self.server).clone())
96            })
97            .await
98    }
99}
100
101#[derive(clap::Subcommand, Debug)]
102enum Commands {
103    /// Retrieves the current configuration,
104    /// or change it, using the `update` subcommand.
105    Config(ConfigArgs),
106}
107
108#[derive(clap::Args, Debug)]
109pub struct ConfigArgs {
110    #[command(subcommand)]
111    command: Option<ConfigCommands>,
112}
113
114impl ConfigArgs {
115    fn run(self, ctx: AdminContext) -> Result<()> {
116        tokio::runtime::Builder::new_current_thread()
117            .enable_all()
118            .build()?
119            .block_on(tokio::task::LocalSet::new().run_until(self.run_async(ctx)))
120    }
121
122    async fn run_async(self, ctx: AdminContext) -> Result<()> {
123        self.command.unwrap_or_default().run(ctx).await
124    }
125}
126
127#[derive(clap::Subcommand, Debug)]
128enum ConfigCommands {
129    /// Retrieves the current config used by the server - the default command.
130    Get(ConfigGetArgs),
131
132    /// Temporarily updates part of the server configuration.
133    ///
134    /// The changes are made in-memory, not on disk, and thus only persist across 'soft' server
135    /// restarts (where the `pubhubs serve` process does not actually restart) such as the ones
136    /// caused by discovery.
137    Update(ConfigUpdateArgs),
138}
139
140impl ConfigCommands {
141    async fn run(self, ctx: AdminContext) -> Result<()> {
142        match self {
143            ConfigCommands::Get(args) => args.run(ctx).await,
144            ConfigCommands::Update(args) => args.run(ctx).await,
145        }
146    }
147}
148
149impl Default for ConfigCommands {
150    fn default() -> Self {
151        ConfigCommands::Get(ConfigGetArgs::default())
152    }
153}
154
155#[derive(clap::Args, Debug)]
156pub struct ConfigUpdateArgs {
157    /// Points to the part of the configuratio to change, e.g. '/phc/enc_key'.
158    #[arg(value_name = "POINTER")]
159    pub pointer: String,
160
161    /// The new JSON value to insert at `pointer`. FIXME: always parsed as JSON string
162    #[arg(value_name = "NEW_VALUE")]
163    pub new_value: serde_json::Value,
164}
165
166impl ConfigUpdateArgs {
167    async fn run(self, ctx: AdminContext) -> Result<()> {
168        let config = ctx.retrieve_config().await?;
169
170        log::info!("Checking the configuration change locally...");
171        config.json_updated(&self.pointer, self.new_value.clone())?;
172
173        log::info!("Sending configuration change to {}...", ctx.server);
174        let resp = ctx
175            .client
176            .query_with_retry::<api::admin::UpdateConfigEP, _, _>(
177                ctx.get_url().await?,
178                &api::Signed::<api::admin::UpdateConfigReq>::new(
179                    &*ctx.admin_key,
180                    &api::admin::UpdateConfigReq {
181                        pointer: self.pointer.clone(),
182                        new_value: self.new_value.clone(),
183                    },
184                    std::time::Duration::from_secs(10),
185                )?,
186            )
187            .await?;
188
189        match resp {
190            api::admin::UpdateConfigResp::Success => {}
191            api::admin::UpdateConfigResp::ResignRequest => {
192                anyhow::bail!("request expired unexpectedly quickly")
193            }
194            api::admin::UpdateConfigResp::InvalidAdminKey => {
195                anyhow::bail!("admin key suddenly became invalid")
196            }
197        }
198
199        log::info!("Waiting for the configuration change to take effect...");
200        crate::misc::task::retry::<(), anyhow::Error, _>(|| async {
201            let new_config: Config = ctx.retrieve_config().await?;
202            let json_new_config = serde_json::to_value(new_config)?;
203
204            let current_value = json_new_config.pointer(&self.pointer).ok_or_else(|| {
205                anyhow::anyhow!(
206                "indexing configuration using pointer failed although it worked just a moment ago!"
207            )
208            })?;
209
210            if *current_value == self.new_value {
211                Ok(Some(()))
212            } else {
213                log::info!(
214                    "{}'s configuration at {} is still {} and not yet {}",
215                    ctx.server,
216                    &self.pointer,
217                    current_value,
218                    self.new_value
219                );
220                Ok(None)
221            }
222        })
223        .await?
224        .ok_or_else(|| anyhow::anyhow!("timed out"))?;
225
226        log::info!("configuration update applied and verified");
227
228        Ok(())
229    }
230}
231
232#[derive(clap::Args, Debug, Default)]
233pub struct ConfigGetArgs {}
234
235impl ConfigGetArgs {
236    async fn run(self, ctx: AdminContext) -> Result<()> {
237        let config = ctx.retrieve_config().await?;
238
239        let stdout = std::io::stdout().lock();
240
241        serde_json::to_writer_pretty(stdout, &config)?;
242
243        Ok(())
244    }
245}