Skip to content

Commit

Permalink
[fix][store] Fixup kown issues
Browse files Browse the repository at this point in the history
  • Loading branch information
visualYJD authored and ketor committed Jul 15, 2024
1 parent d266e3e commit d8e0e16
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 60 deletions.
15 changes: 7 additions & 8 deletions src/engine/mono_store_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,21 @@
namespace dingodb {

MonoStoreEngine::MonoStoreEngine(RawEnginePtr rocks_raw_engine, RawEnginePtr bdb_raw_engine,
EventListenerCollectionPtr listeners, mvcc::TsProviderPtr ts_provider)
EventListenerCollectionPtr listeners, mvcc::TsProviderPtr ts_provider,
std::shared_ptr<StoreMetaManager> store_meta_manager,
std::shared_ptr<StoreMetricsManager> store_metrics_manager)
: rocks_raw_engine_(rocks_raw_engine),
bdb_raw_engine_(bdb_raw_engine),
listeners_(listeners),
ts_provider_(ts_provider) {}
ts_provider_(ts_provider),
store_meta_manager_(store_meta_manager),
store_metrics_manager_(store_metrics_manager) {}

bool MonoStoreEngine::Init([[maybe_unused]] std::shared_ptr<Config> config) { return true; }
std::string MonoStoreEngine::GetName() {
return pb::common::StorageEngine_Name(pb::common::StorageEngine::STORE_ENG_MONO_STORE);
}
void MonoStoreEngine::SetStoreMetaManager(std::shared_ptr<StoreMetaManager> store_meta_manager) {
store_meta_manager_ = store_meta_manager;
}
void MonoStoreEngine::SetStoreMetricsManager(std::shared_ptr<StoreMetricsManager> store_metrics_manager) {
store_metrics_manager_ = store_metrics_manager;
}

