Skip to content

Commit

Permalink
[feat][coordinator] Implement UpdateTables, AddIndexOnTable and
Browse files Browse the repository at this point in the history
DropIndexOnTable.

Signed-off-by: Ketor <d.ketor@gmail.com>
  • Loading branch information
ketor committed Oct 17, 2023
1 parent d114d97 commit 948f2c2
Show file tree
Hide file tree
Showing 9 changed files with 664 additions and 6 deletions.
33 changes: 33 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,32 @@ message CreateTablesResponse {
repeated DingoCommonId table_ids = 2;
}

message UpdateTablesRequest {
TableDefinitionWithId table_definition_with_id = 2;
}

message UpdateTablesResponse {
dingodb.pb.error.Error error = 1;
}

message AddIndexOnTableRequest {
DingoCommonId table_id = 1; // this is the table_id to add index
TableDefinitionWithId table_definition_with_id = 2; // this is the index to be added
}

message AddIndexOnTableResponse {
dingodb.pb.error.Error error = 1;
}

message DropIndexOnTableRequest {
DingoCommonId table_id = 1; // this is the table_id to drop index
DingoCommonId index_id = 2; // this is the index_id to be dropped
}

message DropIndexOnTableResponse {
dingodb.pb.error.Error error = 1;
}

// get assigned table definition, including vector index and scalar index
message GetTablesRequest {
DingoCommonId table_id = 1;
Expand Down Expand Up @@ -748,6 +774,13 @@ service MetaService {
// out: List<DingoCommonId>
rpc CreateTables(CreateTablesRequest) returns (CreateTablesResponse);

// UpdateTables
// in: schema_id List<TableDefinitionWithId>
rpc UpdateTables(UpdateTablesRequest) returns (UpdateTablesResponse);

rpc AddIndexOnTable(AddIndexOnTableRequest) returns (AddIndexOnTableResponse);
rpc DropIndexOnTable(DropIndexOnTableRequest) returns (DropIndexOnTableResponse);

// GetTables
// in: schema_id table_id
// out: List<DingoCommonId>
Expand Down
3 changes: 3 additions & 0 deletions src/client/coordinator_client_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ void SendGenerateTableIds(std::shared_ptr<dingodb::CoordinatorInteraction> coord
void SendCreateTables(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);
void SendGetTables(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);
void SendDropTables(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);
void SendUpdateTables(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);
void SendAddIndexOnTable(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);
void SendDropIndexOnTable(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);

// auto increment functions
void SendGetAutoIncrements(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction);
Expand Down
251 changes: 251 additions & 0 deletions src/client/coordinator_client_function_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ DECLARE_string(id);
DECLARE_string(name);
DECLARE_uint64(schema_id);
DECLARE_uint64(table_id);
DECLARE_uint64(index_id);
DECLARE_uint64(replica);
DECLARE_uint32(max_elements);
DECLARE_uint32(dimension);
Expand All @@ -44,6 +45,9 @@ DECLARE_bool(auto_split);
DECLARE_uint32(part_count);
DECLARE_uint32(ncentroids);
DECLARE_string(metrics_type);
DECLARE_int64(def_version);

DEFINE_bool(is_updating_index, false, "is index");

void SendGetSchemas(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction) {
dingodb::pb::meta::GetSchemasRequest request;
Expand Down Expand Up @@ -1169,3 +1173,250 @@ void SendCleanDeletedIndex(std::shared_ptr<dingodb::CoordinatorInteraction> coor
DINGO_LOG(INFO) << "SendRequest status=" << status;
DINGO_LOG(INFO) << "RESPONSE =" << response.DebugString();
}

void SendUpdateTables(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction) {
bool with_increment = false;

dingodb::pb::meta::UpdateTablesRequest request;
dingodb::pb::meta::UpdateTablesResponse response;

if (FLAGS_table_id == 0) {
DINGO_LOG(WARNING) << "table_id is empty";
return;
}

if (FLAGS_def_version == 0) {
DINGO_LOG(WARNING) << "version is empty";
return;
}

if (FLAGS_part_count == 0) {
FLAGS_part_count = 1;
}
uint32_t part_count = FLAGS_part_count;

std::vector<int64_t> part_ids;
for (int i = 0; i < part_count; i++) {
int64_t new_part_id = 0;
int ret = GetCreateTableId(coordinator_interaction, new_part_id);
if (ret != 0) {
DINGO_LOG(WARNING) << "GetCreateTableId failed";
return;
}
part_ids.push_back(new_part_id);
}

auto* definition_with_id = request.mutable_table_definition_with_id();
auto* table_id = definition_with_id->mutable_table_id();
if (!FLAGS_is_updating_index) {
table_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_TABLE);
} else {
table_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX);
}
table_id->set_entity_id(FLAGS_table_id);

// string name = 1;
auto* table_definition = definition_with_id->mutable_table_definition();
table_definition->set_name(FLAGS_name);

if (FLAGS_replica > 0) {
table_definition->set_replica(FLAGS_replica);
}

// repeated ColumnDefinition columns = 2;
for (int i = 0; i < 3; i++) {
auto* column = table_definition->add_columns();
std::string column_name("test_columen_");
column_name.append(std::to_string(i));
column->set_name(column_name);
column->set_sql_type("BIGINT");
column->set_element_type("BIGINT");
column->set_precision(100);
column->set_nullable(false);
column->set_indexofkey(7);
column->set_has_default_val(false);
column->set_default_val("0");

if (with_increment && i == 0) {
column->set_is_auto_increment(true);
}
}
if (with_increment) {
table_definition->set_auto_increment(100);
}

// map<string, Index> indexes = 3;
// uint32 version = 4;
table_definition->set_version(FLAGS_def_version);
// uint64 ttl = 5;
table_definition->set_ttl(0);
// PartitionRule table_partition = 6;
// Engine engine = 7;
table_definition->set_engine(::dingodb::pb::common::Engine::ENG_ROCKSDB);
// map<string, string> properties = 8;
auto* prop = table_definition->mutable_properties();
(*prop)["test property"] = "test_property_value";

// add partition_rule
// repeated string columns = 1;
// PartitionStrategy strategy = 2;
auto* partition_rule = table_definition->mutable_table_partition();
auto* part_column = partition_rule->add_columns();
part_column->assign("test_part_column");
for (int i = 0; i < part_count; i++) {
auto* part = partition_rule->add_partitions();
part->mutable_id()->set_entity_id(part_ids[i]);
part->mutable_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_PART);
part->mutable_id()->set_parent_entity_id(FLAGS_table_id);
part->mutable_range()->set_start_key(client::Helper::EncodeRegionRange(part_ids[i]));
part->mutable_range()->set_end_key(client::Helper::EncodeRegionRange(part_ids[i] + 1));
}

auto status = coordinator_interaction->SendRequest("UpdateTables", request, response);
DINGO_LOG(INFO) << "SendRequest status=" << status;
DINGO_LOG_INFO << response.DebugString();
}

void SendAddIndexOnTable(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction) {
bool with_increment = false;

dingodb::pb::meta::AddIndexOnTableRequest request;
dingodb::pb::meta::AddIndexOnTableResponse response;

if (FLAGS_table_id == 0) {
DINGO_LOG(WARNING) << "table_id is empty";
return;
}

if (FLAGS_index_id == 0) {
DINGO_LOG(WARNING) << "index_id is empty";
return;
}

if (FLAGS_def_version == 0) {
DINGO_LOG(WARNING) << "version is empty";
return;
}

if (FLAGS_name.empty()) {
DINGO_LOG(WARNING) << "name is empty";
return;
}

if (FLAGS_part_count == 0) {
FLAGS_part_count = 1;
}
uint32_t part_count = FLAGS_part_count;

std::vector<int64_t> part_ids;
for (int i = 0; i < part_count; i++) {
int64_t new_part_id = 0;
int ret = GetCreateTableId(coordinator_interaction, new_part_id);
if (ret != 0) {
DINGO_LOG(WARNING) << "GetCreateTableId failed";
return;
}
part_ids.push_back(new_part_id);
}

request.mutable_table_id()->set_entity_id(FLAGS_table_id);
request.mutable_table_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_TABLE);

auto* definition_with_id = request.mutable_table_definition_with_id();
auto* index_id = definition_with_id->mutable_table_id();
index_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX);
index_id->set_entity_id(FLAGS_index_id);

