Skip to content

Commit

Permalink
[feat][store] Implement multi column family for vector index.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <d.ketor@gmail.com>
  • Loading branch information
ketor authored and rock-git committed Oct 19, 2023
1 parent f201ed2 commit 75ea226
Show file tree
Hide file tree
Showing 21 changed files with 534 additions and 348 deletions.
2 changes: 2 additions & 0 deletions conf/index.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ store:
- data
- lock
- write
- vector_scalar
- vector_table
2 changes: 2 additions & 0 deletions conf/store.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ store:
- data
- lock
- write
- vector_scalar
- vector_table
scan:
scan_interval_s: 30
timeout_s: 60
Expand Down
1 change: 1 addition & 0 deletions proto/store_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message SstFileInfo {
string path = 3;
bytes start_key = 4;
bytes end_key = 5;
string cf_name = 6;
}

message RaftMeta {
Expand Down
38 changes: 20 additions & 18 deletions src/client/store_tool_dump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void PrintValues(const dingodb::pb::meta::TableDefinition& table_definition,
std::cout << " | ";
}
}
std::cout << std::endl;
std::cout << '\n';
}

std::shared_ptr<std::vector<std::shared_ptr<dingodb::BaseSchema>>> GenSerialSchema(
Expand Down Expand Up @@ -414,7 +414,7 @@ void DumpVectorIndexDb(std::shared_ptr<Context> ctx) {
data.ParseFromString(value);
std::cout << fmt::format("[vector data] vector_id({}) value: dimension({}) {}",
dingodb::VectorCodec::DecodeVectorId(key), data.dimension(), FormatVector(data, 10))
<< std::endl;
<< '\n';
}
};

Expand All @@ -424,7 +424,7 @@ void DumpVectorIndexDb(std::shared_ptr<Context> ctx) {
data.ParseFromString(value);
std::cout << fmt::format("[scalar data] vector_id({}) value: {}", dingodb::VectorCodec::DecodeVectorId(key),
data.ShortDebugString())
<< std::endl;
<< '\n';
}
};

Expand All @@ -436,7 +436,7 @@ void DumpVectorIndexDb(std::shared_ptr<Context> ctx) {
dingodb::VectorCodec::DecodeVectorId(key),
dingodb::Helper::StringToHex(data.table_key()),
dingodb::Helper::StringToHex(data.table_value()))
<< std::endl;
<< '\n';
}
};

Expand All @@ -452,26 +452,28 @@ void DumpVectorIndexDb(std::shared_ptr<Context> ctx) {

{
std::string begin_key, end_key;
dingodb::VectorCodec::EncodeVectorData(partition_id, 0, begin_key);
dingodb::VectorCodec::EncodeVectorData(partition_id, INT64_MAX, end_key);
// dingodb::VectorCodec::EncodeVectorData(partition_id, 0, begin_key);
// dingodb::VectorCodec::EncodeVectorData(partition_id, INT64_MAX, end_key);
dingodb::VectorCodec::EncodeVectorKey(partition_id, 0, begin_key);
dingodb::VectorCodec::EncodeVectorKey(partition_id, INT64_MAX, end_key);
db->Scan(begin_key, end_key, ctx->offset, ctx->limit, vector_data_handler);
}

{
std::string begin_key, end_key;
dingodb::VectorCodec::EncodeVectorScalar(partition_id, 0, begin_key);
dingodb::VectorCodec::EncodeVectorScalar(partition_id, INT64_MAX, end_key);
// {
// std::string begin_key, end_key;
// dingodb::VectorCodec::EncodeVectorScalar(partition_id, 0, begin_key);
// dingodb::VectorCodec::EncodeVectorScalar(partition_id, INT64_MAX, end_key);

db->Scan(begin_key, end_key, ctx->offset, ctx->limit, scalar_data_handler);
}
// db->Scan(begin_key, end_key, ctx->offset, ctx->limit, scalar_data_handler);
// }

{
std::string begin_key, end_key;
dingodb::VectorCodec::EncodeVectorTable(partition_id, 0, begin_key);
dingodb::VectorCodec::EncodeVectorTable(partition_id, INT64_MAX, end_key);
// {
// std::string begin_key, end_key;
// dingodb::VectorCodec::EncodeVectorTable(partition_id, 0, begin_key);
// dingodb::VectorCodec::EncodeVectorTable(partition_id, INT64_MAX, end_key);

db->Scan(begin_key, end_key, ctx->offset, ctx->limit, table_data_handler);
}
// db->Scan(begin_key, end_key, ctx->offset, ctx->limit, table_data_handler);
// }
}
}

