From 948f2c2beb663a2c5f9e0db53beb0aae49f843f8 Mon Sep 17 00:00:00 2001 From: Ketor Date: Wed, 18 Oct 2023 01:39:13 +0800 Subject: [PATCH] [feat][coordinator] Implement UpdateTables, AddIndexOnTable and DropIndexOnTable. Signed-off-by: Ketor --- proto/meta.proto | 33 +++ src/client/coordinator_client_function.h | 3 + .../coordinator_client_function_meta.cc | 251 ++++++++++++++++++ src/client/dingodb_client.cc | 7 + src/coordinator/coordinator_control.h | 10 + src/coordinator/coordinator_control_fsm.cc | 13 +- src/coordinator/coordinator_control_meta.cc | 188 +++++++++++++ src/server/meta_service.cc | 159 +++++++++++ src/server/meta_service.h | 6 + 9 files changed, 664 insertions(+), 6 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 70d5f49ca..25874d777 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -583,6 +583,32 @@ message CreateTablesResponse { repeated DingoCommonId table_ids = 2; } +message UpdateTablesRequest { + TableDefinitionWithId table_definition_with_id = 2; +} + +message UpdateTablesResponse { + dingodb.pb.error.Error error = 1; +} + +message AddIndexOnTableRequest { + DingoCommonId table_id = 1; // this is the table_id to add index + TableDefinitionWithId table_definition_with_id = 2; // this is the index to be added +} + +message AddIndexOnTableResponse { + dingodb.pb.error.Error error = 1; +} + +message DropIndexOnTableRequest { + DingoCommonId table_id = 1; // this is the table_id to drop index + DingoCommonId index_id = 2; // this is the index_id to be dropped +} + +message DropIndexOnTableResponse { + dingodb.pb.error.Error error = 1; +} + // get assigned table definition, including vector index and scalar index message GetTablesRequest { DingoCommonId table_id = 1; @@ -748,6 +774,13 @@ service MetaService { // out: List rpc CreateTables(CreateTablesRequest) returns (CreateTablesResponse); + // UpdateTables + // in: schema_id List + rpc UpdateTables(UpdateTablesRequest) returns (UpdateTablesResponse); + + rpc AddIndexOnTable(AddIndexOnTableRequest) returns (AddIndexOnTableResponse); + rpc DropIndexOnTable(DropIndexOnTableRequest) returns (DropIndexOnTableResponse); + // GetTables // in: schema_id table_id // out: List diff --git a/src/client/coordinator_client_function.h b/src/client/coordinator_client_function.h index 01374b5ed..170b7a7ac 100644 --- a/src/client/coordinator_client_function.h +++ b/src/client/coordinator_client_function.h @@ -128,6 +128,9 @@ void SendGenerateTableIds(std::shared_ptr coord void SendCreateTables(std::shared_ptr coordinator_interaction); void SendGetTables(std::shared_ptr coordinator_interaction); void SendDropTables(std::shared_ptr coordinator_interaction); +void SendUpdateTables(std::shared_ptr coordinator_interaction); +void SendAddIndexOnTable(std::shared_ptr coordinator_interaction); +void SendDropIndexOnTable(std::shared_ptr coordinator_interaction); // auto increment functions void SendGetAutoIncrements(std::shared_ptr coordinator_interaction); diff --git a/src/client/coordinator_client_function_meta.cc b/src/client/coordinator_client_function_meta.cc index 028b8fc34..0bdbcc68a 100644 --- a/src/client/coordinator_client_function_meta.cc +++ b/src/client/coordinator_client_function_meta.cc @@ -33,6 +33,7 @@ DECLARE_string(id); DECLARE_string(name); DECLARE_uint64(schema_id); DECLARE_uint64(table_id); +DECLARE_uint64(index_id); DECLARE_uint64(replica); DECLARE_uint32(max_elements); DECLARE_uint32(dimension); @@ -44,6 +45,9 @@ DECLARE_bool(auto_split); DECLARE_uint32(part_count); DECLARE_uint32(ncentroids); DECLARE_string(metrics_type); +DECLARE_int64(def_version); + +DEFINE_bool(is_updating_index, false, "is index"); void SendGetSchemas(std::shared_ptr coordinator_interaction) { dingodb::pb::meta::GetSchemasRequest request; @@ -1169,3 +1173,250 @@ void SendCleanDeletedIndex(std::shared_ptr coor DINGO_LOG(INFO) << "SendRequest status=" << status; DINGO_LOG(INFO) << "RESPONSE =" << response.DebugString(); } + +void SendUpdateTables(std::shared_ptr coordinator_interaction) { + bool with_increment = false; + + dingodb::pb::meta::UpdateTablesRequest request; + dingodb::pb::meta::UpdateTablesResponse response; + + if (FLAGS_table_id == 0) { + DINGO_LOG(WARNING) << "table_id is empty"; + return; + } + + if (FLAGS_def_version == 0) { + DINGO_LOG(WARNING) << "version is empty"; + return; + } + + if (FLAGS_part_count == 0) { + FLAGS_part_count = 1; + } + uint32_t part_count = FLAGS_part_count; + + std::vector part_ids; + for (int i = 0; i < part_count; i++) { + int64_t new_part_id = 0; + int ret = GetCreateTableId(coordinator_interaction, new_part_id); + if (ret != 0) { + DINGO_LOG(WARNING) << "GetCreateTableId failed"; + return; + } + part_ids.push_back(new_part_id); + } + + auto* definition_with_id = request.mutable_table_definition_with_id(); + auto* table_id = definition_with_id->mutable_table_id(); + if (!FLAGS_is_updating_index) { + table_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_TABLE); + } else { + table_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX); + } + table_id->set_entity_id(FLAGS_table_id); + + // string name = 1; + auto* table_definition = definition_with_id->mutable_table_definition(); + table_definition->set_name(FLAGS_name); + + if (FLAGS_replica > 0) { + table_definition->set_replica(FLAGS_replica); + } + + // repeated ColumnDefinition columns = 2; + for (int i = 0; i < 3; i++) { + auto* column = table_definition->add_columns(); + std::string column_name("test_columen_"); + column_name.append(std::to_string(i)); + column->set_name(column_name); + column->set_sql_type("BIGINT"); + column->set_element_type("BIGINT"); + column->set_precision(100); + column->set_nullable(false); + column->set_indexofkey(7); + column->set_has_default_val(false); + column->set_default_val("0"); + + if (with_increment && i == 0) { + column->set_is_auto_increment(true); + } + } + if (with_increment) { + table_definition->set_auto_increment(100); + } + + // map indexes = 3; + // uint32 version = 4; + table_definition->set_version(FLAGS_def_version); + // uint64 ttl = 5; + table_definition->set_ttl(0); + // PartitionRule table_partition = 6; + // Engine engine = 7; + table_definition->set_engine(::dingodb::pb::common::Engine::ENG_ROCKSDB); + // map properties = 8; + auto* prop = table_definition->mutable_properties(); + (*prop)["test property"] = "test_property_value"; + + // add partition_rule + // repeated string columns = 1; + // PartitionStrategy strategy = 2; + auto* partition_rule = table_definition->mutable_table_partition(); + auto* part_column = partition_rule->add_columns(); + part_column->assign("test_part_column"); + for (int i = 0; i < part_count; i++) { + auto* part = partition_rule->add_partitions(); + part->mutable_id()->set_entity_id(part_ids[i]); + part->mutable_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_PART); + part->mutable_id()->set_parent_entity_id(FLAGS_table_id); + part->mutable_range()->set_start_key(client::Helper::EncodeRegionRange(part_ids[i])); + part->mutable_range()->set_end_key(client::Helper::EncodeRegionRange(part_ids[i] + 1)); + } + + auto status = coordinator_interaction->SendRequest("UpdateTables", request, response); + DINGO_LOG(INFO) << "SendRequest status=" << status; + DINGO_LOG_INFO << response.DebugString(); +} + +void SendAddIndexOnTable(std::shared_ptr coordinator_interaction) { + bool with_increment = false; + + dingodb::pb::meta::AddIndexOnTableRequest request; + dingodb::pb::meta::AddIndexOnTableResponse response; + + if (FLAGS_table_id == 0) { + DINGO_LOG(WARNING) << "table_id is empty"; + return; + } + + if (FLAGS_index_id == 0) { + DINGO_LOG(WARNING) << "index_id is empty"; + return; + } + + if (FLAGS_def_version == 0) { + DINGO_LOG(WARNING) << "version is empty"; + return; + } + + if (FLAGS_name.empty()) { + DINGO_LOG(WARNING) << "name is empty"; + return; + } + + if (FLAGS_part_count == 0) { + FLAGS_part_count = 1; + } + uint32_t part_count = FLAGS_part_count; + + std::vector part_ids; + for (int i = 0; i < part_count; i++) { + int64_t new_part_id = 0; + int ret = GetCreateTableId(coordinator_interaction, new_part_id); + if (ret != 0) { + DINGO_LOG(WARNING) << "GetCreateTableId failed"; + return; + } + part_ids.push_back(new_part_id); + } + + request.mutable_table_id()->set_entity_id(FLAGS_table_id); + request.mutable_table_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_TABLE); + + auto* definition_with_id = request.mutable_table_definition_with_id(); + auto* index_id = definition_with_id->mutable_table_id(); + index_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX); + index_id->set_entity_id(FLAGS_index_id); + + // string name = 1; + auto* table_definition = definition_with_id->mutable_table_definition(); + table_definition->set_name(FLAGS_name); + + if (FLAGS_replica > 0) { + table_definition->set_replica(FLAGS_replica); + } + + // repeated ColumnDefinition columns = 2; + for (int i = 0; i < 3; i++) { + auto* column = table_definition->add_columns(); + std::string column_name("test_columen_"); + column_name.append(std::to_string(i)); + column->set_name(column_name); + column->set_sql_type("BIGINT"); + column->set_element_type("BIGINT"); + column->set_precision(100); + column->set_nullable(false); + column->set_indexofkey(7); + column->set_has_default_val(false); + column->set_default_val("0"); + + if (with_increment && i == 0) { + column->set_is_auto_increment(true); + } + } + if (with_increment) { + table_definition->set_auto_increment(100); + } + + // map indexes = 3; + // uint32 version = 4; + table_definition->set_version(FLAGS_def_version); + // uint64 ttl = 5; + table_definition->set_ttl(0); + // PartitionRule table_partition = 6; + // Engine engine = 7; + table_definition->set_engine(::dingodb::pb::common::Engine::ENG_ROCKSDB); + // map properties = 8; + auto* prop = table_definition->mutable_properties(); + (*prop)["test property"] = "test_property_value"; + + // add index_parameter + auto* index_parameter = table_definition->mutable_index_parameter(); + index_parameter->set_index_type(::dingodb::pb::common::IndexType::INDEX_TYPE_SCALAR); + index_parameter->mutable_scalar_index_parameter()->set_scalar_index_type( + ::dingodb::pb::common::ScalarIndexType::SCALAR_INDEX_TYPE_LSM); + + // add partition_rule + // repeated string columns = 1; + // PartitionStrategy strategy = 2; + auto* partition_rule = table_definition->mutable_table_partition(); + auto* part_column = partition_rule->add_columns(); + part_column->assign("test_part_column"); + for (int i = 0; i < part_count; i++) { + auto* part = partition_rule->add_partitions(); + part->mutable_id()->set_entity_id(part_ids[i]); + part->mutable_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_PART); + part->mutable_id()->set_parent_entity_id(FLAGS_table_id); + part->mutable_range()->set_start_key(client::Helper::EncodeRegionRange(part_ids[i])); + part->mutable_range()->set_end_key(client::Helper::EncodeRegionRange(part_ids[i] + 1)); + } + + auto status = coordinator_interaction->SendRequest("AddIndexOnTable", request, response); + DINGO_LOG(INFO) << "SendRequest status=" << status; + DINGO_LOG_INFO << response.DebugString(); +} + +void SendDropIndexOnTable(std::shared_ptr coordinator_interaction) { + bool with_increment = false; + + dingodb::pb::meta::DropIndexOnTableRequest request; + dingodb::pb::meta::DropIndexOnTableResponse response; + + if (FLAGS_table_id == 0) { + DINGO_LOG(WARNING) << "table_id is empty"; + return; + } + + if (FLAGS_index_id == 0) { + DINGO_LOG(WARNING) << "index_id is empty"; + return; + } + + request.mutable_table_id()->set_entity_id(FLAGS_table_id); + request.mutable_table_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_TABLE); + request.mutable_index_id()->set_entity_id(FLAGS_index_id); + request.mutable_index_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX); + + auto status = coordinator_interaction->SendRequest("DropIndexOnTable", request, response); + DINGO_LOG(INFO) << "SendRequest status=" << status; + DINGO_LOG_INFO << response.DebugString(); +} diff --git a/src/client/dingodb_client.cc b/src/client/dingodb_client.cc index 21c4a4d1d..13a8777ad 100644 --- a/src/client/dingodb_client.cc +++ b/src/client/dingodb_client.cc @@ -156,6 +156,7 @@ DEFINE_string(db_path, "", "rocksdb path"); DEFINE_bool(show_vector, false, "show vector data"); DEFINE_string(metrics_type, "L2", "metrics type"); DEFINE_uint64(safe_point, 0, "gc safe point"); +DEFINE_int64(def_version, 0, "version"); bvar::LatencyRecorder g_latency_recorder("dingo-store"); @@ -752,6 +753,12 @@ int CoordinatorSender() { SendGenerateTableIds(coordinator_interaction_meta); } else if (FLAGS_method == "CreateTables") { SendCreateTables(coordinator_interaction_meta); + } else if (FLAGS_method == "UpdateTables") { + SendUpdateTables(coordinator_interaction_meta); + } else if (FLAGS_method == "AddIndexOnTable") { + SendAddIndexOnTable(coordinator_interaction_meta); + } else if (FLAGS_method == "DropIndexOnTable") { + SendDropIndexOnTable(coordinator_interaction_meta); } else if (FLAGS_method == "GetTables") { SendGetTables(coordinator_interaction_meta); } else if (FLAGS_method == "DropTables") { diff --git a/src/coordinator/coordinator_control.h b/src/coordinator/coordinator_control.h index 66ff3732b..e2f1320f3 100644 --- a/src/coordinator/coordinator_control.h +++ b/src/coordinator/coordinator_control.h @@ -298,6 +298,16 @@ class CoordinatorControl : public MetaControl { butil::Status CreateTable(int64_t schema_id, const pb::meta::TableDefinition &table_definition, int64_t &new_table_id, std::vector ®ion_ids, pb::coordinator_internal::MetaIncrement &meta_increment); + butil::Status UpdateTableDefinition(int64_t table_id, bool is_index, + const pb::meta::TableDefinition &table_definition, + pb::coordinator_internal::MetaIncrement &meta_increment); + + butil::Status AddIndexOnTable(int64_t table_id, int64_t index_id, const pb::meta::TableDefinition &table_definition, + pb::coordinator_internal::MetaIncrement &meta_increment); + + butil::Status DropIndexOnTable(int64_t table_id, int64_t index_id, + pb::coordinator_internal::MetaIncrement &meta_increment); + // create index_id // in: schema_id // out: new index_id diff --git a/src/coordinator/coordinator_control_fsm.cc b/src/coordinator/coordinator_control_fsm.cc index 0dfa268a7..f49306f90 100644 --- a/src/coordinator/coordinator_control_fsm.cc +++ b/src/coordinator/coordinator_control_fsm.cc @@ -1880,12 +1880,13 @@ void CoordinatorControl::ApplyMetaIncrement(pb::coordinator_internal::MetaIncrem if (table.table().has_definition()) { *(table_internal.mutable_definition()) = table.table().definition(); } - if (table.table().partitions_size() > 0) { - table_internal.clear_partitions(); - for (const auto& it : table.table().partitions()) { - *(table_internal.add_partitions()) = it; - } - } + // we do not support change partitions now + // if (table.table().partitions_size() > 0) { + // table_internal.clear_partitions(); + // for (const auto& it : table.table().partitions()) { + // *(table_internal.add_partitions()) = it; + // } + // } ret = table_map_.Put(table.id(), table_internal); if (ret > 0) { DINGO_LOG(INFO) << "ApplyMetaIncrement table UPDATE, [id=" << table.id() << "] success"; diff --git a/src/coordinator/coordinator_control_meta.cc b/src/coordinator/coordinator_control_meta.cc index ff2d1d3c9..1a8fda900 100644 --- a/src/coordinator/coordinator_control_meta.cc +++ b/src/coordinator/coordinator_control_meta.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -2982,4 +2983,191 @@ butil::Status CoordinatorControl::CleanDeletedIndex(int64_t index_id) { return butil::Status::OK(); } +butil::Status CoordinatorControl::UpdateTableDefinition(int64_t table_id, bool is_index, + const pb::meta::TableDefinition& table_definition, + pb::coordinator_internal::MetaIncrement& meta_increment) { + pb::coordinator_internal::TableInternal table_internal; + + if (!is_index) { + int ret = table_map_.Get(table_id, table_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: table_id not found" << table_id; + return butil::Status(pb::error::Errno::ETABLE_NOT_FOUND, "table_id not found"); + } + } else { + int ret = index_map_.Get(table_id, table_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: index_id not found" << table_id; + return butil::Status(pb::error::Errno::ETABLE_NOT_FOUND, "index_id not found"); + } + } + + // check if table_definition is legal + if (table_internal.definition().name() != table_definition.name()) { + DINGO_LOG(ERROR) << "ERRROR: table name cannot be changed" << table_id; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "table name cannot be changed"); + } + + if (table_internal.definition().table_partition().partitions_size() != + table_definition.table_partition().partitions_size()) { + DINGO_LOG(ERROR) << "ERRROR: table partition count cannot be changed" << table_id; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "table partition count cannot be changed"); + } + + if (table_internal.definition().engine() != table_definition.engine()) { + DINGO_LOG(ERROR) << "ERRROR: table engine cannot be changed" << table_id; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "table engine cannot be changed"); + } + + if (table_internal.definition().table_partition().strategy() != table_definition.table_partition().strategy()) { + DINGO_LOG(ERROR) << "ERRROR: table partition type cannot be changed" << table_id; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "table partition type cannot be changed"); + } + + std::set old_ids; + std::set new_ids; + for (const auto& part_id : table_internal.definition().table_partition().partitions()) { + old_ids.insert(part_id.id().entity_id()); + } + for (const auto& part_id : table_definition.table_partition().partitions()) { + new_ids.insert(part_id.id().entity_id()); + } + for (const auto& part_id : old_ids) { + if (new_ids.find(part_id) == new_ids.end()) { + DINGO_LOG(ERROR) << "ERRROR: table partition id cannot be changed" << table_id; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "table partition id cannot be changed"); + } + } + + // update table definition, we do not support change partitions, so we just update other fields + pb::coordinator_internal::TableInternal table_internal_new; + *(table_internal_new.mutable_definition()) = table_definition; + table_internal_new.set_id(table_internal.id()); + table_internal_new.set_table_id(table_internal.table_id()); + table_internal_new.set_schema_id(table_internal.schema_id()); + + if (!is_index) { + auto* table_increment = meta_increment.add_tables(); + table_increment->set_id(table_id); + table_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); + *(table_increment->mutable_table()) = table_internal_new; + } else { + auto* index_increment = meta_increment.add_indexes(); + index_increment->set_id(table_id); + index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); + *(index_increment->mutable_table()) = table_internal_new; + } + + return butil::Status::OK(); +} + +butil::Status CoordinatorControl::AddIndexOnTable(int64_t table_id, int64_t index_id, + const pb::meta::TableDefinition& table_definition, + pb::coordinator_internal::MetaIncrement& meta_increment) { + pb::coordinator_internal::TableInternal table_internal; + int ret = table_map_.Get(table_id, table_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: table_id not found" << table_id; + return butil::Status(pb::error::Errno::ETABLE_NOT_FOUND, "table_id not found"); + } + + // check if index_id is already exist + pb::coordinator_internal::TableInternal index_internal; + ret = index_map_.Get(index_id, index_internal); + if (ret > 0) { + DINGO_LOG(ERROR) << "ERRROR: index_id already exist" << index_id; + return butil::Status(pb::error::Errno::EINDEX_EXISTS, "index_id already exist"); + } + + // check if table_id exeist in table_index_map_ + pb::coordinator_internal::TableIndexInternal table_index_internal; + ret = table_index_map_.Get(table_id, table_index_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: table_id not found in table_index_map_" << table_id; + return butil::Status(pb::error::Errno::ETABLE_NOT_FOUND, "table_id not found in table_index_map_"); + } + + // call CreateIndex to create index + std::vector region_ids; + auto status = + CreateIndex(table_internal.schema_id(), table_definition, table_id, index_id, region_ids, meta_increment); + if (!status.ok()) { + DINGO_LOG(ERROR) << "ERRROR: CreateIndex failed, table_id=" << table_id << " index_id=" << index_id; + return status; + } + + // add index to table_index_map_ + auto* new_index_id = table_index_internal.add_table_ids(); + new_index_id->set_entity_id(index_id); + new_index_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX); + new_index_id->set_parent_entity_id(table_internal.schema_id()); + auto* table_index_increment = meta_increment.add_table_indexes(); + table_index_increment->set_id(table_id); + table_index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); + *(table_index_increment->mutable_table_indexes()) = table_index_internal; + + return butil::Status::OK(); +} + +butil::Status CoordinatorControl::DropIndexOnTable(int64_t table_id, int64_t index_id, + pb::coordinator_internal::MetaIncrement& meta_increment) { + pb::coordinator_internal::TableInternal table_internal; + int ret = table_map_.Get(table_id, table_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: table_id not found" << table_id; + return butil::Status(pb::error::Errno::ETABLE_NOT_FOUND, "table_id not found"); + } + + // check if index_id is exist + pb::coordinator_internal::TableInternal index_internal; + ret = index_map_.Get(index_id, index_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: index_id not found" << index_id; + return butil::Status(pb::error::Errno::EINDEX_NOT_FOUND, "index_id not found"); + } + + // check if table_id exeist in table_index_map_ + pb::coordinator_internal::TableIndexInternal table_index_internal; + ret = table_index_map_.Get(table_id, table_index_internal); + if (ret < 0) { + DINGO_LOG(ERROR) << "ERRROR: table_id not found in table_index_map_" << table_id; + return butil::Status(pb::error::Errno::ETABLE_NOT_FOUND, "table_id not found in table_index_map_"); + } + bool found = false; + for (const auto& id : table_index_internal.table_ids()) { + if (id.entity_id() == index_id) { + found = true; + break; + } + } + if (!found) { + DINGO_LOG(ERROR) << "ERRROR: index_id not found in table_index_map_" << index_id; + return butil::Status(pb::error::Errno::EINDEX_NOT_FOUND, "index_id not found in table_index_map_"); + } + + // call DropIndex to drop index + auto status = DropIndex(table_internal.schema_id(), index_id, false, meta_increment); + if (!status.ok()) { + DINGO_LOG(ERROR) << "ERRROR: DropIndex failed, table_id=" << table_id << " index_id=" << index_id; + return status; + } + + // del index from table_index_map_ + pb::coordinator_internal::TableIndexInternal table_index_internal_new; + table_index_internal_new.set_id(table_index_internal.id()); + for (const auto& id : table_index_internal.table_ids()) { + if (id.entity_id() != index_id) { + auto* new_index_id = table_index_internal_new.add_table_ids(); + *new_index_id = id; + } + } + + auto* table_index_increment = meta_increment.add_table_indexes(); + table_index_increment->set_id(table_id); + table_index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); + *(table_index_increment->mutable_table_indexes()) = table_index_internal_new; + + return butil::Status::OK(); +} + } // namespace dingodb diff --git a/src/server/meta_service.cc b/src/server/meta_service.cc index af32e9bc4..0193489a0 100644 --- a/src/server/meta_service.cc +++ b/src/server/meta_service.cc @@ -1720,6 +1720,165 @@ void MetaServiceImpl::DropTables(google::protobuf::RpcController *controller, DINGO_LOG(INFO) << "DropTables Success."; } +void MetaServiceImpl::UpdateTables(google::protobuf::RpcController *controller, + const pb::meta::UpdateTablesRequest *request, + pb::meta::UpdateTablesResponse *response, google::protobuf::Closure *done) { + brpc::ClosureGuard done_guard(done); + + if (!coordinator_control_->IsLeader()) { + return RedirectResponse(response); + } + + int64_t start_ms = butil::gettimeofday_ms(); + + DINGO_LOG(INFO) << request->ShortDebugString(); + + if (!request->has_table_definition_with_id() || !request->table_definition_with_id().has_table_id()) { + DINGO_LOG(ERROR) << "table_definition_with_id or table_id not found."; + response->mutable_error()->set_errcode(Errno::EILLEGAL_PARAMTETERS); + response->mutable_error()->set_errmsg("table_definition_with_id or table_id not found."); + return; + } + + pb::coordinator_internal::MetaIncrement meta_increment; + + // process table type + const auto &table_id = request->table_definition_with_id().table_id(); + if (table_id.entity_type() == pb::meta::EntityType::ENTITY_TYPE_TABLE) { + auto table_id_to_update = table_id.entity_id(); + const auto &definition = request->table_definition_with_id().table_definition(); + + auto ret = coordinator_control_->UpdateTableDefinition(table_id_to_update, false, definition, meta_increment); + if (!ret.ok()) { + DINGO_LOG(ERROR) << "UpdateTableDefinition failed in meta_service, error code=" << ret; + response->mutable_error()->set_errcode(static_cast(ret.error_code())); + response->mutable_error()->set_errmsg(ret.error_str()); + return; + } + } else if (table_id.entity_type() == pb::meta::EntityType::ENTITY_TYPE_INDEX) { + auto index_id_to_update = table_id.entity_id(); + const auto &definition = request->table_definition_with_id().table_definition(); + + auto ret = coordinator_control_->UpdateTableDefinition(index_id_to_update, true, definition, meta_increment); + if (!ret.ok()) { + DINGO_LOG(ERROR) << "UpdateIndexDefinition failed in meta_service, error code=" << ret; + response->mutable_error()->set_errcode(static_cast(ret.error_code())); + response->mutable_error()->set_errmsg(ret.error_str()); + return; + } + } else { + DINGO_LOG(ERROR) << "entity type is illegal, entity_type=" << table_id.entity_type(); + response->mutable_error()->set_errcode(Errno::EILLEGAL_PARAMTETERS); + response->mutable_error()->set_errmsg("entity type is illegal"); + return; + } + + // prepare for raft process + CoordinatorClosure *meta_put_closure = + new CoordinatorClosure(request, response, + done_guard.release()); + + std::shared_ptr ctx = + std::make_shared(static_cast(controller), meta_put_closure); + ctx->SetRegionId(Constant::kCoordinatorRegionId); + + // this is a async operation will be block by closure + engine_->AsyncWrite(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment)); +} + +void MetaServiceImpl::AddIndexOnTable(google::protobuf::RpcController *controller, + const pb::meta::AddIndexOnTableRequest *request, + pb::meta::AddIndexOnTableResponse *response, google::protobuf::Closure *done) { + brpc::ClosureGuard done_guard(done); + + if (!coordinator_control_->IsLeader()) { + return RedirectResponse(response); + } + + int64_t start_ms = butil::gettimeofday_ms(); + + DINGO_LOG(INFO) << request->ShortDebugString(); + + if (!request->has_table_id() || !request->has_table_definition_with_id() || request->table_id().entity_id() == 0 || + request->table_definition_with_id().table_id().entity_id() == 0) { + DINGO_LOG(ERROR) << "table_id, index_id or index_definition not found."; + response->mutable_error()->set_errcode(Errno::EILLEGAL_PARAMTETERS); + response->mutable_error()->set_errmsg("table_id, index_id or index_definition not found."); + return; + } + + pb::coordinator_internal::MetaIncrement meta_increment; + + auto ret = coordinator_control_->AddIndexOnTable( + request->table_id().entity_id(), request->table_definition_with_id().table_id().entity_id(), + request->table_definition_with_id().table_definition(), meta_increment); + + if (!ret.ok()) { + DINGO_LOG(ERROR) << "AddIndexOnTable failed in meta_service, error code=" << ret; + response->mutable_error()->set_errcode(static_cast(ret.error_code())); + response->mutable_error()->set_errmsg(ret.error_str()); + return; + } + + // prepare for raft process + CoordinatorClosure *meta_put_closure = + new CoordinatorClosure(request, response, + done_guard.release()); + + std::shared_ptr ctx = + std::make_shared(static_cast(controller), meta_put_closure); + + ctx->SetRegionId(Constant::kCoordinatorRegionId); + + // this is a async operation will be block by closure + engine_->AsyncWrite(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment)); +} + +void MetaServiceImpl::DropIndexOnTable(google::protobuf::RpcController *controller, + const pb::meta::DropIndexOnTableRequest *request, + pb::meta::DropIndexOnTableResponse *response, google::protobuf::Closure *done) { + brpc::ClosureGuard done_guard(done); + + if (!coordinator_control_->IsLeader()) { + return RedirectResponse(response); + } + + int64_t start_ms = butil::gettimeofday_ms(); + + DINGO_LOG(INFO) << request->ShortDebugString(); + + if (request->table_id().entity_id() == 0 || request->index_id().entity_id() == 0) { + DINGO_LOG(ERROR) << "table_id or index_id not found."; + response->mutable_error()->set_errcode(Errno::EILLEGAL_PARAMTETERS); + response->mutable_error()->set_errmsg("table_id or index_id not found."); + return; + } + + pb::coordinator_internal::MetaIncrement meta_increment; + + auto ret = coordinator_control_->DropIndexOnTable(request->table_id().entity_id(), request->index_id().entity_id(), + meta_increment); + if (!ret.ok()) { + DINGO_LOG(ERROR) << "DropIndexOnTable failed in meta_service, error code=" << ret; + response->mutable_error()->set_errcode(static_cast(ret.error_code())); + response->mutable_error()->set_errmsg(ret.error_str()); + return; + } + + // prepare for raft process + CoordinatorClosure *meta_put_closure = + new CoordinatorClosure( + request, response, done_guard.release()); + + std::shared_ptr ctx = + std::make_shared(static_cast(controller), meta_put_closure); + + ctx->SetRegionId(Constant::kCoordinatorRegionId); + + // this is a async operation will be block by closure + engine_->AsyncWrite(ctx, WriteDataBuilder::BuildWrite(ctx->CfName(), meta_increment)); +} + void MetaServiceImpl::SwitchAutoSplit(google::protobuf::RpcController *controller, const ::dingodb::pb::meta::SwitchAutoSplitRequest *request, pb::meta::SwitchAutoSplitResponse *response, google::protobuf::Closure *done) { diff --git a/src/server/meta_service.h b/src/server/meta_service.h index 853898fe4..1c5310023 100644 --- a/src/server/meta_service.h +++ b/src/server/meta_service.h @@ -135,6 +135,12 @@ class MetaServiceImpl : public pb::meta::MetaService { pb::meta::GetTablesResponse* response, google::protobuf::Closure* done) override; void DropTables(google::protobuf::RpcController* controller, const pb::meta::DropTablesRequest* request, pb::meta::DropTablesResponse* response, google::protobuf::Closure* done) override; + void UpdateTables(google::protobuf::RpcController* controller, const pb::meta::UpdateTablesRequest* request, + pb::meta::UpdateTablesResponse* response, google::protobuf::Closure* done) override; + void AddIndexOnTable(google::protobuf::RpcController* controller, const pb::meta::AddIndexOnTableRequest* request, + pb::meta::AddIndexOnTableResponse* response, google::protobuf::Closure* done) override; + void DropIndexOnTable(google::protobuf::RpcController* controller, const pb::meta::DropIndexOnTableRequest* request, + pb::meta::DropIndexOnTableResponse* response, google::protobuf::Closure* done) override; void GetAutoIncrements(google::protobuf::RpcController* controller, const pb::meta::GetAutoIncrementsRequest* request, pb::meta::GetAutoIncrementsResponse* response, google::protobuf::Closure* done) override;