// string name = 1;
auto* table_definition = definition_with_id->mutable_table_definition();
table_definition->set_name(FLAGS_name);

if (FLAGS_replica > 0) {
table_definition->set_replica(FLAGS_replica);
}

// repeated ColumnDefinition columns = 2;
for (int i = 0; i < 3; i++) {
auto* column = table_definition->add_columns();
std::string column_name("test_columen_");
column_name.append(std::to_string(i));
column->set_name(column_name);
column->set_sql_type("BIGINT");
column->set_element_type("BIGINT");
column->set_precision(100);
column->set_nullable(false);
column->set_indexofkey(7);
column->set_has_default_val(false);
column->set_default_val("0");

if (with_increment && i == 0) {
column->set_is_auto_increment(true);
}
}
if (with_increment) {
table_definition->set_auto_increment(100);
}

// map<string, Index> indexes = 3;
// uint32 version = 4;
table_definition->set_version(FLAGS_def_version);
// uint64 ttl = 5;
table_definition->set_ttl(0);
// PartitionRule table_partition = 6;
// Engine engine = 7;
table_definition->set_engine(::dingodb::pb::common::Engine::ENG_ROCKSDB);
// map<string, string> properties = 8;
auto* prop = table_definition->mutable_properties();
(*prop)["test property"] = "test_property_value";

// add index_parameter
auto* index_parameter = table_definition->mutable_index_parameter();
index_parameter->set_index_type(::dingodb::pb::common::IndexType::INDEX_TYPE_SCALAR);
index_parameter->mutable_scalar_index_parameter()->set_scalar_index_type(
::dingodb::pb::common::ScalarIndexType::SCALAR_INDEX_TYPE_LSM);

