Skip to content

Commit

Permalink
feat: 初步完成支持把中间数据转化到sqlite数据库功能 #138
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed Oct 19, 2024
1 parent 8f183e4 commit 57097eb
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
# reqwest = { version = "0.11", features = ["json"], default-features = false }
async-raft-ext = "0.6.3"
thiserror = "1.0.20"
clap = { version = "4.3", features = ["derive"] }
clap = { version = "4.5", features = ["derive"] }

#inject
bean_factory = "0.1.4"
Expand Down
25 changes: 25 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use clap::{Args, Parser, Subcommand, ValueEnum};

/// A fictional versioning CLI
#[derive(Debug, Parser)] // requires `derive` feature
#[command(name = "git")]
#[command(about = "rnacos cli", long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Option<Commands>,
/// env file path
#[arg(short, long, default_value = "")]
pub env_file: String,
}

#[derive(Debug, Subcommand)]
pub enum Commands {
/// transfer middle data to sqlite
#[command(arg_required_else_help = true)]
DataToSqlite {
/// the transfer middle data file
file: String,
/// out to sqlite db file
out: String,
},
}
2 changes: 0 additions & 2 deletions src/common/constant.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::sync::Arc;

pub const APP_VERSION: &str = "0.6.1";

pub const EMPTY_STR: &str = "";

pub const HTTP_METHOD_GET: &str = "GET";
Expand Down
4 changes: 4 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,7 @@ pub fn gen_uuid() -> i64 {

((msb << 32) | lsb) as i64
}

pub fn get_app_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
10 changes: 5 additions & 5 deletions src/console/user_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ use actix_web::{
};
use serde::{Deserialize, Serialize};

use super::model::user_model::{UpdateUserInfoParam, UserInfo, UserPageParams, UserPermissions};
use crate::common::get_app_version;
use crate::{
common::{
appdata::AppShareData,
constant::{APP_VERSION, EMPTY_STR},
constant::EMPTY_STR,
model::{ApiResult, PageResultOld, UserSession},
},
user::{model::UserDto, permission::UserRole, UserManagerReq, UserManagerResult},
};

use super::model::user_model::{UpdateUserInfoParam, UserInfo, UserPageParams, UserPermissions};

#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ResetPasswordParam {
Expand Down Expand Up @@ -51,7 +51,7 @@ pub async fn get_user_web_resources(req: HttpRequest) -> actix_web::Result<impl
let data = UserPermissions {
resources,
from: EMPTY_STR,
version: APP_VERSION,
version: get_app_version(),
username: Some(session.username.clone()),
};
Ok(HttpResponse::Ok().json(ApiResult::success(Some(data))))
Expand All @@ -60,7 +60,7 @@ pub async fn get_user_web_resources(req: HttpRequest) -> actix_web::Result<impl
let data = UserPermissions {
resources,
from: "OLD_CONSOLE",
version: APP_VERSION,
version: get_app_version(),
username: None,
};
Ok(HttpResponse::Ok().json(ApiResult::success(Some(data))))
Expand Down
40 changes: 23 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(unused_imports)]

mod cli;

use actix::Actor;
use actix_web::{web::Data, App};
use async_raft_ext::raft::ClientWriteRequest;
use async_raft_ext::{Config, Raft, RaftStorage};
use rnacos::common::AppSysConfig;
use rnacos::common::{get_app_version, AppSysConfig};
use rnacos::config::core::{ConfigActor, ConfigCmd};
use rnacos::console::middle::login_middle::CheckLogin;
use rnacos::grpc::bistream_manage::BiStreamManage;
Expand Down Expand Up @@ -34,38 +36,34 @@ use clap::Parser;
use env_logger::TimestampPrecision;
use env_logger_timezone_fmt::{TimeZoneFormat, TimeZoneFormatEnv};
//use mimalloc::MiMalloc;
use crate::cli::{Cli, Commands};
use rnacos::common::appdata::AppShareData;
use rnacos::common::constant::APP_VERSION;
use rnacos::openapi::middle::auth_middle::ApiCheckAuth;
use rnacos::raft::NacosRaft;
use rnacos::transfer::data_to_sqlite::data_to_sqlite;
use rnacos::web_config::{app_config, console_config};

//#[global_allocator]
//static GLOBAL: MiMalloc = MiMalloc;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct AppOpt {
/// env file path
#[arg(short, long, default_value = "")]
pub env_file: String,
}

