|
@@ -14,25 +14,32 @@
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
-use std::{future::IntoFuture, net::SocketAddr};
|
|
|
+use std::{future::IntoFuture, net::SocketAddr, sync::Arc};
|
|
|
|
|
|
use anyhow::Result;
|
|
|
+use futures::TryFutureExt;
|
|
|
use tokio::net::TcpListener;
|
|
|
+use tracing::info;
|
|
|
|
|
|
use crate::{
|
|
|
api::{metrics_router, router, user_count, Counters},
|
|
|
config,
|
|
|
+ distributed::{
|
|
|
+ cluster::Cluster,
|
|
|
+ member::{Member, Service},
|
|
|
+ sonic::{self, service::sonic_service},
|
|
|
+ },
|
|
|
+ inverted_index::KeyPhrase,
|
|
|
metrics::Label,
|
|
|
+ searcher::{DistributedSearcher, SearchClient},
|
|
|
};
|
|
|
|
|
|
-pub async fn run(config: config::ApiConfig) -> Result<()> {
|
|
|
+fn counters(registry: &mut crate::metrics::PrometheusRegistry) -> Result<Counters> {
|
|
|
let search_counter_success = crate::metrics::Counter::default();
|
|
|
let search_counter_fail = crate::metrics::Counter::default();
|
|
|
let explore_counter = crate::metrics::Counter::default();
|
|
|
let daily_active_users = user_count::UserCount::new()?;
|
|
|
|
|
|
- let mut registry = crate::metrics::PrometheusRegistry::default();
|
|
|
-
|
|
|
let group = registry
|
|
|
.new_group(
|
|
|
"stract_search_requests".to_string(),
|
|
@@ -71,14 +78,85 @@ pub async fn run(config: config::ApiConfig) -> Result<()> {
|
|
|
.unwrap();
|
|
|
group.register(daily_active_users.metric(), vec![]);
|
|
|
|
|
|
- let counters = Counters {
|
|
|
+ Ok(Counters {
|
|
|
search_counter_success,
|
|
|
search_counter_fail,
|
|
|
explore_counter,
|
|
|
daily_active_users,
|
|
|
- };
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+async fn cluster(config: &config::ApiConfig) -> Result<Cluster> {
|
|
|
+ Cluster::join(
|
|
|
+ Member {
|
|
|
+ id: config.cluster_id.clone(),
|
|
|
+ service: Service::Api { host: config.host },
|
|
|
+ },
|
|
|
+ config.gossip_addr,
|
|
|
+ config.gossip_seed_nodes.clone().unwrap_or_default(),
|
|
|
+ )
|
|
|
+ .await
|
|
|
+}
|
|
|
+
|
|
|
+pub struct ManagementService {
|
|
|
+ cluster: Arc<Cluster>,
|
|
|
+ searcher: DistributedSearcher,
|
|
|
+}
|
|
|
+sonic_service!(ManagementService, [TopKeyphrases, ClusterStatus]);
|
|
|
+
|
|
|
+impl ManagementService {
|
|
|
+ pub async fn new(cluster: Arc<Cluster>) -> Result<Self> {
|
|
|
+ let searcher = DistributedSearcher::new(Arc::clone(&cluster)).await;
|
|
|
+ Ok(ManagementService { cluster, searcher })
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
|
|
|
+pub struct TopKeyphrases {
|
|
|
+ pub top: usize,
|
|
|
+}
|
|
|
+impl sonic::service::Message<ManagementService> for TopKeyphrases {
|
|
|
+ type Response = Vec<KeyPhrase>;
|
|
|
+ async fn handle(self, server: &ManagementService) -> Self::Response {
|
|
|
+ server.searcher.top_key_phrases(self.top).await
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- let app = router(&config, counters).await?;
|
|
|
+#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
|
|
|
+pub struct Status {
|
|
|
+ pub members: Vec<Member>,
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
|
|
|
+pub struct ClusterStatus;
|
|
|
+impl sonic::service::Message<ManagementService> for ClusterStatus {
|
|
|
+ type Response = Status;
|
|
|
+ async fn handle(self, server: &ManagementService) -> Self::Response {
|
|
|
+ Status {
|
|
|
+ members: server.cluster.members().await,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+async fn run_management(addr: SocketAddr, cluster: Arc<Cluster>) -> Result<()> {
|
|
|
+ let server = ManagementService::new(cluster).await?.bind(addr).await?;
|
|
|
+
|
|
|
+ info!("search server is ready to accept requests on {}", addr);
|
|
|
+
|
|
|
+ loop {
|
|
|
+ if let Err(e) = server.accept().await {
|
|
|
+ tracing::error!("{:?}", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+pub async fn run(config: config::ApiConfig) -> Result<()> {
|
|
|
+ let mut registry = crate::metrics::PrometheusRegistry::default();
|
|
|
+ let counters = counters(&mut registry)?;
|
|
|
+
|
|
|
+ let cluster = Arc::new(cluster(&config).await?);
|
|
|
+
|
|
|
+ let app = router(&config, counters, cluster.clone()).await?;
|
|
|
let metrics_app = metrics_router(registry);
|
|
|
|
|
|
let addr = config.host;
|
|
@@ -87,7 +165,8 @@ pub async fn run(config: config::ApiConfig) -> Result<()> {
|
|
|
TcpListener::bind(&addr).await.unwrap(),
|
|
|
app.into_make_service_with_connect_info::<SocketAddr>(),
|
|
|
)
|
|
|
- .into_future();
|
|
|
+ .into_future()
|
|
|
+ .map_err(|e| anyhow::anyhow!(e));
|
|
|
|
|
|
let addr = config.prometheus_host;
|
|
|
tracing::info!("prometheus exporter listening on {}", addr);
|
|
@@ -95,9 +174,17 @@ pub async fn run(config: config::ApiConfig) -> Result<()> {
|
|
|
TcpListener::bind(&addr).await.unwrap(),
|
|
|
metrics_app.into_make_service(),
|
|
|
)
|
|
|
- .into_future();
|
|
|
+ .into_future()
|
|
|
+ .map_err(|e| e.into());
|
|
|
+
|
|
|
+ let management = tokio::spawn(async move {
|
|
|
+ run_management(config.management_host, cluster)
|
|
|
+ .await
|
|
|
+ .unwrap();
|
|
|
+ })
|
|
|
+ .map_err(|e| e.into());
|
|
|
|
|
|
- tokio::try_join!(server, metrics_server)?;
|
|
|
+ tokio::try_join!(server, metrics_server, management)?;
|
|
|
|
|
|
Ok(())
|
|
|
}
|