diff --git a/src/coordinator/coordinator_control.h b/src/coordinator/coordinator_control.h index 2561580f1..37dadbb25 100644 --- a/src/coordinator/coordinator_control.h +++ b/src/coordinator/coordinator_control.h @@ -318,6 +318,10 @@ class CoordinatorControl : public MetaControl { butil::Status TransferLeaderRegionWithTaskList(int64_t region_id, int64_t new_leader_store_id, bool is_force, pb::coordinator_internal::MetaIncrement &meta_increment); + // create region + butil::Status CreateRegionWithTaskList(std::vector &store_operations, + int64_t new_region_id, pb::coordinator_internal::MetaIncrement &meta_increment); + // create schema // in: tenant_id // in: schema_name diff --git a/src/coordinator/coordinator_control_coor.cc b/src/coordinator/coordinator_control_coor.cc index 289ead356..44c0cd21b 100644 --- a/src/coordinator/coordinator_control_coor.cc +++ b/src/coordinator/coordinator_control_coor.cc @@ -1520,12 +1520,17 @@ butil::Status CoordinatorControl::CreateRegionForSplit(const std::string& region // create region with split_from_region_id & store_ids std::vector store_operations; - return CreateRegionFinal(region_name, region_type, split_from_region.definition().raw_engine(), - split_from_region.definition().store_engine(), resource_tag, store_ids.size(), region_range, - split_from_region.definition().schema_id(), split_from_region.definition().table_id(), - split_from_region.definition().index_id(), split_from_region.definition().part_id(), - split_from_region.definition().tenant_id(), split_from_region.definition().index_parameter(), - store_ids, split_from_region_id, new_region_id, store_operations, meta_increment); + auto ret1 = + CreateRegionFinal(region_name, region_type, split_from_region.definition().raw_engine(), + split_from_region.definition().store_engine(), resource_tag, store_ids.size(), region_range, + split_from_region.definition().schema_id(), split_from_region.definition().table_id(), + split_from_region.definition().index_id(), split_from_region.definition().part_id(), + split_from_region.definition().tenant_id(), split_from_region.definition().index_parameter(), + store_ids, split_from_region_id, new_region_id, store_operations, meta_increment); + if (!ret1.ok()) { + return ret1; + } + return CreateRegionWithTaskList(store_operations, new_region_id, meta_increment); } butil::Status CoordinatorControl::SelectStore(pb::common::StoreType store_type, int32_t replica_num, @@ -2331,21 +2336,22 @@ butil::Status CoordinatorControl::CreateRegionFinal( auto* region_increment_region = region_increment->mutable_region(); *region_increment_region = new_region; + // yjddebug remove this code // add store operations to meta_increment - for (const auto& store_operation : store_operations) { - for (const auto& region_cmd : store_operation.region_cmds()) { - auto ret = AddRegionCmd(store_operation.id(), 0, region_cmd, meta_increment); - if (!ret.ok()) { - DINGO_LOG(ERROR) << "CreateRegion AddRegionCmd failed, store_id=" << store_operation.id() - << ", region_cmd=" << region_cmd.ShortDebugString(); - return ret; - } - DINGO_LOG(INFO) << "AddRegionCmd store_id=" << store_operation.id() - << ", region_cmd=" << region_cmd.ShortDebugString(); - } - - DINGO_LOG(INFO) << "store_operation_increment = " << store_operation.ShortDebugString(); - } + // for (const auto& store_operation : store_operations) { + // for (const auto& region_cmd : store_operation.region_cmds()) { + // auto ret = AddRegionCmd(store_operation.id(), 0, region_cmd, meta_increment); + // if (!ret.ok()) { + // DINGO_LOG(ERROR) << "CreateRegion AddRegionCmd failed, store_id=" << store_operation.id() + // << ", region_cmd=" << region_cmd.ShortDebugString(); + // return ret; + // } + // DINGO_LOG(INFO) << "AddRegionCmd store_id=" << store_operation.id() + // << ", region_cmd=" << region_cmd.ShortDebugString(); + // } + + // DINGO_LOG(INFO) << "store_operation_increment = " << store_operation.ShortDebugString(); + // } return butil::Status::OK(); } @@ -3741,6 +3747,23 @@ butil::Status CoordinatorControl::TransferLeaderRegionWithTaskList( return butil::Status::OK(); } +butil::Status CoordinatorControl::CreateRegionWithTaskList( + std::vector& store_operations, int64_t new_region_id, + pb::coordinator_internal::MetaIncrement& meta_increment) { + // create task list + auto* new_task_list = CreateTaskList(meta_increment, "CreateRegion"); + + // build create_region task + auto* create_region_task = new_task_list->add_tasks(); + create_region_task->set_parallelism(true); + for (const auto& it : store_operations) { + auto* parallel_create_region_task = create_region_task->add_parallel_tasks(); + auto* new_store_operation = parallel_create_region_task->add_store_operations(); + *new_store_operation = it; + } + return butil::Status::OK(); +} + butil::Status CoordinatorControl::ValidateTaskListConflict(int64_t region_id, int64_t second_region_id) { // check task_list conflict butil::FlatMap task_list_map_temp; diff --git a/src/coordinator/coordinator_control_meta.cc b/src/coordinator/coordinator_control_meta.cc index 3161fada3..5f8a6e9a7 100644 --- a/src/coordinator/coordinator_control_meta.cc +++ b/src/coordinator/coordinator_control_meta.cc @@ -755,7 +755,7 @@ butil::Status CoordinatorControl::CreateTable(int64_t schema_id, const pb::meta: << ", table_definition:" << table_definition.ShortDebugString() << " ret: " << ret.error_str(); return ret; } - + CreateRegionWithTaskList(store_operations, new_region_id, meta_increment); DINGO_LOG(INFO) << "CreateTable create region success, region_id=" << new_region_id; new_region_ids.push_back(new_region_id); @@ -1408,7 +1408,7 @@ butil::Status CoordinatorControl::CreateIndex(int64_t schema_id, const pb::meta: DINGO_LOG(ERROR) << "CreateRegion failed in CreateIndex index_name=" << table_definition.name(); return ret; } - + CreateRegionWithTaskList(store_operations, new_region_id, meta_increment); DINGO_LOG(INFO) << "CreateIndex create region success, region_id=" << new_region_id; new_region_ids.push_back(new_region_id); diff --git a/src/server/coordinator_service.cc b/src/server/coordinator_service.cc index dd2e036f8..1cf3954b2 100644 --- a/src/server/coordinator_service.cc +++ b/src/server/coordinator_service.cc @@ -1311,6 +1311,13 @@ void DoCreateRegion(google::protobuf::RpcController * /*controller*/, ret = coordinator_control->CreateRegionFinal( region_name, region_type, raw_engine, store_engine, resource_tag, replica_num, range, schema_id, table_id, index_id, part_id, tenant_id, index_parameter, store_ids, 0, new_region_id, store_operations, meta_increment); + if (!ret.ok()) { + DINGO_LOG(ERROR) << "Create Region Failed, errno=" << ret << " Request:" << request->ShortDebugString(); + response->mutable_error()->set_errcode(static_cast(ret.error_code())); + response->mutable_error()->set_errmsg(ret.error_str()); + return; + } + ret = coordinator_control->CreateRegionWithTaskList(store_operations, new_region_id, meta_increment); } else { // store_ids is empty, will auto select store std::vector store_ids; @@ -1319,6 +1326,13 @@ void DoCreateRegion(google::protobuf::RpcController * /*controller*/, ret = coordinator_control->CreateRegionFinal( region_name, region_type, raw_engine, store_engine, resource_tag, replica_num, range, schema_id, table_id, index_id, part_id, tenant_id, index_parameter, store_ids, 0, new_region_id, store_operations, meta_increment); + if (!ret.ok()) { + DINGO_LOG(ERROR) << "Create Region Failed, errno=" << ret << " Request:" << request->ShortDebugString(); + response->mutable_error()->set_errcode(static_cast(ret.error_code())); + response->mutable_error()->set_errmsg(ret.error_str()); + return; + } + ret = coordinator_control->CreateRegionWithTaskList(store_operations, new_region_id, meta_increment); } if (!ret.ok()) { diff --git a/src/store/heartbeat.cc b/src/store/heartbeat.cc index 6ff6c4541..de292d629 100644 --- a/src/store/heartbeat.cc +++ b/src/store/heartbeat.cc @@ -489,7 +489,8 @@ void CoordinatorPushTask::SendCoordinatorPushToStore(std::shared_ptrTryToSendStoreOperations(); + // yjddebug not use send + // coordinator_control->TryToSendStoreOperations(); } // this is for coordinator