#[actix_web::main]
async fn main() -> Result<(), Box<dyn Error>> {
init_env();
let cli_opt = cli::Cli::parse();
init_env(&cli_opt.env_file);
let rust_log = std::env::var("RUST_LOG").unwrap_or("info".to_owned());
println!("version:{}, RUST_LOG:{}", APP_VERSION, &rust_log);
std::env::set_var("RUST_LOG", &rust_log);
let sys_config = Arc::new(AppSysConfig::init_from_env());
println!("data dir:{}", sys_config.local_db_dir);
let timezone_fmt = Arc::new(TimeZoneFormatEnv::new(
sys_config.gmt_fixed_offset_hours.map(|v| v * 60 * 60),
Some(TimestampPrecision::Micros),
));
env_logger::Builder::from_default_env()
.format(move |buf, record| TimeZoneFormat::new(buf, &timezone_fmt).write(record))
.init();
if let Some(cmd) = cli_opt.command {
return run_subcommand(cmd).await;
}
println!("version:{}, RUST_LOG:{}", get_app_version(), &rust_log);
println!("data dir:{}", sys_config.local_db_dir);
let factory_data = config_factory(sys_config.clone()).await?;
let app_data = build_share_data(factory_data.clone())?;
let http_addr = sys_config.get_http_addr();
Expand Down Expand Up @@ -131,9 +129,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}

