Skip to content

Commit

Permalink
Dynamic KV cache allocation (#1364)
Browse files Browse the repository at this point in the history
Dynamic KV cache allocation
Ticket: CVS-158409

---------

Co-authored-by: Ilya Lavrenov <ilya.lavrenov@intel.com>
  • Loading branch information
popovaan and ilya-lavrenov authored Dec 24, 2024
1 parent 0da48cd commit 021d880
Show file tree
Hide file tree
Showing 17 changed files with 480 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ int main(int argc, char* argv[]) try {

std::string device = "CPU";

ov::genai::SchedulerConfig scheduler_config;
scheduler_config.cache_size = 5;

ov::genai::LLMPipeline pipe(
model_path,
device,
ov::genai::prompt_lookup(true),
ov::genai::scheduler_config(scheduler_config));
ov::genai::prompt_lookup(true));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ int main(int argc, char* argv[]) try {
// Please, set device for main model in `LLMPipeline` constructor and in in `ov::genai::draft_model` for draft.
std::string main_device = "CPU", draft_device = "CPU";

ov::genai::SchedulerConfig scheduler_config;
scheduler_config.cache_size = 5;

ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::draft_model(draft_model_path, draft_device),
ov::genai::scheduler_config(scheduler_config));
ov::genai::draft_model(draft_model_path, draft_device));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ def main():
args = parser.parse_args()

device = 'CPU'
scheduler_config = openvino_genai.SchedulerConfig()
# cache params
scheduler_config.cache_size = 2

pipe = openvino_genai.LLMPipeline(args.model_dir, device, scheduler_config=scheduler_config, prompt_lookup=True)
pipe = openvino_genai.LLMPipeline(args.model_dir, device, prompt_lookup=True)

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ def main():
main_device = 'CPU' # GPU can be used as well
draft_device = 'CPU'

scheduler_config = openvino_genai.SchedulerConfig()
# cache params
scheduler_config.cache_size = 2

draft_model = openvino_genai.draft_model(args.draft_model_dir, draft_device)

pipe = openvino_genai.LLMPipeline(args.model_dir, main_device, scheduler_config=scheduler_config, draft_model=draft_model)
pipe = openvino_genai.LLMPipeline(args.model_dir, main_device, draft_model=draft_model)

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
Expand Down
51 changes: 47 additions & 4 deletions src/cpp/src/block_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,42 @@ class BlockAllocator {
* Blocks returned will be vectors with this size, each vector entry to be associated with a separate layer's KV cache.
*/
BlockAllocator(size_t num_blocks, bool enable_prefix_caching, size_t num_layers = 1) :
m_free_blocks_num(num_layers, num_blocks), m_total_num_blocks(num_blocks), m_num_layers(num_layers), m_enable_prefix_caching(enable_prefix_caching), m_overwriteable_blocks(num_layers) {
m_total_num_blocks(num_blocks), m_num_layers(num_layers), m_enable_prefix_caching(enable_prefix_caching), m_overwriteable_blocks(num_layers) {
OPENVINO_ASSERT(num_layers != 0, "num_layers must be non-zero");
m_free_blocks.resize(m_num_layers);
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = 0; block_id < m_total_num_blocks; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
if (num_blocks > 0) {
m_free_blocks_num = std::vector<size_t>(num_layers, num_blocks);
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = 0; block_id < m_total_num_blocks; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
}
}
}
else {
m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
}
}

~BlockAllocator() {
// sanity check to validate that all blocks are freed
// OPENVINO_ASSERT(m_total_num_blocks == m_free_blocks.size());
}

void increase_kv_blocks_number(size_t new_kv_blocks_count) {
OPENVINO_ASSERT(new_kv_blocks_count > m_total_num_blocks, "New blocks number should be more than previous blocks number.");
size_t added_blocks = new_kv_blocks_count - m_total_num_blocks;
for (auto idx = 0; idx < m_free_blocks_num.size(); idx++) {
m_free_blocks_num[idx] += added_blocks;
}
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = m_total_num_blocks; block_id < new_kv_blocks_count; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
}
}
m_total_num_blocks = new_kv_blocks_count;
}