Expand Down
30 changes: 18 additions & 12 deletions src/common/constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,27 @@ class Constant {
static const int kBvarMaxDumpMultiDimensionMetricNumberDefault = 100;

// Define Store data column family.
// table and vector_index's vector use "default"
inline static const std::string kStoreDataCF = "default";
// Define Store meta column family.
inline static const std::string kStoreMetaCF = "meta";
// Txn column families.
inline static const std::string kStoreTxnDataCF = "data";
inline static const std::string kStoreTxnLockCF = "lock";
inline static const std::string kStoreTxnWriteCF = "lock";
// transaction cf names
inline static const std::string kTxnDataCF = "data";
inline static const std::string kTxnLockCF = "lock";
inline static const std::string kTxnWriteCF = "write";

// vector cf names
inline static const std::string kVectorScalarCF = "vector_scalar";
inline static const std::string kVectorTableCF = "vector_table";

// cf seq id
static constexpr uint32_t kStoreDataCfId = 0;
static constexpr uint32_t kVectorScalarCfId = 1;
static constexpr uint32_t kVectorTableCfId = 2;
static constexpr uint32_t kTxnDataCfId = 3;
static constexpr uint32_t kTxnLockCfId = 4;
static constexpr uint32_t kTxnWriteCfId = 5;

// Define store meta prefix.
inline static const std::string kStoreRegionMetaPrefix = "META_REGION";
// Define store raft prefix.
Expand Down Expand Up @@ -146,14 +160,6 @@ class Constant {

static const int32_t kRaftLogFallBehindThreshold = 1000;

// transaction cf names
inline static const std::string kTxnDataCF = "data";
inline static const std::string kTxnLockCF = "lock";
inline static const std::string kTxnWriteCF = "write";
static constexpr uint32_t kTxnDataCfId = 0;
static constexpr uint32_t kTxnLockCfId = 1;
static constexpr uint32_t kTxnWriteCfId = 2;

static constexpr uint64_t kLockVer = INT64_MAX;
static constexpr uint64_t kMaxVer = INT64_MAX;

Expand Down
10 changes: 7 additions & 3 deletions src/common/helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@

namespace dingodb {

const std::map<std::string, uint32_t> kTxnCf2Id = {{Constant::kTxnDataCF, Constant::kTxnDataCfId},
{Constant::kTxnLockCF, Constant::kTxnLockCfId},
{Constant::kTxnWriteCF, Constant::kTxnWriteCfId}};
const std::map<std::string, uint32_t> kTxnCf2Id = {
{Constant::kStoreDataCF, Constant::kStoreDataCfId}, {Constant::kVectorScalarCF, Constant::kVectorScalarCfId},
{Constant::kVectorTableCF, Constant::kVectorTableCfId}, {Constant::kTxnDataCF, Constant::kTxnDataCfId},
{Constant::kTxnLockCF, Constant::kTxnLockCfId}, {Constant::kTxnWriteCF, Constant::kTxnWriteCfId}};

using Errno = pb::error::Errno;
using PbError = pb::error::Error;
Expand Down Expand Up @@ -742,6 +743,9 @@ std::vector<std::string> Helper::GenMvccCfVector() {
std::vector<std::string> cfs;

// the order of the cf is important, and can not be changed
cfs.push_back(Constant::kStoreDataCF);
cfs.push_back(Constant::kVectorScalarCF);
cfs.push_back(Constant::kVectorTableCF);
cfs.push_back(Constant::kTxnDataCF);
cfs.push_back(Constant::kTxnLockCF);
cfs.push_back(Constant::kTxnWriteCF);
Expand Down
21 changes: 12 additions & 9 deletions src/engine/raft_store_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,52 +390,55 @@ std::shared_ptr<Engine::Reader> RaftStoreEngine::NewReader(const std::string& cf

butil::Status RaftStoreEngine::VectorReader::VectorBatchSearch(
std::shared_ptr<VectorReader::Context> ctx, std::vector<pb::index::VectorWithDistanceResult>& results) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorBatchSearch(ctx, results);
}

butil::Status RaftStoreEngine::VectorReader::VectorBatchQuery(std::shared_ptr<VectorReader::Context> ctx,
std::vector<pb::common::VectorWithId>& vector_with_ids) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorBatchQuery(ctx, vector_with_ids);
}

butil::Status RaftStoreEngine::VectorReader::VectorGetBorderId(const pb::common::Range& region_range, bool get_min,
int64_t& vector_id) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorGetBorderId(region_range, get_min, vector_id);
}

butil::Status RaftStoreEngine::VectorReader::VectorScanQuery(std::shared_ptr<VectorReader::Context> ctx,
std::vector<pb::common::VectorWithId>& vector_with_ids) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorScanQuery(ctx, vector_with_ids);
}

butil::Status RaftStoreEngine::VectorReader::VectorGetRegionMetrics(int64_t region_id,
const pb::common::Range& region_range,
VectorIndexWrapperPtr vector_index,
pb::common::VectorIndexMetrics& region_metrics) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorGetRegionMetrics(region_id, region_range, vector_index, region_metrics);
}

