diff --git a/src/console/api.rs b/src/console/api.rs index e589c37b..9d5cf804 100644 --- a/src/console/api.rs +++ b/src/console/api.rs @@ -15,7 +15,9 @@ use crate::common::error_code::NO_PERMISSION; use crate::common::string_utils::StringUtils; use crate::naming::ops::ops_api::query_opt_service_list; use crate::openapi::naming::instance::{del_instance, get_instance, update_instance}; -use crate::openapi::naming::service::{query_service, remove_service, update_service}; +use crate::openapi::naming::service::{ + query_service, query_subscribers_list, remove_service, update_service, +}; use crate::user_namespace_privilege; use actix_web::{http::header, web, HttpMessage, HttpRequest, HttpResponse, Responder}; use uuid::Uuid; @@ -148,6 +150,10 @@ pub fn console_api_config_v1(config: &mut web::ServiceConfig) { .route(web::delete().to(remove_service)) .route(web::get().to(query_service)), ) + .service( + web::resource("/ns/service/subscribers") + .route(web::get().to(query_subscribers_list)), + ) .service( web::resource("/ns/instance") .route(web::get().to(get_instance)) diff --git a/src/naming/core.rs b/src/naming/core.rs index cda70440..35f6aee7 100644 --- a/src/naming/core.rs +++ b/src/naming/core.rs @@ -28,9 +28,9 @@ use super::naming_delay_nofity::DelayNotifyActor; use super::naming_delay_nofity::DelayNotifyCmd; use super::naming_subscriber::NamingListenerItem; use super::naming_subscriber::Subscriber; -use super::service::Service; use super::service::ServiceInfoDto; use super::service::ServiceMetadata; +use super::service::{Service, SubscriberInfoDto}; use super::service_index::NamespaceIndex; use super::service_index::ServiceQueryParam; use super::NamingUtils; @@ -647,6 +647,57 @@ impl NamingActor { (size, service_names) } + pub fn get_subscribers_list( + &self, + page_size: usize, + page_index: usize, + key: &ServiceKey, + ) -> (usize, Vec>) { + let mut ret = Vec::new(); + + let res = self.subscriber.fuzzy_match_listener( + &key.group_name, + &key.service_name, + &key.namespace_id, + ); + + for (service_key, val) in res { + for (ip_port, _) in val { + let parts: Vec<&str> = ip_port.split(':').collect(); + if parts.len() == 2 { + if let Ok(port) = parts[1].parse::() { + let subscriber_info = SubscriberInfoDto { + service_name: service_key.service_name.clone(), + group_name: service_key.group_name.clone(), + namespace_id: service_key.namespace_id.clone(), + ip: Arc::new(parts[0].to_string()), + port, + }; + + ret.push(Arc::new(subscriber_info)); + } + } + } + } + + let total = ret.len(); + let start = (page_index - 1) * page_size; + ret.sort_by(|a, b| { + a.service_name + .cmp(&b.service_name) + .then(a.group_name.cmp(&b.group_name)) + .then(a.ip.cmp(&b.ip)) + .then(a.port.cmp(&b.port)) + }); + let paginated_result = ret + .into_iter() + .skip(start) + .take(page_size) + .collect::>(); + + (total, paginated_result) + } + pub fn get_service_info_page(&self, param: ServiceQueryParam) -> (usize, Vec) { let (size, list) = self.namespace_index.query_service_page(¶m); @@ -855,6 +906,7 @@ pub enum NamingCmd { QueryListString(ServiceKey, String, bool, Option), QueryServiceInfo(ServiceKey, String, bool), QueryServicePage(ServiceKey, usize, usize), + QueryServiceSubscribersPage(ServiceKey, usize, usize), //查询服务实际信息列表 QueryServiceInfoPage(ServiceQueryParam), //CreateService(ServiceDetailDto), @@ -881,6 +933,7 @@ pub enum NamingResult { InstanceListString(String), ServiceInfo(ServiceInfo), ServicePage((usize, Vec>)), + ServiceSubscribersPage((usize, Vec>)), ServiceInfoPage((usize, Vec)), ClientInstanceCount(Vec<(Arc, usize)>), RewriteToCluster(u64, Instance), @@ -975,6 +1028,11 @@ impl Handler for NamingActor { &service_key, ))) } + NamingCmd::QueryServiceSubscribersPage(service_key, page_size, page_index) => { + Ok(NamingResult::ServiceSubscribersPage( + self.get_subscribers_list(page_size, page_index, &service_key), + )) + } NamingCmd::QueryServiceInfoPage(param) => Ok(NamingResult::ServiceInfoPage( self.get_service_info_page(param), )), @@ -1127,8 +1185,6 @@ async fn query_healthy_instances() { #[test] fn test_add_service() { - use super::*; - use tokio::net::UdpSocket; let mut naming = NamingActor::new(); let service_key = ServiceKey::new("1", "1", "1"); let service_info = ServiceDetailDto { @@ -1147,8 +1203,6 @@ fn test_add_service() { #[test] fn test_remove_has_instance_service() { - use super::*; - use tokio::net::UdpSocket; let mut naming = NamingActor::new(); let mut instance = Instance::new("127.0.0.1".to_owned(), 8080); instance.namespace_id = Arc::new("public".to_owned()); diff --git a/src/naming/naming_subscriber.rs b/src/naming/naming_subscriber.rs index d4041250..def1217f 100644 --- a/src/naming/naming_subscriber.rs +++ b/src/naming/naming_subscriber.rs @@ -8,7 +8,7 @@ use std::{ use actix::prelude::*; use super::{ - model::{Instance, ServiceInfo, ServiceKey}, + model::ServiceKey, naming_delay_nofity::{DelayNotifyActor, DelayNotifyCmd}, }; @@ -185,4 +185,21 @@ impl Subscriber { } sum } + + pub fn fuzzy_match_listener( + &self, + group_name: &str, + service_name: &str, + namespace_id: &str, + ) -> HashMap, Option>>> { + self.listener + .iter() + .filter(|(key, _)| { + key.group_name.contains(group_name) + && key.service_name.contains(service_name) + && key.namespace_id.contains(namespace_id) + }) // 模糊匹配 + .map(|(key, value)| (key.clone(), value.clone())) + .collect() + } } diff --git a/src/naming/service.rs b/src/naming/service.rs index 544d7b8f..5cb61c2c 100644 --- a/src/naming/service.rs +++ b/src/naming/service.rs @@ -8,10 +8,10 @@ use std::{ use crate::common::constant::EMPTY_ARC_STRING; use crate::naming::cluster::model::ProcessRange; +use crate::now_millis; use actix_web::rt; use inner_mem_cache::TimeoutSet; - -use crate::now_millis; +use serde::{Deserialize, Serialize}; use super::{ api_model::QueryListResult, @@ -389,3 +389,13 @@ pub struct ServiceInfoDto { pub metadata: Option>>, pub protect_threshold: Option, } + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct SubscriberInfoDto { + pub service_name: Arc, + pub group_name: Arc, + pub namespace_id: Arc, + pub ip: Arc, + pub port: u16, +} diff --git a/src/openapi/naming/model.rs b/src/openapi/naming/model.rs index da4d197d..4896a687 100644 --- a/src/openapi/naming/model.rs +++ b/src/openapi/naming/model.rs @@ -1,8 +1,9 @@ #![allow(unused_imports, unused_assignments, unused_variables)] use crate::common::option_utils::OptionUtils; use crate::naming::model::{Instance, ServiceKey}; +use crate::naming::service::SubscriberInfoDto; use crate::naming::NamingUtils; -use crate::utils::{get_bool_from_string, select_option_by_clone}; +use crate::utils::get_bool_from_string; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; @@ -351,3 +352,9 @@ pub struct ServiceQueryListResponce { pub count: usize, pub doms: Vec>, } + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct ServiceQuerySubscribersListResponce { + pub count: usize, + pub subscribers: Vec>, +} diff --git a/src/openapi/naming/service.rs b/src/openapi/naming/service.rs index c19175fd..2b0a4f40 100644 --- a/src/openapi/naming/service.rs +++ b/src/openapi/naming/service.rs @@ -1,13 +1,15 @@ -use actix::Addr; -use actix_web::{web, HttpResponse, Responder, Scope}; - use crate::merge_web_param; use crate::naming::api_model::ServiceInfoParam; use crate::naming::core::{NamingActor, NamingCmd, NamingResult}; use crate::naming::model::ServiceKey; use crate::naming::NamingUtils; use crate::openapi::constant::EMPTY; -use crate::openapi::naming::model::{ServiceQueryListRequest, ServiceQueryListResponce}; +use crate::openapi::naming::model::{ + ServiceQueryListRequest, ServiceQueryListResponce, ServiceQuerySubscribersListResponce, +}; +use actix::Addr; +use actix_web::http::header; +use actix_web::{web, HttpResponse, Responder, Scope}; pub(super) fn service() -> Scope { web::scope("/service") @@ -19,6 +21,7 @@ pub(super) fn service() -> Scope { .route(web::get().to(query_service)), ) .service(web::resource("/list").route(web::get().to(query_service_list))) + .service(web::resource("/subscribers").route(web::get().to(query_subscribers_list))) } pub async fn query_service( @@ -107,3 +110,55 @@ pub async fn query_service_list( Err(_) => HttpResponse::InternalServerError().body("error"), } } + +pub async fn query_subscribers_list( + param: web::Query, + naming_addr: web::Data>, +) -> impl Responder { + let page_size = param.page_size.unwrap_or(0x7fffffff); + let page_index = param.page_no.unwrap_or(1); + let namespace_id = NamingUtils::default_namespace( + param + .namespace_id + .as_ref() + .unwrap_or(&"".to_owned()) + .to_owned(), + ); + let group = NamingUtils::default_group( + param + .group_name + .as_ref() + .unwrap_or(&"".to_owned()) + .to_owned(), + ); + let service = param + .service_name + .as_ref() + .unwrap_or(&"".to_owned()) + .to_owned(); + + let key = ServiceKey::new(&namespace_id, &group, &service); + match naming_addr + .send(NamingCmd::QueryServiceSubscribersPage( + key, page_size, page_index, + )) + .await + { + Ok(res) => { + let result: NamingResult = res.unwrap(); + match result { + NamingResult::ServiceSubscribersPage((c, v)) => { + let resp = ServiceQuerySubscribersListResponce { + count: c, + subscribers: v, + }; + HttpResponse::Ok() + .insert_header(header::ContentType(mime::APPLICATION_JSON)) + .body(serde_json::to_string(&resp).unwrap()) + } + _ => HttpResponse::InternalServerError().body("error"), + } + } + Err(_) => HttpResponse::InternalServerError().body("error"), + } +} diff --git a/src/user/permission.rs b/src/user/permission.rs index 23b40c3d..fa497d6a 100644 --- a/src/user/permission.rs +++ b/src/user/permission.rs @@ -5,6 +5,7 @@ /// 2)http请求路径,由后端拦截器控制否支持请求; use std::{collections::HashSet, hash::Hash, sync::Arc}; + use crate::common::constant::{EMPTY_STR, HTTP_METHOD_ALL, HTTP_METHOD_GET}; pub enum Resource { @@ -259,6 +260,7 @@ lazy_static::lazy_static! { //WebResource R::WebResource("/manage/service"), R::WebResource("/manage/service/instance"), + R::WebResource("/manage/subscriber"), R::WebResource("/rnacos/manage/service"), R::WebResource("/rnacos/manage/service/instance"), //path @@ -267,6 +269,7 @@ lazy_static::lazy_static! { R::Path("/rnacos/api/console/ns/services",HTTP_METHOD_GET), R::Path("/rnacos/api/console/ns/service",HTTP_METHOD_GET), + R::Path("/rnacos/api/console/ns/service/subscribers",HTTP_METHOD_GET), R::Path("/rnacos/api/console/instances",HTTP_METHOD_GET), R::Path("/rnacos/api/console/ns/instance",HTTP_METHOD_GET), @@ -279,6 +282,7 @@ lazy_static::lazy_static! { //WebResource R::WebResource("/manage/service"), R::WebResource("/manage/service/instance"), + R::WebResource("/manage/subscriber"), R::WebResource("/rnacos/manage/service"), R::WebResource("/rnacos/manage/service/instance"), R::WebResource("SERVICE_UPDATE"), @@ -288,6 +292,7 @@ lazy_static::lazy_static! { R::Path("/rnacos/api/console/ns/services",HTTP_METHOD_GET), R::Path("/rnacos/api/console/ns/service",HTTP_METHOD_ALL), + R::Path("/rnacos/api/console/ns/service/subscribers",HTTP_METHOD_GET), R::Path("/rnacos/api/console/instances",HTTP_METHOD_GET), R::Path("/rnacos/api/console/ns/instance",HTTP_METHOD_ALL),