feat: schedule mCaptcha/survey registration and uploads

This commit is contained in:
Aravinth Manivannan 2023-10-20 01:47:32 +05:30
parent 74364c4e17
commit 960283324d
No known key found for this signature in database
GPG key ID: F8F50389936984FF
3 changed files with 146 additions and 41 deletions

View file

@ -67,7 +67,7 @@ port = 10025
username = "admin"
password = "password"
[survey]
nodes = ["http://localhost:7001"]
rate_limit = 10 # upload every hour
instance_root_url = "http://localhost:7000"
#[survey]
#nodes = ["http://localhost:7001"]
#rate_limit = 10 # upload every hour
#instance_root_url = "http://localhost:7000"

View file

@ -46,6 +46,7 @@ use static_assets::FileMap;
pub use widget::WIDGET_ROUTES;
use crate::demo::DemoUser;
use survey::SurveyClientTrait;
lazy_static! {
pub static ref SETTINGS: Settings = Settings::new().unwrap();
@ -106,7 +107,7 @@ async fn main() -> std::io::Result<()> {
let settings = Settings::new().unwrap();
let secrets = survey::SecretsStore::default();
let data = Data::new(&settings, secrets).await;
let data = Data::new(&settings, secrets.clone()).await;
let data = actix_web::web::Data::new(data);
let mut demo_user: Option<DemoUser> = None;
@ -119,6 +120,13 @@ async fn main() -> std::io::Result<()> {
);
}
let (mut survey_upload_tx, mut survey_upload_handle) = (None, None);
if settings.survey.is_some() {
let survey_runner_ctx = survey::Survey::new(data.clone());
let (x, y) = survey_runner_ctx.start_job().await.unwrap();
(survey_upload_tx, survey_upload_handle) = (Some(x), Some(y));
}
let ip = settings.server.get_ip();
println!("Starting server on: http://{ip}");
@ -143,9 +151,18 @@ async fn main() -> std::io::Result<()> {
.run()
.await?;
if let Some(survey_upload_tx) = survey_upload_tx {
survey_upload_tx.send(()).unwrap();
}
if let Some(demo_user) = demo_user {
demo_user.abort();
}
if let Some(survey_upload_handle) = survey_upload_handle {
survey_upload_handle.await.unwrap();
}
Ok(())
}

View file

@ -9,17 +9,21 @@ use std::time::Duration;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use crate::errors::*;
use crate::settings::Settings;
use crate::AppData;
use crate::V1_API_ROUTES;
#[async_trait::async_trait]
trait SurveyClientTrait {
async fn start_job(&self, data: AppData) -> ServiceResult<JoinHandle<()>>;
async fn register(&self, data: &AppData) -> ServiceResult<()>;
pub trait SurveyClientTrait {
async fn start_job(&self) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)>;
async fn schedule_upload_job(&self) -> ServiceResult<()>;
async fn is_online(&self) -> ServiceResult<bool>;
async fn register(&self) -> ServiceResult<()>;
}
#[derive(Clone, Debug, Default)]
@ -46,60 +50,145 @@ impl SecretsStore {
}
}
struct Survey {
settings: Settings,
#[derive(Clone)]
pub struct Survey {
client: Client,
secrets: SecretsStore,
app_ctx: AppData,
}
impl Survey {
fn new(settings: Settings, secrets: SecretsStore) -> Self {
pub fn new(app_ctx: AppData) -> Self {
if app_ctx.settings.survey.is_none() {
panic!("Survey uploader shouldn't be initialized it isn't configured, please report this bug")
}
Survey {
client: Client::new(),
settings,
secrets,
app_ctx,
}
}
}
#[async_trait::async_trait]
impl SurveyClientTrait for Survey {
async fn start_job(&self, data: AppData) -> ServiceResult<JoinHandle<()>> {
async fn start_job(&self) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> {
fn can_run(rx: &mut oneshot::Receiver<()>) -> bool {
match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => true,
_ => false,
}
}
let (tx, mut rx) = oneshot::channel();
let this = self.clone();
let mut register = false;
let fut = async move {
loop {
sleep(Duration::new(data.settings.survey.rate_limit, 0)).await;
// if let Err(e) = Self::delete_demo_user(&data).await {
// log::error!("Error while deleting demo user: {:?}", e);
// }
// if let Err(e) = Self::register_demo_user(&data).await {
// log::error!("Error while registering demo user: {:?}", e);
// }
if !can_run(&mut rx) {
log::info!("Stopping survey uploads");
break;
}
if !register {
loop {
if this.is_online().await.unwrap() {
this.register().await.unwrap();
register = true;
break;
} else {
sleep(Duration::new(1, 0)).await;
}
}
}
for i in 0..this.app_ctx.settings.survey.as_ref().unwrap().rate_limit {
if !can_run(&mut rx) {
log::info!("Stopping survey uploads");
break;
}
sleep(Duration::new(1, 0)).await;
}
let _ = this.schedule_upload_job().await;
// for url in this.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() {
// if !can_run(&mut rx) {
// log::info!("Stopping survey uploads");
// break;
// }
// log::info!("Uploading to survey instance {}", url);
// }
}
};
let handle = tokio::spawn(fut);
Ok(handle)
Ok((tx, handle))
}
async fn register(&self, data: &AppData) -> ServiceResult<()> {
let protocol = if self.settings.server.proxy_has_tls {
"https://"
} else {
"http://"
};
async fn is_online(&self) -> ServiceResult<bool> {
let res = self
.client
.get(format!(
"http://{}{}",
self.app_ctx.settings.server.get_ip(),
V1_API_ROUTES.meta.health
))
.send()
.await
.unwrap();
Ok(res.status() == 200)
}
async fn schedule_upload_job(&self) -> ServiceResult<()> {
log::debug!("Running upload job");
#[derive(Serialize)]
struct Secret {
secret: String,
}
let mut page = 0;
loop {
let psuedo_ids = self.app_ctx.db.analytics_get_all_psuedo_ids(page).await?;
if psuedo_ids.is_empty() {
log::debug!("upload job complete, no more IDs to upload");
break;
}
for id in psuedo_ids {
for url in self.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() {
if let Some(secret) = self.app_ctx.survey_secrets.get(url.as_str()) {
let payload = Secret { secret };
log::info!("Uploading to survey instance {} campaign {id}", url);
let mut url = url.clone();
url.set_path(&format!("/mcaptcha/api/v1/{id}/upload"));
let resp =
self.client.post(url).json(&payload).send().await.unwrap();
println!("{}", resp.text().await.unwrap());
}
}
}
page += 1;
}
Ok(())
}
async fn register(&self) -> ServiceResult<()> {
#[derive(Serialize)]
struct MCaptchaInstance {
url: url::Url,
secret: String,
auth_token: String,
}
let this_instance_url =
url::Url::parse(&format!("{protocol}{}", self.settings.server.domain))?;
for url in self.settings.survey.nodes.iter() {
let this_instance_url = self
.app_ctx
.settings
.survey
.as_ref()
.unwrap()
.instance_root_url
.clone();
for url in self.app_ctx.settings.survey.as_ref().unwrap().nodes.iter() {
// mCaptcha/survey must send this token while uploading secret to authenticate itself
// this token must be sent to mCaptcha/survey with the registration payload
let secret_upload_auth_token = crate::api::v1::mcaptcha::get_random(20);
let payload = MCaptchaInstance {
url: this_instance_url.clone(),
secret: secret_upload_auth_token.clone(),
auth_token: secret_upload_auth_token.clone(),
};
// SecretsStore will store auth tokens generated by both mCaptcha/mCaptcha and
@ -108,13 +197,12 @@ impl SurveyClientTrait for Survey {
// Storage schema:
// - mCaptcha/mCaptcha generated auth token: (<auth_token>, <survey_instance_url>)
// - mCaptcha/survey generated auth token (<survey_instance_url>, <auth_token)
self.secrets.set(secret_upload_auth_token, url.to_string());
self.client
.post(url.clone())
.json(&payload)
.send()
.await
.unwrap();
self.app_ctx
.survey_secrets
.set(secret_upload_auth_token, url.to_string());
let mut url = url.clone();
url.set_path("/mcaptcha/api/v1/register");
let resp = self.client.post(url).json(&payload).send().await.unwrap();
}
Ok(())
}