From 960283324d3af998da6f98cc05b87732b5d5fb76 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Fri, 20 Oct 2023 01:47:32 +0530 Subject: [PATCH] feat: schedule mCaptcha/survey registration and uploads --- config/default.toml | 8 +-- src/main.rs | 19 +++++- src/survey.rs | 160 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 146 insertions(+), 41 deletions(-) diff --git a/config/default.toml b/config/default.toml index 1f69a48b..72e0f7b9 100644 --- a/config/default.toml +++ b/config/default.toml @@ -67,7 +67,7 @@ port = 10025 username = "admin" password = "password" -[survey] -nodes = ["http://localhost:7001"] -rate_limit = 10 # upload every hour -instance_root_url = "http://localhost:7000" +#[survey] +#nodes = ["http://localhost:7001"] +#rate_limit = 10 # upload every hour +#instance_root_url = "http://localhost:7000" diff --git a/src/main.rs b/src/main.rs index 55650eda..c96b475c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,6 +46,7 @@ use static_assets::FileMap; pub use widget::WIDGET_ROUTES; use crate::demo::DemoUser; +use survey::SurveyClientTrait; lazy_static! { pub static ref SETTINGS: Settings = Settings::new().unwrap(); @@ -106,7 +107,7 @@ async fn main() -> std::io::Result<()> { let settings = Settings::new().unwrap(); let secrets = survey::SecretsStore::default(); - let data = Data::new(&settings, secrets).await; + let data = Data::new(&settings, secrets.clone()).await; let data = actix_web::web::Data::new(data); let mut demo_user: Option = None; @@ -119,6 +120,13 @@ async fn main() -> std::io::Result<()> { ); } + let (mut survey_upload_tx, mut survey_upload_handle) = (None, None); + if settings.survey.is_some() { + let survey_runner_ctx = survey::Survey::new(data.clone()); + let (x, y) = survey_runner_ctx.start_job().await.unwrap(); + (survey_upload_tx, survey_upload_handle) = (Some(x), Some(y)); + } + let ip = settings.server.get_ip(); println!("Starting server on: http://{ip}"); @@ -143,9 +151,18 @@ async fn main() -> std::io::Result<()> { .run() .await?; + if let Some(survey_upload_tx) = survey_upload_tx { + survey_upload_tx.send(()).unwrap(); + } + if let Some(demo_user) = demo_user { demo_user.abort(); } + + if let Some(survey_upload_handle) = survey_upload_handle { + survey_upload_handle.await.unwrap(); + } + Ok(()) } diff --git a/src/survey.rs b/src/survey.rs index 243d7ce9..b2ecfb5e 100644 --- a/src/survey.rs +++ b/src/survey.rs @@ -9,17 +9,21 @@ use std::time::Duration; use reqwest::Client; use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use tokio::time::sleep; use crate::errors::*; use crate::settings::Settings; use crate::AppData; +use crate::V1_API_ROUTES; #[async_trait::async_trait] -trait SurveyClientTrait { - async fn start_job(&self, data: AppData) -> ServiceResult>; - async fn register(&self, data: &AppData) -> ServiceResult<()>; +pub trait SurveyClientTrait { + async fn start_job(&self) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)>; + async fn schedule_upload_job(&self) -> ServiceResult<()>; + async fn is_online(&self) -> ServiceResult; + async fn register(&self) -> ServiceResult<()>; } #[derive(Clone, Debug, Default)] @@ -46,60 +50,145 @@ impl SecretsStore { } } -struct Survey { - settings: Settings, +#[derive(Clone)] +pub struct Survey { client: Client, - secrets: SecretsStore, + app_ctx: AppData, } impl Survey { - fn new(settings: Settings, secrets: SecretsStore) -> Self { + pub fn new(app_ctx: AppData) -> Self { + if app_ctx.settings.survey.is_none() { + panic!("Survey uploader shouldn't be initialized it isn't configured, please report this bug") + } Survey { client: Client::new(), - settings, - secrets, + app_ctx, } } } #[async_trait::async_trait] impl SurveyClientTrait for Survey { - async fn start_job(&self, data: AppData) -> ServiceResult> { + async fn start_job(&self) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> { + fn can_run(rx: &mut oneshot::Receiver<()>) -> bool { + match rx.try_recv() { + Err(oneshot::error::TryRecvError::Empty) => true, + _ => false, + } + } + + let (tx, mut rx) = oneshot::channel(); + let this = self.clone(); + let mut register = false; let fut = async move { loop { - sleep(Duration::new(data.settings.survey.rate_limit, 0)).await; - // if let Err(e) = Self::delete_demo_user(&data).await { - // log::error!("Error while deleting demo user: {:?}", e); - // } - // if let Err(e) = Self::register_demo_user(&data).await { - // log::error!("Error while registering demo user: {:?}", e); - // } + if !can_run(&mut rx) { + log::info!("Stopping survey uploads"); + break; + } + + if !register { + loop { + if this.is_online().await.unwrap() { + this.register().await.unwrap(); + register = true; + break; + } else { + sleep(Duration::new(1, 0)).await; + } + } + } + + for i in 0..this.app_ctx.settings.survey.as_ref().unwrap().rate_limit { + if !can_run(&mut rx) { + log::info!("Stopping survey uploads"); + break; + } + sleep(Duration::new(1, 0)).await; + } + let _ = this.schedule_upload_job().await; + + // for url in this.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() { + // if !can_run(&mut rx) { + // log::info!("Stopping survey uploads"); + // break; + // } + // log::info!("Uploading to survey instance {}", url); + // } } }; let handle = tokio::spawn(fut); - Ok(handle) + Ok((tx, handle)) } - async fn register(&self, data: &AppData) -> ServiceResult<()> { - let protocol = if self.settings.server.proxy_has_tls { - "https://" - } else { - "http://" - }; + async fn is_online(&self) -> ServiceResult { + let res = self + .client + .get(format!( + "http://{}{}", + self.app_ctx.settings.server.get_ip(), + V1_API_ROUTES.meta.health + )) + .send() + .await + .unwrap(); + Ok(res.status() == 200) + } + + async fn schedule_upload_job(&self) -> ServiceResult<()> { + log::debug!("Running upload job"); + #[derive(Serialize)] + struct Secret { + secret: String, + } + let mut page = 0; + loop { + let psuedo_ids = self.app_ctx.db.analytics_get_all_psuedo_ids(page).await?; + if psuedo_ids.is_empty() { + log::debug!("upload job complete, no more IDs to upload"); + break; + } + for id in psuedo_ids { + for url in self.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() { + if let Some(secret) = self.app_ctx.survey_secrets.get(url.as_str()) { + let payload = Secret { secret }; + + log::info!("Uploading to survey instance {} campaign {id}", url); + let mut url = url.clone(); + url.set_path(&format!("/mcaptcha/api/v1/{id}/upload")); + let resp = + self.client.post(url).json(&payload).send().await.unwrap(); + println!("{}", resp.text().await.unwrap()); + } + } + } + page += 1; + } + Ok(()) + } + + async fn register(&self) -> ServiceResult<()> { #[derive(Serialize)] struct MCaptchaInstance { url: url::Url, - secret: String, + auth_token: String, } - let this_instance_url = - url::Url::parse(&format!("{protocol}{}", self.settings.server.domain))?; - for url in self.settings.survey.nodes.iter() { + let this_instance_url = self + .app_ctx + .settings + .survey + .as_ref() + .unwrap() + .instance_root_url + .clone(); + for url in self.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() { // mCaptcha/survey must send this token while uploading secret to authenticate itself // this token must be sent to mCaptcha/survey with the registration payload let secret_upload_auth_token = crate::api::v1::mcaptcha::get_random(20); let payload = MCaptchaInstance { url: this_instance_url.clone(), - secret: secret_upload_auth_token.clone(), + auth_token: secret_upload_auth_token.clone(), }; // SecretsStore will store auth tokens generated by both mCaptcha/mCaptcha and @@ -108,13 +197,12 @@ impl SurveyClientTrait for Survey { // Storage schema: // - mCaptcha/mCaptcha generated auth token: (, ) // - mCaptcha/survey generated auth token (,