/**
* Returns the number of free blocks for a given layer.
* @param layer_idx Index of the layer.
Expand Down Expand Up @@ -459,6 +480,13 @@ class BlockAllocator {
for (size_t layer_idx = 0; layer_idx < m_num_layers; layer_idx++) sum += num_free_blocks(layer_idx);
return static_cast<float>(m_num_layers * m_total_num_blocks - sum) / (m_num_layers * m_total_num_blocks) * 100;
}

/**
* @return The total number of KV blocks .
*/
size_t get_total_number_of_kv_blocks() const {
return m_total_num_blocks;
}
};

/**
Expand Down Expand Up @@ -713,6 +741,21 @@ class BlockManager {
return m_allocator.get_used_percentage();
}

/**
* Increases the number of KV blocks.
* @param num_blocks The new number of KV-blocks.
*/
void increase_kv_blocks_number(size_t num_blocks) {
m_allocator.increase_kv_blocks_number(num_blocks);
}

/**
* @return The total number of KV blocks .
*/
size_t get_total_number_of_kv_blocks() const {
return m_allocator.get_total_number_of_kv_blocks();
}

/**
* @brief Forks a sequence, establishing a new sequence from an existing one, reusing
* currently allocated blocks of the existing sequence.
Expand Down
124 changes: 106 additions & 18 deletions src/cpp/src/cache_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,118 @@ class CacheManager {
DeviceConfig m_device_config;
std::vector<ov::Tensor> m_key_cache;
std::vector<ov::Tensor> m_value_cache;
size_t m_num_allocated_kv_blocks = 0;
ov::Core m_core;
ov::InferRequest m_request;

ov::Shape set_first_dim_and_make_static(const ov::PartialShape& shape, size_t dim) {
ov::PartialShape res_shape = shape;
res_shape[0] = dim;
OPENVINO_ASSERT(res_shape.is_static());
return res_shape.to_shape();
}

void update_request_tensor(size_t decoder_layer_id) {
m_request.set_tensor(std::string("key_cache.") + std::to_string(decoder_layer_id), m_key_cache[decoder_layer_id]);
m_request.set_tensor(std::string("value_cache.") + std::to_string(decoder_layer_id), m_value_cache[decoder_layer_id]);
}

public:
explicit CacheManager(const DeviceConfig &device_config, ov::Core core) :
explicit CacheManager(const DeviceConfig &device_config, ov::InferRequest request, ov::Core core) :
m_device_config(device_config),
m_request(request),
m_core(core) {
m_key_cache.reserve(m_device_config.get_num_layers());
m_value_cache.reserve(m_device_config.get_num_layers());
}

void allocate_cache_if_needed(size_t num_kv_blocks) {
if (m_num_allocated_kv_blocks >= num_kv_blocks) {
return;
}
OPENVINO_ASSERT(m_key_cache.size() == m_value_cache.size());
m_num_allocated_kv_blocks = num_kv_blocks;
ov::Shape value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
ov::Shape key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);

const std::string device_name = m_device_config.get_device();

ov::Coordinate start_key{0,0,0,0};
ov::Coordinate start_value{0,0,0,0};

const std::string device_name = device_config.get_device();
if (device_name.find("GPU") == std::string::npos) {// Allocate KV caches
for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
ov::Tensor key_cache(device_config.get_cache_precision(), device_config.get_key_cache_shape());
ov::Tensor value_cache(device_config.get_cache_precision(), device_config.get_value_cache_shape());
ov::Tensor key_cache(m_device_config.get_cache_precision(), key_cache_shape);
ov::Tensor value_cache(m_device_config.get_cache_precision(), value_cache_shape);

auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data());
auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data());
size_t key_roi_size_byte = 0;
size_t value_roi_size_byte = 0;

if (m_key_cache.size() > decoder_layer_id) {
ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

key_roi_size_byte = m_key_cache[decoder_layer_id].get_byte_size();
value_roi_size_byte = m_value_cache[decoder_layer_id].get_byte_size();
key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + key_roi_size_byte;
value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + value_roi_size_byte;

// copy current cache data
ov::Tensor dst_key_roi(key_cache, start_key, end_key);
ov::Tensor dst_value_roi(value_cache, start_value, end_value);

m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
m_value_cache[decoder_layer_id].copy_to(dst_value_roi);

}

// force allocation
std::memset(key_cache.data(), 0, key_cache.get_byte_size());
std::memset(value_cache.data(), 0, value_cache.get_byte_size());
// Some optimizations like AVX2, AVX512, AMX require a minimal shape and
// perform multiplying by zero on the excess data. Uninitialized tensor data contain NAN's,
// so NAN * 0 returns non-zero invalid data.
// So we need to set zeros to all newly allocated tensors data.
std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - key_roi_size_byte);
std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - value_roi_size_byte);

// set new cache tensors
if (m_key_cache.size() > decoder_layer_id) {
m_key_cache[decoder_layer_id] = key_cache;
m_value_cache[decoder_layer_id] = value_cache;
}
else {
m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
}

m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
update_request_tensor(decoder_layer_id);
}
} else {
auto remote_context = m_core.get_default_context(device_name);
for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
ov::Tensor key_cache = remote_context.create_tensor(device_config.get_cache_precision(),
device_config.get_key_cache_shape());
ov::Tensor value_cache = remote_context.create_tensor(device_config.get_cache_precision(),
device_config.get_value_cache_shape());

m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
ov::Tensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
key_cache_shape);
ov::Tensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
value_cache_shape);

if (m_key_cache.size() > decoder_layer_id) {
ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

// copy current cache data
ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);

m_key_cache[decoder_layer_id] = key_cache;
m_value_cache[decoder_layer_id] = value_cache;
}
else {
m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
}
update_request_tensor(decoder_layer_id);
}
}
}
Expand All @@ -62,8 +142,8 @@ class CacheManager {
}

void copy_blocks(const std::map<size_t, std::list<size_t>>& block_copy_map) {
ov::Shape key_shape = m_device_config.get_key_cache_shape();
ov::Shape value_shape = m_device_config.get_value_cache_shape();
ov::Shape key_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), m_num_allocated_kv_blocks);
ov::Shape value_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), m_num_allocated_kv_blocks);

ov::Coordinate key_src_start_roi(key_shape.size(), 0);
ov::Coordinate key_src_end_roi = key_shape;
Expand Down Expand Up @@ -98,5 +178,13 @@ class CacheManager {
}
}
}

std::shared_ptr<Core> get_core() {
return std::make_shared<Core>(m_core);
}

std::shared_ptr<DeviceConfig> get_device_config() {
return std::make_shared<DeviceConfig>(m_device_config);
}
};
}
10 changes: 2 additions & 8 deletions src/cpp/src/continuous_batching_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
ov::InferRequest infer_request = compiled_model.create_infer_request();

// setup KV caches
m_cache_manager = std::make_shared<CacheManager>(device_config, core);
for (size_t decoder_layer_id = 0; decoder_layer_id < device_config.get_num_layers(); ++decoder_layer_id) {
infer_request.set_tensor(std::string("key_cache.") + std::to_string(decoder_layer_id), m_cache_manager->get_key_cache(decoder_layer_id));
infer_request.set_tensor(std::string("value_cache.") + std::to_string(decoder_layer_id), m_cache_manager->get_value_cache(decoder_layer_id));
}
m_cache_manager = std::make_shared<CacheManager>(device_config, infer_request, core);

SchedulerConfig updated_config = scheduler_config;
// update KV blocks number in scheduler config
Expand All @@ -71,8 +67,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
// as it may lead to performance slowdown
can_use_partial_preemption = false;
}

m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), updated_config, device_config.get_num_layers(), can_use_partial_preemption);
m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), m_cache_manager, updated_config, device_config.get_num_layers(), can_use_partial_preemption);
// and finally create model runner
bool is_use_cache_eviction = m_scheduler->get_config().use_cache_eviction;
m_model_runner = std::make_shared<ModelRunner>(infer_request, m_scheduler->get_block_size(), device_config.get_num_layers(), is_use_cache_eviction);
Expand Down Expand Up @@ -133,7 +128,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
_pull_awaiting_requests();

m_pipeline_metrics.requests = m_requests.size();

Scheduler::Output scheduler_output;
{
static ManualTimer timer("scheduling");
Expand Down
Loading

0 comments on commit 021d880

Please sign in to comment.