From eefb28b9df41cf0013e63ea5045ac7ad62e34c00 Mon Sep 17 00:00:00 2001 From: Vladimir Pustovalov Date: Mon, 28 Apr 2025 13:55:38 +0300 Subject: [PATCH] removed redundant and snapshot from Wal::BasicEncoder --- ...ve_gorilla_to_wal_with_redundants_test.cpp | 3 +- pp/wal/encoder.h | 3 +- pp/wal/tests/wal_tests.cpp | 49 +---- pp/wal/wal.h | 185 +----------------- 4 files changed, 7 insertions(+), 233 deletions(-) diff --git a/pp/performance_tests/load_protobuf_wal_and_save_gorilla_to_wal_with_redundants_test.cpp b/pp/performance_tests/load_protobuf_wal_and_save_gorilla_to_wal_with_redundants_test.cpp index 2c1cea434e..6497290286 100644 --- a/pp/performance_tests/load_protobuf_wal_and_save_gorilla_to_wal_with_redundants_test.cpp +++ b/pp/performance_tests/load_protobuf_wal_and_save_gorilla_to_wal_with_redundants_test.cpp @@ -49,8 +49,7 @@ void load_protobuf_wal_and_save_gorilla_to_wal_with_redundants::execute(const Co << " ns" << std::endl; } - auto redundant = wal.write(out); - out << redundant->encoders.size(); + out << wal; out.clear(); } diff --git a/pp/wal/encoder.h b/pp/wal/encoder.h index c8e081fe53..3995a97034 100644 --- a/pp/wal/encoder.h +++ b/pp/wal/encoder.h @@ -117,8 +117,7 @@ class GenericEncoder { template inline __attribute__((always_inline)) void finalize(Stats* stats, Output& out) { write_stats(stats); - - encoder_.write(out); + out << encoder_; } }; diff --git a/pp/wal/tests/wal_tests.cpp b/pp/wal/tests/wal_tests.cpp index 027a32c680..a0785e14e6 100644 --- a/pp/wal/tests/wal_tests.cpp +++ b/pp/wal/tests/wal_tests.cpp @@ -377,53 +377,6 @@ TEST_F(WalFixture, BasicEncoderBasicDecoder) { EXPECT_EQ(writer_latest_sample, reader.latest_sample()); } -TEST_F(WalFixture, Snapshots) { - using WALEncoder = PromPP::WAL::BasicEncoder>; - using WALDecoder = PromPP::WAL::BasicDecoder>; - WALEncoder encoder(2, 3); - PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap encoding_bimap; - WALDecoder decoder{encoding_bimap, PromPP::WAL::BasicEncoderVersion::kV3}; - std::stringstream writer_stream; - std::vector> redundants; - std::ofstream devnull("/dev/null"); - for (int i = 0; i < 10; ++i) { // segments - for (int j = 0; j < 10; ++j) { // samples - const LabelSetForTest label_set = generate_label_set(); - const samples_sequence_type samples = generate_samples(1000, 1.123); - const TimeSeriesForTest timeseries = TimeSeriesForTest(label_set, samples); - encoder.add(timeseries); - } - if (i < 5) { - writer_stream << encoder; - writer_stream >> decoder; - decoder.process_segment([](uint32_t, uint64_t, double) {}); - } else { - redundants.emplace_back(encoder.write(devnull)); - } - } - - std::stringstream stream_buffer; - - // save wal - encoder.snapshot(redundants, stream_buffer); - - EXPECT_GT(stream_buffer.tellp(), 0); - - // read wal from snapshot - PromPP::Primitives::SnugComposites::LabelSet::EncodingBimap encoding_bimap2; - WALDecoder decoder2{encoding_bimap2, PromPP::WAL::BasicEncoderVersion::kV3}; - decoder2.load_snapshot(stream_buffer); - - EXPECT_GT(stream_buffer.tellg(), 0); - - // check label sets - std::stringstream reader_sbuf1, reader_sbuf2; - reader_sbuf1 << decoder.label_sets().checkpoint(); - reader_sbuf2 << decoder2.label_sets().checkpoint(); - EXPECT_EQ(reader_sbuf1.view(), reader_sbuf2.view()); - EXPECT_EQ(decoder.sample_decoder().gorilla(), decoder2.sample_decoder().gorilla()); -} - TEST_F(WalFixture, BasicEncoderMany) { using WALEncoder = PromPP::WAL::BasicEncoder>; using AddManyCallbackType = WALEncoder::add_many_generator_callback_type; @@ -523,7 +476,7 @@ class CreateBasicEncoderFromBasicDecoderFixture : public ::testing::Test { timeseries.label_set().add(label_set); encoder.add(timeseries); - encoder.write(stream); + stream << encoder; for (const auto decoder : decoders) { EXPECT_NO_THROW(std::ispanstream(stream.view()) >> *decoder); diff --git a/pp/wal/wal.h b/pp/wal/wal.h index d894234916..65f80abe61 100644 --- a/pp/wal/wal.h +++ b/pp/wal/wal.h @@ -237,21 +237,6 @@ class BasicEncoder { } }; - struct __attribute__((__packed__)) EncoderWithID { - BareBones::Encoding::Gorilla::StreamEncoder, BareBones::Encoding::Gorilla::ValuesEncoder> encoder; - Primitives::LabelSetID id; - }; - - struct Redundant { - uint32_t segment; - uint32_t encoders_count; - checkpoint_type label_sets_checkpoint; - BareBones::Vector encoders; - - inline __attribute__((always_inline)) Redundant(uint32_t _segment, const checkpoint_type& _label_sets_checkpoint, uint32_t _encoders_count) - : segment(_segment), encoders_count(_encoders_count), label_sets_checkpoint(_label_sets_checkpoint) {} - }; - template struct StaleNaNsState { void* parent; @@ -291,7 +276,7 @@ class BasicEncoder { uint16_t shard_id_ = 0; uint8_t pow_two_of_total_shards_ = 0; - std::unique_ptr encode_segment(std::ostream& stream) { + void encode_segment(std::ostream& stream) { BareBones::BitSequence gorilla_ts_bitseq, gorilla_v_bitseq; BareBones::EncodedSequence> ls_id_delta_rle_seq; BareBones::EncodedSequence> ts_delta_rle_seq; @@ -314,16 +299,9 @@ class BasicEncoder { gorilla_.resize(label_sets_.next_item_index()); - auto redundant = std::make_unique(next_encoded_segment_, label_sets_checkpoint_, label_sets_.size()); - constexpr Primitives::LabelSetID max_last_id = std::numeric_limits::max(); - Primitives::LabelSetID last_id = max_last_id; + const auto label_sets_checkpoint = label_sets_checkpoint_; if (ts_delta_rle_is_worth_trying) { buffer_.for_each([&](Primitives::LabelSetID ls_id, Primitives::Timestamp ts, Primitives::Sample::value_type v) { - assert(last_id == max_last_id || ls_id >= last_id); - if (last_id == max_last_id || last_id != ls_id) { - last_id = ls_id; - redundant->encoders.emplace_back(gorilla_[ls_id], ls_id); - } gorilla_[ls_id].encode(ts - ts_base_, v, gorilla_ts_bitseq, gorilla_v_bitseq); ls_id_delta_rle_seq.push_back(ls_id); @@ -335,10 +313,6 @@ class BasicEncoder { }); } else { buffer_.for_each([&](Primitives::LabelSetID ls_id, Primitives::Timestamp ts, Primitives::Sample::value_type v) { - if (last_id == max_last_id || last_id != ls_id) { - last_id = ls_id; - redundant->encoders.emplace_back(gorilla_[ls_id], ls_id); - } gorilla_[ls_id].encode(ts - ts_base_, v, gorilla_ts_bitseq, gorilla_v_bitseq); ls_id_delta_rle_seq.push_back(ls_id); @@ -364,8 +338,8 @@ class BasicEncoder { // write new label sets label_sets_checkpoint_ = label_sets_.checkpoint(); - label_sets_bytes_ += label_sets_checkpoint_.save_size(&redundant->label_sets_checkpoint); - lz4stream_ << (label_sets_checkpoint_ - redundant->label_sets_checkpoint); + label_sets_bytes_ += label_sets_checkpoint_.save_size(&label_sets_checkpoint); + lz4stream_ << (label_sets_checkpoint_ - label_sets_checkpoint); if constexpr (shrink_lss) { label_sets_.shrink_to_checkpoint_size(label_sets_checkpoint_); } @@ -401,7 +375,6 @@ class BasicEncoder { samples_ += buffer_.samples_count(); buffer_.clear(); ++next_encoded_segment_; - return redundant; } static auto generate_uuid() { @@ -603,108 +576,6 @@ class BasicEncoder { wal.encode_segment(out); return out; } - - template - inline __attribute__((always_inline)) std::unique_ptr write(OutputStream& out) { - return encode_segment(out); - } - - template - requires std::is_same_v::value_type, Redundant*> || - std::is_same_v::value_type, std::unique_ptr> - inline __attribute__((always_inline)) void snapshot(Iterator& it, OutputStream& out) { - assert(next_encoded_segment_ > 0); - uint32_t segment = (std::begin(it) != std::end(it)) ? (*std::begin(it))->segment : next_encoded_segment_; - auto label_sets_checkpoint = (std::begin(it) != std::end(it)) ? (*std::begin(it))->label_sets_checkpoint : label_sets_.checkpoint(); - - auto encoders_count = segment == next_encoded_segment_ ? gorilla_.size() : (*it.begin())->encoders_count; - if (encoders_count > gorilla_.size()) { - throw BareBones::Exception(0xfd921d184ca372ee, "Encoder's Snapshot %d has more encoders in first redundant (%u) than already on the Writer (%u)", segment, - encoders_count, gorilla_.size()); - } - - decltype(gorilla_) encoders; - encoders.reserve(encoders_count); - std::ranges::copy(gorilla_ | std::ranges::views::take(encoders_count), std::back_inserter(encoders)); - BareBones::Bitset changed; - changed.resize(encoders.size()); - - uint32_t redundant_segment_id = segment; - bool has_any_redundant_segment_id = false; - for (auto& redundant : it) { - // check that all redundant ids are sequential and throw error if not. - if (!has_any_redundant_segment_id) { - has_any_redundant_segment_id = true; - } else { - if (not(redundant->segment == redundant_segment_id + 1)) { - throw BareBones::Exception(0xcddf21b039fe5a18, - "The next redundant in encoders' snapshot (segment_id=%d) must be in order with previous redundant (segment_id=%d)", - redundant->segment, redundant_segment_id + 1); - } - redundant_segment_id = redundant->segment; - } - - for (const auto& encoder_with_id : redundant->encoders) { - if (encoder_with_id.id >= encoders_count) { - continue; - } - if (!changed[encoder_with_id.id]) { - changed.set(encoder_with_id.id); - encoders[encoder_with_id.id] = encoder_with_id.encoder; - } - } - } - - // after cycle there is a last segment id. - // if have any redundants check that it not equals current redundant_segment_id and next_encoded_segment_ - 1, - // redundants must be consistent - if (has_any_redundant_segment_id && redundant_segment_id != next_encoded_segment_ - 1) { - throw BareBones::Exception(0xc318a18809c8167e, - "The encoder's snapshot doesn't have the latest redundant with expected segment_id=%d, the last redundant has segment_id=%d", - (next_encoded_segment_ - 1), redundant_segment_id); - } - - BareBones::Vector< - BareBones::Encoding::Gorilla::StreamDecoder, BareBones::Encoding::Gorilla::ValuesDecoder>> - decoders; - - // move out the encoders into decoders. - for (auto& encoder : encoders) { - decoders.emplace_back(encoder.state()); - } - - auto original_exceptions = out.exceptions(); - auto sg1 = std::experimental::scope_exit([&]() { out.exceptions(original_exceptions); }); - out.exceptions(std::ifstream::failbit | std::ifstream::badbit); - - // write version - out.put(1); - - // write uuid - out.write(reinterpret_cast(uuid_.as_bytes().data()), 16); - - // write shard ID - out.write(reinterpret_cast(&shard_id_), 2); - - // Write total shards count (in power of two). - out.write(reinterpret_cast(&pow_two_of_total_shards_), 1); - - // write prev segment number - // It will be used in reader as a last processed segment number (which is 1 less than segment value) - segment = segment - 1; - out.write(reinterpret_cast(&segment), sizeof(segment)); - - // write base label sets snapshot - label_sets_checkpoint.save(out); - // TODO: calculate additional segments' deltas, see !155#note_240342. - - // write decoders - uint32_t decoders_count = decoders.size(); - out.write(reinterpret_cast(&decoders_count), sizeof(decoders_count)); - for (auto& decoder : decoders) { - decoder.save(out); - } - } }; struct SampleCrc { @@ -957,54 +828,6 @@ class BasicDecoder { this->pow_two_of_total_shards_ == reader.pow_two_of_total_shards_; } - template - void load_snapshot(InputStream& in) { - assert(segment_gorilla_v_bitseq_.empty()); - - // the snapshot must be loaded from first segment! (from review) - // !155#note_241482 - if (last_processed_segment_ + 1 != 0) { - throw BareBones::Exception(0x25fa0d279a79b3f6, "Can't load Snapshot into non-empty Decoder"); - } - - // read version - uint8_t version = in.get(); - - // return successfully, if stream is empty - if (in.eof()) - return; - - // check version - if (version != 1) { - throw BareBones::Exception(0xccd8f4f87758ca2f, "Invalid snapshot version (%d), only version 1 is supported", version); - } - - auto original_exceptions = in.exceptions(); - auto sg1 = std::experimental::scope_exit([&]() { in.exceptions(original_exceptions); }); - in.exceptions(std::ifstream::failbit | std::ifstream::badbit | std::ifstream::eofbit); - - // read uuid - auto uuid = read_uuid(in); - - // associate uuid, assuming that it's a first segment - uuid_ = uuid; - - // read shard ID - in.read(reinterpret_cast(&shard_id_), sizeof(shard_id_)); - - // read pow of two of total shards - in.read(reinterpret_cast(&pow_two_of_total_shards_), sizeof(pow_two_of_total_shards_)); - - // read segment number - in.read(reinterpret_cast(&last_processed_segment_), sizeof(last_processed_segment_)); - - // read label sets snapshot - label_sets_.load(in); - - // read decoders - sample_decoder_.load(in); - } - inline __attribute__((always_inline)) const LabelSetsTable& label_sets() const { return label_sets_; } inline __attribute__((always_inline)) uint32_t series() const { return label_sets_.size(); }