From de62c8f934cf0ede2c8213ec252379be90ad9caa Mon Sep 17 00:00:00 2001 From: sbalandi Date: Thu, 16 Jan 2025 17:59:35 +0000 Subject: [PATCH] Update --- samples/cpp/text_generation/chat_sample.cpp | 5 +- samples/python/text_generation/chat_sample.py | 2 +- .../openvino/genai/generation_handle.hpp | 17 +-- .../include/openvino/genai/llm_pipeline.hpp | 2 +- .../include/openvino/genai/streamer_base.hpp | 20 ++- src/cpp/src/continuous_batching_adapter.hpp | 4 +- src/cpp/src/continuous_batching_impl.cpp | 27 +--- src/cpp/src/generation_handle.cpp | 28 +++-- src/cpp/src/generation_stream.hpp | 9 +- src/cpp/src/icontinuous_batching.cpp | 4 +- src/cpp/src/llm_pipeline.cpp | 8 +- src/cpp/src/llm_pipeline_stateful.cpp | 15 +-- src/cpp/src/llm_pipeline_stateful.hpp | 2 +- src/cpp/src/llm_pipeline_static.cpp | 20 +-- src/cpp/src/lm_encoding.cpp | 8 +- .../src/prompt_lookup/prompt_lookup_impl.cpp | 22 +--- src/cpp/src/sequence_group.hpp | 12 +- ...batching_for_speculative_decoding_impl.cpp | 2 +- .../speculative_decoding_impl.cpp | 20 +-- src/cpp/src/text_callback_streamer.cpp | 18 ++- src/cpp/src/text_callback_streamer.hpp | 2 - src/cpp/src/utils.cpp | 31 ++++- src/cpp/src/utils.hpp | 6 +- src/cpp/src/visual_language/pipeline.cpp | 25 +--- src/python/openvino_genai/__init__.py | 3 +- src/python/openvino_genai/__init__.pyi | 3 +- .../openvino_genai/py_openvino_genai.pyi | 34 ++--- .../py_continuous_batching_pipeline.cpp | 10 +- src/python/py_utils.cpp | 30 +++-- src/python/py_utils.hpp | 2 +- tests/python_tests/test_llm_pipeline.py | 119 ++++++++++++++++-- .../accuracy/continuous_batching_accuracy.cpp | 2 +- ...ntinuous_batching_speculative_decoding.cpp | 2 +- 33 files changed, 300 insertions(+), 214 deletions(-) diff --git a/samples/cpp/text_generation/chat_sample.cpp b/samples/cpp/text_generation/chat_sample.cpp index b3cd24c1b3..f1f733a951 100644 --- a/samples/cpp/text_generation/chat_sample.cpp +++ b/samples/cpp/text_generation/chat_sample.cpp @@ -16,12 +16,11 @@ int main(int argc, char* argv[]) try { ov::genai::GenerationConfig config; config.max_new_tokens = 100; - std::function streamer = [](std::string word) { + auto streamer = [](std::string word) { std::cout << word << std::flush; // Return flag corresponds whether generation should be stopped. // false means continue generation. - - return ov::genai::StreamerRunningStatus::RUNNING; + return ov::genai::GenerationStatus::RUNNING; }; pipe.start_chat(); diff --git a/samples/python/text_generation/chat_sample.py b/samples/python/text_generation/chat_sample.py index 88a26f4e5c..2656cda44b 100755 --- a/samples/python/text_generation/chat_sample.py +++ b/samples/python/text_generation/chat_sample.py @@ -10,7 +10,7 @@ def streamer(subword): print(subword, end='', flush=True) # Return flag corresponds whether generation should be stopped. # False means continue generation. 
- return False + return openvino_genai.GenerationStatus.RUNNING def main(): parser = argparse.ArgumentParser() diff --git a/src/cpp/include/openvino/genai/generation_handle.hpp b/src/cpp/include/openvino/genai/generation_handle.hpp index 01af89eed1..4a59f44471 100644 --- a/src/cpp/include/openvino/genai/generation_handle.hpp +++ b/src/cpp/include/openvino/genai/generation_handle.hpp @@ -7,7 +7,6 @@ #include #include "openvino/genai/generation_config.hpp" -#include "openvino/genai/streamer_base.hpp" #include "openvino/genai/visibility.hpp" #include "openvino/genai/perf_metrics.hpp" @@ -16,8 +15,8 @@ enum class GenerationStatus { RUNNING = 0, // Default status for ongoing generation FINISHED = 1, // Status set when generation has been finished IGNORED = 2, // Status set when generation run into out-of-memory condition and could not be continued - DROPPED_BY_PIPELINE = 3, // Currently not used, TODO: implement abort functionality - DROPPED_BY_HANDLE = 4 // Status set when generation handle is dropped + CANCEL = 3, // Status set when generation handle is canceled + STOP = 4 // Status set when generation handle is stopped }; struct EncodedGenerationResult { @@ -35,10 +34,6 @@ struct EncodedGenerationResult { // PerfMetrics but with empty tokenization/detokenization durations. PerfMetrics perf_metrics; - - - // Status of streaming - StreamerRunningStatus m_streaming_status = ov::genai::StreamerRunningStatus::UNDEF; }; enum class GenerationFinishReason { @@ -79,7 +74,9 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl { std::shared_ptr m_generation_stream; ov::genai::GenerationConfig m_sampling_params; - bool is_dropped(); + bool is_stopped(); + + bool is_canceled(); public: GenerationHandleImpl(std::shared_ptr generation_stream, const ov::genai::GenerationConfig& sampling_params) : @@ -98,6 +95,10 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl { void drop(); + void stop(); + + void cancel(); + GenerationOutputs back(); // Reads result of a generation for single iteration GenerationOutputs read(); diff --git a/src/cpp/include/openvino/genai/llm_pipeline.hpp b/src/cpp/include/openvino/genai/llm_pipeline.hpp index 4e723fc5ab..a2d09779ad 100644 --- a/src/cpp/include/openvino/genai/llm_pipeline.hpp +++ b/src/cpp/include/openvino/genai/llm_pipeline.hpp @@ -19,7 +19,7 @@ namespace ov { namespace genai { // Return flag corresponds whether generation should be stopped: false means continue generation, true means stop. 
-using StreamerVariant = std::variant, std::function, std::shared_ptr, std::monostate>; +using StreamerVariant = std::variant, std::function, std::function, std::shared_ptr, std::monostate>; using OptionalGenerationConfig = std::optional; using EncodedInputs = std::variant; using StringInputs = std::variant>; diff --git a/src/cpp/include/openvino/genai/streamer_base.hpp b/src/cpp/include/openvino/genai/streamer_base.hpp index 6868b5b171..3c53ed911e 100644 --- a/src/cpp/include/openvino/genai/streamer_base.hpp +++ b/src/cpp/include/openvino/genai/streamer_base.hpp @@ -4,19 +4,16 @@ #pragma once #include "openvino/genai/tokenizer.hpp" +#include "openvino/genai/generation_handle.hpp" #include namespace ov { namespace genai { - -enum class StreamerRunningStatus { - UNDEF = 0, // Streaming is not run - RUNNING = 1, // Continue to run of inference - STOP = 2, // Stop generation, keep history as is, KV cache includes last request and generated tokens - CANCEL = 3 // Stop generate, drop last prompt and all generated tokens from history, KV cache include history but last step +// uint16_t for Python API here +struct UintCallbackStreamerResult { + uint16_t result; }; - -using CallbackTypeVariant = std::variant; +using CallbackTypeVariant = std::variant; /** * @brief base class for streamers. In order to use inherit from from this class and implement put, and methods @@ -24,9 +21,8 @@ using CallbackTypeVariant = std::variant; * @param m_tokenizer tokenizer */ class OPENVINO_GENAI_EXPORTS StreamerBase { -protected: - StreamerRunningStatus streaming_finish_status = StreamerRunningStatus::UNDEF; public: + ov::genai::GenerationStatus m_streaming_finish_status = ov::genai::GenerationStatus::RUNNING; /// @brief put is called every time new token is decoded, /// @return bool flag to indicate whether generation should be stopped, if return true generation stops virtual bool put(int64_t token) = 0; @@ -34,8 +30,8 @@ class OPENVINO_GENAI_EXPORTS StreamerBase { /// @brief end is called at the end of generation. 
It can be used to flush cache if your own streamer has one virtual void end() = 0; - virtual StreamerRunningStatus get_finish_streaming_reason() { - return streaming_finish_status; + ov::genai::GenerationStatus get_finish_reason() { + return m_streaming_finish_status; } virtual ~StreamerBase(); diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp index 00928b342d..bb12542ca8 100644 --- a/src/cpp/src/continuous_batching_adapter.hpp +++ b/src/cpp/src/continuous_batching_adapter.hpp @@ -97,7 +97,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { std::vector plain_replies; std::vector plain_scores; for (GenerationResult& res : generated) { - OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::DROPPED_BY_HANDLE, "Got unfinished GenerationStatus"); + OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::STOP || res.m_status == GenerationStatus::CANCEL, "Got unfinished GenerationStatus"); std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies)); std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); } @@ -189,7 +189,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { std::vector> plain_tokens; std::vector plain_scores; for (EncodedGenerationResult& res : generated) { - OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::DROPPED_BY_HANDLE, "Got unfinished GenerationStatus"); + OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::STOP, "Got unfinished GenerationStatus"); std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens)); std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); } diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 838cf290e8..414999b5b4 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -276,20 +276,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector& streamer_ptr = std::visit(overloaded{ - [](std::monostate) -> std::shared_ptr { - return nullptr; - }, - [](const std::shared_ptr& streamer) { - return streamer; - }, - [this](const std::function& streamer) -> std::shared_ptr { - return std::make_unique(m_tokenizer, streamer); - }, - [this](const std::function& streamer) -> std::shared_ptr { - return std::make_unique(m_tokenizer, streamer); - } - }, streamer); + const std::shared_ptr& streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && sampling_params[0].num_return_sequences == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()), @@ -320,13 +307,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorcan_read()) { std::unordered_map token = generation->back(); for (const auto& gen_token : token.begin()->second.generated_ids) { continue_generation = !streamer_ptr->put(gen_token); if (!continue_generation) { - generation->drop(); + streamer_ptr->get_finish_reason() == GenerationStatus::CANCEL ? 
generation->cancel() : generation->stop(); break; } } @@ -356,9 +343,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorget_finish_streaming_reason(); + result.m_status = request->get_generation_stream()->get_status(); for (size_t i = 0; i < num_outputs; ++i) { const auto & sequence = sequences[i]; @@ -391,7 +376,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::_free_non_running_reque std::vector::iterator requests_iterator = m_requests.begin(); while (requests_iterator != m_requests.end()) { const auto& request = *requests_iterator; - if(request->has_finished() || request->out_of_memory() || request->handle_dropped()) { + if(request->has_finished() || request->out_of_memory() || request->handle_stopped() || request->handle_canceled()) { for (const auto& sequence: request->get_sequences()) { if (m_scheduler->has_block_table(sequence->get_id())) { m_scheduler->free_sequence(sequence->get_id()); @@ -409,7 +394,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::_notify_requests_droppe // Notify the last time by pushing empty output // This causes read() to unblock by adding anything to the queue for (SequenceGroup::Ptr& request : m_requests) { - if (request->handle_dropped()) + if (request->handle_stopped() || request->handle_canceled()) request->push_empty_outputs(); } } diff --git a/src/cpp/src/generation_handle.cpp b/src/cpp/src/generation_handle.cpp index 5d92c560e9..9d2c656b7b 100644 --- a/src/cpp/src/generation_handle.cpp +++ b/src/cpp/src/generation_handle.cpp @@ -9,7 +9,7 @@ using namespace ov::genai; GenerationHandleImpl::~GenerationHandleImpl() { - drop(); + stop(); } GenerationStatus GenerationHandleImpl::get_status() { @@ -17,24 +17,36 @@ GenerationStatus GenerationHandleImpl::get_status() { } bool GenerationHandleImpl::can_read() { - return !is_dropped() && m_generation_stream->can_read(); + return !is_canceled() && !is_stopped() && m_generation_stream->can_read(); } -bool GenerationHandleImpl::is_dropped() { - return get_status() == GenerationStatus::DROPPED_BY_HANDLE; +bool GenerationHandleImpl::is_stopped() { + return get_status() == GenerationStatus::STOP; +} + +bool GenerationHandleImpl::is_canceled() { + return get_status() == GenerationStatus::CANCEL; } void GenerationHandleImpl::drop() { - m_generation_stream->drop(); + m_generation_stream->stop(); +} + +void GenerationHandleImpl::stop() { + m_generation_stream->stop(); +} + +void GenerationHandleImpl::cancel() { + m_generation_stream->cancel(); } std::unordered_map GenerationHandleImpl::back() { - OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); + OPENVINO_ASSERT(!is_stopped(), "GenerationHandle cannot be used after it is stopped."); return m_generation_stream->back(); } std::unordered_map GenerationHandleImpl::read() { - OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); + OPENVINO_ASSERT(!is_stopped(), "GenerationHandle cannot be used after it is stopped."); return m_generation_stream->read(); } @@ -57,7 +69,7 @@ void add_partial_result(std::unordered_map& partial_ } std::vector GenerationHandleImpl::read_all() { - OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped."); + OPENVINO_ASSERT(!is_stopped(), "GenerationHandle cannot be used after it is stopped."); std::vector results; std::unordered_map partial_results; // We iterate until generation is running or there are tokens we haven't read yet diff --git a/src/cpp/src/generation_stream.hpp 
b/src/cpp/src/generation_stream.hpp index d76d0cf7f4..50ad2d8175 100644 --- a/src/cpp/src/generation_stream.hpp +++ b/src/cpp/src/generation_stream.hpp @@ -51,9 +51,14 @@ class GenerationStream { return m_status; } - void drop() { + void stop() { std::lock_guard lock(m_mutex); - m_status = GenerationStatus::DROPPED_BY_HANDLE; + m_status = GenerationStatus::STOP; + } + + void cancel() { + std::lock_guard lock(m_mutex); + m_status = GenerationStatus::CANCEL; } }; } diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp index c2bcfcb717..6f14b73cd8 100644 --- a/src/cpp/src/icontinuous_batching.cpp +++ b/src/cpp/src/icontinuous_batching.cpp @@ -77,7 +77,7 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( const auto decode_start = std::chrono::steady_clock::now(); generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); raw_counters.detokenization_durations.emplace_back(std::chrono::steady_clock::now() - decode_start); - if (m_is_chat_conversation && 0 == idx && res.m_streaming_status != ov::genai::StreamerRunningStatus::CANCEL) { + if (m_is_chat_conversation && 0 == idx && res.m_status != ov::genai::GenerationStatus::CANCEL) { m_history.push_back({{"role", "assistant"}, {"content", generated.back()}}); } } @@ -99,7 +99,7 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( } // if streaming was canceled, prompt/answer of current step shouldn't be presented in history, so let's remove prompt from history - if (m_is_chat_conversation && !encoded.empty() && encoded[0].m_streaming_status == ov::genai::StreamerRunningStatus::CANCEL) + if (m_is_chat_conversation && !encoded.empty() && encoded[0].m_status == ov::genai::GenerationStatus::CANCEL) m_history.pop_back(); return decoded; diff --git a/src/cpp/src/llm_pipeline.cpp b/src/cpp/src/llm_pipeline.cpp index 01a49f10c5..5710602ec1 100644 --- a/src/cpp/src/llm_pipeline.cpp +++ b/src/cpp/src/llm_pipeline.cpp @@ -47,9 +47,11 @@ std::pair split_model_descr( std::pair streamer(StreamerVariant func) { if (auto streamer_obj = std::get_if>(&func)) { return {utils::STREAMER_ARG_NAME, Any::make>(*streamer_obj)}; - } else if (auto streamer_obj = std::get_if>(&func)) { - return {utils::STREAMER_ARG_NAME, Any::make>(*streamer_obj)}; - } else { + } else if (auto streamer_obj = std::get_if>(&func)) { + return {utils::STREAMER_ARG_NAME, Any::make>(*streamer_obj)}; + } else if (auto streamer_obj = std::get_if>(&func)) { + return {utils::STREAMER_ARG_NAME, Any::make>(*streamer_obj)}; + } else { auto callback = std::get>(func); return {utils::STREAMER_ARG_NAME, Any::make>(callback)}; } diff --git a/src/cpp/src/llm_pipeline_stateful.cpp b/src/cpp/src/llm_pipeline_stateful.cpp index 2a50fde592..9c6e17298d 100644 --- a/src/cpp/src/llm_pipeline_stateful.cpp +++ b/src/cpp/src/llm_pipeline_stateful.cpp @@ -175,7 +175,7 @@ DecodedResults StatefulLLMPipeline::generate( auto decode_stop_time = std::chrono::steady_clock::now(); if (is_chat_conversation) { - if (m_chat_generation_finish_status == ov::genai::StreamerRunningStatus::CANCEL) { + if (m_chat_generation_finish_status == ov::genai::GenerationStatus::CANCEL) { // If chat generation process was canceled by user, let's rollback to previous state of history m_history.pop_back(); m_kv_history_manager.num_tokens_to_remove_from_kv_cache += m_tokenized_chat_history.size() - prev_tokenized_chat_history.size(); @@ -250,16 +250,7 @@ EncodedResults StatefulLLMPipeline::generate( // Stateful pipeline does not provide logprobs for prompt tokens 
OPENVINO_ASSERT(config.echo == false, "Echo is not supported in the stateful pipeline"); - std::shared_ptr streamer_ptr; - if (auto streamer_obj = std::get_if(&streamer)) { - streamer_ptr = nullptr; - } else if (auto streamer_obj = std::get_if>(&streamer)) { - streamer_ptr = *streamer_obj; - } else if (auto callback = std::get_if>(&streamer)) { - streamer_ptr = std::make_shared(m_tokenizer, *callback); - } else if (auto callback = std::get_if>(&streamer)) { - streamer_ptr = std::make_shared(m_tokenizer, *callback); - } + std::shared_ptr streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); auto batch_size = input_ids.get_shape().at(0); OPENVINO_ASSERT(streamer_ptr == nullptr || batch_size == 1 && config.num_return_sequences == 1 && @@ -352,7 +343,7 @@ EncodedResults StatefulLLMPipeline::generate( m_kv_history_manager.num_tokens_to_remove_from_kv_cache = m_model_runner.get_tensor("attention_mask").get_shape()[1] - prev_attn_mask_size; } - if (m_chat_generation_finish_status == ov::genai::StreamerRunningStatus::CANCEL) { + if (m_chat_generation_finish_status == ov::genai::GenerationStatus::CANCEL) { m_kv_history_manager.num_tokens_to_remove_from_kv_cache = m_model_runner.get_tensor("attention_mask").get_shape()[1] - prev_attn_mask_size; if (m_chat_input_type == ov::genai::utils::GenerationChatInputsType::ENCODED_INPUTS) { diff --git a/src/cpp/src/llm_pipeline_stateful.hpp b/src/cpp/src/llm_pipeline_stateful.hpp index 173b6de06a..05e56aae70 100644 --- a/src/cpp/src/llm_pipeline_stateful.hpp +++ b/src/cpp/src/llm_pipeline_stateful.hpp @@ -26,7 +26,7 @@ class StatefulLLMPipeline final : public LLMPipelineImplBase { // so, let's keep info about amount of tokens to trim from kv cache and amount of tokens to keep in history ov::genai::utils::HistoryRemoveManager m_kv_history_manager = {0, 0, 2}; // Finish reason of last generation for chat scenario - ov::genai::StreamerRunningStatus m_chat_generation_finish_status = ov::genai::StreamerRunningStatus::UNDEF; + ov::genai::GenerationStatus m_chat_generation_finish_status = ov::genai::GenerationStatus::RUNNING; void reset_kv_state(); public: diff --git a/src/cpp/src/llm_pipeline_static.cpp b/src/cpp/src/llm_pipeline_static.cpp index b9a55477cb..27bad713a5 100644 --- a/src/cpp/src/llm_pipeline_static.cpp +++ b/src/cpp/src/llm_pipeline_static.cpp @@ -660,7 +660,7 @@ void stream_generated_tokens(std::shared_ptr streamer_p std::unordered_map token = handle->back(); for (const auto& gen_token : token.begin()->second.generated_ids) { if (streamer_ptr->put(gen_token)) { - handle->drop(); + streamer_ptr->get_finish_reason() == ov::genai::GenerationStatus::CANCEL ? 
handle->cancel() : handle->stop(); break; } } @@ -860,14 +860,7 @@ EncodedResults StatefulLLMPipeline::generate( config.set_eos_token_id(m_generation_config.eos_token_id); config.validate(); - std::shared_ptr streamer_ptr; - if (auto streamer_obj = std::get_if(&streamer)) { - streamer_ptr = nullptr; - } else if (auto streamer_obj = std::get_if>(&streamer)) { - streamer_ptr = *streamer_obj; - } else if (auto callback = std::get_if>(&streamer)) { - streamer_ptr = std::make_shared(m_tokenizer, *callback); - } + std::shared_ptr streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); OPENVINO_ASSERT(config.is_greedy_decoding() || config.is_multinomial(), "Currently only greedy and multinomial decoding are supported"); @@ -1330,14 +1323,7 @@ EncodedResults StatelessLLMPipeline::generate( config.set_eos_token_id(m_generation_config.eos_token_id); config.validate(); - std::shared_ptr streamer_ptr; - if (auto streamer_obj = std::get_if(&streamer)) { - streamer_ptr = nullptr; - } else if (auto streamer_obj = std::get_if>(&streamer)) { - streamer_ptr = *streamer_obj; - } else if (auto callback = std::get_if>(&streamer)) { - streamer_ptr = std::make_shared(m_tokenizer, *callback); - } + std::shared_ptr streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); if (!config.is_greedy_decoding() && !config.is_multinomial()) { OPENVINO_THROW("Currently only greedy and multinomial decoding are supported"); diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index 539cf578fe..f7446a6b76 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -74,7 +74,7 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results( std::unordered_map token = handle->back(); for (const auto& gen_token : token.begin()->second.generated_ids) { if (streamer_ptr->put(gen_token)) { - handle->drop(); + streamer_ptr->get_finish_reason() == GenerationStatus::CANCEL ? 
handle->cancel() : handle->stop(); break; } } @@ -84,7 +84,7 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results( auto free_non_running_requests = [&streamer_ptr, &generations, &active_sequence_groups]() { auto removed_it = std::remove_if(active_sequence_groups.begin(), active_sequence_groups.end(), [](SequenceGroup::Ptr sg) -> bool { - return sg->has_finished() || sg->out_of_memory() || sg->handle_dropped(); + return sg->has_finished() || sg->out_of_memory() || sg->handle_stopped() || sg->handle_canceled(); }); active_sequence_groups.erase(removed_it, active_sequence_groups.end()); }; @@ -224,13 +224,13 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results( stream_generated_tokens(); if (streamer_ptr) { // push streamer's cache streamer_ptr->end(); - finish_info.streaming_finish_status = streamer_ptr->get_finish_streaming_reason(); } for (auto& sequence_group : sequence_groups) { auto sampling_params = sequence_group->get_sampling_parameters(); const auto& sequences = sequence_group->get_finished_sequences(); size_t num_outputs = std::min(sequence_group->get_sampling_parameters().num_return_sequences, sequences.size()); + finish_info.streaming_finish_status = sequence_group->get_generation_stream()->get_status(); for (size_t seq_id = 0; seq_id < num_outputs; ++seq_id) { const auto & sequence = sequences[seq_id]; @@ -245,7 +245,7 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results( sampler.clear_request_info(sequence_group->get_request_id()); // last generated token is not saved in KV cache, we need to add it for some cases - if (sequence_groups[0]->get_finished_sequences()[0]->get_finish_reason() == GenerationFinishReason::LENGTH || sequence_groups[0]->handle_dropped()) + if (sequence_groups[0]->get_finished_sequences()[0]->get_finish_reason() == GenerationFinishReason::LENGTH || sequence_groups[0]->handle_stopped()) finish_info.probably_disappeared_token = finish_info.results.tokens[0].back(); return finish_info; diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp index a908ea6b54..ef828d82cd 100644 --- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp +++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp @@ -1,6 +1,7 @@ // Copyright (C) 2023-2025 Intel Corporation // SPDX-License-Identifier: Apache-2.0 +#include "utils.hpp" #include "prompt_lookup_impl.hpp" #include "text_callback_streamer.hpp" @@ -86,20 +87,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vectorset_adapters(sampling_params[0].adapters); - const std::shared_ptr& streamer_ptr = std::visit(overloaded{ - [](std::monostate) -> std::shared_ptr { - return nullptr; - }, - [](const std::shared_ptr& streamer) { - return streamer; - }, - [this](const std::function& streamer) -> std::shared_ptr { - return std::make_unique(m_tokenizer, streamer); - }, - [this](const std::function& streamer) -> std::shared_ptr { - return std::make_unique(m_tokenizer, streamer); - } - }, streamer); + const std::shared_ptr& streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()), "Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding"); @@ -132,7 +120,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vectorsecond.generated_ids) { continue_generation = !streamer_ptr->put(gen_token); if 
(!continue_generation) { - generation->drop(); + streamer_ptr->get_finish_reason() == GenerationStatus::CANCEL ? generation->cancel() : generation->stop(); break; } } @@ -156,9 +144,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vectorget_finish_streaming_reason(); + result.m_status = request->get_generation_stream()->get_status(); for (size_t i = 0; i < num_outputs; ++i) { const auto & sequence = sequences[i]; diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index c423675e64..3e6df6c7d4 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -296,7 +296,7 @@ class SequenceGroup : public std::enable_shared_from_this { size_t num_finished_seqs() const { return std::count_if(m_sequences.begin(), m_sequences.end(), [this] (Sequence::CPtr seq) { - return seq->has_finished() || seq->out_of_memory() || handle_dropped(); + return seq->has_finished() || seq->out_of_memory() || handle_stopped() || handle_canceled(); }); } @@ -339,7 +339,7 @@ class SequenceGroup : public std::enable_shared_from_this { std::vector get_finished_sequences() const { std::vector finished_seqs; for (size_t seq_id = 0; seq_id < m_sequences.size(); ++seq_id) { - if (m_sequences[seq_id]->has_finished() || m_sequences[seq_id]->out_of_memory() || handle_dropped()) { + if (m_sequences[seq_id]->has_finished() || m_sequences[seq_id]->out_of_memory() || handle_stopped() || handle_canceled()) { finished_seqs.push_back(m_sequences[seq_id]); } } @@ -585,8 +585,12 @@ class SequenceGroup : public std::enable_shared_from_this { m_generation_stream->set_generation_status(status); } - bool handle_dropped() const { - return m_generation_stream->get_status() == GenerationStatus::DROPPED_BY_HANDLE; + bool handle_stopped() const { + return m_generation_stream->get_status() == GenerationStatus::STOP; + } + + bool handle_canceled() const { + return m_generation_stream->get_status() == GenerationStatus::CANCEL; } void push_empty_outputs() { diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp index dccc633d4d..5e569d9453 100644 --- a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp @@ -28,7 +28,7 @@ ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::finish } } m_sampler->clear_request_info(request->get_request_id()); - request->set_generation_status(GenerationStatus::DROPPED_BY_HANDLE); + request->set_generation_status(GenerationStatus::STOP); } void ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::finish_request(int64_t request_id) { diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp index 505ff5ad3d..6ef688ff00 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp @@ -206,20 +206,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< m_main_pipeline->set_adapters(sampling_params[0].adapters); m_draft_pipeline->set_adapters(sampling_params[0].adapters); - const std::shared_ptr& streamer_ptr = std::visit(overloaded{ - [](std::monostate) -> std::shared_ptr { - return nullptr; - }, - [](const std::shared_ptr& streamer) { - return streamer; - }, - 
[this](const std::function& streamer) -> std::shared_ptr { - return std::make_unique(m_tokenizer, streamer); - }, - [this](const std::function& streamer) -> std::shared_ptr { - return std::make_unique(m_tokenizer, streamer); - } - }, streamer); + const std::shared_ptr& streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()), "Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding"); @@ -256,7 +243,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< for (const auto& gen_token : token.begin()->second.generated_ids) { continue_generation = !streamer_ptr->put(gen_token); if (!continue_generation) { - main_generation->drop(); + streamer_ptr->get_finish_reason() == GenerationStatus::CANCEL ? main_generation->cancel() : main_generation->stop(); break; } } @@ -286,8 +273,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< result.m_request_id = request_id; result.m_generation_ids.resize(num_outputs); result.m_scores.resize(num_outputs); - if (streamer_ptr) - result.m_streaming_status = streamer_ptr->get_finish_streaming_reason(); + result.m_status = request->get_generation_stream()->get_status(); for (size_t i = 0; i < num_outputs; ++i) { const auto & sequence = sequences[i]; diff --git a/src/cpp/src/text_callback_streamer.cpp b/src/cpp/src/text_callback_streamer.cpp index 5412627407..949562eaf8 100644 --- a/src/cpp/src/text_callback_streamer.cpp +++ b/src/cpp/src/text_callback_streamer.cpp @@ -52,12 +52,22 @@ bool TextCallbackStreamer::put(int64_t token) { bool TextCallbackStreamer::is_generation_complete(CallbackTypeVariant callback_status) { bool is_complete = false; - if (auto status = std::get_if(&callback_status)) { - streaming_finish_status = *status; - is_complete = (streaming_finish_status == StreamerRunningStatus::STOP || streaming_finish_status == StreamerRunningStatus::CANCEL); + if (auto status = std::get_if(&callback_status)) { + m_streaming_finish_status = *status; + is_complete = (m_streaming_finish_status == GenerationStatus::STOP || m_streaming_finish_status == GenerationStatus::CANCEL); + } else if (auto status = std::get_if(&callback_status)) { + auto result = status->result; + is_complete = result > 0; + if (result == (uint16_t)GenerationStatus::RUNNING) { + m_streaming_finish_status = GenerationStatus::RUNNING; + } else if (result == (uint16_t)GenerationStatus::CANCEL) { + m_streaming_finish_status = GenerationStatus::CANCEL; + } else { + m_streaming_finish_status = GenerationStatus::STOP; + } } else if (auto status = std::get_if(&callback_status)) { is_complete = *status; - streaming_finish_status = *status ? StreamerRunningStatus::STOP : StreamerRunningStatus::RUNNING; + m_streaming_finish_status = *status ? 
GenerationStatus::STOP : GenerationStatus::RUNNING; } return is_complete; diff --git a/src/cpp/src/text_callback_streamer.hpp b/src/cpp/src/text_callback_streamer.hpp index 546bf7fb43..797d09342e 100644 --- a/src/cpp/src/text_callback_streamer.hpp +++ b/src/cpp/src/text_callback_streamer.hpp @@ -11,8 +11,6 @@ namespace genai { class TextCallbackStreamer: public StreamerBase { public: - StreamerRunningStatus streaming_status = StreamerRunningStatus::UNDEF; - bool put(int64_t token) override; void end() override; diff --git a/src/cpp/src/utils.cpp b/src/cpp/src/utils.cpp index 9e47152974..d9b1f056ed 100644 --- a/src/cpp/src/utils.cpp +++ b/src/cpp/src/utils.cpp @@ -3,6 +3,7 @@ #include "utils.hpp" +#include #include #include @@ -15,6 +16,8 @@ #include "openvino/op/tanh.hpp" #include "openvino/op/transpose.hpp" +#include "text_callback_streamer.hpp" + #include "sampler.hpp" namespace ov { @@ -174,13 +177,37 @@ ov::genai::StreamerVariant get_streamer_from_map(const ov::AnyMap& config_map) { streamer = any_val.as>(); } else if (any_val.is>()) { streamer = any_val.as>(); - } else if (any_val.is>()) { - streamer = any_val.as>(); + } else if (any_val.is>()) { + streamer = any_val.as>(); + } else if (any_val.is>()) { + streamer = any_val.as>(); } } return streamer; } +std::shared_ptr create_streamer(StreamerVariant streamer, Tokenizer tokenizer) { + std::shared_ptr streamer_ptr = std::visit(overloaded{ + [](std::monostate) -> std::shared_ptr { + return nullptr; + }, + [](const std::shared_ptr& streamer) { + return streamer; + }, + [&tokenizer = tokenizer](const std::function& streamer) -> std::shared_ptr { + return std::make_unique(tokenizer, streamer); + }, + [&tokenizer = tokenizer](const std::function& streamer) -> std::shared_ptr { + return std::make_unique(tokenizer, streamer); + }, + [&tokenizer = tokenizer](const std::function& streamer) -> std::shared_ptr { + return std::make_unique(tokenizer, streamer); + } + }, streamer); + + return streamer_ptr; +} + ov::genai::OptionalGenerationConfig get_config_from_map(const ov::AnyMap& config_map) { if (config_map.count(CONFIG_ARG_NAME)) return config_map.at(CONFIG_ARG_NAME).as(); diff --git a/src/cpp/src/utils.hpp b/src/cpp/src/utils.hpp index 0b78a01ec7..6943688c5d 100644 --- a/src/cpp/src/utils.hpp +++ b/src/cpp/src/utils.hpp @@ -50,7 +50,7 @@ struct GenerationFinishInfo { EncodedResults results; std::optional probably_disappeared_token = std::nullopt; - StreamerRunningStatus streaming_finish_status = StreamerRunningStatus::UNDEF; + GenerationStatus streaming_finish_status; }; Tensor init_attention_mask(const Tensor& position_ids); @@ -128,6 +128,10 @@ ov::Tensor push_front_inputs(const ov::Tensor& base_tensor, int64_t add_to_front void print_compiled_model_properties(ov::CompiledModel& compiled_Model, const char* model_title); +template struct overloaded : Ts... {using Ts::operator()...;}; +template overloaded(Ts...) -> overloaded; +std::shared_ptr create_streamer(StreamerVariant streamer, Tokenizer tokenizer); + } // namespace utils } // namespace genai } // namespace ov diff --git a/src/cpp/src/visual_language/pipeline.cpp b/src/cpp/src/visual_language/pipeline.cpp index bb1e365f8c..eca03932f1 100644 --- a/src/cpp/src/visual_language/pipeline.cpp +++ b/src/cpp/src/visual_language/pipeline.cpp @@ -21,9 +21,6 @@ using namespace ov::genai; namespace { - -template struct overloaded : Ts... {using Ts::operator()...;}; -template overloaded(Ts...) 
-> overloaded; constexpr size_t BATCH_SIZE = 1; @@ -187,27 +184,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { SequenceGroup::Ptr sequence_group = std::make_shared(request_id, prompt_ids, generation_config, block_size); requests.push_back(sequence_group); - OPENVINO_ASSERT((!m_is_chat_conversation || !std::get_if>(&streamer)), - "For chat mode, please, use Steamer as StreamerBase class or as callback with a bool return value."); - - std::shared_ptr streamer_ptr = std::visit(overloaded{ - [&m_tokenizer = m_tokenizer]( - const std::function& callback - ) -> std::shared_ptr { - return std::make_shared(m_tokenizer, callback); - }, - [](const std::shared_ptr& ptr) { - return ptr; - }, - [](std::monostate) { - return std::shared_ptr{nullptr}; - }, - [&m_tokenizer = m_tokenizer] - (const std::function& callback - ) -> std::shared_ptr { - return std::make_shared(m_tokenizer, callback); - } - }, streamer); + std::shared_ptr streamer_ptr = ov::genai::utils::create_streamer(streamer, m_tokenizer); OPENVINO_ASSERT(streamer_ptr == nullptr || generation_config.num_return_sequences == 1 && (generation_config.is_greedy_decoding() || generation_config.is_multinomial()), diff --git a/src/python/openvino_genai/__init__.py b/src/python/openvino_genai/__init__.py index 5f33ebc8e4..282a5028fe 100644 --- a/src/python/openvino_genai/__init__.py +++ b/src/python/openvino_genai/__init__.py @@ -15,7 +15,7 @@ RawPerfMetrics, PerfMetrics, StreamerBase, - get_version + get_version, ) __version__ = get_version() @@ -85,4 +85,5 @@ SchedulerConfig, CacheEvictionConfig, AggregationMode, + GenerationStatus ) diff --git a/src/python/openvino_genai/__init__.pyi b/src/python/openvino_genai/__init__.pyi index 0a401ae958..66009972d5 100644 --- a/src/python/openvino_genai/__init__.pyi +++ b/src/python/openvino_genai/__init__.pyi @@ -18,6 +18,7 @@ from openvino_genai.py_openvino_genai import EncodedResults from openvino_genai.py_openvino_genai import FluxTransformer2DModel from openvino_genai.py_openvino_genai import GenerationConfig from openvino_genai.py_openvino_genai import GenerationResult +from openvino_genai.py_openvino_genai import GenerationStatus from openvino_genai.py_openvino_genai import Generator from openvino_genai.py_openvino_genai import Image2ImagePipeline from openvino_genai.py_openvino_genai import ImageGenerationConfig @@ -45,5 +46,5 @@ from openvino_genai.py_openvino_genai import draft_model from openvino_genai.py_openvino_genai import get_version import os as os from . 
import py_openvino_genai -__all__ = ['Adapter', 'AdapterConfig', 'AggregationMode', 'AutoencoderKL', 'CLIPTextModel', 'CLIPTextModelWithProjection', 'CacheEvictionConfig', 'ChunkStreamerBase', 'ContinuousBatchingPipeline', 'CppStdGenerator', 'DecodedResults', 'EncodedResults', 'FluxTransformer2DModel', 'GenerationConfig', 'GenerationResult', 'Generator', 'Image2ImagePipeline', 'ImageGenerationConfig', 'InpaintingPipeline', 'LLMPipeline', 'PerfMetrics', 'RawPerfMetrics', 'SD3Transformer2DModel', 'Scheduler', 'SchedulerConfig', 'StopCriteria', 'StreamerBase', 'T5EncoderModel', 'Text2ImagePipeline', 'TokenizedInputs', 'Tokenizer', 'TorchGenerator', 'UNet2DConditionModel', 'VLMPipeline', 'WhisperGenerationConfig', 'WhisperPerfMetrics', 'WhisperPipeline', 'WhisperRawPerfMetrics', 'draft_model', 'get_version', 'openvino', 'os', 'py_openvino_genai'] +__all__ = ['Adapter', 'AdapterConfig', 'AggregationMode', 'AutoencoderKL', 'CLIPTextModel', 'CLIPTextModelWithProjection', 'CacheEvictionConfig', 'ChunkStreamerBase', 'ContinuousBatchingPipeline', 'CppStdGenerator', 'DecodedResults', 'EncodedResults', 'FluxTransformer2DModel', 'GenerationConfig', 'GenerationResult', 'GenerationStatus', 'Generator', 'Image2ImagePipeline', 'ImageGenerationConfig', 'InpaintingPipeline', 'LLMPipeline', 'PerfMetrics', 'RawPerfMetrics', 'SD3Transformer2DModel', 'Scheduler', 'SchedulerConfig', 'StopCriteria', 'StreamerBase', 'T5EncoderModel', 'Text2ImagePipeline', 'TokenizedInputs', 'Tokenizer', 'TorchGenerator', 'UNet2DConditionModel', 'VLMPipeline', 'WhisperGenerationConfig', 'WhisperPerfMetrics', 'WhisperPipeline', 'WhisperRawPerfMetrics', 'draft_model', 'get_version', 'openvino', 'os', 'py_openvino_genai'] __version__: str diff --git a/src/python/openvino_genai/py_openvino_genai.pyi b/src/python/openvino_genai/py_openvino_genai.pyi index f3158e2cf8..3a138bef22 100644 --- a/src/python/openvino_genai/py_openvino_genai.pyi +++ b/src/python/openvino_genai/py_openvino_genai.pyi @@ -373,10 +373,10 @@ class ContinuousBatchingPipeline: def add_request(self, request_id: int, prompt: str, generation_config: GenerationConfig) -> GenerationHandle: ... @typing.overload - def generate(self, input_ids: list[openvino._pyopenvino.Tensor], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], bool] | typing.Callable[[str], ...] | StreamerBase | None = None) -> list[EncodedGenerationResult]: + def generate(self, input_ids: list[openvino._pyopenvino.Tensor], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], bool] | typing.Callable[[str], ...] | typing.Callable[[str], GenerationStatus] | StreamerBase | None = None) -> list[EncodedGenerationResult]: ... @typing.overload - def generate(self, prompts: list[str], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], bool] | typing.Callable[[str], ...] | StreamerBase | None = None) -> list[GenerationResult]: + def generate(self, prompts: list[str], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], bool] | typing.Callable[[str], ...] | typing.Callable[[str], GenerationStatus] | StreamerBase | None = None) -> list[GenerationResult]: ... def get_config(self) -> GenerationConfig: ... @@ -438,8 +438,8 @@ class EncodedGenerationResult: RUNNING = 0 - Default status for ongoing generation. FINISHED = 1 - Status set when generation has been finished. IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued. 
- DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality. - DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped. + CANCEL = 3 - Status set when generation handle is canceled. + STOP = 4 - Status set when generation handle is stopped. perf_metrics: Performance metrics for each generation result. @@ -669,6 +669,8 @@ class GenerationHandle: ... def can_read(self) -> bool: ... + def cancel(self) -> None: + ... def drop(self) -> None: ... def get_status(self) -> GenerationStatus: @@ -677,6 +679,8 @@ class GenerationHandle: ... def read_all(self) -> list[GenerationOutput]: ... + def stop(self) -> None: + ... class GenerationOutput: finish_reason: GenerationFinishReason generated_ids: list[int] @@ -696,8 +700,8 @@ class GenerationResult: RUNNING = 0 - Default status for ongoing generation. FINISHED = 1 - Status set when generation has been finished. IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued. - DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality. - DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped. + CANCEL = 3 - Status set when generation handle is canceled. + STOP = 4 - Status set when generation handle is stopped. perf_metrics: Performance metrics for each generation result. @@ -727,16 +731,16 @@ class GenerationStatus: IGNORED - DROPPED_BY_PIPELINE + CANCEL - DROPPED_BY_HANDLE + STOP """ - DROPPED_BY_HANDLE: typing.ClassVar[GenerationStatus] # value = - DROPPED_BY_PIPELINE: typing.ClassVar[GenerationStatus] # value = + CANCEL: typing.ClassVar[GenerationStatus] # value = FINISHED: typing.ClassVar[GenerationStatus] # value = IGNORED: typing.ClassVar[GenerationStatus] # value = RUNNING: typing.ClassVar[GenerationStatus] # value = - __members__: typing.ClassVar[dict[str, GenerationStatus]] # value = {'RUNNING': , 'FINISHED': , 'IGNORED': , 'DROPPED_BY_PIPELINE': , 'DROPPED_BY_HANDLE': } + STOP: typing.ClassVar[GenerationStatus] # value = + __members__: typing.ClassVar[dict[str, GenerationStatus]] # value = {'RUNNING': , 'FINISHED': , 'IGNORED': , 'CANCEL': , 'STOP': } def __eq__(self, other: typing.Any) -> bool: ... def __getstate__(self) -> int: @@ -950,7 +954,7 @@ class LLMPipeline: """ This class is used for generation with LLMs """ - def __call__(self, inputs: openvino._pyopenvino.Tensor | TokenizedInputs | str | list[str], generation_config: GenerationConfig | None = None, streamer: typing.Callable[[str], bool] | StreamerBase | None = None, **kwargs) -> EncodedResults | DecodedResults: + def __call__(self, inputs: openvino._pyopenvino.Tensor | TokenizedInputs | str | list[str], generation_config: GenerationConfig | None = None, streamer: typing.Callable[[str], int] | StreamerBase | None = None, **kwargs) -> EncodedResults | DecodedResults: """ Generates sequences or tokens for LLMs. If input is a string or list of strings then resulting sequences will be already detokenized. @@ -1035,7 +1039,7 @@ class LLMPipeline: """ def finish_chat(self) -> None: ... 
- def generate(self, inputs: openvino._pyopenvino.Tensor | TokenizedInputs | str | list[str], generation_config: GenerationConfig | None = None, streamer: typing.Callable[[str], bool] | StreamerBase | None = None, **kwargs) -> EncodedResults | DecodedResults: + def generate(self, inputs: openvino._pyopenvino.Tensor | TokenizedInputs | str | list[str], generation_config: GenerationConfig | None = None, streamer: typing.Callable[[str], int] | StreamerBase | None = None, **kwargs) -> EncodedResults | DecodedResults: """ Generates sequences or tokens for LLMs. If input is a string or list of strings then resulting sequences will be already detokenized. @@ -1812,7 +1816,7 @@ class VLMPipeline: def finish_chat(self) -> None: ... @typing.overload - def generate(self, prompt: str, images: list[openvino._pyopenvino.Tensor], generation_config: GenerationConfig, streamer: typing.Callable[[str], bool] | StreamerBase | None = None, **kwargs) -> VLMDecodedResults: + def generate(self, prompt: str, images: list[openvino._pyopenvino.Tensor], generation_config: GenerationConfig, streamer: typing.Callable[[str], int] | StreamerBase | None = None, **kwargs) -> VLMDecodedResults: """ Generates sequences for VLMs. @@ -1835,7 +1839,7 @@ class VLMPipeline: :rtype: VLMDecodedResults """ @typing.overload - def generate(self, prompt: str, images: openvino._pyopenvino.Tensor, generation_config: GenerationConfig, streamer: typing.Callable[[str], bool] | StreamerBase | None = None, **kwargs) -> VLMDecodedResults: + def generate(self, prompt: str, images: openvino._pyopenvino.Tensor, generation_config: GenerationConfig, streamer: typing.Callable[[str], int] | StreamerBase | None = None, **kwargs) -> VLMDecodedResults: """ Generates sequences for VLMs. diff --git a/src/python/py_continuous_batching_pipeline.cpp b/src/python/py_continuous_batching_pipeline.cpp index 6df67b3a39..e742df26bf 100644 --- a/src/python/py_continuous_batching_pipeline.cpp +++ b/src/python/py_continuous_batching_pipeline.cpp @@ -78,8 +78,8 @@ auto generation_result_docstring = R"( RUNNING = 0 - Default status for ongoing generation. FINISHED = 1 - Status set when generation has been finished. IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued. - DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality. - DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped. + CANCEL = 3 - Status set when generation handle is canceled. + STOP = 4 - Status set when generation handle is stopped. perf_metrics: Performance metrics for each generation result. 
@@ -125,8 +125,8 @@ void init_continuous_batching_pipeline(py::module_& m) { .value("RUNNING", ov::genai::GenerationStatus::RUNNING) .value("FINISHED", ov::genai::GenerationStatus::FINISHED) .value("IGNORED", ov::genai::GenerationStatus::IGNORED) - .value("DROPPED_BY_PIPELINE", ov::genai::GenerationStatus::DROPPED_BY_PIPELINE) - .value("DROPPED_BY_HANDLE", ov::genai::GenerationStatus::DROPPED_BY_HANDLE); + .value("CANCEL", ov::genai::GenerationStatus::CANCEL) + .value("STOP", ov::genai::GenerationStatus::STOP); py::class_(m, "GenerationResult", generation_result_docstring) .def(py::init<>()) @@ -175,6 +175,8 @@ void init_continuous_batching_pipeline(py::module_& m) { .def("get_status", &GenerationHandleImpl::get_status) .def("can_read", &GenerationHandleImpl::can_read) .def("drop", &GenerationHandleImpl::drop) + .def("stop", &GenerationHandleImpl::stop) + .def("cancel", &GenerationHandleImpl::cancel) .def("back", &GenerationHandleImpl::back) .def("read", &GenerationHandleImpl::read) .def("read_all", &GenerationHandleImpl::read_all); diff --git a/src/python/py_utils.cpp b/src/python/py_utils.cpp index 90cce498cd..22d89d899b 100644 --- a/src/python/py_utils.cpp +++ b/src/python/py_utils.cpp @@ -316,19 +316,23 @@ ov::genai::StreamerVariant pystreamer_to_streamer(const PyBindStreamerVariant& p ov::genai::StreamerVariant streamer = std::monostate(); std::visit(overloaded { - [&streamer](const std::function& py_callback){ - // Wrap python streamer with manual utf-8 decoding. Do not rely - // on pybind automatic decoding since it raises exceptions on incomplete strings. - auto callback_wrapped = [py_callback](std::string subword) -> bool { - auto py_str = PyUnicode_DecodeUTF8(subword.data(), subword.length(), "replace"); - return py_callback(py::reinterpret_borrow(py_str)); - }; - streamer = callback_wrapped; - }, - [&streamer](std::shared_ptr streamer_cls){ - streamer = streamer_cls; - }, - [](std::monostate none){ /*streamer is already a monostate */ } + [&streamer](const std::function(py::str)>& py_callback){ + // Wrap python streamer with manual utf-8 decoding. Do not rely + // on pybind automatic decoding since it raises exceptions on incomplete strings. + auto callback_wrapped = [py_callback](std::string subword) -> ov::genai::UintCallbackStreamerResult { + auto py_str = PyUnicode_DecodeUTF8(subword.data(), subword.length(), "replace"); + std::optional callback_output = py_callback(py::reinterpret_borrow(py_str)); + ov::genai::UintCallbackStreamerResult result = {0}; + if (callback_output.has_value()) + result.result = *callback_output; + return result; + }; + streamer = callback_wrapped; + }, + [&streamer](std::shared_ptr streamer_cls){ + streamer = streamer_cls; + }, + [](std::monostate none){ /*streamer is already a monostate */ } }, py_streamer); return streamer; } diff --git a/src/python/py_utils.hpp b/src/python/py_utils.hpp index c3dbdf6aee..0d73b5639d 100644 --- a/src/python/py_utils.hpp +++ b/src/python/py_utils.hpp @@ -17,7 +17,7 @@ namespace ov::genai::pybind::utils { // When StreamerVariant is used utf-8 decoding is done by pybind and can lead to exception on incomplete texts. // Therefore strings decoding should be handled with PyUnicode_DecodeUTF8(..., "replace") to not throw errors. -using PyBindStreamerVariant = std::variant, std::shared_ptr, std::monostate>; +using PyBindStreamerVariant = std::variant(std::string)>, std::shared_ptr, std::monostate>; template struct overloaded : Ts... 
{ diff --git a/tests/python_tests/test_llm_pipeline.py b/tests/python_tests/test_llm_pipeline.py index 0cc5f8d9f4..29b532f5d1 100644 --- a/tests/python_tests/test_llm_pipeline.py +++ b/tests/python_tests/test_llm_pipeline.py @@ -167,7 +167,12 @@ def user_defined_callback(subword): print(subword) -@pytest.mark.parametrize("callback", [print, user_defined_callback, lambda subword: print(subword)]) +def user_defined_status_callback(subword): + print(subword) + return ov_genai.GenerationStatus.RUNNING + + +@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) @pytest.mark.precommit @pytest.mark.nightly def test_callback_one_string(callback): @@ -177,7 +182,7 @@ def test_callback_one_string(callback): ov_pipe.generate('table is made of', generation_config, callback) -@pytest.mark.parametrize("callback", [print, user_defined_callback, lambda subword: print(subword)]) +@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) @pytest.mark.precommit @pytest.mark.nightly def test_callback_batch_throws(callback): @@ -186,7 +191,7 @@ def test_callback_batch_throws(callback): ov_pipe.generate(['1', '2'], ov_pipe.get_generation_config(), callback) -@pytest.mark.parametrize("callback", [print, user_defined_callback, lambda subword: print(subword)]) +@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) @pytest.mark.precommit @pytest.mark.nightly def test_callback_kwargs_one_string(callback): @@ -194,7 +199,7 @@ def test_callback_kwargs_one_string(callback): pipe.generate('table is made of', max_new_tokens=10, streamer=callback) -@pytest.mark.parametrize("callback", [print, user_defined_callback, lambda subword: print(subword)]) +@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) @pytest.mark.precommit @pytest.mark.nightly @pytest.mark.parametrize("model_descr", get_models_list()) @@ -208,7 +213,7 @@ def test_callback_decoding_metallama(model_descr, callback): ov_pipe.generate(prompt, max_new_tokens=300, streamer=callback) -@pytest.mark.parametrize("callback", [print, user_defined_callback, lambda subword: print(subword)]) +@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) @pytest.mark.precommit @pytest.mark.nightly def test_callback_kwargs_batch_throws(callback): @@ -217,6 +222,106 @@ def test_callback_kwargs_batch_throws(callback): pipe.generate(['1', '2'], max_new_tokens=10, streamer=callback) +@pytest.mark.precommit +@pytest.mark.nightly +def test_callback_terminate_by_bool_sampler(): + pipe = read_model(get_models_list()[0])[4] + + current_iter = 0 + num_iters = 10 + def callback(subword): + nonlocal current_iter + current_iter += 1 + return current_iter == num_iters + + ov_generation_config = GenerationConfig(max_new_tokens=100) + + # without attention mask + input_ids, _ = input_tensors_list[0] + inputs_ov = ov.Tensor(input_ids) + ov_output = pipe.generate(inputs_ov, ov_generation_config, streamer=callback) + + assert len(ov_output.tokens[0]) == num_iters + + +@pytest.mark.precommit +@pytest.mark.nightly +def test_callback_terminate_by_status_sampler(): + pipe = read_model(get_models_list()[0])[4] + + current_iter = 0 + num_iters = 10 + def callback(subword): + nonlocal current_iter + current_iter += 1 + return 
ov_genai.GenerationStatus.STOP if current_iter == num_iters else ov_genai.GenerationStatus.RUNNING + + ov_generation_config = GenerationConfig(max_new_tokens=100) + + # without attention mask + input_ids, _ = input_tensors_list[0] + inputs_ov = ov.Tensor(input_ids) + ov_output = pipe.generate(inputs_ov, ov_generation_config, streamer=callback) + + assert len(ov_output.tokens[0]) == num_iters + + +@pytest.mark.parametrize("model_descr", get_chat_models_list()) +@pytest.mark.precommit +@pytest.mark.nightly +def test_chat_scenario_callback_cancel(model_descr): + callback_questions = [ + '1+1=', + 'Why is the Sun yellow?', + 'What is the previous answer?', + 'What was my first question?' + ] + + generation_config_kwargs = dict(max_new_tokens=20) + + chat_history_hf = [] + chat_history_ov = [] + + model_id, path, tokenizer, opt_model, ov_pipe = read_model((model_descr[0], model_descr[1] / '_test_chat')) + + ov_generation_config = GenerationConfig(**generation_config_kwargs) + hf_generation_config = convert_to_hf(opt_model.generation_config, ov_generation_config) + + current_iter = 0 + num_iters = 3 + def callback(subword): + nonlocal current_iter + current_iter += 1 + return ov_genai.GenerationStatus.CANCEL if current_iter == num_iters else ov_genai.GenerationStatus.RUNNING + + ov_pipe.start_chat() + for prompt in callback_questions: + if (prompt != callback_questions[1]): + chat_history_hf.append({'role': 'user', 'content': prompt}) + chat_history_ov.append({'role': 'user', 'content': prompt}) + + chat_prompt = tokenizer.apply_chat_template(chat_history_hf, tokenize=False, add_generation_prompt=True) + tokenized = tokenizer(chat_prompt, return_tensors='pt', add_special_tokens=False) + prompt_len = tokenized['input_ids'].numel() + + answer = opt_model.generate(**tokenized, generation_config=hf_generation_config).sequences[0] + answer_str = tokenizer.decode(answer[prompt_len:], skip_special_tokens=True) + chat_history_hf.append({'role': 'assistant', 'content': answer_str}) + + answer_ov = ov_pipe.generate(prompt, generation_config=ov_generation_config) + chat_history_ov.append({'role': 'assistant', 'content': answer_ov}) + else: + answer_ov = ov_pipe.generate(prompt, generation_config=ov_generation_config, streamer=callback) + + ov_pipe.finish_chat() + + if chat_history_ov != chat_history_hf: + print(f'hf_output: {chat_history_hf}') + print(f'ov_output: {chat_history_ov}') + + assert chat_history_ov == chat_history_hf + + class Printer(ov_genai.StreamerBase): def __init__(self, tokenizer): # super() may work, but once you begin mixing Python and C++ @@ -269,7 +374,7 @@ def test_streamer_kwargs_batch_throws(): @pytest.mark.precommit @pytest.mark.nightly -@pytest.mark.parametrize("callback", [print, user_defined_callback, lambda subword: print(subword)]) +@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) def test_operator_with_callback_one_string(callback): ov_pipe = read_model(get_models_list()[0])[4] ten_tokens = ov_pipe.get_generation_config() @@ -279,7 +384,7 @@ def test_operator_with_callback_one_string(callback): @pytest.mark.precommit @pytest.mark.nightly -@pytest.mark.parametrize("callback", [print, user_defined_callback, user_defined_status_callback, lambda subword: print(subword)]) def test_operator_with_callback_batch_throws(callback): ov_pipe = read_model(get_models_list()[0])[4] with
pytest.raises(RuntimeError): diff --git a/tools/continuous_batching/accuracy/continuous_batching_accuracy.cpp b/tools/continuous_batching/accuracy/continuous_batching_accuracy.cpp index d644ba9418..358ba438b0 100644 --- a/tools/continuous_batching/accuracy/continuous_batching_accuracy.cpp +++ b/tools/continuous_batching/accuracy/continuous_batching_accuracy.cpp @@ -114,7 +114,7 @@ int main(int argc, char* argv[]) try { print_generation_result(generation_result); } break; - case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + case ov::genai::GenerationStatus::CANCEL: std::cout << "Request was aborted." < 0) { std::cout << "Partial result:" << std::endl; diff --git a/tools/continuous_batching/accuracy/continuous_batching_speculative_decoding.cpp b/tools/continuous_batching/accuracy/continuous_batching_speculative_decoding.cpp index eeb3c0f070..d64c6a51fa 100644 --- a/tools/continuous_batching/accuracy/continuous_batching_speculative_decoding.cpp +++ b/tools/continuous_batching/accuracy/continuous_batching_speculative_decoding.cpp @@ -124,7 +124,7 @@ int main(int argc, char* argv[]) try { print_cb_generation_result(generation_result); } break; - case ov::genai::GenerationStatus::DROPPED_BY_PIPELINE: + case ov::genai::GenerationStatus::CANCEL: std::cout << "Request was aborted." < 0) { std::cout << "Partial result:" << std::endl;
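
Usage sketch (illustrative only, not part of the patch): with this change a Python streamer callback may return GenerationStatus instead of a bool: RUNNING continues generation, STOP ends it while keeping the current turn in chat history, and CANCEL ends it and drops the current prompt/answer from the history. The model directory "./model_dir", the "CPU" device and the 10-chunk cut-off below are assumptions chosen for illustration, not values taken from this patch.

import openvino_genai

# Assumed model path and device; replace with your own converted model.
pipe = openvino_genai.LLMPipeline("./model_dir", "CPU")

tokens_seen = 0

def streamer(subword):
    # Called for each decoded text chunk; the return value steers generation.
    global tokens_seen
    tokens_seen += 1
    print(subword, end="", flush=True)
    if tokens_seen >= 10:
        # CANCEL stops generation and removes this turn from the chat history.
        return openvino_genai.GenerationStatus.CANCEL
    return openvino_genai.GenerationStatus.RUNNING

pipe.start_chat()
pipe.generate("Why is the Sun yellow?", max_new_tokens=100, streamer=streamer)
pipe.finish_chat()

Plain bool callbacks remain supported: per TextCallbackStreamer::is_generation_complete, a True return is mapped to GenerationStatus::STOP and False to RUNNING, and GenerationHandle now exposes stop() and cancel() alongside the deprecated drop().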