fn init_env() {
let app_opt = AppOpt::parse();
let env_path = app_opt.env_file;
fn init_env(env_path: &str) {
//let env_path = std::env::var("RNACOS_ENV_FILE").unwrap_or_default();
if env_path.is_empty() {
dotenv::dotenv().ok();
Expand All @@ -142,6 +138,16 @@ fn init_env() {
}
}

async fn run_subcommand(commands: Commands) -> Result<(), Box<dyn Error>> {
match commands {
Commands::DataToSqlite { file, out } => {
log::info!("middle data to sqlite, from:{file} to:{out}");
data_to_sqlite(&file, &out).await?;
}
}
Ok(())
}

async fn run_console_web(source_app_data: Arc<AppShareData>) {
let http_console_addr = source_app_data.sys_config.get_http_console_addr();
log::info!("new console server http addr:{}", &http_console_addr);
Expand Down
206 changes: 206 additions & 0 deletions src/transfer/data_to_sqlite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use crate::common::constant::{CONFIG_TREE_NAME, NAMESPACE_TREE_NAME, USER_TREE_NAME};
use crate::config::core::{ConfigKey, ConfigValue};
use crate::config::model::ConfigValueDO;
use crate::namespace::model::NamespaceDO;
use crate::transfer::model::TransferRecordRef;
use crate::transfer::reader::{reader_transfer_record, TransferFileReader};
use crate::transfer::sqlite::dao::config::{ConfigDO, ConfigDao};
use crate::transfer::sqlite::dao::config_history::{ConfigHistoryDO, ConfigHistoryDao};
use crate::transfer::sqlite::dao::tenant::{TenantDO, TenantDao};
use crate::transfer::sqlite::dao::user::{UserDO, UserDao};
use crate::user::model::UserDo;
use rusqlite::Connection;

#[derive(Debug, Default)]
pub struct TableSeq {
pub(crate) config_id: i64,
pub(crate) config_history_id: i64,
pub(crate) tenant_id: i64,
pub(crate) user_id: i64,
}

impl TableSeq {
pub fn next_config_id(&mut self) -> i64 {
self.config_id += 1;
self.config_id
}

pub fn next_config_history_id(&mut self) -> i64 {
self.config_history_id += 1;
self.config_history_id
}

pub fn next_tenant_id(&mut self) -> i64 {
self.tenant_id += 1;
self.tenant_id
}
pub fn next_user_id(&mut self) -> i64 {
self.user_id += 1;
self.user_id
}
}

pub async fn data_to_sqlite(data_file: &str, db_path: &str) -> anyhow::Result<()> {
let mut file_reader = TransferFileReader::new(data_file).await?;
let conn = open_init_db(db_path)?;
let mut config_count = 0;
let mut tenant_count = 0;
let mut user_count = 0;
let mut ignore = 0;
let mut table_seq = TableSeq::default();
let config_dao = ConfigDao::new(&conn);
let config_history_dao = ConfigHistoryDao::new(&conn);
let user_dao = UserDao::new(&conn);
let tenant_dao = TenantDao::new(&conn);
while let Ok(Some(vec)) = file_reader.read_record_vec().await {
let record = reader_transfer_record(&vec, &file_reader.header)?;
if record.table_name.as_str() == CONFIG_TREE_NAME.as_str() {
config_count += 1;
insert_config(&mut table_seq, &config_dao, &config_history_dao, record)?;
} else if record.table_name.as_str() == NAMESPACE_TREE_NAME.as_str() {
tenant_count += 1;
insert_namespace(&mut table_seq, &tenant_dao, record)?
} else if record.table_name.as_str() == USER_TREE_NAME.as_str() {
user_count += 1;
insert_user(&mut table_seq, &user_dao, record)?
} else {
ignore += 1;
}
}
log::info!(
"transfer to sqlite db finished,config count:{},tenant count:{},use count:{},ignore count:{}",
config_count,
tenant_count,
user_count,
ignore
);
Ok(())
}

fn insert_config(
table_seq: &mut TableSeq,
config_dao: &ConfigDao<'_>,
config_history_dao: &ConfigHistoryDao<'_>,
record: TransferRecordRef<'_>,
) -> anyhow::Result<()> {
let value_do = ConfigValueDO::from_bytes(&record.value)?;
let key = String::from_utf8_lossy(&record.key).to_string();
let key: ConfigKey = (&key as &str).into();
let config_value: ConfigValue = value_do.into();
let config_do = ConfigDO {
id: Some(table_seq.next_config_id()),
data_id: Some(key.data_id.clone()),
group_id: Some(key.group.clone()),
tenant_id: Some(key.tenant.clone()),
content: Some(config_value.content.clone()),
config_type: config_value.config_type.clone(),
config_desc: config_value.desc,
last_time: Some(config_value.last_modified),
};
config_dao.insert(&config_do)?;
for history_item in config_value.histories {
let history = ConfigHistoryDO {
id: Some(table_seq.next_config_history_id()),
data_id: Some(key.data_id.clone()),
group_id: Some(key.group.clone()),
tenant_id: Some(key.tenant.clone()),
content: Some(history_item.content.clone()),
config_type: None,
config_desc: None,
op_user: history_item.op_user.clone(),
last_time: Some(history_item.modified_time),
};
config_history_dao.insert(&history)?;
}
Ok(())
}

fn insert_namespace(
table_seq: &mut TableSeq,
tenant_dao: &TenantDao<'_>,
record: TransferRecordRef<'_>,
) -> anyhow::Result<()> {
let value_do: NamespaceDO = NamespaceDO::from_bytes(&record.value)?;
let tenant_do = TenantDO {
id: Some(table_seq.next_tenant_id()),
tenant_id: value_do.namespace_id,
tenant_name: value_do.namespace_name,
tenant_desc: None,
create_flag: None,
};
tenant_dao.insert(&tenant_do)?;
Ok(())
}

fn insert_user(
table_seq: &mut TableSeq,
user_dao: &UserDao<'_>,
record: TransferRecordRef<'_>,
) -> anyhow::Result<()> {
let value_do = UserDo::from_bytes(&record.value)?;
let user_do = UserDO {
id: Some(table_seq.next_user_id()),
username: Some(value_do.username),
nickname: Some(value_do.nickname),
password_hash: value_do.password_hash,
gmt_create: Some(value_do.gmt_create as i64),
gmt_modified: Some(value_do.gmt_modified as i64),
enabled: Some(value_do.enable.to_string()),
roles: Some(serde_json::to_string(&value_do.roles)?),
extend_info: Some(serde_json::to_string(&value_do.extend_info)?),
};
user_dao.insert(&user_do)?;
Ok(())
}

pub fn open_init_db(db_path: &str) -> anyhow::Result<Connection> {
let conn = Connection::open(db_path)?;
let create_table_sql = r"
create table if not exists tb_config(
id integer primary key autoincrement,
data_id text,
group_id text,
tenant_id text,
content text,
config_type text,
config_desc text,
last_time long
);
create index if not exists tb_config_key_idx on tb_config(data_id,group_id,tenant_id);
create table if not exists tb_config_history(
id integer primary key autoincrement,
data_id text,
group_id text,
tenant_id text,
content text,
config_type text,
config_desc text,
op_user text,
last_time long
);
create index if not exists tb_config_history_key_idx on tb_config_history(data_id,group_id,tenant_id);
create table if not exists tb_tenant(
id integer primary key autoincrement,
tenant_id text,
tenant_name text,
tenant_desc text,
create_flag integer
);
create table if not exists tb_user(
id integer primary key autoincrement,
username text,
nickname text,
password_hash text,
gmt_create integer,
gmt_modified integer,
enabled text,
roles text,
extend_info text
);
";
conn.execute_batch(create_table_sql)?;
Ok(conn)
}
1 change: 1 addition & 0 deletions src/transfer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod data_to_sqlite;
pub mod model;
pub mod reader;
pub mod sqlite;
Expand Down
Loading

0 comments on commit 57097eb

Please sign in to comment.