Skip to content

Commit

Permalink
[fix] refactor tasklist
Browse files Browse the repository at this point in the history
  • Loading branch information
visualYJD committed Jan 14, 2025
1 parent e1a078a commit 937fc4d
Show file tree
Hide file tree
Showing 3 changed files with 399 additions and 486 deletions.
38 changes: 22 additions & 16 deletions src/coordinator/coordinator_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ class CoordinatorControl : public MetaControl {

// create region
butil::Status CreateRegionWithTaskList(std::vector<pb::coordinator::StoreOperation> &store_operations,
int64_t new_region_id, pb::coordinator_internal::MetaIncrement &meta_increment);
int64_t new_region_id,
pb::coordinator_internal::MetaIncrement &meta_increment);

// create schema
// in: tenant_id
Expand Down Expand Up @@ -516,6 +517,7 @@ class CoordinatorControl : public MetaControl {
// get storemap
void GetStoreMap(pb::common::StoreMap &store_map);
void GetStoreMap(pb::common::StoreMap &store_map, pb::common::StoreType store_type);
std::vector<pb::common::Store> GetShuffleStores();
pb::common::Store GetStore(int64_t store_id);

// get store metrics
Expand Down Expand Up @@ -852,18 +854,19 @@ class CoordinatorControl : public MetaControl {
pb::coordinator_internal::MetaIncrement &meta_increment);
void AddSnapshotVectorIndexTask(pb::coordinator::Task *region_save_vector_task, int64_t store_id, int64_t region_id,
int64_t snapshot_log_id, pb::coordinator_internal::MetaIncrement &meta_increment);
static void AddCheckSplitResultTask(pb::coordinator::TaskList *task_list, int64_t split_to_region_id);
static void AddCheckSplitResultTask(pb::coordinator::TaskList *task_list, int64_t store_id,
int64_t split_to_region_id);
static void AddCheckStoreVectorIndexTask(pb::coordinator::Task *check_vector_task, int64_t store_id,
int64_t region_id, int64_t vector_index_version);
static void AddCheckVectorIndexSnapshotLogIdTask(pb::coordinator::TaskList *task_list, int64_t region_id,
int64_t vector_snapshot_log_id);
static void AddCheckVectorIndexSnapshotLogIdTask(pb::coordinator::TaskList *task_list, int64_t store_id,
int64_t region_id, int64_t vector_snapshot_log_id);
void AddLoadVectorIndexTask(pb::coordinator::Task *load_vector_task, int64_t store_id, int64_t region_id,
pb::coordinator_internal::MetaIncrement &meta_increment);
static void AddCheckStoreRegionTask(pb::coordinator::Task *check_region_task, int64_t store_id, int64_t region_id);
static void AddCheckChangePeerResultTask(pb::coordinator::TaskList *task_list, int64_t region_id,
static void AddCheckChangePeerResultTask(pb::coordinator::TaskList *task_list, int64_t store_id, int64_t region_id,
const pb::common::RegionDefinition &region_definition);
static void AddCheckMergeResultTask(pb::coordinator::TaskList *task_list, int64_t merge_to_region_id,
const pb::common::Range &range);
static void AddCheckMergeResultTask(pb::coordinator::TaskList *task_list, int64_t store_id,
int64_t merge_to_region_id, const pb::common::Range &range);
static void AddCheckTombstoneRegionTask(pb::coordinator::Task *check_tombstone_region_task, int64_t store_id,
int64_t region_id);

Expand All @@ -882,11 +885,10 @@ class CoordinatorControl : public MetaControl {

butil::Status GenStoreOperationByTaskList(int64_t store_id, int64_t job_id,
const pb::coordinator::RegionCmd &region_cmd,
pb::coordinator::StoreOperation &store_operation,
pb::coordinator_internal::MetaIncrement &meta_increment);
pb::coordinator::StoreOperation &store_operation);

void SendTaskStoreOperation(const pb::common::Store &store, const pb::coordinator::StoreOperation &store_operation,
pb::coordinator_internal::MetaIncrement &meta_increment);
butil::Status SendTaskStoreOperation(int64_t store_id, const pb::coordinator::StoreOperation &store_operation,
pb::coordinator_internal::MetaIncrement &meta_increment);

// move region_cmd from one store to another store
butil::Status MoveTaskRegionCmd(int64_t task_list_id, int64_t old_store_id, int64_t new_store_id,
Expand All @@ -897,19 +899,23 @@ class CoordinatorControl : public MetaControl {

butil::Status UpdateTaskProcess(const pb::coordinator_internal::MetaIncrementTaskList &task_list);

bool UpdateMoveTaskRegionCmd(pb::coordinator::TaskList &task_list, int64_t new_store_id, int64_t region_cmd_id);
bool MoveRegionCmdInStoreOperation(pb::coordinator::TaskList &task_list, int64_t new_store_id, int64_t region_cmd_id);

void UpdateTaskRegionCmdStatus(pb::coordinator::Task &task,
void UpdateTaskRegionCmdStatus(pb::coordinator::StoreOperation &store_operation,
pb::coordinator_internal::MetaIncrementRegionCmdStatus region_cmd_status);

void UpdateTaskListNextStep(pb::coordinator::TaskList &task_list, pb::coordinator::Task &current_task,
int64_t region_cmd_id);

bool AllParallelTasksSuccess(pb::coordinator::TaskList &task_list, int64_t region_cmd_id);

bool NeedAutoCleanTaskList(const pb::coordinator::TaskList &task_list, const pb::coordinator::Task &current_task);
bool NeedAutoCleanTaskList(const pb::coordinator::TaskList &task_list,
const pb::coordinator::StoreOperation &current_store_operation);

pb::coordinator::Task *FindTaskByRegionCmd(pb::coordinator::TaskList &task_list, int64_t region_cmd_id,
bool &parallel_task, int32_t &current_step);
int32_t &current_step);

pb::coordinator::Task *FindParallelParentTaskByRegionCmd(pb::coordinator::TaskList &task_list, int64_t region_cmd_id);
pb::coordinator::StoreOperation *FindExecuteStoreOperation(pb::coordinator::Task &task, int64_t region_cmd_id);

bool DoTaskPreCheck(const pb::coordinator::TaskPreCheck &task_pre_check);

Expand Down
Loading

0 comments on commit 937fc4d

Please sign in to comment.