// add partition_rule
// repeated string columns = 1;
// PartitionStrategy strategy = 2;
auto* partition_rule = table_definition->mutable_table_partition();
auto* part_column = partition_rule->add_columns();
part_column->assign("test_part_column");
for (int i = 0; i < part_count; i++) {
auto* part = partition_rule->add_partitions();
part->mutable_id()->set_entity_id(part_ids[i]);
part->mutable_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_PART);
part->mutable_id()->set_parent_entity_id(FLAGS_table_id);
part->mutable_range()->set_start_key(client::Helper::EncodeRegionRange(part_ids[i]));
part->mutable_range()->set_end_key(client::Helper::EncodeRegionRange(part_ids[i] + 1));
}

auto status = coordinator_interaction->SendRequest("AddIndexOnTable", request, response);
DINGO_LOG(INFO) << "SendRequest status=" << status;
DINGO_LOG_INFO << response.DebugString();
}

void SendDropIndexOnTable(std::shared_ptr<dingodb::CoordinatorInteraction> coordinator_interaction) {
bool with_increment = false;

dingodb::pb::meta::DropIndexOnTableRequest request;
dingodb::pb::meta::DropIndexOnTableResponse response;

if (FLAGS_table_id == 0) {
DINGO_LOG(WARNING) << "table_id is empty";
return;
}

if (FLAGS_index_id == 0) {
DINGO_LOG(WARNING) << "index_id is empty";
return;
}

request.mutable_table_id()->set_entity_id(FLAGS_table_id);
request.mutable_table_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_TABLE);
request.mutable_index_id()->set_entity_id(FLAGS_index_id);
request.mutable_index_id()->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX);

auto status = coordinator_interaction->SendRequest("DropIndexOnTable", request, response);
DINGO_LOG(INFO) << "SendRequest status=" << status;
DINGO_LOG_INFO << response.DebugString();
}
7 changes: 7 additions & 0 deletions src/client/dingodb_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ DEFINE_string(db_path, "", "rocksdb path");
DEFINE_bool(show_vector, false, "show vector data");
DEFINE_string(metrics_type, "L2", "metrics type");
DEFINE_uint64(safe_point, 0, "gc safe point");
DEFINE_int64(def_version, 0, "version");

bvar::LatencyRecorder g_latency_recorder("dingo-store");

Expand Down Expand Up @@ -752,6 +753,12 @@ int CoordinatorSender() {
SendGenerateTableIds(coordinator_interaction_meta);
} else if (FLAGS_method == "CreateTables") {
SendCreateTables(coordinator_interaction_meta);
} else if (FLAGS_method == "UpdateTables") {
SendUpdateTables(coordinator_interaction_meta);
} else if (FLAGS_method == "AddIndexOnTable") {
SendAddIndexOnTable(coordinator_interaction_meta);
} else if (FLAGS_method == "DropIndexOnTable") {
SendDropIndexOnTable(coordinator_interaction_meta);
} else if (FLAGS_method == "GetTables") {
SendGetTables(coordinator_interaction_meta);
} else if (FLAGS_method == "DropTables") {
Expand Down
10 changes: 10 additions & 0 deletions src/coordinator/coordinator_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ class CoordinatorControl : public MetaControl {
butil::Status CreateTable(int64_t schema_id, const pb::meta::TableDefinition &table_definition, int64_t &new_table_id,
std::vector<int64_t> &region_ids, pb::coordinator_internal::MetaIncrement &meta_increment);

butil::Status UpdateTableDefinition(int64_t table_id, bool is_index,
const pb::meta::TableDefinition &table_definition,
pb::coordinator_internal::MetaIncrement &meta_increment);

butil::Status AddIndexOnTable(int64_t table_id, int64_t index_id, const pb::meta::TableDefinition &table_definition,
pb::coordinator_internal::MetaIncrement &meta_increment);

butil::Status DropIndexOnTable(int64_t table_id, int64_t index_id,
pb::coordinator_internal::MetaIncrement &meta_increment);

// create index_id
// in: schema_id
// out: new index_id
Expand Down
13 changes: 7 additions & 6 deletions src/coordinator/coordinator_control_fsm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1880,12 +1880,13 @@ void CoordinatorControl::ApplyMetaIncrement(pb::coordinator_internal::MetaIncrem
if (table.table().has_definition()) {
*(table_internal.mutable_definition()) = table.table().definition();
}
if (table.table().partitions_size() > 0) {
table_internal.clear_partitions();
for (const auto& it : table.table().partitions()) {
*(table_internal.add_partitions()) = it;
}
}
// we do not support change partitions now
// if (table.table().partitions_size() > 0) {
// table_internal.clear_partitions();
// for (const auto& it : table.table().partitions()) {
// *(table_internal.add_partitions()) = it;
// }
// }
ret = table_map_.Put(table.id(), table_internal);
if (ret > 0) {
DINGO_LOG(INFO) << "ApplyMetaIncrement table UPDATE, [id=" << table.id() << "] success";
Expand Down
Loading

0 comments on commit 948f2c2

Please sign in to comment.