butil::Status RaftStoreEngine::VectorReader::VectorCount(const pb::common::Range& range, int64_t& count) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorCount(range, count);
}

butil::Status RaftStoreEngine::VectorReader::VectorBatchSearchDebug(
std::shared_ptr<VectorReader::Context> ctx, // NOLINT
std::vector<pb::index::VectorWithDistanceResult>& results, int64_t& deserialization_id_time_us,
int64_t& scan_scalar_time_us, int64_t& search_time_us) {
auto vector_reader = dingodb::VectorReader::New(reader_);
auto vector_reader = dingodb::VectorReader::New(vector_data_reader_, vector_scalar_reader_, vector_table_reader_);
return vector_reader->VectorBatchSearchDebug(ctx, results, deserialization_id_time_us, scan_scalar_time_us,
search_time_us);
}

std::shared_ptr<Engine::VectorReader> RaftStoreEngine::NewVectorReader(const std::string& cf_name) {
return std::make_shared<RaftStoreEngine::VectorReader>(engine_->NewReader(cf_name));
std::shared_ptr<Engine::VectorReader> RaftStoreEngine::NewVectorReader(const std::string& /*cf_name*/) {
// return std::make_shared<RaftStoreEngine::VectorReader>(engine_->NewReader(cf_name));
return std::make_shared<RaftStoreEngine::VectorReader>(engine_->NewReader(Constant::kStoreDataCF),
engine_->NewReader(Constant::kVectorScalarCF),
engine_->NewReader(Constant::kVectorTableCF));
}

std::shared_ptr<Engine::TxnReader> RaftStoreEngine::NewTxnReader() {
Expand Down
13 changes: 11 additions & 2 deletions src/engine/raft_store_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ class RaftStoreEngine : public Engine, public RaftControlAble {
// Vector reader
class VectorReader : public Engine::VectorReader {
public:
VectorReader(std::shared_ptr<RawEngine::Reader> reader) : reader_(reader) {}
// VectorReader(std::shared_ptr<RawEngine::Reader> reader) : reader_(reader) {}
VectorReader(std::shared_ptr<RawEngine::Reader> vector_data_reader,
std::shared_ptr<RawEngine::Reader> vector_scalar_reader,
std::shared_ptr<RawEngine::Reader> vector_table_reader)
: vector_data_reader_(vector_data_reader),
vector_scalar_reader_(vector_scalar_reader),
vector_table_reader_(vector_table_reader) {}

butil::Status VectorBatchSearch(std::shared_ptr<VectorReader::Context> ctx, // NOLINT
std::vector<pb::index::VectorWithDistanceResult>& results) override; // NOLINT
Expand All @@ -145,7 +151,10 @@ class RaftStoreEngine : public Engine, public RaftControlAble {
int64_t& search_time_us) override; // NOLINT

private:
std::shared_ptr<RawEngine::Reader> reader_;
// std::shared_ptr<RawEngine::Reader> reader_;
std::shared_ptr<RawEngine::Reader> vector_data_reader_;
std::shared_ptr<RawEngine::Reader> vector_scalar_reader_;
std::shared_ptr<RawEngine::Reader> vector_table_reader_;
};

class TxnReader : public Engine::TxnReader {
Expand Down
Loading

0 comments on commit 75ea226

Please sign in to comment.