Skip to content

Commit

Permalink
[fix] Add create region task
Browse files Browse the repository at this point in the history
  • Loading branch information
visualYJD committed Jan 13, 2025
1 parent 371e188 commit e1a078a
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 23 deletions.
4 changes: 4 additions & 0 deletions src/coordinator/coordinator_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ class CoordinatorControl : public MetaControl {
butil::Status TransferLeaderRegionWithTaskList(int64_t region_id, int64_t new_leader_store_id, bool is_force,
pb::coordinator_internal::MetaIncrement &meta_increment);

// create region
butil::Status CreateRegionWithTaskList(std::vector<pb::coordinator::StoreOperation> &store_operations,
int64_t new_region_id, pb::coordinator_internal::MetaIncrement &meta_increment);

// create schema
// in: tenant_id
// in: schema_name
Expand Down
63 changes: 43 additions & 20 deletions src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1520,12 +1520,17 @@ butil::Status CoordinatorControl::CreateRegionForSplit(const std::string& region

// create region with split_from_region_id & store_ids
std::vector<pb::coordinator::StoreOperation> store_operations;
return CreateRegionFinal(region_name, region_type, split_from_region.definition().raw_engine(),
split_from_region.definition().store_engine(), resource_tag, store_ids.size(), region_range,
split_from_region.definition().schema_id(), split_from_region.definition().table_id(),
split_from_region.definition().index_id(), split_from_region.definition().part_id(),
split_from_region.definition().tenant_id(), split_from_region.definition().index_parameter(),
store_ids, split_from_region_id, new_region_id, store_operations, meta_increment);
auto ret1 =
CreateRegionFinal(region_name, region_type, split_from_region.definition().raw_engine(),
split_from_region.definition().store_engine(), resource_tag, store_ids.size(), region_range,
split_from_region.definition().schema_id(), split_from_region.definition().table_id(),
split_from_region.definition().index_id(), split_from_region.definition().part_id(),
split_from_region.definition().tenant_id(), split_from_region.definition().index_parameter(),
store_ids, split_from_region_id, new_region_id, store_operations, meta_increment);
if (!ret1.ok()) {
return ret1;
}
return CreateRegionWithTaskList(store_operations, new_region_id, meta_increment);
}

butil::Status CoordinatorControl::SelectStore(pb::common::StoreType store_type, int32_t replica_num,
Expand Down Expand Up @@ -2331,21 +2336,22 @@ butil::Status CoordinatorControl::CreateRegionFinal(
auto* region_increment_region = region_increment->mutable_region();
*region_increment_region = new_region;

// yjddebug remove this code
// add store operations to meta_increment
for (const auto& store_operation : store_operations) {
for (const auto& region_cmd : store_operation.region_cmds()) {
auto ret = AddRegionCmd(store_operation.id(), 0, region_cmd, meta_increment);
if (!ret.ok()) {
DINGO_LOG(ERROR) << "CreateRegion AddRegionCmd failed, store_id=" << store_operation.id()
<< ", region_cmd=" << region_cmd.ShortDebugString();
return ret;
}
DINGO_LOG(INFO) << "AddRegionCmd store_id=" << store_operation.id()
<< ", region_cmd=" << region_cmd.ShortDebugString();
}

DINGO_LOG(INFO) << "store_operation_increment = " << store_operation.ShortDebugString();
}
// for (const auto& store_operation : store_operations) {
// for (const auto& region_cmd : store_operation.region_cmds()) {
// auto ret = AddRegionCmd(store_operation.id(), 0, region_cmd, meta_increment);
// if (!ret.ok()) {
// DINGO_LOG(ERROR) << "CreateRegion AddRegionCmd failed, store_id=" << store_operation.id()
// << ", region_cmd=" << region_cmd.ShortDebugString();
// return ret;
// }
// DINGO_LOG(INFO) << "AddRegionCmd store_id=" << store_operation.id()
// << ", region_cmd=" << region_cmd.ShortDebugString();
// }

// DINGO_LOG(INFO) << "store_operation_increment = " << store_operation.ShortDebugString();
// }

return butil::Status::OK();
}
Expand Down Expand Up @@ -3741,6 +3747,23 @@ butil::Status CoordinatorControl::TransferLeaderRegionWithTaskList(
return butil::Status::OK();
}

butil::Status CoordinatorControl::CreateRegionWithTaskList(
std::vector<pb::coordinator::StoreOperation>& store_operations, int64_t new_region_id,
pb::coordinator_internal::MetaIncrement& meta_increment) {
// create task list
auto* new_task_list = CreateTaskList(meta_increment, "CreateRegion");

// build create_region task
auto* create_region_task = new_task_list->add_tasks();
create_region_task->set_parallelism(true);
for (const auto& it : store_operations) {
auto* parallel_create_region_task = create_region_task->add_parallel_tasks();
auto* new_store_operation = parallel_create_region_task->add_store_operations();
*new_store_operation = it;
}
return butil::Status::OK();
}

butil::Status CoordinatorControl::ValidateTaskListConflict(int64_t region_id, int64_t second_region_id) {
// check task_list conflict
butil::FlatMap<int64_t, pb::coordinator::TaskList> task_list_map_temp;
Expand Down
4 changes: 2 additions & 2 deletions src/coordinator/coordinator_control_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ butil::Status CoordinatorControl::CreateTable(int64_t schema_id, const pb::meta:
<< ", table_definition:" << table_definition.ShortDebugString() << " ret: " << ret.error_str();
return ret;
}

CreateRegionWithTaskList(store_operations, new_region_id, meta_increment);
DINGO_LOG(INFO) << "CreateTable create region success, region_id=" << new_region_id;

new_region_ids.push_back(new_region_id);
Expand Down Expand Up @@ -1408,7 +1408,7 @@ butil::Status CoordinatorControl::CreateIndex(int64_t schema_id, const pb::meta:
DINGO_LOG(ERROR) << "CreateRegion failed in CreateIndex index_name=" << table_definition.name();
return ret;
}

CreateRegionWithTaskList(store_operations, new_region_id, meta_increment);
DINGO_LOG(INFO) << "CreateIndex create region success, region_id=" << new_region_id;

new_region_ids.push_back(new_region_id);
Expand Down
14 changes: 14 additions & 0 deletions src/server/coordinator_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,13 @@ void DoCreateRegion(google::protobuf::RpcController * /*controller*/,
ret = coordinator_control->CreateRegionFinal(
region_name, region_type, raw_engine, store_engine, resource_tag, replica_num, range, schema_id, table_id,
index_id, part_id, tenant_id, index_parameter, store_ids, 0, new_region_id, store_operations, meta_increment);
if (!ret.ok()) {
DINGO_LOG(ERROR) << "Create Region Failed, errno=" << ret << " Request:" << request->ShortDebugString();
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret.error_code()));
response->mutable_error()->set_errmsg(ret.error_str());
return;
}
ret = coordinator_control->CreateRegionWithTaskList(store_operations, new_region_id, meta_increment);
} else {
// store_ids is empty, will auto select store
std::vector<int64_t> store_ids;
Expand All @@ -1319,6 +1326,13 @@ void DoCreateRegion(google::protobuf::RpcController * /*controller*/,
ret = coordinator_control->CreateRegionFinal(
region_name, region_type, raw_engine, store_engine, resource_tag, replica_num, range, schema_id, table_id,
index_id, part_id, tenant_id, index_parameter, store_ids, 0, new_region_id, store_operations, meta_increment);
if (!ret.ok()) {
DINGO_LOG(ERROR) << "Create Region Failed, errno=" << ret << " Request:" << request->ShortDebugString();
response->mutable_error()->set_errcode(static_cast<pb::error::Errno>(ret.error_code()));
response->mutable_error()->set_errmsg(ret.error_str());
return;
}
ret = coordinator_control->CreateRegionWithTaskList(store_operations, new_region_id, meta_increment);
}

if (!ret.ok()) {
Expand Down
3 changes: 2 additions & 1 deletion src/store/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ void CoordinatorPushTask::SendCoordinatorPushToStore(std::shared_ptr<Coordinator

AtomicGuard guard(g_coordinator_push_to_store_running);

coordinator_control->TryToSendStoreOperations();
// yjddebug not use send
// coordinator_control->TryToSendStoreOperations();
}

// this is for coordinator
Expand Down

0 comments on commit e1a078a

Please sign in to comment.