std::shared_ptr<StoreMetaManager> MonoStoreEngine::GetStoreMetaManager() { return store_meta_manager_; }
std::shared_ptr<StoreMetricsManager> MonoStoreEngine::GetStoreMetricsManager() { return store_metrics_manager_; }
MonoStoreEnginePtr MonoStoreEngine::GetSelfPtr() {
Expand Down
12 changes: 7 additions & 5 deletions src/engine/mono_store_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ using MonoStoreEnginePtr = std::shared_ptr<MonoStoreEngine>;
class MonoStoreEngine : public Engine {
public:
MonoStoreEngine(RawEnginePtr rocks_raw_engine, RawEnginePtr bdb_raw_engine, EventListenerCollectionPtr listeners,
mvcc::TsProviderPtr ts_provider);
mvcc::TsProviderPtr ts_provider, std::shared_ptr<StoreMetaManager> store_meta_manager,
std::shared_ptr<StoreMetricsManager> store_metrics_manager);
~MonoStoreEngine() override = default;

MonoStoreEngine(const MonoStoreEngine& rhs) = delete;
Expand All @@ -45,8 +46,10 @@ class MonoStoreEngine : public Engine {
MonoStoreEnginePtr GetSelfPtr();

static MonoStoreEnginePtr New(RawEnginePtr rocks_raw_engine, RawEnginePtr bdb_raw_engine,
EventListenerCollectionPtr listeners, mvcc::TsProviderPtr ts_provider) {
return std::make_shared<MonoStoreEngine>(rocks_raw_engine, bdb_raw_engine, listeners, ts_provider);
EventListenerCollectionPtr listeners, mvcc::TsProviderPtr ts_provider,
std::shared_ptr<StoreMetaManager> store_meta_manager,
std::shared_ptr<StoreMetricsManager> store_metrics_manager) {
return std::make_shared<MonoStoreEngine>(rocks_raw_engine, bdb_raw_engine, listeners, ts_provider, store_meta_manager, store_metrics_manager);
}

bool Init(std::shared_ptr<Config> config) override;
Expand All @@ -65,8 +68,7 @@ class MonoStoreEngine : public Engine {
WriteCbFunc write_cb) override;
int DispatchEvent(dingodb::EventType, std::shared_ptr<dingodb::Event> event);

void SetStoreMetaManager(std::shared_ptr<StoreMetaManager> store_meta_manager);
void SetStoreMetricsManager(std::shared_ptr<StoreMetricsManager> store_metrics_manager);

std::shared_ptr<StoreMetaManager> GetStoreMetaManager();
std::shared_ptr<StoreMetricsManager> GetStoreMetricsManager();

Expand Down
75 changes: 49 additions & 26 deletions src/server/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1308,13 +1308,8 @@ int main(int argc, char *argv[]) {
DINGO_LOG(ERROR) << "InitLogStorageManager failed!";
return -1;
}
if (!dingo_server.InitEngine()) {
DINGO_LOG(ERROR) << "InitEngine failed!";
return -1;
}

if (!dingo_server.InitStorage()) {
DINGO_LOG(ERROR) << "InitStorage failed!";
if (!dingo_server.InitRocksRawEngine()) {
DINGO_LOG(ERROR) << "InitMetaEngine failed!";
return -1;
}
if (!dingo_server.InitStoreMetaManager()) {
Expand All @@ -1325,6 +1320,16 @@ int main(int argc, char *argv[]) {
DINGO_LOG(ERROR) << "InitStoreMetricsManager failed!";
return -1;
}
if (!dingo_server.InitRocksRawEngine()) {
DINGO_LOG(ERROR) << "InitRocksRawEngine failed!";
return -1;
}

if (!dingo_server.InitStorage()) {
DINGO_LOG(ERROR) << "InitStorage failed!";
return -1;
}

if (!dingo_server.InitStoreController()) {
DINGO_LOG(ERROR) << "InitStoreController failed!";
return -1;
Expand Down Expand Up @@ -1376,8 +1381,6 @@ int main(int argc, char *argv[]) {
return -1;
}
DINGO_LOG(INFO) << "Raft server is running on " << raft_server.listen_address();
dingo_server.GetMonoStoreEngine()->SetStoreMetaManager(dingo_server.GetStoreMetaManager());
dingo_server.GetMonoStoreEngine()->SetStoreMetricsManager(dingo_server.GetStoreMetricsManager());
} else if (role == dingodb::pb::common::ClusterRole::INDEX) {
// setup bthread worker thread num into bthread::FLAGS_bthread_concurrency
InitBthreadWorkerThreadNum(config);
Expand Down Expand Up @@ -1451,42 +1454,55 @@ int main(int argc, char *argv[]) {
DINGO_LOG(ERROR) << "InitLogStorageManager failed!";
return -1;
}
if (!dingo_server.InitEngine()) {
DINGO_LOG(ERROR) << "InitEngine failed!";
return -1;
}

if (!dingo_server.InitStorage()) {
DINGO_LOG(ERROR) << "InitStorage failed!";
if (!dingo_server.InitRocksRawEngine()) {
DINGO_LOG(ERROR) << "InitRocksRawEngine failed!";
return -1;
}

// region will do recover in InitStoreMetaManager, and if leader is elected, then it need vector index manager
// workers to load index, so InitVectorIndexManager must be called before InitStoreMetaManager
if (!dingo_server.InitVectorIndexManager()) {
DINGO_LOG(ERROR) << "InitVectorIndexManager failed!";
return -1;
}

index_service.SetVectorIndexManager(dingo_server.GetVectorIndexManager());
if (!dingo_server.InitStoreMetaManager()) {
DINGO_LOG(ERROR) << "InitStoreMetaManager failed!";
return -1;
}

if (!dingo_server.InitStoreMetricsManager()) {
DINGO_LOG(ERROR) << "InitStoreMetricsManager failed!";
return -1;
}

if (!dingo_server.InitEngine()) {
DINGO_LOG(ERROR) << "InitEngine failed!";
return -1;
}

if (!dingo_server.InitStorage()) {
DINGO_LOG(ERROR) << "InitStorage failed!";
return -1;
}

if (!dingo_server.InitStoreController()) {
DINGO_LOG(ERROR) << "InitStoreController failed!";
return -1;
}

if (!dingo_server.InitRegionCommandManager()) {
DINGO_LOG(ERROR) << "InitRegionCommandManager failed!";
return -1;
}

if (!dingo_server.InitRegionController()) {
DINGO_LOG(ERROR) << "InitRegionController failed!";
return -1;
}

if (!dingo_server.InitPreSplitChecker()) {
DINGO_LOG(ERROR) << "InitPreSplitChecker failed!";
return -1;
Expand Down Expand Up @@ -1532,8 +1548,6 @@ int main(int argc, char *argv[]) {
return -1;
}
DINGO_LOG(INFO) << "Raft server is running on " << raft_server.listen_address();
dingo_server.GetMonoStoreEngine()->SetStoreMetaManager(dingo_server.GetStoreMetaManager());
dingo_server.GetMonoStoreEngine()->SetStoreMetricsManager(dingo_server.GetStoreMetricsManager());
} else if (role == dingodb::pb::common::ClusterRole::DOCUMENT) {
// setup bthread worker thread num into bthread::FLAGS_bthread_concurrency
InitBthreadWorkerThreadNum(config);
Expand Down Expand Up @@ -1608,13 +1622,9 @@ int main(int argc, char *argv[]) {
DINGO_LOG(ERROR) << "InitLogStorageManager failed!";
return -1;
}
if (!dingo_server.InitEngine()) {
DINGO_LOG(ERROR) << "InitEngine failed!";
return -1;
}

if (!dingo_server.InitStorage()) {
DINGO_LOG(ERROR) << "InitStorage failed!";
if (!dingo_server.InitRocksRawEngine()) {
DINGO_LOG(ERROR) << "InitRocksRawEngine failed!";
return -1;
}

Expand All @@ -1624,27 +1634,43 @@ int main(int argc, char *argv[]) {
DINGO_LOG(ERROR) << "InitDocumentIndexManager failed!";
return -1;
}

document_service.SetDocumentIndexManager(dingo_server.GetDocumentIndexManager());
if (!dingo_server.InitStoreMetaManager()) {
DINGO_LOG(ERROR) << "InitStoreMetaManager failed!";
return -1;
}

if (!dingo_server.InitStoreMetricsManager()) {
DINGO_LOG(ERROR) << "InitStoreMetricsManager failed!";
return -1;
}

if (!dingo_server.InitEngine()) {
DINGO_LOG(ERROR) << "InitEngine failed!";
return -1;
}

if (!dingo_server.InitStorage()) {
DINGO_LOG(ERROR) << "InitStorage failed!";
return -1;
}

if (!dingo_server.InitStoreController()) {
DINGO_LOG(ERROR) << "InitStoreController failed!";
return -1;
}

if (!dingo_server.InitRegionCommandManager()) {
DINGO_LOG(ERROR) << "InitRegionCommandManager failed!";
return -1;
}

if (!dingo_server.InitRegionController()) {
DINGO_LOG(ERROR) << "InitRegionController failed!";
return -1;
}

if (!dingo_server.InitPreSplitChecker()) {
DINGO_LOG(ERROR) << "InitPreSplitChecker failed!";
return -1;
Expand Down Expand Up @@ -1690,8 +1716,6 @@ int main(int argc, char *argv[]) {
return -1;
}
DINGO_LOG(INFO) << "Raft server is running on " << raft_server.listen_address();
dingo_server.GetMonoStoreEngine()->SetStoreMetaManager(dingo_server.GetStoreMetaManager());
dingo_server.GetMonoStoreEngine()->SetStoreMetricsManager(dingo_server.GetStoreMetricsManager());

} else {
DINGO_LOG(ERROR) << "Invalid server role[" + dingodb::GetRoleName() + "]";
Expand All @@ -1702,7 +1726,6 @@ int main(int argc, char *argv[]) {
DINGO_LOG(ERROR) << "InitHeartbeat failed!";
return -1;
}

if (!dingo_server.Recover()) {
DINGO_LOG(ERROR) << "Recover failed!";
return -1;
Expand Down
33 changes: 19 additions & 14 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,31 @@ bool Server::InitDirectory() {
return true;
}

bool Server::InitEngine() {
bool Server::InitRocksRawEngine() {
auto config = ConfigManager::GetInstance().GetRoleConfig();

#ifdef ENABLE_XDPROCKS
// init xdprocks
auto rocks_raw_engine = std::make_shared<XDPRocksRawEngine>();
if (!rocks_raw_engine->Init(config, Helper::GetColumnFamilyNamesByRole())) {
rocks_raw_engine_ = std::make_shared<XDPRocksRawEngine>();
if (!rocks_raw_engine_->Init(config, Helper::GetColumnFamilyNamesByRole())) {
DINGO_LOG(ERROR) << "Init XDPRocksRawEngine Failed with Config[" << config->ToString();
return false;
}
#else
// init rocksdb
auto rocks_raw_engine = std::make_shared<RocksRawEngine>();
if (!rocks_raw_engine->Init(config, Helper::GetColumnFamilyNamesByRole())) {
rocks_raw_engine_ = std::make_shared<RocksRawEngine>();
if (!rocks_raw_engine_->Init(config, Helper::GetColumnFamilyNamesByRole())) {
DINGO_LOG(ERROR) << "Init RocksRawEngine Failed with Config[" << config->ToString();
return false;
}
#endif

meta_reader_ = std::make_shared<MetaReader>(rocks_raw_engine);
meta_writer_ = std::make_shared<MetaWriter>(rocks_raw_engine);
meta_reader_ = std::make_shared<MetaReader>(rocks_raw_engine_);
meta_writer_ = std::make_shared<MetaWriter>(rocks_raw_engine_);
return true;
}
bool Server::InitEngine() {
auto config = ConfigManager::GetInstance().GetRoleConfig();

// init bdb
auto bdb_raw_engine = std::make_shared<BdbRawEngine>();
Expand All @@ -283,8 +287,8 @@ bool Server::InitEngine() {
if (GetRole() == pb::common::ClusterRole::COORDINATOR) {
// 1.init CoordinatorController
coordinator_control_ =
std::make_shared<CoordinatorControl>(std::make_shared<MetaReader>(rocks_raw_engine),
std::make_shared<MetaWriter>(rocks_raw_engine), rocks_raw_engine);
std::make_shared<CoordinatorControl>(std::make_shared<MetaReader>(rocks_raw_engine_),
std::make_shared<MetaWriter>(rocks_raw_engine_), rocks_raw_engine_);

if (!coordinator_control_->Recover()) {
DINGO_LOG(ERROR) << "coordinator_control_->Recover Failed";
Expand All @@ -296,13 +300,13 @@ bool Server::InitEngine() {
}

// init raft_meta_engine
raft_engine_ = RaftStoreEngine::New(rocks_raw_engine, bdb_raw_engine, nullptr);
raft_engine_ = RaftStoreEngine::New(rocks_raw_engine_, bdb_raw_engine, nullptr);
// set raft_meta_engine to coordinator_control
coordinator_control_->SetKvEngine(raft_engine_);

// 2.init KvController
kv_control_ = std::make_shared<KvControl>(std::make_shared<MetaReader>(rocks_raw_engine),
std::make_shared<MetaWriter>(rocks_raw_engine), rocks_raw_engine);
kv_control_ = std::make_shared<KvControl>(std::make_shared<MetaReader>(rocks_raw_engine_),
std::make_shared<MetaWriter>(rocks_raw_engine_), rocks_raw_engine_);

if (!kv_control_->Recover()) {
DINGO_LOG(ERROR) << "kv_control_->Recover Failed";
Expand Down Expand Up @@ -346,14 +350,15 @@ bool Server::InitEngine() {

} else {
auto listener_factory = std::make_shared<StoreSmEventListenerFactory>();
mono_engine_ = MonoStoreEngine::New(rocks_raw_engine, bdb_raw_engine, listener_factory->Build(), GetTsProvider());
mono_engine_ = MonoStoreEngine::New(rocks_raw_engine_, bdb_raw_engine, listener_factory->Build(), GetTsProvider(),
store_meta_manager_, store_metrics_manager_);
if (!mono_engine_->Init(config)) {
DINGO_LOG(ERROR) << "Init RocksEngine failed with Config[" << config->ToString() << "]";
return false;
}
DINGO_LOG(INFO) << "Init rocks_engine";

raft_engine_ = RaftStoreEngine::New(rocks_raw_engine, bdb_raw_engine, GetTsProvider());
raft_engine_ = RaftStoreEngine::New(rocks_raw_engine_, bdb_raw_engine, GetTsProvider());
DINGO_LOG(INFO) << "Init raft_store_engine";
if (!raft_engine_->Init(config)) {
DINGO_LOG(ERROR) << "Init RaftStoreEngine failed with Config[" << config->ToString() << "]";
Expand Down
7 changes: 6 additions & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
#include "coordinator/tso_control.h"
#include "crontab/crontab.h"
#include "document/document_index_manager.h"
#include "engine/mono_store_engine.h"
#include "engine/raw_engine.h"
#include "engine/storage.h"
#include "engine/mono_store_engine.h"
#include "engine/rocks_raw_engine.h"
#include "log/log_storage_manager.h"
#include "meta/meta_reader.h"
#include "meta/store_meta_manager.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ class Server {
// Init directory
bool InitDirectory();

// Init RocksRawEngine
bool InitRocksRawEngine();

// Init storage engines;
bool InitEngine();

Expand Down Expand Up @@ -278,6 +282,7 @@ class Server {

std::shared_ptr<Engine> mono_engine_;

std::shared_ptr<RocksRawEngine> rocks_raw_engine_;
// Meta reader
std::shared_ptr<MetaReader> meta_reader_;
// Meta writer
Expand Down
10 changes: 4 additions & 6 deletions test/unit_test/txn/test_txn_pessimistic_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ class TxnPessimisticLockTest : public testing::Test {

ASSERT_TRUE(ts_provider->Init());

mono_engine = std::make_shared<MonoStoreEngine>(engine, raw_bdb_engine, listener_factory->Build(), ts_provider);
ASSERT_TRUE(mono_engine != nullptr);
ASSERT_TRUE(mono_engine->Init(config));

auto meta_reader = std::make_shared<MetaReader>(engine);
auto meta_writer = std::make_shared<MetaWriter>(engine);
auto store_meta_manager = std::make_shared<StoreMetaManager>(meta_reader, meta_writer);
Expand All @@ -150,8 +146,10 @@ class TxnPessimisticLockTest : public testing::Test {
auto store_metrics_manager = std::make_shared<StoreMetricsManager>(meta_reader, meta_writer);
ASSERT_TRUE(store_metrics_manager->Init());

mono_engine->SetStoreMetaManager(store_meta_manager);
mono_engine->SetStoreMetricsManager(store_metrics_manager);
mono_engine = std::make_shared<MonoStoreEngine>(engine, raw_bdb_engine, listener_factory->Build(), ts_provider,
store_meta_manager, store_metrics_manager);
ASSERT_TRUE(mono_engine != nullptr);
ASSERT_TRUE(mono_engine->Init(config));

InitRecord();
}
Expand Down

0 comments on commit d8e0e16

Please sign in to comment.